国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

Python并行分布式框架Celery詳解

2020-01-04 14:21:20
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

Celery 簡(jiǎn)介

除了redis,還可以使用另外一個(gè)神器---Celery。Celery是一個(gè)異步任務(wù)的調(diào)度工具。

Celery 是 Distributed Task Queue,分布式任務(wù)隊(duì)列,分布式?jīng)Q定了可以有多個(gè) worker 的存在,隊(duì)列表示其是異步操作,即存在一個(gè)產(chǎn)生任務(wù)提出需求的工頭,和一群等著被分配工作的碼農(nóng)。

在 Python 中定義 Celery 的時(shí)候,我們要引入 Broker,中文翻譯過(guò)來(lái)就是“中間人”的意思,在這里 Broker 起到一個(gè)中間人的角色。在工頭提出任務(wù)的時(shí)候,把所有的任務(wù)放到 Broker 里面,在 Broker 的另外一頭,一群碼農(nóng)等著取出一個(gè)個(gè)任務(wù)準(zhǔn)備著手做。

這種模式注定了整個(gè)系統(tǒng)會(huì)是個(gè)開(kāi)環(huán)系統(tǒng),工頭對(duì)于碼農(nóng)們把任務(wù)做的怎樣是不知情的。所以我們要引入 Backend 來(lái)保存每次任務(wù)的結(jié)果。這個(gè) Backend 有點(diǎn)像我們的 Broker,也是存儲(chǔ)任務(wù)的信息用的,只不過(guò)這里存的是那些任務(wù)的返回結(jié)果。我們可以選擇只讓錯(cuò)誤執(zhí)行的任務(wù)返回結(jié)果到 Backend,這樣我們?nèi)』亟Y(jié)果,便可以知道有多少任務(wù)執(zhí)行失敗了。

Celery(芹菜)是一個(gè)異步任務(wù)隊(duì)列/基于分布式消息傳遞的作業(yè)隊(duì)列。它側(cè)重于實(shí)時(shí)操作,但對(duì)調(diào)度支持也很好。Celery用于生產(chǎn)系統(tǒng)每天處理數(shù)以百萬(wàn)計(jì)的任務(wù)。Celery是用Python編寫(xiě)的,但該協(xié)議可以在任何語(yǔ)言實(shí)現(xiàn)。它也可以與其他語(yǔ)言通過(guò)webhooks實(shí)現(xiàn)。Celery建議的消息隊(duì)列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數(shù)據(jù)庫(kù)(使用SQLAlchemy的或Django的 ORM) 。

Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

在學(xué)習(xí)Celery之前,我先簡(jiǎn)單的去了解了一下什么是生產(chǎn)者消費(fèi)者模式。

生產(chǎn)者消費(fèi)者模式

在實(shí)際的軟件開(kāi)發(fā)過(guò)程中,經(jīng)常會(huì)碰到如下場(chǎng)景:某個(gè)模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個(gè)模塊來(lái)負(fù)責(zé)處理(此處的模塊是廣義的,可以是類(lèi)、函數(shù)、線(xiàn)程、進(jìn)程等)。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱(chēng)為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱(chēng)為消費(fèi)者。

單單抽象出生產(chǎn)者和消費(fèi)者,還夠不上是生產(chǎn)者消費(fèi)者模式。該模式還需要有一個(gè)緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間,作為一個(gè)中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù),如下圖所示:

Python,分布式框架,Celery

生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)消息隊(duì)列(緩沖區(qū))來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給消息隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從消息隊(duì)列里取,消息隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。這個(gè)消息隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的。------------->這里又有一個(gè)問(wèn)題,什么叫做解耦?

解耦

假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個(gè)類(lèi)。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,那么生產(chǎn)者對(duì)于消費(fèi)者就會(huì)產(chǎn)生依賴(lài)(也就是耦合)。將來(lái)如果消費(fèi)者的代碼發(fā)生變化,可能會(huì)影響到生產(chǎn)者。而如果兩者都依賴(lài)于某個(gè)緩沖區(qū),兩者之間不直接依賴(lài),耦合也就相應(yīng)降低了。生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,還有另一個(gè)弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費(fèi)者的方法沒(méi)有返回之前,生產(chǎn)者只好一直等在那邊。萬(wàn)一消費(fèi)者處理數(shù)據(jù)很慢,生產(chǎn)者就會(huì)白白糟蹋大好時(shí)光。緩沖區(qū)還有另一個(gè)好處。如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來(lái)了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來(lái)不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來(lái),消費(fèi)者再慢慢處理掉。

因?yàn)樘橄螅催^(guò)網(wǎng)上的說(shuō)明之后,通過(guò)我的理解,我舉了個(gè)例子:吃包子。

假如你非常喜歡吃包子(吃起來(lái)根本停不下來(lái)),今天,你媽媽?zhuān)ㄉa(chǎn)者)在蒸包子,廚房有張桌子(緩沖區(qū)),你媽媽將蒸熟的包子盛在盤(pán)子(消息)里,然后放到桌子上,你正在看巴西奧運(yùn)會(huì),看到蒸熟的包子放在廚房桌子上的盤(pán)子里,你就把盤(pán)子取走,一邊吃包子一邊看奧運(yùn)。在這個(gè)過(guò)程中,你和你媽媽使用同一個(gè)桌子放置盤(pán)子和取走盤(pán)子,這里桌子就是一個(gè)共享對(duì)象。生產(chǎn)者添加食物,消費(fèi)者取走食物。桌子的好處是,你媽媽不用直接把盤(pán)子給你,只是負(fù)責(zé)把包子裝在盤(pán)子里放到桌子上,如果桌子滿(mǎn)了,就不再放了,等待。而且生產(chǎn)者還有其他事情要做,消費(fèi)者吃包子比較慢,生產(chǎn)者不能一直等消費(fèi)者吃完包子把盤(pán)子放回去再去生產(chǎn),因?yàn)槌园拥娜擞泻芏啵绻@期間你好朋友來(lái)了,和你一起吃包子,生產(chǎn)者不用關(guān)注是哪個(gè)消費(fèi)者去桌子上拿盤(pán)子,而消費(fèi)者只去關(guān)注桌子上有沒(méi)有放盤(pán)子,如果有,就端過(guò)來(lái)吃盤(pán)子中的包子,沒(méi)有的話(huà)就等待。對(duì)應(yīng)關(guān)系如下圖:

Python,分布式框架,Celery

考察了一下,原來(lái)當(dāng)初設(shè)計(jì)這個(gè)模式,主要就是用來(lái)處理并發(fā)問(wèn)題的,而Celery就是一個(gè)用python寫(xiě)的并行分布式框架。

然后我接著去學(xué)習(xí)Celery

Celery 是一個(gè)強(qiáng)大的 分布式任務(wù)隊(duì)列 的 異步處理框架,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來(lái)實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。我們需要一個(gè)消息隊(duì)列來(lái)下發(fā)我們的任務(wù)。首先要有一個(gè)消息中間件,此處選擇rabbitmq (也可選擇 redis 或 Amazon Simple Queue Service(SQS)消息隊(duì)列服務(wù))。推薦 選擇 rabbitmq 。使用RabbitMQ是官方特別推薦的方式,因此我也使用它作為我們的broker。

Celery的定義

Celery(芹菜)是一個(gè)簡(jiǎn)單、靈活且可靠的,處理大量消息的分布式系統(tǒng),并且提供維護(hù)這樣一個(gè)系統(tǒng)的必需工具。

我比較喜歡的一點(diǎn)是:Celery支持使用任務(wù)隊(duì)列的方式在分布的機(jī)器、進(jìn)程、線(xiàn)程上執(zhí)行任務(wù)調(diào)度。然后我接著去理解什么是任務(wù)隊(duì)列。

任務(wù)隊(duì)列

任務(wù)隊(duì)列是一種在線(xiàn)程或機(jī)器間分發(fā)任務(wù)的機(jī)制。

消息隊(duì)列

消息隊(duì)列的輸入是工作的一個(gè)單元,稱(chēng)為任務(wù),獨(dú)立的職程(Worker)進(jìn)程持續(xù)監(jiān)視隊(duì)列中是否有需要處理的新任務(wù)。

Celery 用消息通信,通常使用中間人(Broker)在客戶(hù)端和職程間斡旋。這個(gè)過(guò)程從客戶(hù)端向隊(duì)列添加消息開(kāi)始,之后中間人把消息派送給職程,職程對(duì)消息進(jìn)行處理。如下圖所示:

Python,分布式框架,Celery

Celery 系統(tǒng)可包含多個(gè)職程和中間人,以此獲得高可用性和橫向擴(kuò)展能力。

Celery的架構(gòu)

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成。

消息中間件

Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis。

 

任務(wù)執(zhí)行單元

Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中

任務(wù)結(jié)果存儲(chǔ)

Task result store用來(lái)存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括Redis,MongoDB,Django ORM,AMQP等,這里我先不去看它是如何存儲(chǔ)的,就先選用Redis來(lái)存儲(chǔ)任務(wù)執(zhí)行結(jié)果。

然后我接著去安裝Celery,在安裝Celery之前,我已經(jīng)在自己虛擬機(jī)上安裝好了Python,版本是3.6,

安裝celery,版本為4.2.1

sudo apt install python-celery-common

因?yàn)樯婕暗较⒅虚g件,所以我先去選擇一個(gè)在我工作中要用到的消息中間件(在Celery幫助文檔中稱(chēng)呼為中間人<broker>),為了更好的去理解文檔中的例子,我安裝了兩個(gè)中間件,一個(gè)是RabbitMQ,一個(gè)redis。

在這里我就先根據(jù)Celery的幫助文檔安裝和設(shè)置RabbitMQ。要使用 Celery,我們需要?jiǎng)?chuàng)建一個(gè) RabbitMQ 用戶(hù)、一個(gè)虛擬主機(jī),并且允許這個(gè)用戶(hù)訪問(wèn)這個(gè)虛擬主機(jī)。下面是我在個(gè)人pc機(jī)Ubuntu16.04上的設(shè)置:

$ sudo rabbitmqctl add_user forward password

#創(chuàng)建了一個(gè)RabbitMQ用戶(hù),用戶(hù)名為forward,密碼是password

$ sudo rabbitmqctl add_vhost ubuntu

#創(chuàng)建了一個(gè)虛擬主機(jī),主機(jī)名為ubuntu

$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"

#允許用戶(hù)forward訪問(wèn)虛擬主機(jī)ubuntu,因?yàn)镽abbitMQ通過(guò)主機(jī)名來(lái)與節(jié)點(diǎn)通信

$ sudo rabbitmq-server

之后我啟用RabbitMQ服務(wù)器,結(jié)果如下,成功運(yùn)行:

Python,分布式框架,Celery

之后我安裝Redis,它的安裝比較簡(jiǎn)單,如下:

$ sudo pip install redis

然后進(jìn)行簡(jiǎn)單的配置,只需要設(shè)置 Redis 數(shù)據(jù)庫(kù)的位置:

BROKER_URL = 'redis://localhost:6379/0'

URL的格式為:

redis://:password**@hostname**:port/db_number

URL Scheme 后的所有字段都是可選的,并且默認(rèn)為 localhost 的 6379 端口,使用數(shù)據(jù)庫(kù) 0。我的配置是:

redis://:password**@ubuntu**:6379/5

之后安裝Celery,我是用標(biāo)準(zhǔn)的Python工具pip安裝的,如下:

$ sudo pip install celery

開(kāi)始使用 Celery

使用celery包含三個(gè)方面:1. 定義任務(wù)函數(shù)。2. 運(yùn)行celery服務(wù)。3. 客戶(hù)應(yīng)用程序的調(diào)用。

為了測(cè)試Celery能否工作,我運(yùn)行了一個(gè)最簡(jiǎn)單的任務(wù),編寫(xiě)tasks.py:

from celery import Celery# broker設(shè)置中間件,backend設(shè)置后端存儲(chǔ)app = Celery('tasks',broker='redis://127.0.0.1:6379/5',backend='redis://127.0.0.1:6379/6')@app.taskdef add(x,y):  return x+y

編輯保存退出后,我在當(dāng)前目錄下運(yùn)行如下命令(記得要先開(kāi)啟redis):

$ celery -A tasks worker --loglevel=info

啟動(dòng)一個(gè)worker

#查詢(xún)文檔,了解到該命令中-A參數(shù)表示的是Celery APP的名稱(chēng),這個(gè)實(shí)例中指的就是tasks.py(和文件名一致),后面的tasks就是APP的名稱(chēng),worker是一個(gè)執(zhí)行任務(wù)角色,后面的loglevel=info記錄日志類(lèi)型默認(rèn)是info,這個(gè)命令啟動(dòng)了一個(gè)worker,用來(lái)執(zhí)行程序中add這個(gè)加法任務(wù)(task)。

然后看到界面顯示結(jié)果如下:

Python,分布式框架,Celery

我們可以看到Celery正常工作在名稱(chēng)luanpeng-XPS15R的虛擬主機(jī)上,版本為v4.2.1,在下面的[config]中我們可以看到當(dāng)前APP的名稱(chēng)tasks,運(yùn)輸工具transport就是我們?cè)诔绦蛑性O(shè)置的中間人redis://127.0.0.1:6379/5,result我們沒(méi)有設(shè)置,暫時(shí)顯示為disabled,然后我們也可以看到worker缺省使用perfork來(lái)執(zhí)行并發(fā),當(dāng)前并發(fā)數(shù)顯示為1,然后可以看到下面的[queues]就是我們說(shuō)的隊(duì)列,當(dāng)前默認(rèn)的隊(duì)列是celery,然后我們看到下面的[tasks]中有一個(gè)任務(wù)tasks.add.

如果你有多個(gè)不同類(lèi)型的任務(wù)可以放在不同的文件夾下,比如我們?cè)谠赼pp1文件夾創(chuàng)建一個(gè)tasks.py,在app2文件夾下創(chuàng)建一個(gè)tasks.py

我們可以這樣定義任務(wù)

celery -A app1.tasks worker --loglevel=info

注意:目錄結(jié)構(gòu)和命令發(fā)起的當(dāng)前目錄決定了任定義時(shí)的命令,任務(wù)定義的命令決定了任務(wù)定義的名稱(chēng),任務(wù)的名稱(chēng)決定了任務(wù)調(diào)用時(shí)的名稱(chēng)。

了解了這些之后,根據(jù)文檔在當(dāng)前目錄,我重新打開(kāi)一個(gè)terminal,然后執(zhí)行Python,進(jìn)入Python交互界面,用delay()方法調(diào)用任務(wù),執(zhí)行如下操作:

如果我們只有一個(gè)tasks.py文件,我們可以這樣定義任務(wù)

celery -A tasks worker --loglevel=info

那我們可以這樣調(diào)用任務(wù)start_task.py,py文件必須和tasks.py文件在同一個(gè)目錄下

from tasks import addadd.delay(6,6)  # 調(diào)用delay函數(shù)即可執(zhí)行任務(wù)

如果我們?cè)赼pp1文件夾下有tasks.py文件,我們可以這樣定義任務(wù)

celery -A app1.tasks worker --loglevel=info

那我們可以這樣調(diào)用任務(wù)start_task.py

from app1.tasks import addadd.delay(6,6)  # 調(diào)用delay函數(shù)即可執(zhí)行任務(wù)

所以定義任務(wù)和調(diào)用任務(wù)必須在同一個(gè)目錄。

執(zhí)行調(diào)用任務(wù)的start_task.py文件

python start_task.py

這個(gè)任務(wù)已經(jīng)由之前啟動(dòng)的Worker異步執(zhí)行了,然后我打開(kāi)之前啟動(dòng)的worker的控制臺(tái),對(duì)輸出進(jìn)行查看驗(yàn)證,結(jié)果如下:

[2018-09-24 20:07:11,496: INFO/MainProcess] Received task: app1.tasks.add[8207c280-0864-4b1e-8792-155de5417406] [2018-09-24 20:07:11,501: INFO/ForkPoolWorker-4] Task app1.tasks.add[8207c280-0864-4b1e-8792-155de5417406] succeeded in 0.003930353002942866s: 12

第一行說(shuō)明worker收到了一個(gè)任務(wù):app1.tasks.add,這里我們和之前發(fā)送任務(wù)返回的AsyncResult對(duì)比我們發(fā)現(xiàn),每個(gè)task都有一個(gè)唯一的ID,第二行說(shuō)明了這個(gè)任務(wù)執(zhí)行succeed,執(zhí)行結(jié)果為12。

查看資料說(shuō)調(diào)用任務(wù)后會(huì)返回一個(gè)AsyncResult實(shí)例,可用于檢查任務(wù)的狀態(tài),等待任務(wù)完成或獲取返回值(如果任務(wù)失敗,則為異常和回溯)。但這個(gè)功能默認(rèn)是不開(kāi)啟的,需要設(shè)置一個(gè) Celery 的結(jié)果后端(backend),也就是tasks.py設(shè)置的使用redis進(jìn)行結(jié)果存儲(chǔ)。

通過(guò)這個(gè)例子后我對(duì)Celery有了初步的了解,然后我在這個(gè)例子的基礎(chǔ)上去進(jìn)一步的學(xué)習(xí)。

因?yàn)镃elery是用Python編寫(xiě)的,所以為了讓代碼結(jié)構(gòu)化一些,就像一個(gè)應(yīng)用

Python,分布式框架,Celery

app1/app1_app.py

from celery import Celeryimport os,io# 在app1目錄同級(jí)目錄執(zhí)行celery -A app1.app1_app worker -l infoapp = Celery(main='app1.app1_app',include=['app1.tasks1','app1.tasks2']) # 創(chuàng)建app,并引入任務(wù)定義。main、include參數(shù)的值為模塊名,所以都是指定命令的相對(duì)目錄app.config_from_object('app1.app1_config')  # 通過(guò)配置文件進(jìn)行配置,而著這里是相對(duì)目錄# broker設(shè)置中間件,backend設(shè)置后端存儲(chǔ)# app = Celery('app1.app1_app',broker='redis://127.0.0.1:6379/5',backend='redis://127.0.0.1:6379/6',include=['app1.tasks1','app1.task2'])if __name__ == "__main__":  log_path = os.getcwd()+'/log/celery.log'  if(not os.path.exists(log_path)):    f = open(log_path, 'w')    f.close()  # 在app1目錄同級(jí)目錄執(zhí)行celery -A app1.app1_app worker -l info  app = Celery(main='app1_app',include=['tasks1', 'tasks2']) # 創(chuàng)建app,并引入任務(wù)定義。main、include參數(shù)的值為模塊名,所以都是指定命令的相對(duì)目錄  app.config_from_object('app1_config') # 通過(guò)配置文件進(jìn)行配置,而著這里是相對(duì)目錄  # 使用下面的命令也可以啟動(dòng)celery,不過(guò)要該模塊的名稱(chēng),是的相對(duì)目錄正確  app.start(argv=['celery', 'worker', '-l', 'info', '-f', 'log/celery.log', "-c", "40"])

一定要注意模塊的相對(duì)目錄,和你想要執(zhí)行命令的目錄

#首先創(chuàng)建了一個(gè)celery實(shí)例app,實(shí)例化的過(guò)程中,制定了任務(wù)名(也就是包名.模塊名),Celery的第一個(gè)參數(shù)是當(dāng)前模塊的名稱(chēng),我們可以調(diào)用config_from_object()來(lái)讓Celery實(shí)例加載配置模塊,我的例子中的配置文件起名為app1_config.py,配置文件如下:

BROKER_URL = 'redis://127.0.0.1:6379/5'  # 配置broker 中間件CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'  # 配置backend結(jié)果存儲(chǔ)CELERY_ACCEPT_CONTENT = ['json']CELERY_TASK_SERIALIZER = 'json'CELERY_RESULT_SERIALIZER = 'json'

在配置文件中我們可以對(duì)任務(wù)的執(zhí)行等進(jìn)行管理,比如說(shuō)我們可能有很多的任務(wù),但是我希望有些優(yōu)先級(jí)比較高的任務(wù)先被執(zhí)行,而不希望先進(jìn)先出的等待。那么需要引入一個(gè)隊(duì)列的問(wèn)題. 也就是說(shuō)在我的broker的消息存儲(chǔ)里面有一些隊(duì)列,他們并行運(yùn)行,但是worker只從對(duì)應(yīng) 的隊(duì)列里面取任務(wù)。在這里我們希望tasks.py中的某些任務(wù)先被執(zhí)行。task中我設(shè)置了兩個(gè)任務(wù):

所以我通過(guò)from celery import group引入group,用來(lái)創(chuàng)建并行執(zhí)行的一組任務(wù)。然后這塊現(xiàn)需要理解的就是這個(gè)@app.task,@符號(hào)在python中用作函數(shù)修飾符,到這塊我又回頭去看python的裝飾器(在代碼運(yùn)行期間動(dòng)態(tài)增加功能的方式)到底是如何實(shí)現(xiàn)的,在這里的作用就是通過(guò)task()裝飾器在可調(diào)用的對(duì)象(app)上創(chuàng)建一個(gè)任務(wù)。

tasks1.py

from app1.app1_app import app@app.taskdef deal1(text):  print(text)  return text+"======="

tasks2.py

from app1.app1_app import app@app.taskdef deal2(text):  print(text)  return text+"+++++++++"

隊(duì)列

了解完裝飾器后,我回過(guò)頭去整理配置的問(wèn)題,前面提到任務(wù)的優(yōu)先級(jí)問(wèn)題,在這個(gè)例子中如果我們想讓deal1這個(gè)任務(wù)優(yōu)先于deal2任務(wù)被執(zhí)行,我們可以將兩個(gè)任務(wù)放到不同的隊(duì)列中,由我們決定先執(zhí)行哪個(gè)任務(wù),我們可以在配置文件app1_config.py中增加這樣配置:

from kombu import Exchange,QueueBROKER_URL = 'redis://127.0.0.1:6379/5'  # 配置broker 中間件CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'  # 配置backend結(jié)果存儲(chǔ)CELERY_ACCEPT_CONTENT = ['json']CELERY_TASK_SERIALIZER = 'json'CELERY_RESULT_SERIALIZER = 'json'# (當(dāng)使用Redis作為broker時(shí),Exchange的名字必須和Queue的名字一樣)CELERY_QUEUES = (          Queue("default", Exchange("default"), routing_key = "default"),          Queue("for_task1", Exchange("for_task1"), routing_key="task_a"),          Queue("for_task2", Exchange("for_task2"), routing_key="task_b"))# 定義任務(wù)的走向,不同的任務(wù)發(fā)送 進(jìn)入不同的隊(duì)列,并為不同的任務(wù)設(shè)定不同的routing_key# 若沒(méi)有指定這個(gè)任務(wù)route到那個(gè)Queue中去執(zhí)行,此時(shí)執(zhí)行此任務(wù)的時(shí)候,會(huì)route到Celery默認(rèn)的名字叫做celery的隊(duì)列中去。CELERY_ROUTES = {  'app1.tasks1.deal1': {"queue": "for_task1", "routing_key": "task_a"},  'app1.tasks2.deal2':{"queue":"for_task2","routing_key":"task_b"}}

先了解了幾個(gè)常用的參數(shù)的含義:

Exchange:交換機(jī),決定了消息路由規(guī)則;

Queue:消息隊(duì)列;

Channel:進(jìn)行消息讀寫(xiě)的通道;

Bind:綁定了Queue和Exchange,意即為符合什么樣路由規(guī)則的消息,將會(huì)放置入哪一個(gè)消息隊(duì)列;

我將deal1這個(gè)函數(shù)任務(wù)放在了一個(gè)叫做for_task1的隊(duì)列里面,將deal2這個(gè)函數(shù)任務(wù)放在了一個(gè)叫做for_task2的隊(duì)列里面,然后我在當(dāng)前應(yīng)用目錄下執(zhí)行命令:

celery -A app1.app1_app worker -l info -Q for_task1

這個(gè)worker就只負(fù)責(zé)處理for_task1這個(gè)隊(duì)列的任務(wù),這是通過(guò)在啟動(dòng)worker是使用-Q Queue_Name參數(shù)指定的。

我們定義任務(wù)調(diào)用文件start_task.py

from __future__ import print_functionfrom app1.app1_app import appif __name__=="__main__":  for i in range(10):    text = 'text'+str(i)    app.send_task('app1.tasks1.deal1',args=[text])  # 任務(wù)的名稱(chēng)必須和Celery注冊(cè)的任務(wù)名稱(chēng)相同    app.send_task('app1.tasks2.deal2',args=[text]) # 任務(wù)的名稱(chēng)必須和Celery注冊(cè)的任務(wù)名稱(chēng)相同    print('push over %d'%i)

執(zhí)行上述代碼文件

python start_task.py

任務(wù)已經(jīng)被執(zhí)行,我在worker控制臺(tái)查看結(jié)果(只有app1.appa_app.deal1任務(wù)被這個(gè)worker執(zhí)行了):

[2018-09-24 22:26:38,928: INFO/ForkPoolWorker-8] Task app1.tasks1.deal1[b3007993-9bfb-4161-b5b2-4f0f022f2f8b] succeeded in 0.0008255800021288451s: 'text4======='[2018-09-24 22:26:38,928: INFO/ForkPoolWorker-6] Task app1.tasks1.deal1[df24b991-88fc-4253-86bf-540754c62da9] succeeded in 0.004320767002354842s: 'text3======='[2018-09-24 22:26:38,929: INFO/MainProcess] Received task: app1.tasks1.deal1[dbdf9ac0-ea27-4455-90d2-e4fe8f3e895e] [2018-09-24 22:26:38,930: WARNING/ForkPoolWorker-4] text5[2018-09-24 22:26:38,931: INFO/ForkPoolWorker-4] Task app1.tasks1.deal1[dbdf9ac0-ea27-4455-90d2-e4fe8f3e895e] succeeded in 0.0006721289973938838s: 'text5======='

可以看到worker收到任務(wù),并且執(zhí)行了任務(wù)。

Scheduler ( 定時(shí)任務(wù),周期性任務(wù) )

在這里我們還是在交互模式下手動(dòng)去執(zhí)行,我們想要crontab的定時(shí)生成和執(zhí)行,我們可以用celery的beat去周期的生成任務(wù)和執(zhí)行任務(wù),在這個(gè)例子中我希望每10秒鐘產(chǎn)生一個(gè)任務(wù),然后去執(zhí)行這個(gè)任務(wù),我可以這樣配置(在app1_config.py文件中添加如下內(nèi)容):

# 設(shè)計(jì)周期任務(wù)CELERY_TIMEZONE = 'Asia/Shanghai'from celery.schedules import crontab  # 設(shè)置定時(shí)任務(wù)from datetime import timedelta# 每隔30秒執(zhí)行app1.tasks1.deal函數(shù)CELERYBEAT_SCHEDULE = {  'deal-every-30-seconds': {     'task': 'app1.tasks1.deal1',     'schedule': timedelta(seconds=30),     'args': ['hello']  },  'deal-every-10-seconds': {     'task': 'app1.tasks2.deal2',     'schedule': timedelta(seconds=10),     'args': ['hello']  },   # Executes every Monday morning at 7:30 A.M  'deal-every-monday-morning': {     'task': 'app1.tasks2.deal2',     'schedule': crontab(hour=7, minute=30, day_of_week=1),     'args': ['hello']  },}

使用了scheduler,要制定時(shí)區(qū):CELERY_TIMEZONE = ‘Asia/Shanghai',啟動(dòng)celery加上-B的參數(shù)。

celery -A app1.app1_app worker -l info -B

前兩個(gè)任務(wù)為周期任務(wù),第三個(gè)任務(wù)為定時(shí)任務(wù),指定時(shí)間點(diǎn)開(kāi)始執(zhí)行分發(fā)任務(wù),讓worker取走執(zhí)行,可以這樣配置:

看完這些基礎(chǔ)的東西,我回過(guò)頭對(duì)celery在回顧了一下,用圖把它的框架大致畫(huà)出來(lái),如下圖:

Python,分布式框架,Celery

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)VEVB武林網(wǎng)的支持。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到python教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶(hù)名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 临湘市| 全州县| 云龙县| 新竹市| 西林县| 辽宁省| 得荣县| 梁山县| 康保县| 安徽省| 宜城市| 永福县| 吉水县| 炎陵县| 观塘区| 澄江县| 林周县| 南川市| 沅江市| 龙岩市| 南宫市| 水富县| 宝清县| 博乐市| 枣阳市| 墨脱县| 邹平县| 屏东市| 平江县| 镇康县| 台江县| 杨浦区| 鹰潭市| 永春县| 怀化市| 庄河市| 中西区| 鸡东县| 溆浦县| 孟村| 凤台县|