You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 8.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. '''
  11. umod
  12. # '''
  13. p=print
  14. async def anop(*a,**kw):pass
  15. def nop(*a,**kw):pass
  16. class Edict(dict):pass
  17. class Eobj():pass
  18. def connect_redis():
  19. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  20. def eclass_factory(n):
  21. ret = []
  22. for k in range(n):
  23. class nx:pass
  24. # _rcls_name
  25. ret.append(nx)
  26. return ret
  27. '''
  28. # '''
  29. class DevMixin(*eclass_factory(10)):pass
  30. class DevMroMixin(*eclass_factory(10)):pass
  31. class WSStoMixin(*eclass_factory(10)):pass
  32. class GCLMixin(*eclass_factory(10)):pass
  33. class RldManMixin(*eclass_factory(10)):pass
  34. Mixins = Eobj()
  35. Mixins.DevMixin = DevMixin
  36. Mixins.DevMroMixin = DevMroMixin
  37. Mixins.WSStoMixin = WSStoMixin
  38. Mixins.GCLMixin = GCLMixin
  39. Mixins.RldManMixin = RldManMixin
  40. Mixins.DbgMixin = DbgMixin
  41. Mixins.AconMixin = AconMixin
  42. def callable_helper(fn):
  43. ret = {
  44. "is_callable":0,
  45. "is_coroutine":0,
  46. }
  47. ret["is_callable"]= callable(fn)
  48. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  49. # if (ca)
  50. return ret
  51. verbose = print
  52. class DbgMixin:
  53. async def ws_rec(self,text_data,*a,**kw):
  54. # if "dbg" in
  55. text_data_json = json.loads(text_data)
  56. if not 'dbg' in text_data_json:
  57. return
  58. p("DBG!")
  59. exec(text_data_json["dbg"])
  60. class AconMixin(DbgMixin):
  61. # class AconMixin:
  62. async def connect(self):
  63. # print("-----------CONNECTING async def connect")
  64. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  65. p(flush=True)
  66. await self.accept()
  67. verbose("<MRO")
  68. verbose(type(self))
  69. await self.call_all_mro("ws_conn0")
  70. await self.call_all_mro("ws_conn")
  71. await self.call_all_mro("ws_conn2")
  72. verbose("/MRO>")
  73. # await self.ws_conn_once(event)
  74. await self.ws_conn_once()
  75. async def ws_conn_once(self, *a,**kw):pass
  76. async def ws_disconn_once(self, *a,**kw):pass
  77. async def ws_rec_once(self, *a,**kw):pass
  78. async def receive(self, *a,**kw):
  79. # print("REC")
  80. # print("-----------REC")
  81. await self.call_all_mro("ws_rec",*a,**kw)
  82. await self.ws_rec_once(*a,**kw)
  83. async def call_all_mro(self,mthd_name,*args,**kwargs):
  84. called = set([None])
  85. for cls_obj in self.__class__.__mro__:
  86. mthd = getattr(cls_obj,mthd_name,None)
  87. # print("~",cls_obj,mthd)
  88. if not mthd in called:
  89. # print(cls_obj,mthd)
  90. called.add(mthd)
  91. await mthd(self,*args,**kwargs)
  92. async def disconnect(self, close_code):
  93. await self.call_all_mro("ws_disconn",close_code)
  94. await self.ws_disconn_once(close_code)
  95. async def websocket_connect_x(self, event):
  96. await super().websocket_connect(event)
  97. print("<MRO")
  98. print(type(self))
  99. await self.call_all_mro("ws_conn",event)
  100. await self.call_all_mro("ws_conn2",event)
  101. print("/MRO>")
  102. await self.ws_conn_once(event)
  103. class AclTaskMixin:
  104. async def ws_conn0(self):
  105. glob_cl.run_persistent_loop(self)
  106. print("++++++++++++++++++++++++++")
  107. async def ws_conn_once(self,*a,**kw):
  108. print("\t\tws_conn_once:","AclTaskMixin")
  109. # GCLMixin
  110. class globCL():
  111. def __init__(self,*a,**kw):
  112. # for now we are not going to do any weird class stuff
  113. self.rconn = connect_redis()
  114. self.rman = RldMan()
  115. self.cbs_once = []
  116. self.cbs_once_set = set()
  117. self.cbs = {"fn1":fn1,"fn2x":nop}
  118. self.timeout = 1
  119. pass
  120. async def persistent_loop(self,*a,**kw):
  121. while 1:
  122. # p("PL:",time.time(),flush=1)
  123. # await asyncio.sleep(5)
  124. # for k,v in cls.cbs.items():
  125. for k,v in [*self.cbs.items()]:
  126. try:
  127. call_info = callable_helper(v)
  128. if (call_info["is_callable"]):
  129. if call_info["is_coroutine"]:
  130. pass
  131. await v({k,self})
  132. else:
  133. v({k,self})
  134. else:
  135. pass
  136. # p(k,v)
  137. pass
  138. except Exception as e:
  139. p("persistent_loop Exception",e)
  140. p(flush=True)
  141. # raise
  142. else:
  143. pass
  144. finally:
  145. pass
  146. # cbs_once
  147. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  148. self.cbs_once = []
  149. self.cbs_once_set = set()
  150. k=0
  151. for v in cbs_once:
  152. try:
  153. call_info = callable_helper(v)
  154. if (call_info["is_callable"]):
  155. if call_info["is_coroutine"]:
  156. pass
  157. await v({k,self})
  158. else:
  159. v({k,self})
  160. else:
  161. pass
  162. # p(k,v)
  163. pass
  164. except Exception as e:
  165. p("persistent_loop Exception cbs_once:",e)
  166. p(flush=True)
  167. # raise
  168. else:
  169. pass
  170. finally:
  171. pass
  172. k += 1
  173. await asyncio.sleep(self.timeout)
  174. def _run_persistent_loop(self,*a,**kw):
  175. p("_run_persistent_loop:")
  176. loop = asyncio.get_event_loop()
  177. self.run_persistent_loop = nop
  178. self.rman.persistent_loop_cb()
  179. loop.create_task(self.persistent_loop())
  180. self.loop = loop
  181. run_persistent_loop = _run_persistent_loop
  182. # RldManMixin
  183. class RldMan():
  184. def __init__(self,*a,**kw):
  185. self.files = {}
  186. z="ABC"
  187. self.scopes = {
  188. "dflt_scope":{"globals":globals(),"locals":locals()},
  189. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  190. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  191. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  192. # "dflt_scope":{"globals":globals(),"locals":{}}
  193. }
  194. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  195. self.scope_opt = ""
  196. self.print_tb = 0
  197. # self.dflt_scope = {"globals":globals(),"locals":locals()}
  198. # self.dflt_scope2 = {"globals":copy.copy(globals()),"locals":copy.copy(locals())}
  199. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  200. # [base_path+"r0.py",{"ftxt":"zz2"}],
  201. file_list = [
  202. base_path+"i0.py",
  203. base_path+"i1.py",
  204. base_path+"i2.py",
  205. base_path+"i3.py",
  206. base_path+"i4.py",
  207. base_path+"i5.py",
  208. ]
  209. self.add_files(file_list)
  210. # self.add_files(file_list,{"run"})
  211. def persistent_loop_cb(self,*a,**kw):
  212. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  213. file_list = [
  214. base_path+"r0.py",
  215. base_path+"r1.py",
  216. base_path+"r2.py",
  217. base_path+"r3.py",
  218. base_path+"r4.py",
  219. base_path+"r5.py",
  220. ]
  221. self.add_files(file_list)
  222. def add_files(self,files):
  223. for file in files:
  224. if type(file)==str:
  225. self.add_file(file)
  226. elif type(file)==list:
  227. self.add_file(*file)
  228. else:
  229. p("add files???",file)
  230. def add_file(self,file_name,fnx={}):
  231. self.files[file_name] = {"ftxt":"",**fnx}
  232. def get_scope(self,rfile_obj,file_name):
  233. return self.scopes["current_scope"]
  234. return {
  235. "scope":self.scopes["current_scope"],
  236. }
  237. # rfile_obj.get("scope")
  238. # if
  239. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  240. if file_name in self.files:
  241. rfile_obj = self.files[file_name]
  242. st = os.stat(file_name)
  243. st_tuple = (st.st_mtime,st.st_size)
  244. # if rfile_obj["ftxt"] == "":
  245. eflag ="nd"
  246. # rfile_obj["ftxt"] = st_tuple
  247. if rfile_obj["ftxt"] != st_tuple:
  248. # p(rfile_obj["ftxt"])
  249. rfile_obj["ftxt"] = st_tuple
  250. try:
  251. f = open(file_name,"r")
  252. ftxt = f.read()
  253. # scope_key = rfile_obj.get("scope")
  254. # p(scope_key,rfile_obj)
  255. f.close()
  256. scope_obj = self.get_scope(file_name,rfile_obj)
  257. if self.scope_opt == "locals":
  258. eflag ="locals"
  259. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  260. elif self.scope_opt == "globals":
  261. eflag ="globals"
  262. exec(ftxt,scope_obj["globals"])
  263. else:
  264. eflag ="[]"
  265. exec(ftxt)
  266. except Exception as e:
  267. print("EXCEPT",eflag,e)
  268. if self.print_tb:
  269. traceback.print_tb(e.__traceback__,file=sys.stdout)
  270. else:
  271. pass
  272. # print("ELSE")
  273. finally:
  274. # print("FINALLY")
  275. pass
  276. print(end="",flush=True)
  277. return ret
  278. def rld_files(self):
  279. # p()
  280. ret = {
  281. "errs":{},
  282. "all":{},
  283. "alle":{},
  284. }
  285. # for k in self.files:
  286. for k in [*self.files]:
  287. self.rld_file(k,ret)
  288. # p("rld_files!")
  289. p("",end="",flush=True)
  290. return ret
  291. def fn1(*a,**kw):
  292. # rld_files
  293. glob_cl.rman.rld_files()
  294. glob_cl = globCL()
  295. glob_cl.rman.rld_files()
  296. # print("..",end="\n",flush=True)
  297. def redis_info():
  298. rconn = glob_cl.rconn
  299. keys = rconn.keys()
  300. for k in keys:
  301. p(":",k)
  302. p("len(keys)",len(keys))
  303. print("",end="",flush=True)
  304. # rconn.flushall()
  305. redis_info()
  306. r"""
  307. def atexit_fn():
  308. pass
  309. # rconn.flushall()
  310. print("ATEXIT FN")
  311. rinfo_len()
  312. keys = rconn.keys()
  313. print(keys)
  314. print(len(keys))
  315. print("t 000")
  316. # time.sleep(10)
  317. print("t 010")
  318. # time.sleep(20)
  319. print("t 020")
  320. print("ATEXIT FN...")
  321. rconn.flushall()
  322. atexit.register(atexit_fn)
  323. # """
  324. def atexit_fn():
  325. print("atexit_fn!\n\n")
  326. print("",end="",flush=True)
  327. atexit.register(atexit_fn)