You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 8.3KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. '''
  12. umod
  13. # '''
  14. p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis():
  20. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  21. def eclass_factory(n):
  22. ret = []
  23. for k in range(n):
  24. class nx:pass
  25. # _rcls_name
  26. ret.append(nx)
  27. return ret
  28. '''
  29. # '''
  30. def callable_helper(fn):
  31. ret = {
  32. "is_callable":0,
  33. "is_coroutine":0,
  34. }
  35. ret["is_callable"]= callable(fn)
  36. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  37. # if (ca)
  38. return ret
  39. verbose = print
  40. class DbgMixin:
  41. async def ws_rec(self,text_data,*a,**kw):
  42. # if "dbg" in
  43. text_data_json = json.loads(text_data)
  44. if not 'dbg' in text_data_json:
  45. return
  46. p("DBG!")
  47. exec(text_data_json["dbg"])
  48. class AconMixin(DbgMixin):
  49. # class AconMixin:
  50. async def connect(self):
  51. # print("-----------CONNECTING async def connect")
  52. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  53. p(flush=True)
  54. await self.accept()
  55. verbose("<MRO")
  56. verbose(type(self))
  57. await self.call_all_mro("ws_conn0")
  58. await self.call_all_mro("ws_conn")
  59. await self.call_all_mro("ws_conn2")
  60. verbose("/MRO>")
  61. # await self.ws_conn_once(event)
  62. await self.ws_conn_once()
  63. async def ws_conn_once(self, *a,**kw):pass
  64. async def ws_disconn_once(self, *a,**kw):pass
  65. async def ws_rec_once(self, *a,**kw):pass
  66. async def receive(self, *a,**kw):
  67. print("REC")
  68. print("",end="",flush=True)
  69. # print("-----------REC")
  70. await self.call_all_mro("ws_rec",*a,**kw)
  71. await self.ws_rec_once(*a,**kw)
  72. async def call_all_mro(self,mthd_name,*args,**kwargs):
  73. called = set([None])
  74. for cls_obj in self.__class__.__mro__:
  75. mthd = getattr(cls_obj,mthd_name,None)
  76. # print("~",cls_obj,mthd)
  77. if not mthd in called:
  78. # print(cls_obj,mthd)
  79. called.add(mthd)
  80. await mthd(self,*args,**kwargs)
  81. async def disconnect(self, close_code):
  82. await self.call_all_mro("ws_disconn",close_code)
  83. await self.ws_disconn_once(close_code)
  84. async def websocket_connect_x(self, event):
  85. await super().websocket_connect(event)
  86. print("<MRO")
  87. print(type(self))
  88. await self.call_all_mro("ws_conn",event)
  89. await self.call_all_mro("ws_conn2",event)
  90. print("/MRO>")
  91. await self.ws_conn_once(event)
  92. class AclTaskMixin:
  93. async def ws_conn0(self):
  94. print("AclTaskMixin:ws_conn0")
  95. glob_cl.run_persistent_loop(self)
  96. glob_cl.acl = self.channel_layer
  97. # print("++++++++++++++++++++++++++")
  98. async def ws_conn_once(self,*a,**kw):
  99. print("\t\tws_conn_once:","AclTaskMixin")
  100. class DevMixin(*eclass_factory(10)):pass
  101. class DevMroMixin(*eclass_factory(10)):pass
  102. class WSStoMixin(*eclass_factory(10)):pass
  103. class GCLMixin(*eclass_factory(10)):pass
  104. class RldManMixin(*eclass_factory(10)):pass
  105. class LoneMixinA():pass
  106. class LoneMixinB():pass
  107. Mixins = Eobj()
  108. Mixins.DevMixin = DevMixin
  109. Mixins.DevMroMixin = DevMroMixin
  110. Mixins.WSStoMixin = WSStoMixin
  111. Mixins.GCLMixin = GCLMixin
  112. Mixins.RldManMixin = RldManMixin
  113. Mixins.LoneMixinA = LoneMixinA
  114. Mixins.LoneMixinB = LoneMixinB
  115. Mixins.DbgMixin = DbgMixin
  116. Mixins.AconMixin = AconMixin
  117. Mixins.AclTaskMixin = AclTaskMixin
  118. # GCLMixin
  119. class globCL(GCLMixin):
  120. def __init__(self,*a,**kw):
  121. # for now we are not going to do any weird class stuff
  122. self.acl = None
  123. self.rconn = connect_redis()
  124. self.rman = RldMan()
  125. self.cbs_once = []
  126. self.cbs_once_set = set()
  127. self.cbs = {"fn1":fn1,"fn2x":nop}
  128. self.timeout = 1
  129. pass
  130. async def persistent_loop(self,*a,**kw):
  131. while 1:
  132. # p("PL:",time.time(),flush=1)
  133. # await asyncio.sleep(5)
  134. # for k,v in cls.cbs.items():
  135. for k,v in [*self.cbs.items()]:
  136. try:
  137. call_info = callable_helper(v)
  138. if (call_info["is_callable"]):
  139. if call_info["is_coroutine"]:
  140. pass
  141. await v({k,self})
  142. else:
  143. v({k,self})
  144. else:
  145. pass
  146. # p(k,v)
  147. pass
  148. except Exception as e:
  149. p("persistent_loop Exception",e)
  150. p(flush=True)
  151. # raise
  152. else:
  153. pass
  154. finally:
  155. pass
  156. # cbs_once
  157. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  158. self.cbs_once = []
  159. self.cbs_once_set = set()
  160. k=0
  161. for v in cbs_once:
  162. try:
  163. call_info = callable_helper(v)
  164. if (call_info["is_callable"]):
  165. if call_info["is_coroutine"]:
  166. pass
  167. await v({k,self})
  168. else:
  169. v({k,self})
  170. else:
  171. pass
  172. # p(k,v)
  173. pass
  174. except Exception as e:
  175. p("persistent_loop Exception cbs_once:",e)
  176. p(flush=True)
  177. # raise
  178. else:
  179. pass
  180. finally:
  181. pass
  182. k += 1
  183. await asyncio.sleep(self.timeout)
  184. def _run_persistent_loop(self,*a,**kw):
  185. p("_run_persistent_loop:")
  186. loop = asyncio.get_event_loop()
  187. self.run_persistent_loop = nop
  188. self.rman.persistent_loop_cb()
  189. loop.create_task(self.persistent_loop())
  190. self.loop = loop
  191. run_persistent_loop = _run_persistent_loop
  192. # RldManMixin
  193. class RldMan(RldManMixin):
  194. def __init__(self,*a,**kw):
  195. self.files = {}
  196. z="ABC"
  197. self.scopes = {
  198. "dflt_scope":{"globals":globals(),"locals":locals()},
  199. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  200. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  201. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  202. # "dflt_scope":{"globals":globals(),"locals":{}}
  203. }
  204. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  205. # self.scope_opt = "locals"
  206. self.scope_opt = "globals"
  207. self.print_tb = 0
  208. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  209. file_list = [
  210. base_path+"i0.py",
  211. base_path+"i1.py",
  212. base_path+"i2.py",
  213. base_path+"i3.py",
  214. base_path+"i4.py",
  215. base_path+"i5.py",
  216. ]
  217. self.add_files(file_list)
  218. # self.add_files(file_list,{"run"})
  219. def persistent_loop_cb(self,*a,**kw):
  220. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  221. file_list = [
  222. base_path+"r0.py",
  223. base_path+"r1.py",
  224. base_path+"r2.py",
  225. base_path+"r3.py",
  226. base_path+"r4.py",
  227. base_path+"r5.py",
  228. ]
  229. self.add_files(file_list)
  230. def add_files(self,files):
  231. for file in files:
  232. if type(file)==str:
  233. self.add_file(file)
  234. elif type(file)==list:
  235. self.add_file(*file)
  236. else:
  237. p("add files???",file)
  238. def add_file(self,file_name,fnx={}):
  239. self.files[file_name] = {"ftxt":"",**fnx}
  240. def get_scope(self,rfile_obj,file_name):
  241. return self.scopes["current_scope"]
  242. return {
  243. "scope":self.scopes["current_scope"],
  244. }
  245. # rfile_obj.get("scope")
  246. # if
  247. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  248. if file_name in self.files:
  249. rfile_obj = self.files[file_name]
  250. st = os.stat(file_name)
  251. st_tuple = (st.st_mtime,st.st_size)
  252. # if rfile_obj["ftxt"] == "":
  253. eflag ="nd"
  254. # rfile_obj["ftxt"] = st_tuple
  255. if rfile_obj["ftxt"] != st_tuple:
  256. # p(rfile_obj["ftxt"])
  257. rfile_obj["ftxt"] = st_tuple
  258. try:
  259. f = open(file_name,"r")
  260. ftxt = f.read()
  261. # scope_key = rfile_obj.get("scope")
  262. # p(scope_key,rfile_obj)
  263. f.close()
  264. scope_obj = self.get_scope(file_name,rfile_obj)
  265. if self.scope_opt == "locals":
  266. eflag ="locals"
  267. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  268. elif self.scope_opt == "globals":
  269. eflag ="globals"
  270. exec(ftxt,scope_obj["globals"])
  271. else:
  272. eflag ="[]"
  273. exec(ftxt)
  274. except Exception as e:
  275. p()
  276. print("file:",file_name)
  277. print("EXCEPT",eflag,e)
  278. if self.print_tb:
  279. traceback.print_tb(e.__traceback__,file=sys.stdout)
  280. else:
  281. pass
  282. # print("ELSE")
  283. finally:
  284. # print("FINALLY")
  285. pass
  286. print(end="",flush=True)
  287. return ret
  288. def rld_files(self):
  289. # p()
  290. ret = {
  291. "errs":{},
  292. "all":{},
  293. "alle":{},
  294. }
  295. # for k in self.files:
  296. for k in [*self.files]:
  297. self.rld_file(k,ret)
  298. # p("rld_files!")
  299. p("",end="",flush=True)
  300. return ret
  301. def fn1(*a,**kw):
  302. # rld_files
  303. glob_cl.rman.rld_files()
  304. glob_cl = globCL()
  305. glob_cl.rman.rld_files()
  306. # print("..",end="\n",flush=True)
  307. r"""
  308. def atexit_fn():
  309. pass
  310. # rconn.flushall()
  311. print("ATEXIT FN")
  312. rinfo_len()
  313. keys = rconn.keys()
  314. print(keys)
  315. print(len(keys))
  316. print("t 000")
  317. # time.sleep(10)
  318. print("t 010")
  319. # time.sleep(20)
  320. print("t 020")
  321. print("ATEXIT FN...")
  322. rconn.flushall()
  323. atexit.register(atexit_fn)
  324. # """
  325. def atexit_fn():
  326. print("atexit_fn!\n\n")
  327. print("",end="",flush=True)
  328. atexit.register(atexit_fn)