"""Consumer mixins, a process-wide callback loop, and a file hot-reloader.

The mixins look like they are meant for Django Channels consumers (they call
``self.accept()``, ``self.channel_layer`` and ``websocket_connect``) --
NOTE(review): confirm against the consumer classes that use them.

Importing this module has side effects:

* it builds the ``glob_cl`` singleton (two Redis client objects and a
  ``RldMan``),
* it runs one reload pass over the registered files, executing their source
  in this module's globals,
* it registers an ``atexit`` hook that calls ``flushdb()`` on the db-0 Redis
  client.
"""
import asyncio
import atexit
import copy
import inspect
import json
import os
import sys
import time
import traceback

import redis
from asgiref.sync import async_to_sync, sync_to_async

# Short alias used throughout the file (and available to reloaded scripts).
p = print


async def anop(*a, **kw):
    """Async no-op: accept anything, do nothing."""
    pass


def nop(*a, **kw):
    """Sync no-op: accept anything, do nothing."""
    pass


class Edict(dict):
    """Plain ``dict`` subclass (instances can carry extra attributes)."""
    pass


class Eobj():
    """Empty class used as an attribute namespace."""
    pass


def _redis_client(db):
    """Build a text-mode Redis client for ``localhost:6379`` on database *db*.

    ``encoding=`` replaces the deprecated ``charset=`` keyword; both name the
    same setting, and "utf-8" is what the original call passed.
    """
    return redis.Redis(
        host='localhost',
        port=6379,
        db=db,
        encoding="utf-8",
        decode_responses=True,
        encoding_errors="backslashreplace",
    )


def connect_redis1():
    """Return a Redis client bound to database 1."""
    return _redis_client(1)


def connect_redis():
    """Return a Redis client bound to database 0."""
    return _redis_client(0)


def eclass_factory(n):
    """Return a list of *n* distinct, empty classes (each named ``nx``).

    Used below to give the mixin classes a set of throw-away base classes.
    """
    ret = []
    for _ in range(n):
        # A new class object is created on every iteration.
        class nx:
            pass
        ret.append(nx)
    return ret


def callable_helper(fn):
    """Describe *fn*.

    Returns:
        dict with ``"is_callable"`` (``callable(fn)``) and ``"is_coroutine"``
        (``inspect.iscoroutinefunction(fn)``).
    """
    return {
        "is_callable": callable(fn),
        "is_coroutine": inspect.iscoroutinefunction(fn),
    }


# Debug output hook: rebind to ``print`` to enable verbose output.
verbose = nop


class DbgMixin:
    """Mixin that executes code sent under the ``"dbg"`` key of a message."""

    async def ws_rec(self, text_data, *a, **kw):
        """Parse *text_data* as JSON and ``exec`` its ``"dbg"`` entry, if any.

        SECURITY: this runs arbitrary Python taken from the received message.
        Anyone who can deliver a message to this consumer can execute code in
        this process. Left in place because callers may rely on it, but it
        must not be reachable by untrusted clients.
        """
        text_data_json = json.loads(text_data)
        if 'dbg' not in text_data_json:
            return
        p("DBG!")
        exec(text_data_json["dbg"])


class AconMixin(DbgMixin):
    """Async connect/receive/disconnect plumbing with MRO fan-out.

    ``receive`` and ``disconnect`` call every distinct ``ws_rec`` /
    ``ws_disconn`` implementation found along the MRO, then the corresponding
    ``*_once`` hook a single time.
    """

    async def connect(self):
        """Accept the connection, then run ``ws_conn_once()``."""
        p(flush=True)
        await self.accept()
        verbose("")
        await self.ws_conn_once()

    async def ws_conn_once(self, *a, **kw):
        """Hook run once per connect; no-op here."""
        pass

    async def ws_disconn_once(self, *a, **kw):
        """Hook run once per disconnect; no-op here."""
        pass

    async def ws_rec_once(self, *a, **kw):
        """Hook run once per received message; no-op here."""
        pass

    async def receive(self, *a, **kw):
        """Dispatch a message to every ``ws_rec`` in the MRO, then ``ws_rec_once``."""
        await self.call_all_mro("ws_rec", *a, **kw)
        await self.ws_rec_once(*a, **kw)
        print("", end="", flush=True)

    async def call_all_mro(self, mthd_name, *args, **kwargs):
        """Await each distinct *mthd_name* implementation along the MRO.

        Classes are visited in ``type(self).__mro__`` order. A class that
        merely inherits the attribute yields the same function object as its
        base, so each implementation runs once. Every implementation found is
        awaited, so it must return an awaitable.
        """
        # Seeding with None makes classes lacking the attribute fall through.
        called = {None}
        for cls_obj in self.__class__.__mro__:
            mthd = getattr(cls_obj, mthd_name, None)
            if mthd not in called:
                called.add(mthd)
                await mthd(self, *args, **kwargs)

    async def disconnect(self, close_code):
        """Dispatch to every ``ws_disconn`` in the MRO, then ``ws_disconn_once``."""
        await self.call_all_mro("ws_disconn", close_code)
        await self.ws_disconn_once(close_code)

    async def websocket_connect_x(self, event):
        """Alternate connect path: defer to the parent, then ``ws_conn_once(event)``."""
        await super().websocket_connect(event)
        print("")
        await self.ws_conn_once(event)


class AclTaskMixin:
    """Mixin whose first connection starts the global persistent loops."""

    async def ws_disconn(self, *a, **kw):
        """Disconnect hook; currently does nothing."""
        pass

    async def ws_conn0(self):
        """Initialise ``groups_set``; on the first call process-wide, start
        ``glob_cl``'s loops and hand it this consumer's channel layer."""
        self.groups_set = set()
        if glob_cl.has_cl:
            return
        glob_cl.has_cl = True
        print("AclTaskMixin:ws_conn0")
        glob_cl.run_persistent_loop(self)
        glob_cl.acl = self.channel_layer

    async def ws_conn_once(self, *a, **kw):
        """Connect hook; currently does nothing."""
        pass


# Mixin shells: each gets ten fresh empty base classes from eclass_factory.
class DevMixin(*eclass_factory(10)):
    pass


class DevMroMixin(*eclass_factory(10)):
    pass


class WSStoMixin(*eclass_factory(10)):
    pass


class GCLMixin(*eclass_factory(10)):
    pass


class RldManMixin(*eclass_factory(10)):
    pass


class LoneMixinA():
    pass


class LoneMixinB():
    pass


# Namespace exposing every mixin by name.
Mixins = Eobj()
Mixins.DevMixin = DevMixin
Mixins.DevMroMixin = DevMroMixin
Mixins.WSStoMixin = WSStoMixin
Mixins.GCLMixin = GCLMixin
Mixins.RldManMixin = RldManMixin
Mixins.LoneMixinA = LoneMixinA
Mixins.LoneMixinB = LoneMixinB
Mixins.DbgMixin = DbgMixin
Mixins.AconMixin = AconMixin
Mixins.AclTaskMixin = AclTaskMixin


class globCL(GCLMixin):
    """Process-wide state: Redis clients, the reloader, and callback queues.

    Attributes set in ``__init__``:
        acl: channel layer, filled in by ``AclTaskMixin.ws_conn0``.
        has_cl: whether a consumer has already started the loops.
        rconn / rconn1: Redis clients for db 0 / db 1.
        rman: the ``RldMan`` hot-reloader.
        cbs: name -> callback, run on every ``persistent_loop`` pass.
        cbs_once / cbs_once_set: callbacks run on the next pass, then dropped.
        cbs_asap: callbacks run on the next fast-loop pass, then dropped.
        timeout / fast_timeout: sleep (seconds) between passes of each loop.
    """

    def __init__(self, *a, **kw):
        self.acl = None
        self.has_cl = False
        self.rconn1 = connect_redis1()
        self.rconn = connect_redis()
        self.rman = RldMan()
        self.cbs_once = []
        self.cbs_asap = []
        self.cbs_once_set = set()
        self.cbs = {"fn1": fn1, "fn2x": nop}
        self.timeout = 1
        self.fast_timeout = .1

    async def _invoke_cb(self, key, cb, label):
        """Run one queued callback, printing (not raising) any exception.

        Non-callables are skipped. Coroutine functions are awaited; other
        callables are called directly.

        NOTE(review): the callback receives a single *set* ``{key, self}``,
        which is unordered -- possibly ``(key, self)`` was intended. Kept
        as-is because existing callbacks may depend on it.
        """
        try:
            call_info = callable_helper(cb)
            if call_info["is_callable"]:
                if call_info["is_coroutine"]:
                    await cb({key, self})
                else:
                    cb({key, self})
        except Exception as e:
            # Top-level boundary of a forever-loop: report and keep going.
            p(label, e)
            p(flush=True)

    async def persistent_fast_cb_loop(self, *a, **kw):
        """Forever: drain ``cbs_asap`` and sleep ``fast_timeout`` seconds."""
        while 1:
            if self.cbs_asap:
                # Swap the queue out first so callbacks can enqueue more work.
                pending = [*self.cbs_asap]
                self.cbs_asap = []
                for k, v in enumerate(pending):
                    await self._invoke_cb(
                        k, v, "persistent_fast_cb_loop Exception cbs_once:")
            await asyncio.sleep(self.fast_timeout)

    async def persistent_loop(self, *a, **kw):
        """Forever: run every ``cbs`` entry, drain the one-shot queues, sleep
        ``timeout`` seconds."""
        while 1:
            # Iterate a snapshot: callbacks may add or remove entries.
            for k, v in list(self.cbs.items()):
                await self._invoke_cb(k, v, "persistent_loop Exception")

            pending = [*self.cbs_once, *self.cbs_once_set]
            self.cbs_once = []
            self.cbs_once_set = set()
            for k, v in enumerate(pending):
                await self._invoke_cb(
                    k, v, "persistent_loop Exception cbs_once:")
            await asyncio.sleep(self.timeout)

    def _run_persistent_loop(self, *a, **kw):
        """Start both loops as tasks on the current event loop (one-shot).

        After the first call ``self.run_persistent_loop`` is rebound to
        ``nop`` so later calls do nothing.
        """
        p("_run_persistent_loop:")
        loop = asyncio.get_event_loop()
        self.run_persistent_loop = nop
        self.rman.persistent_loop_cb()
        # Keep strong references: the event loop only holds weak references
        # to tasks, so an unreferenced task can be garbage-collected mid-run.
        self._tasks = [
            loop.create_task(self.persistent_loop()),
            loop.create_task(self.persistent_fast_cb_loop()),
        ]
        self.loop = loop

    run_persistent_loop = _run_persistent_loop

    def _defer(self, coro_fn, *a, **kw):
        """Queue ``coro_fn(*a, **kw)`` to be awaited on the next loop pass.

        The loop only runs *callables*, so a coroutine function is queued
        rather than a coroutine object (which would be skipped and never
        awaited).
        """
        async def _deferred(*_ctx):
            await coro_fn(*a, **kw)
        glob_cl.cbs_once.append(_deferred)

    def sync_dev_group_discard(self, *a, **kw):
        """Sync entry point: schedule ``dev_group_discard(*a, **kw)``.

        Previously this method called itself (unbounded recursion).
        """
        self._defer(self.dev_group_discard, *a, **kw)

    async def dev_group_discard(self, group_name, consumer):
        """Remove *consumer* from *group_name* locally and on the channel layer."""
        channel_layer = glob_cl.acl
        # ``consumer.groups`` holds group names, so test the group name (the
        # original tested the channel name, which never matches).
        if group_name in consumer.groups:
            consumer.groups.remove(group_name)
        await channel_layer.group_discard(group_name, consumer.channel_name)

    def sync_dev_group_add(self, *a, **kw):
        """Sync entry point: schedule ``dev_group_add(*a, **kw)``.

        Previously this called a non-existent ``self.group_add``.
        """
        self._defer(self.dev_group_add, *a, **kw)

    async def dev_group_add(self, group_name, consumer):
        """Add *consumer* to *group_name* locally and on the channel layer."""
        channel_layer = glob_cl.acl
        # Test the group name so repeated adds do not append duplicates.
        if group_name not in consumer.groups:
            consumer.groups.append(group_name)
        await channel_layer.group_add(group_name, consumer.channel_name)


class RldMan(RldManMixin):
    """Hot-reloader: re-``exec`` registered files whenever they change.

    A file counts as changed when its ``(st_mtime, st_size)`` differs from the
    value cached under ``self.files[name]["ftxt"]``.
    """

    BASE_PATH = "/usr/games/repos/ign4/py_rld/djc/rdir1/"

    @staticmethod
    def _numbered_files(prefix, count=6):
        """Return ``BASE_PATH + prefix + "<i>.py"`` for ``i`` in ``range(count)``."""
        return [RldMan.BASE_PATH + prefix + str(i) + ".py"
                for i in range(count)]

    def __init__(self, *a, **kw):
        self.files = {}
        # ``z`` is unused here but is captured by the ``locals()`` snapshots
        # below, so it stays for compatibility with scripts run in that scope.
        z = "ABC"
        self.scopes = {
            "dflt_scope": {"globals": globals(), "locals": locals()},
            "dflt_scope_zloc": {"globals": globals(), "locals": {}},
            "dflt_scope_gscope": {"globals": globals(), "locals": {}},
            "dflt_scope_copy": {"globals": copy.copy(globals()),
                                "locals": copy.copy(locals())},
        }
        self.scopes['current_scope'] = self.scopes['dflt_scope_zloc']
        # "locals": exec with globals+locals; "globals": exec with globals
        # only; anything else: bare exec inside rld_file.
        self.scope_opt = "globals"
        # Truthy -> also print the traceback when a reloaded file raises.
        self.print_tb = 0
        self.add_files(self._numbered_files("i"))

    def persistent_loop_cb(self, *a, **kw):
        """Register the ``r0.py`` .. ``r5.py`` files (called when the loops start)."""
        self.add_files(self._numbered_files("r"))

    def add_files(self, files):
        """Register each entry: a path string, or a list of ``add_file`` args."""
        for file in files:
            if isinstance(file, str):
                self.add_file(file)
            elif isinstance(file, list):
                self.add_file(*file)
            else:
                p("add files???", file)

    def add_file(self, file_name, fnx=None):
        """Register *file_name*; *fnx* supplies extra per-file entries.

        The cached signature starts as ``""`` so the first reload pass always
        executes the file.
        """
        self.files[file_name] = {"ftxt": "", **(fnx or {})}

    def get_scope(self, rfile_obj, file_name):
        """Return the scope dict (``"globals"`` / ``"locals"``) to exec in.

        Both arguments are currently ignored; every file shares
        ``self.scopes["current_scope"]``.
        """
        return self.scopes["current_scope"]

    def rld_file(self, file_name, ret=None, **kw):
        """Re-execute *file_name* if it is registered and has changed.

        Args:
            file_name: path previously passed to ``add_file``.
            ret: result dict handed back unchanged; a fresh one is created
                when omitted. Nothing in this method writes to it.

        Returns:
            *ret*.

        A file that cannot be stat'ed (e.g. it does not exist) is skipped so
        one missing file does not abort the whole reload pass. Exceptions
        raised while reading or executing the file are printed, not raised.
        """
        if ret is None:
            ret = {"errs": {}, "all": {}, "alle": {}}
        if file_name in self.files:
            rfile_obj = self.files[file_name]
            try:
                st = os.stat(file_name)
            except OSError:
                # Forget the cached signature so the file is executed as soon
                # as it (re)appears.
                rfile_obj["ftxt"] = ""
                print(end="", flush=True)
                return ret
            st_tuple = (st.st_mtime, st.st_size)
            # Names the exec mode in the error report; "nd" until one is
            # chosen below.
            eflag = "nd"
            if rfile_obj["ftxt"] != st_tuple:
                # Record the signature first: a file that fails is not
                # retried until it changes again.
                rfile_obj["ftxt"] = st_tuple
                try:
                    with open(file_name, "r") as f:
                        ftxt = f.read()
                    scope_obj = self.get_scope(rfile_obj, file_name)
                    if self.scope_opt == "locals":
                        eflag = "locals"
                        exec(ftxt, scope_obj["globals"], scope_obj["locals"])
                    elif self.scope_opt == "globals":
                        eflag = "globals"
                        exec(ftxt, scope_obj["globals"])
                    else:
                        eflag = "[]"
                        exec(ftxt)
                except Exception as e:
                    p()
                    print("file:", file_name)
                    print("EXCEPT", eflag, e)
                    if self.print_tb:
                        traceback.print_tb(e.__traceback__, file=sys.stdout)
        print(end="", flush=True)
        return ret

    def rld_files(self):
        """Run ``rld_file`` over every registered file; return the result dict."""
        ret = {
            "errs": {},
            "all": {},
            "alle": {},
        }
        # Snapshot the keys: an executed file may register more files.
        for k in list(self.files):
            self.rld_file(k, ret)
        p("", end="", flush=True)
        return ret


def fn1(*a, **kw):
    """Periodic callback: run one reload pass on the global reloader."""
    glob_cl.rman.rld_files()


# Module-level singleton plus an initial reload pass at import time.
glob_cl = globCL()
glob_cl.rman.rld_files()


def atexit_fn():
    """Exit hook: call ``flushdb()`` on the db-0 Redis client."""
    print("atexit_fn!\n\n")
    rconn = glob_cl.rconn
    rconn.flushdb()
    print("", end="", flush=True)


atexit.register(atexit_fn)