国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python實現mysql的讀寫分離及負載均衡

2020-02-22 23:07:01
字體:
來源:轉載
供稿:網友

Oracle數據庫有其公司開發的配套rac來實現負載均衡,目前已知的最大節點數能到128個,但是其帶來的維護成本無疑是很高的,并且rac的穩定性也并不是特別理想,尤其是節點很多的時候。

       但是,相對mysql來說,rac的實用性要比mysql的配套集群軟件mysql-cluster要高很多。因為從網上了解到情況來看,很少公司在使用mysql-cluster,大多數企業都會選擇第三方代理軟件,例如MySQL Proxy、Mycat、haproxy等,但是這會引起另外一個問題:單點故障(包括mysql-cluster:管理節點)。如果要解決這個問題,就需要給代理軟件搭建集群,在訪問量很大的情況下,代理軟件的雙機或三機集群會成為訪問瓶頸,繼續增加其節點數,無疑會帶來各方面的成本。

那么,如何可以解決這個問題呢?

          解決上述問題,最好的方式個人認為應該是在程序中實現。通過和其他mysql DBA的溝通,也證實了這個想法。但是由此帶來的疑問也就產生了:會不會增加開發成本?對現有的應用系統做修改會不會改動很大?會不會增加后期版本升級的難度?等等。

        對于一個架構設計良好的應用系統可以很肯定的回答:不會。

        那么怎么算一個架構設計良好的應用系統呢?

       簡單來說,就是分層合理、功能模塊之間耦合性底。以本人的經驗來說,系統設計基本上可以劃分為以下四層:

       1.  實體層:主要定義一些實體類

       2.  數據層:也可以叫SQL處理層。主要負責跟數據庫交互取得數據

       3.  業務處:主要是根據業務流程及功能區分模塊(或者說定義不同的業務類)

       4.  表現層:呈現最終結果給用戶

       實現上述功能(mysql的讀寫分離及負載均衡),在這四個層次中,僅僅涉及到數據層。

嚴格來說,對于設計良好的系統,只涉及到一個類的一個函數:在數據層中,一般都會單獨劃分出一個連接類,并且這個連接類中會有一個連接函數,需要改動的就是這個函數:在讀取連接字符串之前加一個功能函數返回需要的主機、ip、端口號等信息(沒有開發經歷的同學可能理解這段話有點費勁)。

       流程圖如下:

           代碼如下:

import mmapimport jsonimport randomimport mysql.connectorimport time##公有變量#dbinfos={#   "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},#   "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}#   }dbinfos={}mmap_file = Nonemmap_time=None##這個函數返回json格式的字符串,也是實現初始化數據庫信息的地方##使用json格式是為了方便數據轉換,從字符串---》二進制--》字符串---》字典##如果采用其它方式共享dbinfos的方法,可以不用此方式##配置庫的地址def get_json_str1(): return json.dumps(dbinfos)##讀取配置庫中的內容def get_json_str(): try:  global dbinfos  cnx = mysql.connector.connect(user='root', password='Abcd.1234',        host='192.168.42.60',        database='rwlb')  cursor = cnx.cursor()  cmdString="select * from rwlb"  cnt=-1  cursor.execute(cmdString)  for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:   cnt=cnt+1   dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}   dbinfos["db"+str(cnt)]=dict_db  cursor.close()  cnx.close()  return json.dumps(dbinfos) except:  cursor.close()  cnx.close()  return ""##判斷是否能正常連接到數據庫def check_conn_host(): try:  cnx = mysql.connector.connect(user='root', password='Abcd.1234',        host='192.168.42.60',        database='rwlb')  cursor = cnx.cursor()  cmdString="select user()"  cnt=-1  cursor.execute(cmdString)  for user in cursor:   cnt=len(user)  cursor.close()  cnx.close()  return cnt except :  return -1;##select 屬于讀操作,其他屬于寫操作-----這里可以劃分的更詳細,比如執行存儲過程等def analyze_sql_state(sql): if "select" in sql:  return "R" else:  return "W"##讀取時間信息def read_mmap_time(): global mmap_time,mmap_file mmap_time.seek(0) ##初始時間 inittime=int(mmap_time.read().translate(None, b'/x00').decode()) ##當前時間 endtime=int(time.time()) ##時間差 dis_time=endtime-inittime print("dis_time:"+str(dis_time)) #重新讀取數據 if dis_time>10:  ##當配置庫正常的情況下才重新讀取數據  print(str(check_conn_host()))  if check_conn_host()>0:      print("read data again")   mmap_time.seek(0)   mmap_file.seek(0)   mmap_time.write(b'/x00')   mmap_file.write(b'/x00')   get_mmap_time()   get_mmap_info()  else:   print("can not connect to host")    #不重新讀取數據 else:  print("do not read data again")##從內存中讀取信息,def read_mmap_info(sql): read_mmap_time() print("The data is in memory") global mmap_file,dict_db mmap_file.seek(0) ##把二進制轉換為字符串 info_str=mmap_file.read().translate(None, b'/x00').decode() #3把字符串轉成json格式,方便后面轉換為字典使用 infos=json.loads(info_str)  host_count=len(infos) ##權重列表 listw=[] ##總的權重數量 wtotal=0 ##數據庫角色 dbrole=analyze_sql_state(sql) ##根據權重初始化一個列表。這個是比較簡單的算法,所以權重和控制在100以內比較好----這里可以選擇其他比較好的算法 for i in range(host_count):  db="db"+str(i)  if dbrole in infos[db]["role"]:   if int(infos[db]["status"])==1:    w=infos[db]["weight"]    wtotal=wtotal+w    for j in range(w):     listw.append(i) if wtotal >0:  ##產生一個隨機數  rad=random.randint(0,wtotal-1)  ##讀取隨機數所在的列表位置的數據  dbindex=listw[rad]  ##確定選擇的是哪個db  db="db"+str(dbindex)  ##為dict_db賦值,即選取的db的信息  dict_db=infos[db]  return dict_db else :  return {}##如果內存中沒有時間信息,則向內存紅寫入時間信息def get_mmap_time(): global mmap_time ##第二個參數1024是設定的內存大小,單位:字節。如果內容較多,可以調大一點 mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') ##讀取有效比特數,不包括空比特 cnt=mmap_time.read_byte() if cnt==0:  print("Load time to memory")  mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')  inittime=str(int(time.time()))  mmap_time.write(inittime.encode())##如果內存中沒有對應信息,則向內存中寫信息以供下次調用使用def get_mmap_info(): global mmap_file ##第二個參數1024是設定的內存大小,單位:字節。如果內容較多,可以調大一點 mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') ##讀取有效比特數,不包括空比特 cnt=mmap_file.read_byte() if cnt==0:  print("Load data to memory")  mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')  mmap_file.write(get_json_str().encode())##測試函數def test1(): get_mmap_time() get_mmap_info() for i in range(10):  sql="select * from db"  #sql="update t set col1=a where b=2"  dbrole=analyze_sql_state(sql)  dict_db=read_mmap_info(sql)  print(dict_db["host"])def test2(): sql="select * from db" res=analyze_sql_state(sql) print("select:"+res) sql="update t set col1=a where b=2" res=analyze_sql_state(sql) print("update:"+res) sql="insert into t values(1,2)" res=analyze_sql_state(sql) print("insert:"+res) sql="delete from t where b=2" res=analyze_sql_state(sql) print("delete:"+res)##類似主函數if __name__=="__main__": test2()            
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 平邑县| 绥芬河市| 晴隆县| 北宁市| 大丰市| 柳州市| 南阳市| 门源| 牙克石市| 东阳市| 达尔| 高雄县| 温泉县| 体育| 邯郸市| 勃利县| 顺义区| 大英县| 襄垣县| 泰和县| 县级市| 淄博市| 修文县| 昌图县| 蒙城县| 祁门县| 陈巴尔虎旗| 和田市| 山阳县| 睢宁县| 特克斯县| 安岳县| 柳州市| 张家港市| 南投市| 新巴尔虎左旗| 新民市| 扶余县| 新乐市| 洪湖市| 徐州市|