国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python實現事件驅動

2020-01-04 14:00:38
字體:
來源:轉載
供稿:網友

本文實例為大家分享了python實現事件驅動的具體代碼,供大家參考,具體內容如下

EventManager事件管理類實現,大概就百來行代碼左右。

# encoding: UTF-8# 系統模塊from Queue import Queue, Emptyfrom threading import *#################################################class EventManager: #---------------------------------------------------------------------- def __init__(self):  """初始化事件管理器"""  # 事件對象列表  self.__eventQueue = Queue()  # 事件管理器開關  self.__active = False  # 事件處理線程  self.__thread = Thread(target = self.__Run)   # 這里的__handlers是一個字典,用來保存對應的事件的響應函數  # 其中每個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多  self.__handlers = {}  #---------------------------------------------------------------------- def __Run(self):  """引擎運行"""  while self.__active == True:   try:    # 獲取事件的阻塞時間設為1秒    event = self.__eventQueue.get(block = True, timeout = 1)     self.__EventProcess(event)   except Empty:    pass  #---------------------------------------------------------------------- def __EventProcess(self, event):  """處理事件"""  # 檢查是否存在對該事件進行監聽的處理函數  if event.type_ in self.__handlers:   # 若存在,則按順序將事件傳遞給處理函數執行   for handler in self.__handlers[event.type_]:    handler(event)  #---------------------------------------------------------------------- def Start(self):  """啟動"""  # 將事件管理器設為啟動  self.__active = True  # 啟動事件處理線程  self.__thread.start()  #---------------------------------------------------------------------- def Stop(self):  """停止"""  # 將事件管理器設為停止  self.__active = False  # 等待事件處理線程退出  self.__thread.join()  #---------------------------------------------------------------------- def AddEventListener(self, type_, handler):  """綁定事件和監聽器處理函數"""  # 嘗試獲取該事件類型對應的處理函數列表,若無則創建  try:   handlerList = self.__handlers[type_]  except KeyError:   handlerList = []   self.__handlers[type_] = handlerList  # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件  if handler not in handlerList:   handlerList.append(handler)  #---------------------------------------------------------------------- def RemoveEventListener(self, type_, handler):  """移除監聽器的處理函數"""  #讀者自己試著實現  #---------------------------------------------------------------------- def SendEvent(self, event):  """發送事件,向事件隊列中存入事件"""  self.__eventQueue.put(event) ########################################################################"""事件對象"""class Event: def __init__(self, type_=None):  self.type_ = type_  # 事件類型  self.dict = {}   # 字典用于保存具體的事件數據

測試代碼

#-------------------------------------------------------------------# encoding: UTF-8import sysfrom datetime import datetimefrom threading import *from EventManager import * #事件名稱 新文章EVENT_ARTICAL = "Event_Artical" #事件源 公眾號class PublicAccounts: def __init__(self,eventManager):  self.__eventManager = eventManager def WriteNewArtical(self):  #事件對象,寫了新文章  event = Event(type_=EVENT_ARTICAL)  event.dict["artical"] = u'如何寫出更優雅的代碼/n'  #發送事件  self.__eventManager.SendEvent(event)  print u'公眾號發送新文章/n' #監聽器 訂閱者class Listener: def __init__(self,username):  self.__username = username  #監聽器的處理函數 讀文章 def ReadArtical(self,event):  print(u'%s 收到新文章' % self.__username)  print(u'正在閱讀新文章內容:%s' % event.dict["artical"]) """測試函數"""#--------------------------------------------------------------------def test(): listner1 = Listener("thinkroom") #訂閱者1 listner2 = Listener("steve")#訂閱者2  eventManager = EventManager()  #綁定事件和監聽器響應函數(新文章) eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical) eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical) eventManager.Start()  publicAcc = PublicAccounts(eventManager) timer = Timer(2, publicAcc.WriteNewArtical) timer.start() if __name__ == '__main__': test()

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


注:相關教程知識閱讀請移步到python教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 临潭县| 溆浦县| 武鸣县| 平潭县| 焉耆| 凤山市| 若羌县| 瓦房店市| 宝兴县| 留坝县| 清原| 格尔木市| 昔阳县| 林周县| 文昌市| 鹤峰县| 东宁县| 通渭县| 亚东县| 福安市| 深州市| 青河县| 南开区| 枣阳市| 宜川县| 祥云县| 台东县| 东安县| 佛冈县| 大连市| 包头市| SHOW| 林口县| 西青区| 辉县市| 巧家县| 米林县| 林西县| 邳州市| 淮滨县| 鲁山县|