UPDATE: 我决定尝试使用Django作为简单的代理,因为我认为我在Redis设置中做错了 . 但是,在进行docs中描述的更改后,在尝试使用 .delay()
运行Celery任务时,我得到与以下相同的错误 . Celery工作人员启动并显示它已连接到Django进行传输 . 这可能是防火墙问题吗?
ORIGINAL
我已经安装了Celery并为经纪人选择了Redis,并安装了它(我在Windows机器上,fyi) . 芹菜工作者启动,连接到Redis服务器,并发现我的 shared_tasks
-------------- celery@GALACTICA v3.1.19 (Cipater)
---- **** -----
--- * *** * -- Windows-7-6.1.7601-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x2dbf970
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. app.tasks.add
. app.tasks.mul
. app.tasks.xsum
. proj.celery.debug_task
[2016-01-16 11:53:05,586: INFO/MainProcess] Connected to redis://localhost:6379/
0
[2016-01-16 11:53:06,611: INFO/MainProcess] mingle: searching for neighbors
[2016-01-16 11:53:09,628: INFO/MainProcess] mingle: all alone
c:\python34\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using se
ttings.DEBUG leads to a memory leak, never use this setting in production enviro
nments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2016-01-16 11:53:14,670: WARNING/MainProcess] c:\python34\lib\site-packages\cel
ery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory le
ak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2016-01-16 11:53:14,671: WARNING/MainProcess] celery@GALACTICA ready.
我正在关注介绍文档,因此任务非常简单,包括一个名为 add
的任务 . 我可以在python shell中自己运行任务,但是当我尝试调用 add.delay()
让celery处理它时,看起来连接不成功:
>>> add.delay(2,2)
Traceback (most recent call last):
File "C:\Python34\lib\site-packages\kombu\utils\__init__.py", line 423, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Python34\lib\site-packages\kombu\connection.py", line 436, in _ensured
return fun(*args, **kwargs)
File "C:\Python34\lib\site-packages\kombu\messaging.py", line 177, in _publish
channel = self.channel
File "C:\Python34\lib\site-packages\kombu\messaging.py", line 194, in _get_channel
channel = self._channel = channel()
File "C:\Python34\lib\site-packages\kombu\utils\__init__.py", line 425, in __call__
value = self.__value__ = self.__contract__()
File "C:\Python34\lib\site-packages\kombu\messaging.py", line 209, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel) File "C:\Python34\lib\site-packages\kombu\connection.py", line 756, in default_channel
self.connection
File "C:\Python34\lib\site-packages\kombu\connection.py", line 741, in connection
self._connection = self._establish_connection()
File "C:\Python34\lib\site-packages\kombu\connection.py", line 696, in _establish_connection
conn = self.transport.establish_connection()
File "C:\Python34\lib\site-packages\kombu\transport\pyamqp.py", line 116, in establish_connection
conn = self.Connection(**opts)
File "C:\Python34\lib\site-packages\amqp\connection.py", line 165, in __init__
self.transport = self.Transport(host, connect_timeout, ssl)
File "C:\Python34\lib\site-packages\amqp\connection.py", line 186, in Transport
return create_transport(host, connect_timeout, ssl)
File "C:\Python34\lib\site-packages\amqp\transport.py", line 299, in create_transport
return TCPTransport(host, connect_timeout)
File "C:\Python34\lib\site-packages\amqp\transport.py", line 95, in __init__
raise socket.error(last_err)
OSError: [WinError 10061] No connection could be made because the target machine actively refused it
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Python34\lib\site-packages\celery\app\task.py", line 453, in delay
return self.apply_async(args, kwargs)
File "C:\Python34\lib\site-packages\celery\app\task.py", line 560, in apply_async
**dict(self._get_exec_options(), **options)
File "C:\Python34\lib\site-packages\celery\app\base.py", line 354, in send_task
reply_to=reply_to or self.oid, **options
File "C:\Python34\lib\site-packages\celery\app\amqp.py", line 305, in publish_task
**kwargs
File "C:\Python34\lib\site-packages\kombu\messaging.py", line 172, in publish
routing_key, mandatory, immediate, exchange, declare)
File "C:\Python34\lib\site-packages\kombu\connection.py", line 457, in _ensured
interval_max)
File "C:\Python34\lib\site-packages\kombu\connection.py", line 369, in ensure_connection
interval_start, interval_step, interval_max, callback)
File "C:\Python34\lib\site-packages\kombu\utils\__init__.py", line 246, in retry_over_time
return fun(*args, **kwargs)
File "C:\Python34\lib\site-packages\kombu\connection.py", line 237, in connect
return self.connection
File "C:\Python34\lib\site-packages\kombu\connection.py", line 741, in connection
self._connection = self._establish_connection()
File "C:\Python34\lib\site-packages\kombu\connection.py", line 696, in _establish_connection
conn = self.transport.establish_connection()
File "C:\Python34\lib\site-packages\kombu\transport\pyamqp.py", line 116, in establish_connection
conn = self.Connection(**opts)
File "C:\Python34\lib\site-packages\amqp\connection.py", line 165, in __init__
self.transport = self.Transport(host, connect_timeout, ssl)
File "C:\Python34\lib\site-packages\amqp\connection.py", line 186, in Transport
return create_transport(host, connect_timeout, ssl)
File "C:\Python34\lib\site-packages\amqp\transport.py", line 299, in create_transport
return TCPTransport(host, connect_timeout)
File "C:\Python34\lib\site-packages\amqp\transport.py", line 95, in __init__
raise socket.error(last_err)
OSError: [WinError 10061] No connection could be made because the target machine actively refused it
在芹菜工作者运行的控制台上没有输出,所以我认为它不会得到任务 . 我相信我的settings.py,celery.py和tasks.py都没问题:
settings.py
#celery settings
BROKER_URL = 'redis://localhost:6379/0'
celery.py
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
app = Celery('proj')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
tasks.py
from __future__ import absolute_import
#from proj.celery import app
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
我的项目布局几乎与GitHub上的Celery示例Django项目布局相同,以及示例here . 看起来像:
proj
├── proj
│ ├── celery.py
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── manage.py
└── app
├── __init__.py
├── models.py
├── tasks.py
├── tests.py
└── views.py
我的项目中另一个应用程序上的道歉被命名为“app” - 它使得阅读有点令人困惑,并且是在安装了PTVS的Visual Studio中自动生成基础项目的结果 . 我可能早就改变了它,但我没有意识到这个名字太模糊了 .
感谢您的任何想法 - 我已经被这一段时间困扰了 .
1 回答
我绕过了这个,但我不确定如何 . 第二天我回到了这个确切的配置,任务正在向芹菜 Worker 提出 .
也许我重启的服务之一就是关键,但我不确定 .
如果其他人遇到此问题,尤其是在Windows上:确保您的redis-server处于活动状态,并且您可以看到来自ping和任务的传入连接 . 在发布此问题之前我已经这样做了,但似乎可能是错误配置的候选人 .