[MCP&A2A] 06. 보안 아키텍처
[MCP&A2A] 06. 보안 아키텍처
다층 보안 전략
엔터프라이즈 AI 시스템은 다음 레이어에서 보안이 필요합니다:
1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────┐
│ 1. 인증 (Authentication) │ ← JWT 토큰
├─────────────────────────────────┤
│ 2. 권한 (Authorization) │ ← RBAC
├─────────────────────────────────┤
│ 3. 데이터 격리 (Isolation) │ ← RLS
├─────────────────────────────────┤
│ 4. 네트워크 (Network) │ ← TLS, 방화벽
├─────────────────────────────────┤
│ 5. 감사 (Audit) │ ← 로깅
└─────────────────────────────────┘
1. JWT 기반 인증
RS256 비대칭 암호화
왜 RS256인가?
- ✅ 서버는 공개키만 보유 (토큰 위조 불가)
- ✅ 키 로테이션 용이
- ✅ 마이크로서비스 간 신뢰
- ✅ 표준 (RFC 7519)
토큰 생성 (UI 서버)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# streamlit-ui/auth/jwt_generator.py
from datetime import datetime, timedelta, timezone
from cryptography.hazmat.primitives import serialization
import jwt
def generate_token(tenant_id: str, user_id: str, ttl: int = 3600) -> str:
"""RSA-256로 서명된 JWT 생성"""
now = datetime.now(timezone.utc)
payload = {
# 표준 클레임
"iss": "mcp-ui", # Issuer
"aud": "mcp-server", # Audience
"sub": user_id, # Subject
"iat": now, # Issued At
"exp": now + timedelta(seconds=ttl), # Expiration
"nbf": now, # Not Before
"jti": str(uuid.uuid4()), # JWT ID (revocation용)
# 커스텀 클레임
"tenant_id": tenant_id,
"user_id": user_id,
"roles": ["user"],
}
# RSA 개인키로 서명
with open("/app/certs/private_key.pem", "rb") as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=None
)
return jwt.encode(payload, private_key, algorithm="RS256")
토큰 검증 (MCP 서버)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// mcp-server/internal/auth/jwt.go
package auth
import (
"crypto/rsa"
"fmt"
"time"
"github.com/golang-jwt/jwt/v5"
)
type Claims struct {
TenantID string `json:"tenant_id"`
UserID string `json:"user_id"`
Roles []string `json:"roles"`
jwt.RegisteredClaims
}
type JWTValidator struct {
publicKey *rsa.PublicKey
}
func NewJWTValidator(publicKeyPath string) (*JWTValidator, error) {
keyData, err := os.ReadFile(publicKeyPath)
if err != nil {
return nil, err
}
publicKey, err := jwt.ParseRSAPublicKeyFromPEM(keyData)
if err != nil {
return nil, err
}
return &JWTValidator{publicKey: publicKey}, nil
}
func (v *JWTValidator) ValidateToken(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(
tokenString,
&Claims{},
func(token *jwt.Token) (interface{}, error) {
// RS256 알고리즘 확인
if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("unexpected signing method")
}
return v.publicKey, nil
},
)
if err != nil {
return nil, err
}
claims, ok := token.Claims.(*Claims)
if !ok || !token.Valid {
return nil, fmt.Errorf("invalid token")
}
// Expiration 체크
if claims.ExpiresAt.Before(time.Now()) {
return nil, fmt.Errorf("token expired")
}
// Audience 체크
if !claims.VerifyAudience("mcp-server", true) {
return nil, fmt.Errorf("invalid audience")
}
return claims, nil
}
인증 미들웨어
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// mcp-server/internal/middleware/auth.go
func AuthMiddleware(validator *auth.JWTValidator) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Authorization 헤더 추출
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Missing authorization", 401)
return
}
// Bearer 토큰 파싱
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
// 토큰 검증
claims, err := validator.ValidateToken(tokenString)
if err != nil {
log.Printf("Token validation failed: %v", err)
http.Error(w, "Invalid token", 401)
return
}
// 컨텍스트에 클레임 추가
ctx := context.WithValue(r.Context(), "tenant_id", claims.TenantID)
ctx = context.WithValue(ctx, "user_id", claims.UserID)
ctx = context.WithValue(ctx, "jti", claims.ID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
2. Row-Level Security (RLS)
PostgreSQL RLS 정책
1
2
3
4
5
6
7
8
9
10
11
12
-- 1. RLS 활성화
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
-- 2. 테넌트 격리 정책
CREATE POLICY tenant_isolation ON documents
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- 3. 관리자 예외 정책 (선택)
CREATE POLICY admin_all_access ON documents
USING (
current_setting('app.current_user_role', true) = 'admin'
);
테넌트 컨텍스트 설정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// mcp-server/internal/database/postgres.go
func (db *DB) SetTenantContext(
ctx context.Context,
tx pgx.Tx,
tenantID string,
) error {
// UUID 검증 (SQL 인젝션 방지)
if _, err := uuid.Parse(tenantID); err != nil {
return fmt.Errorf("invalid tenant ID: %w", err)
}
// 트랜잭션 로컬 변수 설정
query := fmt.Sprintf(
"SET LOCAL app.current_tenant_id = '%s'",
tenantID,
)
_, err := tx.Exec(ctx, query)
return err
}
// 모든 DB 작업에서 사용
func (db *DB) HybridSearch(
ctx context.Context,
tenantID string,
params SearchParams,
) ([]Result, error) {
tx, _ := db.pool.Begin(ctx)
defer tx.Rollback(ctx)
// RLS 컨텍스트 설정 (필수!)
if err := db.SetTenantContext(ctx, tx, tenantID); err != nil {
return nil, err
}
// 이후 모든 쿼리는 자동으로 tenant_id 필터링됨
rows, _ := tx.Query(ctx, "SELECT * FROM documents WHERE ...")
// ...
}
RLS 보안 테스트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// mcp-server/internal/database/postgres_test.go
func TestTenantIsolation(t *testing.T) {
db := setupTestDB(t)
tenant1 := uuid.New().String()
tenant2 := uuid.New().String()
// 테넌트 1 문서 생성
doc1 := createDocument(t, db, tenant1, "Tenant 1 Secret")
// 테넌트 2 문서 생성
doc2 := createDocument(t, db, tenant2, "Tenant 2 Secret")
// 테넌트 1로 쿼리
ctx1 := context.Background()
results1, _ := db.ListDocuments(ctx1, tenant1, ListParams{})
// 테넌트 2로 쿼리
ctx2 := context.Background()
results2, _ := db.ListDocuments(ctx2, tenant2, ListParams{})
// 검증: 각자의 데이터만 볼 수 있어야 함
assert.Contains(t, results1, doc1)
assert.NotContains(t, results1, doc2) // ✅ 격리 성공
assert.Contains(t, results2, doc2)
assert.NotContains(t, results2, doc1) // ✅ 격리 성공
}
3. Rate Limiting
테넌트별 제한
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// mcp-server/internal/middleware/ratelimit.go
import "golang.org/x/time/rate"
type RateLimiter struct {
limiters map[string]*rate.Limiter
mu sync.RWMutex
rate rate.Limit
burst int
}
func NewRateLimiter(reqPerSec int, burst int) *RateLimiter {
return &RateLimiter{
limiters: make(map[string]*rate.Limiter),
rate: rate.Limit(reqPerSec),
burst: burst,
}
}
func (rl *RateLimiter) getLimiter(tenantID string) *rate.Limiter {
rl.mu.RLock()
limiter, exists := rl.limiters[tenantID]
rl.mu.RUnlock()
if !exists {
rl.mu.Lock()
limiter = rate.NewLimiter(rl.rate, rl.burst)
rl.limiters[tenantID] = limiter
rl.mu.Unlock()
}
return limiter
}
func (rl *RateLimiter) Allow(tenantID string) bool {
return rl.getLimiter(tenantID).Allow()
}
// 미들웨어
func RateLimitMiddleware(limiter *RateLimiter) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tenantID := r.Context().Value("tenant_id").(string)
if !limiter.Allow(tenantID) {
http.Error(w, "Rate limit exceeded", 429)
return
}
next.ServeHTTP(w, r)
})
}
}
4. 감사 로깅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// mcp-server/internal/audit/logger.go
type AuditLogger struct {
db *sql.DB
}
type AuditLog struct {
Timestamp time.Time
TenantID string
UserID string
Action string
Resource string
Details map[string]interface{}
Success bool
}
func (al *AuditLogger) Log(ctx context.Context, log AuditLog) error {
query := `
INSERT INTO audit_logs
(timestamp, tenant_id, user_id, action, resource, details, success)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`
details, _ := json.Marshal(log.Details)
_, err := al.db.ExecContext(
ctx, query,
log.Timestamp,
log.TenantID,
log.UserID,
log.Action,
log.Resource,
details,
log.Success,
)
return err
}
핵심 요약
보안 체크리스트
✅ 인증:
- RS256 JWT 토큰
- 짧은 TTL (1시간)
- JTI로 토큰 취소 가능
✅ 권한:
- RBAC 구현
- 도구별 권한 체크
✅ 데이터 격리:
- PostgreSQL RLS
- 테넌트 컨텍스트 필수 설정
✅ Rate Limiting:
- 테넌트별 할당량
- 버스트 허용
✅ 감사:
- 모든 작업 로깅
- 컴플라이언스 준수
작성일: 2024년 12월 13일
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.