Back-end

FastAPI + Websocket + Celery + Redis: train 로그 실시간 전송 구현

hjr067 2025. 5. 6. 00:01

졸업 프로젝트 중 내가 이번주 넘 고생했던 api 구현이었기에 정리해본다!

우리 서비스는 생성된 코드를 통해 서버 환경에서 학습을 시키고, 그 학습 로그를 epoch마다 실시간으로 loss, accuracy 값을 프론트엔드에 전송을 해야했고, 학습이 완료되면 6가지의 학습 지표를 최종 출력해야했다.

 

프로젝트 개요

PyTorch 기반 학습 로그를 실시간으로 프론트엔드로 보내기 위한 전체 시스템 구성

  • FastAPI가 websocket 경로를 따라 사용자의 학습 요청을 받아 Celery로 학습 태스크를 보냄
  • 워커가 PyTorch 학습 실행 + print()를 Redis Pub 채널로 발행
  • FastAPI는 해당 채널을 구독해서 클라이언트 WebSocket에 전송
  • Celery는 Redis를 통해 워커에게 작업 전달

사용 기술 스택 요약

FastAPI 비동기 API 서버 + WebSocket 지원
WebSocket 클라이언트와 실시간 양방향 통신
Celery 비동기 작업 큐
Redis 메시지 브로커 + Pub/Sub 활용
PyTorch 딥러닝 모델 학습 수행
Docker (선택) 서비스 구성 자동화

전체 시스템 흐름

✅ 실시간 로그 전송 아키텍처

더보기

[Client WebSocket 요청]
          ↓
[FastAPI WebSocket 라우터 수신]
          ↓
[Celery Task로 학습 요청 전달]
          ↓
[Celery Worker가 학습 실행 (PyTorch)]
          ↓
[print → Redis Pub/Sub 로그 전달]
          ↓
[FastAPI가 Redis 로그 구독 → WebSocket으로 전송]

주요 동작 상세 설명

1) WebSocket: 실시간 연결

  • 프론트에서 학습 시작 시 WebSocket 연결
  • FastAPI가 연결 수립 후 Redis에서 로그 메시지 수신 대기
@app.websocket("/ws/train")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    pubsub = redis_client.pubsub()
    pubsub.subscribe("user:1")

    while True:
        message = pubsub.get_message()
        if message and message["type"] == "message":
            await websocket.send_text(message["data"])
        await asyncio.sleep(0.5)

2) Celery: 학습 비동기 처리

  • 학습 요청은 Celery Task로 Redis Queue에 전달됨
  • Worker가 학습 수행 후 결과를 Redis 채널로 발행
@celery_app.task(name="train_model", queue="training")
def train_model(code):
    print = get_custom_logger("user:1")  # Redis 발행용 print 대체
    exec(code)  # PyTorch 학습 코드 실행

3) Redis Pub/Sub: 실시간 로그 전달

  • print 함수 override → Redis 채널에 메시지 발행
def get_custom_logger(channel_name):
    def publish_log(*args, **kwargs):
        message = " ".join(map(str, args))
        redis_client.publish(channel_name, json.dumps({
            "type": "log",
            "message": message
        }))
    return publish_log

 

실행 방법

1️⃣ 의존성 설치

pip install fastapi uvicorn celery redis torch

2️⃣ Redis 실행

# 로컬에서 실행 시
redis-server

3️⃣ Celery 워커 실행

celery -A app.celery_worker.celery_app worker --loglevel=info --pool=solo -Q training

4️⃣ FastAPI 실행

uvicorn main:app --reload

5️⃣ WebSocket 연결 예시 (프론트 or 콘솔)

// JavaScript 예시
const socket = new WebSocket("ws://localhost:8000/ws/train?user_id=1");
socket.onmessage = (event) => {
  console.log(event.data);
};

 

4) 예상 실행 결과

  • WebSocket을 통해 학습 로그가 실시간 전송됨
  • 로그 예시:
[Epoch 1] Loss: 0.52, Accuracy: 78.5%
[Epoch 2] Loss: 0.47, Accuracy: 82.3%
...
[Done] Precision: 0.81, Recall: 0.79, F1: 0.80, ...
  • 6개 평가 지표도 학습 종료 후 함께 출력됨

 

⚠️ 주구장창 떴던 ERROR

1) ❗ [WinError 10061] 대상 컴퓨터에서 연결을 거부했으므로 연결하지 못했습니다

  • 원인 : Celery task가 Redis가 개인적으로 만든 'training' 쿼리를 감지하고 있어야 했는데, 대기 중이라 Redis가 해당 Celery task를 보내지 못함
    • 이 원인을 찾기까지 정말 많이 삽질했다 ... 밤에 잠도 못자고 원인을 찾았는데 서버 쪽인 줄 알고 괜스리 이상한 로그읽구 ㅠㅠ 그래도... 내 힘으로 찾아보고자 노력했으나 ... ㅎㅎㅎ 아우 어려웠다 !
  • 해결 :
celery -A app.celery_worker.celery_app worker --loglevel=info --pool=solo -Q training
  • 결과 : Celery가 클라이언트가 보내는 training 쿼리를 감지 하며 이후에는 Redis에 적용됨 !

2) ❗ Celery 설정 분리로 인한 broker URL 충돌

  • 원인 : FastAPI 프로세스에서 Celery 설정을 다시 → .env의 Redis URL이 다른 것을 참조
  • 해결 : FastAPI에서 from app.celery_worker import celery_app 정의

3) ❗ print 결과가 다시 publish_log로 가면 RecursionError

  • 원인 : publish_log 내에서가 자체를 print 하는 데, print는 바꾸여있음
  • 해결 : print 바꾸기 전과 후에 builtins.print 복원 구문 클래식 사용

4) ❗ 정의되지 않은 e 변수를 except 블록에서 사용

  • 원인 : Python 최신 버전에서는 except Exception as e: 와 같은 방식으로 명시적으로 변수를 지정해야 합니다. 단순히 except: 내에서 e를 사용하면 정의되지 않은 변수 오류가 발생
  • 해결 : except Exception as exc: 으로 수정하고, str(exc)로 예외 메시지를 참조하도록 수정

 

결과는 ..

요로코롬 해결완료 !! 히히

에포크마다 실시간으로 loss, accuracy 로그가 잘 오고,

최종적으로는 6개 평가지표가 로그 형식으로 잘 나온다 !

주요 컴포넌트 역할

FastAPI

  • WebSocket 라우트를 통해 사용자와 실시간 연결을 유지
  • JWT 인증 → 사용자 학습 요청 수신
  • Redis Pub/Sub을 구독해 로그를 실시간으로 전송

Celery

  • 비동기 task 큐 시스템
  • 백그라운드에서 실행할 TASK(작업)을 큐에 등록하고, 워커가 분리된 프로세스에서 실행
  • FastAPI에서 보낸 학습 요청을 다른 프로세스 (worker)에서 실행
  • 학습 코드는 exec(model_code)로 실행 → PyTorch 학습 수행

Redis

  • 인메모리 데이터 저장소
  • Celery의 broker (작업 전달), result backend (결과 저장소) 역할
  • 동시에 Pub/Sub 기능을 활용하여 print 로그를 WebSocket으로 전파

✔️ Redis Pub/Sub란?

"메시지를 발행(publish)하면, 해당 채널을 구독(subscribe) 중인 모든 클라이언트가 메시지를 받는 구조"

[흐름 예시]

 

  • FastAPI 서버는 Redis에서 "user:1"이라는 채널을 구독하고 있음
  • Celery 워커가 모델 학습 로그를 출력할 때마다 "user:1" 채널에 발행
  • 그러면 FastAPI가 그 메시지를 받아서 WebSocket을 통해 사용자에게 전송
[Celery Worker] -- publish --> [Redis "user:1"] -- subscribe --> [FastAPI WebSocket] → 사용자 화면

 

 

[사용 예시 코드]

발행 (Publisher)

redis_client.publish("user:1", json.dumps({"type": "log", "message": "학습 시작"}))

 

구독 (Subscriber)

pubsub = redis_client.pubsub()
pubsub.subscribe("user:1")
for message in pubsub.listen():
    print(message)

따라서, Pub/Sub는

 

  • WebSocket처럼 양방향 통신이 필요한 경우 중간 연결 고리 역할
  • Redis는 매우 빠른 인메모리 DB라서 실시간 로그 처리에 적합
  • 다수의 클라이언트에게 동시에 같은 메시지를 보낼 수도 있다

는 점에서 유용하다 !!

 

다음 개발에서 더 신경써야 할 점

  1. Celery Queue 지정 확인
    • 학습 요청은 "training" 큐에 담기므로, Celery 실행 시 반드시 -Q training 지정해야 함
  2. print 함수 override시 재귀 피하기
    • print() 내부에서 Redis로 로그 전송 → 그 안에서 또 print() 호출되면 RecursionError 발생
    • builtins.print 백업 → finally 블록에서 복원
  3. WebSocket Pub/Sub 처리 시 주의
    • Redis pubsub.get_message()는 blocking이 아니므로 timeout 고려 필요
    • asyncio.sleep(0.5) 주기로 폴링하면 적절함
  4. 예외 변수 이름 주의
    • except:만 쓰고 내부에서 e를 사용하면 UnboundLocalError 발생
    • 반드시 except Exception as e: 형태로 사용해야 함

배운 점

  • WebSocket + Redis Pub/Sub의 연계는 실시간 처리에 매우 유용
  • Celery를 통한 비동기 작업 처리의 필요성과 안정성 체험

 

다음 목표

 

  • AWS SageMaker 등 클라우드 학습 연동
  • 학습 결과 자동 저장 (예: S3)
  • 모델 추론 API 구현 및 배포

아자아자 ....!!!

 

이 프로젝트를 통해 백엔드 실시간 처리 아키텍처 설계부터 오류 해결까지 직접 경험할 수 있었고,

실무에서도 활용 가능한 시스템 구성을 익혀서 써본 글 !
같은 문제를 겪는 사람들에게 도움이 되었으면 좋겠다 🙌