概述
Redis是一個開源,先進的key-value存儲,并用于構建高性能,可擴展的Web應用程序的完美解決方案。
Redis從它的許多競爭繼承來的三個主要特點:
Redis數據庫完全在內存中,使用磁盤僅用于持久性。
相比許多鍵值數據存儲,Redis擁有一套較為豐富的數據類型。
Redis可以將數據復制到任意數量的從服務器。
Redis 優勢
異常快速:Redis的速度非常快,每秒能執行約11萬集合,每秒約81000+條記錄。
支持豐富的數據類型:Redis支持最大多數開發人員已經知道像列表,集合,有序集合,散列數據類型。這使得它非常容易解決各種各樣的問題,因為我們知道哪些問題是可以處理通過它的數據類型更好。
操作都是原子性:所有Redis操作是原子的,這保證了如果兩個客戶端同時訪問的Redis服務器將獲得更新后的值。
多功能實用工具:Redis是一個多實用的工具,可以在多個用例如緩存,消息,隊列使用(Redis原生支持發布/訂閱),任何短暫的數據,應用程序,如Web應用程序會話,網頁命中計數等。
步入主題:
Redis作為內存數據庫的一個典型代表,已經在很多應用場景中被使用,這里僅就Redis的pub/sub功能來說說怎樣通過此功能來實現一個簡單的作業調度系統。這里只是想展現一個簡單的想法,所以還是有很多需要考慮的東西沒有包括在這個例子中,比如錯誤處理,持久化等。
下面是實現上的想法
MyMaster:集群的master節點程序,負責產生作業,派發作業和獲取執行結果。
MySlave:集群的計算節點程序,每個計算節點一個,負責獲取作業并運行,并將結果發送會master節點。
channel CHANNEL_DISPATCH:每個slave節點訂閱一個channel,比如“CHANNEL_DISPATCH_[idx或機器名]”,master會向此channel中publish被dispatch的作業。
channel CHANNEL_RESULT:用來保存作業結果的channel,master和slave共享此channel,master訂閱此channel來獲取作業運行結果,每個slave負責將作業執行結果發布到此channel中。
Master代碼
#!/usr/bin/env python# -*- coding: utf-8 -*-import timeimport threadingimport randomimport redisREDIS_HOST = 'localhost'REDIS_PORT = 6379REDIS_DB = 0CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'CHANNEL_RESULT = 'CHANNEL_RESULT'class MyMaster():def __init__(self):passdef start(self):MyServerResultHandleThread().start()MyServerDispatchThread().start()class MyServerDispatchThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)for i in range(1, 100):channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))print("Dispatch job %s to %s" % (str(i), channel))ret = r.publish(channel, str(i))if ret == 0:print("Dispatch job %s failed." % str(i))time.sleep(5)class MyServerResultHandleThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)p = r.pubsub()p.subscribe(CHANNEL_RESULT)for message in p.listen():if message['type'] != 'message':continueprint("Received finished job %s" % message['data'])if __name__ == "__main__":MyMaster().start()time.sleep(10000) 說明
MyMaster類 - master主程序,用來啟動dispatch和resulthandler的線程
MyServerDispatchThread類 - 派發作業線程,產生作業并派發到計算節點
MyServerResultHandleThread類 - 作業運行結果處理線程,從channel里獲取作業結果并顯示
Slave代碼
#!/usr/bin/env python# -*- coding: utf-8 -*-from datetime import datetimeimport timeimport threadingimport randomimport redisREDIS_HOST = 'localhost'REDIS_PORT = 6379REDIS_DB = 0CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'CHANNEL_RESULT = 'CHANNEL_RESULT'class MySlave():def __init__(self):passdef start(self):for i in range(1, 4):MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()class MyJobWorkerThread(threading.Thread):def __init__(self, channel):threading.Thread.__init__(self)self.channel = channeldef run(self):r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)p = r.pubsub()p.subscribe(self.channel)for message in p.listen():if message['type'] != 'message':continueprint("%s: Received dispatched job %s " % (self.channel, message['data']))print("%s: Run dispatched job %s " % (self.channel, message['data']))time.sleep(2)print("%s: Send finished job %s " % (self.channel, message['data']))ret = r.publish(CHANNEL_RESULT, message['data'])if ret == 0:print("%s: Send finished job %s failed." % (self.channel, message['data']))if __name__ == "__main__":MySlave().start()time.sleep(10000) 說明
MySlave類 - slave節點主程序,用來啟動MyJobWorkerThread的線程
MyJobWorkerThread類 - 從channel里獲取派發的作業并將運行結果發送回master
測試
首先運行MySlave來定義派發作業channel。
然后運行MyMaster派發作業并顯示執行結果。
有關Python使用Redis實現作業調度系統(超簡單),小編就給大家介紹這么多,希望對大家有所幫助!



















