Python – How to configure CELERYBEAT_SCHEDULE in Django settings

celerydjangoperiodic-taskpython

I can get this to run as a standalone application, but I am having trouble getting it to work in Django.

Here is the stand alone code:

from celery import Celery
from celery.schedules import crontab


app = Celery('tasks')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'tasks.test',
        'schedule': crontab(),
        },
    }
)

@app.task
def test():
    with open('test.txt', 'a') as f:
        f.write('Hello, World!\n')`

It feeds the Rabbitmq server and writes to the file every minute. It works like a charm, but when I try to get it to work in a Django I get this error:

Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see ____ for
more information.

The full contents of the message body was: {'retries': 0, 'eta': None,
'kwargs': {}, 'taskset': None, 'timelimit': [None, None], 'callbacks':
None, 'task': 'proj.test', 'args': [], 'expires': None, 'id':
'501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': None,
'chord': None} (246b) Traceback (most recent call last): File
"/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py",
line 456, in on_task_received
strategies[name](message, body, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Scheduler: Sending due task test (proj.test)
[2016-06-16 01:16:00,055: ERROR/MainProcess] Received unregistered
task of type 'proj.test'.

And this is my code in Django:

# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'proj.test',
        'schedule': crontab(),
    }
}

celery.py

from __future__ import absolute_import

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

task.py

from __future__ import absolute_import
from celery import shared_task


@shared_task
def test():
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)

init.py

from __future__ import absolute_import

from .celery import app as celery_app 

Any thoughts on this are most appreciated. Thanks.

Best Answer

Why don't you try like the following and let me know if it worked out for you or not. It does work for me.

In settings.py

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': crontab(),
    },
}

And in tasks.py..

from celery.task import task # notice the import of task and not shared task. 

@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

But if you are looking for shared_task then..

@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
    # do what you want here..
    return True

I use shared task for async jobs.. So I need to call it from a function like the following..

in views.py / or anywhere.py in your project app

def some_function():
    my_shared_task.apply_async(countdown= in_seconds)
    return True

and just in case if you have forgotten then remember to include your app in which you are trying to run to the tasks..

INSTALLED_APPS = [...
 'my_app'...
] # include app

I'm sure this approach works fine.. Thanks