티스토리 뷰

1. Celery란?

- Python으로 작성된 분산 작업 큐 프레임워크

- 작업 스케줄링, 실행, 모니터링을 담당한다

- 물리적으로 여러 서버에 분산한다

# 일반적인 프로그램
[웹서버] → [단일 프로세스에서 모든 작업 처리]

# celery
[웹서버] → [메시지 브로커] → [워커1@서버A]
                           → [워커2@서버B] 
                           → [워커3@서버C]
                           → [워커4@서버D]

- FIFO(First In, First Out) 방식의 작업 큐

# 작업들이 큐에 순서대로 쌓임
send_email.delay("user1@example.com", "Hello")  # ← 첫 번째 들어감
send_email.delay("user2@example.com", "Hello")  # ← 두 번째 들어감  
send_email.delay("user3@example.com", "Hello")  # ← 세 번째 들어감

# 워커가 처리할 때도 순서대로
# 1. user1 이메일 처리
# 2. user2 이메일 처리
# 3. user3 이메일 처리

- 각 구성 요소가 독립적으로 작동하면서 전체 시스템을 구성한다.

# 작업 생성             # 큐 관리             # 작업실행
┌─────────────┐    ┌─────────────────┐    ┌─────────────┐
│   Producer  │───▶│  Message Broker │◀───│   Worker    │
│ (Celery App)│    │  (RabbitMQ/Redis)│    │(Celery Worker)│
└─────────────┘    └─────────────────┘    └─────────────┘
                            │
                            ▼ # 결과저장
                   ┌─────────────────┐
                   │  Result Backend │
                   │   (Optional)    │
                   └─────────────────┘

2. 왜 필요할까?

- 웹 애플리케이션은, 사용자가 요청을 보내면 서버는 처리 후 응답을 돌려줘야 한다.

- 서버가 처리될 때까지 마냥 기다릴 순 없다.

- 즉시 필요한 것만 처리하고(사용자는 즉시 응답을 받음), 나머지는 백드라운드에서 처리할 수 있게 해준다. 

 

3. 어디에 쓸 수 있을까?

- 시간이 오래 걸리는 작업(예: 이메일/SMS 발송)

- 리소스 집약적인 작업 (예: GPU/CPU 많이 사용하는 데이터 분석 등)  ,

- 사용자 요청과 무관하게 주기적으로 실행하는 작업들(예: 로그 파일 정리, DB백업, 캐시 갱신, 임시 파일 삭제 등)

- 외부 시스템 의존적인 작업들(예: 외부 API호출 등)

 

4. async랑 뭐가 다를까?

- 동작하는 위치가 다르다. 메모리와 리소스 공유도 다르다.

- async는 동일한 프로세스 내에서 실행한다. 그래서 같은 프로세스의 메모리를 공유하고, 하나의 서버에서만 실행한다. 기다리는 방식을 효율적으로!

- celery는 완전히 별개의 프로세스/서버에서 실행한다. (다른 서버의 worker에서 실행). 완전히 독립적인 메로리 공간을 같고, CPU 집약적인 작업 시에도 웹 서버에 영향이 없고, 여러 서버에 분산 실행이 가능하다. 아예 기다리지 않게!

* async는 python 문법/표준 기능으로, python에 내장된 기어(엔진 내부 기술)

- 이벤트루프 기반 비동기 실행 모델, CPU 연산(이미지 인코딩, 대규모 행렬 곱셈, 데이터 압축, 암호화/복호화 등)을 여러 스레드/코어로 병렬 실행하는것이 아니라, I/O 대기시간 동안 다른 작업으로 전환하는 작업에 최적화

** I/O 바운드 작업: 네트워크 요청(HTTP API, DB, 메시지 큐), 파일/디스크/I/O, 웹소켓/스트리밍 처리 등

- 사용예시: 사용자 회원가입 시

1) async/await만 사용했을 경우

async def register_user(email, password):
    # 모든 작업을 순서대로 기다림
    user = await create_user(email, password)           # 0.1초
    await send_verification_email(user)                 # 2초
    await create_user_profile(user)                     # 0.5초
    await notify_admin(user)                           # 1초
    await generate_welcome_content(user)               # 3초
    
    return user  # 총 6.6초 후 응답

 

2) celery접근

def register_user(email, password):
    user = create_user(email, password)  # 0.1초
    
    # 나머지는 백그라운드에서
    post_registration_tasks.delay(user.id)
    
    return user  # 0.1초 만에 응답

@app.task
def post_registration_tasks(user_id):
    user = get_user(user_id)
    send_verification_email(user)
    create_user_profile(user) 
    notify_admin(user)
    generate_welcome_content(user)

 

5. celery 사용

1) 설치

pip install celery[redis]  # Redis 브로커/백엔드
# (선택) flower 모니터링: pip install flower

2) 프로젝트 구조

your_project/
├─ app.py                 # Celery 앱 생성 & 설정
├─ tasks.py               # 태스크 모듈
├─ config.py              # (선택) 설정 분리
└─ requirements.txt

3) celery 앱 생성

# app.py
from celery import Celery
from kombu import Queue

# 'task_manager' = 앱 이름(문자열, 식별자)
celery_app = Celery('task_manager')

# 기본 설정 (직접 update)
celery_app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',

    # 직렬화/타임존
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Asia/Seoul',
    enable_utc=True,

    # 워커/안정성
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,

    # 기본 큐 및 라우팅
    task_default_queue='default',
    task_queues=(
        Queue('default'),
        Queue('high_priority'),
        Queue('low_priority'),
    ),
    task_routes={
        'tasks.send_email': {'queue': 'high_priority'},
        'tasks.generate_report': {'queue': 'low_priority'},
    },
)

# (선택) config.py 따로 둘 경우:
# celery_app.config_from_object('config')

# 실행은
celery -A task_manger worker -l info

4) task 작성

- bind=True 설정 시, 해당 task를 self로 받음, self.retry()같은 재시도 제어 가능

- autoretry_for, retry_backoff로 실패 자동 재시도

# tasks.py
from app import celery_app

# 기본 이름은 "tasks.add" (모듈명.함수명)
@celery_app.task
def add(x, y):
    return x + y

# 이름 문자열로 호출 테스트용 (명시적 name 지정)
@celery_app.task(name="notify.send_email")
def send_email(to, subject):
    return {"to": to, "subject": subject, "sent": True}

 

5) task 실행(프로듀서) 

- delay() : 가장 간단하나 제어(큐/시간/우선순위)는 못건드림, task를 import해서 사용

# producer_delay.py
from tasks import add, send_email

# 기본 큐(default)로 즉시 실행
r1 = add.delay(1, 2)

# 기본 큐로 즉시 실행 (태스크 이름과 무관, 함수 참조)
r2 = send_email.delay("user@test.com", "Hello")

print(r1.id, r2.id)

- apply_async() : 세부 옵션 제어(queue, countdown, eta, priority, expires 등 모든 실행 옵션 가능), task를 import해서 사용

# producer_apply_async.py
from tasks import add, send_email
from datetime import timedelta

# 10초 뒤 실행
ra = add.apply_async(args=(10, 20), countdown=10)

# 특정 큐로 보내기 (우선 처리하고 싶은 작업)
rb = send_email.apply_async(("vip@test.com", "Priority"), queue="priority")

# ETA로 특정 시각에 실행 (예: 30초 뒤)
rc = add.apply_async(args=(100, 200), eta=None, countdown=30)

print(ra.id, rb.id, rc.id)

- send_task: app을 import해서 사용. task.py가 없어도, task 이름만 알면 사용 가능해 외부 서비스/분산 환경에서 사용

# producer_send_task.py
from app import celery_app

# 모듈 경로 기반 이름 (자동명): "tasks.add"
r1 = celery_app.send_task("tasks.add", args=[3, 4])

# 명시적 이름을 부여한 태스크: "notify.send_email"
r2 = celery_app.send_task("notify.send_email", args=["ext@test.com", "Hi from external"])

print(r1.id, r2.id)

6) 워커실행(CLI)

celery -A app.celery_app worker -l info -Q default,priority

 

7) 스케쥴링이 필요할 경우 beat 사용

# app.py (추가)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'daily-report-at-09': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=9, minute=0),  # 매일 09:00
        'options': {'queue': 'low_priority'},
    },
}


# 실행
celery -A app.celery_app beat -l info

 

8) 모니터링(선택): Flower

celery -A app.celery_app flower --port=5555

 

6. amqps:// 와 rpc://, AsyncResult

1) amqps: 메시지브로커 연결 URL (RabbitMQ, Redis, AWS SQS 등)

- 보안 연결로 메시지를 안전하게 주고 받음

- amqps = amqp + SSL/TLS 암호화

 

2) rpc: Task 가 끝나고 결과를 저장해 두는 곳 (RabbitMQ의 응답 큐, rpc백엔드)

- 휘발성으로, 결과는 Task 실행 직후에만 확인 가능하고, 나중에 다시 꺼내보는 것은 불가능하다.

 

3) AsyncResult

- Celery 태스크 실행 결과를 추적하는 객체 (상태 확인-.state, 완료 대기-.get(), 취소-.revoke(), 진행률-meta 조회 등 가능)

- task를 실행하면, 바로 태스크 결과값을 주는게 아니라, 결과를 나중에 확인할 수 있는 티켓을 주는데, 그게 AsyncResult다

- 이때 AsyncResult는 task_id를 포함하고, 워커가 task를 실행 후 결과를 backend(rpc)에 기록하면, AsyncResult.get()으로 결과를 가져올 수 있다.

from tasks import add

result = add.delay(4, 6)
print(type(result))   # <class 'celery.result.AsyncResult'>

 

7. exchange와 queue, routing_key

1) exchange: 메시지를 받아서 "어떤 큐로 갈지" 분배하는 라우터

- exchange type

타입 라우팅 방식 설명 예시
direct routing_key와 정확히 일치하는 큐로 전달 가장 단순한 방식.
보통 "큐 이름 = 라우팅 키" 형태로 매칭
routing_key="email" → Queue("email")로 전달
topic routing_key와 패턴 매칭(*, #) 유연한 pub/sub 패턴
*=단어 하나
#=0개 이상 단어
routing_key="user.signup"
Queue바인딩키: "user.*" → 매칭됨
fanout 라우팅 키 무시, 모든 큐로 브로드캐스트 publish-subscribe모델에 적합 알림 메시지를 모든 소비자에게 전달
headers routing_key 대신 헤더값(key/value)으로 매칭 메시지 속성의 헤더 조건 기반 라우팅 x-match=any + {format=pdf} 바인딩된 큐만 받음

2) queue: 메시지가 실제 저장돼 있다가 워커가 소비하는 버퍼

3) routing_key: 메시지를 어떤 큐로 보낼지 결정하는 주소 라벨

반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/01   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
글 보관함