Du kannst nicht mehr als 25 Themen auswählen Themen müssen mit entweder einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

tasks_loop.py 8.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. from asgiref.sync import async_to_sync,sync_to_async
  11. '''
  12. umod
  13. # '''
  14. p=print
  15. async def anop(*a,**kw):pass
  16. def nop(*a,**kw):pass
  17. class Edict(dict):pass
  18. class Eobj():pass
  19. def connect_redis():
  20. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  21. def eclass_factory(n):
  22. ret = []
  23. for k in range(n):
  24. class nx:pass
  25. # _rcls_name
  26. ret.append(nx)
  27. return ret
  28. '''
  29. # '''
  30. def callable_helper(fn):
  31. ret = {
  32. "is_callable":0,
  33. "is_coroutine":0,
  34. }
  35. ret["is_callable"]= callable(fn)
  36. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  37. # if (ca)
  38. return ret
  39. verbose = print
  40. class DbgMixin:
  41. async def ws_rec(self,text_data,*a,**kw):
  42. # if "dbg" in
  43. text_data_json = json.loads(text_data)
  44. if not 'dbg' in text_data_json:
  45. return
  46. p("DBG!")
  47. exec(text_data_json["dbg"])
  48. class AconMixin(DbgMixin):
  49. # class AconMixin:
  50. async def connect(self):
  51. # print("-----------CONNECTING async def connect")
  52. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  53. p(flush=True)
  54. await self.accept()
  55. verbose("<MRO")
  56. verbose(type(self))
  57. await self.call_all_mro("ws_conn0")
  58. await self.call_all_mro("ws_conn")
  59. await self.call_all_mro("ws_conn2")
  60. verbose("/MRO>")
  61. # await self.ws_conn_once(event)
  62. await self.ws_conn_once()
  63. async def ws_conn_once(self, *a,**kw):pass
  64. async def ws_disconn_once(self, *a,**kw):pass
  65. async def ws_rec_once(self, *a,**kw):pass
  66. async def receive(self, *a,**kw):
  67. print("REC")
  68. print("",end="",flush=True)
  69. # print("-----------REC")
  70. await self.call_all_mro("ws_rec",*a,**kw)
  71. await self.ws_rec_once(*a,**kw)
  72. async def call_all_mro(self,mthd_name,*args,**kwargs):
  73. called = set([None])
  74. for cls_obj in self.__class__.__mro__:
  75. mthd = getattr(cls_obj,mthd_name,None)
  76. # print("~",cls_obj,mthd)
  77. if not mthd in called:
  78. # print(cls_obj,mthd)
  79. called.add(mthd)
  80. await mthd(self,*args,**kwargs)
  81. async def disconnect(self, close_code):
  82. await self.call_all_mro("ws_disconn",close_code)
  83. await self.ws_disconn_once(close_code)
  84. async def websocket_connect_x(self, event):
  85. await super().websocket_connect(event)
  86. print("<MRO")
  87. print(type(self))
  88. await self.call_all_mro("ws_conn",event)
  89. await self.call_all_mro("ws_conn2",event)
  90. print("/MRO>")
  91. await self.ws_conn_once(event)
  92. class AclTaskMixin:
  93. async def ws_disconn(self):
  94. for group in self.groups_set:
  95. async def ws_conn0(self):
  96. self.groups_set = set()
  97. if glob_cl.has_cl:
  98. return
  99. glob_cl.has_cl = True
  100. print("AclTaskMixin:ws_conn0")
  101. glob_cl.run_persistent_loop(self)
  102. glob_cl.acl = self.channel_layer
  103. # print("++++++++++++++++++++++++++")
  104. async def ws_conn_once(self,*a,**kw):
  105. print("\t\tws_conn_once:","AclTaskMixin")
  106. class DevMixin(*eclass_factory(10)):pass
  107. class DevMroMixin(*eclass_factory(10)):pass
  108. class WSStoMixin(*eclass_factory(10)):pass
  109. class GCLMixin(*eclass_factory(10)):pass
  110. class RldManMixin(*eclass_factory(10)):pass
  111. class LoneMixinA():pass
  112. class LoneMixinB():pass
  113. Mixins = Eobj()
  114. Mixins.DevMixin = DevMixin
  115. Mixins.DevMroMixin = DevMroMixin
  116. Mixins.WSStoMixin = WSStoMixin
  117. Mixins.GCLMixin = GCLMixin
  118. Mixins.RldManMixin = RldManMixin
  119. Mixins.LoneMixinA = LoneMixinA
  120. Mixins.LoneMixinB = LoneMixinB
  121. Mixins.DbgMixin = DbgMixin
  122. Mixins.AconMixin = AconMixin
  123. Mixins.AclTaskMixin = AclTaskMixin
  124. # GCLMixin
  125. class globCL(GCLMixin):
  126. def __init__(self,*a,**kw):
  127. # for now we are not going to do any weird class stuff
  128. self.acl = None
  129. self.has_cl = False
  130. self.rconn = connect_redis()
  131. self.rman = RldMan()
  132. self.cbs_once = []
  133. self.cbs_once_set = set()
  134. self.cbs = {"fn1":fn1,"fn2x":nop}
  135. self.timeout = 1
  136. pass
  137. async def persistent_loop(self,*a,**kw):
  138. while 1:
  139. # p("PL:",time.time(),flush=1)
  140. # await asyncio.sleep(5)
  141. # for k,v in cls.cbs.items():
  142. for k,v in [*self.cbs.items()]:
  143. try:
  144. call_info = callable_helper(v)
  145. if (call_info["is_callable"]):
  146. if call_info["is_coroutine"]:
  147. pass
  148. await v({k,self})
  149. else:
  150. v({k,self})
  151. else:
  152. pass
  153. # p(k,v)
  154. pass
  155. except Exception as e:
  156. p("persistent_loop Exception",e)
  157. p(flush=True)
  158. # raise
  159. else:
  160. pass
  161. finally:
  162. pass
  163. # cbs_once
  164. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  165. self.cbs_once = []
  166. self.cbs_once_set = set()
  167. k=0
  168. for v in cbs_once:
  169. try:
  170. call_info = callable_helper(v)
  171. if (call_info["is_callable"]):
  172. if call_info["is_coroutine"]:
  173. pass
  174. await v({k,self})
  175. else:
  176. v({k,self})
  177. else:
  178. pass
  179. # p(k,v)
  180. pass
  181. except Exception as e:
  182. p("persistent_loop Exception cbs_once:",e)
  183. p(flush=True)
  184. # raise
  185. else:
  186. pass
  187. finally:
  188. pass
  189. k += 1
  190. await asyncio.sleep(self.timeout)
  191. def _run_persistent_loop(self,*a,**kw):
  192. p("_run_persistent_loop:")
  193. loop = asyncio.get_event_loop()
  194. self.run_persistent_loop = nop
  195. self.rman.persistent_loop_cb()
  196. loop.create_task(self.persistent_loop())
  197. self.loop = loop
  198. run_persistent_loop = _run_persistent_loop
  199. def sync_dev_group_add(self,*a,**kw):
  200. glob_cl.cbs_once.append(group_add(*a,**kw))
  201. async def dev_group_add(self,group_name,consumer):
  202. channel_layer = glob_cl.acl
  203. if not consumer.channel_name in consumer.groups_set:
  204. consumer.groups_set.add(groups)
  205. await channel_layer.group_add(group_name,consumer.channel_name)
  206. # RldManMixin
  207. class RldMan(RldManMixin):
  208. def __init__(self,*a,**kw):
  209. self.files = {}
  210. z="ABC"
  211. self.scopes = {
  212. "dflt_scope":{"globals":globals(),"locals":locals()},
  213. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  214. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  215. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  216. # "dflt_scope":{"globals":globals(),"locals":{}}
  217. }
  218. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  219. # self.scope_opt = "locals"
  220. self.scope_opt = "globals"
  221. self.print_tb = 0
  222. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  223. file_list = [
  224. base_path+"i0.py",
  225. base_path+"i1.py",
  226. base_path+"i2.py",
  227. base_path+"i3.py",
  228. base_path+"i4.py",
  229. base_path+"i5.py",
  230. ]
  231. self.add_files(file_list)
  232. # self.add_files(file_list,{"run"})
  233. def persistent_loop_cb(self,*a,**kw):
  234. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  235. file_list = [
  236. base_path+"r0.py",
  237. base_path+"r1.py",
  238. base_path+"r2.py",
  239. base_path+"r3.py",
  240. base_path+"r4.py",
  241. base_path+"r5.py",
  242. ]
  243. self.add_files(file_list)
  244. def add_files(self,files):
  245. for file in files:
  246. if type(file)==str:
  247. self.add_file(file)
  248. elif type(file)==list:
  249. self.add_file(*file)
  250. else:
  251. p("add files???",file)
  252. def add_file(self,file_name,fnx={}):
  253. self.files[file_name] = {"ftxt":"",**fnx}
  254. def get_scope(self,rfile_obj,file_name):
  255. return self.scopes["current_scope"]
  256. return {
  257. "scope":self.scopes["current_scope"],
  258. }
  259. # rfile_obj.get("scope")
  260. # if
  261. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  262. if file_name in self.files:
  263. rfile_obj = self.files[file_name]
  264. st = os.stat(file_name)
  265. st_tuple = (st.st_mtime,st.st_size)
  266. # if rfile_obj["ftxt"] == "":
  267. eflag ="nd"
  268. # rfile_obj["ftxt"] = st_tuple
  269. if rfile_obj["ftxt"] != st_tuple:
  270. # p(rfile_obj["ftxt"])
  271. rfile_obj["ftxt"] = st_tuple
  272. try:
  273. f = open(file_name,"r")
  274. ftxt = f.read()
  275. # scope_key = rfile_obj.get("scope")
  276. # p(scope_key,rfile_obj)
  277. f.close()
  278. scope_obj = self.get_scope(file_name,rfile_obj)
  279. if self.scope_opt == "locals":
  280. eflag ="locals"
  281. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  282. elif self.scope_opt == "globals":
  283. eflag ="globals"
  284. exec(ftxt,scope_obj["globals"])
  285. else:
  286. eflag ="[]"
  287. exec(ftxt)
  288. except Exception as e:
  289. p()
  290. print("file:",file_name)
  291. print("EXCEPT",eflag,e)
  292. if self.print_tb:
  293. traceback.print_tb(e.__traceback__,file=sys.stdout)
  294. else:
  295. pass
  296. # print("ELSE")
  297. finally:
  298. # print("FINALLY")
  299. pass
  300. print(end="",flush=True)
  301. return ret
  302. def rld_files(self):
  303. # p()
  304. ret = {
  305. "errs":{},
  306. "all":{},
  307. "alle":{},
  308. }
  309. # for k in self.files:
  310. for k in [*self.files]:
  311. self.rld_file(k,ret)
  312. # p("rld_files!")
  313. p("",end="",flush=True)
  314. return ret
  315. def fn1(*a,**kw):
  316. # rld_files
  317. glob_cl.rman.rld_files()
  318. glob_cl = globCL()
  319. glob_cl.rman.rld_files()
  320. # print("..",end="\n",flush=True)
  321. r"""
  322. def atexit_fn():
  323. pass
  324. # rconn.flushall()
  325. print("ATEXIT FN")
  326. rinfo_len()
  327. keys = rconn.keys()
  328. print(keys)
  329. print(len(keys))
  330. print("t 000")
  331. # time.sleep(10)
  332. print("t 010")
  333. # time.sleep(20)
  334. print("t 020")
  335. print("ATEXIT FN...")
  336. rconn.flushall()
  337. atexit.register(atexit_fn)
  338. # """
  339. def atexit_fn():
  340. print("atexit_fn!\n\n")
  341. print("",end="",flush=True)
  342. atexit.register(atexit_fn)