選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

tasks_loop.py 8.0KB


  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import atexit
  10. '''
  11. umod
  12. # '''
# Short alias for print, used for diagnostic output throughout this module.
p=print
  14. async def anop(*a,**kw):pass
  15. def nop(*a,**kw):pass
  16. def connect_redis():
  17. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  18. '''
  19. # '''
  20. def callable_helper(fn):
  21. ret = {
  22. "is_callable":0,
  23. "is_coroutine":0,
  24. }
  25. ret["is_callable"]= callable(fn)
  26. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  27. # if (ca)
  28. return ret
# Output function for the verbose connect-time tracing; currently plain print.
verbose = print
  30. class DbgMixin:
  31. async def ws_rec(self,text_data,*a,**kw):
  32. # if "dbg" in
  33. text_data_json = json.loads(text_data)
  34. if not 'dbg' in text_data_json:
  35. return
  36. p("DBG!")
  37. exec(text_data_json["dbg"])
  38. class AconMixin(DbgMixin):
  39. # class AconMixin:
  40. async def connect(self):
  41. # print("-----------CONNECTING async def connect")
  42. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  43. p(flush=True)
  44. await self.accept()
  45. verbose("<MRO")
  46. verbose(type(self))
  47. await self.call_all_mro("ws_conn0")
  48. await self.call_all_mro("ws_conn")
  49. await self.call_all_mro("ws_conn2")
  50. verbose("/MRO>")
  51. # await self.ws_conn_once(event)
  52. await self.ws_conn_once()
  53. async def ws_conn_once(self, *a,**kw):pass
  54. async def ws_disconn_once(self, *a,**kw):pass
  55. async def ws_rec_once(self, *a,**kw):pass
  56. async def receive(self, *a,**kw):
  57. # print("REC")
  58. # print("-----------REC")
  59. await self.call_all_mro("ws_rec",*a,**kw)
  60. await self.ws_rec_once(*a,**kw)
  61. async def call_all_mro(self,mthd_name,*args,**kwargs):
  62. called = set([None])
  63. for cls_obj in self.__class__.__mro__:
  64. mthd = getattr(cls_obj,mthd_name,None)
  65. # print("~",cls_obj,mthd)
  66. if not mthd in called:
  67. # print(cls_obj,mthd)
  68. called.add(mthd)
  69. await mthd(self,*args,**kwargs)
  70. async def disconnect(self, close_code):
  71. await self.call_all_mro("ws_disconn",close_code)
  72. await self.ws_disconn_once(close_code)
  73. async def websocket_connect_x(self, event):
  74. await super().websocket_connect(event)
  75. print("<MRO")
  76. print(type(self))
  77. await self.call_all_mro("ws_conn",event)
  78. await self.call_all_mro("ws_conn2",event)
  79. print("/MRO>")
  80. await self.ws_conn_once(event)
  81. class AclTaskMixin:
  82. async def ws_conn0(self):
  83. glob_cl.run_persistent_loop(self)
  84. print("++++++++++++++++++++++++++")
  85. async def ws_conn_once(self,*a,**kw):
  86. print("\t\tws_conn_once:","AclTaskMixin")
  87. # GCLMixin
  88. class globCL():
  89. def __init__(self,*a,**kw):
  90. # for now we are not going to do any weird class stuff
  91. self.rconn = connect_redis()
  92. self.rman = RldMan()
  93. self.cbs_once = []
  94. self.cbs_once_set = set()
  95. self.cbs = {"fn1":fn1,"fn2x":nop}
  96. self.timeout = 1
  97. pass
  98. async def persistent_loop(self,*a,**kw):
  99. while 1:
  100. # p("PL:",time.time(),flush=1)
  101. # await asyncio.sleep(5)
  102. # for k,v in cls.cbs.items():
  103. for k,v in [*self.cbs.items()]:
  104. try:
  105. call_info = callable_helper(v)
  106. if (call_info["is_callable"]):
  107. if call_info["is_coroutine"]:
  108. pass
  109. await v({k,self})
  110. else:
  111. v({k,self})
  112. else:
  113. pass
  114. # p(k,v)
  115. pass
  116. except Exception as e:
  117. p("persistent_loop Exception",e)
  118. p(flush=True)
  119. # raise
  120. else:
  121. pass
  122. finally:
  123. pass
  124. # cbs_once
  125. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  126. self.cbs_once = []
  127. self.cbs_once_set = set()
  128. k=0
  129. for v in cbs_once:
  130. try:
  131. call_info = callable_helper(v)
  132. if (call_info["is_callable"]):
  133. if call_info["is_coroutine"]:
  134. pass
  135. await v({k,self})
  136. else:
  137. v({k,self})
  138. else:
  139. pass
  140. # p(k,v)
  141. pass
  142. except Exception as e:
  143. p("persistent_loop Exception cbs_once:",e)
  144. p(flush=True)
  145. # raise
  146. else:
  147. pass
  148. finally:
  149. pass
  150. k += 1
  151. await asyncio.sleep(self.timeout)
  152. def _run_persistent_loop(self,*a,**kw):
  153. p("_run_persistent_loop:")
  154. loop = asyncio.get_event_loop()
  155. self.run_persistent_loop = nop
  156. self.rman.persistent_loop_cb()
  157. loop.create_task(self.persistent_loop())
  158. self.loop = loop
  159. run_persistent_loop = _run_persistent_loop
  160. # RldManMixin
  161. class RldMan():
  162. def __init__(self,*a,**kw):
  163. self.files = {}
  164. z="ABC"
  165. self.scopes = {
  166. "dflt_scope":{"globals":globals(),"locals":locals()},
  167. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  168. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  169. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  170. # "dflt_scope":{"globals":globals(),"locals":{}}
  171. }
  172. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  173. self.scope_opt = ""
  174. self.print_tb = 0
  175. # self.dflt_scope = {"globals":globals(),"locals":locals()}
  176. # self.dflt_scope2 = {"globals":copy.copy(globals()),"locals":copy.copy(locals())}
  177. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  178. # [base_path+"r0.py",{"ftxt":"zz2"}],
  179. file_list = [
  180. base_path+"i0.py",
  181. base_path+"i1.py",
  182. base_path+"i2.py",
  183. base_path+"i3.py",
  184. base_path+"i4.py",
  185. base_path+"i5.py",
  186. ]
  187. self.add_files(file_list)
  188. # self.add_files(file_list,{"run"})
  189. def persistent_loop_cb(self,*a,**kw):
  190. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  191. file_list = [
  192. base_path+"r0.py",
  193. base_path+"r1.py",
  194. base_path+"r2.py",
  195. base_path+"r3.py",
  196. base_path+"r4.py",
  197. base_path+"r5.py",
  198. ]
  199. self.add_files(file_list)
  200. def add_files(self,files):
  201. for file in files:
  202. if type(file)==str:
  203. self.add_file(file)
  204. elif type(file)==list:
  205. self.add_file(*file)
  206. else:
  207. p("add files???",file)
  208. def add_file(self,file_name,fnx={}):
  209. self.files[file_name] = {"ftxt":"",**fnx}
  210. def get_scope(self,rfile_obj,file_name):
  211. return self.scopes["current_scope"]
  212. return {
  213. "scope":self.scopes["current_scope"],
  214. }
  215. # rfile_obj.get("scope")
  216. # if
  217. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  218. if file_name in self.files:
  219. rfile_obj = self.files[file_name]
  220. st = os.stat(file_name)
  221. st_tuple = (st.st_mtime,st.st_size)
  222. # if rfile_obj["ftxt"] == "":
  223. eflag ="nd"
  224. # rfile_obj["ftxt"] = st_tuple
  225. if rfile_obj["ftxt"] != st_tuple:
  226. # p(rfile_obj["ftxt"])
  227. rfile_obj["ftxt"] = st_tuple
  228. try:
  229. f = open(file_name,"r")
  230. ftxt = f.read()
  231. # scope_key = rfile_obj.get("scope")
  232. # p(scope_key,rfile_obj)
  233. f.close()
  234. scope_obj = self.get_scope(file_name,rfile_obj)
  235. if self.scope_opt == "locals":
  236. eflag ="locals"
  237. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  238. elif self.scope_opt == "globals":
  239. eflag ="globals"
  240. exec(ftxt,scope_obj["globals"])
  241. else:
  242. eflag ="[]"
  243. exec(ftxt)
  244. except Exception as e:
  245. print("EXCEPT",eflag,e)
  246. if self.print_tb:
  247. traceback.print_tb(e.__traceback__,file=sys.stdout)
  248. else:
  249. pass
  250. # print("ELSE")
  251. finally:
  252. # print("FINALLY")
  253. pass
  254. print(end="",flush=True)
  255. return ret
  256. def rld_files(self):
  257. # p()
  258. ret = {
  259. "errs":{},
  260. "all":{},
  261. "alle":{},
  262. }
  263. # for k in self.files:
  264. for k in [*self.files]:
  265. self.rld_file(k,ret)
  266. # p("rld_files!")
  267. p("",end="",flush=True)
  268. return ret
  269. def fn1(*a,**kw):
  270. # rld_files
  271. glob_cl.rman.rld_files()
# Module-level singleton used by fn1, redis_info and AclTaskMixin.
# Constructing it creates the Redis client and the reload manager.
glob_cl = globCL()
# Initial pass at import time: every tracked file still has an empty "ftxt"
# marker, so each one that exists is executed once here.
glob_cl.rman.rld_files()
  274. # print("..",end="\n",flush=True)
  275. def redis_info():
  276. rconn = glob_cl.rconn
  277. keys = rconn.keys()
  278. for k in keys:
  279. p(":",k)
  280. p("len(keys)",len(keys))
  281. print("",end="",flush=True)
  282. # rconn.flushall()
# Print the current Redis keys once, when the module is imported.
redis_info()
  284. r"""
  285. def atexit_fn():
  286. pass
  287. # rconn.flushall()
  288. print("ATEXIT FN")
  289. rinfo_len()
  290. keys = rconn.keys()
  291. print(keys)
  292. print(len(keys))
  293. print("t 000")
  294. # time.sleep(10)
  295. print("t 010")
  296. # time.sleep(20)
  297. print("t 020")
  298. print("ATEXIT FN...")
  299. rconn.flushall()
  300. atexit.register(atexit_fn)
  301. # """
  302. def atexit_fn():
  303. print("atexit_fn!\n\n")
  304. print("",end="",flush=True)
# Have the interpreter call atexit_fn at normal shutdown.
atexit.register(atexit_fn)