Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

tasks_loop.py 9.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  # Short alias for the built-in print, used for diagnostics throughout this module.
  p = print
  async def anop(*a, **kw):
      """Asynchronous no-op: accept any arguments, do nothing, resolve to None."""
      return None
  def nop(*a, **kw):
      """Synchronous no-op: accept any arguments, do nothing, return None."""
      return None
  class Edict(dict):
      """Bare dict subclass; behaves like dict but instances can also carry attributes."""
  class Eobj:
      """Empty class whose instances serve as plain attribute containers."""
  def connect_redis1():
      """Return a redis client bound to database 1 on localhost:6379.

      Responses are decoded to ``str`` as UTF-8; bytes that cannot be decoded
      are rendered with backslash escapes instead of raising.
      """
      # ``encoding`` is the supported keyword; ``charset`` is only a deprecated
      # alias for it (NOTE(review): confirm it is gone in the pinned redis-py).
      return redis.Redis(
          host='localhost',
          port=6379,
          db=1,
          encoding="utf-8",
          decode_responses=True,
          encoding_errors="backslashreplace",
      )
  def connect_redis():
      """Return a redis client bound to database 0 on localhost:6379.

      Responses are decoded to ``str`` as UTF-8; bytes that cannot be decoded
      are rendered with backslash escapes instead of raising.
      """
      # ``encoding`` is the supported keyword; ``charset`` is only a deprecated
      # alias for it (NOTE(review): confirm it is gone in the pinned redis-py).
      return redis.Redis(
          host='localhost',
          port=6379,
          db=0,
          encoding="utf-8",
          decode_responses=True,
          encoding_errors="backslashreplace",
      )
  def eclass_factory(n):
      """Return a list of ``n`` distinct, empty classes (each one named ``nx``).

      A new class object is created on every iteration, so the returned
      classes are independent of one another.
      """
      made = []
      for _ in range(n):
          class nx:
              pass
          made.append(nx)
      return made
  def callable_helper(fn):
      """Describe how ``fn`` may be invoked.

      Returns a dict with two boolean entries:
        "is_callable"  -- result of ``callable(fn)``
        "is_coroutine" -- result of ``inspect.iscoroutinefunction(fn)``
      """
      return {
          "is_callable": callable(fn),
          "is_coroutine": inspect.iscoroutinefunction(fn),
      }
  # Tracing hook used by the consumer mixins. It is silenced here by pointing
  # it at the no-op; rebind it to print to see the trace output.
  verbose = nop
  class DbgMixin:
      """Debug hook: executes Python source received under the "dbg" key of a message.

      SECURITY(review): whatever the peer sends in "dbg" is passed to ``exec``.
      That is remote code execution by design; this mixin must never be
      reachable by untrusted clients.
      """

      async def ws_rec(self, text_data=None, *a, **kw):
          """Handle one incoming text frame.

          ``text_data`` is parsed as JSON. Nothing happens unless the payload
          is a JSON object containing a "dbg" key, in which case the value is
          executed as Python source. Frames without text are ignored.

          Raises whatever ``json.loads`` raises on malformed JSON, and whatever
          the executed source raises.
          """
          if text_data is None:
              return
          payload = json.loads(text_data)
          # Valid JSON may be a string, list or number; only an object can
          # carry the "dbg" key, so everything else is ignored.
          if not isinstance(payload, dict) or 'dbg' not in payload:
              return
          p("DBG!")
          # SECURITY(review): arbitrary code from the peer is executed here.
          exec(payload["dbg"])
  class AconMixin(DbgMixin):
      """Consumer mixin that fans connection events out to hooks found along the MRO.

      ``connect``, ``receive`` and ``disconnect`` each call every distinct
      implementation of a named hook (``ws_conn0``/``ws_conn``/``ws_conn2``,
      ``ws_rec``, ``ws_disconn``) and then a single ``*_once`` hook.
      ``self.accept()`` is not defined here and must come from another base.
      """

      async def connect(self):
          """Accept the connection, then run the three connect hook rounds in order."""
          p(end="", flush=True)
          await self.accept()
          verbose("<MRO")
          verbose(type(self))
          for hook_name in ("ws_conn0", "ws_conn", "ws_conn2"):
              await self.call_all_mro(hook_name)
          verbose("/MRO>")
          await self.ws_conn_once()

      async def ws_conn_once(self, *a, **kw):
          """Default single-shot connect hook; does nothing."""
          return None

      async def ws_disconn_once(self, *a, **kw):
          """Default single-shot disconnect hook; does nothing."""
          return None

      async def ws_rec_once(self, *a, **kw):
          """Default single-shot receive hook; does nothing."""
          return None

      async def receive(self, *a, **kw):
          """Forward the arguments to every ``ws_rec`` hook, then to ``ws_rec_once``."""
          await self.call_all_mro("ws_rec", *a, **kw)
          await self.ws_rec_once(*a, **kw)
          print("", end="", flush=True)

      async def call_all_mro(self, mthd_name, *args, **kwargs):
          """Await each distinct attribute named ``mthd_name`` found on the MRO classes.

          The attribute is looked up with ``getattr`` on every class in MRO
          order; a value already seen (or a missing one, i.e. None) is skipped,
          so an inherited implementation is awaited only once.
          """
          seen = {None}
          for klass in self.__class__.__mro__:
              candidate = getattr(klass, mthd_name, None)
              if candidate in seen:
                  continue
              seen.add(candidate)
              await candidate(self, *args, **kwargs)

      async def disconnect(self, close_code):
          """Pass ``close_code`` to every ``ws_disconn`` hook, then to ``ws_disconn_once``."""
          await self.call_all_mro("ws_disconn", close_code)
          await self.ws_disconn_once(close_code)
  class AclTaskMixin:
      """Consumer mixin that lets the first connection start the global background loops."""

      async def ws_disconn(self, *a, **kw):
          """Disconnect hook; intentionally does nothing."""
          return None

      async def ws_conn0(self):
          """Reset this consumer's ``groups_set`` and bootstrap ``glob_cl`` once.

          Only while ``glob_cl.has_cl`` is still false does this set the flag,
          call ``glob_cl.run_persistent_loop`` and store this consumer's
          ``channel_layer`` on ``glob_cl.acl``.
          """
          self.groups_set = set()
          if not glob_cl.has_cl:
              glob_cl.has_cl = True
              glob_cl.run_persistent_loop(self)
              glob_cl.acl = self.channel_layer
  # Mixin families. Each class below inherits from ten fresh, empty classes
  # produced by eclass_factory.
  # NOTE(review): presumably these bases are slots patched at runtime by the
  # reloaded scripts -- confirm.
  class DevMixin(*eclass_factory(10)):
      pass


  class DevMroMixin(*eclass_factory(10)):
      pass


  class WSStoMixin(*eclass_factory(10)):
      pass


  class GCLMixin(*eclass_factory(10)):
      pass


  class RldManMixin(*eclass_factory(10)):
      pass
  class LoneMixinA:
      """Stand-alone empty mixin with no factory-made bases."""


  class LoneMixinB:
      """Second stand-alone empty mixin with no factory-made bases."""
  # Namespace object exposing every mixin class of this module under its own name.
  Mixins = Eobj()
  vars(Mixins).update({
      "DevMixin": DevMixin,
      "DevMroMixin": DevMroMixin,
      "WSStoMixin": WSStoMixin,
      "GCLMixin": GCLMixin,
      "RldManMixin": RldManMixin,
      "LoneMixinA": LoneMixinA,
      "LoneMixinB": LoneMixinB,
      "DbgMixin": DbgMixin,
      "AconMixin": AconMixin,
      "AclTaskMixin": AclTaskMixin,
  })
  class globCL(GCLMixin):
      """Process-wide hub holding redis clients, the reload manager and callback queues.

      Callbacks live in several registries:
        cbs          -- dict of name -> callback, run on every ``persistent_loop`` pass
        cbs_once     -- list, drained and run once per ``persistent_loop`` pass
        cbs_once_set -- set, drained together with ``cbs_once``
        cbs_asap     -- list, run by the fast loops
        cbs_asap2    -- staging list used by ``persistent_fast_cb_loop_min_timeout``

      Every callback is called with one positional argument, the set
      ``{key, self}``, where ``key`` is the registry key or the position in
      the drained batch.
      NOTE(review): a set literal, not a dict -- confirm that is intended.
      """

      def __init__(self, *a, **kw):
          # for now we are not going to do any weird class stuff
          self.acl = None          # channel layer; assigned by AclTaskMixin.ws_conn0
          self.has_cl = False      # flipped once a consumer has bootstrapped the loops
          self.rconn1 = connect_redis1()
          self.rconn = connect_redis()
          self.rman = RldMan()
          self.cbs_once = []
          self.cbs_asap2 = []
          self.cbs_asap = []
          self.cbs_once_set = set()
          self.cbs = {"fn1": fn1, "fn2x": nop}
          self.timeout = 1         # seconds between persistent_loop passes
          self.fast_timeout = .1   # seconds between fast-loop passes

      async def _invoke_cb(self, key, cb, err_label):
          """Run one callback (sync or async) with ``{key, self}``; print and swallow errors.

          Non-callable entries are skipped silently. ``err_label`` is the
          prefix printed in front of a caught exception.
          """
          try:
              info = callable_helper(cb)
              if not info["is_callable"]:
                  return
              if info["is_coroutine"]:
                  await cb({key, self})
              else:
                  cb({key, self})
          except Exception as e:
              # Best effort: one failing callback must not stop the loop.
              p(err_label, e)
              p(flush=True)

      async def persistent_fast_cb_loop_min_timeout(self, *a, **kw):
          """Fast loop that runs each ``cbs_asap`` entry no earlier than one tick after it was queued.

          Each pass runs the staged batch (``cbs_asap2``), then stages whatever
          is in ``cbs_asap`` for the next pass, then sleeps ``fast_timeout``.
          """
          while True:
              if self.cbs_asap2:
                  batch = [*self.cbs_asap2]
                  self.cbs_asap2 = []
                  for idx, cb in enumerate(batch):
                      await self._invoke_cb(
                          idx, cb, "persistent_fast_cb_loop Exception cbs_once:")
              if self.cbs_asap:
                  self.cbs_asap2 = self.cbs_asap
                  self.cbs_asap = []
              await asyncio.sleep(self.fast_timeout)

      async def persistent_fast_cb_loop(self, *a, **kw):
          """Fast loop that drains and runs ``cbs_asap`` on every pass, then sleeps ``fast_timeout``."""
          while True:
              if self.cbs_asap:
                  batch = [*self.cbs_asap]
                  self.cbs_asap = []
                  for idx, cb in enumerate(batch):
                      await self._invoke_cb(
                          idx, cb, "persistent_fast_cb_loop Exception cbs_once:")
              await asyncio.sleep(self.fast_timeout)

      async def persistent_loop(self, *a, **kw):
          """Slow loop: run every ``cbs`` entry, then drain the one-shot queues, then sleep ``timeout``."""
          while True:
              # Iterate over a snapshot so callbacks may edit the registry.
              for name, cb in list(self.cbs.items()):
                  await self._invoke_cb(name, cb, "persistent_loop Exception")
              queued = [*self.cbs_once, *self.cbs_once_set]
              self.cbs_once = []
              self.cbs_once_set = set()
              for idx, cb in enumerate(queued):
                  await self._invoke_cb(
                      idx, cb, "persistent_loop Exception cbs_once:")
              await asyncio.sleep(self.timeout)

      def _run_persistent_loop(self, *a, **kw):
          """Start the background loops on the current event loop (effective once per instance).

          The instance attribute ``run_persistent_loop`` is replaced by a
          no-op, so later calls through that name do nothing.
          """
          p("_run_persistent_loop:")
          loop = asyncio.get_event_loop()
          self.run_persistent_loop = nop
          self.rman.persistent_loop_cb()
          # Keep references to the tasks: the event loop only holds weak
          # references, so an unreferenced task may be garbage-collected.
          self._bg_tasks = [
              loop.create_task(self.persistent_loop()),
              # loop.create_task(self.persistent_fast_cb_loop()),
              loop.create_task(self.persistent_fast_cb_loop_min_timeout()),
          ]
          self.loop = loop

      run_persistent_loop = _run_persistent_loop

      def sync_dev_group_discard(self, *a, **kw):
          """Queue ``dev_group_discard(*a, **kw)`` to run on the next ``persistent_loop`` pass."""
          # The loop only runs callables and passes them one argument, so the
          # call is wrapped in an async function that ignores that argument.
          async def _job(*_ignored):
              await self.dev_group_discard(*a, **kw)
          glob_cl.cbs_once.append(_job)

      async def dev_group_discard(self, group_name, consumer):
          """Remove ``group_name`` from ``consumer.groups`` (if present) and leave the group."""
          channel_layer = glob_cl.acl
          if group_name in consumer.groups:
              consumer.groups.remove(group_name)
          await channel_layer.group_discard(group_name, consumer.channel_name)

      def sync_dev_group_add(self, *a, **kw):
          """Queue ``dev_group_add(*a, **kw)`` to run on the next ``persistent_loop`` pass."""
          # NOTE(review): the original called self.group_add, which this class
          # does not define; dev_group_add is assumed to be the target.
          async def _job(*_ignored):
              await self.dev_group_add(*a, **kw)
          glob_cl.cbs_once.append(_job)

      async def dev_group_add(self, group_name, consumer):
          """Record ``group_name`` in ``consumer.groups`` (once) and join the group."""
          channel_layer = glob_cl.acl
          if group_name not in consumer.groups:
              consumer.groups.append(group_name)
          await channel_layer.group_add(group_name, consumer.channel_name)
  # RldManMixin
  class RldMan(RldManMixin):
      """Tracks Python source files and re-executes any whose (mtime, size) changed.

      ``files`` maps a path to a record dict. Its "ftxt" entry holds the
      last-seen ``(st_mtime, st_size)`` stamp: "" before the first check,
      None while the file cannot be stat'ed.
      """

      def __init__(self, *a, **kw):
          self.files = {}
          # NOTE(review): ``z`` is otherwise unused, but it is part of the
          # locals() snapshots taken below, so it is kept.
          z = "ABC"
          self.scopes = {
              "dflt_scope": {"globals": globals(), "locals": locals()},
              "dflt_scope_zloc": {"globals": globals(), "locals": {}},
              "dflt_scope_gscope": {"globals": globals(), "locals": {}},
              "dflt_scope_copy": {
                  "globals": copy.copy(globals()),
                  "locals": copy.copy(locals()),
              },
          }
          self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
          # Selects the exec() form used by rld_file: "locals", "globals",
          # or anything else for a bare exec().
          self.scope_opt = "globals"
          self.print_tb = 0
          base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
          file_list = [base_path + f"i{n}.py" for n in range(6)]
          self.add_files(file_list)

      def persistent_loop_cb(self, *a, **kw):
          """Register the ``r0.py`` .. ``r5.py`` files; called when the background loops start."""
          base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
          file_list = [base_path + f"r{n}.py" for n in range(6)]
          self.add_files(file_list)

      def add_files(self, files):
          """Register several files.

          Each item is either a path string or a list of positional arguments
          for ``add_file``; anything else is reported and skipped.
          """
          for file in files:
              if isinstance(file, str):
                  self.add_file(file)
              elif isinstance(file, list):
                  self.add_file(*file)
              else:
                  p("add files???", file)

      def add_file(self, file_name, fnx=None):
          """Register (or reset) ``file_name``; ``fnx`` supplies extra record entries."""
          self.files[file_name] = {"ftxt": "", **(fnx or {})}

      def get_scope(self, rfile_obj, file_name):
          """Return the scope dict ("globals"/"locals") to execute a file in.

          Both arguments are currently ignored; the current scope is shared
          by all files.
          """
          return self.scopes["current_scope"]

      def rld_file(self, file_name, ret=None, **kw):
          """Re-execute ``file_name`` if it is tracked and its stamp changed.

          Errors raised while reading or executing the file are printed, not
          propagated. A file that cannot be stat'ed is reported once and then
          skipped until it becomes readable. ``ret`` is returned unchanged; a
          fresh result dict is created when it is omitted.
          """
          if ret is None:
              ret = {"errs": {}, "all": {}, "alle": {}}
          if file_name not in self.files:
              return ret
          rfile_obj = self.files[file_name]

          stat_err = None
          try:
              st = os.stat(file_name)
              stamp = (st.st_mtime, st.st_size)
          except OSError as e:
              # One missing/unreadable file must not abort the whole pass.
              stamp = None
              stat_err = e

          if rfile_obj["ftxt"] == stamp:
              return ret
          rfile_obj["ftxt"] = stamp

          if stamp is None:
              p()
              print("file:", file_name)
              print("EXCEPT", "stat", stat_err)
              print(end="", flush=True)
              return ret

          eflag = "nd"  # names the exec() form in use, for the error report
          try:
              with open(file_name, "r") as f:
                  ftxt = f.read()
              scope_obj = self.get_scope(rfile_obj, file_name)
              if self.scope_opt == "locals":
                  eflag = "locals"
                  exec(ftxt, scope_obj["globals"], scope_obj["locals"])
              elif self.scope_opt == "globals":
                  eflag = "globals"
                  exec(ftxt, scope_obj["globals"])
              else:
                  eflag = "[]"
                  exec(ftxt)
          except Exception as e:
              p()
              print("file:", file_name)
              print("EXCEPT", eflag, e)
              if self.print_tb:
                  traceback.print_tb(e.__traceback__, file=sys.stdout)
          print(end="", flush=True)
          return ret

      def rld_files(self):
          """Run ``rld_file`` for every tracked file and return the shared result dict."""
          ret = {
              "errs": {},
              "all": {},
              "alle": {},
          }
          # Snapshot the keys: an executed file may register more files.
          for k in list(self.files):
              self.rld_file(k, ret)
          p("", end="", flush=True)
          return ret
  def fn1(*a, **kw):
      """Callback that asks the global reload manager to re-check all tracked files.

      All arguments are accepted and ignored; nothing is returned.
      """
      reloader = glob_cl.rman
      reloader.rld_files()
  # Module-level singleton: builds the redis clients and the reload manager.
  glob_cl = globCL()
  # Execute every tracked file once at import time.
  glob_cl.rman.rld_files()
  @atexit.register
  def atexit_fn():
      """Interpreter-exit hook: announce itself and flush the redis database behind ``glob_cl.rconn``."""
      print("atexit_fn!\n\n")
      # flushdb() clears only the selected database; flushall() is deliberately not used.
      glob_cl.rconn.flushdb()
      print("", end="", flush=True)