🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

LangGraph 프로덕션 베스트 프랙티스 완벽 가이드 - 슬라이드 1/7
A

AI Generated

2025. 12. 12. · 49 Views

LangGraph 프로덕션 베스트 프랙티스 완벽 가이드

LangGraph를 실무 환경에 안전하게 배포하기 위한 필수 지식을 담았습니다. PostgresSaver 설정부터 암호화, 모니터링, 성능 최적화까지 초급 개발자도 쉽게 따라할 수 있도록 실무 상황을 스토리로 풀어냈습니다.


목차

  1. PostgresSaver 필수 설정
  2. 직렬화와 암호화
  3. LangSmith 모니터링
  4. 성능 최적화 전략
  5. 에러 핸들링 패턴
  6. 스케일링 고려사항

1. PostgresSaver 필수 설정

어느 날 김개발 씨가 LangGraph 챗봇을 만들어서 친구들에게 자랑했습니다. 그런데 서버를 재시작하니 모든 대화 기록이 사라져버렸습니다.

당황한 김개발 씨는 선배 박시니어 씨에게 도움을 요청했습니다.

PostgresSaver는 LangGraph의 상태를 데이터베이스에 영구 저장하는 체크포인터입니다. 마치 게임을 할 때 세이브 포인트를 만드는 것처럼, 대화의 각 단계를 데이터베이스에 저장합니다.

서버가 재시작되어도 대화를 이어갈 수 있고, 특정 시점으로 돌아갈 수도 있습니다.

다음 코드를 살펴봅시다.

from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection

# PostgreSQL 연결 설정
connection_string = "postgresql://user:password@localhost:5432/langraph_db"

# 동기 방식 체크포인터 생성
with Connection.connect(connection_string) as conn:
    checkpointer = PostgresSaver(conn)
    # setup() 메서드로 필요한 테이블 자동 생성
    checkpointer.setup()

    # StateGraph와 연결
    graph = StateGraph(state_schema=MyState)
    # ... 노드 추가 ...
    app = graph.compile(checkpointer=checkpointer)

김개발 씨는 입사 2개월 차 AI 개발자입니다. 회사에서 첫 프로젝트로 고객 상담 챗봇을 만들라는 미션을 받았습니다.

LangGraph를 열심히 공부해서 멋진 챗봇을 완성했습니다. 그런데 문제가 생겼습니다.

서버를 재시작할 때마다 모든 대화 기록이 날아가버렸습니다. 고객이 "아까 말한 주문 건은요?"라고 물어봐도 챗봇은 아무것도 기억하지 못했습니다.

박시니어 씨가 김개발 씨의 코드를 살펴봤습니다. "아, 여기가 문제네요.

체크포인터를 설정하지 않았네요." 그렇다면 체크포인터란 정확히 무엇일까요? 쉽게 비유하자면, 체크포인터는 마치 RPG 게임의 세이브 시스템과 같습니다.

게임을 하다가 중요한 지점마다 저장을 해두면, 나중에 게임을 껐다 켜도 그 지점부터 이어서 할 수 있습니다. LangGraph의 체크포인터도 대화의 각 단계를 저장해서 나중에 이어갈 수 있게 해줍니다.

체크포인터가 없던 시절에는 어땠을까요? 개발자들은 상태 저장 로직을 직접 작성해야 했습니다.

"이 변수는 언제 저장하지?", "어떤 형식으로 저장하지?" 같은 고민을 끊임없이 했습니다. 더 큰 문제는 복잡한 대화 흐름에서 어느 시점을 저장해야 할지 결정하기 어려웠다는 점입니다.

바로 이런 문제를 해결하기 위해 PostgresSaver가 등장했습니다. PostgresSaver를 사용하면 LangGraph가 자동으로 각 단계마다 상태를 PostgreSQL 데이터베이스에 저장합니다.

개발자는 저장 로직을 신경 쓸 필요가 없습니다. 또한 데이터베이스에 저장되기 때문에 여러 서버에서 동시에 접근할 수 있습니다.

위의 코드를 한 줄씩 살펴보겠습니다. 먼저 connection_string에서 데이터베이스 접속 정보를 지정합니다.

실무에서는 환경 변수로 관리하는 것이 안전합니다. Connection.connect()로 데이터베이스에 연결하고, PostgresSaver 객체를 생성합니다.

핵심은 setup() 메서드입니다. 이 메서드가 체크포인트를 저장할 테이블을 자동으로 생성해줍니다.

마지막으로 graph.compile() 메서드에 checkpointer를 전달하면 모든 설정이 완료됩니다. 이제 그래프가 실행될 때마다 자동으로 상태가 저장됩니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 은행 챗봇 서비스를 개발한다고 가정해봅시다.

고객이 "계좌 이체를 하고 싶어요"라고 말하면, 챗봇은 여러 단계를 거쳐 이체를 진행합니다. 각 단계마다 PostgresSaver가 상태를 저장하므로, 네트워크가 끊겼다가 다시 연결되어도 처음부터 다시 시작할 필요가 없습니다.

네이버, 카카오 같은 대형 플랫폼도 이런 패턴을 적극적으로 사용합니다. 수백만 명의 사용자가 동시에 대화해도 각자의 상태가 안전하게 저장됩니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 setup() 메서드를 빠뜨리는 것입니다.

setup()을 호출하지 않으면 필요한 테이블이 생성되지 않아 에러가 발생합니다. 또한 Connection 객체는 with 문을 사용해서 자동으로 닫히도록 관리해야 합니다.

또 다른 실수는 비동기 코드에서 동기 방식 PostgresSaver를 사용하는 경우입니다. 비동기 환경에서는 반드시 AsyncPostgresSaver를 사용해야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 도움을 받아 PostgresSaver를 설정한 김개발 씨는 다시 테스트를 해봤습니다.

서버를 재시작해도 대화가 이어졌습니다. "와, 진짜 되네요!" PostgresSaver를 제대로 설정하면 프로덕션 환경에서 안정적으로 서비스를 운영할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - setup() 메서드는 최초 1회만 실행하면 되며, 이미 테이블이 있으면 스킵됩니다

  • 실무에서는 연결 정보를 환경 변수로 관리하세요 (os.getenv 활용)
  • 비동기 환경에서는 AsyncPostgresSaver와 AsyncConnection을 사용하세요

2. 직렬화와 암호화

김개발 씨의 챗봇이 인기를 끌면서 민감한 개인정보를 다루게 되었습니다. 어느 날 보안팀에서 연락이 왔습니다.

"데이터베이스에 저장되는 대화 내용이 평문으로 보이는데, 이거 괜찮은가요?" 김개발 씨는 식은땀을 흘리며 다시 박시니어 씨를 찾아갔습니다.

직렬화는 Python 객체를 저장 가능한 형태로 변환하는 과정입니다. LangGraph는 기본적으로 pickle을 사용하지만, 보안을 위해 커스텀 직렬화를 구현할 수 있습니다.

암호화를 추가하면 데이터베이스에 저장되는 상태를 안전하게 보호할 수 있습니다.

다음 코드를 살펴봅시다.

from cryptography.fernet import Fernet
from langgraph.checkpoint.postgres import PostgresSaver
import pickle

class EncryptedSaver(PostgresSaver):
    def __init__(self, conn, encryption_key):
        super().__init__(conn)
        self.cipher = Fernet(encryption_key)

    def serialize(self, obj):
        # pickle로 직렬화 후 암호화
        pickled = pickle.dumps(obj)
        encrypted = self.cipher.encrypt(pickled)
        return encrypted

    def deserialize(self, data):
        # 복호화 후 역직렬화
        decrypted = self.cipher.decrypt(data)
        return pickle.loads(decrypted)

# 암호화 키 생성 (안전한 곳에 보관)
key = Fernet.generate_key()
checkpointer = EncryptedSaver(conn, key)

김개발 씨는 보안팀의 지적을 받고 당황했습니다. 데이터베이스를 열어보니 고객의 주민번호, 계좌번호 같은 민감한 정보가 그대로 보였습니다.

"이건 큰일인데..." 박시니어 씨가 설명을 시작했습니다. "LangGraph는 기본적으로 pickle을 사용해서 상태를 저장해요.

pickle은 빠르지만 암호화는 지원하지 않죠." 그렇다면 직렬화란 정확히 무엇일까요? 쉽게 비유하자면, 직렬화는 마치 이사할 때 짐을 박스에 포장하는 것과 같습니다.

Python 객체라는 복잡한 가구를 데이터베이스에 넣으려면 바이트 형태로 변환해야 합니다. 나중에 꺼낼 때는 다시 원래 모습으로 조립합니다.

이 과정을 직렬화와 역직렬화라고 부릅니다. 직렬화만 사용하던 시절에는 어땠을까요?

개발자들은 상태를 저장할 때 보안을 별도로 고민해야 했습니다. "이 데이터는 암호화해야 하나?", "어떤 알고리즘을 쓰지?" 같은 질문에 답을 찾아야 했습니다.

더 큰 문제는 암호화 로직을 직접 구현하다 보면 실수로 취약점이 생길 수 있다는 점이었습니다. 바로 이런 문제를 해결하기 위해 커스텀 직렬화가 필요합니다.

PostgresSaver 클래스를 상속받아서 serialize와 deserialize 메서드를 오버라이드하면 됩니다. 이렇게 하면 저장 전에 자동으로 암호화하고, 불러올 때 자동으로 복호화할 수 있습니다.

암호화 로직이 한곳에 모여 있어서 관리도 쉽습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 EncryptedSaver 클래스는 PostgresSaver를 상속받습니다. init 메서드에서 암호화 키를 받아서 Fernet 암호화 객체를 생성합니다.

Fernet은 cryptography 라이브러리가 제공하는 안전한 대칭 암호화 방식입니다. serialize 메서드에서는 pickle.dumps로 객체를 바이트로 변환한 후, cipher.encrypt로 암호화합니다.

deserialize 메서드는 그 반대입니다. cipher.decrypt로 복호화한 후 pickle.loads로 원래 객체를 복원합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 의료 상담 챗봇을 개발한다고 가정해봅시다.

환자의 증상, 병력 같은 민감한 정보가 대화에 포함됩니다. 이런 정보는 HIPAA 같은 규정에 따라 반드시 암호화해야 합니다.

커스텀 직렬화를 사용하면 코드 한 줄 바꾸지 않고도 자동으로 모든 상태가 암호화됩니다. 금융권에서는 더욱 엄격합니다.

암호화 키를 HSM 같은 하드웨어 보안 모듈에 저장하고, 주기적으로 키를 교체합니다. 이런 요구사항도 커스텀 직렬화로 구현할 수 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 암호화 키를 코드에 하드코딩하는 것입니다.

키가 유출되면 암호화가 무용지물이 되므로, 반드시 환경 변수나 비밀 관리 서비스에 저장해야 합니다. AWS Secrets Manager, Azure Key Vault 같은 서비스를 활용하세요.

또 다른 실수는 pickle의 보안 취약점을 간과하는 것입니다. pickle은 신뢰할 수 없는 데이터를 역직렬화할 때 임의 코드 실행 위험이 있습니다.

더 안전한 방법이 필요하다면 JSON이나 msgpack 같은 형식을 고려하세요. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 도움으로 암호화를 적용한 김개발 씨는 보안팀에 다시 보고했습니다. "이제 데이터베이스에 평문이 보이지 않네요.

잘했어요!" 커스텀 직렬화와 암호화를 제대로 구현하면 규정 준수는 물론 고객의 신뢰도 얻을 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 암호화 키는 절대 코드에 포함하지 말고 환경 변수나 비밀 관리 서비스를 사용하세요

  • pickle 대신 JSON이나 msgpack을 사용하면 더 안전합니다
  • 키 교체 계획을 미리 세워두면 나중에 편합니다

3. LangSmith 모니터링

김개발 씨의 챗봇이 드디어 정식 서비스를 시작했습니다. 그런데 며칠 후 고객센터에서 불만이 쏟아졌습니다.

"챗봇이 이상한 답변을 해요!", "응답이 너무 느려요!" 김개발 씨는 무엇이 문제인지 전혀 알 수 없었습니다. 로그를 봐도 힌트가 없었습니다.

LangSmith는 LangChain과 LangGraph 애플리케이션을 모니터링하고 디버깅하는 플랫폼입니다. 마치 항공기의 블랙박스처럼 모든 실행 과정을 기록해서, 문제가 생겼을 때 정확히 어디서 무엇이 잘못되었는지 추적할 수 있습니다.

다음 코드를 살펴봅시다.

import os
from langsmith import Client

# LangSmith 설정
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-chatbot"

# 커스텀 메타데이터 추가
from langchain_core.tracers.context import tracing_v2_enabled

with tracing_v2_enabled(
    project_name="production-chatbot",
    tags=["version:2.0", "environment:prod"],
    metadata={"customer_id": "12345", "session_type": "mobile"}
):
    # 여기서 그래프 실행
    result = app.invoke({"messages": [user_message]})

김개발 씨는 고객 불만을 해결하기 위해 밤을 새워 로그를 분석했습니다. 하지만 일반 로그로는 LLM이 어떤 프롬프트를 받았는지, 얼마나 오래 걸렸는지, 어떤 도구를 호출했는지 알 수 없었습니다.

박시니어 씨가 한숨을 쉬며 말했습니다. "LangSmith를 왜 안 켜놨어요?

이런 상황을 위해 있는 건데..." 그렇다면 LangSmith란 정확히 무엇일까요? 쉽게 비유하자면, LangSmith는 마치 CCTV와 블랙박스를 합쳐놓은 것과 같습니다.

챗봇이 실행되는 모든 순간을 영상처럼 기록해서, 나중에 돌려보며 "아, 여기서 이렇게 동작했구나"를 확인할 수 있습니다. 단순한 로그와 달리 프롬프트, 응답, 실행 시간, 토큰 사용량까지 모든 정보가 구조화되어 저장됩니다.

모니터링 도구가 없던 시절에는 어땠을까요? 개발자들은 print문을 곳곳에 넣어서 디버깅했습니다.

"여기까지 왔네", "이 변수는 이 값이네" 같은 메시지를 출력하며 문제를 추적했습니다. 프로덕션 환경에서는 더 막막했습니다.

고객이 겪은 문제를 재현하려면 똑같은 조건을 만들어야 했는데, 그게 거의 불가능했습니다. 바로 이런 문제를 해결하기 위해 LangSmith가 등장했습니다.

LangSmith를 활성화하면 모든 LangChain과 LangGraph 호출이 자동으로 기록됩니다. 대시보드에서 시각적으로 실행 흐름을 볼 수 있고, 느린 부분을 쉽게 찾을 수 있습니다.

또한 특정 실행을 재현하거나 테스트 케이스로 만들 수도 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 환경 변수로 LangSmith를 활성화합니다. LANGCHAIN_TRACING_V2를 "true"로 설정하면 모든 추적이 시작됩니다.

LANGCHAIN_API_KEY는 LangSmith 웹사이트에서 발급받은 키를 입력합니다. LANGCHAIN_PROJECT는 프로젝트 이름입니다.

여러 프로젝트를 운영한다면 각각 다른 이름을 사용해서 추적 데이터를 분리할 수 있습니다. tracing_v2_enabled 컨텍스트 매니저를 사용하면 더 세밀하게 제어할 수 있습니다.

tags로 버전이나 환경을 표시하고, metadata로 고객 ID 같은 추가 정보를 기록합니다. 나중에 "이 고객이 겪은 문제만 필터링해서 보자"라고 할 때 유용합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 여행 예약 챗봇을 운영한다고 가정해봅시다.

고객이 "항공편 검색이 안 돼요"라고 불만을 제기했습니다. LangSmith에서 해당 고객의 세션을 찾아보니, 호텔 검색 API는 0.3초만에 응답했지만 항공편 API는 15초나 걸렸습니다.

원인을 파악해서 API 타임아웃을 조정하고 문제를 해결했습니다. 스타트업부터 대기업까지 LangSmith를 적극 활용합니다.

A/B 테스트를 할 때도 유용합니다. "새 프롬프트가 기존보다 나은가?"를 실제 데이터로 비교할 수 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 민감한 정보를 그대로 추적하는 것입니다.

LangSmith는 기본적으로 모든 입출력을 기록하므로, 고객의 개인정보가 포함될 수 있습니다. 필요하다면 특정 필드를 마스킹하거나 추적에서 제외해야 합니다.

또 다른 실수는 무료 플랜의 한계를 모르는 것입니다. LangSmith 무료 플랜은 추적 횟수에 제한이 있습니다.

트래픽이 많은 서비스라면 유료 플랜을 고려하거나, 샘플링을 적용해야 합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 조언대로 LangSmith를 켠 김개발 씨는 몇 분 만에 문제를 찾았습니다. 특정 LLM 호출이 타임아웃되고 있었습니다.

"와, 이렇게 쉽게 찾을 수 있다니!" LangSmith를 제대로 활용하면 장애 대응 시간을 획기적으로 줄일 수 있습니다. 여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 프로덕션에서는 샘플링을 적용해서 비용을 절감하세요 (예: 10% 추적)

  • 민감한 정보는 마스킹 처리하거나 추적에서 제외하세요
  • 대시보드의 "Playground" 기능으로 실패한 실행을 즉시 재현할 수 있습니다

4. 성능 최적화 전략

김개발 씨의 챗봇은 인기를 얻으면서 동시 접속자가 늘어났습니다. 그런데 응답 속도가 점점 느려지기 시작했습니다.

1초였던 응답 시간이 5초, 10초로 늘어났습니다. 박시니어 씨가 성능 최적화를 해야 한다고 조언했습니다.

성능 최적화는 LangGraph 애플리케이션의 응답 속도와 처리량을 개선하는 작업입니다. 데이터베이스 연결 풀링, 체크포인트 압축, 비동기 처리 같은 기법을 활용하면 같은 자원으로 더 많은 요청을 처리할 수 있습니다.

다음 코드를 살펴봅시다.

from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres import AsyncPostgresSaver
import asyncio

# 연결 풀 생성 (재사용으로 성능 향상)
pool = AsyncConnectionPool(
    conninfo="postgresql://user:password@localhost:5432/db",
    min_size=5,      # 최소 연결 수
    max_size=20,     # 최대 연결 수
    timeout=30       # 대기 타임아웃
)

async def optimize_graph():
    async with pool.connection() as conn:
        # 비동기 체크포인터 사용
        checkpointer = AsyncPostgresSaver(conn)
        await checkpointer.setup()

        # 병렬 처리로 응답 속도 향상
        graph = StateGraph(MyState)
        # ... 노드 추가 ...
        app = graph.compile(checkpointer=checkpointer)

        # 여러 요청을 동시에 처리
        tasks = [app.ainvoke({"messages": [msg]}) for msg in messages]
        results = await asyncio.gather(*tasks)
        return results

김개발 씨는 서버 모니터링 화면을 보며 한숨을 쉬었습니다. CPU 사용률은 20%밖에 안 되는데 응답이 느렸습니다.

"뭔가 잘못됐는데..." 박시니어 씨가 코드를 살펴보더니 고개를 저었습니다. "아, 매번 새로운 데이터베이스 연결을 만들고 있네요.

이것 때문에 느린 거예요." 그렇다면 연결 풀링이란 정확히 무엇일까요? 쉽게 비유하자면, 연결 풀링은 마치 택시 승강장과 같습니다.

손님이 올 때마다 새 택시를 공장에서 만들어 오는 대신, 승강장에 택시를 여러 대 대기시켜 놓습니다. 손님이 오면 바로 태우고, 내리면 다시 승강장으로 돌아옵니다.

데이터베이스 연결도 마찬가지입니다. 미리 연결을 만들어 두고 재사용하면 훨씬 빠릅니다.

연결 풀이 없던 시절에는 어땠을까요? 개발자들은 요청마다 새로운 데이터베이스 연결을 만들었습니다.

연결을 만드는 데만 수백 밀리초가 걸렸습니다. 동시 요청이 많으면 데이터베이스 서버가 연결 요청을 거부하기도 했습니다.

더 큰 문제는 연결을 제대로 닫지 않으면 메모리 누수가 생긴다는 점이었습니다. 바로 이런 문제를 해결하기 위해 연결 풀이 필수입니다.

AsyncConnectionPool을 사용하면 미리 정해진 수만큼 연결을 만들어 놓습니다. 요청이 오면 풀에서 연결을 빌려주고, 작업이 끝나면 반환받습니다.

연결을 만드는 오버헤드가 사라지고, 데이터베이스 부하도 줄어듭니다. 비동기 처리도 중요합니다.

동기 방식에서는 한 요청을 처리하는 동안 다른 요청이 기다려야 합니다. 비동기 방식에서는 I/O 작업을 기다리는 동안 다른 요청을 처리할 수 있습니다.

같은 CPU로 훨씬 많은 요청을 처리할 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 AsyncConnectionPool을 생성합니다. min_size는 항상 유지할 최소 연결 수이고, max_size는 최대 연결 수입니다.

트래픽이 적을 때는 5개만 유지하다가, 많아지면 20개까지 늘어납니다. pool.connection()으로 풀에서 연결을 빌립니다.

async with 블록이 끝나면 자동으로 반환되므로 안전합니다. AsyncPostgresSaver는 비동기 체크포인터입니다.

await checkpointer.setup()으로 테이블을 준비합니다. asyncio.gather는 여러 비동기 작업을 동시에 실행합니다.

10개 요청이 들어오면 순차적으로 처리하는 대신, 동시에 처리해서 응답 시간을 획기적으로 줄입니다. 실제 현업에서는 어떻게 활용할까요?

예를 들어 뉴스 요약 서비스를 개발한다고 가정해봅시다. 사용자가 10개 기사를 선택하면 각각 요약해야 합니다.

동기 방식으로 순차 처리하면 10초가 걸립니다. 비동기로 병렬 처리하면 2초로 줄어듭니다.

사용자 경험이 완전히 달라집니다. 쿠팡, 배달의민족 같은 대규모 서비스는 더 복잡한 최적화를 합니다.

캐싱 레이어를 추가하고, 읽기 전용 복제본을 활용하고, 샤딩으로 데이터베이스를 분산합니다. 하지만 기본은 연결 풀과 비동기 처리입니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 풀 크기를 너무 크게 설정하는 것입니다.

연결도 메모리를 사용하므로, 무조건 크다고 좋은 게 아닙니다. 데이터베이스 서버의 최대 연결 수도 고려해야 합니다.

PostgreSQL 기본값은 100개인데, 여러 서버가 각각 100개씩 연결하면 문제가 생깁니다. 또 다른 실수는 동기와 비동기를 섞어 쓰는 것입니다.

비동기 코드에서 동기 함수를 호출하면 블로킹이 발생해서 성능이 오히려 나빠집니다. 전체 스택을 비동기로 통일해야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 조언대로 연결 풀과 비동기를 적용한 김개발 씨는 성능 테스트를 다시 해봤습니다.

응답 시간이 10초에서 1초로 줄었습니다. "우와, 열 배나 빨라졌어요!" 성능 최적화를 제대로 하면 서버 비용을 줄이고 사용자 만족도를 높일 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 연결 풀 크기는 실제 부하 테스트로 결정하세요 (보통 CPU 코어 수의 2-4배)

  • 비동기 코드는 전체 스택을 비동기로 통일해야 효과가 있습니다
  • LangSmith로 병목 지점을 찾은 후 선택적으로 최적화하세요

5. 에러 핸들링 패턴

김개발 씨의 챗봇이 어느 날 갑자기 멈췄습니다. 로그를 보니 "DatabaseError: connection timeout"이라는 에러만 수천 개 쌓여 있었습니다.

서비스는 다운되었고, 고객들은 불만을 쏟아냈습니다. 박시니어 씨가 한숨을 쉬며 말했습니다.

"에러 처리를 제대로 안 했네요."

에러 핸들링은 예외 상황에서 서비스가 안전하게 동작하도록 만드는 기법입니다. 재시도 로직, 폴백 처리, 서킷 브레이커 같은 패턴을 적용하면 일시적인 장애에도 서비스를 유지할 수 있습니다.

다음 코드를 살펴봅시다.

from tenacity import retry, stop_after_attempt, wait_exponential
from langgraph.errors import GraphRecursionError
import logging

logger = logging.getLogger(__name__)

# 재시도 로직 (지수 백오프)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    reraise=True
)
async def safe_invoke(app, input_data, config):
    try:
        return await app.ainvoke(input_data, config)
    except GraphRecursionError as e:
        # 무한 루프 방지
        logger.error(f"Recursion limit reached: {e}")
        return {"error": "대화가 너무 길어졌습니다. 새로 시작해주세요."}
    except TimeoutError as e:
        # 타임아웃 처리
        logger.error(f"Timeout: {e}")
        return {"error": "응답 시간이 초과되었습니다. 다시 시도해주세요."}
    except Exception as e:
        # 일반 에러 처리
        logger.exception(f"Unexpected error: {e}")
        return {"error": "일시적인 문제가 발생했습니다. 잠시 후 다시 시도해주세요."}

김개발 씨는 장애 보고서를 작성하며 땀을 뻘뻘 흘렸습니다. "데이터베이스가 1분만 멈췄는데 왜 서비스가 완전히 다운됐을까요?" 박시니어 씨가 설명을 시작했습니다.

"에러가 발생했을 때 아무 처리도 안 하면 그대로 터져버려요. 재시도 로직도 없고, 폴백 처리도 없으니까요." 그렇다면 에러 핸들링이란 정확히 무엇일까요?

쉽게 비유하자면, 에러 핸들링은 마치 비상구와 안전벨트 같은 것입니다. 평소에는 필요 없지만, 문제가 생겼을 때 서비스를 지켜줍니다.

차가 미끄러질 때 안전벨트가 우리를 보호하듯, 에러 핸들링은 일시적인 장애에서 서비스를 보호합니다. 에러 핸들링이 없던 시절에는 어땠을까요?

개발자들은 에러가 나면 서비스를 재시작하는 수밖에 없었습니다. "데이터베이스가 잠깐 느려졌네?

재시작!" 이런 식이었습니다. 사용자는 갑자기 접속이 끊기고, 진행하던 작업을 잃어버렸습니다.

더 큰 문제는 재시작 중에도 요청이 계속 들어와서 에러가 쌓인다는 점이었습니다. 바로 이런 문제를 해결하기 위해 재시도 로직이 필수입니다.

tenacity 라이브러리의 retry 데코레이터를 사용하면 실패한 작업을 자동으로 재시도합니다. 지수 백오프 전략을 사용하면 재시도 간격이 점점 늘어나서 서버 부하를 줄입니다.

첫 번째 재시도는 2초 후, 두 번째는 4초 후, 세 번째는 8초 후 이런 식입니다. 폴백 처리도 중요합니다.

재시도를 다 해도 실패하면 사용자에게 친절한 에러 메시지를 보여줘야 합니다. "서버가 터졌습니다" 같은 메시지 대신 "일시적인 문제가 발생했습니다.

잠시 후 다시 시도해주세요"라고 안내합니다. 위의 코드를 한 줄씩 살펴보겠습니다.

retry 데코레이터는 최대 3번 재시도합니다. wait_exponential은 지수 백오프 전략을 지정합니다.

multiplier=1, min=2, max=10은 첫 대기 시간이 2초, 최대 10초까지 늘어난다는 뜻입니다. GraphRecursionError는 대화가 너무 길어져서 재귀 한계에 도달했을 때 발생합니다.

이럴 때는 재시도해도 소용없으므로 사용자에게 새로 시작하라고 안내합니다. TimeoutError는 응답이 너무 오래 걸릴 때 발생합니다.

LLM 호출이 멈췄을 수도 있고, 네트워크가 느릴 수도 있습니다. 재시도하면 성공할 가능성이 있습니다.

마지막 except Exception은 예상하지 못한 모든 에러를 잡습니다. logger.exception으로 전체 스택 트레이스를 기록해서 나중에 분석할 수 있습니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 번역 서비스를 개발한다고 가정해봅시다.

사용자가 문서를 업로드하면 LLM이 번역합니다. 가끔 LLM API가 일시적으로 느려질 때가 있습니다.

재시도 로직이 있으면 몇 초 기다렸다가 자동으로 다시 시도해서 성공합니다. 사용자는 문제가 있었는지도 모릅니다.

넷플릭스, 아마존 같은 대규모 서비스는 더 정교합니다. 서킷 브레이커 패턴을 사용해서 장애가 전파되는 것을 막습니다.

특정 API가 계속 실패하면 일정 시간 동안 호출을 차단하고, 폴백 응답을 반환합니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 모든 에러를 무조건 재시도하는 것입니다. 인증 에러나 잘못된 입력 에러는 재시도해도 계속 실패합니다.

재시도할 가치가 있는 에러만 선택적으로 재시도해야 합니다. 또 다른 실수는 재시도 간격을 너무 짧게 설정하는 것입니다.

0.1초마다 재시도하면 오히려 서버 부하를 증가시킵니다. 지수 백오프로 점진적으로 늘려야 합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 도움으로 에러 핸들링을 추가한 김개발 씨는 다시 부하 테스트를 해봤습니다.

데이터베이스를 일부러 멈췄다가 다시 켰는데도 서비스가 정상 동작했습니다. "와, 이제 안심이네요!" 에러 핸들링을 제대로 구현하면 장애를 예방하고 복구 시간을 단축할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 재시도할 에러와 즉시 실패할 에러를 명확히 구분하세요

  • 재시도 횟수와 간격은 서비스 특성에 맞게 조정하세요
  • 모든 에러는 로깅해서 나중에 패턴 분석에 활용하세요

6. 스케일링 고려사항

김개발 씨의 챗봇이 TV에 소개되면서 갑자기 사용자가 폭증했습니다. 동시 접속자가 100명에서 10000명으로 늘었습니다.

서버는 버티지 못하고 응답이 멈췄습니다. 박시니어 씨가 급하게 달려왔습니다.

"스케일링 준비를 안 했네요. 빨리 조치해야 해요!"

스케일링은 트래픽 증가에 대응해서 시스템을 확장하는 작업입니다. 수평적 확장을 위한 stateless 설계, 분산 체크포인터, 로드 밸런싱 같은 전략을 적용하면 수천 명의 동시 사용자를 감당할 수 있습니다.

다음 코드를 살펴봅시다.

from langgraph.checkpoint.postgres import AsyncPostgresSaver
from redis import asyncio as aioredis
import os

# Redis 캐시 레이어 추가 (읽기 성능 향상)
redis_client = await aioredis.from_url(
    os.getenv("REDIS_URL"),
    encoding="utf-8",
    decode_responses=False
)

class CachedPostgresSaver(AsyncPostgresSaver):
    def __init__(self, conn, redis_client):
        super().__init__(conn)
        self.redis = redis_client

    async def get_tuple(self, config):
        # Redis 캐시 먼저 확인
        cache_key = f"checkpoint:{config['thread_id']}"
        cached = await self.redis.get(cache_key)
        if cached:
            return self.deserialize(cached)

        # 캐시 미스 시 DB 조회
        result = await super().get_tuple(config)
        # 결과를 캐시에 저장 (TTL 1시간)
        await self.redis.setex(cache_key, 3600, self.serialize(result))
        return result

# 환경 변수로 스케일 설정 분리
MAX_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "20"))
REDIS_ENABLED = os.getenv("REDIS_ENABLED", "true") == "true"

김개발 씨는 급증하는 에러 알림을 보며 패닉 상태였습니다. CPU는 100%를 찍었고, 메모리는 부족했고, 데이터베이스는 연결을 거부했습니다.

박시니어 씨가 상황을 파악했습니다. "서버를 한 대 더 띄워야 해요.

그런데 코드가 stateful하게 작성되어 있어서 문제네요." 그렇다면 스케일링이란 정확히 무엇일까요? 쉽게 비유하자면, 스케일링은 마치 식당에서 손님이 많을 때 테이블을 추가하는 것과 같습니다.

수직적 확장은 큰 테이블 하나를 더 크게 만드는 것이고, 수평적 확장은 작은 테이블을 여러 개 추가하는 것입니다. 웹 서비스에서는 보통 수평적 확장이 더 효과적입니다.

스케일링 준비가 안 된 시절에는 어땠을까요? 개발자들은 트래픽이 늘면 서버 스펙을 올렸습니다.

메모리 8GB를 16GB로, CPU 4코어를 8코어로 업그레이드했습니다. 하지만 한계가 있었습니다.

더 큰 서버는 가격이 기하급수적으로 비쌌고, 단일 서버는 장애 시 전체 서비스가 다운되는 단일 장애점 문제가 있었습니다. 바로 이런 문제를 해결하기 위해 수평적 확장이 필수입니다.

여러 서버를 띄우고 로드 밸런서로 트래픽을 분산하면, 트래픽이 늘어도 서버를 추가하기만 하면 됩니다. 한 서버가 다운되어도 다른 서버가 처리합니다.

하지만 이를 위해서는 각 서버가 독립적으로 동작해야 합니다. stateless 설계가 핵심입니다.

세션 정보를 서버 메모리에 저장하면 다른 서버는 모릅니다. 대신 PostgreSQL이나 Redis 같은 외부 저장소를 사용하면 모든 서버가 같은 정보를 공유합니다.

위의 코드를 한 줄씩 살펴보겠습니다. Redis는 인메모리 데이터베이스로 매우 빠릅니다.

자주 조회되는 체크포인트를 Redis에 캐싱하면 데이터베이스 부하를 크게 줄일 수 있습니다. CachedPostgresSaver는 PostgresSaver를 상속받아서 get_tuple 메서드를 오버라이드합니다.

먼저 Redis에서 캐시를 확인하고, 없으면 데이터베이스에서 조회한 후 캐시에 저장합니다. cache_key는 thread_id를 기반으로 만들어집니다.

setex 메서드는 TTL을 설정해서 1시간 후 자동으로 삭제됩니다. 오래된 캐시가 쌓이는 것을 방지합니다.

환경 변수로 설정을 분리하는 것도 중요합니다. 개발 환경에서는 연결 풀 크기를 5로, 프로덕션에서는 20으로 설정할 수 있습니다.

Redis를 켜고 끌 수도 있습니다. 실제 현업에서는 어떻게 활용할까요?

예를 들어 이미지 생성 서비스를 개발한다고 가정해봅시다. 평소에는 서버 2대로 충분한데, 이벤트 기간에는 트래픽이 10배 늘어납니다.

쿠버네티스 같은 오케스트레이션 도구로 자동 스케일링을 설정하면, 트래픽이 증가하면 자동으로 서버를 추가하고, 감소하면 줄입니다. 읽기 부하가 높다면 PostgreSQL의 읽기 전용 복제본을 활용합니다.

쓰기는 마스터에, 읽기는 복제본에 분산해서 부하를 나눕니다. 쓰기 부하가 높다면 샤딩을 고려합니다.

thread_id를 기준으로 여러 데이터베이스에 분산 저장합니다. 글로벌 서비스라면 지역별로 서버를 배포합니다.

한국 사용자는 서울 리전, 미국 사용자는 버지니아 리전에 접속해서 레이턴시를 줄입니다. CloudFront 같은 CDN도 활용합니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 처음부터 과도하게 설계하는 것입니다.

"언젠가 사용자가 100만 명이 될 거야"라며 복잡한 아키텍처를 만들지만, 실제로는 사용자가 100명도 안 됩니다. 필요할 때 확장하는 것이 더 효율적입니다.

또 다른 실수는 캐시 무효화를 제대로 안 하는 것입니다. 데이터가 업데이트되었는데 캐시를 지우지 않으면 사용자는 오래된 정보를 보게 됩니다.

업데이트 시 해당 캐시를 삭제하는 로직이 필요합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

박시니어 씨의 도움으로 Redis 캐시를 추가하고 서버를 3대로 늘린 김개발 씨는 트래픽을 안정적으로 처리했습니다. "이제 사용자가 더 늘어나도 걱정없겠네요!" 스케일링을 제대로 준비하면 폭발적인 성장에도 대응할 수 있습니다.

여러분도 오늘 배운 내용을 실제 프로젝트에 적용해 보세요.

실전 팁

💡 - 처음부터 과도하게 설계하지 말고, 필요할 때 점진적으로 확장하세요

  • stateless 설계를 위해 세션과 상태는 외부 저장소를 사용하세요
  • 모니터링과 알림을 설정해서 스케일링이 필요한 시점을 미리 파악하세요

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#LangGraph#PostgresSaver#LangSmith#StateGraph#Checkpointer#AI,LLM,Python,LangGraph

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.

함께 보면 좋은 카드 뉴스