포스트

[MCP&A2A] 05. 시스템 아키텍처

[MCP&A2A] 05. 시스템 아키텍처

전체 시스템 개요

MCP와 A2A를 결합한 프로덕션급 AI 에이전트 시스템의 전체 아키텍처를 살펴봅니다.

아키텍처 원칙

세 가지 핵심 원칙이 모든 설계 결정을 이끕니다:

1
2
3
4
5
6
7
8
9
10
11
1. 도구에 맞는 언어 선택
   - Go: 높은 처리량의 프로토콜 서버
   - Python: AI 워크플로우

2. 데이터베이스 레벨 보안
   - PostgreSQL Row-Level Security (RLS)로 멀티테넌시 강제
   - 애플리케이션 코드가 아닌 DB에서 격리

3. 완전한 무상태
   - 모든 서비스가 수평 확장 가능
   - 세션 없음, 공유 상태 없음, 단일 실패 지점 없음

시스템 컴포넌트 다이어그램

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
┌─────────────────────────────────────────────────────────┐
│                     사용자 인터페이스                      │
│  ┌──────────────────────────────────────────────────┐  │
│  │        Streamlit UI (Python)                     │  │
│  │  • 인증 (JWT 토큰 발급)                          │  │
│  │  • 문서 검색 인터페이스                          │  │
│  │  • A2A 태스크 생성 및 모니터링                   │  │
│  │  • 비용 추적 대시보드                            │  │
│  └──────────────────────────────────────────────────┘  │
└────────┬────────────────────────────┬───────────────────┘
         │                            │
         │ HTTP + JWT                 │ HTTP + SSE
         │                            │
┌────────▼────────────┐      ┌────────▼────────────┐
│   MCP 서버 (Go)     │      │   A2A 서버 (Go)     │
│ ┌─────────────────┐ │      │ ┌─────────────────┐ │
│ │ 도구:           │ │      │ │ 태스크 관리     │ │
│ │ • hybrid_search │ │      │ │ • 생성          │ │
│ │ • get_document  │ │      │ │ • 실행          │ │
│ │ • list_docs     │ │      │ │ • 스트리밍      │ │
│ └─────────────────┘ │      │ └─────────────────┘ │
│                     │      │                     │
│ ┌─────────────────┐ │      │ ┌─────────────────┐ │
│ │ 인증/권한       │ │      │ │ SSE 이벤트      │ │
│ │ • JWT 검증      │ │      │ └─────────────────┘ │
│ │ • RLS 컨텍스트  │ │      └──────┬──────────────┘
│ └─────────────────┘ │             │
└──────┬──────────────┘             │ MCP 호출
       │ SQL + RLS                  │
       │                            │
┌──────▼────────────────────────────▼──────┐
│        PostgreSQL 16 + pgvector          │
│  ┌────────────────────────────────────┐  │
│  │  documents 테이블                  │  │
│  │  • id, tenant_id, title, content  │  │
│  │  • embedding (vector(1536))       │  │
│  │  • metadata (jsonb)               │  │
│  │  • RLS 정책 활성화                │  │
│  └────────────────────────────────────┘  │
└───────────────────────────────────────────┘
              ▲
              │
┌─────────────▼─────────────┐
│  Python AI 워크플로우      │
│  ┌──────────────────────┐ │
│  │ LangGraph Workflows  │ │
│  │ • RAG 파이프라인     │ │
│  │ • 리서치 워크플로우  │ │
│  │ • 멀티스텝 에이전트  │ │
│  └──────────────────────┘ │
│                           │
│  MCP 클라이언트로         │
│  도구 호출                │
└───────────▲───────────────┘
            │
    ┌───────▼────────┐
    │ Ollama (LLM)   │
    │ • llama3.2     │
    │ • 임베딩 생성  │
    │ • 로컬 추론    │
    └────────────────┘

레이어별 책임

1. 프레젠테이션 레이어 (Streamlit UI)

책임:

  • 사용자 인증 및 JWT 토큰 발급
  • 검색 인터페이스 제공
  • A2A 태스크 생성 및 모니터링
  • 비용 추적 시각화
  • 실시간 이벤트 스트림 표시

기술 스택:

1
2
3
4
언어: Python 3.11+
프레임워크: Streamlit 1.28+
인증: PyJWT, cryptography
HTTP 클라이언트: requests, httpx

주요 기능:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# streamlit-ui/pages/1_🔐_Authentication.py
def generate_jwt_token(tenant_id: str, user_id: str) -> str:
    """RSA-256로 서명된 JWT 토큰 생성"""
    payload = {
        "tenant_id": tenant_id,
        "user_id": user_id,
        "iat": datetime.now(timezone.utc),
        "exp": datetime.now(timezone.utc) + timedelta(hours=1),
    }
    
    with open("/app/certs/private_key.pem", "rb") as f:
        private_key = load_pem_private_key(f.read(), password=None)
    
    return jwt.encode(payload, private_key, algorithm="RS256")

# streamlit-ui/pages/2_🔍_Search.py
def search_documents(query: str, limit: int = 10):
    """MCP 서버를 통한 하이브리드 검색"""
    response = requests.post(
        f"{MCP_SERVER_URL}/mcp",
        headers={"Authorization": f"Bearer {st.session_state.token}"},
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "hybrid_search",
                "arguments": {"query": query, "limit": limit}
            }
        }
    )
    return response.json()["result"]["content"][0]["text"]

2. 프로토콜 서버 레이어 (Go)

MCP 서버

책임:

  • JSON-RPC 2.0 프로토콜 처리
  • JWT 인증 및 테넌트 컨텍스트 설정
  • 도구 실행 (하이브리드 검색, 문서 조회)
  • 데이터베이스 액세스 (RLS 강제)

디렉토리 구조:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
mcp-server/
├── cmd/
│   └── server/
│       └── main.go                 # 엔트리 포인트
├── internal/
│   ├── auth/
│   │   └── jwt.go                  # JWT 검증
│   ├── database/
│   │   ├── postgres.go             # DB 연결 풀
│   │   ├── documents.go            # 문서 CRUD
│   │   └── hybrid_search.go        # 하이브리드 검색
│   ├── handlers/
│   │   └── mcp.go                  # MCP 요청 핸들러
│   ├── middleware/
│   │   ├── auth.go                 # 인증 미들웨어
│   │   ├── logging.go              # 로깅 미들웨어
│   │   └── ratelimit.go            # Rate limiting
│   ├── protocol/
│   │   └── types.go                # MCP 타입 정의
│   └── tools/
│       ├── tool.go                 # Tool 인터페이스
│       ├── hybrid_search.go        # 검색 도구
│       └── get_document.go         # 문서 조회 도구
└── go.mod

핵심 구현:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// mcp-server/internal/handlers/mcp.go
type MCPHandler struct {
    db           database.Store
    validator    *auth.JWTValidator
    toolRegistry map[string]tools.Tool
}

func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 1. JWT 인증
    claims, err := h.validator.ValidateToken(r.Header.Get("Authorization"))
    if err != nil {
        respondError(w, -32001, "Authentication required")
        return
    }
    
    // 2. 컨텍스트에 테넌트 정보 추가
    ctx := context.WithValue(r.Context(), "tenant_id", claims.TenantID)
    ctx = context.WithValue(ctx, "user_id", claims.UserID)
    
    // 3. JSON-RPC 요청 파싱
    var req protocol.Request
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        respondError(w, -32700, "Parse error")
        return
    }
    
    // 4. 메서드 라우팅
    switch req.Method {
    case "tools/call":
        h.handleToolsCall(ctx, w, &req)
    case "tools/list":
        h.handleToolsList(ctx, w, &req)
    default:
        respondError(w, -32601, "Method not found")
    }
}

A2A 서버

책임:

  • 태스크 생명주기 관리
  • Server-Sent Events (SSE) 스트리밍
  • LangGraph 워크플로우 오케스트레이션
  • 중간 결과(Artifact) 저장

디렉토리 구조:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
a2a-server/
├── cmd/
│   └── server/
│       └── main.go
├── internal/
│   ├── handlers/
│   │   ├── tasks.go                # 태스크 CRUD
│   │   └── events.go               # SSE 스트리밍
│   ├── models/
│   │   ├── task.go                 # 태스크 모델
│   │   └── artifact.go             # Artifact 모델
│   └── orchestrator/
│       └── workflow.go             # 워크플로우 관리
└── go.mod

3. 데이터 레이어 (PostgreSQL + pgvector)

책임:

  • 멀티테넌트 문서 저장
  • Row-Level Security로 격리 강제
  • 벡터 유사도 검색 (pgvector)
  • 전문 검색 (Full-Text Search)

스키마 설계:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
-- 확장 기능 활성화
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- 문서 테이블
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536),  -- OpenAI ada-002 차원
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    
    -- 인덱스
    CONSTRAINT documents_tenant_id_check CHECK (tenant_id IS NOT NULL)
);

-- Row-Level Security 활성화
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- RLS 정책: 사용자는 자신의 테넌트 데이터만 접근
CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- 인덱스
CREATE INDEX idx_documents_tenant_id ON documents(tenant_id);
CREATE INDEX idx_documents_embedding ON documents 
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);  -- 벡터 검색 최적화
CREATE INDEX idx_documents_content_fts ON documents 
    USING gin(to_tsvector('english', content));  -- 전문 검색 최적화

RLS 동작 메커니즘:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// mcp-server/internal/database/postgres.go
func (db *DB) HybridSearch(ctx context.Context, tenantID string, params SearchParams) ([]Result, error) {
    // 트랜잭션 시작
    tx, err := db.pool.Begin(ctx)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback(ctx)
    
    // 중요: 테넌트 컨텍스트 설정
    // 이후 모든 쿼리는 자동으로 tenant_id로 필터링됨
    _, err = tx.Exec(ctx, 
        fmt.Sprintf("SET LOCAL app.current_tenant_id = '%s'", tenantID))
    if err != nil {
        return nil, err
    }
    
    // RLS 정책이 자동 적용됨
    // WHERE tenant_id = current_setting('app.current_tenant_id')::uuid
    // 가 모든 쿼리에 자동 추가됨
    rows, err := tx.Query(ctx, `
        SELECT id, title, content, embedding
        FROM documents
        WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $1)
        ORDER BY embedding <=> $2
        LIMIT $3
    `, params.Query, params.Embedding, params.Limit)
    
    // 결과 처리...
    tx.Commit(ctx)
    return results, nil
}

4. AI 워크플로우 레이어 (Python + LangGraph)

책임:

  • 멀티스텝 RAG 파이프라인
  • 복잡한 리서치 워크플로우
  • MCP 도구 조율
  • 중간 결과 관리

디렉토리 구조:

1
2
3
4
5
6
7
8
9
orchestration/
├── workflows/
│   ├── rag_workflow.py             # RAG 파이프라인
│   ├── research_workflow.py        # 리서치 워크플로우
│   └── base.py                     # 기본 워크플로우 클래스
├── clients/
│   ├── mcp_client.py               # MCP 클라이언트
│   └── a2a_client.py               # A2A 클라이언트
└── requirements.txt

LangGraph 워크플로우 예제:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# orchestration/workflows/rag_workflow.py
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class RAGState(TypedDict):
    query: str
    documents: list
    ranked_docs: list
    answer: str
    verified: bool

class RAGWorkflow:
    def __init__(self, mcp_url: str):
        self.mcp_client = MCPClient(mcp_url)
        self.workflow = self.build_workflow()
    
    def build_workflow(self) -> StateGraph:
        workflow = StateGraph(RAGState)
        
        # 노드 정의
        workflow.add_node("search", self.search_documents)
        workflow.add_node("rank", self.rank_results)
        workflow.add_node("generate", self.generate_answer)
        workflow.add_node("verify", self.verify_sources)
        
        # 엣지 정의 (워크플로우 그래프)
        workflow.add_edge(START, "search")
        workflow.add_edge("search", "rank")
        workflow.add_edge("rank", "generate")
        workflow.add_edge("generate", "verify")
        workflow.add_edge("verify", END)
        
        return workflow.compile()
    
    def search_documents(self, state: RAGState) -> RAGState:
        """MCP 하이브리드 검색 도구 사용"""
        results = self.mcp_client.hybrid_search(
            query=state["query"],
            limit=10,
            bm25_weight=0.5,
            vector_weight=0.5
        )
        
        state["documents"] = results
        return state
    
    def rank_results(self, state: RAGState) -> RAGState:
        """결과 랭킹"""
        docs = sorted(
            state["documents"], 
            key=lambda x: x["score"], 
            reverse=True
        )[:5]
        
        state["ranked_docs"] = docs
        return state
    
    def generate_answer(self, state: RAGState) -> RAGState:
        """컨텍스트 기반 답변 생성"""
        context = "\n\n".join([
            f"Document: {doc['title']}\n{doc['content']}"
            for doc in state["ranked_docs"]
        ])
        
        prompt = f"""다음 문서를 바탕으로 질문에 답하세요.

컨텍스트:
{context}

질문: {state['query']}

답변:"""
        
        response = ollama.generate(
            model="llama3.2",
            prompt=prompt
        )
        
        state["answer"] = response["response"]
        return state

데이터 흐름

1. 검색 요청 플로우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
사용자 → Streamlit UI
  ↓ (JWT 토큰 포함)
MCP 서버
  ↓ (JWT 검증)
Auth 미들웨어
  ↓ (테넌트 컨텍스트 추가)
HybridSearchTool
  ↓ (임베딩 생성)
Ollama
  ↓ (하이브리드 쿼리)
PostgreSQL + RLS
  ↓ (결과 반환)
MCP 서버
  ↓ (JSON 응답)
Streamlit UI
  ↓ (결과 표시)
사용자

2. A2A 태스크 플로우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
사용자 → Streamlit UI
  ↓ (태스크 생성 요청)
A2A 서버
  ↓ (태스크 DB 저장)
PostgreSQL
  ↓ (워크플로우 시작)
LangGraph Workflow
  ↓ (MCP 도구 호출)
MCP 서버
  ↓ (SSE 이벤트 발행)
A2A 서버
  ↓ (실시간 스트림)
Streamlit UI
  ↓ (진행 상황 표시)
사용자

확장성 고려사항

수평 확장

모든 컴포넌트는 수평 확장 가능하도록 설계:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
┌─────────────┐
│ 로드밸런서  │
└──────┬──────┘
       │
   ┌───┴───────┬────────┐
   │           │        │
┌──▼──┐   ┌───▼─┐  ┌──▼──┐
│MCP-1│   │MCP-2│  │MCP-3│  ← 무상태 (세션 없음)
└──┬──┘   └───┬─┘  └──┬──┘
   │          │       │
   └──────┬───┴───────┘
          │
    ┌─────▼──────┐
    │ PostgreSQL │
    │  (Primary) │
    └─────┬──────┘
          │
    ┌─────▼──────┐
    │   Replicas │  ← 읽기 전용
    │  (읽기용)  │
    └────────────┘

캐싱 전략

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Redis를 통한 쿼리 결과 캐싱
type CacheLayer struct {
    redis *redis.Client
    ttl   time.Duration
}

func (c *CacheLayer) GetOrCompute(
    key string, 
    compute func() (interface{}, error),
) (interface{}, error) {
    // 캐시 확인
    cached, err := c.redis.Get(context.Background(), key).Result()
    if err == nil {
        var result interface{}
        json.Unmarshal([]byte(cached), &result)
        return result, nil
    }
    
    // 캐시 미스 - 계산
    result, err := compute()
    if err != nil {
        return nil, err
    }
    
    // 캐시 저장
    data, _ := json.Marshal(result)
    c.redis.Set(context.Background(), key, data, c.ttl)
    
    return result, nil
}

핵심 설계 결정

1. Go vs Python 분리

Go를 선택한 이유 (프로토콜 서버):

  • ✅ 높은 처리량 (5,000+ req/sec)
  • ✅ 낮은 메모리 사용량 (52MB 베이스라인)
  • ✅ 컴파일 타임 타입 안전성
  • ✅ 우수한 동시성 (goroutines)
  • ✅ 빠른 JSON 처리

Python을 선택한 이유 (AI 워크플로우):

  • ✅ 풍부한 AI 생태계 (LangChain, LangGraph)
  • ✅ 빠른 프로토타이핑
  • ✅ 임베딩 모델 통합 용이
  • ✅ 과학 컴퓨팅 라이브러리

2. PostgreSQL + pgvector

선택 이유:

  • ✅ ACID 트랜잭션
  • ✅ Row-Level Security (RLS)
  • ✅ 벡터 검색 (pgvector 확장)
  • ✅ 전문 검색 (Full-Text Search)
  • ✅ JSON 지원 (JSONB)
  • ✅ 성숙한 에코시스템

3. JWT 기반 인증

선택 이유:

  • ✅ 무상태 (수평 확장 용이)
  • ✅ 표준화 (RFC 7519)
  • ✅ 비대칭 암호화 (RS256)
  • ✅ 토큰 취소 가능 (jti 클레임)

핵심 요약

아키텍처 강점

  • 확장 가능: 모든 레이어가 수평 확장
  • 보안: RLS + JWT 다층 보안
  • 성능: Go 서버 + 캐싱
  • 유지보수: 관심사 분리

참고 자료:

  • 참조 구현: https://github.com/bhatti/mcp-a2a-go
  • PostgreSQL RLS: https://www.postgresql.org/docs/current/ddl-rowsecurity.html
  • pgvector: https://github.com/pgvector/pgvector

작성일: 2024년 12월 13일

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.