您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

tasks_loop.py 9.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. p=print
  12. async def anop(*a,**kw):pass
  13. def nop(*a,**kw):pass
  14. class Edict(dict):pass
  15. class Eobj():pass
  16. def connect_redis1():
  17. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  18. def connect_redis():
  19. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  20. def eclass_factory(n):
  21. ret = []
  22. for k in range(n):
  23. class nx:pass
  24. # _rcls_name
  25. ret.append(nx)
  26. return ret
  27. def callable_helper(fn):
  28. ret = {
  29. "is_callable":0,
  30. "is_coroutine":0,
  31. }
  32. ret["is_callable"]= callable(fn)
  33. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  34. # if (ca)
  35. return ret
  36. # verbose = print
  37. verbose=nop
  38. class DbgMixin:
  39. async def ws_rec(self,text_data,*a,**kw):
  40. # if "dbg" in
  41. text_data_json = json.loads(text_data)
  42. if not 'dbg' in text_data_json:
  43. return
  44. p("DBG!")
  45. exec(text_data_json["dbg"])
  46. class AconMixin(DbgMixin):
  47. # class AconMixin:
  48. async def connect(self):
  49. p(end="",flush=True)
  50. await self.accept()
  51. verbose("<MRO")
  52. verbose(type(self))
  53. await self.call_all_mro("ws_conn0")
  54. await self.call_all_mro("ws_conn")
  55. await self.call_all_mro("ws_conn2")
  56. verbose("/MRO>")
  57. await self.ws_conn_once()
  58. async def ws_conn_once(self, *a,**kw):pass
  59. async def ws_disconn_once(self, *a,**kw):pass
  60. async def ws_rec_once(self, *a,**kw):pass
  61. async def receive(self, *a,**kw):
  62. await self.call_all_mro("ws_rec",*a,**kw)
  63. await self.ws_rec_once(*a,**kw)
  64. print("",end="",flush=True)
  65. async def call_all_mro(self,mthd_name,*args,**kwargs):
  66. called = set([None])
  67. for cls_obj in self.__class__.__mro__:
  68. mthd = getattr(cls_obj,mthd_name,None)
  69. if not mthd in called:
  70. called.add(mthd)
  71. await mthd(self,*args,**kwargs)
  72. async def disconnect(self, close_code):
  73. await self.call_all_mro("ws_disconn",close_code)
  74. await self.ws_disconn_once(close_code)
  75. class AclTaskMixin:
  76. async def ws_disconn(self,*a,**kw):
  77. pass
  78. async def ws_conn0(self):
  79. self.groups_set = set()
  80. if glob_cl.has_cl:
  81. return
  82. glob_cl.has_cl = True
  83. glob_cl.run_persistent_loop(self)
  84. glob_cl.acl = self.channel_layer
  85. class DevMixin(*eclass_factory(10)):pass
  86. class DevMroMixin(*eclass_factory(10)):pass
  87. class WSStoMixin(*eclass_factory(10)):pass
  88. class GCLMixin(*eclass_factory(10)):pass
  89. class RldManMixin(*eclass_factory(10)):pass
  90. class LoneMixinA():pass
  91. class LoneMixinB():pass
  92. Mixins = Eobj()
  93. Mixins.DevMixin = DevMixin
  94. Mixins.DevMroMixin = DevMroMixin
  95. Mixins.WSStoMixin = WSStoMixin
  96. Mixins.GCLMixin = GCLMixin
  97. Mixins.RldManMixin = RldManMixin
  98. Mixins.LoneMixinA = LoneMixinA
  99. Mixins.LoneMixinB = LoneMixinB
  100. Mixins.DbgMixin = DbgMixin
  101. Mixins.AconMixin = AconMixin
  102. Mixins.AclTaskMixin = AclTaskMixin
  103. class globCL(GCLMixin):
  104. def __init__(self,*a,**kw):
  105. # for now we are not going to do any weird class stuff
  106. self.acl = None
  107. self.has_cl = False
  108. self.rconn1 = connect_redis1()
  109. self.rconn = connect_redis()
  110. self.rman = RldMan()
  111. self.cbs_once = []
  112. self.cbs_asap2 = []
  113. self.cbs_asap = []
  114. self.cbs_once_set = set()
  115. self.cbs = {"fn1":fn1,"fn2x":nop}
  116. self.timeout = 1
  117. self.fast_timeout = .1
  118. pass
  119. async def persistent_fast_cb_loop(self,*a,**kw):
  120. while 1:
  121. if len(self.cbs_asap2):
  122. cbs_once = [*self.cbs_asap2]
  123. # self.cbs_asap2=[]
  124. k=0
  125. for v in cbs_once:
  126. try:
  127. call_info = callable_helper(v)
  128. if (call_info["is_callable"]):
  129. if call_info["is_coroutine"]:
  130. pass
  131. await v({k,self})
  132. else:
  133. v({k,self})
  134. else:
  135. pass
  136. except Exception as e:
  137. p("persistent_fast_cb_loop Exception cbs_once:",e)
  138. p(flush=True)
  139. else:
  140. pass
  141. finally:
  142. pass
  143. k += 1
  144. if len(self.cbs_asap):
  145. self.cbs_asap2 = self.cbs_asap
  146. self.cbs_asap=[]
  147. await asyncio.sleep(self.fast_timeout)
  148. async def persistent_loop(self,*a,**kw):
  149. while 1:
  150. for k,v in [*self.cbs.items()]:
  151. try:
  152. call_info = callable_helper(v)
  153. if (call_info["is_callable"]):
  154. if call_info["is_coroutine"]:
  155. await v({k,self})
  156. else:
  157. v({k,self})
  158. else:
  159. pass
  160. pass
  161. except Exception as e:
  162. p("persistent_loop Exception",e)
  163. p(flush=True)
  164. else:
  165. pass
  166. finally:
  167. pass
  168. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  169. self.cbs_once = []
  170. self.cbs_once_set = set()
  171. k=0
  172. for v in cbs_once:
  173. try:
  174. call_info = callable_helper(v)
  175. if (call_info["is_callable"]):
  176. if call_info["is_coroutine"]:
  177. pass
  178. await v({k,self})
  179. else:
  180. v({k,self})
  181. else:
  182. pass
  183. except Exception as e:
  184. p("persistent_loop Exception cbs_once:",e)
  185. p(flush=True)
  186. else:
  187. pass
  188. finally:
  189. pass
  190. k += 1
  191. await asyncio.sleep(self.timeout)
  192. def _run_persistent_loop(self,*a,**kw):
  193. p("_run_persistent_loop:")
  194. loop = asyncio.get_event_loop()
  195. self.run_persistent_loop = nop
  196. self.rman.persistent_loop_cb()
  197. loop.create_task(self.persistent_loop())
  198. loop.create_task(self.persistent_fast_cb_loop())
  199. self.loop = loop
  200. run_persistent_loop = _run_persistent_loop
  201. def sync_dev_group_discard(self,*a,**kw):
  202. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  203. async def dev_group_discard(self,group_name,consumer):
  204. channel_layer = glob_cl.acl
  205. if consumer.channel_name in consumer.groups:
  206. consumer.groups.remove(group_name)
  207. await channel_layer.group_discard(group_name,consumer.channel_name)
  208. def sync_dev_group_add(self,*a,**kw):
  209. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  210. async def dev_group_add(self,group_name,consumer):
  211. channel_layer = glob_cl.acl
  212. # if not consumer.channel_name in consumer.groups_set:
  213. # consumer.groups_set.add(groups)
  214. if not consumer.channel_name in consumer.groups:
  215. consumer.groups.append(group_name)
  216. await channel_layer.group_add(group_name,consumer.channel_name)
  217. # RldManMixin
  218. class RldMan(RldManMixin):
  219. def __init__(self,*a,**kw):
  220. self.files = {}
  221. z="ABC"
  222. self.scopes = {
  223. "dflt_scope":{"globals":globals(),"locals":locals()},
  224. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  225. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  226. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  227. # "dflt_scope":{"globals":globals(),"locals":{}}
  228. }
  229. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  230. # self.scope_opt = ""locals
  231. self.scope_opt = "globals"
  232. self.print_tb = 0
  233. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  234. file_list = [
  235. base_path+"i0.py",
  236. base_path+"i1.py",
  237. base_path+"i2.py",
  238. base_path+"i3.py",
  239. base_path+"i4.py",
  240. base_path+"i5.py",
  241. ]
  242. self.add_files(file_list)
  243. # self.add_files(file_list,{"run"})
  244. def persistent_loop_cb(self,*a,**kw):
  245. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  246. file_list = [
  247. base_path+"r0.py",
  248. base_path+"r1.py",
  249. base_path+"r2.py",
  250. base_path+"r3.py",
  251. base_path+"r4.py",
  252. base_path+"r5.py",
  253. ]
  254. self.add_files(file_list)
  255. def add_files(self,files):
  256. for file in files:
  257. if type(file)==str:
  258. self.add_file(file)
  259. elif type(file)==list:
  260. self.add_file(*file)
  261. else:
  262. p("add files???",file)
  263. def add_file(self,file_name,fnx={}):
  264. self.files[file_name] = {"ftxt":"",**fnx}
  265. def get_scope(self,rfile_obj,file_name):
  266. return self.scopes["current_scope"]
  267. return {
  268. "scope":self.scopes["current_scope"],
  269. }
  270. # rfile_obj.get("scope")
  271. # if
  272. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  273. if file_name in self.files:
  274. rfile_obj = self.files[file_name]
  275. st = os.stat(file_name)
  276. st_tuple = (st.st_mtime,st.st_size)
  277. # if rfile_obj["ftxt"] == "":
  278. eflag ="nd"
  279. # rfile_obj["ftxt"] = st_tuple
  280. if rfile_obj["ftxt"] != st_tuple:
  281. # p(rfile_obj["ftxt"])
  282. rfile_obj["ftxt"] = st_tuple
  283. try:
  284. f = open(file_name,"r")
  285. ftxt = f.read()
  286. # scope_key = rfile_obj.get("scope")
  287. # p(scope_key,rfile_obj)
  288. f.close()
  289. scope_obj = self.get_scope(file_name,rfile_obj)
  290. if self.scope_opt == "locals":
  291. eflag ="locals"
  292. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  293. elif self.scope_opt == "globals":
  294. eflag ="globals"
  295. exec(ftxt,scope_obj["globals"])
  296. else:
  297. eflag ="[]"
  298. exec(ftxt)
  299. except Exception as e:
  300. p()
  301. print("file:",file_name)
  302. print("EXCEPT",eflag,e)
  303. if self.print_tb:
  304. traceback.print_tb(e.__traceback__,file=sys.stdout)
  305. else:
  306. pass
  307. finally:
  308. pass
  309. print(end="",flush=True)
  310. return ret
  311. def rld_files(self):
  312. ret = {
  313. "errs":{},
  314. "all":{},
  315. "alle":{},
  316. }
  317. for k in [*self.files]:
  318. self.rld_file(k,ret)
  319. p("",end="",flush=True)
  320. return ret
  321. def fn1(*a,**kw):
  322. glob_cl.rman.rld_files()
  323. glob_cl = globCL()
  324. glob_cl.rman.rld_files()
  325. def atexit_fn():
  326. print("atexit_fn!\n\n")
  327. rconn = glob_cl.rconn
  328. # rconn.flushall()
  329. rconn.flushdb()
  330. print("",end="",flush=True)
  331. atexit.register(atexit_fn)