您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

tasks_loop.py 10KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. '''
  12. umod
  13. # '''
  14. p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis():
  20. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace"),
  21. r'''
  22. return {
  23. "rconn0":redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace"),
  24. "rconn1":redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace"),
  25. }
  26. # '''
  27. def eclass_factory(n):
  28. ret = []
  29. for k in range(n):
  30. class nx:pass
  31. # _rcls_name
  32. ret.append(nx)
  33. return ret
  34. '''
  35. # '''
  36. def callable_helper(fn):
  37. ret = {
  38. "is_callable":0,
  39. "is_coroutine":0,
  40. }
  41. ret["is_callable"]= callable(fn)
  42. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  43. # if (ca)
  44. return ret
  45. verbose = print
  46. class DbgMixin:
  47. async def ws_rec(self,text_data,*a,**kw):
  48. # if "dbg" in
  49. text_data_json = json.loads(text_data)
  50. if not 'dbg' in text_data_json:
  51. return
  52. p("DBG!")
  53. exec(text_data_json["dbg"])
  54. class AconMixin(DbgMixin):
  55. # class AconMixin:
  56. async def connect(self):
  57. # print("-----------CONNECTING async def connect")
  58. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  59. p(flush=True)
  60. await self.accept()
  61. verbose("<MRO")
  62. verbose(type(self))
  63. await self.call_all_mro("ws_conn0")
  64. await self.call_all_mro("ws_conn")
  65. await self.call_all_mro("ws_conn2")
  66. verbose("/MRO>")
  67. # await self.ws_conn_once(event)
  68. await self.ws_conn_once()
  69. async def ws_conn_once(self, *a,**kw):pass
  70. async def ws_disconn_once(self, *a,**kw):pass
  71. async def ws_rec_once(self, *a,**kw):pass
  72. async def receive(self, *a,**kw):
  73. print("REC")
  74. print("",end="",flush=True)
  75. # print("-----------REC")
  76. await self.call_all_mro("ws_rec",*a,**kw)
  77. await self.ws_rec_once(*a,**kw)
  78. async def call_all_mro(self,mthd_name,*args,**kwargs):
  79. called = set([None])
  80. for cls_obj in self.__class__.__mro__:
  81. mthd = getattr(cls_obj,mthd_name,None)
  82. # print("~",cls_obj,mthd)
  83. if not mthd in called:
  84. # print(cls_obj,mthd)
  85. called.add(mthd)
  86. await mthd(self,*args,**kwargs)
  87. async def disconnect(self, close_code):
  88. await self.call_all_mro("ws_disconn",close_code)
  89. await self.ws_disconn_once(close_code)
  90. async def websocket_connect_x(self, event):
  91. await super().websocket_connect(event)
  92. print("<MRO")
  93. print(type(self))
  94. await self.call_all_mro("ws_conn",event)
  95. await self.call_all_mro("ws_conn2",event)
  96. print("/MRO>")
  97. await self.ws_conn_once(event)
  98. class AclTaskMixin:
  99. async def ws_disconn(self,*a,**kw):
  100. # for group in self.groups_set:
  101. pass
  102. async def ws_conn0(self):
  103. self.groups_set = set()
  104. if glob_cl.has_cl:
  105. return
  106. glob_cl.has_cl = True
  107. print("AclTaskMixin:ws_conn0")
  108. glob_cl.run_persistent_loop(self)
  109. glob_cl.acl = self.channel_layer
  110. # print("++++++++++++++++++++++++++")
  111. async def ws_conn_once(self,*a,**kw):
  112. print("\t\tws_conn_once:","AclTaskMixin")
  113. class DevMixin(*eclass_factory(10)):pass
  114. class DevMroMixin(*eclass_factory(10)):pass
  115. class WSStoMixin(*eclass_factory(10)):pass
  116. class GCLMixin(*eclass_factory(10)):pass
  117. class RldManMixin(*eclass_factory(10)):pass
  118. class LoneMixinA():pass
  119. class LoneMixinB():pass
  120. Mixins = Eobj()
  121. Mixins.DevMixin = DevMixin
  122. Mixins.DevMroMixin = DevMroMixin
  123. Mixins.WSStoMixin = WSStoMixin
  124. Mixins.GCLMixin = GCLMixin
  125. Mixins.RldManMixin = RldManMixin
  126. Mixins.LoneMixinA = LoneMixinA
  127. Mixins.LoneMixinB = LoneMixinB
  128. Mixins.DbgMixin = DbgMixin
  129. Mixins.AconMixin = AconMixin
  130. Mixins.AclTaskMixin = AclTaskMixin
  131. # GCLMixin
  132. class globCL(GCLMixin):
  133. def __init__(self,*a,**kw):
  134. # for now we are not going to do any weird class stuff
  135. self.acl = None
  136. self.has_cl = False
  137. # rconns = connect_redis()
  138. self.rconn = connect_redis()
  139. self.rman = RldMan()
  140. self.cbs_once = []
  141. self.cbs_asap = []
  142. self.cbs_once_set = set()
  143. self.cbs = {"fn1":fn1,"fn2x":nop}
  144. self.timeout = 1
  145. self.fast_timeout = .1
  146. pass
  147. async def persistent_fast_cb_loop(self,*a,**kw):
  148. while 1:
  149. if len(self.cbs_asap):
  150. cbs_once = [*self.cbs_asap]
  151. self.cbs_asap=[]
  152. k=0
  153. for v in cbs_once:
  154. try:
  155. call_info = callable_helper(v)
  156. if (call_info["is_callable"]):
  157. if call_info["is_coroutine"]:
  158. pass
  159. await v({k,self})
  160. else:
  161. v({k,self})
  162. else:
  163. pass
  164. # p(k,v)
  165. pass
  166. except Exception as e:
  167. p("persistent_fast_cb_loop Exception cbs_once:",e)
  168. p(flush=True)
  169. # raise
  170. else:
  171. pass
  172. finally:
  173. pass
  174. k += 1
  175. await asyncio.sleep(self.fast_timeout)
  176. async def persistent_loop(self,*a,**kw):
  177. while 1:
  178. # p("PL:",time.time(),flush=1)
  179. # await asyncio.sleep(5)
  180. # for k,v in cls.cbs.items():
  181. for k,v in [*self.cbs.items()]:
  182. try:
  183. call_info = callable_helper(v)
  184. if (call_info["is_callable"]):
  185. if call_info["is_coroutine"]:
  186. pass
  187. await v({k,self})
  188. else:
  189. v({k,self})
  190. else:
  191. pass
  192. # p(k,v)
  193. pass
  194. except Exception as e:
  195. p("persistent_loop Exception",e)
  196. p(flush=True)
  197. # raise
  198. else:
  199. pass
  200. finally:
  201. pass
  202. # cbs_once
  203. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  204. self.cbs_once = []
  205. self.cbs_once_set = set()
  206. k=0
  207. for v in cbs_once:
  208. try:
  209. call_info = callable_helper(v)
  210. if (call_info["is_callable"]):
  211. if call_info["is_coroutine"]:
  212. pass
  213. await v({k,self})
  214. else:
  215. v({k,self})
  216. else:
  217. pass
  218. # p(k,v)
  219. pass
  220. except Exception as e:
  221. p("persistent_loop Exception cbs_once:",e)
  222. p(flush=True)
  223. # raise
  224. else:
  225. pass
  226. finally:
  227. pass
  228. k += 1
  229. await asyncio.sleep(self.timeout)
  230. def _run_persistent_loop(self,*a,**kw):
  231. p("_run_persistent_loop:")
  232. loop = asyncio.get_event_loop()
  233. self.run_persistent_loop = nop
  234. self.rman.persistent_loop_cb()
  235. loop.create_task(self.persistent_loop())
  236. loop.create_task(self.persistent_fast_cb_loop())
  237. self.loop = loop
  238. run_persistent_loop = _run_persistent_loop
  239. def sync_dev_group_discard(self,*a,**kw):
  240. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  241. async def dev_group_discard(self,group_name,consumer):
  242. channel_layer = glob_cl.acl
  243. if consumer.channel_name in consumer.groups:
  244. consumer.groups.remove(group_name)
  245. await channel_layer.group_discard(group_name,consumer.channel_name)
  246. def sync_dev_group_add(self,*a,**kw):
  247. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  248. async def dev_group_add(self,group_name,consumer):
  249. channel_layer = glob_cl.acl
  250. # if not consumer.channel_name in consumer.groups_set:
  251. # consumer.groups_set.add(groups)
  252. if not consumer.channel_name in consumer.groups:
  253. consumer.groups.append(group_name)
  254. await channel_layer.group_add(group_name,consumer.channel_name)
  255. # RldManMixin
  256. class RldMan(RldManMixin):
  257. def __init__(self,*a,**kw):
  258. self.files = {}
  259. z="ABC"
  260. self.scopes = {
  261. "dflt_scope":{"globals":globals(),"locals":locals()},
  262. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  263. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  264. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  265. # "dflt_scope":{"globals":globals(),"locals":{}}
  266. }
  267. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  268. # self.scope_opt = "locals"
  269. self.scope_opt = "globals"
  270. self.print_tb = 0
  271. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  272. file_list = [
  273. base_path+"i0.py",
  274. base_path+"i1.py",
  275. base_path+"i2.py",
  276. base_path+"i3.py",
  277. base_path+"i4.py",
  278. base_path+"i5.py",
  279. ]
  280. self.add_files(file_list)
  281. # self.add_files(file_list,{"run"})
  282. def persistent_loop_cb(self,*a,**kw):
  283. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  284. file_list = [
  285. base_path+"r0.py",
  286. base_path+"r1.py",
  287. base_path+"r2.py",
  288. base_path+"r3.py",
  289. base_path+"r4.py",
  290. base_path+"r5.py",
  291. ]
  292. self.add_files(file_list)
  293. def add_files(self,files):
  294. for file in files:
  295. if type(file)==str:
  296. self.add_file(file)
  297. elif type(file)==list:
  298. self.add_file(*file)
  299. else:
  300. p("add files???",file)
  301. def add_file(self,file_name,fnx={}):
  302. self.files[file_name] = {"ftxt":"",**fnx}
  303. def get_scope(self,rfile_obj,file_name):
  304. return self.scopes["current_scope"]
  305. return {
  306. "scope":self.scopes["current_scope"],
  307. }
  308. # rfile_obj.get("scope")
  309. # if
  310. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  311. if file_name in self.files:
  312. rfile_obj = self.files[file_name]
  313. st = os.stat(file_name)
  314. st_tuple = (st.st_mtime,st.st_size)
  315. # if rfile_obj["ftxt"] == "":
  316. eflag ="nd"
  317. # rfile_obj["ftxt"] = st_tuple
  318. if rfile_obj["ftxt"] != st_tuple:
  319. # p(rfile_obj["ftxt"])
  320. rfile_obj["ftxt"] = st_tuple
  321. try:
  322. f = open(file_name,"r")
  323. ftxt = f.read()
  324. # scope_key = rfile_obj.get("scope")
  325. # p(scope_key,rfile_obj)
  326. f.close()
  327. scope_obj = self.get_scope(file_name,rfile_obj)
  328. if self.scope_opt == "locals":
  329. eflag ="locals"
  330. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  331. elif self.scope_opt == "globals":
  332. eflag ="globals"
  333. exec(ftxt,scope_obj["globals"])
  334. else:
  335. eflag ="[]"
  336. exec(ftxt)
  337. except Exception as e:
  338. p()
  339. print("file:",file_name)
  340. print("EXCEPT",eflag,e)
  341. if self.print_tb:
  342. traceback.print_tb(e.__traceback__,file=sys.stdout)
  343. else:
  344. pass
  345. # print("ELSE")
  346. finally:
  347. # print("FINALLY")
  348. pass
  349. print(end="",flush=True)
  350. return ret
  351. def rld_files(self):
  352. # p()
  353. ret = {
  354. "errs":{},
  355. "all":{},
  356. "alle":{},
  357. }
  358. # for k in self.files:
  359. for k in [*self.files]:
  360. self.rld_file(k,ret)
  361. # p("rld_files!")
  362. p("",end="",flush=True)
  363. return ret
  364. def fn1(*a,**kw):
  365. # rld_files
  366. glob_cl.rman.rld_files()
  367. glob_cl = globCL()
  368. glob_cl.rman.rld_files()
  369. # print("..",end="\n",flush=True)
  370. r"""
  371. def atexit_fn():
  372. pass
  373. # rconn.flushall()
  374. print("ATEXIT FN")
  375. rinfo_len()
  376. keys = rconn.keys()
  377. print(keys)
  378. print(len(keys))
  379. print("t 000")
  380. # time.sleep(10)
  381. print("t 010")
  382. # time.sleep(20)
  383. print("t 020")
  384. print("ATEXIT FN...")
  385. rconn.flushall()
  386. atexit.register(atexit_fn)
  387. # """
  388. def atexit_fn():
  389. print("atexit_fn!\n\n")
  390. rconn = glob_cl.rconn
  391. rconn.flushall()
  392. print("",end="",flush=True)
  393. atexit.register(atexit_fn)