Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

tasks_loop.py 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. '''
  10. umod
  11. # '''
  12. p=print
  13. async def anop(*a,**kw):pass
  14. def nop(*a,**kw):pass
  15. def connect_redis():
  16. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  17. '''
  18. # '''
  19. def callable_helper(fn):
  20. ret = {
  21. "is_callable":0,
  22. "is_coroutine":0,
  23. }
  24. ret["is_callable"]= callable(fn)
  25. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  26. # if (ca)
  27. return ret
  28. class DbgMixin:
  29. async def ws_rec(self,text_data,*a,**kw):
  30. # if "dbg" in
  31. text_data_json = json.loads(text_data)
  32. if not 'dbg' in text_data_json:
  33. return
  34. p("DBG!")
  35. exec(text_data_json["dbg"])
  36. class AconMixin(DbgMixin):
  37. # class AconMixin:
  38. async def connect(self):
  39. # print("-----------CONNECTING async def connect")
  40. verbose("ACON!!!!!!!!!!!!!!!!!!!!!!!!!!!")
  41. await self.accept()
  42. verbose("<MRO")
  43. verbose(type(self))
  44. await self.call_all_mro("ws_conn0")
  45. await self.call_all_mro("ws_conn")
  46. await self.call_all_mro("ws_conn2")
  47. verbose("/MRO>")
  48. # await self.ws_conn_once(event)
  49. await self.ws_conn_once()
  50. async def ws_conn_once(self, *a,**kw):pass
  51. async def ws_disconn_once(self, *a,**kw):pass
  52. async def ws_rec_once(self, *a,**kw):pass
  53. async def receive(self, *a,**kw):
  54. # print("REC")
  55. # print("-----------REC")
  56. await self.call_all_mro("ws_rec",*a,**kw)
  57. await self.ws_rec_once(*a,**kw)
  58. async def call_all_mro(self,mthd_name,*args,**kwargs):
  59. called = set([None])
  60. for cls_obj in self.__class__.__mro__:
  61. mthd = getattr(cls_obj,mthd_name,None)
  62. # print("~",cls_obj,mthd)
  63. if not mthd in called:
  64. # print(cls_obj,mthd)
  65. called.add(mthd)
  66. await mthd(self,*args,**kwargs)
  67. async def disconnect(self, close_code):
  68. await self.call_all_mro("ws_disconn",close_code)
  69. await self.ws_disconn_once(close_code)
  70. async def websocket_connect_x(self, event):
  71. await super().websocket_connect(event)
  72. print("<MRO")
  73. print(type(self))
  74. await self.call_all_mro("ws_conn",event)
  75. await self.call_all_mro("ws_conn2",event)
  76. print("/MRO>")
  77. await self.ws_conn_once(event)
  78. class AclTaskMixin:
  79. async def ws_conn0(self):
  80. glob_cl.run_persistent_loop(self)
  81. print("++++++++++++++++++++++++++")
  82. # oinfo(glob_cl)
  83. async def ws_conn_once(self,*a,**kw):
  84. print("\t\tws_conn_once:","AclTaskMixin")
  85. # GCLMixin
  86. class globCL():
  87. def __init__(self,*a,**kw):
  88. # for now we are not going to do any weird class stuff
  89. self.rconn = connect_redis()
  90. self.rman = RldMan()
  91. self.cbs_once = []
  92. self.cbs_once_set = set()
  93. self.cbs = {"fn1":fn1,"fn2x":nop}
  94. pass
  95. async def persistent_loop(self,*a,**kw):
  96. while 1:
  97. # p("PL:",time.time(),flush=1)
  98. # await asyncio.sleep(5)
  99. # for k,v in cls.cbs.items():
  100. for k,v in [*self.cbs.items()]:
  101. try:
  102. call_info = callable_helper(v)
  103. if (call_info["is_callable"]):
  104. if call_info["is_coroutine"]:
  105. pass
  106. await v({k,self})
  107. else:
  108. v({k,self})
  109. else:
  110. pass
  111. # p(k,v)
  112. pass
  113. except Exception as e:
  114. p("persistent_loop Exception",e)
  115. p(flush=True)
  116. # raise
  117. else:
  118. pass
  119. finally:
  120. pass
  121. # cbs_once
  122. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  123. self.cbs_once = []
  124. self.cbs_once_set = set()
  125. k=0
  126. for v in cbs_once:
  127. try:
  128. call_info = callable_helper(v)
  129. if (call_info["is_callable"]):
  130. if call_info["is_coroutine"]:
  131. pass
  132. await v({k,self})
  133. else:
  134. v({k,self})
  135. else:
  136. pass
  137. # p(k,v)
  138. pass
  139. except Exception as e:
  140. p("persistent_loop Exception cbs_once:",e)
  141. p(flush=True)
  142. # raise
  143. else:
  144. pass
  145. finally:
  146. pass
  147. k += 1
  148. await asyncio.sleep(cls.timeout)
  149. def _run_persistent_loop(self,*a,**kw):
  150. p("_run_persistent_loop:")
  151. loop = asyncio.get_event_loop()
  152. self.run_persistent_loop = nop
  153. loop.create_task(self.persistent_loop())
  154. self.loop = loop
  155. run_persistent_loop = _run_persistent_loop
  156. # RldManMixin
  157. class RldMan():
  158. def __init__(self,*a,**kw):
  159. self.files = {}
  160. base_path = "/usr/games/repos/ign4/py_rld/djc/rdir1/"
  161. file_list = [
  162. base_path+"r0.py",
  163. base_path+"r1.py",
  164. base_path+"r2.py",
  165. base_path+"r3.py",
  166. base_path+"r4.py",
  167. base_path+"r5.py",
  168. ]
  169. self.add_files(file_list)
  170. def add_files(self,files):
  171. for file in files:
  172. if type(file)==str:
  173. self.add_file(file)
  174. elif type(file)==list:
  175. self.add_file(*file)
  176. else:
  177. p("add files???",file)
  178. def add_file(self,file_name,fnx={}):
  179. self.files[file_name] = {"ftxt":"",**fnx}
  180. def rld_files(self):
  181. # p()
  182. ret = {
  183. "errs":{},
  184. "all":{},
  185. "alle":{},
  186. }
  187. # for k in self.files:
  188. for k in [*self.files]:
  189. self.rld_file(k,ret)
  190. p("rld_files!")
  191. p("",end="",flush=True)
  192. return ret
  193. def fn1(*a,**kw):
  194. # rld_files
  195. glob_cl.rman.rld_files()
  196. glob_cl = globCL()
  197. # print("..",end="\n",flush=True)
  198. def redis_info():
  199. rconn = glob_cl.rconn
  200. keys = rconn.keys()
  201. for k in keys:
  202. p(":",k)
  203. p("len(keys)",len(keys))
  204. print("",end="",flush=True)
  205. # rconn.flushall()
  206. redis_info()
  207. r"""
  208. def atexit_fn():
  209. pass
  210. # rconn.flushall()
  211. print("ATEXIT FN")
  212. rinfo_len()
  213. keys = rconn.keys()
  214. print(keys)
  215. print(len(keys))
  216. print("t 000")
  217. # time.sleep(10)
  218. print("t 010")
  219. # time.sleep(20)
  220. print("t 020")
  221. print("ATEXIT FN...")
  222. rconn.flushall()
  223. atexit.register(atexit_fn)
  224. # """