Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

tasks_loop.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. '''
  12. umod
  13. # '''
  14. p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis1():
  20. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  21. def connect_redis():
  22. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  23. # return redis.Redis(host='localhost', port=6379,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  24. r'''
  25. return {
  26. "rconn0":redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace"),
  27. "rconn1":redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace"),
  28. }
  29. # '''
  30. def eclass_factory(n):
  31. ret = []
  32. for k in range(n):
  33. class nx:pass
  34. # _rcls_name
  35. ret.append(nx)
  36. return ret
  37. '''
  38. # '''
  39. def callable_helper(fn):
  40. ret = {
  41. "is_callable":0,
  42. "is_coroutine":0,
  43. }
  44. ret["is_callable"]= callable(fn)
  45. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  46. # if (ca)
  47. return ret
  48. verbose = print
  49. verbose=nop
  50. class DbgMixin:
  51. async def ws_rec(self,text_data,*a,**kw):
  52. # if "dbg" in
  53. text_data_json = json.loads(text_data)
  54. if not 'dbg' in text_data_json:
  55. return
  56. p("DBG!")
  57. exec(text_data_json["dbg"])
  58. class AconMixin(DbgMixin):
  59. # class AconMixin:
  60. async def connect(self):
  61. # print("-----------CONNECTING async def connect")
  62. # p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  63. p(flush=True)
  64. await self.accept()
  65. verbose("<MRO")
  66. verbose(type(self))
  67. await self.call_all_mro("ws_conn0")
  68. await self.call_all_mro("ws_conn")
  69. await self.call_all_mro("ws_conn2")
  70. verbose("/MRO>")
  71. # await self.ws_conn_once(event)
  72. await self.ws_conn_once()
  73. async def ws_conn_once(self, *a,**kw):pass
  74. async def ws_disconn_once(self, *a,**kw):pass
  75. async def ws_rec_once(self, *a,**kw):pass
  76. async def receive(self, *a,**kw):
  77. # print("REC")
  78. # print("-----------REC")
  79. await self.call_all_mro("ws_rec",*a,**kw)
  80. await self.ws_rec_once(*a,**kw)
  81. print("",end="",flush=True)
  82. async def call_all_mro(self,mthd_name,*args,**kwargs):
  83. called = set([None])
  84. for cls_obj in self.__class__.__mro__:
  85. mthd = getattr(cls_obj,mthd_name,None)
  86. # print("~",cls_obj,mthd)
  87. if not mthd in called:
  88. # print(cls_obj,mthd)
  89. called.add(mthd)
  90. await mthd(self,*args,**kwargs)
  91. async def disconnect(self, close_code):
  92. await self.call_all_mro("ws_disconn",close_code)
  93. await self.ws_disconn_once(close_code)
  94. async def websocket_connect_x(self, event):
  95. await super().websocket_connect(event)
  96. print("<MRO")
  97. print(type(self))
  98. await self.call_all_mro("ws_conn",event)
  99. await self.call_all_mro("ws_conn2",event)
  100. print("/MRO>")
  101. await self.ws_conn_once(event)
  102. class AclTaskMixin:
  103. async def ws_disconn(self,*a,**kw):
  104. # for group in self.groups_set:
  105. pass
  106. async def ws_conn0(self):
  107. self.groups_set = set()
  108. if glob_cl.has_cl:
  109. return
  110. glob_cl.has_cl = True
  111. print("AclTaskMixin:ws_conn0")
  112. glob_cl.run_persistent_loop(self)
  113. glob_cl.acl = self.channel_layer
  114. # print("++++++++++++++++++++++++++")
  115. async def ws_conn_once(self,*a,**kw):
  116. print("\t\tws_conn_once:","AclTaskMixin")
  117. class DevMixin(*eclass_factory(10)):pass
  118. class DevMroMixin(*eclass_factory(10)):pass
  119. class WSStoMixin(*eclass_factory(10)):pass
  120. class GCLMixin(*eclass_factory(10)):pass
  121. class RldManMixin(*eclass_factory(10)):pass
  122. class LoneMixinA():pass
  123. class LoneMixinB():pass
  124. Mixins = Eobj()
  125. Mixins.DevMixin = DevMixin
  126. Mixins.DevMroMixin = DevMroMixin
  127. Mixins.WSStoMixin = WSStoMixin
  128. Mixins.GCLMixin = GCLMixin
  129. Mixins.RldManMixin = RldManMixin
  130. Mixins.LoneMixinA = LoneMixinA
  131. Mixins.LoneMixinB = LoneMixinB
  132. Mixins.DbgMixin = DbgMixin
  133. Mixins.AconMixin = AconMixin
  134. Mixins.AclTaskMixin = AclTaskMixin
  135. # GCLMixin
  136. class globCL(GCLMixin):
  137. def __init__(self,*a,**kw):
  138. # for now we are not going to do any weird class stuff
  139. self.acl = None
  140. self.has_cl = False
  141. # rconns = connect_redis()
  142. self.rconn1 = connect_redis1()
  143. self.rconn = connect_redis()
  144. self.rman = RldMan()
  145. self.cbs_once = []
  146. self.cbs_asap = []
  147. self.cbs_once_set = set()
  148. self.cbs = {"fn1":fn1,"fn2x":nop}
  149. self.timeout = 1
  150. self.fast_timeout = .1
  151. pass
  152. async def persistent_fast_cb_loop(self,*a,**kw):
  153. while 1:
  154. if len(self.cbs_asap):
  155. cbs_once = [*self.cbs_asap]
  156. self.cbs_asap=[]
  157. k=0
  158. for v in cbs_once:
  159. try:
  160. call_info = callable_helper(v)
  161. if (call_info["is_callable"]):
  162. if call_info["is_coroutine"]:
  163. pass
  164. await v({k,self})
  165. else:
  166. v({k,self})
  167. else:
  168. pass
  169. # p(k,v)
  170. pass
  171. except Exception as e:
  172. p("persistent_fast_cb_loop Exception cbs_once:",e)
  173. p(flush=True)
  174. # raise
  175. else:
  176. pass
  177. finally:
  178. pass
  179. k += 1
  180. await asyncio.sleep(self.fast_timeout)
  181. async def persistent_loop(self,*a,**kw):
  182. while 1:
  183. # p("PL:",time.time(),flush=1)
  184. # await asyncio.sleep(5)
  185. # for k,v in cls.cbs.items():
  186. for k,v in [*self.cbs.items()]:
  187. try:
  188. call_info = callable_helper(v)
  189. if (call_info["is_callable"]):
  190. if call_info["is_coroutine"]:
  191. pass
  192. await v({k,self})
  193. else:
  194. v({k,self})
  195. else:
  196. pass
  197. # p(k,v)
  198. pass
  199. except Exception as e:
  200. p("persistent_loop Exception",e)
  201. p(flush=True)
  202. # raise
  203. else:
  204. pass
  205. finally:
  206. pass
  207. # cbs_once
  208. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  209. self.cbs_once = []
  210. self.cbs_once_set = set()
  211. k=0
  212. for v in cbs_once:
  213. try:
  214. call_info = callable_helper(v)
  215. if (call_info["is_callable"]):
  216. if call_info["is_coroutine"]:
  217. pass
  218. await v({k,self})
  219. else:
  220. v({k,self})
  221. else:
  222. pass
  223. # p(k,v)
  224. pass
  225. except Exception as e:
  226. p("persistent_loop Exception cbs_once:",e)
  227. p(flush=True)
  228. # raise
  229. else:
  230. pass
  231. finally:
  232. pass
  233. k += 1
  234. await asyncio.sleep(self.timeout)
  235. def _run_persistent_loop(self,*a,**kw):
  236. p("_run_persistent_loop:")
  237. loop = asyncio.get_event_loop()
  238. self.run_persistent_loop = nop
  239. self.rman.persistent_loop_cb()
  240. loop.create_task(self.persistent_loop())
  241. loop.create_task(self.persistent_fast_cb_loop())
  242. self.loop = loop
  243. run_persistent_loop = _run_persistent_loop
  244. def sync_dev_group_discard(self,*a,**kw):
  245. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  246. async def dev_group_discard(self,group_name,consumer):
  247. channel_layer = glob_cl.acl
  248. if consumer.channel_name in consumer.groups:
  249. consumer.groups.remove(group_name)
  250. await channel_layer.group_discard(group_name,consumer.channel_name)
  251. def sync_dev_group_add(self,*a,**kw):
  252. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  253. async def dev_group_add(self,group_name,consumer):
  254. channel_layer = glob_cl.acl
  255. # if not consumer.channel_name in consumer.groups_set:
  256. # consumer.groups_set.add(groups)
  257. if not consumer.channel_name in consumer.groups:
  258. consumer.groups.append(group_name)
  259. await channel_layer.group_add(group_name,consumer.channel_name)
  260. # RldManMixin
  261. class RldMan(RldManMixin):
  262. def __init__(self,*a,**kw):
  263. self.files = {}
  264. z="ABC"
  265. self.scopes = {
  266. "dflt_scope":{"globals":globals(),"locals":locals()},
  267. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  268. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  269. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  270. # "dflt_scope":{"globals":globals(),"locals":{}}
  271. }
  272. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  273. # self.scope_opt = "locals"
  274. self.scope_opt = "globals"
  275. self.print_tb = 0
  276. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  277. file_list = [
  278. base_path+"i0.py",
  279. base_path+"i1.py",
  280. base_path+"i2.py",
  281. base_path+"i3.py",
  282. base_path+"i4.py",
  283. base_path+"i5.py",
  284. ]
  285. self.add_files(file_list)
  286. # self.add_files(file_list,{"run"})
  287. def persistent_loop_cb(self,*a,**kw):
  288. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  289. file_list = [
  290. base_path+"r0.py",
  291. base_path+"r1.py",
  292. base_path+"r2.py",
  293. base_path+"r3.py",
  294. base_path+"r4.py",
  295. base_path+"r5.py",
  296. ]
  297. self.add_files(file_list)
  298. def add_files(self,files):
  299. for file in files:
  300. if type(file)==str:
  301. self.add_file(file)
  302. elif type(file)==list:
  303. self.add_file(*file)
  304. else:
  305. p("add files???",file)
  306. def add_file(self,file_name,fnx={}):
  307. self.files[file_name] = {"ftxt":"",**fnx}
  308. def get_scope(self,rfile_obj,file_name):
  309. return self.scopes["current_scope"]
  310. return {
  311. "scope":self.scopes["current_scope"],
  312. }
  313. # rfile_obj.get("scope")
  314. # if
  315. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  316. if file_name in self.files:
  317. rfile_obj = self.files[file_name]
  318. st = os.stat(file_name)
  319. st_tuple = (st.st_mtime,st.st_size)
  320. # if rfile_obj["ftxt"] == "":
  321. eflag ="nd"
  322. # rfile_obj["ftxt"] = st_tuple
  323. if rfile_obj["ftxt"] != st_tuple:
  324. # p(rfile_obj["ftxt"])
  325. rfile_obj["ftxt"] = st_tuple
  326. try:
  327. f = open(file_name,"r")
  328. ftxt = f.read()
  329. # scope_key = rfile_obj.get("scope")
  330. # p(scope_key,rfile_obj)
  331. f.close()
  332. scope_obj = self.get_scope(file_name,rfile_obj)
  333. if self.scope_opt == "locals":
  334. eflag ="locals"
  335. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  336. elif self.scope_opt == "globals":
  337. eflag ="globals"
  338. exec(ftxt,scope_obj["globals"])
  339. else:
  340. eflag ="[]"
  341. exec(ftxt)
  342. except Exception as e:
  343. p()
  344. print("file:",file_name)
  345. print("EXCEPT",eflag,e)
  346. if self.print_tb:
  347. traceback.print_tb(e.__traceback__,file=sys.stdout)
  348. else:
  349. pass
  350. # print("ELSE")
  351. finally:
  352. # print("FINALLY")
  353. pass
  354. print(end="",flush=True)
  355. return ret
  356. def rld_files(self):
  357. # p()
  358. ret = {
  359. "errs":{},
  360. "all":{},
  361. "alle":{},
  362. }
  363. # for k in self.files:
  364. for k in [*self.files]:
  365. self.rld_file(k,ret)
  366. # p("rld_files!")
  367. p("",end="",flush=True)
  368. return ret
  369. def fn1(*a,**kw):
  370. # rld_files
  371. glob_cl.rman.rld_files()
  372. glob_cl = globCL()
  373. glob_cl.rman.rld_files()
  374. # print("..",end="\n",flush=True)
  375. r"""
  376. def atexit_fn():
  377. pass
  378. # rconn.flushall()
  379. print("ATEXIT FN")
  380. rinfo_len()
  381. keys = rconn.keys()
  382. print(keys)
  383. print(len(keys))
  384. print("t 000")
  385. # time.sleep(10)
  386. print("t 010")
  387. # time.sleep(20)
  388. print("t 020")
  389. print("ATEXIT FN...")
  390. rconn.flushall()
  391. atexit.register(atexit_fn)
  392. # """
  393. def atexit_fn():
  394. print("atexit_fn!\n\n")
  395. rconn = glob_cl.rconn
  396. # rconn.flushall()
  397. rconn.flushdb()
  398. print("",end="",flush=True)
  399. atexit.register(atexit_fn)