포스트

[MCP&A2A] 06. 보안 아키텍처

[MCP&A2A] 06. 보안 아키텍처

다층 보안 전략

엔터프라이즈 AI 시스템은 다음 레이어에서 보안이 필요합니다:

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────┐
│   1. 인증 (Authentication)      │  ← JWT 토큰
├─────────────────────────────────┤
│   2. 권한 (Authorization)       │  ← RBAC
├─────────────────────────────────┤
│   3. 데이터 격리 (Isolation)    │  ← RLS
├─────────────────────────────────┤
│   4. 네트워크 (Network)         │  ← TLS, 방화벽
├─────────────────────────────────┤
│   5. 감사 (Audit)               │  ← 로깅
└─────────────────────────────────┘

1. JWT 기반 인증

RS256 비대칭 암호화

왜 RS256인가?

  • ✅ 서버는 공개키만 보유 (토큰 위조 불가)
  • ✅ 키 로테이션 용이
  • ✅ 마이크로서비스 간 신뢰
  • ✅ 표준 (RFC 7519)

토큰 생성 (UI 서버)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# streamlit-ui/auth/jwt_generator.py
from datetime import datetime, timedelta, timezone
from cryptography.hazmat.primitives import serialization
import jwt

def generate_token(tenant_id: str, user_id: str, ttl: int = 3600) -> str:
    """RSA-256로 서명된 JWT 생성"""
    now = datetime.now(timezone.utc)
    
    payload = {
        # 표준 클레임
        "iss": "mcp-ui",           # Issuer
        "aud": "mcp-server",       # Audience
        "sub": user_id,            # Subject
        "iat": now,                # Issued At
        "exp": now + timedelta(seconds=ttl),  # Expiration
        "nbf": now,                # Not Before
        "jti": str(uuid.uuid4()),  # JWT ID (revocation용)
        
        # 커스텀 클레임
        "tenant_id": tenant_id,
        "user_id": user_id,
        "roles": ["user"],
    }
    
    # RSA 개인키로 서명
    with open("/app/certs/private_key.pem", "rb") as f:
        private_key = serialization.load_pem_private_key(
            f.read(), 
            password=None
        )
    
    return jwt.encode(payload, private_key, algorithm="RS256")

토큰 검증 (MCP 서버)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// mcp-server/internal/auth/jwt.go
package auth

import (
    "crypto/rsa"
    "fmt"
    "time"
    
    "github.com/golang-jwt/jwt/v5"
)

type Claims struct {
    TenantID string   `json:"tenant_id"`
    UserID   string   `json:"user_id"`
    Roles    []string `json:"roles"`
    jwt.RegisteredClaims
}

type JWTValidator struct {
    publicKey *rsa.PublicKey
}

func NewJWTValidator(publicKeyPath string) (*JWTValidator, error) {
    keyData, err := os.ReadFile(publicKeyPath)
    if err != nil {
        return nil, err
    }
    
    publicKey, err := jwt.ParseRSAPublicKeyFromPEM(keyData)
    if err != nil {
        return nil, err
    }
    
    return &JWTValidator{publicKey: publicKey}, nil
}

func (v *JWTValidator) ValidateToken(tokenString string) (*Claims, error) {
    token, err := jwt.ParseWithClaims(
        tokenString, 
        &Claims{}, 
        func(token *jwt.Token) (interface{}, error) {
            // RS256 알고리즘 확인
            if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
                return nil, fmt.Errorf("unexpected signing method")
            }
            return v.publicKey, nil
        },
    )
    
    if err != nil {
        return nil, err
    }
    
    claims, ok := token.Claims.(*Claims)
    if !ok || !token.Valid {
        return nil, fmt.Errorf("invalid token")
    }
    
    // Expiration 체크
    if claims.ExpiresAt.Before(time.Now()) {
        return nil, fmt.Errorf("token expired")
    }
    
    // Audience 체크
    if !claims.VerifyAudience("mcp-server", true) {
        return nil, fmt.Errorf("invalid audience")
    }
    
    return claims, nil
}

인증 미들웨어

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// mcp-server/internal/middleware/auth.go
func AuthMiddleware(validator *auth.JWTValidator) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Authorization 헤더 추출
            authHeader := r.Header.Get("Authorization")
            if authHeader == "" {
                http.Error(w, "Missing authorization", 401)
                return
            }
            
            // Bearer 토큰 파싱
            tokenString := strings.TrimPrefix(authHeader, "Bearer ")
            
            // 토큰 검증
            claims, err := validator.ValidateToken(tokenString)
            if err != nil {
                log.Printf("Token validation failed: %v", err)
                http.Error(w, "Invalid token", 401)
                return
            }
            
            // 컨텍스트에 클레임 추가
            ctx := context.WithValue(r.Context(), "tenant_id", claims.TenantID)
            ctx = context.WithValue(ctx, "user_id", claims.UserID)
            ctx = context.WithValue(ctx, "jti", claims.ID)
            
            next.ServeHTTP(w, r.WithContext(ctx))
        })
    }
}

2. Row-Level Security (RLS)

PostgreSQL RLS 정책

1
2
3
4
5
6
7
8
9
10
11
12
-- 1. RLS 활성화
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- 2. 테넌트 격리 정책
CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- 3. 관리자 예외 정책 (선택)
CREATE POLICY admin_all_access ON documents
    USING (
        current_setting('app.current_user_role', true) = 'admin'
    );

테넌트 컨텍스트 설정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// mcp-server/internal/database/postgres.go
func (db *DB) SetTenantContext(
    ctx context.Context, 
    tx pgx.Tx, 
    tenantID string,
) error {
    // UUID 검증 (SQL 인젝션 방지)
    if _, err := uuid.Parse(tenantID); err != nil {
        return fmt.Errorf("invalid tenant ID: %w", err)
    }
    
    // 트랜잭션 로컬 변수 설정
    query := fmt.Sprintf(
        "SET LOCAL app.current_tenant_id = '%s'", 
        tenantID,
    )
    
    _, err := tx.Exec(ctx, query)
    return err
}

// 모든 DB 작업에서 사용
func (db *DB) HybridSearch(
    ctx context.Context, 
    tenantID string, 
    params SearchParams,
) ([]Result, error) {
    tx, _ := db.pool.Begin(ctx)
    defer tx.Rollback(ctx)
    
    // RLS 컨텍스트 설정 (필수!)
    if err := db.SetTenantContext(ctx, tx, tenantID); err != nil {
        return nil, err
    }
    
    // 이후 모든 쿼리는 자동으로 tenant_id 필터링됨
    rows, _ := tx.Query(ctx, "SELECT * FROM documents WHERE ...")
    
    // ...
}

RLS 보안 테스트

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// mcp-server/internal/database/postgres_test.go
func TestTenantIsolation(t *testing.T) {
    db := setupTestDB(t)
    
    tenant1 := uuid.New().String()
    tenant2 := uuid.New().String()
    
    // 테넌트 1 문서 생성
    doc1 := createDocument(t, db, tenant1, "Tenant 1 Secret")
    
    // 테넌트 2 문서 생성
    doc2 := createDocument(t, db, tenant2, "Tenant 2 Secret")
    
    // 테넌트 1로 쿼리
    ctx1 := context.Background()
    results1, _ := db.ListDocuments(ctx1, tenant1, ListParams{})
    
    // 테넌트 2로 쿼리
    ctx2 := context.Background()
    results2, _ := db.ListDocuments(ctx2, tenant2, ListParams{})
    
    // 검증: 각자의 데이터만 볼 수 있어야 함
    assert.Contains(t, results1, doc1)
    assert.NotContains(t, results1, doc2)  // ✅ 격리 성공
    
    assert.Contains(t, results2, doc2)
    assert.NotContains(t, results2, doc1)  // ✅ 격리 성공
}

3. Rate Limiting

테넌트별 제한

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// mcp-server/internal/middleware/ratelimit.go
import "golang.org/x/time/rate"

type RateLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.RWMutex
    rate     rate.Limit
    burst    int
}

func NewRateLimiter(reqPerSec int, burst int) *RateLimiter {
    return &RateLimiter{
        limiters: make(map[string]*rate.Limiter),
        rate:     rate.Limit(reqPerSec),
        burst:    burst,
    }
}

func (rl *RateLimiter) getLimiter(tenantID string) *rate.Limiter {
    rl.mu.RLock()
    limiter, exists := rl.limiters[tenantID]
    rl.mu.RUnlock()
    
    if !exists {
        rl.mu.Lock()
        limiter = rate.NewLimiter(rl.rate, rl.burst)
        rl.limiters[tenantID] = limiter
        rl.mu.Unlock()
    }
    
    return limiter
}

func (rl *RateLimiter) Allow(tenantID string) bool {
    return rl.getLimiter(tenantID).Allow()
}

// 미들웨어
func RateLimitMiddleware(limiter *RateLimiter) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            tenantID := r.Context().Value("tenant_id").(string)
            
            if !limiter.Allow(tenantID) {
                http.Error(w, "Rate limit exceeded", 429)
                return
            }
            
            next.ServeHTTP(w, r)
        })
    }
}

4. 감사 로깅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// mcp-server/internal/audit/logger.go
type AuditLogger struct {
    db *sql.DB
}

type AuditLog struct {
    Timestamp time.Time
    TenantID  string
    UserID    string
    Action    string
    Resource  string
    Details   map[string]interface{}
    Success   bool
}

func (al *AuditLogger) Log(ctx context.Context, log AuditLog) error {
    query := `
        INSERT INTO audit_logs 
        (timestamp, tenant_id, user_id, action, resource, details, success)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
    `
    
    details, _ := json.Marshal(log.Details)
    
    _, err := al.db.ExecContext(
        ctx, query,
        log.Timestamp,
        log.TenantID,
        log.UserID,
        log.Action,
        log.Resource,
        details,
        log.Success,
    )
    
    return err
}

핵심 요약

보안 체크리스트

인증:

  • RS256 JWT 토큰
  • 짧은 TTL (1시간)
  • JTI로 토큰 취소 가능

권한:

  • RBAC 구현
  • 도구별 권한 체크

데이터 격리:

  • PostgreSQL RLS
  • 테넌트 컨텍스트 필수 설정

Rate Limiting:

  • 테넌트별 할당량
  • 버스트 허용

감사:

  • 모든 작업 로깅
  • 컴플라이언스 준수

작성일: 2024년 12월 13일

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.