본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.
본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.
이미지 로딩 중...
AI Generated
2026. 2. 1. · 7 Views
Parallelization Workflow 완벽 가이드
AI 에이전트 개발에서 핵심이 되는 병렬화 워크플로우를 다룹니다. Coroutine을 활용한 동시 처리부터 부분 실패 복구, 성능 최적화까지 실무에서 바로 적용할 수 있는 기법을 배웁니다.
목차
1. 병렬 처리의 장점
김개발 씨는 AI 에이전트 시스템을 개발하던 중 심각한 성능 문제에 부딪혔습니다. 여러 외부 API를 순차적으로 호출하니 응답 시간이 10초를 넘어가는 것이었습니다.
"이대로는 사용자들이 다 떠나버리겠는데..." 고민에 빠진 김개발 씨에게 박시니어 씨가 다가왔습니다.
병렬 처리란 여러 작업을 동시에 실행하여 전체 소요 시간을 획기적으로 줄이는 기법입니다. 마치 식당에서 여러 요리사가 각자 다른 요리를 동시에 만드는 것과 같습니다.
순차 처리가 한 명의 요리사가 모든 요리를 차례로 만드는 것이라면, 병렬 처리는 팀워크로 효율을 극대화하는 방식입니다.
다음 코드를 살펴봅시다.
// 순차 처리 - 각 작업이 끝날 때까지 기다림
suspend fun fetchSequential(): List<Data> {
val result1 = fetchFromApiA() // 2초 소요
val result2 = fetchFromApiB() // 3초 소요
val result3 = fetchFromApiC() // 2초 소요
return listOf(result1, result2, result3) // 총 7초
}
// 병렬 처리 - 모든 작업을 동시에 실행
suspend fun fetchParallel(): List<Data> = coroutineScope {
val deferred1 = async { fetchFromApiA() } // 동시 시작
val deferred2 = async { fetchFromApiB() } // 동시 시작
val deferred3 = async { fetchFromApiC() } // 동시 시작
listOf(deferred1.await(), deferred2.await(), deferred3.await()) // 총 3초
}
김개발 씨는 입사 6개월 차 개발자입니다. 최근 회사에서 AI 에이전트 프로젝트를 맡게 되었는데, 여러 데이터 소스에서 정보를 수집해 사용자에게 종합적인 답변을 제공하는 시스템이었습니다.
처음에는 단순하게 생각했습니다. API A를 호출하고, 결과가 오면 API B를 호출하고, 다시 결과가 오면 API C를 호출하면 되겠지 하고요.
코드도 직관적이었고 디버깅도 쉬웠습니다. 하지만 문제는 성능이었습니다.
각 API 호출에 2~3초씩 걸렸고, 세 개를 순차적으로 호출하니 총 7초가 넘게 걸렸습니다. 사용자 입장에서 7초는 정말 긴 시간입니다.
실제로 사용자 이탈률이 치솟기 시작했습니다. 박시니어 씨가 화면을 보더니 바로 문제점을 짚었습니다.
"세 API가 서로 의존성이 있나요?" 김개발 씨가 고개를 저었습니다. "아니요, 각각 독립적인 데이터를 가져옵니다." "그렇다면 왜 순차적으로 호출하고 있어요?
동시에 호출하면 가장 오래 걸리는 API 시간만큼만 기다리면 됩니다." 이것이 바로 병렬 처리의 핵심입니다. 서로 의존성이 없는 작업들은 굳이 앞선 작업이 끝날 때까지 기다릴 필요가 없습니다.
마치 커피숍에서 커피와 샌드위치를 주문했을 때, 바리스타가 커피를 다 만든 후에야 주방에서 샌드위치를 만들기 시작한다면 얼마나 비효율적일까요? 실제 커피숍에서는 커피와 샌드위치가 동시에 준비됩니다.
커피에 3분, 샌드위치에 5분이 걸린다면 총 대기 시간은 8분이 아니라 5분입니다. 프로그래밍에서의 병렬 처리도 정확히 같은 원리입니다.
위 코드를 살펴보면, async 함수가 핵심입니다. async는 작업을 시작하되, 결과를 기다리지 않고 바로 다음 줄로 넘어갑니다.
대신 나중에 결과를 받아볼 수 있는 Deferred 객체를 반환합니다. 세 개의 async가 거의 동시에 실행되므로, 세 API 호출이 동시에 진행됩니다.
그리고 await를 호출하면 해당 작업이 완료될 때까지 기다렸다가 결과를 받습니다. 모든 작업이 동시에 시작되었으므로, 가장 오래 걸리는 작업 시간인 3초면 모든 결과를 받을 수 있습니다.
실무에서 이 패턴은 정말 자주 사용됩니다. 대시보드 페이지를 로딩할 때 여러 통계 데이터를 가져오거나, 검색 결과를 보여줄 때 여러 데이터베이스를 조회하거나, AI 에이전트가 여러 도구를 동시에 호출할 때 모두 병렬 처리가 적합합니다.
김개발 씨는 코드를 수정한 후 다시 테스트해 보았습니다. 7초 걸리던 응답이 3초로 줄어들었습니다.
사용자 경험이 확 달라졌고, 이탈률도 눈에 띄게 감소했습니다.
실전 팁
💡 - 작업 간 의존성이 없을 때만 병렬화하세요. A의 결과가 B의 입력으로 필요하다면 순차 처리가 맞습니다.
- 병렬 작업 수가 너무 많으면 오히려 성능이 저하될 수 있으니 적절한 동시성 제한을 고려하세요.
2. Coroutine을 활용한 병렬화
병렬 처리의 개념은 이해했지만, 김개발 씨는 구현 방법에서 막혔습니다. 예전에 Java로 스레드를 다뤄본 적이 있는데, 코드가 복잡해지고 동기화 문제로 밤을 새운 기억이 떠올랐기 때문입니다.
"Kotlin에서는 더 쉬운 방법이 있어요." 박시니어 씨가 Coroutine을 소개해 주었습니다.
Coroutine은 Kotlin에서 제공하는 경량 동시성 처리 도구입니다. 스레드보다 훨씬 가볍고, 코드도 순차적으로 작성하는 것처럼 직관적입니다.
async와 await를 조합하면 복잡한 병렬 처리도 마치 동기 코드처럼 깔끔하게 작성할 수 있습니다.
다음 코드를 살펴봅시다.
import kotlinx.coroutines.*
suspend fun parallelAgentTasks() = coroutineScope {
// 여러 에이전트 도구를 동시에 실행
val searchResult = async { searchWeb("Kotlin coroutine") }
val dbResult = async { queryDatabase("user_preferences") }
val cacheResult = async { checkCache("recent_queries") }
// 모든 결과를 기다린 후 종합
val combinedData = AgentContext(
webData = searchResult.await(),
userData = dbResult.await(),
cachedData = cacheResult.await()
)
processWithLLM(combinedData)
}
전통적인 멀티스레딩 프로그래밍은 악명이 높습니다. 스레드를 생성하고 관리하는 것부터 시작해서, 공유 자원에 대한 동기화, 데드락 방지, 예외 처리까지 신경 쓸 것이 한두 가지가 아닙니다.
김개발 씨가 움찔한 것도 무리가 아니었습니다. 하지만 Kotlin의 Coroutine은 다릅니다.
Coroutine은 스레드 위에서 동작하지만, 스레드 자체는 아닙니다. 하나의 스레드에서 수천 개의 Coroutine을 실행할 수 있습니다.
마치 하나의 요리사가 여러 타이머를 설정해 놓고 여러 요리를 동시에 진행하는 것과 같습니다. coroutineScope는 병렬 작업을 위한 안전한 영역을 만들어 줍니다.
이 영역 안에서 시작된 모든 Coroutine은 영역이 끝나기 전에 완료되어야 합니다. 만약 하나라도 예외가 발생하면 다른 Coroutine들도 자동으로 취소됩니다.
이것을 구조화된 동시성이라고 부릅니다. async 함수는 새로운 Coroutine을 시작하고, 그 결과를 담을 Deferred 객체를 즉시 반환합니다.
중요한 점은 async를 호출한 시점에 작업이 시작된다는 것입니다. 결과를 기다리는 것이 아니라 작업을 시작만 하고 바로 다음 줄로 넘어갑니다.
위 코드에서 세 개의 async가 연달아 호출됩니다. 첫 번째 async가 웹 검색을 시작하고, 그 결과를 기다리지 않고 바로 두 번째 async가 데이터베이스 조회를 시작합니다.
마찬가지로 세 번째 async도 캐시 확인을 바로 시작합니다. 이렇게 세 작업이 동시에 진행되고 있는 상태에서, await를 호출하면 해당 작업이 완료될 때까지 현재 Coroutine을 일시 중단합니다.
여기서 핵심은 일시 중단이지 스레드 차단이 아니라는 점입니다. 스레드는 다른 Coroutine의 작업을 계속 수행할 수 있습니다.
AI 에이전트 개발에서 이 패턴은 특히 유용합니다. 에이전트가 사용자 질문에 답하기 위해 여러 도구를 호출해야 할 때, 각 도구 호출을 병렬로 처리하면 응답 시간을 크게 단축할 수 있습니다.
웹 검색, 데이터베이스 조회, 캐시 확인, 외부 API 호출 등이 모두 동시에 진행됩니다. suspend 키워드도 눈여겨볼 필요가 있습니다.
suspend 함수는 Coroutine 안에서만 호출할 수 있으며, 실행 중간에 일시 중단될 수 있음을 나타냅니다. 네트워크 호출이나 파일 I/O 같은 시간이 걸리는 작업을 수행하는 함수는 대부분 suspend로 선언합니다.
김개발 씨는 이 패턴을 적용한 후 코드가 훨씬 깔끔해졌다고 느꼈습니다. 콜백 지옥도 없고, 복잡한 스레드 관리 코드도 없습니다.
마치 순차적으로 실행되는 것처럼 읽히지만, 실제로는 병렬로 실행되는 마법 같은 코드가 완성되었습니다.
실전 팁
💡 - async는 작업을 시작만 하고 await에서 결과를 받습니다. 여러 async를 먼저 실행한 후 나중에 await를 호출해야 진정한 병렬화가 됩니다.
- coroutineScope 안에서 발생한 예외는 자동으로 다른 Coroutine을 취소하므로 예외 처리 전략을 미리 세워두세요.
3. 결과 집계 전략
병렬로 여러 작업을 실행했다면, 이제 그 결과들을 어떻게 모을지 고민해야 합니다. 김개발 씨는 단순히 리스트에 담으면 되겠지 생각했지만, 실제로는 상황에 따라 다양한 집계 전략이 필요했습니다.
"결과를 어떻게 조합하느냐에 따라 에이전트의 품질이 달라집니다." 박시니어 씨의 조언이었습니다.
결과 집계란 병렬로 실행한 여러 작업의 결과를 하나로 모으는 과정입니다. 단순 리스트 수집, 맵 형태 변환, 조건부 필터링, 가중치 기반 병합 등 다양한 전략이 있습니다.
AI 에이전트에서는 여러 소스의 정보를 어떻게 종합하느냐가 답변 품질을 결정합니다.
다음 코드를 살펴봅시다.
suspend fun aggregateResults() = coroutineScope {
val sources = listOf("web", "database", "cache", "api")
// 모든 소스에서 병렬로 데이터 수집
val deferredResults = sources.map { source ->
async { fetchFromSource(source) }
}
// 전략 1: 모든 결과를 리스트로 수집
val allResults = deferredResults.map { it.await() }
// 전략 2: 맵 형태로 변환 (소스별 결과)
val resultMap = sources.zip(allResults).toMap()
// 전략 3: 유효한 결과만 필터링
val validResults = allResults.filter { it.isSuccess }
// 전략 4: 가중치 기반 병합
combineWithPriority(resultMap, priority = listOf("cache", "database", "api", "web"))
}
여러 요리사가 동시에 요리를 완성했다고 가정해 봅시다. 이제 이 요리들을 어떻게 손님에게 제공할까요?
단순히 테이블에 모두 올려놓을 수도 있고, 코스 순서대로 배치할 수도 있으며, 손님의 취향에 맞게 일부만 선별할 수도 있습니다. 병렬 처리 결과 집계도 마찬가지입니다.
가장 단순한 방법은 리스트 수집입니다. 모든 결과를 순서대로 리스트에 담습니다.
위 코드에서 deferredResults.map { it.await() }가 바로 이 방식입니다. 각 Deferred에 대해 await를 호출하고, 그 결과를 리스트로 모읍니다.
맵 형태 변환은 결과에 의미를 부여할 때 유용합니다. sources.zip(allResults).toMap()은 각 결과가 어떤 소스에서 왔는지 알 수 있게 해줍니다.
AI 에이전트가 "이 정보는 웹 검색에서 가져왔고, 저 정보는 데이터베이스에서 가져왔습니다"라고 출처를 밝힐 때 이 구조가 필요합니다. 실무에서는 모든 결과가 유효하지 않을 수 있습니다.
네트워크 오류로 일부 요청이 실패하거나, 검색 결과가 없는 경우도 있습니다. 조건부 필터링으로 유효한 결과만 걸러내면 에이전트가 잘못된 정보를 제공하는 것을 방지할 수 있습니다.
가장 고급 전략은 가중치 기반 병합입니다. 모든 소스가 동등한 신뢰도를 가지지는 않습니다.
예를 들어 캐시된 데이터는 최신성이 검증되었고, 데이터베이스는 정확도가 높으며, 웹 검색은 범위가 넓지만 신뢰도가 낮을 수 있습니다. 우선순위를 정해 결과를 병합하면 더 품질 높은 답변을 만들 수 있습니다.
김개발 씨는 처음에 단순 리스트 수집만 사용했습니다. 하지만 사용자 피드백을 분석해 보니, 정보의 출처가 불명확하고 때로는 서로 충돌하는 정보가 섞여 있었습니다.
맵 형태 변환과 가중치 기반 병합을 도입한 후 답변 품질이 눈에 띄게 향상되었습니다. 한 가지 주의할 점이 있습니다.
await 호출 순서입니다. 위 코드에서 deferredResults.map { it.await() }는 순차적으로 await를 호출합니다.
첫 번째 결과를 기다린 후 두 번째 결과를 기다리는 식입니다. 이미 모든 작업이 병렬로 시작되었으므로 전체 시간에는 영향이 없지만, 만약 첫 번째 작업이 오래 걸리면 이미 완료된 다른 결과를 먼저 처리할 수 없습니다.
더 효율적인 방법은 awaitAll을 사용하는 것입니다. awaitAll은 모든 Deferred가 완료될 때까지 기다린 후 결과를 한 번에 반환합니다.
또는 select를 사용해 먼저 완료된 작업부터 처리할 수도 있습니다. 결과 집계 전략은 에이전트의 용도와 요구사항에 따라 선택해야 합니다.
속도가 중요하면 먼저 도착한 결과를 우선 처리하고, 정확도가 중요하면 모든 결과를 모아 교차 검증하는 방식이 적합합니다.
실전 팁
💡 - awaitAll 함수를 사용하면 여러 Deferred를 한 번에 기다릴 수 있어 코드가 더 깔끔해집니다.
- 결과에 메타데이터(소스, 타임스탬프, 신뢰도)를 함께 저장하면 나중에 다양한 집계 전략을 적용하기 쉽습니다.
4. 부분 실패 처리
모든 것이 완벽하게 동작하면 좋겠지만, 현실은 그렇지 않습니다. 김개발 씨의 에이전트가 운영 환경에 배포된 후, 간헐적으로 전체 시스템이 먹통이 되는 문제가 발생했습니다.
원인을 추적해 보니 외부 API 하나가 응답하지 않을 때 전체 병렬 작업이 실패하고 있었습니다. "부분 실패를 어떻게 처리하느냐가 안정성의 핵심입니다."
부분 실패 처리란 병렬로 실행한 여러 작업 중 일부가 실패해도 나머지 성공한 결과를 활용할 수 있게 하는 전략입니다. 마치 여러 배달원 중 한 명이 지각해도 나머지 배달은 정상적으로 완료되는 것처럼, 시스템의 복원력을 높여줍니다.
다음 코드를 살펴봅시다.
suspend fun fetchWithPartialFailure() = coroutineScope {
val sources = listOf("api1", "api2", "api3", "api4")
// supervisorScope로 독립적 실패 처리
supervisorScope {
val results = sources.map { source ->
async {
try {
Result.success(fetchFromSource(source))
} catch (e: Exception) {
println("$source 실패: ${e.message}")
Result.failure(e)
}
}
}.map { it.await() }
// 성공한 결과만 추출
val successfulData = results.filter { it.isSuccess }
.map { it.getOrNull()!! }
// 최소 2개 이상 성공해야 유효한 응답
if (successfulData.size >= 2) {
AgentResponse(data = successfulData, partial = results.any { it.isFailure })
} else {
throw InsufficientDataException("충분한 데이터를 수집하지 못했습니다")
}
}
}
은행에서 대출 심사를 한다고 상상해 봅시다. 신용정보, 소득증명, 재직증명, 담보평가 등 여러 자료를 수집해야 합니다.
만약 재직증명 발급이 하루 늦어진다고 해서 이미 받아놓은 다른 서류들을 전부 폐기해야 할까요? 그렇지 않습니다.
일단 가능한 서류로 예비 심사를 진행하고, 재직증명이 도착하면 최종 결정을 내리면 됩니다. 기본적인 coroutineScope는 구조화된 동시성을 제공합니다.
이는 하나의 자식 Coroutine이 실패하면 다른 모든 자식도 취소된다는 의미입니다. 대부분의 경우 이것이 올바른 동작입니다.
관련된 작업이 하나라도 실패하면 전체 결과가 무의미해지기 때문입니다. 하지만 독립적인 여러 소스에서 데이터를 수집하는 경우는 다릅니다.
API1이 실패했다고 해서 API2, API3, API4의 결과까지 버릴 필요가 없습니다. 이럴 때 supervisorScope를 사용합니다.
supervisorScope 안에서는 각 자식 Coroutine이 독립적으로 실패합니다. 한 자식이 예외를 던져도 다른 자식들은 계속 실행됩니다.
이것이 부분 실패 처리의 핵심입니다. 위 코드에서 각 async 블록은 try-catch로 감싸져 있습니다.
성공하면 Result.success로, 실패하면 Result.failure로 결과를 감쌉니다. 이렇게 하면 예외가 전파되지 않고, 각 작업의 성공/실패 여부를 나중에 확인할 수 있습니다.
Result 타입은 Kotlin 표준 라이브러리에서 제공하는 것으로, 성공 또는 실패를 나타내는 컨테이너입니다. isSuccess, isFailure로 상태를 확인하고, getOrNull, exceptionOrNull로 값이나 예외를 꺼낼 수 있습니다.
결과를 수집한 후에는 성공한 것만 필터링합니다. 하지만 여기서 중요한 결정이 필요합니다.
성공한 결과가 몇 개 이상이어야 유효한 응답으로 인정할 것인가? 이것은 비즈니스 요구사항에 따라 다릅니다.
김개발 씨의 에이전트는 최소 2개 이상의 소스에서 데이터를 수집해야 신뢰할 수 있는 답변을 생성할 수 있었습니다. 그래서 성공 개수가 2개 미만이면 예외를 던지도록 했습니다.
응답에 partial 플래그를 포함시킨 것도 눈여겨볼 만합니다. 일부 소스가 실패했음을 사용자에게 알려주면, 사용자가 답변의 불완전성을 인지하고 추가 조치를 취할 수 있습니다.
"일부 정보를 가져오지 못했습니다. 더 자세한 내용은 나중에 다시 확인해 주세요." 이 패턴을 적용한 후 김개발 씨의 에이전트는 훨씬 안정적으로 동작하게 되었습니다.
외부 API 하나가 불안정해도 전체 시스템은 정상적으로 응답을 제공했습니다.
실전 팁
💡 - supervisorScope와 일반 coroutineScope의 차이를 명확히 이해하고 상황에 맞게 선택하세요.
- 부분 실패 시 사용자에게 어떤 정보가 누락되었는지 알려주면 더 나은 사용자 경험을 제공할 수 있습니다.
5. 성능 최적화
부분 실패 처리까지 적용한 김개발 씨의 에이전트는 안정적으로 동작했습니다. 하지만 사용자가 늘어나면서 새로운 문제가 나타났습니다.
동시 요청이 많아지자 서버 리소스가 급격히 소모되고 응답 시간도 들쭉날쭉해졌습니다. "무작정 병렬화한다고 능사가 아닙니다.
적절한 제어가 필요해요."
성능 최적화는 병렬 처리의 동시성을 적절히 제어하여 시스템 리소스를 효율적으로 사용하는 것입니다. 동시 실행 개수 제한, 타임아웃 설정, 배압 제어 등의 기법을 통해 안정적이면서도 빠른 시스템을 구축할 수 있습니다.
다음 코드를 살펴봅시다.
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
// 동시 실행 개수를 제한하는 Semaphore
val concurrencyLimit = Semaphore(permits = 5)
suspend fun optimizedParallelFetch(items: List<String>) = coroutineScope {
items.map { item ->
async {
// 최대 5개만 동시 실행
concurrencyLimit.withPermit {
// 개별 타임아웃 설정
withTimeout(3000L) {
fetchData(item)
}
}
}
}.awaitAll()
}
// 청크 단위로 나눠서 처리 (배압 제어)
suspend fun batchProcess(items: List<String>, batchSize: Int = 10) {
items.chunked(batchSize).forEach { batch ->
optimizedParallelFetch(batch)
delay(100) // 배치 사이 쿨다운
}
}
고속도로를 생각해 봅시다. 차선이 많으면 더 많은 차가 동시에 달릴 수 있지만, 무한히 많은 차선을 만들 수는 없습니다.
또한 너무 많은 차가 한꺼번에 진입하면 오히려 정체가 발생합니다. 병렬 처리도 마찬가지입니다.
무작정 동시성을 높이면 오히려 성능이 떨어질 수 있습니다. 첫 번째 최적화 기법은 동시 실행 개수 제한입니다.
Semaphore는 동시에 접근할 수 있는 리소스의 개수를 제한하는 도구입니다. permits = 5로 설정하면 최대 5개의 Coroutine만 동시에 해당 영역에 진입할 수 있습니다.
withPermit 블록 안의 코드는 Semaphore의 허가를 받아야만 실행됩니다. 이미 5개가 실행 중이면 나머지는 대기합니다.
하나가 완료되어 허가를 반납하면 대기 중인 다음 Coroutine이 실행됩니다. 왜 동시 실행을 제한해야 할까요?
여러 이유가 있습니다. 외부 API의 rate limit을 초과하지 않기 위해서, 데이터베이스 커넥션 풀을 고갈시키지 않기 위해서, 서버 메모리를 보호하기 위해서 등입니다.
두 번째 기법은 타임아웃 설정입니다. withTimeout은 지정된 시간 내에 작업이 완료되지 않으면 예외를 던집니다.
하나의 느린 작업이 전체 시스템을 지연시키는 것을 방지합니다. 김개발 씨의 에이전트에서 외부 API 하나가 간헐적으로 30초씩 응답이 지연되는 경우가 있었습니다.
타임아웃 없이는 사용자가 30초를 기다려야 했습니다. 3초 타임아웃을 설정한 후, 느린 API는 빠르게 실패 처리하고 다른 소스의 데이터로 응답할 수 있게 되었습니다.
세 번째 기법은 배압 제어입니다. 1000개의 항목을 처리해야 한다면 한 번에 1000개의 Coroutine을 생성하는 것보다, 10개씩 나눠서 100번 처리하는 것이 안정적입니다.
chunked 함수로 리스트를 작은 배치로 나누고, 각 배치를 처리한 후 잠시 쉬어가는 방식입니다. delay(100)는 배치 사이에 100밀리초의 쿨다운을 줍니다.
이 시간 동안 시스템이 숨을 돌리고, 가비지 컬렉션이 실행되고, 다른 요청을 처리할 여유가 생깁니다. 성능 최적화에는 정답이 없습니다.
동시 실행 개수, 타임아웃 시간, 배치 크기 등은 실제 부하 테스트를 통해 최적값을 찾아야 합니다. 처음에는 보수적으로 설정하고, 모니터링하면서 점진적으로 조정하는 것이 안전합니다.
김개발 씨는 프로메테우스와 그라파나를 연동해 실시간 모니터링 대시보드를 구축했습니다. 응답 시간 분포, 에러율, 리소스 사용량을 지표로 삼아 최적의 설정값을 찾아갔습니다.
그 결과 동시 사용자 1000명에서도 안정적인 응답 시간을 유지하는 시스템을 만들 수 있었습니다.
실전 팁
💡 - Semaphore의 permits 값은 외부 API의 rate limit, 데이터베이스 커넥션 풀 크기 등을 고려해 설정하세요.
- withTimeoutOrNull을 사용하면 타임아웃 시 예외 대신 null을 반환받아 더 유연하게 처리할 수 있습니다.
6. 실전 다중 소스 데이터 수집
이제 김개발 씨는 지금까지 배운 모든 것을 종합해 실제 AI 에이전트의 데이터 수집 시스템을 구축하려 합니다. 여러 검색 엔진, 데이터베이스, 외부 API, 캐시에서 동시에 데이터를 가져와 사용자에게 종합적인 답변을 제공하는 시스템입니다.
"이론은 충분해요. 이제 실전입니다."
다중 소스 데이터 수집은 앞서 배운 병렬화, 결과 집계, 부분 실패 처리, 성능 최적화를 모두 결합한 실전 패턴입니다. AI 에이전트가 다양한 소스에서 정보를 수집하고, 신뢰도에 따라 가중치를 부여하며, 일부 실패에도 견고하게 동작하는 완성형 시스템입니다.
다음 코드를 살펴봅시다.
data class SourceResult(
val source: String,
val data: Any?,
val confidence: Double,
val latency: Long,
val error: String? = null
)
suspend fun collectFromMultipleSources(query: String): AgentResponse = supervisorScope {
val startTime = System.currentTimeMillis()
val semaphore = Semaphore(permits = 8)
val sources = mapOf(
"cache" to { checkCache(query) },
"database" to { queryDatabase(query) },
"webSearch" to { searchWeb(query) },
"vectorDB" to { searchVectorStore(query) },
"knowledgeBase" to { queryKnowledgeBase(query) }
)
val results = sources.map { (name, fetcher) ->
async {
semaphore.withPermit {
val taskStart = System.currentTimeMillis()
try {
withTimeout(5000L) {
val data = fetcher()
SourceResult(name, data, getConfidence(name), System.currentTimeMillis() - taskStart)
}
} catch (e: Exception) {
SourceResult(name, null, 0.0, System.currentTimeMillis() - taskStart, e.message)
}
}
}
}.awaitAll()
buildResponse(results, System.currentTimeMillis() - startTime)
}
드디어 모든 퍼즐 조각이 맞춰지는 순간입니다. 김개발 씨는 지금까지 배운 기법들을 하나의 시스템으로 통합했습니다.
이 코드는 실제 프로덕션 환경에서 사용할 수 있는 수준의 완성도를 갖추고 있습니다. 먼저 SourceResult 데이터 클래스를 정의합니다.
각 소스의 결과뿐 아니라 소스 이름, 신뢰도, 응답 시간, 에러 정보까지 담습니다. 이 메타데이터들은 나중에 결과를 집계하고 분석하는 데 필수적입니다.
supervisorScope로 전체 작업을 감싸 부분 실패에 대응합니다. 하나의 소스가 실패해도 다른 소스의 결과는 살아남습니다.
sources 맵은 조회할 데이터 소스와 해당 조회 함수를 정의합니다. 캐시, 데이터베이스, 웹 검색, 벡터 데이터베이스, 지식 베이스 등 다섯 가지 소스를 사용합니다.
실제 에이전트에서는 이보다 더 많은 소스가 있을 수 있습니다. 각 소스에 대해 async로 병렬 작업을 시작합니다.
Semaphore로 동시 실행 개수를 8개로 제한하고, withTimeout으로 각 작업에 5초 타임아웃을 설정합니다. 작업 시작 시간을 기록해두었다가 완료 후 지연 시간을 계산합니다.
이 정보는 성능 모니터링과 병목 분석에 유용합니다. 어떤 소스가 자주 느린지, 평균 응답 시간은 얼마인지 파악할 수 있습니다.
try-catch로 예외를 잡아 SourceResult에 에러 정보를 담습니다. 예외가 전파되지 않으므로 다른 작업에 영향을 주지 않습니다.
getConfidence 함수는 각 소스의 기본 신뢰도를 반환합니다. 캐시는 검증된 데이터이므로 신뢰도가 높고, 웹 검색은 불확실성이 있어 신뢰도가 낮을 수 있습니다.
이 값은 결과 집계 시 가중치로 사용됩니다. 모든 작업이 완료되면 awaitAll로 결과를 수집합니다.
buildResponse 함수에서 성공한 결과를 신뢰도 순으로 정렬하고, 충돌하는 정보는 신뢰도가 높은 것을 우선하며, 최종 응답을 구성합니다. 전체 소요 시간도 기록합니다.
사용자에게 "이 응답은 X초 만에 생성되었습니다"라고 알려주거나, SLA 모니터링에 활용할 수 있습니다. 김개발 씨의 에이전트는 이 시스템으로 평균 응답 시간 1.2초, 가용성 99.9%를 달성했습니다.
외부 API 하나가 장애를 일으켜도 다른 소스의 데이터로 답변을 제공할 수 있었고, 피크 시간대에도 안정적인 성능을 유지했습니다. 박시니어 씨가 코드를 검토하고 흡족한 미소를 지었습니다.
"이제 진짜 에이전트 개발자가 됐네요." 김개발 씨는 6개월 전 스레드가 무서워 도망치던 자신을 떠올리며 뿌듯해했습니다.
실전 팁
💡 - 각 소스의 신뢰도와 응답 시간을 로깅하면 시스템 개선 포인트를 찾는 데 큰 도움이 됩니다.
- 정기적으로 소스별 성능을 분석해 느린 소스는 캐싱하거나 대체 소스를 마련하세요.
- 결과 집계 로직은 별도 함수로 분리해 테스트 가능하게 만드세요.
이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!
댓글 (0)
함께 보면 좋은 카드 뉴스
AI 에이전트의 Task Decomposition & Planning 완벽 가이드
AI 에이전트가 복잡한 작업을 어떻게 분해하고 계획하는지 알아봅니다. 작업 분해 전략부터 동적 재계획까지, 에이전트 개발의 핵심 개념을 실무 예제와 함께 쉽게 설명합니다.
에이전트 강화 미세조정 RFT 완벽 가이드
AI 에이전트가 스스로 학습하고 적응하는 강화 미세조정(RFT) 기법을 알아봅니다. 온라인/오프라인 학습부터 A/B 테스팅까지 실무에서 바로 적용할 수 있는 핵심 개념을 다룹니다.
Voice Clone 구현 완벽 가이드
음성 복제(Voice Clone) 기술을 활용하여 특정 화자의 목소리를 재현하는 방법을 알아봅니다. 참조 오디오 준비부터 실전 구현까지, 초급 개발자도 따라할 수 있도록 단계별로 설명합니다.
Qwen3-TTS 프로젝트 소개
알리바바가 공개한 최신 텍스트-음성 변환 프로젝트 Qwen3-TTS를 소개합니다. 음성 복제부터 10개 언어 지원까지, 차세대 TTS 기술의 핵심을 초급 개발자 눈높이에서 살펴봅니다.
AI 에이전트 패턴 완벽 가이드
LLM 기반 AI 에이전트를 프로덕션 환경에서 성공적으로 구축하기 위한 핵심 패턴들을 소개합니다. 튜토리얼과 실제 제품 사이의 간극을 메우고, 8가지 카테고리로 정리된 패턴들을 통해 실무에서 바로 적용할 수 있는 지식을 전달합니다.