포스트

[MCP&A2A] 12. 워크플로우 오케스트레이션

[MCP&A2A] 12. 워크플로우 오케스트레이션

LangGraph 기반 워크플로우

복잡한 AI 태스크는 여러 단계로 나누어 순차적 또는 병렬로 실행해야 합니다. LangGraph는 상태 기반 그래프로 이러한 멀티스텝 워크플로우를 효과적으로 관리합니다.

왜 LangGraph인가?

특성LangGraphLangChain순수 Python
상태 관리✅ 내장❌ 수동❌ 수동
분기/조건✅ 간단⚠️ 복잡⚠️ 복잡
시각화✅ 자동❌ 없음❌ 없음
디버깅✅ 쉬움⚠️ 어려움⚠️ 어려움
재시도✅ 내장❌ 수동❌ 수동
체크포인트✅ 지원❌ 없음❌ 없음

LangGraph 핵심 개념

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
LangGraph = State + Nodes + Edges

State (상태):
- TypedDict로 정의
- 노드 간 공유되는 데이터
- 불변성 유지

Node (노드):
- 개별 작업 단위
- State를 받아서 수정된 State 반환
- 함수로 구현

Edge (엣지):
- 노드 간 연결
- 조건부 분기 가능
- START → 노드1 → 노드2 → END

기본 워크플로우 구조

State 정의

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# orchestration/workflows/base.py
from typing import TypedDict, List, Optional, Annotated
from langgraph.graph import add_messages

class WorkflowState(TypedDict):
    """워크플로우 상태"""
    # 입력
    query: str
    
    # 중간 데이터
    documents: List[dict]
    analysis: Optional[dict]
    
    # 출력
    result: Optional[str]
    
    # 메타데이터
    step: str
    errors: List[str]
    
    # 메시지 히스토리 (자동 누적)
    messages: Annotated[List[dict], add_messages]

add_messages의 의미:

  • 메시지를 자동으로 누적
  • 덮어쓰지 않고 append
  • 대화 히스토리 관리에 유용

Node 정의

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from langgraph.graph import StateGraph

def search_node(state: WorkflowState) -> WorkflowState:
    """문서 검색 노드"""
    query = state["query"]
    
    # MCP 도구 호출
    results = mcp_client.hybrid_search(
        query=query,
        limit=10
    )
    
    # 상태 업데이트
    state["documents"] = results
    state["step"] = "search_complete"
    
    return state

def analyze_node(state: WorkflowState) -> WorkflowState:
    """문서 분석 노드"""
    documents = state["documents"]
    
    # LLM으로 분석
    analysis = llm.invoke({
        "documents": documents,
        "task": "분석 및 요약"
    })
    
    state["analysis"] = analysis
    state["step"] = "analysis_complete"
    
    return state

def generate_node(state: WorkflowState) -> WorkflowState:
    """결과 생성 노드"""
    analysis = state["analysis"]
    
    # 최종 결과 생성
    result = llm.invoke({
        "analysis": analysis,
        "task": "보고서 생성"
    })
    
    state["result"] = result
    state["step"] = "completed"
    
    return state

Graph 구성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from langgraph.graph import START, END

# 그래프 생성
workflow = StateGraph(WorkflowState)

# 노드 추가
workflow.add_node("search", search_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("generate", generate_node)

# 엣지 추가 (순서 정의)
workflow.add_edge(START, "search")
workflow.add_edge("search", "analyze")
workflow.add_edge("analyze", "generate")
workflow.add_edge("generate", END)

# 컴파일
app = workflow.compile()

실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 초기 상태
initial_state = {
    "query": "AI 에이전트 프로토콜 비교",
    "documents": [],
    "analysis": None,
    "result": None,
    "step": "started",
    "errors": [],
    "messages": []
}

# 실행
result = app.invoke(initial_state)

print(result["result"])
print(f"Steps: {result['step']}")

조건부 분기

조건 함수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def should_retry(state: WorkflowState) -> str:
    """재시도 여부 결정"""
    if state.get("errors"):
        retry_count = state.get("retry_count", 0)
        if retry_count < 3:
            return "retry"
    return "continue"

def route_by_quality(state: WorkflowState) -> str:
    """품질에 따라 라우팅"""
    quality_score = state.get("quality_score", 0)
    
    if quality_score >= 0.8:
        return "high_quality"
    elif quality_score >= 0.5:
        return "medium_quality"
    else:
        return "low_quality"

조건부 엣지

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from langgraph.graph import StateGraph

workflow = StateGraph(WorkflowState)

workflow.add_node("search", search_node)
workflow.add_node("verify", verify_node)
workflow.add_node("retry_search", retry_search_node)
workflow.add_node("generate", generate_node)

# 일반 엣지
workflow.add_edge(START, "search")

# 조건부 엣지
workflow.add_conditional_edges(
    "verify",
    should_retry,
    {
        "retry": "retry_search",
        "continue": "generate"
    }
)

workflow.add_edge("retry_search", "verify")
workflow.add_edge("generate", END)

app = workflow.compile()

실전 워크플로우 예제

1. 종합 리서치 워크플로우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# orchestration/workflows/research_workflow.py
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, START, END
import json

class ResearchState(TypedDict):
    query: str
    search_results: List[dict]
    ranked_results: List[dict]
    key_findings: List[str]
    report: Optional[str]
    confidence: float
    sources: List[str]

class ResearchWorkflow:
    def __init__(self, mcp_client, llm):
        self.mcp_client = mcp_client
        self.llm = llm
        self.workflow = self.build_workflow()
    
    def build_workflow(self):
        workflow = StateGraph(ResearchState)
        
        # 노드 추가
        workflow.add_node("search", self.search_documents)
        workflow.add_node("rank", self.rank_results)
        workflow.add_node("extract", self.extract_findings)
        workflow.add_node("synthesize", self.synthesize_report)
        workflow.add_node("verify", self.verify_sources)
        
        # 엣지 구성
        workflow.add_edge(START, "search")
        workflow.add_edge("search", "rank")
        workflow.add_edge("rank", "extract")
        workflow.add_edge("extract", "synthesize")
        workflow.add_edge("synthesize", "verify")
        
        # 조건부 엣지: 신뢰도에 따라 재검색 또는 완료
        workflow.add_conditional_edges(
            "verify",
            self.check_confidence,
            {
                "retry": "search",
                "complete": END
            }
        )
        
        return workflow.compile()
    
    def search_documents(self, state: ResearchState) -> ResearchState:
        """1단계: 문서 검색"""
        print(f"🔍 검색 중: {state['query']}")
        
        results = self.mcp_client.hybrid_search(
            query=state["query"],
            limit=20,
            bm25_weight=0.5,
            vector_weight=0.5
        )
        
        state["search_results"] = results
        print(f"{len(results)}개 문서 발견")
        
        return state
    
    def rank_results(self, state: ResearchState) -> ResearchState:
        """2단계: 결과 순위 재조정"""
        print("📊 결과 순위 조정 중...")
        
        results = state["search_results"]
        query = state["query"]
        
        # LLM으로 관련성 재평가
        prompt = f"""
        Query: {query}
        
        다음 문서들의 관련성을 0-1 점수로 평가하세요:
        {json.dumps([r["title"] for r in results[:10]], ensure_ascii=False)}
        
        JSON 배열로 반환: [0.9, 0.8, ...]
        """
        
        scores = self.llm.invoke(prompt)
        # scores 파싱 (간소화)
        
        # 점수로 재정렬
        ranked = sorted(
            zip(results, scores),
            key=lambda x: x[1],
            reverse=True
        )
        
        state["ranked_results"] = [r[0] for r in ranked[:10]]
        print(f"✅ 상위 10개 선정 완료")
        
        return state
    
    def extract_findings(self, state: ResearchState) -> ResearchState:
        """3단계: 핵심 발견 추출"""
        print("💡 핵심 발견 추출 중...")
        
        documents = state["ranked_results"]
        
        # 각 문서에서 핵심 내용 추출
        findings = []
        sources = []
        
        for doc in documents[:5]:  # 상위 5개만
            prompt = f"""
            다음 문서에서 '{state["query"]}'와 관련된 핵심 발견을 1-2문장으로 추출:
            
            {doc["content"][:500]}
            """
            
            finding = self.llm.invoke(prompt)
            findings.append(finding)
            sources.append(doc["title"])
        
        state["key_findings"] = findings
        state["sources"] = sources
        print(f"{len(findings)}개 핵심 발견 추출")
        
        return state
    
    def synthesize_report(self, state: ResearchState) -> ResearchState:
        """4단계: 보고서 생성"""
        print("📝 보고서 생성 중...")
        
        findings = state["key_findings"]
        query = state["query"]
        
        prompt = f"""
        질문: {query}
        
        핵심 발견:
        {chr(10).join([f"{i+1}. {f}" for i, f in enumerate(findings)])}
        
        위 발견들을 바탕으로 종합 보고서를 작성하세요:
        - 명확한 구조 (서론, 본론, 결론)
        - 발견 간 연결
        - 객관적 톤
        """
        
        report = self.llm.invoke(prompt)
        
        state["report"] = report
        print("✅ 보고서 생성 완료")
        
        return state
    
    def verify_sources(self, state: ResearchState) -> ResearchState:
        """5단계: 출처 검증"""
        print("✔️ 출처 검증 중...")
        
        report = state["report"]
        sources = state["sources"]
        
        # 간단한 신뢰도 계산
        # 실제로는 더 복잡한 검증 로직
        confidence = min(1.0, len(sources) / 5.0)
        
        state["confidence"] = confidence
        print(f"✅ 신뢰도: {confidence:.2f}")
        
        return state
    
    def check_confidence(self, state: ResearchState) -> str:
        """신뢰도 확인"""
        if state["confidence"] < 0.6:
            print("⚠️ 신뢰도 낮음 - 재검색")
            return "retry"
        else:
            print("✅ 신뢰도 충분 - 완료")
            return "complete"
    
    def run(self, query: str) -> dict:
        """워크플로우 실행"""
        initial_state = {
            "query": query,
            "search_results": [],
            "ranked_results": [],
            "key_findings": [],
            "report": None,
            "confidence": 0.0,
            "sources": []
        }
        
        result = self.workflow.invoke(initial_state)
        return result

# 사용 예제
if __name__ == "__main__":
    from mcp_client import MCPClient
    from llm_client import LLMClient
    
    mcp = MCPClient("http://localhost:8080")
    llm = LLMClient("ollama", model="llama3.2")
    
    workflow = ResearchWorkflow(mcp, llm)
    
    result = workflow.run("MCP와 A2A 프로토콜의 차이점")
    
    print("\n" + "="*50)
    print("📊 최종 보고서")
    print("="*50)
    print(result["report"])
    print(f"\n신뢰도: {result['confidence']:.2f}")
    print(f"출처 수: {len(result['sources'])}")

2. 코드 리뷰 워크플로우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# orchestration/workflows/code_review_workflow.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END

class CodeReviewState(TypedDict):
    code: str
    language: str
    issues: List[dict]
    suggestions: List[dict]
    refactored_code: Optional[str]
    test_coverage: float

class CodeReviewWorkflow:
    def __init__(self, llm):
        self.llm = llm
        self.workflow = self.build_workflow()
    
    def build_workflow(self):
        workflow = StateGraph(CodeReviewState)
        
        workflow.add_node("analyze", self.analyze_code)
        workflow.add_node("security", self.check_security)
        workflow.add_node("performance", self.check_performance)
        workflow.add_node("style", self.check_style)
        workflow.add_node("suggest", self.generate_suggestions)
        workflow.add_node("refactor", self.refactor_code)
        
        # 병렬 검사
        workflow.add_edge(START, "analyze")
        workflow.add_edge("analyze", "security")
        workflow.add_edge("analyze", "performance")
        workflow.add_edge("analyze", "style")
        
        # 모두 완료 후 제안
        workflow.add_edge("security", "suggest")
        workflow.add_edge("performance", "suggest")
        workflow.add_edge("style", "suggest")
        
        # 리팩토링
        workflow.add_conditional_edges(
            "suggest",
            lambda s: "refactor" if len(s["issues"]) > 0 else "skip",
            {
                "refactor": "refactor",
                "skip": END
            }
        )
        
        workflow.add_edge("refactor", END)
        
        return workflow.compile()
    
    def analyze_code(self, state: CodeReviewState) -> CodeReviewState:
        """코드 구조 분석"""
        print("🔍 코드 분석 중...")
        
        # 코드 복잡도, 라인 수 등 분석
        # ...
        
        return state
    
    def check_security(self, state: CodeReviewState) -> CodeReviewState:
        """보안 검사"""
        print("🔒 보안 검사 중...")
        
        prompt = f"""
        다음 {state["language"]} 코드의 보안 취약점을 찾으세요:
        
        {state["code"]}
        
        SQL 인젝션, XSS, 하드코딩된 비밀번호 등을 확인하세요.
        """
        
        issues = self.llm.invoke(prompt)
        
        if "issues" not in state:
            state["issues"] = []
        state["issues"].extend(self.parse_issues(issues, "security"))
        
        return state
    
    def check_performance(self, state: CodeReviewState) -> CodeReviewState:
        """성능 검사"""
        print("⚡ 성능 검사 중...")
        
        # 성능 이슈 탐지
        # ...
        
        return state
    
    def check_style(self, state: CodeReviewState) -> CodeReviewState:
        """스타일 검사"""
        print("🎨 스타일 검사 중...")
        
        # 코딩 스타일 검사
        # ...
        
        return state
    
    def generate_suggestions(self, state: CodeReviewState) -> CodeReviewState:
        """개선 제안 생성"""
        print("💡 개선 제안 생성 중...")
        
        issues = state["issues"]
        
        suggestions = []
        for issue in issues:
            suggestion = f"Fix {issue['type']}: {issue['description']}"
            suggestions.append({
                "issue": issue,
                "suggestion": suggestion
            })
        
        state["suggestions"] = suggestions
        
        return state
    
    def refactor_code(self, state: CodeReviewState) -> CodeReviewState:
        """코드 리팩토링"""
        print("🔧 코드 리팩토링 중...")
        
        suggestions = state["suggestions"]
        original_code = state["code"]
        
        prompt = f"""
        다음 코드를 개선 제안에 따라 리팩토링하세요:
        
        원본 코드:
        {original_code}
        
        개선 제안:
        {json.dumps(suggestions, ensure_ascii=False)}
        
        리팩토링된 코드만 반환하세요.
        """
        
        refactored = self.llm.invoke(prompt)
        state["refactored_code"] = refactored
        
        return state
    
    def parse_issues(self, text: str, category: str) -> List[dict]:
        """이슈 파싱"""
        # 간소화된 파싱
        return [
            {
                "type": category,
                "severity": "high",
                "description": text
            }
        ]

3. 데이터 파이프라인 워크플로우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# orchestration/workflows/data_pipeline_workflow.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
import pandas as pd

class DataPipelineState(TypedDict):
    source_url: str
    raw_data: Optional[pd.DataFrame]
    cleaned_data: Optional[pd.DataFrame]
    transformed_data: Optional[pd.DataFrame]
    validation_errors: List[str]
    output_path: Optional[str]

class DataPipelineWorkflow:
    def __init__(self):
        self.workflow = self.build_workflow()
    
    def build_workflow(self):
        workflow = StateGraph(DataPipelineState)
        
        workflow.add_node("extract", self.extract_data)
        workflow.add_node("clean", self.clean_data)
        workflow.add_node("transform", self.transform_data)
        workflow.add_node("validate", self.validate_data)
        workflow.add_node("load", self.load_data)
        
        workflow.add_edge(START, "extract")
        workflow.add_edge("extract", "clean")
        workflow.add_edge("clean", "transform")
        workflow.add_edge("transform", "validate")
        
        # 검증 결과에 따라 분기
        workflow.add_conditional_edges(
            "validate",
            lambda s: "load" if not s["validation_errors"] else "clean",
            {
                "load": "load",
                "clean": "clean"  # 재처리
            }
        )
        
        workflow.add_edge("load", END)
        
        return workflow.compile()
    
    def extract_data(self, state: DataPipelineState) -> DataPipelineState:
        """데이터 추출"""
        print(f"📥 데이터 추출 중: {state['source_url']}")
        
        # CSV, API 등에서 데이터 로드
        df = pd.read_csv(state["source_url"])
        
        state["raw_data"] = df
        print(f"{len(df)} 행 추출 완료")
        
        return state
    
    def clean_data(self, state: DataPipelineState) -> DataPipelineState:
        """데이터 정제"""
        print("🧹 데이터 정제 중...")
        
        df = state["raw_data"].copy()
        
        # 중복 제거
        df = df.drop_duplicates()
        
        # 결측치 처리
        df = df.fillna(method='ffill')
        
        # 이상치 제거
        # ...
        
        state["cleaned_data"] = df
        print(f"{len(df)} 행 정제 완료")
        
        return state
    
    def transform_data(self, state: DataPipelineState) -> DataPipelineState:
        """데이터 변환"""
        print("🔄 데이터 변환 중...")
        
        df = state["cleaned_data"].copy()
        
        # 컬럼 변환
        # 파생 변수 생성
        # 집계
        # ...
        
        state["transformed_data"] = df
        print("✅ 변환 완료")
        
        return state
    
    def validate_data(self, state: DataPipelineState) -> DataPipelineState:
        """데이터 검증"""
        print("✔️ 데이터 검증 중...")
        
        df = state["transformed_data"]
        errors = []
        
        # 스키마 검증
        required_columns = ["id", "value", "timestamp"]
        missing = set(required_columns) - set(df.columns)
        if missing:
            errors.append(f"Missing columns: {missing}")
        
        # 데이터 타입 검증
        # ...
        
        # 비즈니스 규칙 검증
        # ...
        
        state["validation_errors"] = errors
        
        if errors:
            print(f"❌ 검증 실패: {len(errors)} 에러")
        else:
            print("✅ 검증 통과")
        
        return state
    
    def load_data(self, state: DataPipelineState) -> DataPipelineState:
        """데이터 로드"""
        print("💾 데이터 저장 중...")
        
        df = state["transformed_data"]
        output_path = "/output/processed_data.csv"
        
        df.to_csv(output_path, index=False)
        
        state["output_path"] = output_path
        print(f"✅ 저장 완료: {output_path}")
        
        return state

진행 상황 추적

Progress Callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from typing import Callable

class ProgressTracker:
    def __init__(self, callback: Callable[[str, int, int], None]):
        self.callback = callback
        self.current_step = 0
        self.total_steps = 0
    
    def start(self, total_steps: int):
        self.total_steps = total_steps
        self.current_step = 0
    
    def step(self, message: str):
        self.current_step += 1
        self.callback(message, self.current_step, self.total_steps)
    
    def complete(self):
        self.callback("완료", self.total_steps, self.total_steps)

# 사용 예제
def progress_callback(message: str, current: int, total: int):
    progress = (current / total) * 100
    print(f"[{progress:.0f}%] {message}")

tracker = ProgressTracker(progress_callback)

# 워크플로우에 통합
class TrackedWorkflow:
    def __init__(self, tracker: ProgressTracker):
        self.tracker = tracker
    
    def search_node(self, state):
        self.tracker.step("문서 검색 중...")
        # 검색 로직
        return state
    
    def analyze_node(self, state):
        self.tracker.step("문서 분석 중...")
        # 분석 로직
        return state

에러 처리 및 재시도

Retry 메커니즘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientWorkflow:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def search_with_retry(self, query: str):
        """재시도 가능한 검색"""
        try:
            results = self.mcp_client.hybrid_search(query=query)
            if not results:
                raise ValueError("No results found")
            return results
        except Exception as e:
            print(f"검색 실패, 재시도 중: {e}")
            raise
    
    def search_node(self, state: WorkflowState) -> WorkflowState:
        try:
            results = self.search_with_retry(state["query"])
            state["search_results"] = results
        except Exception as e:
            state["errors"].append(str(e))
            # fallback 로직
            state["search_results"] = []
        
        return state

Circuit Breaker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from pybreaker import CircuitBreaker

class WorkflowWithCircuitBreaker:
    def __init__(self):
        self.breaker = CircuitBreaker(
            fail_max=5,
            reset_timeout=60
        )
    
    def call_external_api(self, url: str):
        @self.breaker
        def _call():
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        
        try:
            return _call()
        except CircuitBreakerError:
            print("Circuit breaker open - using cache")
            return self.get_cached_data()

체크포인트 및 복구

상태 저장

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from langgraph.checkpoint.sqlite import SqliteSaver

# 체크포인트 저장소
memory = SqliteSaver.from_conn_string(":memory:")

# 체크포인트 활성화
app = workflow.compile(checkpointer=memory)

# 실행 (자동 체크포인트)
config = {"configurable": {"thread_id": "task_123"}}
result = app.invoke(initial_state, config)

# 중단된 워크플로우 재개
resumed_result = app.invoke(None, config)  # 마지막 상태부터 재개

워크플로우 시각화

1
2
3
4
from IPython.display import Image, display

# Mermaid 다이어그램 생성
display(Image(app.get_graph().draw_mermaid_png()))

핵심 요약

LangGraph 장점

  • 상태 관리: TypedDict로 명확한 상태 정의
  • 분기/조건: 조건부 엣지로 복잡한 로직
  • 재시도: tenacity, circuit breaker 통합
  • 체크포인트: 중단/재개 가능
  • 시각화: 자동 그래프 생성
  • 디버깅: 각 노드별 상태 추적

실무 패턴

1
2
3
4
5
6
1. 상태 정의 (TypedDict)
2. 노드 구현 (함수)
3. 그래프 구성 (엣지)
4. 에러 처리 (retry, circuit breaker)
5. 진행 추적 (callback)
6. 체크포인트 (복구)

작성일: 2024-12-13

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.