51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 4373|回复: 2
打印 上一主题 下一主题

Celery 里面可以使用多进程、多线程、协程吗?

[复制链接]
  • TA的每日心情
    无聊
    8 小时前
  • 签到天数: 528 天

    连续签到: 1 天

    [LV.9]测试副司令

    跳转到指定楼层
    1#
    发表于 2021-3-24 10:52:44 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    1测试积点
    本人使用 Django + Celery 搭建了一套测试平台,其中有个需求是运行一个任务,这个任务需要处理几万至上百万张图片,为此使用 celery 进行任务调度和执行。现在的问题是一次任务需要花半小时才能完成 10W 张图片的处理,我想缩短任务执行时间,所以想在 celery 中的一个任务中使用多进程 + 协程的方式,但我在网上没有找到 celery 中使用多进程、协程的示例,官方文档也没有涉及这一块。
    我尝试在 celery 直接使用多进程,运行后报错,不通过 celery 是没问题的,以下是代码:
    1. import time

    2. from celery import Celery
    3. from multiprocessing import Pool

    4. app = Celery('tasks', broker='redis://:docserver123456!@172.17.10.175:6379/3')


    5. def func(msg):
    6.     print("*msg: ", msg)
    7.     time.sleep(3)
    8.     print("*end")


    9. @app.task
    10. def add():
    11.     p = Pool(5)
    12.     for i in range(10):
    13.         msg = f'hello str({i})'
    14.         p.apply_async(func, (msg,))
    15.     p.close()
    16.     p.join()
    17.     print('all done')
    复制代码
    以下是运行信息,直接调用成功,使用 celery 失败:
    1. [root[url=home.php?mod=space&uid=152075]@localhost[/url] ce_test]# python
    2. Python 3.7.8 (default, Jan 26 2021, 15:45:27)
    3. [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    4. Type "help", "copyright", "credits" or "license" for more information.
    5. >>> from tasks import add
    6. >>> add()
    7. *msg:  hello str(0)
    8. *msg:  hello str(1)
    9. *msg:  hello str(2)
    10. *msg:  hello str(3)
    11. *msg:  hello str(4)
    12. *end
    13. *end
    14. *end
    15. *msg:  hello str(5)
    16. *end
    17. *msg:  hello str(6)
    18. *msg:  hello str(7)
    19. *end
    20. *msg:  hello str(8)
    21. *msg:  hello str(9)
    22. *end
    23. *end
    24. *end
    25. *end
    26. *end
    27. all done
    28. >>> add.delay()
    29. <AsyncResult: f0bbd061-53b6-44a9-8087-ad08fed4401c>
    复制代码
    以下是报错信息:
    1. [root@localhost ce_test]# celery -A tasks worker -l info
    2. /usr/local/python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
    3. absolutely not recommended!

    4. Please specify a different user using the --uid option.

    5. User information: uid=0 euid=0 gid=0 egid=0

    6.   uid=uid, euid=euid, gid=gid, egid=egid,

    7. -------------- celery@localhost.localdomain v4.4.7 (cliffs)
    8. --- ***** -----
    9. -- ******* ---- Linux-3.10.0-1160.11.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core 2021-03-18 15:45:41
    10. - *** --- * ---
    11. - ** ---------- [config]
    12. - ** ---------- .> app:         tasks:0x7fae4a6f1590
    13. - ** ---------- .> transport:   redis://:**@172.17.10.175:6379/3
    14. - ** ---------- .> results:     disabled://
    15. - *** --- * --- .> concurrency: 4 (prefork)
    16. -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    17. --- ***** -----
    18. -------------- [queues]
    19.                 .> celery           exchange=celery(direct) key=celery


    20. [tasks]
    21.   . tasks.add

    22. [2021-03-18 15:45:41,779: INFO/MainProcess] Connected to redis://:**@172.17.10.175:6379/3
    23. [2021-03-18 15:45:41,797: INFO/MainProcess] mingle: searching for neighbors
    24. [2021-03-18 15:45:42,895: INFO/MainProcess] mingle: all alone
    25. [2021-03-18 15:45:42,920: INFO/MainProcess] celery@localhost.localdomain ready.
    26. [2021-03-18 15:49:02,260: INFO/MainProcess] Received task: tasks.add[f0bbd061-53b6-44a9-8087-ad08fed4401c]
    27. [2021-03-18 15:49:02,271: ERROR/ForkPoolWorker-1] Task tasks.add[f0bbd061-53b6-44a9-8087-ad08fed4401c] raised unexpected: AssertionError('daemonic processes are not allowed to have children')
    28. Traceback (most recent call last):
    29.   File "/usr/local/python3/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
    30.     R = retval = fun(*args, **kwargs)
    31.   File "/usr/local/python3/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    32.     return self.run(*args, **kwargs)
    33.   File "/home/ce_test/tasks.py", line 17, in add
    34.     p = Pool(5)
    35.   File "/usr/local/python3/lib/python3.7/multiprocessing/context.py", line 119, in Pool
    36.     context=self.get_context())
    37.   File "/usr/local/python3/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
    38.     self._repopulate_pool()
    39.   File "/usr/local/python3/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
    40.     w.start()
    41.   File "/usr/local/python3/lib/python3.7/multiprocessing/process.py", line 110, in start
    42.     'daemonic processes are not allowed to have children'
    43. AssertionError: daemonic processes are not allowed to have children
    复制代码
    有哪位大哥能帮忙解答下吗?

    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

  • TA的每日心情
    奋斗
    8 小时前
  • 签到天数: 1517 天

    连续签到: 1 天

    [LV.Master]测试大本营

    2#
    发表于 2021-3-25 11:02:37 | 只看该作者
    可以试试集群模式
    回复

    使用道具 举报

  • TA的每日心情
    奋斗
    10 小时前
  • 签到天数: 2813 天

    连续签到: 1 天

    [LV.Master]测试大本营

    3#
    发表于 2021-3-25 17:31:06 | 只看该作者
    不清楚
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-11-18 18:25 , Processed in 0.062294 second(s), 21 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表