测试积点老人 posted on 2021-3-24 10:52:44

Can multiprocessing, multithreading, or coroutines be used inside Celery?

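I built a test platform with Django + Celery. One requirement is running a task that processes anywhere from tens of thousands to over a million images, and I use Celery to schedule and execute it. The problem is that a single task currently takes half an hour to process 100,000 images. To shorten that, I want to use multiprocessing + coroutines inside a single Celery task, but I couldn't find any examples of multiprocessing or coroutines in Celery online, and the official docs don't cover this either.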
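A minimal sketch of the coroutine half, assuming the per-image work is mostly I/O (fetching or storing files through an async-capable client). `handle_image` is a hypothetical placeholder, and `asyncio.sleep` stands in for the real I/O; coroutines will not speed up CPU-bound decoding or resizing. `asyncio.run` needs Python 3.7+, which matches the 3.7.8 interpreter in the output further down. A prefork worker runs the task body as ordinary synchronous code, so a task should be able to call `run_batch` directly.

```python
import asyncio
from typing import List


async def handle_image(path: str) -> str:
    # Hypothetical placeholder for non-blocking I/O on one image
    # (e.g. an async storage or HTTP client); asyncio.sleep stands in for it.
    await asyncio.sleep(0.01)
    return path.upper()


async def handle_all(paths: List[str], limit: int) -> List[str]:
    # The semaphore caps how many images are being handled at the same time.
    sem = asyncio.Semaphore(limit)

    async def bounded(path: str) -> str:
        async with sem:
            return await handle_image(path)

    # gather returns results in the same order as the inputs.
    return list(await asyncio.gather(*(bounded(p) for p in paths)))


def run_batch(paths: List[str], limit: int = 50) -> List[str]:
    # Ordinary synchronous entry point, callable from a Celery task body.
    return asyncio.run(handle_all(paths, limit))


if __name__ == "__main__":
    print(run_batch([f"img{i}.jpg" for i in range(3)]))
    # ['IMG0.JPG', 'IMG1.JPG', 'IMG2.JPG']
```

Inside the task this would be a function decorated with `@app.task` that returns `run_batch(paths)`. The `limit` value is an assumed tuning knob for how many images are in flight at once.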
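I tried using multiprocessing directly inside a Celery task. It raises an error when run through Celery, but the same code works fine without Celery. Here is the code: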
import time

from celery import Celery
from multiprocessing import Pool

app = Celery('tasks', broker='redis://:docserver123456!@172.17.10.175:6379/3')


def func(msg):
    print("*msg: ", msg)
    time.sleep(3)
    print("*end")


@app.task
def add():
    p = Pool(5)
    for i in range(10):
        msg = f'hello str({i})'
        p.apply_async(func, (msg,))
    p.close()
    p.join()
    print('all done')

Here is the run output. Calling add() directly succeeds; running it through Celery fails:
@localhost ce_test]# python
Python 3.7.8 (default, Jan 26 2021, 15:45:27)
on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> add()
*msg:hello str(0)
*msg:hello str(1)
*msg:hello str(2)
*msg:hello str(3)
*msg:hello str(4)
*end
*end
*end
*msg:hello str(5)
*end
*msg:hello str(6)
*msg:hello str(7)
*end
*msg:hello str(8)
*msg:hello str(9)
*end
*end
*end
*end
*end
all done
>>> add.delay()
<AsyncResult: f0bbd061-53b6-44a9-8087-ad08fed4401c>

And here is the error from the worker:
# celery -A tasks worker -l info
/usr/local/python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

uid=uid, euid=euid, gid=gid, egid=egid,

-------------- celery@localhost.localdomain v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-1160.11.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2021-03-18 15:45:41
- *** --- * ---
- ** ----------
- ** ---------- .> app:         tasks:0x7fae4a6f1590
- ** ---------- .> transport:   redis://:**@172.17.10.175:6379/3
- ** ---------- .> results:   disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
--------------
                .> celery         exchange=celery(direct) key=celery



. tasks.add

Connected to redis://:**@172.17.10.175:6379/3
mingle: searching for neighbors
mingle: all alone
celery@localhost.localdomain ready.
Received task: tasks.add
Task tasks.add raised unexpected: AssertionError('daemonic processes are not allowed to have children')
Traceback (most recent call last):
File "/usr/local/python3/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
File "/home/ce_test/tasks.py", line 17, in add
    p = Pool(5)
File "/usr/local/python3/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
File "/usr/local/python3/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    self._repopulate_pool()
File "/usr/local/python3/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    w.start()
File "/usr/local/python3/lib/python3.7/multiprocessing/process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

Could anyone help me figure this out?
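The traceback states the cause: the banner shows `concurrency: 4 (prefork)`, the task runs inside one of the pool's child processes, and that process is daemonic, so `multiprocessing` refuses to let it start children of its own. Calling `add()` in the interpreter works because that process is not daemonic. This sketch reproduces the situation with the standard library only, assuming Linux (it uses the `fork` start method, which Windows lacks), and shows that a thread pool is still allowed in a daemonic process. `square` is a hypothetical stand-in for the per-image work. Threads only help if that work is spent in I/O or in C code that releases the GIL; whether that holds for this image pipeline is an assumption worth measuring.

```python
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

# "fork" matches Linux (the question's CentOS box); it is unavailable on Windows.
ctx = mp.get_context("fork")


def square(x: int) -> int:
    # Hypothetical stand-in for processing one image.
    return x * x


def child(queue) -> None:
    # Runs inside a (possibly daemonic) process, like a Celery prefork child.
    try:
        with ctx.Pool(2) as pool:
            queue.put(("pool", pool.map(square, range(5))))
    except AssertionError as exc:
        queue.put(("pool", f"AssertionError: {exc}"))
    # Threads are not child processes, so the daemon restriction does not apply.
    with ThreadPoolExecutor(max_workers=4) as tp:
        queue.put(("threads", list(tp.map(square, range(5)))))


def run(daemon: bool) -> dict:
    queue = ctx.Queue()
    proc = ctx.Process(target=child, args=(queue,), daemon=daemon)
    proc.start()
    # child() always sends exactly two (label, result) pairs.
    results = dict(queue.get(timeout=30) for _ in range(2))
    proc.join()
    return results


if __name__ == "__main__":
    print(run(daemon=True))   # pool fails, as in the Celery worker
    print(run(daemon=False))  # pool works, as when calling add() directly
```

For CPU-bound work, the more conventional route is to let Celery supply the processes: raise the worker's `-c`/`--concurrency` value and split the images across many tasks instead of forking inside one.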

qqq911 posted on 2021-3-25 11:02:37

You could try cluster mode.
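One possible reading of this suggestion (the reply does not elaborate, so this is an assumption): instead of parallelizing inside one task, split the image list into many small tasks and let Celery spread them over its pool processes and over any additional worker machines consuming the same broker. This helper only does the splitting and uses the standard library; `chunked` is a hypothetical name.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    # Yield consecutive lists of at most `size` items; the last may be shorter.
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```

With Celery the batches could then be dispatched as a group, e.g. `group(process_chunk.s(batch) for batch in chunked(paths, 1000)).apply_async()`, where `group` is imported from `celery` and `process_chunk` is a hypothetical task that handles one batch. The batch size of 1000 is also an assumption: smaller batches balance load across workers better, larger ones reduce broker overhead.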

jingzizx posted on 2021-3-25 17:31:06

Not sure.