You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 9.7KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. hot_reload_build = True
  13. p=print
  14. async def anop(*a,**kw):pass
  15. def nop(*a,**kw):pass
  16. class Edict(dict):pass
  17. class Eobj():pass
  18. def connect_redis1():
  19. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  20. def connect_redis():
  21. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  22. def eclass_factory(n):
  23. ret = []
  24. for k in range(n):
  25. class nx:pass
  26. # _rcls_name
  27. ret.append(nx)
  28. return ret
  29. def callable_helper(fn):
  30. ret = {
  31. "is_callable":0,
  32. "is_coroutine":0,
  33. }
  34. ret["is_callable"]= callable(fn)
  35. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  36. # if (ca)
  37. return ret
  38. # verbose = print
  39. verbose=nop
  40. class DbgMixin:
  41. async def ws_rec(self,text_data,*a,**kw):
  42. # if "dbg" in
  43. text_data_json = json.loads(text_data)
  44. if not 'dbg' in text_data_json:
  45. return
  46. p("DBG!")
  47. exec(text_data_json["dbg"])
  48. class AconMixin(DbgMixin):
  49. # class AconMixin:
  50. async def connect(self):
  51. p(end="",flush=True)
  52. await self.accept()
  53. verbose("<MRO")
  54. verbose(type(self))
  55. await self.call_all_mro("ws_conn0")
  56. await self.call_all_mro("ws_conn")
  57. await self.call_all_mro("ws_conn2")
  58. verbose("/MRO>")
  59. await self.ws_conn_once()
  60. async def ws_conn_once(self, *a,**kw):pass
  61. async def ws_disconn_once(self, *a,**kw):pass
  62. async def ws_rec_once(self, *a,**kw):pass
  63. async def receive(self, *a,**kw):
  64. await self.call_all_mro("ws_rec",*a,**kw)
  65. await self.ws_rec_once(*a,**kw)
  66. print("",end="",flush=True)
  67. async def call_all_mro(self,mthd_name,*args,**kwargs):
  68. called = set([None])
  69. for cls_obj in self.__class__.__mro__:
  70. mthd = getattr(cls_obj,mthd_name,None)
  71. if not mthd in called:
  72. called.add(mthd)
  73. await mthd(self,*args,**kwargs)
  74. async def disconnect(self, close_code):
  75. await self.call_all_mro("ws_disconn",close_code)
  76. await self.ws_disconn_once(close_code)
  77. class AclTaskMixin:
  78. async def ws_disconn(self,*a,**kw):
  79. pass
  80. async def ws_conn0(self):
  81. self.groups_set = set()
  82. if glob_cl.has_cl:
  83. return
  84. glob_cl.has_cl = True
  85. glob_cl.run_persistent_loop(self)
  86. glob_cl.acl = self.channel_layer
  87. class DevMixin(*eclass_factory(10)):pass
  88. class DevMroMixin(*eclass_factory(10)):pass
  89. class WSStoMixin(*eclass_factory(10)):pass
  90. class GCLMixin(*eclass_factory(10)):pass
  91. class RldManMixin(*eclass_factory(10)):pass
  92. class LoneMixinA():pass
  93. class LoneMixinB():pass
  94. Mixins = Eobj()
  95. Mixins.DevMixin = DevMixin
  96. Mixins.DevMroMixin = DevMroMixin
  97. Mixins.WSStoMixin = WSStoMixin
  98. Mixins.GCLMixin = GCLMixin
  99. Mixins.RldManMixin = RldManMixin
  100. Mixins.LoneMixinA = LoneMixinA
  101. Mixins.LoneMixinB = LoneMixinB
  102. Mixins.DbgMixin = DbgMixin
  103. Mixins.AconMixin = AconMixin
  104. Mixins.AclTaskMixin = AclTaskMixin
  105. class globCL(GCLMixin):
  106. def __init__(self,*a,**kw):
  107. # for now we are not going to do any weird class stuff
  108. self.acl = None
  109. self.has_cl = False
  110. self.rconn1 = connect_redis1()
  111. self.rconn = connect_redis()
  112. self.rman = RldMan()
  113. self.cbs_once = []
  114. self.cbs_asap2 = []
  115. self.cbs_asap = []
  116. self.cbs_once_set = set()
  117. if (hot_reload_build):
  118. self.cbs = {"fn1":fn1,"fn2x":nop}
  119. else:
  120. self.cbs = {}
  121. self.timeout = 1
  122. self.fast_timeout = .1
  123. pass
  124. async def persistent_fast_cb_loop_min_timeout(self,*a,**kw):
  125. while 1:
  126. if len(self.cbs_asap2):
  127. cbs_once = [*self.cbs_asap2]
  128. self.cbs_asap2=[]
  129. k=0
  130. for v in cbs_once:
  131. try:
  132. call_info = callable_helper(v)
  133. if (call_info["is_callable"]):
  134. if call_info["is_coroutine"]:
  135. pass
  136. await v({k,self})
  137. else:
  138. v({k,self})
  139. else:
  140. pass
  141. except Exception as e:
  142. p("persistent_fast_cb_loop Exception cbs_once:",e)
  143. p(flush=True)
  144. else:
  145. pass
  146. finally:
  147. pass
  148. k += 1
  149. if len(self.cbs_asap):
  150. self.cbs_asap2 = self.cbs_asap
  151. self.cbs_asap=[]
  152. await asyncio.sleep(self.fast_timeout)
  153. async def persistent_fast_cb_loop(self,*a,**kw):
  154. while 1:
  155. if len(self.cbs_asap):
  156. cbs_once = [*self.cbs_asap]
  157. self.cbs_asap=[]
  158. k=0
  159. for v in cbs_once:
  160. try:
  161. call_info = callable_helper(v)
  162. if (call_info["is_callable"]):
  163. if call_info["is_coroutine"]:
  164. pass
  165. await v({k,self})
  166. else:
  167. v({k,self})
  168. else:
  169. pass
  170. except Exception as e:
  171. p("persistent_fast_cb_loop Exception cbs_once:",e)
  172. p(flush=True)
  173. else:
  174. pass
  175. finally:
  176. pass
  177. k += 1
  178. await asyncio.sleep(self.fast_timeout)
  179. async def persistent_loop(self,*a,**kw):
  180. while 1:
  181. for k,v in [*self.cbs.items()]:
  182. try:
  183. call_info = callable_helper(v)
  184. if (call_info["is_callable"]):
  185. if call_info["is_coroutine"]:
  186. await v({k,self})
  187. else:
  188. v({k,self})
  189. else:
  190. pass
  191. pass
  192. except Exception as e:
  193. p("persistent_loop Exception",e)
  194. p(flush=True)
  195. else:
  196. pass
  197. finally:
  198. pass
  199. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  200. self.cbs_once = []
  201. self.cbs_once_set = set()
  202. k=0
  203. for v in cbs_once:
  204. try:
  205. call_info = callable_helper(v)
  206. if (call_info["is_callable"]):
  207. if call_info["is_coroutine"]:
  208. pass
  209. await v({k,self})
  210. else:
  211. v({k,self})
  212. else:
  213. pass
  214. except Exception as e:
  215. p("persistent_loop Exception cbs_once:",e)
  216. p(flush=True)
  217. else:
  218. pass
  219. finally:
  220. pass
  221. k += 1
  222. await asyncio.sleep(self.timeout)
  223. def _run_persistent_loop(self,*a,**kw):
  224. p("_run_persistent_loop:")
  225. loop = asyncio.get_event_loop()
  226. self.run_persistent_loop = nop
  227. self.rman.persistent_loop_cb()
  228. loop.create_task(self.persistent_loop())
  229. # loop.create_task(self.persistent_fast_cb_loop())
  230. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  231. self.loop = loop
  232. run_persistent_loop = _run_persistent_loop
  233. def sync_dev_group_discard(self,*a,**kw):
  234. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  235. async def dev_group_discard(self,group_name,consumer):
  236. channel_layer = glob_cl.acl
  237. if consumer.channel_name in consumer.groups:
  238. consumer.groups.remove(group_name)
  239. await channel_layer.group_discard(group_name,consumer.channel_name)
  240. def sync_dev_group_add(self,*a,**kw):
  241. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  242. async def dev_group_add(self,group_name,consumer):
  243. channel_layer = glob_cl.acl
  244. # if not consumer.channel_name in consumer.groups_set:
  245. # consumer.groups_set.add(groups)
  246. if not consumer.channel_name in consumer.groups:
  247. consumer.groups.append(group_name)
  248. await channel_layer.group_add(group_name,consumer.channel_name)
  249. # RldManMixin
  250. class RldMan(RldManMixin):
  251. def __init__(self,*a,**kw):
  252. self.files = {}
  253. z="ABC"
  254. self.scopes = {
  255. "dflt_scope":{"globals":globals(),"locals":locals()},
  256. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  257. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  258. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  259. # "dflt_scope":{"globals":globals(),"locals":{}}
  260. }
  261. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  262. # self.scope_opt = ""locals
  263. self.scope_opt = "globals"
  264. self.print_tb = 0
  265. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  266. file_list = [
  267. base_path+"i0.py",
  268. base_path+"i1.py",
  269. base_path+"i2.py",
  270. base_path+"i3.py",
  271. base_path+"i4.py",
  272. base_path+"i5.py",
  273. ]
  274. self.add_files(file_list)
  275. # self.add_files(file_list,{"run"})
  276. def persistent_loop_cb(self,*a,**kw):
  277. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  278. file_list = [
  279. base_path+"r0.py",
  280. base_path+"r1.py",
  281. base_path+"r2.py",
  282. base_path+"r3.py",
  283. base_path+"r4.py",
  284. base_path+"r5.py",
  285. ]
  286. self.add_files(file_list)
  287. def add_files(self,files):
  288. for file in files:
  289. if type(file)==str:
  290. self.add_file(file)
  291. elif type(file)==list:
  292. self.add_file(*file)
  293. else:
  294. p("add files???",file)
  295. def add_file(self,file_name,fnx={}):
  296. self.files[file_name] = {"ftxt":"",**fnx}
  297. def get_scope(self,rfile_obj,file_name):
  298. return self.scopes["current_scope"]
  299. return {
  300. "scope":self.scopes["current_scope"],
  301. }
  302. # rfile_obj.get("scope")
  303. # if
  304. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  305. if file_name in self.files:
  306. rfile_obj = self.files[file_name]
  307. st = os.stat(file_name)
  308. st_tuple = (st.st_mtime,st.st_size)
  309. # if rfile_obj["ftxt"] == "":
  310. eflag ="nd"
  311. # rfile_obj["ftxt"] = st_tuple
  312. if rfile_obj["ftxt"] != st_tuple:
  313. # p(rfile_obj["ftxt"])
  314. rfile_obj["ftxt"] = st_tuple
  315. try:
  316. f = open(file_name,"r")
  317. ftxt = f.read()
  318. # scope_key = rfile_obj.get("scope")
  319. # p(scope_key,rfile_obj)
  320. f.close()
  321. scope_obj = self.get_scope(file_name,rfile_obj)
  322. if self.scope_opt == "locals":
  323. eflag ="locals"
  324. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  325. elif self.scope_opt == "globals":
  326. eflag ="globals"
  327. exec(ftxt,scope_obj["globals"])
  328. else:
  329. eflag ="[]"
  330. exec(ftxt)
  331. except Exception as e:
  332. p()
  333. print("file:",file_name)
  334. print("EXCEPT",eflag,e)
  335. if self.print_tb:
  336. traceback.print_tb(e.__traceback__,file=sys.stdout)
  337. else:
  338. pass
  339. finally:
  340. pass
  341. print(end="",flush=True)
  342. return ret
  343. def rld_files(self):
  344. ret = {
  345. "errs":{},
  346. "all":{},
  347. "alle":{},
  348. }
  349. for k in [*self.files]:
  350. self.rld_file(k,ret)
  351. p("",end="",flush=True)
  352. return ret
  353. def fn1(*a,**kw):
  354. glob_cl.rman.rld_files()
  355. glob_cl = globCL()
  356. glob_cl.rman.rld_files()
  357. def atexit_fn():
  358. print("atexit_fn!\n\n")
  359. rconn = glob_cl.rconn
  360. # rconn.flushall()
  361. rconn.flushdb()
  362. print("",end="",flush=True)
  363. atexit.register(atexit_fn)