Working with celery signals
If you're new to celery, start here.
Sometimes when using celery, you may want to get notified when a task running in the background executes successfully or when it fails. You may also want to run a function each time before the celery task runs or after it completes. These and many others along the same line are all situations where signals would come in handy.
I'll create a simple file tasks.py
and set up celery to
demonstrate how to use celery signals.
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
Make sure your redis server is running and start your celery worker:
(env) $ celery -A tasks worker --loglevel=INFO
Then run your tasks.py file and execute the add
task:
(env) $ python -i tasks.py
>>> add.delay(4, 4)
<AsyncResult: ce1ee079-6434-4f54-ace2-360ff316546b>
>>>
By default, what is returned is an AsyncResult instance but that's not what we're interested in. On the terminal with your Celery worker running, you should see something similar to this:
[2020-11-03 07:01:02,024: INFO/ForkPoolWorker-2] Task tasks.add[ce1ee079-6434-4f54-ace2-360ff316546b] succeeded in 0.0005510429999979749s: 8
The task executes successfully, and 8 is the result as expected.
Signals
There are a lot of signals that celery offers but I'll focus on 4 simple ones to demonstrate how signals work in general.
- task~prerun~
- task~postrun~
- task~success~
- task~failure~
task~prerun~
This signal is dispatched before a task is executed.
from celery import Celery
from celery.signals import task_prerun
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
print("From task_prerun_notifier ==> Running just before add() executes")
Sender is the task object being executed (the add
function
in this case).
Running add.delay(4, 4)
like before now gives the following
output on the celery terminal:
[2020-11-03 07:23:19,183: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 07:23:19,184: INFO/ForkPoolWorker-2] Task tasks.add[1ef11c46-f461-4eb8-84ca-5c5cdab62a74] succeeded in 0.0016491969999998801s: 8
Just before the task runs, the signal dispatches and prints as expected.
task~postrun~
Dispatched after a task has been executed.
from celery.signals import task_prerun, task_postrun
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
print("From task_prerun_notifier ==> Running just before add() executes")
@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
print("From task_postrun_notifier ==> Ok, done!")
Running this should give the following result:
[2020-11-03 17:03:51,655: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:03:51,656: INFO/ForkPoolWorker-2] Task tasks.add[7da6ee71-1941-4a87-b993-8136d94ac067] succeeded in 0.0017917519999999243s: 8
[2020-11-03 17:03:51,657: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!
task~success~
Dispatched when a task succeeds.
from celery import Celery
from celery.signals import task_prerun, task_postrun, task_success
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
print("From task_prerun_notifier ==> Running just before add() executes")
@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
print("From task_postrun_notifier ==> Ok, done!")
@task_success.connect(sender=add)
def task_success_notifier(sender=None, **kwargs):
print("From task_success_notifier ==> Task run successfully!")
Result:
[2020-11-03 17:40:47,276: INFO/MainProcess] Received task: tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0]
[2020-11-03 17:40:47,279: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:40:47,281: WARNING/ForkPoolWorker-2] From task_success_notifier ==> Task run successfully!
[2020-11-03 17:40:47,281: INFO/ForkPoolWorker-2] Task tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0] succeeded in 0.00201471799999986s: 8
[2020-11-03 17:40:47,282: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!
task~failure~
Dispatched when a task fails.
from celery import Celery
from celery.signals import task_prerun, task_postrun, task_failure
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
raise Exception
@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
print("From task_prerun_notifier ==> Running just before add() executes")
@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
print("From task_postrun_notifier ==> Ok, done!")
@task_failure.connect(sender=add)
def task_failure_notifier(sender=None, **kwargs):
print("From task_failure_notifier ==> Task failed successfully! 😅")
Result:
[2020-11-03 17:44:36,082: INFO/MainProcess] Received task: tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d]
[2020-11-03 17:44:36,085: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:44:36,096: WARNING/ForkPoolWorker-2] From task_failure_notifier ==> Task failed successfully! 😅
[2020-11-03 17:44:36,096: ERROR/ForkPoolWorker-2] Task tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d] raised unexpected: Exception()
Traceback (most recent call last):
...
in add
raise Exception
Exception
[2020-11-03 17:44:36,097: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!