Celery 学习与实践

1 celery 简要概述

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度

celery 的优点

  • 简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
    如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

1.1 celery 可以做什么?

典型的应用场景, 比如

  • 异步发邮件 , 一般发邮件比较耗时的操作,需要及时返回给前端,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 比如有些 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

2 celery 的核心模块

2-1 celery 的5个角色

Task:就是任务,有异步任务和定时任务
Broker:中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat:定时任务调度器,根据配置定时将任务发送给Broker。
Backend:用于存储任务的执行结果。

file

3 实战

选择 Broker

使用 RabbitMQ 作为 Broker

$ docker run -d -p 5672:5672 rabbitmq

命令完成后,代理已经在后台运行,准备好为您移动消息:正在启动 rabbitmq-server: SUCCESS。

安装 Celery

$ pip install celery

应用

Let’s create the file tasks.py:

from celery import Celery

# app = Celery('tasks', broker='pyamqp://guest@localhost//')

# 对接MQ
app = Celery('tasks', broker='pyamqp://guest@98.142.143.145:5672//')

@app.task
def add(x, y):
    return x + y

Running the Celery worker server
You can now run the worker by executing our program with the worker argument:

$ celery -A tasks worker --loglevel=INFO

执行结果:

[root@quant celery_test]# celery -A tasks worker --loglevel=INFO
/root/anaconda3/lib/python3.7/site-packages/celery/platforms.py:841: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@quant v5.2.6 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.13.12-1.el7.elrepo.x86_64-x86_64-with-centos-7.9.2009-Core 2022-05-02 11:04:45
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f21f5813390
- ** ---------- .> transport:   amqp://guest:**@98.142.143.145:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2022-05-02 11:04:45,801: INFO/MainProcess] Connected to amqp://guest:**@98.142.143.145:5672//
[2022-05-02 11:04:45,805: INFO/MainProcess] mingle: searching for neighbors
[2022-05-02 11:04:46,825: INFO/MainProcess] mingle: all alone
[2022-05-02 11:04:46,849: INFO/MainProcess] celery@quant ready.

相关文章:
Github|celery/celery
Celery中文手册
First Steps with Celery
celery 简要概述
知乎|Celery详解

为者常成,行者常至