Back-end

[한국연구재단 R&D 프로젝트] 실시간 통신 총정리

hjr067 2025. 7. 21. 02:37

아우웅 .. 지금 완전 WebSocket이며 Redis며 Nginx 프록시며 실시간 통신에 관한 구현을 하고는 있는데 머릿속으로 정리가 안돼서 티스토리에라도 끄적여봅니다 ..

 

일단, 구현한 것은 

1. 3명이 한 방에 들어가서 3초 후에 동시 시작

2. 시작 후 3명이 음성 토론을 실시간으로 진행 (약간 줌 느낌이라고 난 생각했다)

  • 이때, 음성 시작과 끝을 백엔드는 기록한다 → 음성 토론을 다 녹음한 후 STT를 이용하여 스크립트화 해서 후에 GPT API를 통해 분석해야 한다.
  • WebRTC 시그널링을 통해 P2P 연결

3. 페이지는 방장이 넘길 때 다른 사람의 접속 페이지에서도 동시에 넘어가야 하며

4. 각자 선택 제출을 음성 토론 중에 진행하고,

5. 종료되면 음성 세션 종료와 함께 audio 기록 업로드

 

정도가 동시성 구현 하는 거였는데요

(물론 다른 기능적 api가 40개 정도 더 있었다 ;; ㅎㅎㅎ)

그래도 서버비랑 개발비용 지원받으면서 했던 플젝은 처음이라 좀 열심히 하고자 했었던 것 가타요

 

아무튼 서론이 길었는데 공부한 모든 것들을 정리해볼게요

 

우선 실시간 동작의 전체 구조

이렇게 되는 것 같아요 (이 말투 쓰면 안되는데 자신이 없어짐 ㅠ)

 

웹소켓 엔드포인트 설계

# app/api/voice_ws.py
@router.websocket("/voice/{session_id}")
async def voice_session_ws(
    websocket: WebSocket,
    session_id: str,
    db: AsyncSession = Depends(get_db),
):
    # 1. 연결 수락 전에 토큰 검증
    token = websocket.query_params.get("token", "").strip('"')
    payload = verify_token(token)
    if not token or not payload:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    user_id = payload.get("sub")
    await websocket.accept()

    try:
        while True:
            raw = await websocket.receive_text()
            msg = json.loads(raw)
            mtype: str = msg.get("type")
            data: dict = msg.get("data", {})

            # 1) 최초 init
            if mtype == "init":
                await manager.connect(websocket, session_id, user_info=data)
                await _handle_init(db, session_id, data)
                await manager.broadcast_to_session(
                    session_id,
                    ParticipantEvent(
                        type="join",
                        participant_id=data.get("user_id") or data.get("guest_id"),
                        nickname=data["nickname"],
                    ).model_dump()
                )
            # 2) 마이크/발화 상태 변경
            elif mtype == "voice_status_update":
                participant = await voice_service.update_voice_status(
                    db=db,
                    session_id=session_id,
                    user_id=data.get("user_id"),
                    guest_id=data.get("guest_id"),
                    is_mic_on=data["is_mic_on"],
                    is_speaking=data.get("is_speaking", False),
                )
                await manager.broadcast_to_session(
                    session_id,
                    VoiceStatusBroadcast(
                        participant_id=participant.id,
                        nickname=participant.nickname,
                        is_mic_on=participant.is_mic_on,
                        is_speaking=participant.is_speaking,
                    ).model_dump()
                )
            # 3) 녹음 시작/종료 등...

 

1. 3명이 한 방에 들어가서 3초 후 동시 시작

  • 방 입장 및 동기화
    • 각 참가자는 /ws/voice/{session_id} WebSocket에 접속해서, init 메세지로 참가자 정보를 서버에 등록함
    • 이때, 세 명의 참가자는 하나의 세션 id를 통해 접속한다 (GET)
    • 서버는 참가자 입장 정보를 broadcast해서, 프론트엔드에서 "3명 모두 입장"을 감지할 수 있음
    • 3초 후 동시 시작은 참가자 수가 3명이 되면 타이머를 시작하고, 3초 후에 모두 동시에 시작 신호를 받도록 구현
      • 이건 내가 프론트에서 구현할 수 있는거 아닌가? 라고 생각했는데 이러면 약간씩의 오차가 생길 수 있어서 서버에서 동시에 3초를 세어주고 시작시키는 게 오차가 적다구 해서 구현
  # app/api/voice_ws.py
  if mtype == "init":
      await manager.connect(websocket, session_id, user_info=data)
      await _handle_init(db, session_id, data)
      await manager.broadcast_to_session(
          session_id,
          ParticipantEvent(
              type="join",
              participant_id=data.get("user_id") or data.get("guest_id"),
              nickname=data["nickname"],
          ).model_dump()
      )

 

2. 시작 후 3명이 음성 토론을 실시간으로 진행

  • WebRTC P2P 연결
    • 각 참가자는 /ws/signaling WebSocket에 접속해서, offer/answer/candidate 메세지를 주고받음
      • 클라이언트끼리 P2P 연결을 맺기 위해 저 세개의 메세지를 주고 받는것 ! 방 코드로 각 방의 연결을 분리해서, 같은 방에 있는 사람들끼리만 시그널링 메세지를 주고받게 함.
    • 서버는 단순히 시그널링 메세지를 같은 방(같은 room_code로 입장한)에 있는 다른 참가자에게 중계
    • 실제 음성 데이터는 브라우저끼리 P2P로 주고받음 (서버는 시그널링만 담당)
  # app/api/voice_signaling_ws.py
  @router.websocket("/ws/signaling")
  async def signaling_ws(
      websocket: WebSocket,
      room_code: str = Query(...),
      token: str = Query(...)
  ):
      ...
      await manager.connect(room_code, websocket)
      try:
          while True:
              data = await websocket.receive_json()
              await manager.broadcast(room_code, data, sender=websocket)
      except WebSocketDisconnect:
          await manager.disconnect(room_code, websocket)

 

3. 음성 시작, 끝 기록

  • 음성 녹음 시작/종료
    • 클라이언트가 start_recording, stop_recording 메세지를 보내면, 서버는 DB에 녹음 시작/종료 시각, 파일 경로 등을 기록
    • 녹음 파일은 서버에 업로드되고, 이후 STT 및 GPT 분석 파이프라인으로 연동
  # app/api/voice_ws.py
  elif mtype == "start_recording":
      participant = await voice_service.start_recording(
          db=db,
          session_id=session_id,
          user_id=user_id,
          guest_id=None,
      )
      await websocket.send_json({
          "type": "recording_started",
          "data": {
              "path": participant.recording_file_path,
              "started_at": str(participant.recording_started_at)
          }
      })
  elif mtype == "stop_recording":
      participant, duration = await voice_service.stop_recording(
          db=db,
          session_id=session_id,
          user_id=user_id,
          guest_id=None,
      )
      await websocket.send_json({
          "type": "recording_stopped",
          "data": {
              "path": participant.recording_file_path,
              "ended_at": str(participant.recording_ended_at),
              "duration": duration
          }
      })

 

4. 방장 권한 확인 및 페이지 동기화

  • 방장이 next_page 메세지를 보내면, 서버가 방장 권한을 확인하고, 모든 참가자에게 "type": "next_page" 신호를 broadcast
  • Frontend는 이 신호를 받아서 페이지를 동시에 전환
  # app/api/endpoints/voice.py
  elif mtype == "next_page":
      user_id = message.get("user_id")
      is_host = await websocket_manager.is_host(session_id, user_id)
      if not is_host:
          await websocket.send_json({
              "type": "error",
              "message": "방장만 다음 페이지로 이동할 수 있습니다."
          })
          continue
      await websocket_manager.broadcast_to_session(session_id, {
          "type": "next_page"
      })
      await websocket.send_json({
          "type": "info",
          "message": "다음 페이지로 이동했습니다."
      })

 

5. 각자 선택 제출을 음성 토론 중 진행

6. 종료되면 음성 세션 종료와 함께 audio 기록 업로드

 

 

그리고 오류들을 토대로 깨달은 점 ... ㅎㅎ

 

1) 웹소켓 연결 시 토큰은 query parameter로 전달하고, 연결 수락 전에 검증해야 한다

  • 왜냐하면 웹소켓은 HTTP 요청과 다르게 헤더에 토큰을 넣기 어려워서, query parameter를 사용하는 것이 안정적이다

 

2) WebRTC 시그널링 서버 분리 문제

  • 음성 통신과 일반 메세지가 같은 WebSocket에서 섞여서 복잡해짐 -> 자꾸 이게 꼬여서 다른게 load됨 ;;
  • ws/voice/{session_id} , ws/voice/signaling 이런식이었는데 후자를 call 해도 앞에꺼에서 걸리고 했었던 것 같다.
  • 원인은 WebSocket 라우팅이 충돌하거나 제대로 분리되지 않아서 발생하는 것이었다 - 꽤나 자존심 상하는 에러ㅋㅋ
    • FastAPI, Starlette, Django Channels 등 대부분의 Python 기반 웹 프레임워크들은 라우트 매칭 순서와 방식에 따라 URL 경로를 해석한다. /ws/voice/{session_id}는 ws/voice/ 이후에 오는 모든 문자열을 변수 session_id로 인식하기 때문에, /ws/voice/signaling 요청도 session_id="signaling"로 잘못 매칭됐던 것이었다 (진심 황재령 이것도 모르고 지금까지 플젝들을 어케 한거야 .... 너무 창피해)
    • {session_id}가 너무 포괄적으로 매칭됨 ^^!!!!
    • 쉽게 말해,
      @app.websocket("/ws/voice/{session_id}")
      async def voice_ws(session_id: str):
          ...
      이건 /ws/voice/abcd1234 → 정상적으로 voice_ws()로 감
      @app.websocket("/ws/voice/signaling")
      async def signaling_ws():
          ...
      이땐, /ws/voice/signaling → session_id="signaling"로 인식되어 voice_ws()로 잘못 감 ㅎ
    • 즉, 라우트를 명확하게 분리하거나, 정적 경로를 먼저 등록하면 해결된다 → 나는 라우트 분리를 했다 (창피......)

3) 세션 관리에서는 중복 세션 방지와 기존 세션 재사용이 중요

  • 방 코드로 기존 세션을 먼저 확인하고, 없을 때만 새로 생성

4) P2P vs 서버 중계 : 음성 데이터는 P2P, 시그널링은 서버 중계 ~

 

5) 같은 사용자가 여러번 WebSocket에 접속하려고 할 때 매니저에서 중복 접속 체크 로직이 없었음 ;

  • 웹소켓은 HTTP 연결과 달리 연결 상태를 유지하고 있기 때문에, 같은 사용자의 중복 접속을 관리해야 함
# app/core/websocket_manager.py
class WebSocketManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        self.connection_info: Dict[WebSocket, dict] = {}
        # 추가: 사용자별 연결 추적
        self.user_connections: Dict[str, WebSocket] = {}
    
    async def connect(self, websocket: WebSocket, session_id: str, user_info: dict):
        user_id = user_info.get("user_id") or user_info.get("guest_id")
        
        # 중복 접속 체크
        if user_id in self.user_connections:
            existing_ws = self.user_connections[user_id]
            if existing_ws in self.active_connections.get(session_id, set()):
                # 기존 연결 해제
                await self.disconnect(existing_ws)
        
        # 새 연결 등록
        if session_id not in self.active_connections:
            self.active_connections[session_id] = set()
        
        self.active_connections[session_id].add(websocket)
        self.connection_info[websocket] = {
            "session_id": session_id,
            "user_info": user_info
        }
        self.user_connections[user_id] = websocket

 

6) Nginx WebSocket 프록시 설정

# nginx_websocket_config.conf
location /ws/ {
    proxy_pass http://backend:8000;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    
    # WebSocket 타임아웃 설정 (요기 !!)
    proxy_read_timeout 86400;
    proxy_send_timeout 86400;
    proxy_connect_timeout 60;
    
    # 버퍼 설정
    proxy_buffering off;
    proxy_cache off;
}
  • 웹소켓은 지속 연결이니까 타임아웃을 길게 설정하는 게 좋다 (이건 웹의 기능에 따라 판단하면 됨)

7) 뭐 사소한 거지만 accept 순서는 먼저 !

# 잘못된 순서
async def websocket_endpoint(websocket: WebSocket):
    data = await websocket.receive_text()  # 먼저 메시지 수신
    await websocket.accept()  # 나중에 accept

# 올바른 순서 << !
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # 먼저 accept
    data = await websocket.receive_text()  # 나중에 메시지 수신

 

8) 연결 해제 시 메모리 누수

# 연결 해제 시 정리 x
except WebSocketDisconnect:
    pass

# 연결 해제 시 정리
except WebSocketDisconnect:
    await websocket_manager.disconnect(websocket, session_id)

 

 

점점 추가할게욥