起步
Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。它是一個專注于實時處理的任務隊列,同時也支持任務調度。
運行模式是生產者消費者模式:
任務隊列:任務隊列是一種在線程或機器間分發任務的機制。
消息隊列:消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。
Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件:Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。
任務執行單元:Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中
任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
安裝
通過 pip 命令即可安裝:
pip install celery
本文使用 redis 做消息中間件,所以需要在安裝:
pip install redis
redis軟件也要安裝,官網只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。
簡單的demo
為了運行一個簡單的任務,從中說明 celery 的使用方式。在項目文件夾內創建 app.py 和 tasks.py 。tasks.py 用來定義任務:
# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y這些代碼做了什么事。 broker 指定任務隊列的消息中間件,backend 指定了任務執行結果的存儲。app 就是我們創建的 Celery 對象。通過 app.task 修飾器將 add 函數變成一個一部的任務。
# app.pyfrom tasks import addif __name__ == '__main__': print('start task') result = add.delay(2, 18) print('end task') print(result)add.delay 函數將任務序列化發送到消息中間件。終端執行 python app.py 可以看到輸出一個任務的唯一識別:
start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43
這個只是將任務推送到 redis,任務還沒被消費,任務會在 celery 隊列中。
開啟 celery woker 可以將任務進行消費:
celery worker -A tasks -l info # -A 后是模塊名
A 參數指定了celery 對象的位置,l 參數指定woker的日志級別。
如果此命令在終端報錯:
File "e:/workspace/.env/lib/site-packages/celery/app/trace.py", line 537, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
這是win 10 在使用 Celery 4.x 的時候會有這個問題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴展:
pip install eventletcelery -A <mymodule> worker -l info -P eventlet
另一種方式是添加個 FORKED_BY_MULTIPROCESSING = 1 的環境變量(推薦這種方式):
import osos.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')如果一切順利,woker 正常啟動,就能在終端看到任務被消費了:
[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20
說明我們的demo已經成功了。
使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項有:
整理一下目錄結構,將我們的任務封裝成包:
內容如下:
# __init__.pyimport osfrom celery import Celeryos.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')app = Celery('demo')# 通過 Celery 實例加載配置模塊app.config_from_object('celery_app.celery_config')# celery_config.pyBROKER_URL = 'redis://127.0.0.1:6379/1'CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'# UTCCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = 'Asia/Shanghai'# 導入指定的任務模塊CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2',)# task1.pyimport timefrom celery_app import app@app.taskdef add(x, y): print('enter task') time.sleep(3) return x + y# task2.pyimport timefrom celery_app import app@app.taskdef mul(x, y): print('enter task') time.sleep(4) return x * y# app.pyfrom celery_app import task1if __name__ == '__main__': pass print('start task') result = task1.add.delay(2, 18) print('end task') print(result)提交任務與啟動worker:
$ python app.py$ celery worker -A celery_app -l info
result = task1.add.delay(2, 18) 返回的是一個任務對象,通過 delay 函數的方式可以發現這個過程是非阻塞的,這個任務對象有一個方法:
r.ready() # 查看任務狀態,返回布爾值, 任務執行完成, 返回 True, 否則返回 False.r.wait() # 等待任務完成, 返回任務執行結果,很少使用;r.get(timeout=1) # 獲取任務執行結果,可以設置等待時間r.result # 任務執行結果.r.state # PENDING, START, SUCCESS,任務當前的狀態r.status # PENDING, START, SUCCESS,任務當前的狀態r.successful # 任務成功返回truer.traceback # 如果任務拋出了一個異常,你也可以獲取原始的回溯信息
定時任務
定時任務的功能類似 crontab,可以完成每日統計任務等。首先我們需要配置一下 schedule,通過改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:
import datetimefrom celery.schedules import crontabCELERYBEAT_SCHEDULE = { 'task1-every-1-min': { 'task': 'celery_app.task1.add', 'schedule': datetime.timedelta(seconds=60), 'args': (2, 15), }, 'task2-once-a-day': { 'task': 'celery_app.task2.mul', 'schedule': crontab(hour=15, minute=23), 'args': (3, 6), }}task 指定要執行的任務;schedule 表示計劃的時間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實也可以是 crontab(minute='*/1') 來替換;args 表示要傳遞的參數。
啟動 celery beat:
$ celery worker -A celery_app -l info
我們目前是用兩個窗口來執行 woker 和 beat 。當然也可以只使用一個窗口來運行(僅限linux系統):
$ celery -B -A celery_app worker -l info
celery.task 裝飾器
@celery.task()def name(): pass
task() 方法將任務修飾成異步, name 可以顯示指定的任務名字;serializer 指定序列化的方式;bind 一個bool值,若為True,則task實例會作為第一個參數傳遞到任務方法中,可以訪問task實例的所有的屬性,即前面反序列化中那些屬性。
@task(bind=True) # 第一個參數是self,使用self.request訪問相關的屬性def add(self, x, y): logger.info(self.request.id)
base 可以指定任務積累,可以用來定義回調函數:
import celeryclass MyTask(celery.Task): # 任務失敗時執行 def on_failure(self, exc, task_id, args, kwargs, einfo): print('{0!r} failed: {1!r}'.format(task_id, exc)) # 任務成功時執行 def on_success(self, retval, task_id, args, kwargs): pass # 任務重試時執行 def on_retry(self, exc, task_id, args, kwargs, einfo): pass@task(base=MyTask)def add(x, y): raise KeyError()exc:失敗時的錯誤的類型;task_id:任務的id;args:任務函數的參數;kwargs:參數;einfo:失敗時的異常詳細信息;retval:任務成功執行的返回值;總結
網上找了一份比較常用的配置文件,需要的時候可以參考下:
# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URLBROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'# 指定結果的接受地址CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'# 指定任務序列化方式CELERY_TASK_SERIALIZER = 'msgpack' # 指定結果序列化方式CELERY_RESULT_SERIALIZER = 'msgpack'# 任務過期時間,celery任務執行結果的超時時間CELERY_TASK_RESULT_EXPIRES = 60 * 20 # 指定任務接受的序列化類型.CELERY_ACCEPT_CONTENT = ["msgpack"] # 任務發送完成是否需要確認,這一項對性能有一點影響 CELERY_ACKS_LATE = True # 壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據CELERY_MESSAGE_COMPRESSION = 'zlib' # 規定完成任務的時間CELERYD_TASK_TIME_LIMIT = 5 # 在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程# celery worker的并發數,默認是服務器的內核數目,也是命令行-c參數指定的數目CELERYD_CONCURRENCY = 4 # celery worker 每次去rabbitmq預取任務的數量CELERYD_PREFETCH_MULTIPLIER = 4 # 每個worker執行了多少任務就會死掉,默認是無限的CELERYD_MAX_TASKS_PER_CHILD = 40 # 這是使用了django-celery默認的數據庫調度模型,任務執行周期都被存在你指定的orm數據庫中# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'# 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中CELERY_DEFAULT_QUEUE = "default"# 設置詳細的隊列CELERY_QUEUES = { "default": { # 這是上面指定的默認隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設置扇形交換機 "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", },}以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。
新聞熱點
疑難解答