選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

tasks_loop.py 10.0KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. '''
  12. umod
  13. # '''
# Short alias for the built-in print; used for debug output throughout this file.
p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis():
  20. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  21. def eclass_factory(n):
  22. ret = []
  23. for k in range(n):
  24. class nx:pass
  25. # _rcls_name
  26. ret.append(nx)
  27. return ret
  28. '''
  29. # '''
  30. def callable_helper(fn):
  31. ret = {
  32. "is_callable":0,
  33. "is_coroutine":0,
  34. }
  35. ret["is_callable"]= callable(fn)
  36. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  37. # if (ca)
  38. return ret
# Output function for the MRO trace messages in AconMixin.connect; rebind it
# (e.g. to a no-op) to silence them.
verbose = print
  40. class DbgMixin:
  41. async def ws_rec(self,text_data,*a,**kw):
  42. # if "dbg" in
  43. text_data_json = json.loads(text_data)
  44. if not 'dbg' in text_data_json:
  45. return
  46. p("DBG!")
  47. exec(text_data_json["dbg"])
  48. class AconMixin(DbgMixin):
  49. # class AconMixin:
  50. async def connect(self):
  51. # print("-----------CONNECTING async def connect")
  52. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  53. p(flush=True)
  54. await self.accept()
  55. verbose("<MRO")
  56. verbose(type(self))
  57. await self.call_all_mro("ws_conn0")
  58. await self.call_all_mro("ws_conn")
  59. await self.call_all_mro("ws_conn2")
  60. verbose("/MRO>")
  61. # await self.ws_conn_once(event)
  62. await self.ws_conn_once()
  63. async def ws_conn_once(self, *a,**kw):pass
  64. async def ws_disconn_once(self, *a,**kw):pass
  65. async def ws_rec_once(self, *a,**kw):pass
  66. async def receive(self, *a,**kw):
  67. print("REC")
  68. print("",end="",flush=True)
  69. # print("-----------REC")
  70. await self.call_all_mro("ws_rec",*a,**kw)
  71. await self.ws_rec_once(*a,**kw)
  72. async def call_all_mro(self,mthd_name,*args,**kwargs):
  73. called = set([None])
  74. for cls_obj in self.__class__.__mro__:
  75. mthd = getattr(cls_obj,mthd_name,None)
  76. # print("~",cls_obj,mthd)
  77. if not mthd in called:
  78. # print(cls_obj,mthd)
  79. called.add(mthd)
  80. await mthd(self,*args,**kwargs)
  81. async def disconnect(self, close_code):
  82. await self.call_all_mro("ws_disconn",close_code)
  83. await self.ws_disconn_once(close_code)
  84. async def websocket_connect_x(self, event):
  85. await super().websocket_connect(event)
  86. print("<MRO")
  87. print(type(self))
  88. await self.call_all_mro("ws_conn",event)
  89. await self.call_all_mro("ws_conn2",event)
  90. print("/MRO>")
  91. await self.ws_conn_once(event)
  92. class AclTaskMixin:
  93. async def ws_disconn(self,*a,**kw):
  94. # for group in self.groups_set:
  95. pass
  96. async def ws_conn0(self):
  97. self.groups_set = set()
  98. if glob_cl.has_cl:
  99. return
  100. glob_cl.has_cl = True
  101. print("AclTaskMixin:ws_conn0")
  102. glob_cl.run_persistent_loop(self)
  103. glob_cl.acl = self.channel_layer
  104. # print("++++++++++++++++++++++++++")
  105. async def ws_conn_once(self,*a,**kw):
  106. print("\t\tws_conn_once:","AclTaskMixin")
  107. class DevMixin(*eclass_factory(10)):pass
  108. class DevMroMixin(*eclass_factory(10)):pass
  109. class WSStoMixin(*eclass_factory(10)):pass
  110. class GCLMixin(*eclass_factory(10)):pass
  111. class RldManMixin(*eclass_factory(10)):pass
  112. class LoneMixinA():pass
  113. class LoneMixinB():pass
  114. Mixins = Eobj()
  115. Mixins.DevMixin = DevMixin
  116. Mixins.DevMroMixin = DevMroMixin
  117. Mixins.WSStoMixin = WSStoMixin
  118. Mixins.GCLMixin = GCLMixin
  119. Mixins.RldManMixin = RldManMixin
  120. Mixins.LoneMixinA = LoneMixinA
  121. Mixins.LoneMixinB = LoneMixinB
  122. Mixins.DbgMixin = DbgMixin
  123. Mixins.AconMixin = AconMixin
  124. Mixins.AclTaskMixin = AclTaskMixin
  125. # GCLMixin
  126. class globCL(GCLMixin):
  127. def __init__(self,*a,**kw):
  128. # for now we are not going to do any weird class stuff
  129. self.acl = None
  130. self.has_cl = False
  131. self.rconn = connect_redis()
  132. self.rman = RldMan()
  133. self.cbs_once = []
  134. self.cbs_asap = []
  135. self.cbs_once_set = set()
  136. self.cbs = {"fn1":fn1,"fn2x":nop}
  137. self.timeout = 1
  138. self.fast_timeout = .1
  139. pass
  140. async def persistent_fast_cb_loop(self,*a,**kw):
  141. while 1:
  142. if len(self.cbs_asap):
  143. cbs_once = [*self.cbs_asap]
  144. self.cbs_asap=[]
  145. k=0
  146. for v in cbs_once:
  147. try:
  148. call_info = callable_helper(v)
  149. if (call_info["is_callable"]):
  150. if call_info["is_coroutine"]:
  151. pass
  152. await v({k,self})
  153. else:
  154. v({k,self})
  155. else:
  156. pass
  157. # p(k,v)
  158. pass
  159. except Exception as e:
  160. p("persistent_fast_cb_loop Exception cbs_once:",e)
  161. p(flush=True)
  162. # raise
  163. else:
  164. pass
  165. finally:
  166. pass
  167. k += 1
  168. await asyncio.sleep(self.fast_timeout)
  169. async def persistent_loop(self,*a,**kw):
  170. while 1:
  171. # p("PL:",time.time(),flush=1)
  172. # await asyncio.sleep(5)
  173. # for k,v in cls.cbs.items():
  174. for k,v in [*self.cbs.items()]:
  175. try:
  176. call_info = callable_helper(v)
  177. if (call_info["is_callable"]):
  178. if call_info["is_coroutine"]:
  179. pass
  180. await v({k,self})
  181. else:
  182. v({k,self})
  183. else:
  184. pass
  185. # p(k,v)
  186. pass
  187. except Exception as e:
  188. p("persistent_loop Exception",e)
  189. p(flush=True)
  190. # raise
  191. else:
  192. pass
  193. finally:
  194. pass
  195. # cbs_once
  196. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  197. self.cbs_once = []
  198. self.cbs_once_set = set()
  199. k=0
  200. for v in cbs_once:
  201. try:
  202. call_info = callable_helper(v)
  203. if (call_info["is_callable"]):
  204. if call_info["is_coroutine"]:
  205. pass
  206. await v({k,self})
  207. else:
  208. v({k,self})
  209. else:
  210. pass
  211. # p(k,v)
  212. pass
  213. except Exception as e:
  214. p("persistent_loop Exception cbs_once:",e)
  215. p(flush=True)
  216. # raise
  217. else:
  218. pass
  219. finally:
  220. pass
  221. k += 1
  222. await asyncio.sleep(self.timeout)
  223. def _run_persistent_loop(self,*a,**kw):
  224. p("_run_persistent_loop:")
  225. loop = asyncio.get_event_loop()
  226. self.run_persistent_loop = nop
  227. self.rman.persistent_loop_cb()
  228. loop.create_task(self.persistent_loop())
  229. loop.create_task(self.persistent_fast_cb_loop())
  230. self.loop = loop
  231. run_persistent_loop = _run_persistent_loop
  232. def sync_dev_group_discard(self,*a,**kw):
  233. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  234. async def dev_group_discard(self,group_name,consumer):
  235. channel_layer = glob_cl.acl
  236. if consumer.channel_name in consumer.groups:
  237. consumer.groups.remove(group_name)
  238. await channel_layer.group_discard(group_name,consumer.channel_name)
  239. def sync_dev_group_add(self,*a,**kw):
  240. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  241. async def dev_group_add(self,group_name,consumer):
  242. channel_layer = glob_cl.acl
  243. # if not consumer.channel_name in consumer.groups_set:
  244. # consumer.groups_set.add(groups)
  245. if not consumer.channel_name in consumer.groups:
  246. consumer.groups.append(group_name)
  247. await channel_layer.group_add(group_name,consumer.channel_name)
  248. # RldManMixin
  249. class RldMan(RldManMixin):
  250. def __init__(self,*a,**kw):
  251. self.files = {}
  252. z="ABC"
  253. self.scopes = {
  254. "dflt_scope":{"globals":globals(),"locals":locals()},
  255. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  256. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  257. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  258. # "dflt_scope":{"globals":globals(),"locals":{}}
  259. }
  260. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  261. # self.scope_opt = "locals"
  262. self.scope_opt = "globals"
  263. self.print_tb = 0
  264. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  265. file_list = [
  266. base_path+"i0.py",
  267. base_path+"i1.py",
  268. base_path+"i2.py",
  269. base_path+"i3.py",
  270. base_path+"i4.py",
  271. base_path+"i5.py",
  272. ]
  273. self.add_files(file_list)
  274. # self.add_files(file_list,{"run"})
  275. def persistent_loop_cb(self,*a,**kw):
  276. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  277. file_list = [
  278. base_path+"r0.py",
  279. base_path+"r1.py",
  280. base_path+"r2.py",
  281. base_path+"r3.py",
  282. base_path+"r4.py",
  283. base_path+"r5.py",
  284. ]
  285. self.add_files(file_list)
  286. def add_files(self,files):
  287. for file in files:
  288. if type(file)==str:
  289. self.add_file(file)
  290. elif type(file)==list:
  291. self.add_file(*file)
  292. else:
  293. p("add files???",file)
  294. def add_file(self,file_name,fnx={}):
  295. self.files[file_name] = {"ftxt":"",**fnx}
  296. def get_scope(self,rfile_obj,file_name):
  297. return self.scopes["current_scope"]
  298. return {
  299. "scope":self.scopes["current_scope"],
  300. }
  301. # rfile_obj.get("scope")
  302. # if
  303. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  304. if file_name in self.files:
  305. rfile_obj = self.files[file_name]
  306. st = os.stat(file_name)
  307. st_tuple = (st.st_mtime,st.st_size)
  308. # if rfile_obj["ftxt"] == "":
  309. eflag ="nd"
  310. # rfile_obj["ftxt"] = st_tuple
  311. if rfile_obj["ftxt"] != st_tuple:
  312. # p(rfile_obj["ftxt"])
  313. rfile_obj["ftxt"] = st_tuple
  314. try:
  315. f = open(file_name,"r")
  316. ftxt = f.read()
  317. # scope_key = rfile_obj.get("scope")
  318. # p(scope_key,rfile_obj)
  319. f.close()
  320. scope_obj = self.get_scope(file_name,rfile_obj)
  321. if self.scope_opt == "locals":
  322. eflag ="locals"
  323. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  324. elif self.scope_opt == "globals":
  325. eflag ="globals"
  326. exec(ftxt,scope_obj["globals"])
  327. else:
  328. eflag ="[]"
  329. exec(ftxt)
  330. except Exception as e:
  331. p()
  332. print("file:",file_name)
  333. print("EXCEPT",eflag,e)
  334. if self.print_tb:
  335. traceback.print_tb(e.__traceback__,file=sys.stdout)
  336. else:
  337. pass
  338. # print("ELSE")
  339. finally:
  340. # print("FINALLY")
  341. pass
  342. print(end="",flush=True)
  343. return ret
  344. def rld_files(self):
  345. # p()
  346. ret = {
  347. "errs":{},
  348. "all":{},
  349. "alle":{},
  350. }
  351. # for k in self.files:
  352. for k in [*self.files]:
  353. self.rld_file(k,ret)
  354. # p("rld_files!")
  355. p("",end="",flush=True)
  356. return ret
  357. def fn1(*a,**kw):
  358. # rld_files
  359. glob_cl.rman.rld_files()
# Module-level singleton referenced by the consumers and callbacks in this file.
glob_cl = globCL()
# Run one reload pass immediately, at import time.
glob_cl.rman.rld_files()
  362. # print("..",end="\n",flush=True)
  363. r"""
  364. def atexit_fn():
  365. pass
  366. # rconn.flushall()
  367. print("ATEXIT FN")
  368. rinfo_len()
  369. keys = rconn.keys()
  370. print(keys)
  371. print(len(keys))
  372. print("t 000")
  373. # time.sleep(10)
  374. print("t 010")
  375. # time.sleep(20)
  376. print("t 020")
  377. print("ATEXIT FN...")
  378. rconn.flushall()
  379. atexit.register(atexit_fn)
  380. # """
  381. def atexit_fn():
  382. print("atexit_fn!\n\n")
  383. print("",end="",flush=True)
  384. atexit.register(atexit_fn)