졸업 프로젝트 중 내가 이번주 넘 고생했던 api 구현이었기에 정리해본다!
우리 서비스는 생성된 코드를 통해 서버 환경에서 학습을 시키고, 그 학습 로그를 epoch마다 실시간으로 loss, accuracy 값을 프론트엔드에 전송을 해야했고, 학습이 완료되면 6가지의 학습 지표를 최종 출력해야했다.
프로젝트 개요
PyTorch 기반 학습 로그를 실시간으로 프론트엔드로 보내기 위한 전체 시스템 구성
- FastAPI가 websocket 경로를 따라 사용자의 학습 요청을 받아 Celery로 학습 태스크를 보냄
- 워커가 PyTorch 학습 실행 + print()를 Redis Pub 채널로 발행
- FastAPI는 해당 채널을 구독해서 클라이언트 WebSocket에 전송
- Celery는 Redis를 통해 워커에게 작업 전달

사용 기술 스택 요약
| FastAPI | 비동기 API 서버 + WebSocket 지원 |
| WebSocket | 클라이언트와 실시간 양방향 통신 |
| Celery | 비동기 작업 큐 |
| Redis | 메시지 브로커 + Pub/Sub 활용 |
| PyTorch | 딥러닝 모델 학습 수행 |
| Docker (선택) | 서비스 구성 자동화 |
전체 시스템 흐름
✅ 실시간 로그 전송 아키텍처
[Client WebSocket 요청]
↓
[FastAPI WebSocket 라우터 수신]
↓
[Celery Task로 학습 요청 전달]
↓
[Celery Worker가 학습 실행 (PyTorch)]
↓
[print → Redis Pub/Sub 로그 전달]
↓
[FastAPI가 Redis 로그 구독 → WebSocket으로 전송]
주요 동작 상세 설명
1) WebSocket: 실시간 연결
- 프론트에서 학습 시작 시 WebSocket 연결
- FastAPI가 연결 수립 후 Redis에서 로그 메시지 수신 대기
@app.websocket("/ws/train")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
pubsub = redis_client.pubsub()
pubsub.subscribe("user:1")
while True:
message = pubsub.get_message()
if message and message["type"] == "message":
await websocket.send_text(message["data"])
await asyncio.sleep(0.5)
2) Celery: 학습 비동기 처리
- 학습 요청은 Celery Task로 Redis Queue에 전달됨
- Worker가 학습 수행 후 결과를 Redis 채널로 발행
@celery_app.task(name="train_model", queue="training")
def train_model(code):
print = get_custom_logger("user:1") # Redis 발행용 print 대체
exec(code) # PyTorch 학습 코드 실행
3) Redis Pub/Sub: 실시간 로그 전달
- print 함수 override → Redis 채널에 메시지 발행
def get_custom_logger(channel_name):
def publish_log(*args, **kwargs):
message = " ".join(map(str, args))
redis_client.publish(channel_name, json.dumps({
"type": "log",
"message": message
}))
return publish_log
실행 방법
1️⃣ 의존성 설치
pip install fastapi uvicorn celery redis torch
2️⃣ Redis 실행
# 로컬에서 실행 시
redis-server
3️⃣ Celery 워커 실행
celery -A app.celery_worker.celery_app worker --loglevel=info --pool=solo -Q training
4️⃣ FastAPI 실행
uvicorn main:app --reload
5️⃣ WebSocket 연결 예시 (프론트 or 콘솔)
// JavaScript 예시
const socket = new WebSocket("ws://localhost:8000/ws/train?user_id=1");
socket.onmessage = (event) => {
console.log(event.data);
};
4) 예상 실행 결과
- WebSocket을 통해 학습 로그가 실시간 전송됨
- 로그 예시:
[Epoch 1] Loss: 0.52, Accuracy: 78.5%
[Epoch 2] Loss: 0.47, Accuracy: 82.3%
...
[Done] Precision: 0.81, Recall: 0.79, F1: 0.80, ...
- 6개 평가 지표도 학습 종료 후 함께 출력됨
⚠️ 주구장창 떴던 ERROR
1) ❗ [WinError 10061] 대상 컴퓨터에서 연결을 거부했으므로 연결하지 못했습니다

- 원인 : Celery task가 Redis가 개인적으로 만든 'training' 쿼리를 감지하고 있어야 했는데, 대기 중이라 Redis가 해당 Celery task를 보내지 못함
- 이 원인을 찾기까지 정말 많이 삽질했다 ... 밤에 잠도 못자고 원인을 찾았는데 서버 쪽인 줄 알고 괜스리 이상한 로그읽구 ㅠㅠ 그래도... 내 힘으로 찾아보고자 노력했으나 ... ㅎㅎㅎ 아우 어려웠다 !
- 해결 :
celery -A app.celery_worker.celery_app worker --loglevel=info --pool=solo -Q training
- 결과 : Celery가 클라이언트가 보내는 training 쿼리를 감지 하며 이후에는 Redis에 적용됨 !
2) ❗ Celery 설정 분리로 인한 broker URL 충돌
- 원인 : FastAPI 프로세스에서 Celery 설정을 다시 → .env의 Redis URL이 다른 것을 참조
- 해결 : FastAPI에서 from app.celery_worker import celery_app 정의
3) ❗ print 결과가 다시 publish_log로 가면 RecursionError
- 원인 : publish_log 내에서가 자체를 print 하는 데, print는 바꾸여있음
- 해결 : print 바꾸기 전과 후에 builtins.print 복원 구문 클래식 사용
4) ❗ 정의되지 않은 e 변수를 except 블록에서 사용
- 원인 : Python 최신 버전에서는 except Exception as e: 와 같은 방식으로 명시적으로 변수를 지정해야 합니다. 단순히 except: 내에서 e를 사용하면 정의되지 않은 변수 오류가 발생
- 해결 : except Exception as exc: 으로 수정하고, str(exc)로 예외 메시지를 참조하도록 수정
결과는 ..

요로코롬 해결완료 !! 히히
에포크마다 실시간으로 loss, accuracy 로그가 잘 오고,
최종적으로는 6개 평가지표가 로그 형식으로 잘 나온다 !
주요 컴포넌트 역할
FastAPI
- WebSocket 라우트를 통해 사용자와 실시간 연결을 유지
- JWT 인증 → 사용자 학습 요청 수신
- Redis Pub/Sub을 구독해 로그를 실시간으로 전송
Celery
- 비동기 task 큐 시스템
- 백그라운드에서 실행할 TASK(작업)을 큐에 등록하고, 워커가 분리된 프로세스에서 실행
- FastAPI에서 보낸 학습 요청을 다른 프로세스 (worker)에서 실행
- 학습 코드는 exec(model_code)로 실행 → PyTorch 학습 수행
Redis
- 인메모리 데이터 저장소
- Celery의 broker (작업 전달), result backend (결과 저장소) 역할
- 동시에 Pub/Sub 기능을 활용하여 print 로그를 WebSocket으로 전파
✔️ Redis Pub/Sub란?
"메시지를 발행(publish)하면, 해당 채널을 구독(subscribe) 중인 모든 클라이언트가 메시지를 받는 구조"
[흐름 예시]
- FastAPI 서버는 Redis에서 "user:1"이라는 채널을 구독하고 있음
- Celery 워커가 모델 학습 로그를 출력할 때마다 "user:1" 채널에 발행함
- 그러면 FastAPI가 그 메시지를 받아서 WebSocket을 통해 사용자에게 전송함
[Celery Worker] -- publish --> [Redis "user:1"] -- subscribe --> [FastAPI WebSocket] → 사용자 화면
[사용 예시 코드]
발행 (Publisher)
redis_client.publish("user:1", json.dumps({"type": "log", "message": "학습 시작"}))
구독 (Subscriber)
pubsub = redis_client.pubsub()
pubsub.subscribe("user:1")
for message in pubsub.listen():
print(message)
따라서, Pub/Sub는
- WebSocket처럼 양방향 통신이 필요한 경우 중간 연결 고리 역할
- Redis는 매우 빠른 인메모리 DB라서 실시간 로그 처리에 적합
- 다수의 클라이언트에게 동시에 같은 메시지를 보낼 수도 있다
는 점에서 유용하다 !!
✅ 다음 개발에서 더 신경써야 할 점
- Celery Queue 지정 확인
- 학습 요청은 "training" 큐에 담기므로, Celery 실행 시 반드시 -Q training 지정해야 함
- print 함수 override시 재귀 피하기
- print() 내부에서 Redis로 로그 전송 → 그 안에서 또 print() 호출되면 RecursionError 발생
- builtins.print 백업 → finally 블록에서 복원
- WebSocket Pub/Sub 처리 시 주의
- Redis pubsub.get_message()는 blocking이 아니므로 timeout 고려 필요
- asyncio.sleep(0.5) 주기로 폴링하면 적절함
- 예외 변수 이름 주의
- except:만 쓰고 내부에서 e를 사용하면 UnboundLocalError 발생
- 반드시 except Exception as e: 형태로 사용해야 함
배운 점
- WebSocket + Redis Pub/Sub의 연계는 실시간 처리에 매우 유용
- Celery를 통한 비동기 작업 처리의 필요성과 안정성 체험
다음 목표
- AWS SageMaker 등 클라우드 학습 연동
- 학습 결과 자동 저장 (예: S3)
- 모델 추론 API 구현 및 배포
아자아자 ....!!!
이 프로젝트를 통해 백엔드 실시간 처리 아키텍처 설계부터 오류 해결까지 직접 경험할 수 있었고,
실무에서도 활용 가능한 시스템 구성을 익혀서 써본 글 !
같은 문제를 겪는 사람들에게 도움이 되었으면 좋겠다 🙌
'Back-end' 카테고리의 다른 글
| [한국연구재단 R&D 프로젝트] OpenAI Playground에 LangChain 적용 (0) | 2025.11.18 |
|---|---|
| [한국연구재단 R&D 프로젝트] 실시간 통신 총정리 (3) | 2025.07.21 |
| OOM 문제 해결 (0) | 2025.04.19 |
| [node.js] schedular (0) | 2025.01.06 |
| DB 구축 및 서버 배포 과정 (1) | 2024.11.28 |