Você não pode selecionar mais de 25 tópicos. Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

tasks_loop.py 10KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. # import gmod
  13. import wsps.gmod as gmod
  # Hot-reload switch. It is read when globCL seeds its recurring callbacks
  # and again by the start-up branch at the bottom of this module.
  hot_reload_build = True
  print("QWERTYU")
  # Short alias for print used throughout the module.
  p = print
  async def anop(*a, **kw):
      """Async no-op: accept any arguments, do nothing, resolve to None."""
      return None
  def nop(*a, **kw):
      """Synchronous no-op: accept any arguments, do nothing, return None."""
      return None
  class Edict(dict):
      """Plain dict subclass; unlike a bare dict, instances accept attributes."""
  class Eobj:
      """Empty class used as a bag of attributes."""
  def connect_redis1():
      """Return a Redis client for database 1 on localhost:6379.

      Responses are decoded as UTF-8; undecodable bytes are rendered with
      backslash escapes instead of raising.
      """
      # `encoding` is the supported spelling of the deprecated `charset` keyword.
      return redis.Redis(
          host='localhost',
          port=6379,
          db=1,
          encoding="utf-8",
          decode_responses=True,
          encoding_errors="backslashreplace",
      )
  def connect_redis():
      """Return a Redis client for database 0 on localhost:6379.

      Responses are decoded as UTF-8; undecodable bytes are rendered with
      backslash escapes instead of raising.
      """
      # `encoding` is the supported spelling of the deprecated `charset` keyword.
      return redis.Redis(
          host='localhost',
          port=6379,
          db=0,
          encoding="utf-8",
          decode_responses=True,
          encoding_errors="backslashreplace",
      )
  def eclass_factory(n):
      """Return a list of ``n`` distinct, empty classes, each named ``nx``.

      A new class object is created on every iteration, so the entries can be
      used together as separate base classes.
      """
      classes = []
      for _ in range(n):
          class nx:
              pass
          classes.append(nx)
      return classes
  def callable_helper(fn):
      """Describe how ``fn`` can be invoked.

      Returns a dict with two keys:
        "is_callable"  -- result of ``callable(fn)``
        "is_coroutine" -- result of ``inspect.iscoroutinefunction(fn)``
      """
      return {
          "is_callable": callable(fn),
          "is_coroutine": inspect.iscoroutinefunction(fn),
      }
  # Trace output is switched off: verbose() discards its arguments.
  # Bind it to print instead to see the traces.
  verbose = nop
  class DbgMixin:
      """Mixin whose ``ws_rec`` hook executes code sent in a message's "dbg" field."""

      async def ws_rec(self, text_data=None, *a, **kw):
          """Run the "dbg" snippet of a JSON object message, if there is one.

          Args:
              text_data: message text, expected to be a JSON document.
              *a, **kw: accepted and ignored.

          Messages with no text, text that is not valid JSON, or JSON that is
          not an object containing a "dbg" key are ignored.
          """
          if text_data is None:
              # No text payload was supplied, so there is nothing to inspect.
              return
          try:
              text_data_json = json.loads(text_data)
          except (TypeError, ValueError):
              # Not parseable as JSON: not a debug message.
              return
          if not isinstance(text_data_json, dict) or "dbg" not in text_data_json:
              return
          p("DBG!")
          # SECURITY: this executes arbitrary code received over the socket.
          # Any client able to send a message can run code in this process;
          # keep this mixin out of anything reachable by untrusted clients.
          # The local names above (self, text_data, text_data_json, a, kw) are
          # visible to the executed snippet, so they are kept stable.
          exec(text_data_json["dbg"])
  class AconMixin(DbgMixin):
      """Consumer mixin that fans connect/receive/disconnect out to per-class hooks.

      Hook names looked up on every class of the MRO: ``ws_conn0``, ``ws_conn``,
      ``ws_conn2``, ``ws_rec`` and ``ws_disconn``. The ``*_once`` methods are
      called a single time through normal attribute lookup and default to no-ops.
      """

      async def connect(self):
          """Accept the connection, then run the three connect hook stages in order."""
          p(end="", flush=True)
          await self.accept()
          verbose("<MRO")
          verbose(type(self))
          for stage in ("ws_conn0", "ws_conn", "ws_conn2"):
              await self.call_all_mro(stage)
          verbose("/MRO>")
          await self.ws_conn_once()

      async def ws_conn_once(self, *a, **kw):
          """Overridable hook awaited once at the end of connect()."""
          return None

      async def ws_disconn_once(self, *a, **kw):
          """Overridable hook awaited once at the end of disconnect()."""
          return None

      async def ws_rec_once(self, *a, **kw):
          """Overridable hook awaited once at the end of receive()."""
          return None

      async def receive(self, *a, **kw):
          """Forward the received arguments to every ``ws_rec`` hook, then ``ws_rec_once``."""
          await self.call_all_mro("ws_rec", *a, **kw)
          await self.ws_rec_once(*a, **kw)
          print("", end="", flush=True)

      async def call_all_mro(self, mthd_name, *args, **kwargs):
          """Await every distinct ``mthd_name`` implementation found along the MRO.

          Classes are visited in MRO order. A class that only inherits the
          attribute yields the same object as its base, so each implementation
          runs once; classes without the attribute are skipped.
          """
          # None is pre-seeded so a missing attribute counts as already handled.
          seen = {None}
          for klass in self.__class__.__mro__:
              hook = getattr(klass, mthd_name, None)
              if hook in seen:
                  continue
              seen.add(hook)
              await hook(self, *args, **kwargs)

      async def disconnect(self, close_code):
          """Forward ``close_code`` to every ``ws_disconn`` hook, then ``ws_disconn_once``."""
          await self.call_all_mro("ws_disconn", close_code)
          await self.ws_disconn_once(close_code)
  class AclTaskMixin:
      """Mixin that starts the global background loops on the first connection."""

      async def ws_disconn(self, *a, **kw):
          """Disconnect hook; does nothing."""
          return None

      async def ws_conn0(self):
          """Give the consumer an empty ``groups_set`` and start the loops once.

          Only the first consumer to get here (``glob_cl.has_cl`` still false)
          starts the persistent loops and publishes its channel layer as
          ``glob_cl.acl``.
          """
          self.groups_set = set()
          if not glob_cl.has_cl:
              glob_cl.has_cl = True
              glob_cl.run_persistent_loop(self)
              glob_cl.acl = self.channel_layer
  # Placeholder mixins. Each one derives from ten freshly created empty
  # classes produced by eclass_factory.
  class DevMixin(*eclass_factory(10)):
      pass


  class DevMroMixin(*eclass_factory(10)):
      pass


  class WSStoMixin(*eclass_factory(10)):
      pass


  class GCLMixin(*eclass_factory(10)):
      pass


  class RldManMixin(*eclass_factory(10)):
      pass
  # Two empty stand-alone mixins with no generated base classes.
  class LoneMixinA:
      pass


  class LoneMixinB:
      pass
  # Registry object exposing every mixin class as an attribute of the same name.
  Mixins = Eobj()
  vars(Mixins).update(
      DevMixin=DevMixin,
      DevMroMixin=DevMroMixin,
      WSStoMixin=WSStoMixin,
      GCLMixin=GCLMixin,
      RldManMixin=RldManMixin,
      LoneMixinA=LoneMixinA,
      LoneMixinB=LoneMixinB,
      DbgMixin=DbgMixin,
      AconMixin=AconMixin,
      AclTaskMixin=AclTaskMixin,
  )
  class globCL(GCLMixin):
      """Shared state plus the background callback loops.

      Attributes set by ``__init__``:
        acl            -- channel layer, None until a consumer publishes one
        has_cl         -- whether a consumer has already started the loops
        rconn, rconn1  -- Redis clients for databases 0 and 1
        rman           -- RldMan instance handling file reloads
        cbs            -- name -> callback, run on every persistent_loop pass
        cbs_once       -- list of callbacks run on the next pass, then dropped
        cbs_once_set   -- same as cbs_once, stored as a set
        cbs_asap       -- callbacks staged for the fast loop
        cbs_asap2      -- callbacks the fast loop runs on its next pass
        timeout        -- sleep between persistent_loop passes (seconds)
        fast_timeout   -- sleep between fast-loop passes (seconds)

      Every callback is invoked with one positional argument: the set
      ``{k, self}``, where ``k`` is the callback's dict key or list index.
      """

      def __init__(self, *a, **kw):
          # for now we are not going to do any weird class stuff
          self.acl = None
          self.has_cl = False
          self.rconn1 = connect_redis1()
          self.rconn = connect_redis()
          self.rman = RldMan()
          self.cbs_once = []
          self.cbs_asap2 = []
          self.cbs_asap = []
          self.cbs_once_set = set()
          if hot_reload_build:
              self.cbs = {"fn1": fn1, "fn2x": nop}
          else:
              self.cbs = {}
          self.timeout = 1
          self.fast_timeout = .1

      async def _run_callbacks(self, keyed_cbs, err_label):
          """Invoke each (k, callback) pair, printing rather than raising on errors.

          Coroutine functions are awaited, other callables are called directly
          and non-callables are skipped. An exception from one callback is
          printed with ``err_label`` and does not stop the remaining ones.
          """
          for k, v in keyed_cbs:
              try:
                  call_info = callable_helper(v)
                  if not call_info["is_callable"]:
                      continue
                  if call_info["is_coroutine"]:
                      await v({k, self})
                  else:
                      v({k, self})
              except Exception as e:
                  # Loop boundary: report and carry on so the loop keeps running.
                  p(err_label, e)
                  p(flush=True)

      async def persistent_fast_cb_loop_min_timeout(self, *a, **kw):
          """Fast loop with a two-stage queue.

          Each pass runs what is in ``cbs_asap2`` and then promotes ``cbs_asap``
          to ``cbs_asap2``, so a callback appended to ``cbs_asap`` runs on the
          pass after the one that promotes it.
          """
          while 1:
              if self.cbs_asap2:
                  batch = [*self.cbs_asap2]
                  self.cbs_asap2 = []
                  await self._run_callbacks(
                      enumerate(batch),
                      "persistent_fast_cb_loop Exception cbs_once:",
                  )
              if self.cbs_asap:
                  self.cbs_asap2 = self.cbs_asap
                  self.cbs_asap = []
              await asyncio.sleep(self.fast_timeout)

      async def persistent_fast_cb_loop(self, *a, **kw):
          """Fast loop that runs and clears ``cbs_asap`` directly on every pass.

          Not scheduled by ``_run_persistent_loop``; the two-stage variant is.
          """
          while 1:
              if self.cbs_asap:
                  batch = [*self.cbs_asap]
                  self.cbs_asap = []
                  await self._run_callbacks(
                      enumerate(batch),
                      "persistent_fast_cb_loop Exception cbs_once:",
                  )
              await asyncio.sleep(self.fast_timeout)

      async def persistent_loop(self, *a, **kw):
          """Slow loop: run every ``cbs`` entry, then drain the one-shot queues."""
          while 1:
              # Snapshot the items so callbacks may edit self.cbs while running.
              await self._run_callbacks(
                  [*self.cbs.items()],
                  "persistent_loop Exception",
              )
              once = [*self.cbs_once, *self.cbs_once_set]
              self.cbs_once = []
              self.cbs_once_set = set()
              await self._run_callbacks(
                  enumerate(once),
                  "persistent_loop Exception cbs_once:",
              )
              await asyncio.sleep(self.timeout)

      def _run_persistent_loop(self, *a, **kw):
          """Schedule the slow loop and the two-stage fast loop on the event loop.

          Rebinds ``run_persistent_loop`` on the instance to a no-op first, so
          later calls through that name do nothing.
          """
          p("_run_persistent_loop:")
          loop = asyncio.get_event_loop()
          self.run_persistent_loop = nop
          self.rman.persistent_loop_cb()
          # Hold references to the tasks: the event loop itself only keeps weak
          # references, so an unreferenced task may be garbage-collected.
          self._loop_tasks = [
              loop.create_task(self.persistent_loop()),
              loop.create_task(self.persistent_fast_cb_loop_min_timeout()),
          ]
          self.loop = loop

      run_persistent_loop = _run_persistent_loop

      def sync_dev_group_discard(self, *a, **kw):
          """Queue ``dev_group_discard(*a, **kw)`` for the next persistent_loop pass."""
          async def _discard_once(*_cb_args):
              # The loop's own callback argument is not needed here.
              await self.dev_group_discard(*a, **kw)
          glob_cl.cbs_once.append(_discard_once)

      async def dev_group_discard(self, group_name, consumer):
          """Remove ``consumer`` from ``group_name`` locally and on the channel layer."""
          channel_layer = glob_cl.acl
          if group_name in consumer.groups:
              consumer.groups.remove(group_name)
          await channel_layer.group_discard(group_name, consumer.channel_name)

      def sync_dev_group_add(self, *a, **kw):
          """Queue ``dev_group_add(*a, **kw)`` for the next persistent_loop pass."""
          async def _add_once(*_cb_args):
              # The loop's own callback argument is not needed here.
              await self.dev_group_add(*a, **kw)
          glob_cl.cbs_once.append(_add_once)

      async def dev_group_add(self, group_name, consumer):
          """Add ``consumer`` to ``group_name`` locally (no duplicates) and on the channel layer."""
          channel_layer = glob_cl.acl
          if group_name not in consumer.groups:
              consumer.groups.append(group_name)
          await channel_layer.group_add(group_name, consumer.channel_name)
  253. # RldManMixin
  class RldMan(RldManMixin):
      """Re-executes a fixed set of Python files whenever they change on disk.

      ``files`` maps a file path to a record whose "ftxt" entry holds the
      (mtime, size) pair seen at the last execution ("" before the first one).
      ``scopes`` holds the candidate exec namespaces, ``scopes['current_scope']``
      is the one in use, and ``scope_opt`` selects how it is passed to exec.
      """

      def __init__(self, *a, **kw):
          self.files = {}
          # Not used directly. It is kept because the locals() snapshots below
          # capture this frame's locals, and that includes z.
          z = "ABC"
          self.scopes = {
              "dflt_scope": {"globals": globals(), "locals": locals()},
              "dflt_scope_zloc": {"globals": globals(), "locals": {}},
              "dflt_scope_gscope": {"globals": globals(), "locals": {}},
              "dflt_scope_copy": {
                  "globals": copy.copy(globals()),
                  "locals": copy.copy(locals()),
              },
          }
          self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
          self.scope_opt = "globals"
          self.print_tb = 0
          base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
          file_list = [
              base_path + "i0.py",
              base_path + "i1.py",
              base_path + "i2.py",
              base_path + "i3.py",
              base_path + "i4.py",
              base_path + "i5.py",
              base_path + "v0.py",
              base_path + "v1.py",
              base_path + "v2.py",
              base_path + "v3.py",
              base_path + "v4.py",
              base_path + "v5.py",
          ]
          self.add_files(file_list)

      def persistent_loop_cb(self, *a, **kw):
          """Register the r*/vr* files; called when the persistent loops start."""
          base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
          file_list = [
              base_path + "r0.py",
              base_path + "r1.py",
              base_path + "r2.py",
              base_path + "r3.py",
              base_path + "r4.py",
              base_path + "r5.py",
              base_path + "vr0.py",
              base_path + "vr1.py",
          ]
          self.add_files(file_list)

      def add_files(self, files):
          """Register several files.

          Each entry is a path string, or a list that is unpacked as the
          arguments of ``add_file``. Anything else is reported and skipped.
          """
          for file in files:
              if isinstance(file, str):
                  self.add_file(file)
              elif isinstance(file, list):
                  self.add_file(*file)
              else:
                  p("add files???", file)

      def add_file(self, file_name, fnx=None):
          """Register (or reset) ``file_name``; ``fnx`` supplies extra record fields."""
          extra = {} if fnx is None else fnx
          # "ftxt" starts as "" so the first rld_file call always executes the file.
          self.files[file_name] = {"ftxt": "", **extra}

      def get_scope(self, rfile_obj, file_name):
          """Return the scope record to exec a file in; currently the same for all files."""
          return self.scopes["current_scope"]

      def rld_file(self, file_name, ret=None, **kw):
          """Execute ``file_name`` if its (mtime, size) changed since the last run.

          Args:
              file_name: path to reload; ignored unless it was registered.
              ret: result dict that is passed back; a fresh one with the keys
                  "errs", "all" and "alle" is created when omitted.

          Returns:
              ``ret``.

          Raises:
              OSError: from ``os.stat`` when a registered file cannot be
                  stat'ed; that call is made outside the error handling.

          Errors while reading or executing the file are printed, not raised.
          The new (mtime, size) is stored before execution, so a failing file
          is not retried until it changes again.
          """
          if ret is None:
              ret = {"errs": {}, "all": {}, "alle": {}}
          if file_name in self.files:
              rfile_obj = self.files[file_name]
              st = os.stat(file_name)
              st_tuple = (st.st_mtime, st.st_size)
              eflag = "nd"
              if rfile_obj["ftxt"] != st_tuple:
                  rfile_obj["ftxt"] = st_tuple
                  try:
                      with open(file_name, "r") as f:
                          ftxt = f.read()
                      scope_obj = self.get_scope(rfile_obj, file_name)
                      # The file's text runs as code inside this process.
                      if self.scope_opt == "locals":
                          eflag = "locals"
                          exec(ftxt, scope_obj["globals"], scope_obj["locals"])
                      elif self.scope_opt == "globals":
                          eflag = "globals"
                          exec(ftxt, scope_obj["globals"])
                      else:
                          eflag = "[]"
                          exec(ftxt)
                  except Exception as e:
                      # Best effort: a broken file must not stop the reload pass.
                      p()
                      print("file:", file_name)
                      print("EXCEPT", eflag, e)
                      if self.print_tb:
                          traceback.print_tb(e.__traceback__, file=sys.stdout)
                  print(end="", flush=True)
          return ret

      def rld_files(self):
          """Run ``rld_file`` for every registered file and return the shared result dict."""
          ret = {
              "errs": {},
              "all": {},
              "alle": {},
          }
          # Iterate a snapshot of the keys so executed files may register more files.
          for k in [*self.files]:
              self.rld_file(k, ret)
          p("", end="", flush=True)
          return ret
  def fn1(*a, **kw):
      """Loop callback: reload every registered file; all arguments are ignored."""
      reloader = glob_cl.rman
      reloader.rld_files()
  # Module-wide singleton referenced by the mixins, fn1 and atexit_fn.
  glob_cl = globCL()
  # Exit hook. It is registered only by the hot_reload_build branch at the
  # bottom of this module, so other runs never flush Redis.
  def atexit_fn():
      """Flush the Redis database behind ``glob_cl.rconn`` (flushdb, not flushall)."""
      print("atexit_fn!\n\n")
      glob_cl.rconn.flushdb()
      print("", end="", flush=True)
  # Module start-up.
  # hot_reload_build true: run every registered file once now and register
  # atexit_fn as an exit hook.
  # hot_reload_build false: import wsps.hot_reload_temp_staging and hand the
  # Mixins registry to its init_class_edits().
  # NOTE(review): indentation was lost in this copy of the file. It is unclear
  # whether the dev_room_dashboard assignment belongs to the else branch or
  # runs unconditionally -- confirm against the original source.
  375. if hot_reload_build:
  376. glob_cl.rman.rld_files()
  377. atexit.register(atexit_fn)
  378. else:
  379. import wsps.hot_reload_temp_staging
  380. wsps.hot_reload_temp_staging.init_class_edits(Mixins)
  381. glob_cl.dev_room_dashboard = "jfi_dash"
  382. # if not hot_reload_build:
  383. # glob_cl.rman.rld_files()
  384. # print("############################ ##############################")