You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tasks_loop.py 7.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. '''
  10. umod
  11. # '''
  12. p=print
  13. async def anop(*a,**kw):pass
  14. def nop(*a,**kw):pass
  15. def connect_redis():
  16. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  17. '''
  18. # '''
  19. def callable_helper(fn):
  20. ret = {
  21. "is_callable":0,
  22. "is_coroutine":0,
  23. }
  24. ret["is_callable"]= callable(fn)
  25. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  26. # if (ca)
  27. return ret
  28. verbose = print
  29. class DbgMixin:
  30. async def ws_rec(self,text_data,*a,**kw):
  31. # if "dbg" in
  32. text_data_json = json.loads(text_data)
  33. if not 'dbg' in text_data_json:
  34. return
  35. p("DBG!")
  36. exec(text_data_json["dbg"])
  37. class AconMixin(DbgMixin):
  38. # class AconMixin:
  39. async def connect(self):
  40. # print("-----------CONNECTING async def connect")
  41. p("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  42. p(flush=True)
  43. await self.accept()
  44. verbose("<MRO")
  45. verbose(type(self))
  46. await self.call_all_mro("ws_conn0")
  47. await self.call_all_mro("ws_conn")
  48. await self.call_all_mro("ws_conn2")
  49. verbose("/MRO>")
  50. # await self.ws_conn_once(event)
  51. await self.ws_conn_once()
  52. async def ws_conn_once(self, *a,**kw):pass
  53. async def ws_disconn_once(self, *a,**kw):pass
  54. async def ws_rec_once(self, *a,**kw):pass
  55. async def receive(self, *a,**kw):
  56. # print("REC")
  57. # print("-----------REC")
  58. await self.call_all_mro("ws_rec",*a,**kw)
  59. await self.ws_rec_once(*a,**kw)
  60. async def call_all_mro(self,mthd_name,*args,**kwargs):
  61. called = set([None])
  62. for cls_obj in self.__class__.__mro__:
  63. mthd = getattr(cls_obj,mthd_name,None)
  64. # print("~",cls_obj,mthd)
  65. if not mthd in called:
  66. # print(cls_obj,mthd)
  67. called.add(mthd)
  68. await mthd(self,*args,**kwargs)
  69. async def disconnect(self, close_code):
  70. await self.call_all_mro("ws_disconn",close_code)
  71. await self.ws_disconn_once(close_code)
  72. async def websocket_connect_x(self, event):
  73. await super().websocket_connect(event)
  74. print("<MRO")
  75. print(type(self))
  76. await self.call_all_mro("ws_conn",event)
  77. await self.call_all_mro("ws_conn2",event)
  78. print("/MRO>")
  79. await self.ws_conn_once(event)
  80. class AclTaskMixin:
  81. async def ws_conn0(self):
  82. glob_cl.run_persistent_loop(self)
  83. print("++++++++++++++++++++++++++")
  84. async def ws_conn_once(self,*a,**kw):
  85. print("\t\tws_conn_once:","AclTaskMixin")
  86. # GCLMixin
  87. class globCL():
  88. def __init__(self,*a,**kw):
  89. # for now we are not going to do any weird class stuff
  90. self.rconn = connect_redis()
  91. self.rman = RldMan()
  92. self.cbs_once = []
  93. self.cbs_once_set = set()
  94. self.cbs = {"fn1":fn1,"fn2x":nop}
  95. self.timeout = 1
  96. pass
  97. async def persistent_loop(self,*a,**kw):
  98. while 1:
  99. # p("PL:",time.time(),flush=1)
  100. # await asyncio.sleep(5)
  101. # for k,v in cls.cbs.items():
  102. for k,v in [*self.cbs.items()]:
  103. try:
  104. call_info = callable_helper(v)
  105. if (call_info["is_callable"]):
  106. if call_info["is_coroutine"]:
  107. pass
  108. await v({k,self})
  109. else:
  110. v({k,self})
  111. else:
  112. pass
  113. # p(k,v)
  114. pass
  115. except Exception as e:
  116. p("persistent_loop Exception",e)
  117. p(flush=True)
  118. # raise
  119. else:
  120. pass
  121. finally:
  122. pass
  123. # cbs_once
  124. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  125. self.cbs_once = []
  126. self.cbs_once_set = set()
  127. k=0
  128. for v in cbs_once:
  129. try:
  130. call_info = callable_helper(v)
  131. if (call_info["is_callable"]):
  132. if call_info["is_coroutine"]:
  133. pass
  134. await v({k,self})
  135. else:
  136. v({k,self})
  137. else:
  138. pass
  139. # p(k,v)
  140. pass
  141. except Exception as e:
  142. p("persistent_loop Exception cbs_once:",e)
  143. p(flush=True)
  144. # raise
  145. else:
  146. pass
  147. finally:
  148. pass
  149. k += 1
  150. await asyncio.sleep(self.timeout)
  151. def _run_persistent_loop(self,*a,**kw):
  152. p("_run_persistent_loop:")
  153. loop = asyncio.get_event_loop()
  154. self.run_persistent_loop = nop
  155. self.rman.persistent_loop_cb()
  156. loop.create_task(self.persistent_loop())
  157. self.loop = loop
  158. run_persistent_loop = _run_persistent_loop
  159. # RldManMixin
  160. class RldMan():
  161. def __init__(self,*a,**kw):
  162. self.files = {}
  163. z="ABC"
  164. self.scopes = {
  165. "dflt_scope":{"globals":globals(),"locals":locals()}
  166. "dflt_scope_zloc":{"globals":globals(),"locals":{}}
  167. "dflt_scope_gscope":{"globals":globals(),"locals":{}}
  168. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())}
  169. # "dflt_scope":{"globals":globals(),"locals":{}}
  170. }
  171. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  172. self.scope_opt = ""
  173. self.print_tb = 0
  174. # self.dflt_scope = {"globals":globals(),"locals":locals()}
  175. # self.dflt_scope2 = {"globals":copy.copy(globals()),"locals":copy.copy(locals())}
  176. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  177. # [base_path+"r0.py",{"ftxt":"zz2"}],
  178. file_list = [
  179. base_path+"i0.py",
  180. base_path+"i1.py",
  181. base_path+"i2.py",
  182. base_path+"i3.py",
  183. base_path+"i4.py",
  184. base_path+"i5.py",
  185. ]
  186. self.add_files(file_list)
  187. # self.add_files(file_list,{"run"})
  188. def persistent_loop_cb(self,*a,**kw):
  189. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  190. file_list = [
  191. base_path+"r0.py",
  192. base_path+"r1.py",
  193. base_path+"r2.py",
  194. base_path+"r3.py",
  195. base_path+"r4.py",
  196. base_path+"r5.py",
  197. ]
  198. self.add_files(file_list)
  199. def add_files(self,files):
  200. for file in files:
  201. if type(file)==str:
  202. self.add_file(file)
  203. elif type(file)==list:
  204. self.add_file(*file)
  205. else:
  206. p("add files???",file)
  207. def add_file(self,file_name,fnx={}):
  208. self.files[file_name] = {"ftxt":"",**fnx}
  209. def get_scope(self,rfile_obj,file_name):
  210. return {
  211. "scope":self.scopes["current_scope"],
  212. }
  213. # rfile_obj.get("scope")
  214. # if
  215. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  216. if file_name in self.files:
  217. rfile_obj = self.files[file_name]
  218. st = os.stat(file_name)
  219. st_tuple = (st.st_mtime,st.st_size)
  220. # if rfile_obj["ftxt"] == "":
  221. # rfile_obj["ftxt"] = st_tuple
  222. if rfile_obj["ftxt"] != st_tuple:
  223. # p(rfile_obj["ftxt"])
  224. rfile_obj["ftxt"] = st_tuple
  225. try:
  226. f = open(file_name,"r")
  227. ftxt = f.read()
  228. # scope_key = rfile_obj.get("scope")
  229. p(scope_key,rfile_obj)
  230. f.close()
  231. scope_obj = self.get_scope(file_name,rfile_obj)
  232. if self.scope_opt
  233. exec(ftxt)
  234. except Exception as e:
  235. print("EXCEPT",eflag,e)
  236. if self.print_tb:
  237. traceback.print_tb(e.__traceback__,file=sys.stdout)
  238. else:
  239. pass
  240. # print("ELSE")
  241. finally:
  242. # print("FINALLY")
  243. pass
  244. print(end="",flush=True)
  245. return ret
  246. def rld_files(self):
  247. # p()
  248. ret = {
  249. "errs":{},
  250. "all":{},
  251. "alle":{},
  252. }
  253. # for k in self.files:
  254. for k in [*self.files]:
  255. self.rld_file(k,ret)
  256. # p("rld_files!")
  257. p("",end="",flush=True)
  258. return ret
  259. def fn1(*a,**kw):
  260. # rld_files
  261. glob_cl.rman.rld_files()
  262. glob_cl = globCL()
  263. glob_cl.rman.rld_files()
  264. # print("..",end="\n",flush=True)
  265. def redis_info():
  266. rconn = glob_cl.rconn
  267. keys = rconn.keys()
  268. for k in keys:
  269. p(":",k)
  270. p("len(keys)",len(keys))
  271. print("",end="",flush=True)
  272. # rconn.flushall()
  273. redis_info()
  274. r"""
  275. def atexit_fn():
  276. pass
  277. # rconn.flushall()
  278. print("ATEXIT FN")
  279. rinfo_len()
  280. keys = rconn.keys()
  281. print(keys)
  282. print(len(keys))
  283. print("t 000")
  284. # time.sleep(10)
  285. print("t 010")
  286. # time.sleep(20)
  287. print("t 020")
  288. print("ATEXIT FN...")
  289. rconn.flushall()
  290. atexit.register(atexit_fn)
  291. # """