You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 10KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. hot_reload_build = False
  13. # hot_reload_build = True
  14. p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis1():
  20. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  21. def connect_redis():
  22. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  23. def eclass_factory(n):
  24. ret = []
  25. for k in range(n):
  26. class nx:pass
  27. # _rcls_name
  28. ret.append(nx)
  29. return ret
  30. def callable_helper(fn):
  31. ret = {
  32. "is_callable":0,
  33. "is_coroutine":0,
  34. }
  35. ret["is_callable"]= callable(fn)
  36. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  37. # if (ca)
  38. return ret
  39. # verbose = print
  40. verbose=nop
  41. class DbgMixin:
  42. async def ws_rec(self,text_data,*a,**kw):
  43. # if "dbg" in
  44. text_data_json = json.loads(text_data)
  45. if not 'dbg' in text_data_json:
  46. return
  47. p("DBG!")
  48. exec(text_data_json["dbg"])
  49. class AconMixin(DbgMixin):
  50. # class AconMixin:
  51. async def connect(self):
  52. p(end="",flush=True)
  53. await self.accept()
  54. verbose("<MRO")
  55. verbose(type(self))
  56. await self.call_all_mro("ws_conn0")
  57. await self.call_all_mro("ws_conn")
  58. await self.call_all_mro("ws_conn2")
  59. verbose("/MRO>")
  60. await self.ws_conn_once()
  61. async def ws_conn_once(self, *a,**kw):pass
  62. async def ws_disconn_once(self, *a,**kw):pass
  63. async def ws_rec_once(self, *a,**kw):pass
  64. async def receive(self, *a,**kw):
  65. await self.call_all_mro("ws_rec",*a,**kw)
  66. await self.ws_rec_once(*a,**kw)
  67. print("",end="",flush=True)
  68. async def call_all_mro(self,mthd_name,*args,**kwargs):
  69. called = set([None])
  70. for cls_obj in self.__class__.__mro__:
  71. mthd = getattr(cls_obj,mthd_name,None)
  72. if not mthd in called:
  73. called.add(mthd)
  74. await mthd(self,*args,**kwargs)
  75. async def disconnect(self, close_code):
  76. await self.call_all_mro("ws_disconn",close_code)
  77. await self.ws_disconn_once(close_code)
  78. class AclTaskMixin:
  79. async def ws_disconn(self,*a,**kw):
  80. pass
  81. async def ws_conn0(self):
  82. self.groups_set = set()
  83. if glob_cl.has_cl:
  84. return
  85. glob_cl.has_cl = True
  86. glob_cl.run_persistent_loop(self)
  87. glob_cl.acl = self.channel_layer
  88. class DevMixin(*eclass_factory(10)):pass
  89. class DevMroMixin(*eclass_factory(10)):pass
  90. class WSStoMixin(*eclass_factory(10)):pass
  91. class GCLMixin(*eclass_factory(10)):pass
  92. class RldManMixin(*eclass_factory(10)):pass
  93. class LoneMixinA():pass
  94. class LoneMixinB():pass
  95. Mixins = Eobj()
  96. Mixins.DevMixin = DevMixin
  97. Mixins.DevMroMixin = DevMroMixin
  98. Mixins.WSStoMixin = WSStoMixin
  99. Mixins.GCLMixin = GCLMixin
  100. Mixins.RldManMixin = RldManMixin
  101. Mixins.LoneMixinA = LoneMixinA
  102. Mixins.LoneMixinB = LoneMixinB
  103. Mixins.DbgMixin = DbgMixin
  104. Mixins.AconMixin = AconMixin
  105. Mixins.AclTaskMixin = AclTaskMixin
  106. class globCL(GCLMixin):
  107. def __init__(self,*a,**kw):
  108. # for now we are not going to do any weird class stuff
  109. self.acl = None
  110. self.has_cl = False
  111. self.rconn1 = connect_redis1()
  112. self.rconn = connect_redis()
  113. self.rman = RldMan()
  114. self.cbs_once = []
  115. self.cbs_asap2 = []
  116. self.cbs_asap = []
  117. self.cbs_once_set = set()
  118. if (hot_reload_build):
  119. self.cbs = {"fn1":fn1,"fn2x":nop}
  120. else:
  121. self.cbs = {}
  122. self.timeout = 1
  123. self.fast_timeout = .1
  124. pass
  125. async def persistent_fast_cb_loop_min_timeout(self,*a,**kw):
  126. while 1:
  127. if len(self.cbs_asap2):
  128. cbs_once = [*self.cbs_asap2]
  129. self.cbs_asap2=[]
  130. k=0
  131. for v in cbs_once:
  132. try:
  133. call_info = callable_helper(v)
  134. if (call_info["is_callable"]):
  135. if call_info["is_coroutine"]:
  136. pass
  137. await v({k,self})
  138. else:
  139. v({k,self})
  140. else:
  141. pass
  142. except Exception as e:
  143. p("persistent_fast_cb_loop Exception cbs_once:",e)
  144. p(flush=True)
  145. else:
  146. pass
  147. finally:
  148. pass
  149. k += 1
  150. if len(self.cbs_asap):
  151. self.cbs_asap2 = self.cbs_asap
  152. self.cbs_asap=[]
  153. await asyncio.sleep(self.fast_timeout)
  154. async def persistent_fast_cb_loop(self,*a,**kw):
  155. while 1:
  156. if len(self.cbs_asap):
  157. cbs_once = [*self.cbs_asap]
  158. self.cbs_asap=[]
  159. k=0
  160. for v in cbs_once:
  161. try:
  162. call_info = callable_helper(v)
  163. if (call_info["is_callable"]):
  164. if call_info["is_coroutine"]:
  165. pass
  166. await v({k,self})
  167. else:
  168. v({k,self})
  169. else:
  170. pass
  171. except Exception as e:
  172. p("persistent_fast_cb_loop Exception cbs_once:",e)
  173. p(flush=True)
  174. else:
  175. pass
  176. finally:
  177. pass
  178. k += 1
  179. await asyncio.sleep(self.fast_timeout)
  180. async def persistent_loop(self,*a,**kw):
  181. while 1:
  182. for k,v in [*self.cbs.items()]:
  183. try:
  184. call_info = callable_helper(v)
  185. if (call_info["is_callable"]):
  186. if call_info["is_coroutine"]:
  187. await v({k,self})
  188. else:
  189. v({k,self})
  190. else:
  191. pass
  192. pass
  193. except Exception as e:
  194. p("persistent_loop Exception",e)
  195. p(flush=True)
  196. else:
  197. pass
  198. finally:
  199. pass
  200. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  201. self.cbs_once = []
  202. self.cbs_once_set = set()
  203. k=0
  204. for v in cbs_once:
  205. try:
  206. call_info = callable_helper(v)
  207. if (call_info["is_callable"]):
  208. if call_info["is_coroutine"]:
  209. pass
  210. await v({k,self})
  211. else:
  212. v({k,self})
  213. else:
  214. pass
  215. except Exception as e:
  216. p("persistent_loop Exception cbs_once:",e)
  217. p(flush=True)
  218. else:
  219. pass
  220. finally:
  221. pass
  222. k += 1
  223. await asyncio.sleep(self.timeout)
  224. def _run_persistent_loop(self,*a,**kw):
  225. p("_run_persistent_loop:")
  226. loop = asyncio.get_event_loop()
  227. self.run_persistent_loop = nop
  228. self.rman.persistent_loop_cb()
  229. loop.create_task(self.persistent_loop())
  230. # loop.create_task(self.persistent_fast_cb_loop())
  231. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  232. self.loop = loop
  233. run_persistent_loop = _run_persistent_loop
  234. def sync_dev_group_discard(self,*a,**kw):
  235. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  236. async def dev_group_discard(self,group_name,consumer):
  237. channel_layer = glob_cl.acl
  238. if consumer.channel_name in consumer.groups:
  239. consumer.groups.remove(group_name)
  240. await channel_layer.group_discard(group_name,consumer.channel_name)
  241. def sync_dev_group_add(self,*a,**kw):
  242. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  243. async def dev_group_add(self,group_name,consumer):
  244. channel_layer = glob_cl.acl
  245. # if not consumer.channel_name in consumer.groups_set:
  246. # consumer.groups_set.add(groups)
  247. if not consumer.channel_name in consumer.groups:
  248. consumer.groups.append(group_name)
  249. await channel_layer.group_add(group_name,consumer.channel_name)
  250. # RldManMixin
  251. class RldMan(RldManMixin):
  252. def __init__(self,*a,**kw):
  253. self.files = {}
  254. z="ABC"
  255. self.scopes = {
  256. "dflt_scope":{"globals":globals(),"locals":locals()},
  257. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  258. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  259. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  260. # "dflt_scope":{"globals":globals(),"locals":{}}
  261. }
  262. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  263. # self.scope_opt = ""locals
  264. self.scope_opt = "globals"
  265. self.print_tb = 0
  266. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  267. file_list = [
  268. base_path+"i0.py",
  269. base_path+"i1.py",
  270. base_path+"i2.py",
  271. base_path+"i3.py",
  272. base_path+"i4.py",
  273. base_path+"i5.py",
  274. ]
  275. self.add_files(file_list)
  276. # self.add_files(file_list,{"run"})
  277. def persistent_loop_cb(self,*a,**kw):
  278. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  279. file_list = [
  280. base_path+"r0.py",
  281. base_path+"r1.py",
  282. base_path+"r2.py",
  283. base_path+"r3.py",
  284. base_path+"r4.py",
  285. base_path+"r5.py",
  286. ]
  287. self.add_files(file_list)
  288. def add_files(self,files):
  289. for file in files:
  290. if type(file)==str:
  291. self.add_file(file)
  292. elif type(file)==list:
  293. self.add_file(*file)
  294. else:
  295. p("add files???",file)
  296. def add_file(self,file_name,fnx={}):
  297. self.files[file_name] = {"ftxt":"",**fnx}
  298. def get_scope(self,rfile_obj,file_name):
  299. return self.scopes["current_scope"]
  300. return {
  301. "scope":self.scopes["current_scope"],
  302. }
  303. # rfile_obj.get("scope")
  304. # if
  305. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  306. if file_name in self.files:
  307. rfile_obj = self.files[file_name]
  308. st = os.stat(file_name)
  309. st_tuple = (st.st_mtime,st.st_size)
  310. # if rfile_obj["ftxt"] == "":
  311. eflag ="nd"
  312. # rfile_obj["ftxt"] = st_tuple
  313. if rfile_obj["ftxt"] != st_tuple:
  314. # p(rfile_obj["ftxt"])
  315. rfile_obj["ftxt"] = st_tuple
  316. try:
  317. f = open(file_name,"r")
  318. ftxt = f.read()
  319. # scope_key = rfile_obj.get("scope")
  320. # p(scope_key,rfile_obj)
  321. f.close()
  322. scope_obj = self.get_scope(file_name,rfile_obj)
  323. if self.scope_opt == "locals":
  324. eflag ="locals"
  325. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  326. elif self.scope_opt == "globals":
  327. eflag ="globals"
  328. exec(ftxt,scope_obj["globals"])
  329. else:
  330. eflag ="[]"
  331. exec(ftxt)
  332. except Exception as e:
  333. p()
  334. print("file:",file_name)
  335. print("EXCEPT",eflag,e)
  336. if self.print_tb:
  337. traceback.print_tb(e.__traceback__,file=sys.stdout)
  338. else:
  339. pass
  340. finally:
  341. pass
  342. print(end="",flush=True)
  343. return ret
  344. def rld_files(self):
  345. ret = {
  346. "errs":{},
  347. "all":{},
  348. "alle":{},
  349. }
  350. for k in [*self.files]:
  351. self.rld_file(k,ret)
  352. p("",end="",flush=True)
  353. return ret
  354. def fn1(*a,**kw):
  355. glob_cl.rman.rld_files()
  356. glob_cl = globCL()
  357. # we are not flushing databases if
  358. def atexit_fn():
  359. print("atexit_fn!\n\n")
  360. rconn = glob_cl.rconn
  361. # rconn.flushall()
  362. rconn.flushdb()
  363. print("",end="",flush=True)
  364. if hot_reload_build:
  365. glob_cl.rman.rld_files()
  366. atexit.register(atexit_fn)
  367. else:
  368. import wsps.hot_reload_temp_staging
  369. wsps.hot_reload_temp_staging.init_class_edits(Mixins)
  370. # if not hot_reload_build:
  371. # glob_cl.rman.rld_files()
  372. # print("############################ ##############################")