在flask应用中使用celery任务队列,celery队列无法正常启动
最近在写一个flask应用,想使用celery做任务队列,就去flask官网上找了
样例程序
,然后复制到本机上执行了一下,结果celery没有正常启动.
只有一个源文件test.py,rabbitmq已配置并正常启动.
代码如下:
from flask import Flask
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='amqp://guest@localhost//',
CELERY_RESULT_BACKEND='amqp://guest@localhost//'
)
celery = make_celery(app)
@celery.task()
def add_together(a, b):
return a + b
result = add_together.delay(23, 42)
result.wait()
差不多就是样例代码直接拷贝下来用了.
然后我在当前目录下执行
celery -A test worker --loglevel=info
和
celery -A test.celery worker --loglevel=info
之后阻塞,同时没有任何提示.
我另外打开一个终端,在这个目录下执行
python test.py
之后发生阻塞.用pdb调试发现阻塞在最后一行
result.wait()
.
请问我的配置过程出现什么问题了?
野生的路人
9 years, 9 months ago
Answers
# coding: utf-8
from celery import Celery
from flask import Flask
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='amqp://guest@localhost//',
CELERY_RESULT_BACKEND='amqp://guest@localhost//'
)
celery = make_celery(app)
@celery.task(name='add_together')
def add_together(a, b):
return a + b
if __name__ == '__main__':
result = add_together.delay(23, 42)
result.wait()
由于你没有提供错误信息,只能改对之后告诉你了
调用
celery -A test.celery worker --loglevel=info
启动
celery
,然后用
python test.py
调用脚本
kazama
answered 9 years, 9 months ago