Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

tasks_loop.py 9.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. # hot_reload_build = True
  13. hot_reload_build = False
  14. if not hot_reload_build:
  15. import wsps.hot_reload_temp_staging
  16. else:
  17. pass
  18. # import wsps.hot_reload_temp_staging
  19. p=print
  20. async def anop(*a,**kw):pass
  21. def nop(*a,**kw):pass
  22. class Edict(dict):pass
  23. class Eobj():pass
  24. def connect_redis1():
  25. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  26. def connect_redis():
  27. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  28. def eclass_factory(n):
  29. ret = []
  30. for k in range(n):
  31. class nx:pass
  32. # _rcls_name
  33. ret.append(nx)
  34. return ret
  35. def callable_helper(fn):
  36. ret = {
  37. "is_callable":0,
  38. "is_coroutine":0,
  39. }
  40. ret["is_callable"]= callable(fn)
  41. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  42. # if (ca)
  43. return ret
  44. # verbose = print
  45. verbose=nop
  46. class DbgMixin:
  47. async def ws_rec(self,text_data,*a,**kw):
  48. # if "dbg" in
  49. text_data_json = json.loads(text_data)
  50. if not 'dbg' in text_data_json:
  51. return
  52. p("DBG!")
  53. exec(text_data_json["dbg"])
  54. class AconMixin(DbgMixin):
  55. # class AconMixin:
  56. async def connect(self):
  57. p(end="",flush=True)
  58. await self.accept()
  59. verbose("<MRO")
  60. verbose(type(self))
  61. await self.call_all_mro("ws_conn0")
  62. await self.call_all_mro("ws_conn")
  63. await self.call_all_mro("ws_conn2")
  64. verbose("/MRO>")
  65. await self.ws_conn_once()
  66. async def ws_conn_once(self, *a,**kw):pass
  67. async def ws_disconn_once(self, *a,**kw):pass
  68. async def ws_rec_once(self, *a,**kw):pass
  69. async def receive(self, *a,**kw):
  70. await self.call_all_mro("ws_rec",*a,**kw)
  71. await self.ws_rec_once(*a,**kw)
  72. print("",end="",flush=True)
  73. async def call_all_mro(self,mthd_name,*args,**kwargs):
  74. called = set([None])
  75. for cls_obj in self.__class__.__mro__:
  76. mthd = getattr(cls_obj,mthd_name,None)
  77. if not mthd in called:
  78. called.add(mthd)
  79. await mthd(self,*args,**kwargs)
  80. async def disconnect(self, close_code):
  81. await self.call_all_mro("ws_disconn",close_code)
  82. await self.ws_disconn_once(close_code)
  83. class AclTaskMixin:
  84. async def ws_disconn(self,*a,**kw):
  85. pass
  86. async def ws_conn0(self):
  87. self.groups_set = set()
  88. if glob_cl.has_cl:
  89. return
  90. glob_cl.has_cl = True
  91. glob_cl.run_persistent_loop(self)
  92. glob_cl.acl = self.channel_layer
  93. class DevMixin(*eclass_factory(10)):pass
  94. class DevMroMixin(*eclass_factory(10)):pass
  95. class WSStoMixin(*eclass_factory(10)):pass
  96. class GCLMixin(*eclass_factory(10)):pass
  97. class RldManMixin(*eclass_factory(10)):pass
  98. class LoneMixinA():pass
  99. class LoneMixinB():pass
  100. Mixins = Eobj()
  101. Mixins.DevMixin = DevMixin
  102. Mixins.DevMroMixin = DevMroMixin
  103. Mixins.WSStoMixin = WSStoMixin
  104. Mixins.GCLMixin = GCLMixin
  105. Mixins.RldManMixin = RldManMixin
  106. Mixins.LoneMixinA = LoneMixinA
  107. Mixins.LoneMixinB = LoneMixinB
  108. Mixins.DbgMixin = DbgMixin
  109. Mixins.AconMixin = AconMixin
  110. Mixins.AclTaskMixin = AclTaskMixin
  111. if not hot_reload_build:
  112. wsps.hot_reload_temp_staging.init_class_edits()
  113. class globCL(GCLMixin):
  114. def __init__(self,*a,**kw):
  115. # for now we are not going to do any weird class stuff
  116. self.acl = None
  117. self.has_cl = False
  118. self.rconn1 = connect_redis1()
  119. self.rconn = connect_redis()
  120. self.rman = RldMan()
  121. self.cbs_once = []
  122. self.cbs_asap2 = []
  123. self.cbs_asap = []
  124. self.cbs_once_set = set()
  125. if (hot_reload_build):
  126. self.cbs = {"fn1":fn1,"fn2x":nop}
  127. else:
  128. self.cbs = {}
  129. self.timeout = 1
  130. self.fast_timeout = .1
  131. pass
  132. async def persistent_fast_cb_loop_min_timeout(self,*a,**kw):
  133. while 1:
  134. if len(self.cbs_asap2):
  135. cbs_once = [*self.cbs_asap2]
  136. self.cbs_asap2=[]
  137. k=0
  138. for v in cbs_once:
  139. try:
  140. call_info = callable_helper(v)
  141. if (call_info["is_callable"]):
  142. if call_info["is_coroutine"]:
  143. pass
  144. await v({k,self})
  145. else:
  146. v({k,self})
  147. else:
  148. pass
  149. except Exception as e:
  150. p("persistent_fast_cb_loop Exception cbs_once:",e)
  151. p(flush=True)
  152. else:
  153. pass
  154. finally:
  155. pass
  156. k += 1
  157. if len(self.cbs_asap):
  158. self.cbs_asap2 = self.cbs_asap
  159. self.cbs_asap=[]
  160. await asyncio.sleep(self.fast_timeout)
  161. async def persistent_fast_cb_loop(self,*a,**kw):
  162. while 1:
  163. if len(self.cbs_asap):
  164. cbs_once = [*self.cbs_asap]
  165. self.cbs_asap=[]
  166. k=0
  167. for v in cbs_once:
  168. try:
  169. call_info = callable_helper(v)
  170. if (call_info["is_callable"]):
  171. if call_info["is_coroutine"]:
  172. pass
  173. await v({k,self})
  174. else:
  175. v({k,self})
  176. else:
  177. pass
  178. except Exception as e:
  179. p("persistent_fast_cb_loop Exception cbs_once:",e)
  180. p(flush=True)
  181. else:
  182. pass
  183. finally:
  184. pass
  185. k += 1
  186. await asyncio.sleep(self.fast_timeout)
  187. async def persistent_loop(self,*a,**kw):
  188. while 1:
  189. for k,v in [*self.cbs.items()]:
  190. try:
  191. call_info = callable_helper(v)
  192. if (call_info["is_callable"]):
  193. if call_info["is_coroutine"]:
  194. await v({k,self})
  195. else:
  196. v({k,self})
  197. else:
  198. pass
  199. pass
  200. except Exception as e:
  201. p("persistent_loop Exception",e)
  202. p(flush=True)
  203. else:
  204. pass
  205. finally:
  206. pass
  207. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  208. self.cbs_once = []
  209. self.cbs_once_set = set()
  210. k=0
  211. for v in cbs_once:
  212. try:
  213. call_info = callable_helper(v)
  214. if (call_info["is_callable"]):
  215. if call_info["is_coroutine"]:
  216. pass
  217. await v({k,self})
  218. else:
  219. v({k,self})
  220. else:
  221. pass
  222. except Exception as e:
  223. p("persistent_loop Exception cbs_once:",e)
  224. p(flush=True)
  225. else:
  226. pass
  227. finally:
  228. pass
  229. k += 1
  230. await asyncio.sleep(self.timeout)
  231. def _run_persistent_loop(self,*a,**kw):
  232. p("_run_persistent_loop:")
  233. loop = asyncio.get_event_loop()
  234. self.run_persistent_loop = nop
  235. self.rman.persistent_loop_cb()
  236. loop.create_task(self.persistent_loop())
  237. # loop.create_task(self.persistent_fast_cb_loop())
  238. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  239. self.loop = loop
  240. run_persistent_loop = _run_persistent_loop
  241. def sync_dev_group_discard(self,*a,**kw):
  242. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  243. async def dev_group_discard(self,group_name,consumer):
  244. channel_layer = glob_cl.acl
  245. if consumer.channel_name in consumer.groups:
  246. consumer.groups.remove(group_name)
  247. await channel_layer.group_discard(group_name,consumer.channel_name)
  248. def sync_dev_group_add(self,*a,**kw):
  249. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  250. async def dev_group_add(self,group_name,consumer):
  251. channel_layer = glob_cl.acl
  252. # if not consumer.channel_name in consumer.groups_set:
  253. # consumer.groups_set.add(groups)
  254. if not consumer.channel_name in consumer.groups:
  255. consumer.groups.append(group_name)
  256. await channel_layer.group_add(group_name,consumer.channel_name)
  257. # RldManMixin
  258. class RldMan(RldManMixin):
  259. def __init__(self,*a,**kw):
  260. self.files = {}
  261. z="ABC"
  262. self.scopes = {
  263. "dflt_scope":{"globals":globals(),"locals":locals()},
  264. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  265. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  266. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  267. # "dflt_scope":{"globals":globals(),"locals":{}}
  268. }
  269. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  270. # self.scope_opt = ""locals
  271. self.scope_opt = "globals"
  272. self.print_tb = 0
  273. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  274. file_list = [
  275. base_path+"i0.py",
  276. base_path+"i1.py",
  277. base_path+"i2.py",
  278. base_path+"i3.py",
  279. base_path+"i4.py",
  280. base_path+"i5.py",
  281. ]
  282. self.add_files(file_list)
  283. # self.add_files(file_list,{"run"})
  284. def persistent_loop_cb(self,*a,**kw):
  285. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  286. file_list = [
  287. base_path+"r0.py",
  288. base_path+"r1.py",
  289. base_path+"r2.py",
  290. base_path+"r3.py",
  291. base_path+"r4.py",
  292. base_path+"r5.py",
  293. ]
  294. self.add_files(file_list)
  295. def add_files(self,files):
  296. for file in files:
  297. if type(file)==str:
  298. self.add_file(file)
  299. elif type(file)==list:
  300. self.add_file(*file)
  301. else:
  302. p("add files???",file)
  303. def add_file(self,file_name,fnx={}):
  304. self.files[file_name] = {"ftxt":"",**fnx}
  305. def get_scope(self,rfile_obj,file_name):
  306. return self.scopes["current_scope"]
  307. return {
  308. "scope":self.scopes["current_scope"],
  309. }
  310. # rfile_obj.get("scope")
  311. # if
  312. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  313. if file_name in self.files:
  314. rfile_obj = self.files[file_name]
  315. st = os.stat(file_name)
  316. st_tuple = (st.st_mtime,st.st_size)
  317. # if rfile_obj["ftxt"] == "":
  318. eflag ="nd"
  319. # rfile_obj["ftxt"] = st_tuple
  320. if rfile_obj["ftxt"] != st_tuple:
  321. # p(rfile_obj["ftxt"])
  322. rfile_obj["ftxt"] = st_tuple
  323. try:
  324. f = open(file_name,"r")
  325. ftxt = f.read()
  326. # scope_key = rfile_obj.get("scope")
  327. # p(scope_key,rfile_obj)
  328. f.close()
  329. scope_obj = self.get_scope(file_name,rfile_obj)
  330. if self.scope_opt == "locals":
  331. eflag ="locals"
  332. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  333. elif self.scope_opt == "globals":
  334. eflag ="globals"
  335. exec(ftxt,scope_obj["globals"])
  336. else:
  337. eflag ="[]"
  338. exec(ftxt)
  339. except Exception as e:
  340. p()
  341. print("file:",file_name)
  342. print("EXCEPT",eflag,e)
  343. if self.print_tb:
  344. traceback.print_tb(e.__traceback__,file=sys.stdout)
  345. else:
  346. pass
  347. finally:
  348. pass
  349. print(end="",flush=True)
  350. return ret
  351. def rld_files(self):
  352. ret = {
  353. "errs":{},
  354. "all":{},
  355. "alle":{},
  356. }
  357. for k in [*self.files]:
  358. self.rld_file(k,ret)
  359. p("",end="",flush=True)
  360. return ret
  361. def fn1(*a,**kw):
  362. glob_cl.rman.rld_files()
  363. glob_cl = globCL()
  364. glob_cl.rman.rld_files()
  365. def atexit_fn():
  366. print("atexit_fn!\n\n")
  367. rconn = glob_cl.rconn
  368. # rconn.flushall()
  369. rconn.flushdb()
  370. print("",end="",flush=True)
  371. atexit.register(atexit_fn)