You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 9.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. hot_reload_build = True
  13. if not hot_reload_build:
  14. import wsps.hot_reload_temp_staging
  15. else:
  16. pass
  17. # import wsps.hot_reload_temp_staging
  18. p=print
  19. async def anop(*a,**kw):pass
  20. def nop(*a,**kw):pass
  21. class Edict(dict):pass
  22. class Eobj():pass
  23. def connect_redis1():
  24. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  25. def connect_redis():
  26. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  27. def eclass_factory(n):
  28. ret = []
  29. for k in range(n):
  30. class nx:pass
  31. # _rcls_name
  32. ret.append(nx)
  33. return ret
  34. def callable_helper(fn):
  35. ret = {
  36. "is_callable":0,
  37. "is_coroutine":0,
  38. }
  39. ret["is_callable"]= callable(fn)
  40. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  41. # if (ca)
  42. return ret
  43. # verbose = print
  44. verbose=nop
  45. class DbgMixin:
  46. async def ws_rec(self,text_data,*a,**kw):
  47. # if "dbg" in
  48. text_data_json = json.loads(text_data)
  49. if not 'dbg' in text_data_json:
  50. return
  51. p("DBG!")
  52. exec(text_data_json["dbg"])
  53. class AconMixin(DbgMixin):
  54. # class AconMixin:
  55. async def connect(self):
  56. p(end="",flush=True)
  57. await self.accept()
  58. verbose("<MRO")
  59. verbose(type(self))
  60. await self.call_all_mro("ws_conn0")
  61. await self.call_all_mro("ws_conn")
  62. await self.call_all_mro("ws_conn2")
  63. verbose("/MRO>")
  64. await self.ws_conn_once()
  65. async def ws_conn_once(self, *a,**kw):pass
  66. async def ws_disconn_once(self, *a,**kw):pass
  67. async def ws_rec_once(self, *a,**kw):pass
  68. async def receive(self, *a,**kw):
  69. await self.call_all_mro("ws_rec",*a,**kw)
  70. await self.ws_rec_once(*a,**kw)
  71. print("",end="",flush=True)
  72. async def call_all_mro(self,mthd_name,*args,**kwargs):
  73. called = set([None])
  74. for cls_obj in self.__class__.__mro__:
  75. mthd = getattr(cls_obj,mthd_name,None)
  76. if not mthd in called:
  77. called.add(mthd)
  78. await mthd(self,*args,**kwargs)
  79. async def disconnect(self, close_code):
  80. await self.call_all_mro("ws_disconn",close_code)
  81. await self.ws_disconn_once(close_code)
  82. class AclTaskMixin:
  83. async def ws_disconn(self,*a,**kw):
  84. pass
  85. async def ws_conn0(self):
  86. self.groups_set = set()
  87. if glob_cl.has_cl:
  88. return
  89. glob_cl.has_cl = True
  90. glob_cl.run_persistent_loop(self)
  91. glob_cl.acl = self.channel_layer
  92. class DevMixin(*eclass_factory(10)):pass
  93. class DevMroMixin(*eclass_factory(10)):pass
  94. class WSStoMixin(*eclass_factory(10)):pass
  95. class GCLMixin(*eclass_factory(10)):pass
  96. class RldManMixin(*eclass_factory(10)):pass
  97. class LoneMixinA():pass
  98. class LoneMixinB():pass
  99. Mixins = Eobj()
  100. Mixins.DevMixin = DevMixin
  101. Mixins.DevMroMixin = DevMroMixin
  102. Mixins.WSStoMixin = WSStoMixin
  103. Mixins.GCLMixin = GCLMixin
  104. Mixins.RldManMixin = RldManMixin
  105. Mixins.LoneMixinA = LoneMixinA
  106. Mixins.LoneMixinB = LoneMixinB
  107. Mixins.DbgMixin = DbgMixin
  108. Mixins.AconMixin = AconMixin
  109. Mixins.AclTaskMixin = AclTaskMixin
  110. class globCL(GCLMixin):
  111. def __init__(self,*a,**kw):
  112. # for now we are not going to do any weird class stuff
  113. self.acl = None
  114. self.has_cl = False
  115. self.rconn1 = connect_redis1()
  116. self.rconn = connect_redis()
  117. self.rman = RldMan()
  118. self.cbs_once = []
  119. self.cbs_asap2 = []
  120. self.cbs_asap = []
  121. self.cbs_once_set = set()
  122. if (hot_reload_build):
  123. self.cbs = {"fn1":fn1,"fn2x":nop}
  124. else:
  125. self.cbs = {}
  126. self.timeout = 1
  127. self.fast_timeout = .1
  128. pass
  129. async def persistent_fast_cb_loop_min_timeout(self,*a,**kw):
  130. while 1:
  131. if len(self.cbs_asap2):
  132. cbs_once = [*self.cbs_asap2]
  133. self.cbs_asap2=[]
  134. k=0
  135. for v in cbs_once:
  136. try:
  137. call_info = callable_helper(v)
  138. if (call_info["is_callable"]):
  139. if call_info["is_coroutine"]:
  140. pass
  141. await v({k,self})
  142. else:
  143. v({k,self})
  144. else:
  145. pass
  146. except Exception as e:
  147. p("persistent_fast_cb_loop Exception cbs_once:",e)
  148. p(flush=True)
  149. else:
  150. pass
  151. finally:
  152. pass
  153. k += 1
  154. if len(self.cbs_asap):
  155. self.cbs_asap2 = self.cbs_asap
  156. self.cbs_asap=[]
  157. await asyncio.sleep(self.fast_timeout)
  158. async def persistent_fast_cb_loop(self,*a,**kw):
  159. while 1:
  160. if len(self.cbs_asap):
  161. cbs_once = [*self.cbs_asap]
  162. self.cbs_asap=[]
  163. k=0
  164. for v in cbs_once:
  165. try:
  166. call_info = callable_helper(v)
  167. if (call_info["is_callable"]):
  168. if call_info["is_coroutine"]:
  169. pass
  170. await v({k,self})
  171. else:
  172. v({k,self})
  173. else:
  174. pass
  175. except Exception as e:
  176. p("persistent_fast_cb_loop Exception cbs_once:",e)
  177. p(flush=True)
  178. else:
  179. pass
  180. finally:
  181. pass
  182. k += 1
  183. await asyncio.sleep(self.fast_timeout)
  184. async def persistent_loop(self,*a,**kw):
  185. while 1:
  186. for k,v in [*self.cbs.items()]:
  187. try:
  188. call_info = callable_helper(v)
  189. if (call_info["is_callable"]):
  190. if call_info["is_coroutine"]:
  191. await v({k,self})
  192. else:
  193. v({k,self})
  194. else:
  195. pass
  196. pass
  197. except Exception as e:
  198. p("persistent_loop Exception",e)
  199. p(flush=True)
  200. else:
  201. pass
  202. finally:
  203. pass
  204. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  205. self.cbs_once = []
  206. self.cbs_once_set = set()
  207. k=0
  208. for v in cbs_once:
  209. try:
  210. call_info = callable_helper(v)
  211. if (call_info["is_callable"]):
  212. if call_info["is_coroutine"]:
  213. pass
  214. await v({k,self})
  215. else:
  216. v({k,self})
  217. else:
  218. pass
  219. except Exception as e:
  220. p("persistent_loop Exception cbs_once:",e)
  221. p(flush=True)
  222. else:
  223. pass
  224. finally:
  225. pass
  226. k += 1
  227. await asyncio.sleep(self.timeout)
  228. def _run_persistent_loop(self,*a,**kw):
  229. p("_run_persistent_loop:")
  230. loop = asyncio.get_event_loop()
  231. self.run_persistent_loop = nop
  232. self.rman.persistent_loop_cb()
  233. loop.create_task(self.persistent_loop())
  234. # loop.create_task(self.persistent_fast_cb_loop())
  235. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  236. self.loop = loop
  237. run_persistent_loop = _run_persistent_loop
  238. def sync_dev_group_discard(self,*a,**kw):
  239. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  240. async def dev_group_discard(self,group_name,consumer):
  241. channel_layer = glob_cl.acl
  242. if consumer.channel_name in consumer.groups:
  243. consumer.groups.remove(group_name)
  244. await channel_layer.group_discard(group_name,consumer.channel_name)
  245. def sync_dev_group_add(self,*a,**kw):
  246. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  247. async def dev_group_add(self,group_name,consumer):
  248. channel_layer = glob_cl.acl
  249. # if not consumer.channel_name in consumer.groups_set:
  250. # consumer.groups_set.add(groups)
  251. if not consumer.channel_name in consumer.groups:
  252. consumer.groups.append(group_name)
  253. await channel_layer.group_add(group_name,consumer.channel_name)
  254. # RldManMixin
  255. class RldMan(RldManMixin):
  256. def __init__(self,*a,**kw):
  257. self.files = {}
  258. z="ABC"
  259. self.scopes = {
  260. "dflt_scope":{"globals":globals(),"locals":locals()},
  261. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  262. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  263. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  264. # "dflt_scope":{"globals":globals(),"locals":{}}
  265. }
  266. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  267. # self.scope_opt = ""locals
  268. self.scope_opt = "globals"
  269. self.print_tb = 0
  270. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  271. file_list = [
  272. base_path+"i0.py",
  273. base_path+"i1.py",
  274. base_path+"i2.py",
  275. base_path+"i3.py",
  276. base_path+"i4.py",
  277. base_path+"i5.py",
  278. ]
  279. self.add_files(file_list)
  280. # self.add_files(file_list,{"run"})
  281. def persistent_loop_cb(self,*a,**kw):
  282. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  283. file_list = [
  284. base_path+"r0.py",
  285. base_path+"r1.py",
  286. base_path+"r2.py",
  287. base_path+"r3.py",
  288. base_path+"r4.py",
  289. base_path+"r5.py",
  290. ]
  291. self.add_files(file_list)
  292. def add_files(self,files):
  293. for file in files:
  294. if type(file)==str:
  295. self.add_file(file)
  296. elif type(file)==list:
  297. self.add_file(*file)
  298. else:
  299. p("add files???",file)
  300. def add_file(self,file_name,fnx={}):
  301. self.files[file_name] = {"ftxt":"",**fnx}
  302. def get_scope(self,rfile_obj,file_name):
  303. return self.scopes["current_scope"]
  304. return {
  305. "scope":self.scopes["current_scope"],
  306. }
  307. # rfile_obj.get("scope")
  308. # if
  309. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  310. if file_name in self.files:
  311. rfile_obj = self.files[file_name]
  312. st = os.stat(file_name)
  313. st_tuple = (st.st_mtime,st.st_size)
  314. # if rfile_obj["ftxt"] == "":
  315. eflag ="nd"
  316. # rfile_obj["ftxt"] = st_tuple
  317. if rfile_obj["ftxt"] != st_tuple:
  318. # p(rfile_obj["ftxt"])
  319. rfile_obj["ftxt"] = st_tuple
  320. try:
  321. f = open(file_name,"r")
  322. ftxt = f.read()
  323. # scope_key = rfile_obj.get("scope")
  324. # p(scope_key,rfile_obj)
  325. f.close()
  326. scope_obj = self.get_scope(file_name,rfile_obj)
  327. if self.scope_opt == "locals":
  328. eflag ="locals"
  329. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  330. elif self.scope_opt == "globals":
  331. eflag ="globals"
  332. exec(ftxt,scope_obj["globals"])
  333. else:
  334. eflag ="[]"
  335. exec(ftxt)
  336. except Exception as e:
  337. p()
  338. print("file:",file_name)
  339. print("EXCEPT",eflag,e)
  340. if self.print_tb:
  341. traceback.print_tb(e.__traceback__,file=sys.stdout)
  342. else:
  343. pass
  344. finally:
  345. pass
  346. print(end="",flush=True)
  347. return ret
  348. def rld_files(self):
  349. ret = {
  350. "errs":{},
  351. "all":{},
  352. "alle":{},
  353. }
  354. for k in [*self.files]:
  355. self.rld_file(k,ret)
  356. p("",end="",flush=True)
  357. return ret
  358. def fn1(*a,**kw):
  359. glob_cl.rman.rld_files()
  360. glob_cl = globCL()
  361. glob_cl.rman.rld_files()
  362. def atexit_fn():
  363. print("atexit_fn!\n\n")
  364. rconn = glob_cl.rconn
  365. # rconn.flushall()
  366. rconn.flushdb()
  367. print("",end="",flush=True)
  368. atexit.register(atexit_fn)