【Django入門22】ジョブキューの導入|Celeryを使った非同期処理の実装】

ジョブキューとは?

ジョブキューの概要

ジョブキューとは、アプリケーション内で実行するべきタスクを順番に処理するための仕組みです。ジョブキューは、長時間かかる処理(メール送信、画像のリサイズ、データの集計など)を非同期に実行し、アプリケーションの応答性を維持するために使用されます。

Djangoでは、ジョブキューを実装するためにCeleryという強力なツールを使うのが一般的です。Celeryは、分散タスクキューとして設計されており、大規模なタスクの処理にも対応しています。

Celeryの基本構成と動作

Celeryの基本概念

Celeryの基本構成は以下の3つのコンポーネントで構成されます:

  1. ワーカー(Worker):タスクを実行するプロセス。
  2. ブローカー(Broker):タスクをワーカーに渡すためのメッセージングシステム。通常、RedisやRabbitMQが使用されます。
  3. バックエンド(Result Backend):タスクの結果を保存し、後で参照できるようにする仕組み。

Celeryが解決する課題

  • 長時間かかる処理をバックグラウンドで実行することで、ユーザーの待ち時間を短縮。
  • 冗長な計算や重い処理を分散して実行し、システム全体の負荷を分散。
  • 実行結果の保存と再利用が可能。

Celeryのインストールとセットアップ

必要なパッケージのインストール

まず、DjangoプロジェクトにCeleryをインストールします。

pip install celery[redis]
  • celery[redis]:Celery本体とRedisのサポートをインストールします。

プロジェクト構成

DjangoプロジェクトにCeleryをセットアップするためのディレクトリ構成は次のようになります:

myproject/
    ├── myapp/
    ├── myproject/
    │   ├── __init__.py
    │   ├── celery.py  # Celery設定ファイル
    └── manage.py

Celeryの設定ファイルの作成

myproject/celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django設定ファイルのモジュールを指定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Celeryアプリケーションの生成
app = Celery('myproject')

# Djangoの設定ファイルから設定を読み込む
app.config_from_object('django.conf:settings', namespace='CELERY')

# タスクを自動検出
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
  • config_from_object:Djangoの設定ファイルからCeleryの設定を読み込みます。
  • autodiscover_tasks:各アプリ内のtasks.pyファイルからタスクを自動検出します。

__init__.pyでCeleryをインポート

myproject/init.py:

from __future__ import absolute_import, unicode_literals

# Celeryアプリケーションをインポートしてアプリケーションが起動するようにする
from .celery import app as celery_app

__all__ = ['celery_app']

Redisのセットアップ

Redisのインストール

RedisはCeleryのブローカーとしてよく使用されるインメモリデータストアです。

Ubuntuの場合:

sudo apt update
sudo apt install redis

macOSの場合(Homebrew使用):

brew install redis

Redisを起動します:

redis-server

Django設定ファイルの更新

settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

タスクの作成と実行

タスクの定義

タスクは各アプリケーション内のtasks.pyファイルに定義します。

myapp/tasks.py:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def send_email_notification(user_id):
    # 仮のメール送信処理
    print(f'ユーザー {user_id} にメールを送信しました。')
  • shared_task:アプリケーションに依存しないタスクを定義します。

タスクの呼び出し

タスクを非同期で実行するにはdelayメソッドを使用します。

views.py:

from django.http import HttpResponse
from .tasks import add, send_email_notification

def my_view(request):
    # 非同期タスクの実行
    result = add.delay(10, 20)
    send_email_notification.delay(user_id=1)
    return HttpResponse("タスクがバックグラウンドで実行されました。")

ワーカーの起動とタスクの実行確認

ワーカーの起動

次のコマンドでCeleryワーカーを起動します:

celery -A myproject worker --loglevel=info
  • -A myproject:Celeryアプリケーションが定義されているプロジェクトを指定します。
  • --loglevel=info:タスクの実行状況を詳細に表示します。

タスクの実行確認

ワーカーが正常に起動している場合、タスクがキューに追加されると自動的に処理され、結果がログに表示されます。

タスクのモニタリングとエラーハンドリング

Flowerを使ったモニタリング

Celeryのタスクの状態をリアルタイムで監視するには、FlowerというWebベースのモニタリングツールを使用します。

インストール:

pip install flower

起動:

celery -A myproject flower

ブラウザでhttp://localhost:5555/にアクセスすると、タスクの状態が確認できます。

エラーハンドリング

タスクの実行中にエラーが発生した場合に備え、再試行やエラーログの保存を設定することが重要です。

タスクでの再試行設定:

from celery import shared_task
from celery.exceptions import Retry

@shared_task(bind=True, max_retries=3)
def unreliable_task(self):
    try:
        # 不安定な処理をシミュレート
        raise Exception("一時的なエラー")
    except Exception as e:
        raise self.retry(exc=e, countdown=60)  # 1分後に再試行

まとめ

Djangoにジョブキューを導入することで、長時間かかる処理を非同期で効率的に実行できるようになります。Celeryはシンプルなタスクから大規模な分散タスクまで対応可能で、Redisなどのブローカーと連携することで強力なバックグラウンド処理が実現します。