🤖

본 콘텐츠의 이미지 및 내용은 AI로 생성되었습니다.

⚠️

본 콘텐츠의 이미지 및 내용을 무단으로 복제, 배포, 수정하여 사용할 경우 저작권법에 의해 법적 제재를 받을 수 있습니다.

이미지 로딩 중...

AI 에이전트의 Task Decomposition & Planning 완벽 가이드 - 슬라이드 1/7
A

AI Generated

2026. 2. 2. · 5 Views

AI 에이전트의 Task Decomposition & Planning 완벽 가이드

AI 에이전트가 복잡한 작업을 어떻게 분해하고 계획하는지 알아봅니다. 작업 분해 전략부터 동적 재계획까지, 에이전트 개발의 핵심 개념을 실무 예제와 함께 쉽게 설명합니다.


목차

  1. 작업_분해_전략
  2. 계층적_작업_구조
  3. 의존성_그래프_구축
  4. 병렬_실행_최적화
  5. 우선순위_결정_알고리즘
  6. 동적_재계획_패턴

1. 작업 분해 전략

어느 날 김개발 씨는 회사에서 새로운 AI 에이전트 프로젝트를 맡게 되었습니다. "고객 문의를 자동으로 처리하는 에이전트를 만들어주세요." 간단해 보였지만, 막상 시작하려니 어디서부터 손대야 할지 막막했습니다.

선배 박시니어 씨가 다가와 한마디 했습니다. "큰 작업을 작은 조각으로 나누는 것부터 시작해봐요."

**작업 분해(Task Decomposition)**는 복잡한 문제를 관리 가능한 작은 단위로 나누는 기술입니다. 마치 큰 피자를 여러 조각으로 나눠 먹기 쉽게 만드는 것과 같습니다.

AI 에이전트가 "이메일 답장 작성"이라는 요청을 받으면, 이를 "의도 파악 → 정보 수집 → 초안 작성 → 검토"라는 단계로 분해합니다. 이렇게 나누면 각 단계를 독립적으로 처리하고 검증할 수 있습니다.

다음 코드를 살펴봅시다.

from typing import List, Dict

class TaskDecomposer:
    def __init__(self, llm_client):
        self.llm = llm_client

    # 복잡한 작업을 하위 작업으로 분해
    def decompose(self, task: str) -> List[Dict]:
        prompt = f"""
        다음 작업을 실행 가능한 하위 작업으로 분해하세요:
        작업: {task}

        각 하위 작업은 독립적으로 실행 가능해야 합니다.
        """
        subtasks = self.llm.generate(prompt)
        return self._parse_subtasks(subtasks)

    def _parse_subtasks(self, raw_output: str) -> List[Dict]:
        # 하위 작업을 구조화된 형태로 변환
        tasks = []
        for line in raw_output.strip().split('\n'):
            tasks.append({"name": line, "status": "pending"})
        return tasks

김개발 씨는 입사 6개월 차 주니어 개발자입니다. AI 에이전트 개발이라는 새로운 영역에 발을 들여놓았는데, 첫 번째 벽에 부딪혔습니다.

고객이 "우리 회사 매출 보고서를 분석해서 다음 분기 전략을 제안해줘"라고 요청하면, 에이전트는 이 복잡한 요청을 어떻게 처리해야 할까요? 선배 개발자 박시니어 씨가 화이트보드 앞에 섰습니다.

"자, 이 문제를 한번 분해해볼까요?" 작업 분해란 정확히 무엇일까요? 쉽게 비유하자면, 작업 분해는 마치 요리 레시피를 작성하는 것과 같습니다.

"된장찌개를 만들어라"라는 큰 목표가 있다면, 우리는 자연스럽게 "재료 준비 → 육수 끓이기 → 된장 풀기 → 야채 넣기 → 간 맞추기"라는 단계로 나눕니다. 각 단계가 명확하면 초보 요리사도 따라할 수 있습니다.

AI 에이전트도 마찬가지입니다. 복잡한 요청을 받았을 때, 이를 한 번에 처리하려고 하면 실패할 확률이 높습니다.

하지만 작은 단계로 나누면, 각 단계에서 실패하더라도 어디서 문제가 생겼는지 파악하기 쉽습니다. 작업 분해가 없던 시절에는 어땠을까요?

초기 AI 시스템들은 복잡한 요청을 받으면 한 번에 모든 것을 처리하려 했습니다. 결과는 종종 엉망이었습니다.

중간에 오류가 발생하면 처음부터 다시 시작해야 했고, 어디서 잘못되었는지 추적하기도 어려웠습니다. 바로 이런 문제를 해결하기 위해 작업 분해 전략이 등장했습니다.

작업 분해를 사용하면 디버깅이 쉬워집니다. 또한 각 단계를 독립적으로 테스트할 수 있습니다.

무엇보다 부분적인 실패에서 복구가 가능하다는 큰 이점이 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

먼저 decompose 메서드는 LLM에게 작업 분해를 요청합니다. 프롬프트에서 "독립적으로 실행 가능해야 한다"고 명시한 점이 중요합니다.

이렇게 해야 나중에 병렬 실행이나 재시도가 가능해집니다. _parse_subtasks 메서드는 LLM의 출력을 구조화된 딕셔너리 형태로 변환합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 고객 서비스 챗봇을 개발한다고 가정해봅시다.

고객이 "지난달 주문한 상품 환불해주세요"라고 요청하면, 에이전트는 이를 "주문 내역 조회 → 환불 정책 확인 → 환불 가능 여부 판단 → 환불 처리 → 확인 메시지 전송"으로 분해합니다. 각 단계가 명확하니 문제가 생겨도 어디서 막혔는지 금방 알 수 있습니다.

하지만 주의할 점도 있습니다. 초보 개발자들이 흔히 하는 실수 중 하나는 너무 잘게 분해하는 것입니다.

"이메일 보내기"를 "키보드로 e 입력 → 키보드로 m 입력..." 수준으로 나누면 오히려 복잡해집니다. 적절한 추상화 수준을 유지하는 것이 중요합니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 박시니어 씨의 설명을 들은 김개발 씨는 고개를 끄덕였습니다.

"아, 그래서 작은 단위로 나눠야 하는군요!" 작업 분해를 제대로 이해하면 복잡한 AI 에이전트도 체계적으로 설계할 수 있습니다.

실전 팁

💡 - 각 하위 작업은 "동사 + 목적어" 형태로 명확하게 정의하세요

  • 하위 작업 간의 입출력 관계를 미리 정의해두면 통합이 쉬워집니다

2. 계층적 작업 구조

김개발 씨가 작업 분해의 기초를 익히고 나니, 새로운 문제가 보이기 시작했습니다. 분해한 작업들이 평면적으로 나열되어 있으니 전체 그림이 잘 보이지 않았습니다.

"이 작업들 사이에 관계가 있을 텐데..." 박시니어 씨가 힌트를 주었습니다. "회사 조직도처럼 생각해봐요.

작업에도 계층이 있거든요."

**계층적 작업 구조(Hierarchical Task Structure)**는 작업들을 트리 형태로 구성하는 방식입니다. 마치 회사 조직도에서 CEO 아래 부서장이 있고, 부서장 아래 팀장이 있는 것과 같습니다.

상위 작업은 "무엇을" 해야 하는지를 정의하고, 하위 작업은 "어떻게" 할지를 구체화합니다. 이런 구조 덕분에 복잡한 프로젝트도 한눈에 파악할 수 있습니다.

다음 코드를 살펴봅시다.

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class TaskNode:
    id: str
    name: str
    parent_id: Optional[str] = None
    children: List['TaskNode'] = field(default_factory=list)
    status: str = "pending"

    # 하위 작업 추가
    def add_subtask(self, subtask: 'TaskNode'):
        subtask.parent_id = self.id
        self.children.append(subtask)

    # 모든 하위 작업이 완료되었는지 확인
    def is_complete(self) -> bool:
        if not self.children:
            return self.status == "completed"
        return all(child.is_complete() for child in self.children)

    # 트리 구조 시각화
    def display(self, indent: int = 0):
        prefix = "  " * indent + ("└─ " if indent > 0 else "")
        print(f"{prefix}{self.name} [{self.status}]")
        for child in self.children:
            child.display(indent + 1)

김개발 씨는 고객 문의 처리 에이전트를 만들면서 수십 개의 하위 작업을 정의했습니다. 그런데 이 작업들이 리스트에 쭉 나열되어 있으니, 어떤 작업이 어떤 작업의 일부인지 구분이 어려웠습니다.

마치 회사의 모든 직원 이름을 알파벳 순으로 나열해놓은 것과 같았습니다. 박시니어 씨가 화이트보드에 트리 구조를 그리기 시작했습니다.

"조직도를 생각해봐요. CEO가 맨 위에 있고, 그 아래 각 부서가 있죠?" 계층적 작업 구조란 정확히 무엇일까요?

쉽게 비유하자면, 이것은 마치 책의 목차와 같습니다. 책에는 "1장.

서론"이 있고, 그 아래 "1.1 배경", "1.2 목적" 같은 소제목이 있습니다. 더 아래로 내려가면 "1.1.1 역사적 배경"처럼 더 세부적인 내용이 나옵니다.

각 수준이 점점 더 구체적인 내용을 담고 있습니다. AI 에이전트의 작업 구조도 마찬가지입니다.

최상위에는 "고객 문의 처리"라는 큰 목표가 있습니다. 그 아래에 "문의 분류", "정보 조회", "응답 생성"이라는 중간 수준 작업이 있습니다.

"정보 조회" 아래에는 다시 "데이터베이스 검색", "API 호출", "결과 병합"이라는 세부 작업이 배치됩니다. 계층 구조가 없던 시절에는 어땠을까요?

모든 작업이 같은 레벨에 있으니 전체적인 진행 상황 파악이 어려웠습니다. "지금 뭘 하고 있는 거지?"라는 질문에 명확히 답하기 힘들었습니다.

또한 작업 간의 포함 관계를 표현할 방법이 없었습니다. 바로 이런 문제를 해결하기 위해 계층적 구조가 필요합니다.

계층 구조를 사용하면 추상화 수준별로 작업을 관리할 수 있습니다. 경영진에게는 최상위 작업만 보여주고, 실무자에게는 세부 작업을 보여줄 수 있습니다.

또한 부분 완료 상태를 자연스럽게 표현할 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

TaskNode 클래스는 트리의 각 노드를 표현합니다. parent_id로 부모를 참조하고, children 리스트로 자식들을 관리합니다.

is_complete 메서드가 특히 중요한데, 이것은 재귀적으로 모든 하위 작업의 완료 여부를 확인합니다. 리프 노드(자식이 없는 노드)는 자신의 상태만 확인하고, 중간 노드는 모든 자식이 완료되어야 완료로 판단합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 "웹사이트 리뉴얼" 프로젝트를 생각해봅시다.

최상위에 "웹사이트 리뉴얼"이 있고, 그 아래 "디자인", "프론트엔드", "백엔드", "QA"가 있습니다. "프론트엔드" 아래에는 다시 "컴포넌트 개발", "API 연동", "성능 최적화"가 배치됩니다.

이렇게 하면 프로젝트 매니저는 상위 레벨만 보고도 전체 진행률을 파악할 수 있습니다. 하지만 주의할 점도 있습니다.

초보 개발자들이 흔히 하는 실수 중 하나는 계층을 너무 깊게 만드는 것입니다. 7단계, 8단계까지 내려가면 오히려 관리가 어려워집니다.

일반적으로 3~4단계가 적당합니다. 또한 하나의 부모 아래 자식이 너무 많아도 문제입니다.

7개 이상이면 중간 레벨을 하나 더 만드는 것을 고려해보세요. 다시 김개발 씨의 이야기로 돌아가 봅시다.

작업들을 트리 구조로 재구성하니 전체 그림이 명확해졌습니다. "이제 어떤 작업이 다른 작업의 일부인지 한눈에 보이네요!"

실전 팁

💡 - 계층은 3~4단계를 넘지 않도록 유지하세요

  • 각 노드에 완료 조건을 명확히 정의해두면 진행률 추적이 쉬워집니다

3. 의존성 그래프 구축

김개발 씨는 계층 구조까지 완성했지만, 또 다른 문제에 부딪혔습니다. "데이터베이스 조회"가 끝나야 "분석 보고서 생성"이 가능한데, 현재 구조에서는 이런 관계가 표현되지 않았습니다.

순서를 무시하고 작업을 실행하면 당연히 오류가 발생합니다. 박시니어 씨가 말했습니다.

"이제 작업 간의 의존성을 그래프로 표현할 차례야."

**의존성 그래프(Dependency Graph)**는 작업들 사이의 선후 관계를 나타내는 방향성 그래프입니다. 마치 요리 레시피에서 "양파를 썰기 전에 먼저 껍질을 벗겨야 한다"는 순서가 있는 것처럼, 작업에도 "A가 끝나야 B를 시작할 수 있다"는 관계가 존재합니다.

이 관계를 명시적으로 정의하면 올바른 실행 순서를 보장할 수 있습니다.

다음 코드를 살펴봅시다.

from collections import defaultdict, deque
from typing import List, Set

class DependencyGraph:
    def __init__(self):
        self.graph = defaultdict(list)  # 작업 -> 의존하는 작업들
        self.in_degree = defaultdict(int)  # 각 작업의 선행 작업 수
        self.all_tasks = set()

    # 의존성 추가: task는 dependency가 완료되어야 실행 가능
    def add_dependency(self, task: str, dependency: str):
        self.graph[dependency].append(task)
        self.in_degree[task] += 1
        self.all_tasks.update([task, dependency])

    # 위상 정렬로 실행 순서 결정
    def get_execution_order(self) -> List[str]:
        queue = deque()
        result = []

        # 선행 작업이 없는 것부터 시작
        for task in self.all_tasks:
            if self.in_degree[task] == 0:
                queue.append(task)

        while queue:
            current = queue.popleft()
            result.append(current)
            for next_task in self.graph[current]:
                self.in_degree[next_task] -= 1
                if self.in_degree[next_task] == 0:
                    queue.append(next_task)

        return result

김개발 씨가 만든 에이전트가 이상하게 동작했습니다. 분명히 모든 작업을 정의해두었는데, 가끔 "데이터가 없습니다"라는 오류가 발생했습니다.

원인을 찾아보니, "분석 보고서 생성" 작업이 "데이터 조회" 작업보다 먼저 실행되는 경우가 있었습니다. 순서가 보장되지 않았던 것입니다.

박시니어 씨가 설명을 시작했습니다. "작업 간에는 의존성이라는 게 있어.

A가 끝나야 B를 할 수 있다면, B는 A에 의존한다고 말하지." 의존성 그래프란 정확히 무엇일까요? 쉽게 비유하자면, 이것은 마치 대학교 수강 신청과 같습니다.

"알고리즘" 과목을 듣기 위해서는 먼저 "자료구조"를 이수해야 합니다. "자료구조"를 듣기 위해서는 "프로그래밍 기초"가 선행되어야 합니다.

이런 선수과목 관계를 그림으로 그리면 방향성 그래프가 됩니다. AI 에이전트에서도 마찬가지입니다.

"이메일 발송" 작업은 "이메일 내용 생성"이 완료되어야 실행 가능합니다. "이메일 내용 생성"은 "고객 정보 조회"와 "템플릿 로드"가 모두 끝나야 시작할 수 있습니다.

이런 관계를 화살표로 연결하면 의존성 그래프가 완성됩니다. 의존성 관리가 없던 시절에는 어땠을까요?

개발자가 수동으로 순서를 정해서 실행했습니다. 작업이 10개쯤 되면 그럭저럭 관리가 가능했지만, 100개가 넘어가면 실수가 발생하기 시작했습니다.

더 큰 문제는 작업이 추가되거나 변경될 때였습니다. 기존 순서를 전부 다시 검토해야 했습니다.

바로 이런 문제를 해결하기 위해 의존성 그래프가 필요합니다. 의존성을 명시적으로 정의하면, 위상 정렬(Topological Sort) 알고리즘으로 올바른 실행 순서를 자동으로 계산할 수 있습니다.

새로운 작업이 추가되어도 의존성만 정의하면 순서는 자동으로 조정됩니다. 위의 코드를 한 줄씩 살펴보겠습니다.

add_dependency 메서드는 의존성 관계를 추가합니다. in_degree는 각 작업이 가지는 선행 작업의 수를 저장합니다.

get_execution_order 메서드는 위상 정렬을 수행합니다. 선행 작업이 없는 것부터 시작해서, 하나씩 완료 처리하며 다음 작업으로 넘어갑니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 CI/CD 파이프라인을 생각해봅시다.

"테스트 실행"은 "빌드 완료"에 의존하고, "배포"는 "테스트 통과"에 의존합니다. 이 관계를 그래프로 정의하면, 파이프라인 도구가 자동으로 올바른 순서를 결정합니다.

하지만 주의할 점도 있습니다. 가장 치명적인 실수는 **순환 의존성(Circular Dependency)**입니다.

A가 B에 의존하고, B가 다시 A에 의존하면 영원히 실행할 수 없습니다. 위상 정렬에서 모든 작업이 결과에 포함되지 않으면 순환이 있다는 신호입니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 의존성 그래프를 도입하니 작업 순서 문제가 완전히 해결되었습니다.

"이제 어떤 순서로 실행해야 하는지 시스템이 알아서 결정해주네요!"

실전 팁

💡 - 의존성을 추가할 때마다 순환 여부를 체크하는 로직을 넣으세요

  • 가능하면 의존성을 최소화하여 병렬 실행 가능성을 높이세요

4. 병렬 실행 최적화

김개발 씨의 에이전트가 잘 동작하기 시작했지만, 한 가지 불만이 있었습니다. 너무 느렸습니다.

모든 작업을 하나씩 순서대로 실행하다 보니, 전체 완료까지 시간이 오래 걸렸습니다. 박시니어 씨가 물었습니다.

"혹시 동시에 실행해도 되는 작업들이 있지 않아?" 김개발 씨는 의존성 그래프를 다시 들여다보았습니다.

병렬 실행 최적화는 서로 의존성이 없는 작업들을 동시에 실행하여 전체 소요 시간을 줄이는 기법입니다. 마치 식당 주방에서 파스타 면을 삶는 동안 소스를 만들 수 있는 것처럼, 서로 영향을 주지 않는 작업은 동시에 진행할 수 있습니다.

의존성 그래프를 분석하면 병렬 실행 가능한 작업 그룹을 자동으로 찾아낼 수 있습니다.

다음 코드를 살펴봅시다.

import asyncio
from typing import List, Set, Dict

class ParallelExecutor:
    def __init__(self, dependency_graph: Dict[str, List[str]]):
        self.deps = dependency_graph
        self.completed: Set[str] = set()

    # 현재 실행 가능한 작업들 찾기
    def get_ready_tasks(self, all_tasks: Set[str]) -> Set[str]:
        ready = set()
        for task in all_tasks:
            if task not in self.completed:
                dependencies = self.deps.get(task, [])
                # 모든 의존성이 완료되었으면 실행 가능
                if all(dep in self.completed for dep in dependencies):
                    ready.add(task)
        return ready

    # 병렬 실행
    async def execute_parallel(self, tasks: Set[str], executor_func):
        while tasks - self.completed:
            ready = self.get_ready_tasks(tasks)
            if not ready:
                break

            # 준비된 작업들을 동시에 실행
            results = await asyncio.gather(
                *[executor_func(task) for task in ready]
            )
            self.completed.update(ready)
            print(f"병렬 완료: {ready}")

김개발 씨가 만든 에이전트는 10개의 작업을 처리하는 데 총 100초가 걸렸습니다. 각 작업이 평균 10초씩 걸렸기 때문입니다.

그런데 가만히 보니, 처음 3개 작업은 서로 독립적이었습니다. 굳이 하나씩 기다릴 필요가 없었습니다.

박시니어 씨가 의존성 그래프를 가리키며 말했습니다. "여기 보면, 작업 1, 2, 3은 서로 화살표가 없지?

이건 동시에 실행해도 된다는 뜻이야." 병렬 실행이란 정확히 무엇일까요? 쉽게 비유하자면, 이것은 마치 여러 명이 함께 청소하는 것과 같습니다.

혼자서 거실 청소 → 주방 청소 → 화장실 청소를 순서대로 하면 3시간이 걸립니다. 하지만 세 사람이 각자 한 곳씩 맡으면 1시간 만에 끝납니다.

단, 누군가 먼저 물건을 치워야 다른 사람이 청소할 수 있는 경우에는 순서를 지켜야 합니다. AI 에이전트에서도 마찬가지입니다.

"사용자 정보 조회"와 "상품 정보 조회"는 서로 관련이 없으니 동시에 실행 가능합니다. 하지만 "주문 생성"은 두 조회가 모두 끝나야 시작할 수 있습니다.

이런 패턴을 Fork-Join이라고 부릅니다. 여러 작업이 갈라졌다가(Fork), 나중에 합쳐지는(Join) 구조입니다.

병렬 실행을 하지 않던 시절에는 어땠을까요? 모든 것이 순차적으로 실행되었습니다.

CPU는 대부분의 시간을 유휴 상태로 보냈습니다. 네트워크 응답을 기다리는 동안에도 다른 작업을 할 수 있었는데, 그냥 기다리기만 했습니다.

바로 이런 비효율을 해결하기 위해 병렬 실행 최적화가 필요합니다. 의존성 그래프를 분석하면 어떤 작업들이 동시에 실행 가능한지 알 수 있습니다.

Python의 asyncio나 JavaScript의 Promise.all을 사용하면 이런 병렬 실행을 쉽게 구현할 수 있습니다. 위의 코드를 한 줄씩 살펴보겠습니다.

get_ready_tasks 메서드는 현재 시점에서 실행 가능한 작업들을 찾습니다. 모든 의존성이 completed 집합에 있으면 해당 작업은 준비된 것입니다.

execute_parallel 메서드는 asyncio.gather를 사용해 준비된 작업들을 동시에 실행합니다. 한 배치가 완료되면 다음 배치의 준비 작업을 찾아 다시 실행합니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 대시보드 데이터를 조회하는 에이전트를 생각해봅시다.

"매출 데이터", "사용자 통계", "재고 현황"을 각각 다른 API에서 가져와야 합니다. 이 세 요청을 순차적으로 보내면 3초가 걸리지만, 병렬로 보내면 가장 느린 요청 하나만큼인 1.2초면 충분합니다.

하지만 주의할 점도 있습니다. 병렬 실행의 가장 흔한 문제는 자원 경합입니다.

100개의 작업을 동시에 실행하면 서버가 과부하 걸릴 수 있습니다. 따라서 **동시 실행 개수 제한(Concurrency Limit)**을 설정하는 것이 좋습니다.

asyncio.Semaphore를 사용하면 최대 동시 실행 수를 제어할 수 있습니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

병렬 실행을 도입하니 전체 실행 시간이 100초에서 40초로 줄어들었습니다. "60%나 빨라졌네요!" 의존성을 제대로 분석하면 성능 최적화의 기회를 찾을 수 있습니다.

실전 팁

💡 - 동시 실행 개수는 시스템 자원과 외부 API의 Rate Limit을 고려해서 설정하세요

  • 병렬 실행 중 하나가 실패해도 나머지는 계속 진행되도록 예외 처리를 해두세요

5. 우선순위 결정 알고리즘

김개발 씨의 에이전트가 점점 복잡해지면서 새로운 고민이 생겼습니다. 동시에 실행 가능한 작업이 10개인데, 시스템 자원은 3개만 동시에 처리할 수 있었습니다.

어떤 3개를 먼저 실행해야 할까요? "아무거나 골라도 되지 않나요?" 박시니어 씨가 고개를 저었습니다.

"뭘 먼저 하느냐에 따라 전체 완료 시간이 달라져."

우선순위 결정 알고리즘은 여러 작업 중 어떤 것을 먼저 실행할지 결정하는 로직입니다. 마치 응급실에서 환자의 위급 정도에 따라 치료 순서를 정하는 것과 같습니다.

임계 경로에 있는 작업, 다른 작업들이 많이 기다리는 작업, 긴급한 작업 등 여러 기준을 종합하여 최적의 순서를 계산합니다.

다음 코드를 살펴봅시다.

from dataclasses import dataclass
from typing import List, Dict
import heapq

@dataclass
class PrioritizedTask:
    priority: float
    task_id: str

    def __lt__(self, other):
        return self.priority > other.priority  # 높은 우선순위가 먼저

class PriorityScheduler:
    def __init__(self):
        self.task_queue = []

    # 우선순위 계산: 여러 요소를 종합
    def calculate_priority(self, task: Dict) -> float:
        # 가중치 기반 점수 계산
        urgency = task.get('urgency', 0) * 3.0        # 긴급도
        blocking = task.get('blocking_count', 0) * 2.0  # 대기 중인 작업 수
        deadline = task.get('deadline_pressure', 0) * 2.5  # 마감 압박
        cost = task.get('estimated_cost', 1) * -0.5   # 비용 (낮을수록 좋음)

        return urgency + blocking + deadline + cost

    # 작업 추가
    def add_task(self, task: Dict):
        priority = self.calculate_priority(task)
        heapq.heappush(
            self.task_queue,
            PrioritizedTask(priority, task['id'])
        )

    # 다음 실행할 작업 가져오기
    def get_next_task(self) -> str:
        if self.task_queue:
            return heapq.heappop(self.task_queue).task_id
        return None

김개발 씨는 에이전트가 처리해야 할 작업이 동시에 여러 개 쌓이는 상황을 마주했습니다. API 호출은 분당 100회로 제한되어 있었고, 동시에 200개의 작업이 대기 중이었습니다.

아무 순서로나 처리하면 사용자가 오래 기다리거나, 중요한 작업이 밀리는 문제가 발생했습니다. 박시니어 씨가 병원 응급실을 예로 들었습니다.

"응급실에서 순서대로 환자를 보지 않잖아. 위급한 환자를 먼저 보지." 우선순위 결정 알고리즘이란 정확히 무엇일까요?

쉽게 비유하자면, 이것은 마치 바쁜 아침에 할 일 목록을 정리하는 것과 같습니다. 출근 버스 시간은 정해져 있고, 아침밥도 먹어야 하고, 샤워도 해야 합니다.

버스를 놓치면 큰일이니 출발 시간은 양보할 수 없고, 나머지 일들은 중요도에 따라 순서를 정합니다. AI 에이전트에서는 이런 판단을 자동으로 해야 합니다.

여러 기준을 종합해서 점수를 매기고, 점수가 높은 작업부터 처리합니다. 일반적으로 사용되는 기준들이 있습니다.

**긴급도(Urgency)**는 마감이 임박한 정도입니다. **블로킹 영향(Blocking Impact)**는 이 작업이 완료되어야 시작할 수 있는 다른 작업의 수입니다.

**비용 효율(Cost Efficiency)**은 투입 대비 효과입니다. 우선순위 관리가 없던 시절에는 어땠을까요?

FIFO(First-In-First-Out) 방식으로 먼저 들어온 작업을 먼저 처리했습니다. 하지만 이 방식은 작업의 특성을 전혀 고려하지 않습니다.

10분 걸리는 중요하지 않은 작업 뒤에 1초면 끝나는 긴급 작업이 기다리는 상황이 발생합니다. 바로 이런 문제를 해결하기 위해 우선순위 스케줄링이 필요합니다.

작업마다 점수를 계산하고, **우선순위 큐(Priority Queue)**를 사용하면 항상 가장 중요한 작업을 먼저 처리할 수 있습니다. Python의 heapq 모듈이 이를 효율적으로 지원합니다.

위의 코드를 한 줄씩 살펴보겠습니다. calculate_priority 메서드는 여러 요소에 가중치를 곱해서 최종 점수를 계산합니다.

긴급도에 3.0을 곱한 이유는 긴급한 작업을 더 우선시하기 위해서입니다. 비용에 음수 가중치를 준 이유는 비용이 낮을수록 좋기 때문입니다.

PrioritizedTask__lt__ 메서드는 높은 우선순위가 먼저 오도록 비교 로직을 정의합니다. 실제 현업에서는 어떻게 활용할까요?

예를 들어 메시지 처리 시스템을 생각해봅시다. 결제 관련 메시지는 최우선으로 처리해야 합니다.

알림 메시지는 약간 늦어도 괜찮습니다. 통계 집계 메시지는 한가할 때 처리해도 됩니다.

이런 우선순위 정책을 코드로 표현하면, 시스템 부하가 높을 때도 중요한 작업은 빠르게 처리됩니다. 하지만 주의할 점도 있습니다.

가장 큰 문제는 **기아 상태(Starvation)**입니다. 우선순위가 낮은 작업이 계속 밀려서 영원히 실행되지 않을 수 있습니다.

이를 방지하기 위해 에이징(Aging) 기법을 사용합니다. 대기 시간이 길어질수록 우선순위를 조금씩 올려주는 것입니다.

다시 김개발 씨의 이야기로 돌아가 봅시다. 우선순위 알고리즘을 도입하니 사용자 체감 응답 시간이 크게 개선되었습니다.

"긴급한 요청은 바로 처리되고, 덜 급한 건 나중에 처리되네요!"

실전 팁

💡 - 가중치 값은 실제 운영 데이터를 분석해서 조정하세요

  • 에이징을 적용하여 낮은 우선순위 작업도 언젠가는 실행되도록 보장하세요

6. 동적 재계획 패턴

김개발 씨의 에이전트가 순조롭게 작업을 처리하던 중, 갑자기 외부 API가 응답하지 않았습니다. 원래 계획대로라면 다음 작업도 줄줄이 실패해야 했습니다.

하지만 똑똑한 에이전트라면 "계획을 수정해서 다른 방법을 시도"할 수 있어야 합니다. 박시니어 씨가 말했습니다.

"계획은 고정된 게 아니야. 상황에 따라 바뀔 수 있어야 해."

**동적 재계획(Dynamic Replanning)**은 실행 중 예상치 못한 상황이 발생했을 때 계획을 수정하는 능력입니다. 마치 네비게이션이 교통 체증을 감지하면 우회 경로를 안내하는 것과 같습니다.

작업 실패, 새로운 정보 발견, 우선순위 변경 등의 상황에서 에이전트는 남은 계획을 재평가하고 최적의 경로를 다시 계산합니다.

다음 코드를 살펴봅시다.

from enum import Enum
from typing import Dict, List, Optional, Callable

class ReplanTrigger(Enum):
    TASK_FAILED = "task_failed"
    NEW_INFO = "new_information"
    PRIORITY_CHANGE = "priority_change"
    TIMEOUT = "timeout"

class DynamicPlanner:
    def __init__(self, llm_client):
        self.llm = llm_client
        self.current_plan: List[Dict] = []
        self.execution_context: Dict = {}

    # 재계획이 필요한지 판단
    def should_replan(self, trigger: ReplanTrigger, context: Dict) -> bool:
        # 실패한 작업이 다른 작업에 영향을 주는지 확인
        if trigger == ReplanTrigger.TASK_FAILED:
            failed_task = context.get('failed_task')
            dependent_tasks = self._get_dependents(failed_task)
            return len(dependent_tasks) > 0
        return True

    # 새로운 계획 생성
    async def replan(self, trigger: ReplanTrigger, context: Dict) -> List[Dict]:
        prompt = f"""
        현재 상황: {trigger.value}
        실패/변경 내용: {context}
        남은 작업: {self._get_remaining_tasks()}

        위 상황을 고려하여 새로운 실행 계획을 제안하세요.
        대안이 있다면 대안을 포함하고, 없다면 작업을 건너뛰세요.
        """
        new_plan = await self.llm.generate(prompt)
        self.current_plan = self._parse_plan(new_plan)
        return self.current_plan

    def _get_dependents(self, task_id: str) -> List[str]:
        # 해당 작업에 의존하는 작업 목록 반환
        return [t['id'] for t in self.current_plan
                if task_id in t.get('dependencies', [])]

김개발 씨가 만든 에이전트는 잘 동작했지만, 한 가지 취약점이 있었습니다. 중간에 문제가 생기면 전체가 멈춰버렸습니다.

외부 API가 느려지거나, 예상치 못한 데이터가 들어오거나, 사용자가 요청을 변경하는 상황에 대응하지 못했습니다. 박시니어 씨가 네비게이션 앱을 켜서 보여주었습니다.

"운전하다가 사고로 길이 막히면 어떻게 해? 네비가 알아서 다른 길을 찾아주잖아." 동적 재계획이란 정확히 무엇일까요?

쉽게 비유하자면, 이것은 마치 유능한 여행 가이드와 같습니다. 예정된 관광지가 휴무일이면, 가이드는 즉석에서 대안을 찾아 일정을 조정합니다.

비가 오면 야외 활동 대신 실내 활동으로 바꿉니다. 핵심은 목표는 유지하면서 방법을 유연하게 바꾸는 것입니다.

AI 에이전트에서도 마찬가지입니다. "데이터베이스에서 정보를 가져온다"가 실패하면, "캐시에서 이전 데이터를 사용한다"거나 "API로 대체 데이터를 요청한다"는 대안을 시도할 수 있습니다.

중요한 것은 최종 목표 달성이지, 특정 방법을 고수하는 것이 아닙니다. 재계획 기능이 없던 시절에는 어땠을까요?

계획은 한 번 세우면 끝이었습니다. 중간에 문제가 생기면 전체 프로세스가 실패하고, 처음부터 다시 시작해야 했습니다.

사용자는 오류 메시지만 받고, 에이전트는 아무런 대안을 제시하지 못했습니다. 바로 이런 문제를 해결하기 위해 동적 재계획이 필요합니다.

재계획이 발동되는 상황은 여러 가지입니다. 작업 실패 시 대안 경로를 찾습니다.

새로운 정보 발견 시 더 효율적인 방법으로 전환합니다. 우선순위 변경 시 남은 작업의 순서를 재조정합니다.

시간 초과 시 일부 작업을 건너뛰거나 간소화합니다. 위의 코드를 한 줄씩 살펴보겠습니다.

ReplanTrigger 열거형은 재계획을 발동시키는 상황들을 정의합니다. should_replan 메서드는 정말 재계획이 필요한지 판단합니다.

모든 실패가 재계획을 필요로 하지는 않습니다. 독립적인 작업이 실패했다면 그냥 건너뛰면 됩니다.

replan 메서드는 LLM을 활용해 새로운 계획을 생성합니다. 현재 상황과 남은 작업을 고려해서 최적의 대안을 찾습니다.

실제 현업에서는 어떻게 활용할까요? 예를 들어 주문 처리 에이전트를 생각해봅시다.

결제 시스템이 일시적으로 다운되면, 에이전트는 "결제 대기열에 저장 → 시스템 복구 시 재시도"라는 대안 계획으로 전환합니다. 재고가 부족하면 "대체 상품 추천 → 예약 주문 안내"로 계획을 수정합니다.

하지만 주의할 점도 있습니다. 재계획이 너무 자주 발생하면 성능에 영향을 줍니다.

LLM 호출은 비용과 시간이 들기 때문입니다. 따라서 재계획 임계값을 설정하는 것이 좋습니다.

사소한 변화에는 기존 계획을 유지하고, 중대한 변화에만 재계획을 수행합니다. 또한 무한 재계획 루프를 방지해야 합니다.

재계획 횟수에 상한을 두거나, 같은 실패가 반복되면 포기하는 로직이 필요합니다. 다시 김개발 씨의 이야기로 돌아가 봅시다.

동적 재계획 기능을 추가하니, 에이전트가 훨씬 강인해졌습니다. "이제 문제가 생겨도 알아서 대안을 찾네요.

마치 진짜 사람처럼!"

실전 팁

💡 - 재계획 전에 간단한 재시도(Retry)를 먼저 시도하세요. 일시적 오류는 재시도만으로 해결되기도 합니다

  • 재계획 이력을 로깅해두면 나중에 에이전트 개선에 활용할 수 있습니다

이상으로 학습을 마칩니다. 위 내용을 직접 코드로 작성해보면서 익혀보세요!

#AI#TaskDecomposition#Planning#Agent#Workflow#AI,Agent,Planning

댓글 (0)

댓글을 작성하려면 로그인이 필요합니다.

함께 보면 좋은 카드 뉴스