Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

tasks_loop.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. import asyncio
  2. import time
  3. import redis
  4. import os
  5. import copy
  6. import json
  7. import traceback
  8. import inspect
  9. import sys
  10. import atexit
  11. from asgiref.sync import async_to_sync,sync_to_async
  12. # hot_reload_build = False
  13. hot_reload_build = True
  14. print("QWERTYU")
  15. p=print
  16. async def anop(*a,**kw):pass
  17. def nop(*a,**kw):pass
  18. class Edict(dict):pass
  19. class Eobj():pass
  20. def connect_redis1():
  21. return redis.Redis(host='localhost', port=6379, db=1,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  22. def connect_redis():
  23. return redis.Redis(host='localhost', port=6379, db=0,charset="utf-8", decode_responses=True, encoding_errors="backslashreplace")
  24. def eclass_factory(n):
  25. ret = []
  26. for k in range(n):
  27. class nx:pass
  28. # _rcls_name
  29. ret.append(nx)
  30. return ret
  31. def callable_helper(fn):
  32. ret = {
  33. "is_callable":0,
  34. "is_coroutine":0,
  35. }
  36. ret["is_callable"]= callable(fn)
  37. ret["is_coroutine"]= inspect.iscoroutinefunction(fn)
  38. # if (ca)
  39. return ret
  40. # verbose = print
  41. verbose=nop
  42. class DbgMixin:
  43. async def ws_rec(self,text_data,*a,**kw):
  44. # if "dbg" in
  45. text_data_json = json.loads(text_data)
  46. if not 'dbg' in text_data_json:
  47. return
  48. p("DBG!")
  49. exec(text_data_json["dbg"])
  50. class AconMixin(DbgMixin):
  51. # class AconMixin:
  52. async def connect(self):
  53. p(end="",flush=True)
  54. await self.accept()
  55. verbose("<MRO")
  56. verbose(type(self))
  57. await self.call_all_mro("ws_conn0")
  58. await self.call_all_mro("ws_conn")
  59. await self.call_all_mro("ws_conn2")
  60. verbose("/MRO>")
  61. await self.ws_conn_once()
  62. async def ws_conn_once(self, *a,**kw):pass
  63. async def ws_disconn_once(self, *a,**kw):pass
  64. async def ws_rec_once(self, *a,**kw):pass
  65. async def receive(self, *a,**kw):
  66. await self.call_all_mro("ws_rec",*a,**kw)
  67. await self.ws_rec_once(*a,**kw)
  68. print("",end="",flush=True)
  69. async def call_all_mro(self,mthd_name,*args,**kwargs):
  70. called = set([None])
  71. for cls_obj in self.__class__.__mro__:
  72. mthd = getattr(cls_obj,mthd_name,None)
  73. if not mthd in called:
  74. called.add(mthd)
  75. await mthd(self,*args,**kwargs)
  76. async def disconnect(self, close_code):
  77. await self.call_all_mro("ws_disconn",close_code)
  78. await self.ws_disconn_once(close_code)
  79. class AclTaskMixin:
  80. async def ws_disconn(self,*a,**kw):
  81. pass
  82. async def ws_conn0(self):
  83. self.groups_set = set()
  84. if glob_cl.has_cl:
  85. return
  86. glob_cl.has_cl = True
  87. glob_cl.run_persistent_loop(self)
  88. glob_cl.acl = self.channel_layer
  89. class DevMixin(*eclass_factory(10)):pass
  90. class DevMroMixin(*eclass_factory(10)):pass
  91. class WSStoMixin(*eclass_factory(10)):pass
  92. class GCLMixin(*eclass_factory(10)):pass
  93. class RldManMixin(*eclass_factory(10)):pass
  94. class LoneMixinA():pass
  95. class LoneMixinB():pass
  96. Mixins = Eobj()
  97. Mixins.DevMixin = DevMixin
  98. Mixins.DevMroMixin = DevMroMixin
  99. Mixins.WSStoMixin = WSStoMixin
  100. Mixins.GCLMixin = GCLMixin
  101. Mixins.RldManMixin = RldManMixin
  102. Mixins.LoneMixinA = LoneMixinA
  103. Mixins.LoneMixinB = LoneMixinB
  104. Mixins.DbgMixin = DbgMixin
  105. Mixins.AconMixin = AconMixin
  106. Mixins.AclTaskMixin = AclTaskMixin
  107. class globCL(GCLMixin):
  108. def __init__(self,*a,**kw):
  109. print("IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII")
  110. # for now we are not going to do any weird class stuff
  111. self.acl = None
  112. self.has_cl = False
  113. r'''
  114. self.rconn1 = connect_redis1()
  115. self.rconn = connect_redis()
  116. # '''
  117. self.rman = RldMan()
  118. self.cbs_once = []
  119. self.cbs_asap2 = []
  120. self.cbs_asap = []
  121. self.cbs_once_set = set()
  122. if (hot_reload_build):
  123. self.cbs = {"fn1":fn1,"fn2x":nop}
  124. else:
  125. self.cbs = {}
  126. self.timeout = 1
  127. self.fast_timeout = .1
  128. pass
  129. async def persistent_fast_cb_loop_min_timeout(self,*a,**kw):
  130. print("persistent_fast_cb_loop_min_timeout")
  131. while 1:
  132. if len(self.cbs_asap2):
  133. cbs_once = [*self.cbs_asap2]
  134. self.cbs_asap2=[]
  135. k=0
  136. for v in cbs_once:
  137. try:
  138. call_info = callable_helper(v)
  139. if (call_info["is_callable"]):
  140. if call_info["is_coroutine"]:
  141. pass
  142. await v({k,self})
  143. else:
  144. v({k,self})
  145. else:
  146. pass
  147. except Exception as e:
  148. p("persistent_fast_cb_loop Exception cbs_once:",e)
  149. p(flush=True)
  150. else:
  151. pass
  152. finally:
  153. pass
  154. k += 1
  155. if len(self.cbs_asap):
  156. self.cbs_asap2 = self.cbs_asap
  157. self.cbs_asap=[]
  158. await asyncio.sleep(self.fast_timeout)
  159. async def persistent_fast_cb_loop(self,*a,**kw):
  160. while 1:
  161. if len(self.cbs_asap):
  162. cbs_once = [*self.cbs_asap]
  163. self.cbs_asap=[]
  164. k=0
  165. for v in cbs_once:
  166. try:
  167. call_info = callable_helper(v)
  168. if (call_info["is_callable"]):
  169. if call_info["is_coroutine"]:
  170. pass
  171. await v({k,self})
  172. else:
  173. v({k,self})
  174. else:
  175. pass
  176. except Exception as e:
  177. p("persistent_fast_cb_loop Exception cbs_once:",e)
  178. p(flush=True)
  179. else:
  180. pass
  181. finally:
  182. pass
  183. k += 1
  184. await asyncio.sleep(self.fast_timeout)
  185. async def persistent_loop(self,*a,**kw):
  186. print("persistent_loop")
  187. while 1:
  188. for k,v in [*self.cbs.items()]:
  189. try:
  190. call_info = callable_helper(v)
  191. if (call_info["is_callable"]):
  192. if call_info["is_coroutine"]:
  193. await v({k,self})
  194. else:
  195. v({k,self})
  196. else:
  197. pass
  198. pass
  199. except Exception as e:
  200. p("persistent_loop Exception",e)
  201. p(flush=True)
  202. else:
  203. pass
  204. finally:
  205. pass
  206. cbs_once = [*self.cbs_once,*self.cbs_once_set]
  207. self.cbs_once = []
  208. self.cbs_once_set = set()
  209. k=0
  210. for v in cbs_once:
  211. try:
  212. call_info = callable_helper(v)
  213. if (call_info["is_callable"]):
  214. if call_info["is_coroutine"]:
  215. pass
  216. await v({k,self})
  217. else:
  218. v({k,self})
  219. else:
  220. pass
  221. except Exception as e:
  222. p("persistent_loop Exception cbs_once:",e)
  223. p(flush=True)
  224. else:
  225. pass
  226. finally:
  227. pass
  228. k += 1
  229. await asyncio.sleep(self.timeout)
  230. def _run_persistent_loop2(self,*a,**kw):
  231. p("_run_persistent_loop2:",self,a,kw)
  232. self.run_persistent_loop2 = nop
  233. self.rman.persistent_loop_cb()
  234. # loop = asyncio.get_event_loop()
  235. # loop.create_task(self.persistent_fast_cb_loop())
  236. asyncio.ensure_future(self.persistent_loop())
  237. asyncio.ensure_future(self.persistent_fast_cb_loop_min_timeout())
  238. # self.loop = loop
  239. def _run_persistent_loop3(self,*a,**kw):
  240. p("_run_persistent_loop3:",self,a,kw)
  241. loop = asyncio.new_event_loop()
  242. self.run_persistent_loop = nop
  243. self.rman.persistent_loop_cb()
  244. loop.create_task(self.persistent_loop())
  245. # loop.create_task(self.persistent_fast_cb_loop())
  246. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  247. self.loop = loop
  248. def _run_persistent_loop(self,*a,**kw):
  249. p("_run_persistent_loop:",self,a,kw)
  250. loop = asyncio.get_event_loop()
  251. self.run_persistent_loop = nop
  252. self.rman.persistent_loop_cb()
  253. loop.create_task(self.persistent_loop())
  254. # loop.create_task(self.persistent_fast_cb_loop())
  255. loop.create_task(self.persistent_fast_cb_loop_min_timeout())
  256. self.loop = loop
  257. run_persistent_loop = _run_persistent_loop
  258. run_persistent_loop2 = _run_persistent_loop2
  259. run_persistent_loop3 = _run_persistent_loop3
  260. def sync_dev_group_discard(self,*a,**kw):
  261. glob_cl.cbs_once.append(self.sync_dev_group_discard(*a,**kw))
  262. async def dev_group_discard(self,group_name,consumer):
  263. channel_layer = glob_cl.acl
  264. if consumer.channel_name in consumer.groups:
  265. consumer.groups.remove(group_name)
  266. await channel_layer.group_discard(group_name,consumer.channel_name)
  267. def sync_dev_group_add(self,*a,**kw):
  268. glob_cl.cbs_once.append(self.group_add(*a,**kw))
  269. async def dev_group_add(self,group_name,consumer):
  270. channel_layer = glob_cl.acl
  271. # if not consumer.channel_name in consumer.groups_set:
  272. # consumer.groups_set.add(groups)
  273. if not consumer.channel_name in consumer.groups:
  274. consumer.groups.append(group_name)
  275. await channel_layer.group_add(group_name,consumer.channel_name)
  276. # RldManMixin
  277. class RldMan(RldManMixin):
  278. def __init__(self,*a,**kw):
  279. self.files = {}
  280. z="ABC"
  281. self.scopes = {
  282. "dflt_scope":{"globals":globals(),"locals":locals()},
  283. "dflt_scope_zloc":{"globals":globals(),"locals":{}},
  284. "dflt_scope_gscope":{"globals":globals(),"locals":{}},
  285. "dflt_scope_copy": {"globals":copy.copy(globals()),"locals":copy.copy(locals())},
  286. # "dflt_scope":{"globals":globals(),"locals":{}}
  287. }
  288. self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
  289. # self.scope_opt = ""locals
  290. self.scope_opt = "globals"
  291. self.print_tb = 0
  292. base_path = "/usr/games/repos/ign4/py_rld/djc/edan_hrld/"
  293. file_list = [
  294. base_path+"i0.py",
  295. ]
  296. self.add_files(file_list)
  297. # self.add_files(file_list,{"run"})
  298. def persistent_loop_cb(self,*a,**kw):
  299. base_path = "/usr/games/repos/ign4/py_rld/djc/edan_hrld/"
  300. file_list = [
  301. base_path+"f0.py",
  302. ]
  303. self.add_files(file_list)
  304. def add_files(self,files):
  305. for file in files:
  306. if type(file)==str:
  307. self.add_file(file)
  308. elif type(file)==list:
  309. self.add_file(*file)
  310. else:
  311. p("add files???",file)
  312. def add_file(self,file_name,fnx={}):
  313. self.files[file_name] = {"ftxt":"",**fnx}
  314. def get_scope(self,rfile_obj,file_name):
  315. return self.scopes["current_scope"]
  316. return {
  317. "scope":self.scopes["current_scope"],
  318. }
  319. # rfile_obj.get("scope")
  320. # if
  321. def rld_file(self,file_name,ret= {"errs":{},"all":{},"alle":{},},**kw):
  322. if file_name in self.files:
  323. rfile_obj = self.files[file_name]
  324. st = os.stat(file_name)
  325. st_tuple = (st.st_mtime,st.st_size)
  326. # if rfile_obj["ftxt"] == "":
  327. eflag ="nd"
  328. # rfile_obj["ftxt"] = st_tuple
  329. if rfile_obj["ftxt"] != st_tuple:
  330. # p(rfile_obj["ftxt"])
  331. rfile_obj["ftxt"] = st_tuple
  332. try:
  333. f = open(file_name,"r")
  334. ftxt = f.read()
  335. # scope_key = rfile_obj.get("scope")
  336. # p(scope_key,rfile_obj)
  337. f.close()
  338. scope_obj = self.get_scope(file_name,rfile_obj)
  339. if self.scope_opt == "locals":
  340. eflag ="locals"
  341. exec(ftxt,scope_obj["globals"],scope_obj["locals"])
  342. elif self.scope_opt == "globals":
  343. eflag ="globals"
  344. exec(ftxt,scope_obj["globals"])
  345. else:
  346. eflag ="[]"
  347. exec(ftxt)
  348. except Exception as e:
  349. p()
  350. print("file:",file_name)
  351. print("EXCEPT",eflag,e)
  352. if self.print_tb:
  353. traceback.print_tb(e.__traceback__,file=sys.stdout)
  354. else:
  355. pass
  356. finally:
  357. pass
  358. print(end="",flush=True)
  359. return ret
  360. def rld_files(self):
  361. ret = {
  362. "errs":{},
  363. "all":{},
  364. "alle":{},
  365. }
  366. for k in [*self.files]:
  367. self.rld_file(k,ret)
  368. p("",end="",flush=True)
  369. return ret
  370. def fn1(*a,**kw):
  371. glob_cl.rman.rld_files()
  372. glob_cl = globCL()
  373. # we are not flushing databases if
  374. def atexit_fn():
  375. print("atexit_fn!\n\n")
  376. # rconn = glob_cl.rconn
  377. # rconn.flushall()
  378. # rconn.flushdb()
  379. print("",end="",flush=True)
  380. if hot_reload_build:
  381. glob_cl.rman.rld_files()
  382. atexit.register(atexit_fn)
  383. else:
  384. import wsps.hot_reload_temp_staging
  385. wsps.hot_reload_temp_staging.init_class_edits(Mixins)
  386. glob_cl.dev_room_dashboard = "jfi_dash"
  387. # if not hot_reload_build:
  388. # glob_cl.rman.rld_files()
  389. # print("############################ ##############################")