본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2026. 2. 1. · 7 Views
Orchestrator-Workers 패턴 완벽 가이드
AI 에이전트 시스템에서 복잡한 작업을 효율적으로 분산 처리하는 Orchestrator-Workers 패턴을 학습합니다. 대규모 문서 처리, 병렬 태스크 실행, 결과 병합까지 실무에 바로 적용할 수 있는 내용을 다룹니다.
목차
1. Orchestrator-Workers 아키텍처
어느 날 김개발 씨는 회사에서 긴급한 프로젝트를 맡게 되었습니다. 수만 건의 고객 문의를 AI로 분석해야 하는데, 하나씩 처리하면 며칠이 걸릴 것 같았습니다.
"이걸 어떻게 빠르게 처리하지?" 고민하던 중, 선배 박시니어 씨가 다가와 말했습니다. "Orchestrator-Workers 패턴을 써보는 건 어때요?"
Orchestrator-Workers 패턴은 한마디로 지휘자와 연주자의 관계와 같습니다. 오케스트라에서 지휘자가 악보를 나눠주고 연주자들이 각자 맡은 파트를 연주하듯, Orchestrator는 작업을 분배하고 Workers는 실제 처리를 담당합니다.
이 패턴을 제대로 이해하면 대규모 작업을 효율적으로 병렬 처리할 수 있습니다.
다음 코드를 살펴봅시다.
import asyncio
from typing import List, Dict, Any
class Orchestrator:
def __init__(self, workers: List['Worker']):
# 워커 목록을 관리합니다
self.workers = workers
self.task_queue = asyncio.Queue()
async def distribute_tasks(self, tasks: List[Dict]) -> List[Any]:
# 태스크를 워커들에게 분배합니다
results = []
worker_tasks = [worker.process(tasks[i::len(self.workers)])
for i, worker in enumerate(self.workers)]
# 모든 워커의 결과를 수집합니다
results = await asyncio.gather(*worker_tasks)
return self.merge_results(results)
def merge_results(self, results: List[List]) -> List:
# 분산 처리된 결과를 병합합니다
return [item for sublist in results for item in sublist]
김개발 씨는 입사 6개월 차 주니어 개발자입니다. AI 기반 고객 문의 분석 시스템을 담당하게 되었는데, 문제가 있었습니다.
하루에 들어오는 문의가 무려 10만 건이나 되는데, 단일 프로세스로는 처리 속도가 턱없이 부족했던 것입니다. "김개발 씨, 혹시 오케스트라 공연 본 적 있어요?" 박시니어 씨가 물었습니다.
갑작스러운 질문에 김개발 씨는 고개를 갸우뚱했습니다. 박시니어 씨가 설명을 이어갔습니다.
"오케스트라에서 지휘자는 직접 악기를 연주하지 않아요. 대신 전체 흐름을 조율하고, 각 파트에 신호를 보내죠.
바이올린, 첼로, 플루트 연주자들이 각자 맡은 부분을 연주하고, 지휘자는 그것을 하나의 음악으로 만들어내는 거예요." 바로 이것이 Orchestrator-Workers 패턴의 핵심입니다. Orchestrator는 지휘자처럼 전체 작업 흐름을 관리합니다.
어떤 작업을 누구에게 맡길지 결정하고, 진행 상황을 모니터링하며, 최종 결과를 취합합니다. 직접 연산을 수행하지 않고 조율에 집중하는 것이 특징입니다.
반면 Workers는 실제 연주자들입니다. Orchestrator로부터 할당받은 작업을 묵묵히 처리합니다.
각 Worker는 독립적으로 동작하며, 자신에게 주어진 태스크만 충실히 수행합니다. 서로의 작업에 간섭하지 않기 때문에 병렬 처리가 가능해집니다.
이 패턴이 없던 시절에는 어땠을까요? 개발자들은 모든 작업을 순차적으로 처리해야 했습니다.
10만 건의 문의를 분석하려면 1건당 1초가 걸린다고 해도 무려 27시간이 넘게 소요됩니다. 서버 자원은 남아돌아도 활용할 방법이 없었습니다.
위의 코드를 살펴보면, Orchestrator 클래스가 핵심 역할을 합니다. 생성자에서 워커 목록을 받아 관리하고, distribute_tasks 메서드에서 작업을 분배합니다.
특히 tasks[i::len(self.workers)] 부분은 파이썬의 슬라이싱을 활용해 태스크를 균등하게 나누는 기법입니다. asyncio.gather는 여러 비동기 작업을 동시에 실행하고 모든 결과가 완료될 때까지 기다립니다.
이를 통해 10개의 워커가 있다면 작업 시간을 이론적으로 10분의 1로 줄일 수 있습니다. 실제 현업에서 이 패턴은 다양하게 활용됩니다.
대규모 데이터 처리, 이미지 분석, 로그 파싱, 그리고 AI 에이전트 시스템에서 특히 빛을 발합니다. 넷플릭스는 수백만 건의 추천 연산을 이 패턴으로 처리하고, 구글은 검색 인덱싱에 유사한 방식을 사용합니다.
김개발 씨는 고개를 끄덕였습니다. "아, 그래서 대기업들이 대용량 데이터를 그렇게 빨리 처리할 수 있었군요!" 이제 그도 이 패턴을 자신의 프로젝트에 적용할 준비가 되었습니다.
실전 팁
💡 - Orchestrator는 가볍게 유지하고 실제 연산은 Workers에게 위임하세요
- Workers의 개수는 CPU 코어 수나 가용 메모리를 고려해서 결정하세요
- 각 Worker는 독립적으로 실패할 수 있으므로 에러 핸들링을 반드시 구현하세요
2. 태스크 분배 전략
김개발 씨는 Orchestrator-Workers 패턴의 기본 구조를 이해했습니다. 그런데 막상 구현하려고 보니 새로운 고민이 생겼습니다.
"작업을 어떻게 나눠야 할까요? 그냥 똑같이 나누면 되나요?" 박시니어 씨가 웃으며 답했습니다.
"피자 나눌 때도 전략이 필요하잖아요. 누군가는 많이 먹고, 누군가는 조금만 먹으니까요."
태스크 분배 전략은 작업을 워커들에게 어떻게 할당할지 결정하는 방법입니다. 마치 피자를 나눌 때 모두에게 같은 크기로 줄지, 아니면 배고픈 사람에게 더 줄지 결정하는 것과 같습니다.
올바른 분배 전략을 선택하면 시스템 전체의 효율성이 크게 향상됩니다.
다음 코드를 살펴봅시다.
from enum import Enum
from typing import List, Dict, Callable
import random
class DistributionStrategy(Enum):
ROUND_ROBIN = "round_robin" # 순차적으로 돌아가며 분배
WEIGHTED = "weighted" # 워커 성능에 따라 가중치 부여
CONTENT_BASED = "content_based" # 작업 내용에 따라 분배
class TaskDistributor:
def __init__(self, strategy: DistributionStrategy):
self.strategy = strategy
def distribute(self, tasks: List[Dict], workers: List) -> Dict[int, List]:
# 라운드 로빈: 순서대로 균등 분배
if self.strategy == DistributionStrategy.ROUND_ROBIN:
distribution = {i: [] for i in range(len(workers))}
for idx, task in enumerate(tasks):
worker_idx = idx % len(workers)
distribution[worker_idx].append(task)
return distribution
# 가중치 기반: 성능 좋은 워커에게 더 많이 할당
elif self.strategy == DistributionStrategy.WEIGHTED:
weights = [w.performance_score for w in workers]
return self._weighted_distribute(tasks, weights)
김개발 씨는 첫 번째 버전의 시스템을 만들었습니다. 단순히 작업을 워커 수로 나눠서 균등하게 분배했습니다.
그런데 이상한 현상이 발생했습니다. 어떤 워커는 일찍 끝나서 놀고 있고, 어떤 워커는 아직도 열심히 일하고 있었습니다.
"왜 이런 현상이 생기는 거죠?" 김개발 씨가 물었습니다. 박시니어 씨가 모니터를 가리켰습니다.
"작업마다 처리 시간이 다르기 때문이에요. 짧은 문의는 금방 분석되지만, 긴 문의는 시간이 오래 걸리잖아요.
그래서 태스크 분배 전략이 중요한 거예요." 첫 번째 전략은 Round Robin 방식입니다. 카드를 돌리듯이 순서대로 하나씩 분배합니다.
구현이 간단하고 공평해 보이지만, 앞서 말한 문제가 발생할 수 있습니다. 작업 크기가 균일할 때 효과적입니다.
두 번째는 Weighted 분배 방식입니다. 워커마다 성능이 다르다는 것을 인정하는 전략입니다.
고성능 서버에는 더 많은 작업을, 저성능 서버에는 적은 작업을 할당합니다. 마치 힘센 사람에게 무거운 짐을 맡기는 것과 같습니다.
세 번째는 Content-Based 분배입니다. 작업의 내용을 보고 적합한 워커에게 보내는 방식입니다.
예를 들어 영어 문의는 영어 전문 워커에게, 한국어 문의는 한국어 전문 워커에게 보내는 식입니다. AI 에이전트 시스템에서 자주 사용됩니다.
코드를 살펴보면, DistributionStrategy 열거형으로 전략을 정의합니다. 이렇게 하면 나중에 전략을 추가하거나 변경하기 쉬워집니다.
TaskDistributor 클래스는 선택된 전략에 따라 다른 분배 로직을 실행합니다. Round Robin 구현 부분에서 idx % len(workers) 연산이 핵심입니다.
나머지 연산을 통해 워커 인덱스를 순환시킵니다. 태스크가 100개이고 워커가 4개라면, 0번 워커는 0, 4, 8, 12번 태스크를 받게 됩니다.
실무에서는 상황에 따라 전략을 선택해야 합니다. 작업 크기가 비슷하면 Round Robin으로 충분합니다.
서버 스펙이 제각각이면 Weighted가 적합합니다. 작업 종류가 다양하면 Content-Based를 고려해보세요.
주의할 점도 있습니다. 너무 복잡한 분배 전략은 오히려 오버헤드가 될 수 있습니다.
분배하는 데 걸리는 시간이 실제 작업 시간보다 길어지면 본말전도입니다. 단순함과 효율성 사이의 균형을 찾는 것이 중요합니다.
김개발 씨는 자신의 시스템에 Weighted 전략을 적용해보기로 했습니다. 회사 서버 중 일부는 최신형이고 일부는 구형이었기 때문입니다.
결과는 놀라웠습니다. 전체 처리 시간이 30%나 단축되었습니다.
실전 팁
💡 - 작업 특성을 먼저 분석하고 그에 맞는 전략을 선택하세요
- 동적으로 전략을 변경할 수 있도록 설계하면 유연성이 높아집니다
- 분배 전략의 오버헤드도 고려해야 합니다
3. 워커 풀 관리
시스템이 점점 커지면서 김개발 씨는 새로운 문제에 직면했습니다. 워커를 미리 만들어두자니 메모리가 낭비되고, 필요할 때마다 만들자니 시간이 오래 걸렸습니다.
"워커를 효율적으로 관리할 방법이 없을까요?" 이번에도 박시니어 씨가 해답을 알고 있었습니다. "수영장에 레인을 미리 만들어두는 것처럼, 워커 풀을 만들어보세요."
워커 풀은 미리 생성해둔 워커들의 집합입니다. 마치 수영장의 레인처럼, 필요할 때 바로 사용하고 끝나면 반납하는 구조입니다.
풀 관리를 잘하면 리소스 낭비를 줄이면서도 빠른 응답 속도를 유지할 수 있습니다.
다음 코드를 살펴봅시다.
import asyncio
from typing import Optional
from contextlib import asynccontextmanager
class WorkerPool:
def __init__(self, min_workers: int = 2, max_workers: int = 10):
self.min_workers = min_workers
self.max_workers = max_workers
self.available_workers = asyncio.Queue()
self.active_count = 0
async def initialize(self):
# 최소 워커 수만큼 미리 생성합니다
for _ in range(self.min_workers):
worker = await self._create_worker()
await self.available_workers.put(worker)
@asynccontextmanager
async def acquire_worker(self):
# 워커를 풀에서 가져옵니다
worker = await self._get_or_create_worker()
self.active_count += 1
try:
yield worker
finally:
# 작업 완료 후 워커를 풀에 반납합니다
self.active_count -= 1
await self.available_workers.put(worker)
async def _get_or_create_worker(self) -> 'Worker':
if not self.available_workers.empty():
return await self.available_workers.get()
if self.active_count < self.max_workers:
return await self._create_worker()
return await self.available_workers.get() # 대기
김개발 씨의 시스템은 트래픽 패턴이 불규칙했습니다. 평소에는 분당 100건 정도지만, 이벤트 기간에는 분당 10,000건까지 치솟았습니다.
워커를 10,000건 기준으로 만들어두면 평소에 낭비가 심하고, 100건 기준으로 만들면 이벤트 때 감당이 안 됩니다. "이럴 때 워커 풀이 필요해요." 박시니어 씨가 화이트보드에 그림을 그렸습니다.
"수영장 생각해보세요. 레인이 8개 있으면 최대 8명이 동시에 수영할 수 있죠.
누군가 끝나면 다른 사람이 들어가고요." 워커 풀도 같은 원리입니다. **최소 워커 수(min_workers)**만큼은 항상 대기시켜 둡니다.
이것이 기본 레인입니다. 요청이 들어오면 대기 중인 워커가 바로 처리합니다.
생성 시간이 필요 없으니 응답이 빠릅니다. 요청이 몰리면 어떻게 할까요?
대기 중인 워커가 없으면 새로운 워커를 생성합니다. 단, **최대 워커 수(max_workers)**를 넘지 않습니다.
무한정 만들면 서버가 다운될 수 있기 때문입니다. 코드에서 asynccontextmanager 데코레이터가 눈에 띕니다.
이것은 파이썬의 컨텍스트 매니저를 비동기로 구현한 것입니다. with 문과 함께 사용하면 워커 획득과 반납이 자동으로 처리됩니다.
예외가 발생해도 반납이 보장되니 안전합니다. acquire_worker 메서드를 보면, 먼저 풀에서 워커를 가져오려고 시도합니다.
풀이 비어있고 아직 최대치에 도달하지 않았다면 새로 생성합니다. 최대치에 도달했다면 다른 워커가 반납될 때까지 기다립니다.
try-finally 블록은 매우 중요합니다. 작업 중 에러가 발생해도 finally에서 워커를 반드시 반납합니다.
이것이 없으면 워커가 점점 사라지는 리소스 누수가 발생합니다. 실무에서 워커 풀의 크기를 결정하는 것은 예술에 가깝습니다.
너무 작으면 대기 시간이 길어지고, 너무 크면 메모리가 낭비됩니다. 보통 CPU 코어 수의 2배에서 4배 정도로 시작해서 모니터링하며 조절합니다.
주의해야 할 점이 있습니다. 워커가 상태를 가지고 있다면 재사용 전에 초기화해야 합니다.
이전 작업의 데이터가 남아있으면 심각한 버그로 이어질 수 있습니다. 깨끗한 상태로 반납하는 것이 원칙입니다.
김개발 씨는 워커 풀을 도입한 후 메모리 사용량이 60% 감소했습니다. 동시에 응답 시간도 빨라졌습니다.
미리 만들어둔 워커가 대기하고 있으니 생성 시간이 필요 없었기 때문입니다.
실전 팁
💡 - min_workers와 max_workers는 실제 트래픽 패턴을 분석해서 결정하세요
- 워커 상태 초기화를 잊지 마세요, 이전 작업 데이터가 남으면 버그의 원인이 됩니다
- 헬스 체크를 통해 비정상 워커를 풀에서 제거하는 로직도 고려하세요
4. 결과 수집 및 병합
워커들이 열심히 일한 덕분에 작업이 빠르게 처리되었습니다. 그런데 김개발 씨 앞에는 새로운 과제가 놓였습니다.
10개의 워커가 각각 반환한 결과를 어떻게 하나로 합칠 것인가? "결과가 뒤죽박죽이면 안 되는데..." 걱정하는 김개발 씨에게 박시니어 씨가 말했습니다.
"퍼즐 조각을 맞추는 것처럼 생각해보세요."
결과 수집 및 병합은 분산 처리된 결과물을 하나의 일관된 형태로 조합하는 과정입니다. 마치 퍼즐 조각들을 모아 완성된 그림을 만드는 것과 같습니다.
순서 보장, 중복 제거, 충돌 해결 등 다양한 고려사항이 있습니다.
다음 코드를 살펴봅시다.
from typing import List, Dict, Any
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class TaskResult:
task_id: str
worker_id: int
data: Any
sequence: int # 원본 순서 보장용
class ResultCollector:
def __init__(self):
self.results: Dict[str, TaskResult] = {}
self.errors: List[Dict] = []
async def collect(self, worker_results: List[List[TaskResult]]) -> List[Any]:
# 모든 워커 결과를 수집합니다
all_results = []
for worker_result in worker_results:
for result in worker_result:
if result.data is not None:
all_results.append(result)
else:
self.errors.append({"task_id": result.task_id})
# 원본 순서대로 정렬합니다
sorted_results = sorted(all_results, key=lambda x: x.sequence)
return [r.data for r in sorted_results]
def merge_dicts(self, dict_results: List[Dict]) -> Dict:
# 딕셔너리 결과들을 병합합니다
merged = defaultdict(list)
for d in dict_results:
for key, value in d.items():
merged[key].append(value)
return dict(merged)
김개발 씨가 만든 시스템에서 이상한 일이 발생했습니다. 분명히 1번부터 100번까지 순서대로 문서를 처리했는데, 결과는 23, 1, 45, 7번 순서로 뒤죽박죽이었습니다.
고객에게 보여줄 수 없는 상태였습니다. "왜 순서가 엉망이 된 거죠?" 김개발 씨가 당황해서 물었습니다.
박시니어 씨가 차분하게 설명했습니다. "각 워커가 처리하는 속도가 다르기 때문이에요.
1번 워커가 처리한 문서가 가장 먼저 끝날 수도 있고, 3번 워커가 먼저 끝날 수도 있어요. 결과 수집기가 이 문제를 해결해줍니다." 결과 수집의 핵심은 순서 보장입니다.
코드에서 TaskResult 클래스에 sequence 필드가 있는 것을 주목하세요. 이것이 원본 순서를 기억하는 장치입니다.
작업을 분배할 때 순번을 부여하고, 수집할 때 그 순번으로 정렬합니다. collect 메서드를 보면, 먼저 모든 워커의 결과를 하나의 리스트로 모읍니다.
이 과정에서 에러가 발생한 결과는 별도로 분리합니다. 성공한 결과만 모은 후, sequence 기준으로 정렬합니다.
마치 퍼즐 조각에 번호를 매겨두고 순서대로 맞추는 것과 같습니다. 에러 처리도 중요합니다.
100개 작업 중 3개가 실패했다고 전체를 버릴 수는 없습니다. 성공한 97개는 정상 처리하고, 실패한 3개는 따로 기록해서 나중에 재시도하거나 관리자에게 알립니다.
merge_dicts 메서드는 딕셔너리 형태의 결과를 병합할 때 사용합니다. 예를 들어 각 워커가 단어 빈도를 계산했다면, 이것을 합쳐서 전체 빈도를 구해야 합니다.
defaultdict를 사용하면 키가 없어도 에러 없이 값을 추가할 수 있습니다. 실무에서는 더 복잡한 병합 로직이 필요할 때가 많습니다.
중복 데이터가 있으면 어떤 것을 우선할지, 충돌이 발생하면 어떻게 해결할지 정책이 필요합니다. 보통 타임스탬프가 최신인 것을 우선하거나, 특정 워커의 결과를 신뢰하는 방식을 사용합니다.
메모리 관리도 신경 써야 합니다. 결과가 매우 크다면 메모리에 전부 올리지 않고 스트리밍 방식으로 처리해야 합니다.
파일로 임시 저장하거나 데이터베이스에 직접 쓰는 방법도 있습니다. 김개발 씨는 결과 수집기를 추가한 후 테스트를 돌렸습니다.
이제 1번부터 100번까지 완벽한 순서로 결과가 출력되었습니다. 에러가 발생한 문서도 별도 리스트에 깔끔하게 정리되어 있었습니다.
실전 팁
💡 - 원본 순서가 중요하다면 반드시 sequence 필드를 추가하세요
- 에러 결과와 성공 결과를 분리해서 관리하면 후처리가 편합니다
- 대용량 결과는 메모리 대신 스트리밍이나 파일 기반 처리를 고려하세요
5. 부하 분산
시스템이 잘 돌아가는 줄 알았는데, 어느 날 갑자기 서버 한 대가 과부하로 다운되었습니다. 확인해보니 특정 워커에만 무거운 작업이 몰려 있었습니다.
"왜 이 워커만 힘들어하는 거죠?" 김개발 씨의 질문에 박시니어 씨가 답했습니다. "부하 분산을 제대로 하지 않아서 그래요.
팀원들 업무량도 균형 있게 나눠야 하듯이요."
부하 분산은 워커들 사이에 작업량을 균형 있게 배분하는 기술입니다. 마치 팀장이 팀원들의 업무량을 조절하듯, 시스템의 안정성과 효율성을 위해 필수적입니다.
실시간 모니터링과 동적 조절이 핵심입니다.
다음 코드를 살펴봅시다.
import asyncio
from typing import Dict, List
from dataclasses import dataclass, field
from time import time
@dataclass
class WorkerMetrics:
worker_id: int
current_load: int = 0 # 현재 처리 중인 작업 수
avg_response_time: float = 0.0 # 평균 응답 시간
error_rate: float = 0.0 # 에러 발생률
last_health_check: float = field(default_factory=time)
class LoadBalancer:
def __init__(self, workers: List['Worker']):
self.workers = workers
self.metrics: Dict[int, WorkerMetrics] = {
i: WorkerMetrics(worker_id=i) for i in range(len(workers))
}
def select_worker(self) -> int:
# 가장 부하가 적은 워커를 선택합니다 (Least Connections)
min_load = float('inf')
selected = 0
for worker_id, metrics in self.metrics.items():
# 건강하지 않은 워커는 제외합니다
if metrics.error_rate > 0.5:
continue
if metrics.current_load < min_load:
min_load = metrics.current_load
selected = worker_id
return selected
def update_metrics(self, worker_id: int, response_time: float, success: bool):
# 작업 완료 후 메트릭을 업데이트합니다
metrics = self.metrics[worker_id]
metrics.current_load -= 1
metrics.avg_response_time = (metrics.avg_response_time + response_time) / 2
if not success:
metrics.error_rate = min(1.0, metrics.error_rate + 0.1)
김개발 씨는 로그를 분석하다가 놀라운 사실을 발견했습니다. 워커 3개 중 1번 워커만 항상 100%에 가까운 CPU 사용률을 보이고 있었습니다.
나머지 2개는 50%도 채 안 되었습니다. "Round Robin으로 균등하게 나눴는데 왜 이런 거죠?" 김개발 씨가 의아해했습니다.
박시니어 씨가 핵심을 짚었습니다. "작업 개수는 같아도 작업 무게가 다르기 때문이에요.
1번 워커에게 유독 무거운 작업들이 몰린 거죠. 이럴 때 동적 부하 분산이 필요합니다." Least Connections 방식은 가장 직관적인 부하 분산 기법입니다.
현재 처리 중인 작업 수가 가장 적은 워커에게 새 작업을 할당합니다. 코드의 select_worker 메서드가 바로 이것을 구현합니다.
WorkerMetrics 클래스는 각 워커의 상태를 추적합니다. current_load는 현재 처리 중인 작업 수, avg_response_time은 평균 응답 시간, error_rate는 에러 발생률입니다.
이 세 가지 지표가 부하 분산의 핵심 데이터입니다. 에러율이 높은 워커는 아예 배제하는 것도 중요합니다.
코드에서 error_rate > 0.5인 워커는 선택 대상에서 제외됩니다. 이미 문제가 있는 워커에게 계속 작업을 보내면 상황이 악화될 뿐입니다.
update_metrics 메서드는 작업이 완료될 때마다 호출됩니다. 현재 부하를 감소시키고, 응답 시간을 갱신하며, 실패했다면 에러율을 높입니다.
이 피드백 루프가 동적 부하 분산의 핵심입니다. 더 정교한 방식도 있습니다.
Weighted Least Connections는 워커 성능까지 고려합니다. 고성능 워커는 같은 부하에서도 더 빨리 처리하니까요.
Adaptive Load Balancing은 실시간 응답 시간을 기반으로 가중치를 자동 조절합니다. 실무에서 부하 분산이 제대로 작동하는지 확인하려면 모니터링이 필수입니다.
각 워커의 CPU, 메모리, 작업 큐 길이를 대시보드로 시각화하면 문제를 빨리 발견할 수 있습니다. 한 가지 주의할 점이 있습니다.
부하 분산 로직 자체가 병목이 되면 안 됩니다. 수천 개의 작업이 들어올 때 매번 모든 워커를 순회하면 오버헤드가 커집니다.
캐싱이나 확률적 선택 등의 최적화가 필요할 수 있습니다. 김개발 씨는 Least Connections 방식으로 부하 분산기를 교체했습니다.
일주일간 모니터링한 결과, 모든 워커의 CPU 사용률이 70-80% 사이로 균등해졌습니다. 더 이상 특정 서버만 힘들어하는 일은 없었습니다.
실전 팁
💡 - 단순히 작업 개수가 아니라 작업의 무게를 고려한 분산이 중요합니다
- 에러율이 높은 워커는 자동으로 제외하는 로직을 추가하세요
- 부하 분산기 자체의 성능도 병목이 될 수 있으니 주의하세요
6. 실전: 대용량 문서 처리 시스템
이제 김개발 씨는 지금까지 배운 모든 것을 종합해서 실제 시스템을 구축해야 합니다. 하루 100만 건의 문서를 AI로 분석하는 프로젝트입니다.
"이론은 알겠는데, 실제로 어떻게 조합해야 할까요?" 막막해하는 김개발 씨에게 박시니어 씨가 마지막 레슨을 시작했습니다. "자, 이제 모든 조각을 맞춰봅시다."
대용량 문서 처리 시스템은 Orchestrator-Workers 패턴의 모든 요소가 유기적으로 결합된 실전 예제입니다. 마치 공장의 조립 라인처럼, 문서 수집부터 결과 저장까지 전체 파이프라인을 효율적으로 구성합니다.
다음 코드를 살펴봅시다.
import asyncio
from typing import List, Dict
from dataclasses import dataclass
@dataclass
class Document:
id: str
content: str
priority: int = 0
class DocumentProcessingSystem:
def __init__(self, num_workers: int = 10):
self.worker_pool = WorkerPool(min_workers=5, max_workers=num_workers)
self.load_balancer = LoadBalancer([])
self.result_collector = ResultCollector()
self.distributor = TaskDistributor(DistributionStrategy.WEIGHTED)
async def process_batch(self, documents: List[Document]) -> Dict:
# 1. 우선순위에 따라 문서 정렬
sorted_docs = sorted(documents, key=lambda d: d.priority, reverse=True)
# 2. 워커 풀 초기화
await self.worker_pool.initialize()
# 3. 병렬 처리 실행
tasks = []
for doc in sorted_docs:
worker_id = self.load_balancer.select_worker()
async with self.worker_pool.acquire_worker() as worker:
task = asyncio.create_task(worker.analyze(doc))
tasks.append(task)
# 4. 결과 수집 및 병합
results = await asyncio.gather(*tasks, return_exceptions=True)
return await self.result_collector.collect(results)
async def run_pipeline(self, doc_stream) -> None:
# 스트리밍 방식으로 대용량 처리
batch = []
async for doc in doc_stream:
batch.append(doc)
if len(batch) >= 1000: # 배치 크기
await self.process_batch(batch)
batch = []
프로젝트 킥오프 미팅에서 요구사항이 쏟아졌습니다. 하루 100만 건 처리, 실시간에 가까운 응답 속도, 99.9% 가용성.
김개발 씨는 막막했지만, 지금까지 배운 패턴들이 떠올랐습니다. "먼저 전체 구조를 그려봅시다." 박시니어 씨가 화이트보드 앞에 섰습니다.
"DocumentProcessingSystem이 Orchestrator 역할을 합니다. 모든 컴포넌트를 조율하는 중앙 사령탑이죠." 생성자를 보면 네 가지 핵심 컴포넌트가 있습니다.
WorkerPool은 워커 생명주기를 관리합니다. LoadBalancer는 작업을 균등하게 분배합니다.
ResultCollector는 결과를 수집하고 병합합니다. TaskDistributor는 분배 전략을 담당합니다.
process_batch 메서드가 핵심 처리 로직입니다. 첫 번째 단계에서 우선순위에 따라 문서를 정렬합니다.
급한 문서가 먼저 처리되어야 하니까요. 두 번째 단계에서 워커 풀을 초기화합니다.
세 번째 단계가 가장 중요합니다. 각 문서에 대해 부하 분산기가 적절한 워커를 선택하고, 워커 풀에서 워커를 빌려 작업을 수행합니다.
asyncio.create_task로 비동기 작업을 생성하면 병렬 실행이 가능합니다. 네 번째 단계에서 asyncio.gather로 모든 결과를 기다립니다.
return_exceptions=True 옵션은 일부 작업이 실패해도 전체가 중단되지 않게 합니다. 실패한 작업은 예외 객체로 반환되어 나중에 처리할 수 있습니다.
run_pipeline 메서드는 스트리밍 처리를 구현합니다. 100만 건을 한 번에 메모리에 올리면 서버가 다운됩니다.
대신 1000건씩 배치로 나눠서 처리합니다. 메모리 사용량을 일정하게 유지하면서도 처리량을 극대화하는 기법입니다.
실제 운영 환경에서는 더 많은 것을 고려해야 합니다. 중간에 시스템이 죽으면 어디서부터 재시작할지 결정하는 체크포인팅, 실시간 상태를 보여주는 모니터링 대시보드, 문제 발생 시 알림을 보내는 알럿 시스템 등이 필요합니다.
김개발 씨는 2주간의 개발 끝에 시스템을 완성했습니다. 첫 번째 부하 테스트 결과, 초당 1,200건의 문서를 처리할 수 있었습니다.
하루로 환산하면 1억 건 이상입니다. 목표였던 100만 건은 가볍게 달성했습니다.
박시니어 씨가 김개발 씨의 어깨를 두드렸습니다. "이제 이 패턴을 자유자재로 활용할 수 있겠네요.
다음 프로젝트에서는 더 복잡한 시스템도 만들 수 있을 거예요." 김개발 씨는 뿌듯한 마음으로 고개를 끄덕였습니다. Orchestrator-Workers 패턴은 이제 그의 가장 강력한 무기가 되었습니다.
실전 팁
💡 - 스트리밍 방식으로 대용량 데이터를 처리하면 메모리 문제를 피할 수 있습니다
- 체크포인팅을 구현해서 장애 발생 시 재시작 지점을 확보하세요
- 모니터링과 알럿 시스템은 운영 환경에서 필수입니다
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
vLLM 통합 완벽 가이드
대규모 언어 모델 추론을 획기적으로 가속화하는 vLLM의 설치부터 실전 서비스 구축까지 다룹니다. PagedAttention과 연속 배칭 기술로 GPU 메모리를 효율적으로 활용하는 방법을 배웁니다.
Web UI Demo 구축 완벽 가이드
Gradio를 활용하여 머신러닝 모델과 AI 서비스를 위한 웹 인터페이스를 구축하는 방법을 다룹니다. 코드 몇 줄만으로 전문적인 데모 페이지를 만들고 배포하는 과정을 초급자도 쉽게 따라할 수 있도록 설명합니다.
Sandboxing & Execution Control 완벽 가이드
AI 에이전트가 코드를 실행할 때 반드시 필요한 보안 기술인 샌드박싱과 실행 제어에 대해 알아봅니다. 격리된 환경에서 안전하게 코드를 실행하고, 악성 동작을 탐지하는 방법을 단계별로 설명합니다.
Voice Design then Clone 워크플로우 완벽 가이드
AI 음성 합성에서 일관된 캐릭터 음성을 만드는 Voice Design then Clone 워크플로우를 설명합니다. 참조 음성 생성부터 재사용 가능한 캐릭터 구축까지 실무 활용법을 다룹니다.
Tool Use 완벽 가이드 - Shell, Browser, DB 실전 활용
AI 에이전트가 외부 도구를 활용하여 셸 명령어 실행, 브라우저 자동화, 데이터베이스 접근 등을 수행하는 방법을 배웁니다. 실무에서 바로 적용할 수 있는 패턴과 베스트 프랙티스를 담았습니다.