国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python基于mysql實現的簡單隊列以及跨進程鎖實例詳解

2019-11-25 18:20:23
字體:
來源:轉載
供稿:網友

通常在我們進行多進程應用開發的過程中,不可避免的會遇到多個進程訪問同一個資源(臨界資源)的狀況,這時候必須通過加一個全局性的鎖,來實現資源的同步訪問(即:同一時間里只能有一個進程訪問資源)。

舉個例子如下:

假設我們用mysql來實現一個任務隊列,實現的過程如下:

1. 在Mysql中創建Job表,用于儲存隊列任務,如下:

create table jobs(  id auto_increment not null primary key,  message text not null,  job_status not null default 0);

message 用來存儲任務信息,job_status用來標識任務狀態,假設只有兩種狀態,0:在隊列中, 1:已出隊列 
 
2. 有一個生產者進程,往job表中放新的數據,進行排隊:

insert into jobs(message) values('msg1');

3.假設有多個消費者進程,從job表中取排隊信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;update jobs set job_status=1 where id = ?; -- id為剛剛取得的記錄id

4. 如果沒有跨進程的鎖,兩個消費者進程有可能同時取到重復的消息,導致一個消息被消費多次。這種情況是我們不希望看到的,于是,我們需要實現一個跨進程的鎖。

=========================分割線=======================================

說到跨進程的鎖實現,我們主要有幾種實現方式:

(1)信號量
(2)文件鎖fcntl
(3)socket(端口號綁定)
(4)signal
這幾種方式各有利弊,總體來說前2種方式可能多一點,這里我就不詳細說了,大家可以去查閱資料。
 
查資料的時候發現mysql中有鎖的實現,適用于對于性能要求不是很高的應用場景,大并發的分布式訪問可能會有瓶頸.
 
對此用python實現了一個demo,如下:
 
文件名:glock.py

#!/usr/bin/env python2.7 # # -*- coding:utf-8 -*- # #  Desc  : # import logging, time import MySQLdb class Glock:   def __init__(self, db):     self.db = db   def _execute(self, sql):     cursor = self.db.cursor()     try:       ret = None       cursor.execute(sql)       if cursor.rowcount != 1:         logging.error("Multiple rows returned in mysql lock function.")         ret = None       else:         ret = cursor.fetchone()       cursor.close()       return ret     except Exception, ex:       logging.error("Execute sql /"%s/" failed! Exception: %s", sql, str(ex))       cursor.close()       return None   def lock(self, lockstr, timeout):     sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)     ret = self._execute(sql)      if ret[0] == 0:       logging.debug("Another client has previously locked '%s'.", lockstr)       return False     elif ret[0] == 1:       logging.debug("The lock '%s' was obtained successfully.", lockstr)       return True     else:       logging.error("Error occurred!")       return None   def unlock(self, lockstr):     sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)     ret = self._execute(sql)     if ret[0] == 0:       logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)       return False     elif ret[0] == 1:       logging.debug("The lock '%s' the lock was released.", lockstr)       return True     else:       logging.error("The lock '%s' did not exist.", lockstr)       return None #Init logging def init_logging():   sh = logging.StreamHandler()   logger = logging.getLogger()   logger.setLevel(logging.DEBUG)   formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')   sh.setFormatter(formatter)   logger.addHandler(sh)   logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) def main():   init_logging()   db = MySQLdb.connect(host='localhost', user='root', passwd='')   lock_name = 'queue'    l = Glock(db)    ret = l.lock(lock_name, 10)   if ret != True:     logging.error("Can't get lock! exit!")     quit()   time.sleep(10)   logging.info("You can do some synchronization work across processes!")   ##TODO   ## you can do something in here ##   l.unlock(lock_name) if __name__ == "__main__":   main() 

在main函數里:

l.lock(lock_name, 10) 中,10是表示timeout的時間是10秒,如果10秒還獲取不了鎖,就返回,執行后面的操作。
 
在這個demo中,在標記TODO的地方,可以將消費者從job表中取消息的邏輯放在這里。即分割線以上的.

2.假設有多個消費者進程,從job表中取排隊信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;update jobs set job_status=1 where id = ?; -- id為剛剛取得的記錄id

這樣,就能保證多個進程訪問臨界資源時同步進行了,保證數據的一致性。
 
測試的時候,啟動兩個glock.py, 結果如下:

[@tj-10-47 test]# ./glock.py  2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 

可以看到第一個glock.py是 17:08:50解鎖的,下面的glock.py是在17:08:50獲取鎖的,可以證實這樣是完全可行的。

[@tj-10-47 test]# ./glock.py 2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.[@tj-10-47 test]#

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 东台市| 西青区| 邻水| 墨江| 星座| 南川市| 永德县| 阳江市| 四会市| 洮南市| 龙胜| 色达县| 合作市| 锡林浩特市| 留坝县| 阜阳市| 金阳县| 洛阳市| 濉溪县| 嘉定区| 江孜县| 南充市| 巨野县| 安泽县| 高碑店市| 宝清县| 甘德县| 华安县| 定结县| 阿克| 江都市| 九台市| 贵定县| 皮山县| 绥阳县| 吴旗县| 福清市| 景洪市| 历史| 武汉市| 英超|