1.簡(jiǎn)介
celery(芹菜)是一個(gè)異步任務(wù)隊(duì)列/基于分布式消息傳遞的作業(yè)隊(duì)列。它側(cè)重于實(shí)時(shí)操作,但對(duì)調(diào)度支持也很好。
celery用于生產(chǎn)系統(tǒng)每天處理數(shù)以百萬(wàn)計(jì)的任務(wù)。
celery是用Python編寫的,但該協(xié)議可以在任何語(yǔ)言實(shí)現(xiàn)。它也可以與其他語(yǔ)言通過(guò)webhooks實(shí)現(xiàn)。
建議的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和數(shù)據(jù)庫(kù)(使用SQLAlchemy的或Django的 ORM) 。
celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
2. 安裝
有了上面的概念,需要安裝這么幾個(gè)東西:RabbitMQ、SQLAlchemy、Celery
安裝方式也都很簡(jiǎn)單: RabbitMQ:
mac下:
brew install rabbitmq
sudo apt-get install rabbitmq-server
剩下兩個(gè)都是Python的東西了,直接pip安裝就好了,對(duì)于從來(lái)沒(méi)有安裝過(guò)MySQL驅(qū)動(dòng)的同學(xué)可能需要安裝MySQL-python。
安裝完成之后,啟動(dòng)服務(wù):
$ rabbitmq-server[回車]
啟動(dòng)后不要關(guān)閉窗口, 下面操作新建窗口(Tab)
3. 簡(jiǎn)單案例
確保你之前的RabbitMQ已經(jīng)啟動(dòng)。
還是官網(wǎng)的那個(gè)例子,在任意目錄新建一個(gè)tasks.py的文件,內(nèi)容如下:
from celery import Celeryapp = Celery('tasks', broker='amqp://guest@localhost//')@app.taskdef add(x, y): return x + y 在同級(jí)目錄執(zhí)行:
$ celery -A tasks worker --loglevel=info
該命令的意思是啟動(dòng)一個(gè)worker,把tasks中的任務(wù)(add(x,y))把任務(wù)放到隊(duì)列中。
保持窗口打開(kāi),新開(kāi)一個(gè)窗口進(jìn)入交互模式,python或者ipython:
>>> from tasks import add>>> add.delay(4, 4)
到此為止,你已經(jīng)可以使用celery執(zhí)行任務(wù)了,上面的python交互模式下簡(jiǎn)單的調(diào)用了add任務(wù),并傳遞4,4參數(shù)。
但此時(shí)有一個(gè)問(wèn)題,你突然想知道這個(gè)任務(wù)的執(zhí)行結(jié)果和狀態(tài),到底完了沒(méi)有。因此就需要設(shè)置backend了。
修改之前的tasks.py中的代碼為:
# coding:utf-8import subprocessfrom time import sleepfrom celery import Celerybackend = 'db+mysql://root:@192.168.0.102/celery'broker = 'amqp://guest@192.168.0.102:5672'app = Celery('tasks', backend=backend, broker=broker)@app.taskdef add(x, y): sleep(10) return x + y@app.taskdef hostname(): return subprocess.check_output(['hostname']) 除了添加backend之外,上面還添加了一個(gè)who的方法用來(lái)測(cè)試多服務(wù)器操作。修改完成之后,還是按照之前的方式啟動(dòng)。
同樣進(jìn)入python的交互模型:
>>> from tasks import add, hostname>>> r = add.delay(4, 4)>>> r.ready() # 10s內(nèi)執(zhí)行,會(huì)輸出False,因?yàn)閍dd中sleep了10s>>>>>> r = hostname.delay()>>> r.result # 輸出你的hostname
4. 測(cè)試多服務(wù)器
做完上面的測(cè)試之后,產(chǎn)生了一個(gè)疑惑,Celery叫做分布式任務(wù)管理,那它的分布式體現(xiàn)在哪?它的任務(wù)都是怎么執(zhí)行的?在哪個(gè)機(jī)器上執(zhí)行的?
在當(dāng)前服務(wù)器上的celery服務(wù)不關(guān)閉的情況下,按照同樣的方式在另外一臺(tái)服務(wù)器上安裝Celery,并啟動(dòng):
$ celery -A tasks worker --loglevel=info
發(fā)現(xiàn)前一個(gè)服務(wù)器的Celery服務(wù)中輸出你剛啟動(dòng)的服務(wù)器的hostname,前提是那臺(tái)服務(wù)器連上了你的rabbitmq。
然后再進(jìn)入python交互模式:
>>> from tasks import hostname>>>>>> for i in range(10):... r = hostname.delay()... print r.result # 輸出你的hostname>>>
看你輸入的內(nèi)容已經(jīng)觀察兩臺(tái)服務(wù)器上你啟動(dòng)celery服務(wù)的輸出。
5. RabbitMQ遠(yuǎn)程連接的問(wèn)題
一開(kāi)始測(cè)試時(shí)遠(yuǎn)程服務(wù)器無(wú)法連接本地的RabbitMQ服務(wù),后來(lái)發(fā)現(xiàn)需要設(shè)置權(quán)限,在/usr/local/etc/rabbitmq/rabbitmq-env.conf這個(gè)文件中,修改NODE_IP_ADDRESS=127.0.0.1中的ip為0.0.0.0。
6. 總結(jié)的說(shuō)
這篇文章簡(jiǎn)單的介紹了Celery的使用,重點(diǎn)還是在分布式的使用。覺(jué)得不太爽的地方是,在擴(kuò)展時(shí),需要重新把代碼(tasks.py)部署一遍,而不是可以直接把tasks進(jìn)行共享,可能Celery是通過(guò)task來(lái)進(jìn)行不同的worker的匹配的?目前還不太了解,等深入使用之后再說(shuō)。



















