您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

tasks_loop.py 8.6KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. '''
  11. umod
  12. # '''
  13. p=print
  14. async def anop(*a,**kw):pass
  15. def nop(*a,**kw):pass
  16. class Edict(dict):pass
  17. class Eobj():pass
  18. def connect_redis():
  19. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  20. def eclass_factory(n):
  21. ret = []
  22. for k in range(n):
  23. class nx:pass
  24. # _rcls_name
  25. ret.append(nx)
  26. return ret
  27. '''
  28. # '''
  29. def callable_helper(fn):
  30. ret = {
  31. "is_callable":0,
  32. "is_coroutine":0,
  33. }
  34. ret["is_callable"]= callable(fn)
  35. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  36. # if (ca)
  37. return ret
  38. verbose = print
  39. class DbgMixin:
  40. async def ws_rec(self,text_data,*a,**kw):
  41. # if "dbg" in
  42. text_data_json = json.loads(text_data)
  43. if not 'dbg' in text_data_json:
  44. return
  45. p("DBG!")
  46. exec(text_data_json["dbg"])
  47. class AconMixin(DbgMixin):
  48. # class AconMixin:
  49. async def connect(self):
  50. # print("-----------CONNECTING async def connect")
  51. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  52. p(flush=True)
  53. await self.accept()
  54. verbose("<MRO")
  55. verbose(type(self))
  56. await self.call_all_mro("ws_conn0")
  57. await self.call_all_mro("ws_conn")
  58. await self.call_all_mro("ws_conn2")
  59. verbose("/MRO>")
  60. # await self.ws_conn_once(event)
  61. await self.ws_conn_once()
  62. async def ws_conn_once(self, *a,**kw):pass
  63. async def ws_disconn_once(self, *a,**kw):pass
  64. async def ws_rec_once(self, *a,**kw):pass
  65. async def receive(self, *a,**kw):
  66. print("REC")
  67. print("",end="",flush=True)
  68. # print("-----------REC")
  69. await self.call_all_mro("ws_rec",*a,**kw)
  70. await self.ws_rec_once(*a,**kw)
  71. async def call_all_mro(self,mthd_name,*args,**kwargs):
  72. called = set([None])
  73. for cls_obj in self.__class__.__mro__:
  74. mthd = getattr(cls_obj,mthd_name,None)
  75. # print("~",cls_obj,mthd)
  76. if not mthd in called:
  77. # print(cls_obj,mthd)
  78. called.add(mthd)
  79. await mthd(self,*args,**kwargs)
  80. async def disconnect(self, close_code):
  81. await self.call_all_mro("ws_disconn",close_code)
  82. await self.ws_disconn_once(close_code)
  83. async def websocket_connect_x(self, event):
  84. await super().websocket_connect(event)
  85. print("<MRO")
  86. print(type(self))
  87. await self.call_all_mro("ws_conn",event)
  88. await self.call_all_mro("ws_conn2",event)
  89. print("/MRO>")
  90. await self.ws_conn_once(event)
  91. class AclTaskMixin:
  92. async def ws_conn0(self):
  93. glob_cl.run_persistent_loop(self)
  94. print("++++++++++++++++++++++++++")
  95. async def ws_conn_once(self,*a,**kw):
  96. print("\t\tws_conn_once:","AclTaskMixin")
  97. class DevMixin(*eclass_factory(10)):pass
  98. class DevMroMixin(*eclass_factory(10)):pass
  99. class WSStoMixin(*eclass_factory(10)):pass
  100. class GCLMixin(*eclass_factory(10)):pass
  101. class RldManMixin(*eclass_factory(10)):pass
  102. class LoneMixinA():pass
  103. class LoneMixinB():pass
  104. Mixins = Eobj()
  105. Mixins.DevMixin = DevMixin
  106. Mixins.DevMroMixin = DevMroMixin
  107. Mixins.WSStoMixin = WSStoMixin
  108. Mixins.GCLMixin = GCLMixin
  109. Mixins.RldManMixin = RldManMixin
  110. Mixins.LoneMixinA = LoneMixinA
  111. Mixins.LoneMixinB = LoneMixinB
  112. Mixins.DbgMixin = DbgMixin
  113. Mixins.AconMixin = AconMixin
  114. Mixins.AclTaskMixin = AclTaskMixin
  115. # GCLMixin
  116. class globCL(GCLMixin):
  117. def __init__(self,*a,**kw):
  118. # for now we are not going to do any weird class stuff
  119. self.rconn = connect_redis()
  120. self.rman = RldMan()
  121. self.cbs_once = []
  122. self.cbs_once_set = set()
  123. self.cbs = {"fn1":fn1,"fn2x":nop}
  124. self.timeout = 1
  125. pass
  126. async def persistent_loop(self,*a,**kw):
  127. while 1:
  128. # p("PL:",time.time(),flush=1)
  129. # await asyncio.sleep(5)
  130. # for k,v in cls.cbs.items():
  131. for k,v in [*self.cbs.items()]:
  132. try:
  133. call_info = callable_helper(v)
  134. if (call_info["is_callable"]):
  135. if call_info["is_coroutine"]:
  136. pass
  137. await v({k,self})
  138. else:
  139. v({k,self})
  140. else:
  141. pass
  142. # p(k,v)
  143. pass
  144. except Exception as e:
  145. p("persistent_loop Exception",e)
  146. p(flush=True)
  147. # raise
  148. else:
  149. pass
  150. finally:
  151. pass
  152. # cbs_once
  153. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  154. self.cbs_once = []
  155. self.cbs_once_set = set()
  156. k=0
  157. for v in cbs_once:
  158. try:
  159. call_info = callable_helper(v)
  160. if (call_info["is_callable"]):
  161. if call_info["is_coroutine"]:
  162. pass
  163. await v({k,self})
  164. else:
  165. v({k,self})
  166. else:
  167. pass
  168. # p(k,v)
  169. pass
  170. except Exception as e:
  171. p("persistent_loop Exception cbs_once:",e)
  172. p(flush=True)
  173. # raise
  174. else:
  175. pass
  176. finally:
  177. pass
  178. k += 1
  179. await asyncio.sleep(self.timeout)
  180. def _run_persistent_loop(self,*a,**kw):
  181. p("_run_persistent_loop:")
  182. loop = asyncio.get_event_loop()
  183. self.run_persistent_loop = nop
  184. self.rman.persistent_loop_cb()
  185. loop.create_task(self.persistent_loop())
  186. self.loop = loop
  187. run_persistent_loop = _run_persistent_loop
  188. # RldManMixin
  189. class RldMan(RldManMixin):
  190. def __init__(self,*a,**kw):
  191. self.files = {}
  192. z="ABC"
  193. self.scopes = {
  194. "dflt_scope":{"globals":globals(),"locals":locals()},
  195. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  196. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  197. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  198. # "dflt_scope":{"globals":globals(),"locals":{}}
  199. }
  200. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  201. # self.scope_opt = "locals"
  202. self.scope_opt = "globals"
  203. self.print_tb = 0
  204. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  205. file_list = [
  206. base_path+"i0.py",
  207. base_path+"i1.py",
  208. base_path+"i2.py",
  209. base_path+"i3.py",
  210. base_path+"i4.py",
  211. base_path+"i5.py",
  212. ]
  213. self.add_files(file_list)
  214. # self.add_files(file_list,{"run"})
  215. def persistent_loop_cb(self,*a,**kw):
  216. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  217. file_list = [
  218. base_path+"r0.py",
  219. base_path+"r1.py",
  220. base_path+"r2.py",
  221. base_path+"r3.py",
  222. base_path+"r4.py",
  223. base_path+"r5.py",
  224. ]
  225. self.add_files(file_list)
  226. def add_files(self,files):
  227. for file in files:
  228. if type(file)==str:
  229. self.add_file(file)
  230. elif type(file)==list:
  231. self.add_file(*file)
  232. else:
  233. p("add files???",file)
  234. def add_file(self,file_name,fnx={}):
  235. self.files[file_name] = {"ftxt":"",**fnx}
  236. def get_scope(self,rfile_obj,file_name):
  237. return self.scopes["current_scope"]
  238. return {
  239. "scope":self.scopes["current_scope"],
  240. }
  241. # rfile_obj.get("scope")
  242. # if
  243. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  244. if file_name in self.files:
  245. rfile_obj = self.files[file_name]
  246. st = os.stat(file_name)
  247. st_tuple = (st.st_mtime,st.st_size)
  248. # if rfile_obj["ftxt"] == "":
  249. eflag ="nd"
  250. # rfile_obj["ftxt"] = st_tuple
  251. if rfile_obj["ftxt"] != st_tuple:
  252. # p(rfile_obj["ftxt"])
  253. rfile_obj["ftxt"] = st_tuple
  254. try:
  255. f = open(file_name,"r")
  256. ftxt = f.read()
  257. # scope_key = rfile_obj.get("scope")
  258. # p(scope_key,rfile_obj)
  259. f.close()
  260. scope_obj = self.get_scope(file_name,rfile_obj)
  261. if self.scope_opt == "locals":
  262. eflag ="locals"
  263. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  264. elif self.scope_opt == "globals":
  265. eflag ="globals"
  266. exec(ftxt,scope_obj["globals"])
  267. else:
  268. eflag ="[]"
  269. exec(ftxt)
  270. except Exception as e:
  271. print("EXCEPT",eflag,e)
  272. if self.print_tb:
  273. traceback.print_tb(e.__traceback__,file=sys.stdout)
  274. else:
  275. pass
  276. # print("ELSE")
  277. finally:
  278. # print("FINALLY")
  279. pass
  280. print(end="",flush=True)
  281. return ret
  282. def rld_files(self):
  283. # p()
  284. ret = {
  285. "errs":{},
  286. "all":{},
  287. "alle":{},
  288. }
  289. # for k in self.files:
  290. for k in [*self.files]:
  291. self.rld_file(k,ret)
  292. # p("rld_files!")
  293. p("",end="",flush=True)
  294. return ret
  295. def fn1(*a,**kw):
  296. # rld_files
  297. glob_cl.rman.rld_files()
  298. glob_cl = globCL()
  299. glob_cl.rman.rld_files()
  300. # print("..",end="\n",flush=True)
  301. def redis_info():
  302. rconn = glob_cl.rconn
  303. keys = rconn.keys()
  304. for k in keys:
  305. p(":",k)
  306. p("len(keys)",len(keys))
  307. print("",end="",flush=True)
  308. # rconn.flushall()
  309. redis_info()
  310. r"""
  311. def atexit_fn():
  312. pass
  313. # rconn.flushall()
  314. print("ATEXIT FN")
  315. rinfo_len()
  316. keys = rconn.keys()
  317. print(keys)
  318. print(len(keys))
  319. print("t 000")
  320. # time.sleep(10)
  321. print("t 010")
  322. # time.sleep(20)
  323. print("t 020")
  324. print("ATEXIT FN...")
  325. rconn.flushall()
  326. atexit.register(atexit_fn)
  327. # """
  328. def atexit_fn():
  329. print("atexit_fn!\n\n")
  330. print("",end="",flush=True)
  331. atexit.register(atexit_fn)