티스토리 뷰
1. Celery란?
- Python으로 작성된 분산 작업 큐 프레임워크
- 작업 스케줄링, 실행, 모니터링을 담당한다
- 물리적으로 여러 서버에 분산한다
# 일반적인 프로그램
[웹서버] → [단일 프로세스에서 모든 작업 처리]
# celery
[웹서버] → [메시지 브로커] → [워커1@서버A]
→ [워커2@서버B]
→ [워커3@서버C]
→ [워커4@서버D]
- FIFO(First In, First Out) 방식의 작업 큐
# 작업들이 큐에 순서대로 쌓임
send_email.delay("user1@example.com", "Hello") # ← 첫 번째 들어감
send_email.delay("user2@example.com", "Hello") # ← 두 번째 들어감
send_email.delay("user3@example.com", "Hello") # ← 세 번째 들어감
# 워커가 처리할 때도 순서대로
# 1. user1 이메일 처리
# 2. user2 이메일 처리
# 3. user3 이메일 처리
- 각 구성 요소가 독립적으로 작동하면서 전체 시스템을 구성한다.
# 작업 생성 # 큐 관리 # 작업실행
┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
│ Producer │───▶│ Message Broker │◀───│ Worker │
│ (Celery App)│ │ (RabbitMQ/Redis)│ │(Celery Worker)│
└─────────────┘ └─────────────────┘ └─────────────┘
│
▼ # 결과저장
┌─────────────────┐
│ Result Backend │
│ (Optional) │
└─────────────────┘
2. 왜 필요할까?
- 웹 애플리케이션은, 사용자가 요청을 보내면 서버는 처리 후 응답을 돌려줘야 한다.
- 서버가 처리될 때까지 마냥 기다릴 순 없다.
- 즉시 필요한 것만 처리하고(사용자는 즉시 응답을 받음), 나머지는 백드라운드에서 처리할 수 있게 해준다.
3. 어디에 쓸 수 있을까?
- 시간이 오래 걸리는 작업(예: 이메일/SMS 발송)
- 리소스 집약적인 작업 (예: GPU/CPU 많이 사용하는 데이터 분석 등) ,
- 사용자 요청과 무관하게 주기적으로 실행하는 작업들(예: 로그 파일 정리, DB백업, 캐시 갱신, 임시 파일 삭제 등)
- 외부 시스템 의존적인 작업들(예: 외부 API호출 등)
4. async랑 뭐가 다를까?
- 동작하는 위치가 다르다. 메모리와 리소스 공유도 다르다.
- async는 동일한 프로세스 내에서 실행한다. 그래서 같은 프로세스의 메모리를 공유하고, 하나의 서버에서만 실행한다. 기다리는 방식을 효율적으로!
- celery는 완전히 별개의 프로세스/서버에서 실행한다. (다른 서버의 worker에서 실행). 완전히 독립적인 메로리 공간을 같고, CPU 집약적인 작업 시에도 웹 서버에 영향이 없고, 여러 서버에 분산 실행이 가능하다. 아예 기다리지 않게!
* async는 python 문법/표준 기능으로, python에 내장된 기어(엔진 내부 기술)
- 이벤트루프 기반 비동기 실행 모델, CPU 연산(이미지 인코딩, 대규모 행렬 곱셈, 데이터 압축, 암호화/복호화 등)을 여러 스레드/코어로 병렬 실행하는것이 아니라, I/O 대기시간 동안 다른 작업으로 전환하는 작업에 최적화
** I/O 바운드 작업: 네트워크 요청(HTTP API, DB, 메시지 큐), 파일/디스크/I/O, 웹소켓/스트리밍 처리 등
- 사용예시: 사용자 회원가입 시
1) async/await만 사용했을 경우
async def register_user(email, password):
# 모든 작업을 순서대로 기다림
user = await create_user(email, password) # 0.1초
await send_verification_email(user) # 2초
await create_user_profile(user) # 0.5초
await notify_admin(user) # 1초
await generate_welcome_content(user) # 3초
return user # 총 6.6초 후 응답
2) celery접근
def register_user(email, password):
user = create_user(email, password) # 0.1초
# 나머지는 백그라운드에서
post_registration_tasks.delay(user.id)
return user # 0.1초 만에 응답
@app.task
def post_registration_tasks(user_id):
user = get_user(user_id)
send_verification_email(user)
create_user_profile(user)
notify_admin(user)
generate_welcome_content(user)
5. celery 사용
1) 설치
pip install celery[redis] # Redis 브로커/백엔드
# (선택) flower 모니터링: pip install flower
2) 프로젝트 구조
your_project/
├─ app.py # Celery 앱 생성 & 설정
├─ tasks.py # 태스크 모듈
├─ config.py # (선택) 설정 분리
└─ requirements.txt
3) celery 앱 생성
# app.py
from celery import Celery
from kombu import Queue
# 'task_manager' = 앱 이름(문자열, 식별자)
celery_app = Celery('task_manager')
# 기본 설정 (직접 update)
celery_app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
# 직렬화/타임존
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Seoul',
enable_utc=True,
# 워커/안정성
worker_prefetch_multiplier=1,
task_acks_late=True,
task_reject_on_worker_lost=True,
# 기본 큐 및 라우팅
task_default_queue='default',
task_queues=(
Queue('default'),
Queue('high_priority'),
Queue('low_priority'),
),
task_routes={
'tasks.send_email': {'queue': 'high_priority'},
'tasks.generate_report': {'queue': 'low_priority'},
},
)
# (선택) config.py 따로 둘 경우:
# celery_app.config_from_object('config')
# 실행은
celery -A task_manger worker -l info
4) task 작성
- bind=True 설정 시, 해당 task를 self로 받음, self.retry()같은 재시도 제어 가능
- autoretry_for, retry_backoff로 실패 자동 재시도
# tasks.py
from app import celery_app
# 기본 이름은 "tasks.add" (모듈명.함수명)
@celery_app.task
def add(x, y):
return x + y
# 이름 문자열로 호출 테스트용 (명시적 name 지정)
@celery_app.task(name="notify.send_email")
def send_email(to, subject):
return {"to": to, "subject": subject, "sent": True}
5) task 실행(프로듀서)
- delay() : 가장 간단하나 제어(큐/시간/우선순위)는 못건드림, task를 import해서 사용
# producer_delay.py
from tasks import add, send_email
# 기본 큐(default)로 즉시 실행
r1 = add.delay(1, 2)
# 기본 큐로 즉시 실행 (태스크 이름과 무관, 함수 참조)
r2 = send_email.delay("user@test.com", "Hello")
print(r1.id, r2.id)
- apply_async() : 세부 옵션 제어(queue, countdown, eta, priority, expires 등 모든 실행 옵션 가능), task를 import해서 사용
# producer_apply_async.py
from tasks import add, send_email
from datetime import timedelta
# 10초 뒤 실행
ra = add.apply_async(args=(10, 20), countdown=10)
# 특정 큐로 보내기 (우선 처리하고 싶은 작업)
rb = send_email.apply_async(("vip@test.com", "Priority"), queue="priority")
# ETA로 특정 시각에 실행 (예: 30초 뒤)
rc = add.apply_async(args=(100, 200), eta=None, countdown=30)
print(ra.id, rb.id, rc.id)
- send_task: app을 import해서 사용. task.py가 없어도, task 이름만 알면 사용 가능해 외부 서비스/분산 환경에서 사용
# producer_send_task.py
from app import celery_app
# 모듈 경로 기반 이름 (자동명): "tasks.add"
r1 = celery_app.send_task("tasks.add", args=[3, 4])
# 명시적 이름을 부여한 태스크: "notify.send_email"
r2 = celery_app.send_task("notify.send_email", args=["ext@test.com", "Hi from external"])
print(r1.id, r2.id)
6) 워커실행(CLI)
celery -A app.celery_app worker -l info -Q default,priority
7) 스케쥴링이 필요할 경우 beat 사용
# app.py (추가)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'daily-report-at-09': {
'task': 'tasks.generate_report',
'schedule': crontab(hour=9, minute=0), # 매일 09:00
'options': {'queue': 'low_priority'},
},
}
# 실행
celery -A app.celery_app beat -l info
8) 모니터링(선택): Flower
celery -A app.celery_app flower --port=5555
6. amqps:// 와 rpc://, AsyncResult
1) amqps: 메시지브로커 연결 URL (RabbitMQ, Redis, AWS SQS 등)
- 보안 연결로 메시지를 안전하게 주고 받음
- amqps = amqp + SSL/TLS 암호화
2) rpc: Task 가 끝나고 결과를 저장해 두는 곳 (RabbitMQ의 응답 큐, rpc백엔드)
- 휘발성으로, 결과는 Task 실행 직후에만 확인 가능하고, 나중에 다시 꺼내보는 것은 불가능하다.
3) AsyncResult
- Celery 태스크 실행 결과를 추적하는 객체 (상태 확인-.state, 완료 대기-.get(), 취소-.revoke(), 진행률-meta 조회 등 가능)
- task를 실행하면, 바로 태스크 결과값을 주는게 아니라, 결과를 나중에 확인할 수 있는 티켓을 주는데, 그게 AsyncResult다
- 이때 AsyncResult는 task_id를 포함하고, 워커가 task를 실행 후 결과를 backend(rpc)에 기록하면, AsyncResult.get()으로 결과를 가져올 수 있다.
from tasks import add
result = add.delay(4, 6)
print(type(result)) # <class 'celery.result.AsyncResult'>
7. exchange와 queue, routing_key
1) exchange: 메시지를 받아서 "어떤 큐로 갈지" 분배하는 라우터
- exchange type
| 타입 | 라우팅 방식 | 설명 | 예시 |
| direct | routing_key와 정확히 일치하는 큐로 전달 | 가장 단순한 방식. 보통 "큐 이름 = 라우팅 키" 형태로 매칭 |
routing_key="email" → Queue("email")로 전달 |
| topic | routing_key와 패턴 매칭(*, #) | 유연한 pub/sub 패턴 *=단어 하나 #=0개 이상 단어 |
routing_key="user.signup" Queue바인딩키: "user.*" → 매칭됨 |
| fanout | 라우팅 키 무시, 모든 큐로 브로드캐스트 | publish-subscribe모델에 적합 | 알림 메시지를 모든 소비자에게 전달 |
| headers | routing_key 대신 헤더값(key/value)으로 매칭 | 메시지 속성의 헤더 조건 기반 라우팅 | x-match=any + {format=pdf} 바인딩된 큐만 받음 |
2) queue: 메시지가 실제 저장돼 있다가 워커가 소비하는 버퍼
3) routing_key: 메시지를 어떤 큐로 보낼지 결정하는 주소 라벨
- Total
- Today
- Yesterday
- 영어회화
- 실기
- 빅데이터 분석기사
- IH
- 스크랩
- 경제
- 줄넘기
- opic
- 30분
- Python
- 오픽
- 아침
- 갓생
- SQL
- 다이어트
- 오블완
- llm
- 습관
- 기초
- 아침운동
- 루틴
- 고득점 Kit
- 미라클모닝
- 티스토리챌린지
- 프로그래머스
- 운동
- C언어
- ChatGPT
- 뉴스
- Ai
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |