国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

詳解分布式任務隊列Celery使用說明

2020-01-04 13:58:54
字體:
來源:轉載
供稿:網友

起步

Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,并且提供維護這樣一個系統的必需工具。它是一個專注于實時處理的任務隊列,同時也支持任務調度。

運行模式是生產者消費者模式:

分布式,任務隊列,Celery

任務隊列:任務隊列是一種在線程或機器間分發任務的機制。

消息隊列:消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。

Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之后中間人把消息派送給職程,職程對消息進行處理。

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件:Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis, MongoDB等,本文使用 redis 。

任務執行單元:Worker是Celery提供的任務執行的單元,worker并發的運行在分布式的系統節點中

任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。

安裝

通過 pip 命令即可安裝:

pip install celery

本文使用 redis 做消息中間件,所以需要在安裝:

pip install redis

redis軟件也要安裝,官網只提供了 linux 版本的下載:https://redis.io/download,windows 的可以到 https://github.com/MicrosoftArchive/redis 下載 exe 安裝包。

簡單的demo

為了運行一個簡單的任務,從中說明 celery 的使用方式。在項目文件夾內創建 app.py 和 tasks.py 。tasks.py 用來定義任務:

# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'app = Celery('my_tasks', broker=broker, backend=backend)@app.taskdef add(x, y):  print('enter task')  time.sleep(3)  return x + y

這些代碼做了什么事。 broker 指定任務隊列的消息中間件,backend 指定了任務執行結果的存儲。app 就是我們創建的 Celery 對象。通過 app.task 修飾器將 add 函數變成一個一部的任務。

# app.pyfrom tasks import addif __name__ == '__main__':  print('start task')  result = add.delay(2, 18)  print('end task')  print(result)

add.delay 函數將任務序列化發送到消息中間件。終端執行 python app.py 可以看到輸出一個任務的唯一識別:

start task
end task
79ef4736-1ecb-4afd-aa5e-b532657acd43

這個只是將任務推送到 redis,任務還沒被消費,任務會在 celery 隊列中。

開啟 celery woker 可以將任務進行消費:

celery worker -A tasks -l info  # -A 后是模塊名

A 參數指定了celery 對象的位置,l 參數指定woker的日志級別。

如果此命令在終端報錯:

  File "e:/workspace/.env/lib/site-packages/celery/app/trace.py", line 537, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

這是win 10 在使用 Celery 4.x 的時候會有這個問題,解決方式可以是改用 Celery 3.x 版本,或者按照 Unable to run tasks under Windows 上提供的方式,該issue提供了兩種方式解決,一種是安裝 eventlet 擴展:

pip install eventletcelery -A <mymodule> worker -l info -P eventlet

另一種方式是添加個 FORKED_BY_MULTIPROCESSING = 1 的環境變量(推薦這種方式):

import osos.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

如果一切順利,woker 正常啟動,就能在終端看到任務被消費了:

[2018-11-27 13:59:27,830: INFO/MainProcess] Received task: tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19]
[2018-11-27 13:59:27,831: WARNING/SpawnPoolWorker-2] enter task
[2018-11-27 13:59:30,835: INFO/SpawnPoolWorker-2] Task tasks.add[745e5be7-4675-4f84-9d57-3f5e91c33a19] succeeded in 3.0s: 20

說明我們的demo已經成功了。

使用配置文件
在上面的demo中,是將broker和backend直接寫在代碼中的,而 Celery 還有其他配置,最好是寫出配置文件的形式,基本配置項有:

  • CELERY_DEFAULT_QUEUE:默認隊列
  • BROKER_URL  : 代理人的網址
  • CELERY_RESULT_BACKEND:結果存儲地址
  • CELERY_TASK_SERIALIZER:任務序列化方式
  • CELERY_RESULT_SERIALIZER:任務執行結果序列化方式
  • CELERY_TASK_RESULT_EXPIRES:任務過期時間
  • CELERY_ACCEPT_CONTENT:指定任務接受的內容序列化類型(序列化),一個列表;

整理一下目錄結構,將我們的任務封裝成包:

分布式,任務隊列,Celery

內容如下:

# __init__.pyimport osfrom celery import Celeryos.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')app = Celery('demo')# 通過 Celery 實例加載配置模塊app.config_from_object('celery_app.celery_config')# celery_config.pyBROKER_URL = 'redis://127.0.0.1:6379/1'CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'# UTCCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = 'Asia/Shanghai'# 導入指定的任務模塊CELERY_IMPORTS = (  'celery_app.task1',  'celery_app.task2',)# task1.pyimport timefrom celery_app import app@app.taskdef add(x, y):  print('enter task')  time.sleep(3)  return x + y# task2.pyimport timefrom celery_app import app@app.taskdef mul(x, y):  print('enter task')  time.sleep(4)  return x * y# app.pyfrom celery_app import task1if __name__ == '__main__':  pass  print('start task')  result = task1.add.delay(2, 18)  print('end task')  print(result)

提交任務與啟動worker:

$ python app.py$ celery worker -A celery_app -l info

result = task1.add.delay(2, 18) 返回的是一個任務對象,通過 delay 函數的方式可以發現這個過程是非阻塞的,這個任務對象有一個方法:

r.ready()   # 查看任務狀態,返回布爾值, 任務執行完成, 返回 True, 否則返回 False.r.wait()   # 等待任務完成, 返回任務執行結果,很少使用;r.get(timeout=1)    # 獲取任務執行結果,可以設置等待時間r.result   # 任務執行結果.r.state    # PENDING, START, SUCCESS,任務當前的狀態r.status   # PENDING, START, SUCCESS,任務當前的狀態r.successful # 任務成功返回truer.traceback # 如果任務拋出了一個異常,你也可以獲取原始的回溯信息

定時任務

定時任務的功能類似 crontab,可以完成每日統計任務等。首先我們需要配置一下 schedule,通過改造上面的配置文件,添加 CELERYBEAT_SCHEDULE 配置:

import datetimefrom celery.schedules import crontabCELERYBEAT_SCHEDULE = {  'task1-every-1-min': {    'task': 'celery_app.task1.add',    'schedule': datetime.timedelta(seconds=60),    'args': (2, 15),  },  'task2-once-a-day': {    'task': 'celery_app.task2.mul',    'schedule': crontab(hour=15, minute=23),    'args': (3, 6),  }}

task 指定要執行的任務;schedule 表示計劃的時間,datetime.timedelta(seconds=60) 表示間隔一分鐘,這里其實也可以是 crontab(minute='*/1') 來替換;args 表示要傳遞的參數。

啟動 celery beat:

$ celery worker -A celery_app -l info

分布式,任務隊列,Celery

我們目前是用兩個窗口來執行 woker 和 beat 。當然也可以只使用一個窗口來運行(僅限linux系統):

$ celery -B -A celery_app worker -l info

celery.task 裝飾器

@celery.task()def name():  pass

task() 方法將任務修飾成異步, name 可以顯示指定的任務名字;serializer 指定序列化的方式;bind 一個bool值,若為True,則task實例會作為第一個參數傳遞到任務方法中,可以訪問task實例的所有的屬性,即前面反序列化中那些屬性。

@task(bind=True) # 第一個參數是self,使用self.request訪問相關的屬性def add(self, x, y):  logger.info(self.request.id)

base 可以指定任務積累,可以用來定義回調函數:

import celeryclass MyTask(celery.Task):  # 任務失敗時執行  def on_failure(self, exc, task_id, args, kwargs, einfo):    print('{0!r} failed: {1!r}'.format(task_id, exc))  # 任務成功時執行  def on_success(self, retval, task_id, args, kwargs):    pass  # 任務重試時執行  def on_retry(self, exc, task_id, args, kwargs, einfo):    pass@task(base=MyTask)def add(x, y):  raise KeyError()exc:失敗時的錯誤的類型;task_id:任務的id;args:任務函數的參數;kwargs:參數;einfo:失敗時的異常詳細信息;retval:任務成功執行的返回值;

總結

網上找了一份比較常用的配置文件,需要的時候可以參考下:

# 注意,celery4版本后,CELERY_BROKER_URL改為BROKER_URLBROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'# 指定結果的接受地址CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'# 指定任務序列化方式CELERY_TASK_SERIALIZER = 'msgpack' # 指定結果序列化方式CELERY_RESULT_SERIALIZER = 'msgpack'# 任務過期時間,celery任務執行結果的超時時間CELERY_TASK_RESULT_EXPIRES = 60 * 20  # 指定任務接受的序列化類型.CELERY_ACCEPT_CONTENT = ["msgpack"]  # 任務發送完成是否需要確認,這一項對性能有一點影響   CELERY_ACKS_LATE = True # 壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據CELERY_MESSAGE_COMPRESSION = 'zlib' # 規定完成任務的時間CELERYD_TASK_TIME_LIMIT = 5 # 在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程# celery worker的并發數,默認是服務器的內核數目,也是命令行-c參數指定的數目CELERYD_CONCURRENCY = 4 # celery worker 每次去rabbitmq預取任務的數量CELERYD_PREFETCH_MULTIPLIER = 4 # 每個worker執行了多少任務就會死掉,默認是無限的CELERYD_MAX_TASKS_PER_CHILD = 40 # 這是使用了django-celery默認的數據庫調度模型,任務執行周期都被存在你指定的orm數據庫中# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'# 設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中CELERY_DEFAULT_QUEUE = "default"# 設置詳細的隊列CELERY_QUEUES = {  "default": { # 這是上面指定的默認隊列    "exchange": "default",    "exchange_type": "direct",    "routing_key": "default"  },  "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列    "routing_key": "topic.#",    "exchange": "topic_exchange",    "exchange_type": "topic",  },  "task_eeg": { # 設置扇形交換機    "exchange": "tasks",    "exchange_type": "fanout",    "binding_key": "tasks",  },}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


注:相關教程知識閱讀請移步到python教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 龙井市| 射洪县| 高安市| 沈丘县| 建昌县| 巴楚县| 波密县| 博湖县| 大石桥市| 胶州市| 伊宁县| 通州市| 晋城| 德兴市| 兴城市| 太仆寺旗| 汪清县| 玛曲县| 龙海市| 罗定市| 广昌县| 新宾| 北辰区| 北京市| 陈巴尔虎旗| 玛曲县| 望奎县| 平顺县| 苏尼特左旗| 三亚市| 唐海县| 富蕴县| 弋阳县| 柯坪县| 嫩江县| 通海县| 桃园县| 寿光市| 凤台县| 中山市| 塔城市|