[MCP&A2A] 11. Building an A2A Server

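Agent-to-Agent Protocol (A2A) Server

An A2A server is the infrastructure that lets AI agents collaborate asynchronously. Where MCP focuses on synchronous tool execution, A2A is optimized for long-running tasks and agent-to-agent communication.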

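A2A vs. MCP Comparison

| Property | A2A | MCP |
|---|---|---|
| Communication pattern | Asynchronous (task-based) | Synchronous (request-response) |
| Execution time | Seconds to minutes to hours | Milliseconds to seconds |
| State management | Stateful (task tracking) | Stateless |
| Progress reporting | SSE streaming | None |
| Use cases | Agent collaboration, workflows | Tool execution, data lookup |
| Complexity | High | Low |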

When You Need A2A

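✅ Use A2A:
- Multi-step research (search → analysis → report generation)
- Code review (analysis → improvement suggestions → refactoring)
- Data pipelines (collect → transform → store)
- Agent team collaboration (coordinator → worker → verifier)

❌ A2A not needed (use MCP):
- Single document search
- Simple calculations
- Quick API calls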

Project Structure

a2a-server/
├── cmd/
│   └── server/
│       └── main.go                 # entry point
├── internal/
│   ├── models/
│   │   ├── task.go                 # task model
│   │   ├── agent_card.go           # agent card
│   │   └── artifact.go             # artifacts (results)
│   ├── store/
│   │   ├── task_store.go           # task store interface
│   │   └── postgres_store.go       # PostgreSQL implementation
│   ├── executor/
│   │   ├── executor.go             # task executor
│   │   └── worker_pool.go          # worker pool
│   ├── handlers/
│   │   ├── agent_card.go           # Agent Card handler
│   │   ├── tasks.go                # task CRUD
│   │   ├── messages.go             # message handler
│   │   └── events.go               # SSE streaming
│   ├── middleware/
│   │   ├── auth.go                 # JWT authentication
│   │   └── cors.go                 # CORS
│   └── skills/
│       ├── skill.go                # Skill interface
│       ├── research.go             # research skill
│       └── analysis.go             # analysis skill
├── go.mod
└── README.md

Core Models

1. Task Model

// internal/models/task.go
package models

import (
    "time"
    
    "github.com/google/uuid"
)

// TaskStatus is the lifecycle state of a task.
type TaskStatus string

const (
    StatusPending   TaskStatus = "pending"   // waiting to run
    StatusRunning   TaskStatus = "running"   // in progress
    StatusCompleted TaskStatus = "completed" // finished successfully
    StatusFailed    TaskStatus = "failed"    // finished with an error
    StatusCancelled TaskStatus = "cancelled" // cancelled
)

// Task is a unit of work submitted to an agent.
type Task struct {
    ID        string                 `json:"taskId"`
    AgentID   string                 `json:"agentId"`
    Skill     string                 `json:"skill"`
    Input     map[string]interface{} `json:"input"`
    Status    TaskStatus             `json:"status"`
    Artifacts []Artifact             `json:"artifacts,omitempty"`
    Error     *TaskError             `json:"error,omitempty"`
    
    // Metadata
    CreatedAt time.Time  `json:"createdAt"`
    UpdatedAt time.Time  `json:"updatedAt"`
    StartedAt *time.Time `json:"startedAt,omitempty"`
    EndedAt   *time.Time `json:"endedAt,omitempty"`

    // Progress
    Progress *TaskProgress `json:"progress,omitempty"`

    // Internal fields (never serialized)
    TenantID string `json:"-"`
    UserID   string `json:"-"`
}

// TaskProgress reports how far a running task has advanced.
type TaskProgress struct {
    Current int    `json:"current"`
    Total   int    `json:"total"`
    Message string `json:"message"`
}

// TaskError describes why a task failed.
type TaskError struct {
    Code    string                 `json:"code"`
    Message string                 `json:"message"`
    Details map[string]interface{} `json:"details,omitempty"`
}

// NewTask creates a pending task with a fresh UUID.
func NewTask(agentID, skill string, input map[string]interface{}) *Task {
    now := time.Now()
    return &Task{
        ID:        uuid.New().String(),
        AgentID:   agentID,
        Skill:     skill,
        Input:     input,
        Status:    StatusPending,
        CreatedAt: now,
        UpdatedAt: now,
    }
}

// Start marks the task as running and records the start time.
func (t *Task) Start() {
    now := time.Now()
    t.Status = StatusRunning
    t.StartedAt = &now
    t.UpdatedAt = now
}

// Complete marks the task as completed and attaches its artifacts.
func (t *Task) Complete(artifacts []Artifact) {
    now := time.Now()
    t.Status = StatusCompleted
    t.Artifacts = artifacts
    t.EndedAt = &now
    t.UpdatedAt = now
}

// Fail marks the task as failed and records the error.
func (t *Task) Fail(err error) {
    now := time.Now()
    t.Status = StatusFailed
    t.Error = &TaskError{
        Code:    "EXECUTION_ERROR",
        Message: err.Error(),
    }
    t.EndedAt = &now
    t.UpdatedAt = now
}

// UpdateProgress records the current progress of the task.
func (t *Task) UpdateProgress(current, total int, message string) {
    t.Progress = &TaskProgress{
        Current: current,
        Total:   total,
        Message: message,
    }
    t.UpdatedAt = time.Now()
}

// Duration returns the execution time; it is zero unless the task has both started and ended.
func (t *Task) Duration() time.Duration {
    if t.StartedAt == nil || t.EndedAt == nil {
        return 0
    }
    return t.EndedAt.Sub(*t.StartedAt)
}
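
The methods above set Status without looking at the current state, so nothing stops a completed task from being started again. One way to guard against that is a small transition table. The sketch below is only an illustration: the `allowed` map and `canTransition` are hypothetical names, and the set of legal transitions is an assumption rather than something the post defines.

```go
package main

import "fmt"

// TaskStatus mirrors the status type from the Task model.
type TaskStatus string

const (
	StatusPending   TaskStatus = "pending"
	StatusRunning   TaskStatus = "running"
	StatusCompleted TaskStatus = "completed"
	StatusFailed    TaskStatus = "failed"
	StatusCancelled TaskStatus = "cancelled"
)

// allowed lists the next states assumed to be valid for each state.
// Terminal states (completed, failed, cancelled) have no entry.
var allowed = map[TaskStatus][]TaskStatus{
	StatusPending: {StatusRunning, StatusCancelled},
	StatusRunning: {StatusCompleted, StatusFailed, StatusCancelled},
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to TaskStatus) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusPending, StatusRunning))   // true
	fmt.Println(canTransition(StatusCompleted, StatusRunning)) // false
}
```

Start, Complete, and Fail could call such a check first and return an error when the transition is not allowed.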

2. Artifact Model

// internal/models/artifact.go
package models

import (
    "time"
    
    "github.com/google/uuid"
)

// Artifact is a result produced by a task.
type Artifact struct {
    ID        string    `json:"artifactId"`
    Name      string    `json:"name"`
    Parts     []Part    `json:"parts"`
    CreatedAt time.Time `json:"createdAt"`
}

// Part is one content block inside an artifact.
type Part struct {
    Kind     string                 `json:"kind"` // text, data, file, image
    Text     string                 `json:"text,omitempty"`
    Data     interface{}            `json:"data,omitempty"`
    URL      string                 `json:"url,omitempty"`
    MimeType string                 `json:"mimeType,omitempty"`
    Metadata map[string]interface{} `json:"metadata,omitempty"`
}

// NewArtifact creates an artifact with a fresh UUID.
func NewArtifact(name string, parts []Part) *Artifact {
    return &Artifact{
        ID:        uuid.New().String(),
        Name:      name,
        Parts:     parts,
        CreatedAt: time.Now(),
    }
}

// NewTextArtifact creates an artifact with a single text part.
func NewTextArtifact(name, text string) *Artifact {
    return NewArtifact(name, []Part{
        {
            Kind: "text",
            Text: text,
        },
    })
}

// NewDataArtifact creates an artifact with a single structured-data part.
func NewDataArtifact(name string, data interface{}) *Artifact {
    return NewArtifact(name, []Part{
        {
            Kind: "data",
            Data: data,
        },
    })
}

// NewFileArtifact creates an artifact that points to a file by URL.
func NewFileArtifact(name, url, mimeType string) *Artifact {
    return NewArtifact(name, []Part{
        {
            Kind:     "file",
            URL:      url,
            MimeType: mimeType,
        },
    })
}
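
To see what a client receives, it helps to serialize an artifact. The sketch below uses trimmed copies of the structs above (only a few fields) and a fixed ID so the output is reproducible; `encodeArtifact` is a hypothetical helper, not part of the post's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Part and Artifact are trimmed copies of the models above.
type Part struct {
	Kind string      `json:"kind"`
	Text string      `json:"text,omitempty"`
	Data interface{} `json:"data,omitempty"`
}

type Artifact struct {
	ID    string `json:"artifactId"`
	Name  string `json:"name"`
	Parts []Part `json:"parts"`
}

// encodeArtifact returns the JSON form of an artifact.
func encodeArtifact(a Artifact) (string, error) {
	b, err := json.Marshal(a)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	a := Artifact{
		ID:    "a-1", // fixed ID for a reproducible example
		Name:  "summary",
		Parts: []Part{{Kind: "text", Text: "hello"}},
	}
	out, err := encodeArtifact(a)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
	// {"artifactId":"a-1","name":"summary","parts":[{"kind":"text","text":"hello"}]}
}
```

Because of `omitempty`, fields that a part kind does not use (here `data`) are left out of the payload.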

3. Agent Card Model

// internal/models/agent_card.go
package models

// AgentCard describes an agent and the skills it offers.
type AgentCard struct {
    Name           string              `json:"name"`
    Description    string              `json:"description"`
    URL            string              `json:"url"`
    Version        string              `json:"version"`
    Skills         []Skill             `json:"skills"`
    Authentication *AuthenticationInfo `json:"authentication,omitempty"`
}

// Skill describes one capability of the agent.
type Skill struct {
    Name        string   `json:"name"`
    Description string   `json:"description"`
    InputModes  []string `json:"inputModes"`  // text, data, file, image
    OutputModes []string `json:"outputModes"`
}

// AuthenticationInfo describes how clients authenticate.
type AuthenticationInfo struct {
    Type        string `json:"type"` // bearer, basic, oauth2
    Description string `json:"description,omitempty"`
}

// NewAgentCard creates an agent card with no skills.
func NewAgentCard(name, description, url, version string) *AgentCard {
    return &AgentCard{
        Name:        name,
        Description: description,
        URL:         url,
        Version:     version,
        Skills:      []Skill{},
    }
}

// AddSkill appends a skill to the card.
func (ac *AgentCard) AddSkill(skill Skill) {
    ac.Skills = append(ac.Skills, skill)
}

Task Store Implementation

Interface

// internal/store/task_store.go
package store

import (
    "context"

    "a2a-server/internal/models"
)

// TaskStore is the persistence interface for tasks.
type TaskStore interface {
    // Create stores a new task.
    Create(ctx context.Context, task *models.Task) error

    // Get returns a task by ID.
    Get(ctx context.Context, taskID string) (*models.Task, error)

    // List returns the tasks of one tenant.
    List(ctx context.Context, tenantID string, params ListParams) ([]*models.Task, error)

    // Update persists changes to a task.
    Update(ctx context.Context, task *models.Task) error

    // Delete removes a task.
    Delete(ctx context.Context, taskID string) error

    // GetByStatus returns up to limit tasks in the given status.
    GetByStatus(ctx context.Context, status models.TaskStatus, limit int) ([]*models.Task, error)
}

// ListParams holds the filter, paging, and sort options for List.
type ListParams struct {
    Status    models.TaskStatus
    AgentID   string
    Limit     int
    Offset    int
    SortBy    string // created_at, updated_at
    SortOrder string // asc, desc
}
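
An interface like this makes it easy to swap in an in-memory store for unit tests. The following is a minimal sketch under stated assumptions: `MemoryStore`, `ErrNotFound`, and `lifecycle` are hypothetical names, `Task` is a trimmed stand-in for `models.Task`, and only four of the six interface methods are shown.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Task is a trimmed stand-in for models.Task.
type Task struct {
	ID     string
	Status string
}

// ErrNotFound is a hypothetical sentinel error for missing tasks.
var ErrNotFound = errors.New("task not found")

// MemoryStore keeps tasks in a map guarded by a mutex.
type MemoryStore struct {
	mu    sync.RWMutex
	tasks map[string]Task
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{tasks: make(map[string]Task)}
}

// Create stores a copy of the task and rejects duplicate IDs.
func (s *MemoryStore) Create(ctx context.Context, t *Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.tasks[t.ID]; exists {
		return fmt.Errorf("task %s already exists", t.ID)
	}
	s.tasks[t.ID] = *t
	return nil
}

// Get returns a copy so callers cannot mutate stored state.
func (s *MemoryStore) Get(ctx context.Context, id string) (*Task, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	t, ok := s.tasks[id]
	if !ok {
		return nil, ErrNotFound
	}
	return &t, nil
}

// Update overwrites an existing task.
func (s *MemoryStore) Update(ctx context.Context, t *Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.tasks[t.ID]; !ok {
		return ErrNotFound
	}
	s.tasks[t.ID] = *t
	return nil
}

// Delete removes a task; deleting a missing ID is a no-op.
func (s *MemoryStore) Delete(ctx context.Context, id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.tasks, id)
	return nil
}

// lifecycle creates, updates, reads, and deletes one task and returns
// the status that was read back.
func lifecycle() (string, error) {
	ctx := context.Background()
	s := NewMemoryStore()
	if err := s.Create(ctx, &Task{ID: "t1", Status: "pending"}); err != nil {
		return "", err
	}
	if err := s.Update(ctx, &Task{ID: "t1", Status: "running"}); err != nil {
		return "", err
	}
	got, err := s.Get(ctx, "t1")
	if err != nil {
		return "", err
	}
	if err := s.Delete(ctx, "t1"); err != nil {
		return "", err
	}
	if _, err := s.Get(ctx, "t1"); !errors.Is(err, ErrNotFound) {
		return "", fmt.Errorf("expected ErrNotFound after delete, got %v", err)
	}
	return got.Status, nil
}

func main() {
	status, err := lifecycle()
	fmt.Println(status, err) // running <nil>
}
```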

PostgreSQL Implementation

// internal/store/postgres_store.go
package store

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"

    "a2a-server/internal/models"
)

type PostgresStore struct {
    pool *pgxpool.Pool
}

func NewPostgresStore(pool *pgxpool.Pool) *PostgresStore {
    return &PostgresStore{pool: pool}
}

// Create inserts a new task.
func (s *PostgresStore) Create(ctx context.Context, task *models.Task) error {
    inputJSON, err := json.Marshal(task.Input)
    if err != nil {
        return fmt.Errorf("failed to marshal input: %w", err)
    }
    
    query := `
        INSERT INTO tasks (
            id, tenant_id, user_id, agent_id, skill,
            input, status, created_at, updated_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    `
    
    _, err = s.pool.Exec(
        ctx, query,
        task.ID,
        task.TenantID,
        task.UserID,
        task.AgentID,
        task.Skill,
        inputJSON,
        task.Status,
        task.CreatedAt,
        task.UpdatedAt,
    )
    
    return err
}

// Get loads a task by ID.
func (s *PostgresStore) Get(ctx context.Context, taskID string) (*models.Task, error) {
    query := `
        SELECT 
            id, tenant_id, user_id, agent_id, skill,
            input, status, artifacts, error,
            created_at, updated_at, started_at, ended_at,
            progress
        FROM tasks
        WHERE id = $1
    `
    
    var task models.Task
    var inputJSON, artifactsJSON, errorJSON, progressJSON []byte
    var startedAt, endedAt *time.Time
    
    err := s.pool.QueryRow(ctx, query, taskID).Scan(
        &task.ID,
        &task.TenantID,
        &task.UserID,
        &task.AgentID,
        &task.Skill,
        &inputJSON,
        &task.Status,
        &artifactsJSON,
        &errorJSON,
        &task.CreatedAt,
        &task.UpdatedAt,
        &startedAt,
        &endedAt,
        &progressJSON,
    )
    
    if err != nil {
        return nil, err
    }
    
    // Decode the JSONB columns.
    if err := json.Unmarshal(inputJSON, &task.Input); err != nil {
        return nil, err
    }
    
    if len(artifactsJSON) > 0 {
        if err := json.Unmarshal(artifactsJSON, &task.Artifacts); err != nil {
            return nil, err
        }
    }
    
    if len(errorJSON) > 0 {
        if err := json.Unmarshal(errorJSON, &task.Error); err != nil {
            return nil, err
        }
    }
    
    if len(progressJSON) > 0 {
        if err := json.Unmarshal(progressJSON, &task.Progress); err != nil {
            return nil, err
        }
    }
    
    task.StartedAt = startedAt
    task.EndedAt = endedAt
    
    return &task, nil
}

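// Update persists the mutable fields of a task.
func (s *PostgresStore) Update(ctx context.Context, task *models.Task) error {
    inputJSON, err := json.Marshal(task.Input)
    if err != nil {
        return fmt.Errorf("failed to marshal input: %w", err)
    }
    artifactsJSON, err := json.Marshal(task.Artifacts)
    if err != nil {
        return fmt.Errorf("failed to marshal artifacts: %w", err)
    }
    errorJSON, err := json.Marshal(task.Error)
    if err != nil {
        return fmt.Errorf("failed to marshal error: %w", err)
    }
    progressJSON, err := json.Marshal(task.Progress)
    if err != nil {
        return fmt.Errorf("failed to marshal progress: %w", err)
    }

    query := `
        UPDATE tasks SET
            status = $2,
            input = $3,
            artifacts = $4,
            error = $5,
            updated_at = $6,
            started_at = $7,
            ended_at = $8,
            progress = $9
        WHERE id = $1
    `

    _, err = s.pool.Exec(
        ctx, query,
        task.ID,
        task.Status,
        inputJSON,
        artifactsJSON,
        errorJSON,
        task.UpdatedAt,
        task.StartedAt,
        task.EndedAt,
        progressJSON,
    )

    return err
}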

// List returns a summary of one tenant's tasks.
func (s *PostgresStore) List(ctx context.Context, tenantID string, params ListParams) ([]*models.Task, error) {
    query := `
        SELECT 
            id, agent_id, skill, status,
            created_at, updated_at
        FROM tasks
        WHERE tenant_id = $1
    `
    
    args := []interface{}{tenantID}
    argIndex := 2
    
    // Optional filters
    if params.Status != "" {
        query += fmt.Sprintf(" AND status = $%d", argIndex)
        args = append(args, params.Status)
        argIndex++
    }
    
    if params.AgentID != "" {
        query += fmt.Sprintf(" AND agent_id = $%d", argIndex)
        args = append(args, params.AgentID)
        argIndex++
    }
    
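    // Sorting. The column and direction are interpolated into the SQL
    // text, so only known values are accepted to prevent SQL injection.
    sortBy := "created_at"
    if params.SortBy == "updated_at" {
        sortBy = "updated_at"
    }

    sortOrder := "DESC"
    if params.SortOrder == "asc" || params.SortOrder == "ASC" {
        sortOrder = "ASC"
    }

    query += fmt.Sprintf(" ORDER BY %s %s", sortBy, sortOrder)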
    
    // Pagination
    if params.Limit > 0 {
        query += fmt.Sprintf(" LIMIT $%d", argIndex)
        args = append(args, params.Limit)
        argIndex++
    }
    
    if params.Offset > 0 {
        query += fmt.Sprintf(" OFFSET $%d", argIndex)
        args = append(args, params.Offset)
    }
    
    rows, err := s.pool.Query(ctx, query, args...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var tasks []*models.Task
    for rows.Next() {
        var task models.Task
        err := rows.Scan(
            &task.ID,
            &task.AgentID,
            &task.Skill,
            &task.Status,
            &task.CreatedAt,
            &task.UpdatedAt,
        )
        if err != nil {
            return nil, err
        }
        
        tasks = append(tasks, &task)
    }
    
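    // Surface errors that ended the iteration early.
    if err := rows.Err(); err != nil {
        return nil, err
    }

    return tasks, nil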
}

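// Delete removes a task (required by the TaskStore interface).
func (s *PostgresStore) Delete(ctx context.Context, taskID string) error {
    _, err := s.pool.Exec(ctx, `DELETE FROM tasks WHERE id = $1`, taskID)
    return err
}

// GetByStatus returns up to limit tasks in the given status, oldest first.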
func (s *PostgresStore) GetByStatus(ctx context.Context, status models.TaskStatus, limit int) ([]*models.Task, error) {
    query := `
        SELECT id, tenant_id, agent_id, skill, input, status, created_at
        FROM tasks
        WHERE status = $1
        ORDER BY created_at ASC
        LIMIT $2
    `
    
    rows, err := s.pool.Query(ctx, query, status, limit)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var tasks []*models.Task
    for rows.Next() {
        var task models.Task
        var inputJSON []byte
        
        err := rows.Scan(
            &task.ID,
            &task.TenantID,
            &task.AgentID,
            &task.Skill,
            &inputJSON,
            &task.Status,
            &task.CreatedAt,
        )
        if err != nil {
            return nil, err
        }
        
        if err := json.Unmarshal(inputJSON, &task.Input); err != nil {
            return nil, err
        }
        tasks = append(tasks, &task)
    }
    
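    // Surface errors that ended the iteration early.
    if err := rows.Err(); err != nil {
        return nil, err
    }

    return tasks, nil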
}
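
The List method builds its SQL dynamically: each optional filter takes the next `$n` placeholder, while the sort column and direction have to be spliced into the text because identifiers cannot be bound as parameters. The sketch below isolates that logic in a pure function so it can be tested without a database. `buildListQuery` and `sortColumns` are hypothetical names, and `ListParams` is a trimmed copy of the store type.

```go
package main

import "fmt"

// ListParams is a trimmed copy of store.ListParams.
type ListParams struct {
	Status    string
	AgentID   string
	Limit     int
	SortBy    string
	SortOrder string
}

// sortColumns is the allow-list of sortable columns.
var sortColumns = map[string]bool{"created_at": true, "updated_at": true}

// buildListQuery returns the SQL text and positional arguments for a listing.
func buildListQuery(tenantID string, p ListParams) (string, []interface{}) {
	query := "SELECT id FROM tasks WHERE tenant_id = $1"
	args := []interface{}{tenantID}

	// Each optional filter takes the next placeholder number.
	if p.Status != "" {
		args = append(args, p.Status)
		query += fmt.Sprintf(" AND status = $%d", len(args))
	}
	if p.AgentID != "" {
		args = append(args, p.AgentID)
		query += fmt.Sprintf(" AND agent_id = $%d", len(args))
	}

	// Identifiers cannot be bound as parameters, so validate them instead.
	sortBy := "created_at"
	if sortColumns[p.SortBy] {
		sortBy = p.SortBy
	}
	order := "DESC"
	if p.SortOrder == "asc" {
		order = "ASC"
	}
	query += fmt.Sprintf(" ORDER BY %s %s", sortBy, order)

	if p.Limit > 0 {
		args = append(args, p.Limit)
		query += fmt.Sprintf(" LIMIT $%d", len(args))
	}
	return query, args
}

func main() {
	// The malicious SortBy value is ignored and the default column is used.
	q, args := buildListQuery("t1", ListParams{
		Status: "pending",
		SortBy: "id; DROP TABLE tasks",
		Limit:  10,
	})
	fmt.Println(q)
	fmt.Println(len(args))
}
```

Deriving the placeholder number from `len(args)` keeps the numbering and the argument slice in step without a separate counter.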

Database Schema

-- tasks table
CREATE TABLE tasks (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    user_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    skill TEXT NOT NULL,
    
    -- Input/output
    input JSONB NOT NULL,
    artifacts JSONB,
    error JSONB,
    
    -- Status
    status TEXT NOT NULL,
    progress JSONB,
    
    -- Timestamps
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    started_at TIMESTAMPTZ,
    ended_at TIMESTAMPTZ
);

-- Indexes
CREATE INDEX idx_tasks_tenant_id ON tasks(tenant_id);
CREATE INDEX idx_tasks_status ON tasks(status) WHERE status IN ('pending', 'running');
CREATE INDEX idx_tasks_agent_id ON tasks(agent_id);
CREATE INDEX idx_tasks_created_at ON tasks(created_at DESC);

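-- Enable row-level security. The application must set app.current_tenant_id
-- for each transaction or session; note that the table owner bypasses RLS
-- unless FORCE ROW LEVEL SECURITY is also set.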
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON tasks
    USING (tenant_id = current_setting('app.current_tenant_id', true)::uuid);
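
The policy only isolates tenants if `app.current_tenant_id` is set before the queries run, and the store code in this post never sets it. A minimal sketch of that step follows. It uses the standard `database/sql` types purely to stay self-contained (the post itself uses pgx), and `execer`, `setTenant`, `recorder`, and `applyTenant` are hypothetical names. PostgreSQL's `set_config` with `true` as the third argument scopes the value to the current transaction.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is the subset of *sql.Tx (or *sql.Conn) used here.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

// setTenant binds the tenant ID to the current transaction so the RLS
// policy can read it through current_setting.
func setTenant(ctx context.Context, tx execer, tenantID string) error {
	if tenantID == "" {
		return fmt.Errorf("tenant ID is empty")
	}
	_, err := tx.ExecContext(ctx,
		"SELECT set_config('app.current_tenant_id', $1, true)", tenantID)
	return err
}

// recorder is a fake execer that records the last statement, so the
// sketch runs without a database.
type recorder struct {
	query string
	args  []interface{}
}

func (r *recorder) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	r.query, r.args = query, args
	return nil, nil
}

// applyTenant runs setTenant against the fake and returns the SQL it issued.
func applyTenant(tenantID string) (string, error) {
	rec := &recorder{}
	if err := setTenant(context.Background(), rec, tenantID); err != nil {
		return "", err
	}
	return rec.query, nil
}

func main() {
	q, err := applyTenant("11111111-1111-1111-1111-111111111111")
	fmt.Println(q, err)
}
```

In a real store this call would be the first statement inside each transaction, before any query against `tasks`.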

Task Executor Implementation

Executor Interface

// internal/executor/executor.go
package executor

import (
    "context"
    
    "a2a-server/internal/models"
)

// Executor runs a task.
type Executor interface {
    Execute(ctx context.Context, task *models.Task) error
}

// SkillExecutor runs one named skill and returns its artifacts.
type SkillExecutor interface {
    Name() string
    Execute(ctx context.Context, input map[string]interface{}) ([]models.Artifact, error)
}
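
The post does not show the skill implementations themselves, so here is a minimal sketch of what satisfying `SkillExecutor` could look like. `UppercaseSkill` and `runSkill` are hypothetical, and `Artifact` and `Part` are trimmed stand-ins for the model types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Part and Artifact are trimmed stand-ins for the model types.
type Part struct {
	Kind string
	Text string
}

type Artifact struct {
	Name  string
	Parts []Part
}

// SkillExecutor matches the interface above, using the local Artifact type.
type SkillExecutor interface {
	Name() string
	Execute(ctx context.Context, input map[string]interface{}) ([]Artifact, error)
}

// UppercaseSkill is a hypothetical skill that upper-cases input["text"].
type UppercaseSkill struct{}

func (UppercaseSkill) Name() string { return "uppercase" }

func (UppercaseSkill) Execute(ctx context.Context, input map[string]interface{}) ([]Artifact, error) {
	// Stop early if the caller has already cancelled the task.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	// Input arrives as untyped JSON, so validate before use.
	text, ok := input["text"].(string)
	if !ok {
		return nil, errors.New(`input "text" must be a string`)
	}
	return []Artifact{{
		Name:  "result",
		Parts: []Part{{Kind: "text", Text: strings.ToUpper(text)}},
	}}, nil
}

// runSkill executes a skill with a background context and returns the
// text of the first part (demo helper).
func runSkill(s SkillExecutor, input map[string]interface{}) (string, error) {
	arts, err := s.Execute(context.Background(), input)
	if err != nil {
		return "", err
	}
	return arts[0].Parts[0].Text, nil
}

func main() {
	out, err := runSkill(UppercaseSkill{}, map[string]interface{}{"text": "hello"})
	fmt.Println(out, err) // HELLO <nil>
}
```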

Worker Pool Implementation

// internal/executor/worker_pool.go
package executor

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "a2a-server/internal/models"
    "a2a-server/internal/store"
)

type WorkerPool struct {
    taskStore store.TaskStore
    skills    map[string]SkillExecutor
    
    workers   int
    taskQueue chan *models.Task
    wg        sync.WaitGroup
    
    eventBus *EventBus
}

func NewWorkerPool(
    taskStore store.TaskStore,
    workers int,
    eventBus *EventBus,
) *WorkerPool {
    return &WorkerPool{
        taskStore: taskStore,
        skills:    make(map[string]SkillExecutor),
        workers:   workers,
        taskQueue: make(chan *models.Task, workers*2),
        eventBus:  eventBus,
    }
}

// RegisterSkill registers a skill under its name. Call it before Start; the map is not synchronized.
func (wp *WorkerPool) RegisterSkill(skill SkillExecutor) {
    wp.skills[skill.Name()] = skill
}

// Start launches the workers and the polling goroutine.
func (wp *WorkerPool) Start(ctx context.Context) {
    log.Printf("Starting worker pool with %d workers", wp.workers)
    
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(ctx, i)
    }
    
    // Poll for pending tasks in the background.
    go wp.pollPendingTasks(ctx)
}

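// Stop closes the queue and waits for the workers to drain it.
// Nothing may call Submit after Stop, because sending on a closed channel panics.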
func (wp *WorkerPool) Stop() {
    close(wp.taskQueue)
    wp.wg.Wait()
    log.Println("Worker pool stopped")
}

// Submit enqueues a task; it blocks while the queue is full.
func (wp *WorkerPool) Submit(task *models.Task) {
    wp.taskQueue <- task
}

// worker processes tasks from the queue until the queue is closed.
func (wp *WorkerPool) worker(ctx context.Context, id int) {
    defer wp.wg.Done()
    
    log.Printf("Worker %d started", id)
    
    for task := range wp.taskQueue {
        log.Printf("Worker %d processing task %s", id, task.ID)
        
        if err := wp.executeTask(ctx, task); err != nil {
            log.Printf("Worker %d failed task %s: %v", id, task.ID, err)
        }
    }
    
    log.Printf("Worker %d stopped", id)
}

// executeTask runs one task and publishes its lifecycle events.
func (wp *WorkerPool) executeTask(ctx context.Context, task *models.Task) error {
    // Look up the skill.
    skill, ok := wp.skills[task.Skill]
    if !ok {
        task.Fail(fmt.Errorf("skill not found: %s", task.Skill))
        wp.taskStore.Update(ctx, task)
        wp.eventBus.Publish(task.ID, Event{
            Type: "error",
            Data: map[string]interface{}{
                "message": "Skill not found",
            },
        })
        return fmt.Errorf("skill not found: %s", task.Skill)
    }
    
    // Mark the task as started.
    task.Start()
    wp.taskStore.Update(ctx, task)
    wp.eventBus.Publish(task.ID, Event{
        Type: "started",
        Data: map[string]interface{}{
            "taskId": task.ID,
        },
    })
    
    // Run the skill.
    artifacts, err := skill.Execute(ctx, task.Input)
    
    if err != nil {
        // Handle failure.
        task.Fail(err)
        wp.taskStore.Update(ctx, task)
        wp.eventBus.Publish(task.ID, Event{
            Type: "error",
            Data: map[string]interface{}{
                "message": err.Error(),
            },
        })
        return err
    }
    
    // Handle completion.
    task.Complete(artifacts)
    wp.taskStore.Update(ctx, task)
    wp.eventBus.Publish(task.ID, Event{
        Type: "completed",
        Data: map[string]interface{}{
            "taskId":    task.ID,
            "artifacts": artifacts,
        },
    })
    
    log.Printf("Task %s completed in %v", task.ID, task.Duration())
    
    return nil
}

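// pollPendingTasks periodically queues pending tasks found in the store.
// Note: a task that was submitted but has not started yet is still pending,
// so it can be queued twice. A production version should claim tasks
// atomically (for example with SELECT ... FOR UPDATE SKIP LOCKED).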
func (wp *WorkerPool) pollPendingTasks(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            tasks, err := wp.taskStore.GetByStatus(ctx, models.StatusPending, wp.workers)
            if err != nil {
                log.Printf("Failed to poll pending tasks: %v", err)
                continue
            }
            
            for _, task := range tasks {
                select {
                case wp.taskQueue <- task:
                    log.Printf("Queued pending task %s", task.ID)
                default:
                    log.Printf("Queue full, skipping task %s", task.ID)
                }
            }
        }
    }
}
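
The core of the pool is a common Go pattern: a buffered channel as the queue, N goroutines ranging over it, and a WaitGroup so that shutdown waits for in-flight work. The stripped-down sketch below shows just that pattern with integers instead of tasks; `processAll` and `sum` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll squares every job on n workers and returns once all jobs
// have finished. The result order is not deterministic.
func processAll(jobs []int, n int) []int {
	queue := make(chan int, n*2)
	results := make([]int, 0, len(jobs))
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The loop ends when queue is closed and drained.
			for j := range queue {
				mu.Lock()
				results = append(results, j*j)
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		queue <- j // blocks while the buffer is full (backpressure)
	}
	close(queue) // only the sender closes, after its last send
	wg.Wait()    // wait for in-flight jobs before returning
	return results
}

// sum adds up a slice of ints.
func sum(xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func main() {
	out := processAll([]int{1, 2, 3, 4}, 2)
	fmt.Println(len(out), sum(out)) // 4 30
}
```

The rule that only the sender closes the channel is what the real pool has to respect as well: every producer (HTTP handlers and the poller) must have stopped before the queue is closed.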

Event Bus (for SSE)

// internal/executor/event_bus.go
package executor

import (
    "sync"
)

type Event struct {
    Type string
    Data map[string]interface{}
}

type EventBus struct {
    subscribers map[string][]chan Event
    mu          sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        subscribers: make(map[string][]chan Event),
    }
}

// Subscribe returns a buffered channel that receives events for the task.
func (eb *EventBus) Subscribe(taskID string) chan Event {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    
    ch := make(chan Event, 10)
    eb.subscribers[taskID] = append(eb.subscribers[taskID], ch)
    
    return ch
}

// Unsubscribe removes the channel from the task's subscribers and closes it.
func (eb *EventBus) Unsubscribe(taskID string, ch chan Event) {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    
    subs := eb.subscribers[taskID]
    for i, sub := range subs {
        if sub == ch {
            eb.subscribers[taskID] = append(subs[:i], subs[i+1:]...)
            close(ch)
            break
        }
    }
    
    if len(eb.subscribers[taskID]) == 0 {
        delete(eb.subscribers, taskID)
    }
}

// Publish delivers the event to every subscriber of the task without blocking.
func (eb *EventBus) Publish(taskID string, event Event) {
    eb.mu.RLock()
    defer eb.mu.RUnlock()
    
    for _, ch := range eb.subscribers[taskID] {
        select {
        case ch <- event:
        default:
            // The subscriber's buffer is full: drop the event.
        }
    }
}
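
Publish never blocks: the `select` with a `default` branch drops the event when a subscriber's buffer is full. The sketch below isolates that behavior to make the consequence visible. `publish` and `deliveredCount` are hypothetical helpers, and the buffer size of 10 matches the one used in Subscribe.

```go
package main

import "fmt"

// publish attempts a non-blocking send and reports whether the event
// was accepted.
func publish(ch chan string, event string) bool {
	select {
	case ch <- event:
		return true
	default:
		return false // buffer full: the event is dropped
	}
}

// deliveredCount publishes n events to a subscriber with the given
// buffer size that never reads, and returns how many were accepted.
func deliveredCount(buffer, n int) int {
	ch := make(chan string, buffer)
	count := 0
	for i := 0; i < n; i++ {
		if publish(ch, fmt.Sprintf("progress-%d", i)) {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(deliveredCount(10, 15)) // 10
}
```

A slow client therefore loses events silently, and that can include the final completed or error event. Clients that need the final state should be prepared to fall back to fetching the task itself.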

HTTP Handlers

Agent Card Handler

// internal/handlers/agent_card.go
package handlers

import (
    "encoding/json"
    "net/http"
    
    "a2a-server/internal/models"
)

type AgentCardHandler struct {
    agentCard *models.AgentCard
}

func NewAgentCardHandler(agentCard *models.AgentCard) *AgentCardHandler {
    return &AgentCardHandler{agentCard: agentCard}
}

// GET /.well-known/agent-card.json
func (h *AgentCardHandler) Get(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(h.agentCard)
}
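
A handler this small can be checked without starting a server by using `net/http/httptest`. The sketch below is one way to do it, with a trimmed `AgentCard` and hypothetical helpers `cardHandler` and `fetchCard`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// AgentCard is a trimmed copy of models.AgentCard.
type AgentCard struct {
	Name    string `json:"name"`
	URL     string `json:"url"`
	Version string `json:"version"`
}

// cardHandler serves the card as JSON.
func cardHandler(card AgentCard) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(card); err != nil {
			http.Error(w, "encoding failed", http.StatusInternalServerError)
		}
	}
}

// fetchCard calls the handler in memory and returns the decoded card
// name and the response content type.
func fetchCard(card AgentCard) (string, string, error) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/.well-known/agent-card.json", nil)
	cardHandler(card)(rec, req)

	var got AgentCard
	if err := json.NewDecoder(rec.Body).Decode(&got); err != nil {
		return "", "", err
	}
	return got.Name, rec.Header().Get("Content-Type"), nil
}

func main() {
	name, contentType, err := fetchCard(AgentCard{
		Name:    "Research Assistant",
		URL:     "https://api.example.com/a2a/research-assistant",
		Version: "1.0.0",
	})
	fmt.Println(name, contentType, err)
}
```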

Task Handler

// internal/handlers/tasks.go
package handlers

import (
    "encoding/json"
    "net/http"
    
    "github.com/go-chi/chi/v5"
    
    "a2a-server/internal/executor"
    "a2a-server/internal/models"
    "a2a-server/internal/store"
)

type TaskHandler struct {
    taskStore  store.TaskStore
    workerPool *executor.WorkerPool
}

func NewTaskHandler(taskStore store.TaskStore, workerPool *executor.WorkerPool) *TaskHandler {
    return &TaskHandler{
        taskStore:  taskStore,
        workerPool: workerPool,
    }
}

// POST /a2a/{agentId}/tasks
func (h *TaskHandler) Create(w http.ResponseWriter, r *http.Request) {
    agentID := chi.URLParam(r, "agentId")
    
    var req struct {
        Skill string                 `json:"skill"`
        Input map[string]interface{} `json:"input"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid request", http.StatusBadRequest)
        return
    }
    
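    // The auth middleware is expected to put tenant_id and user_id into
    // the request context; the checked assertions avoid a panic when
    // they are missing.
    tenantID, okTenant := r.Context().Value("tenant_id").(string)
    userID, okUser := r.Context().Value("user_id").(string)
    if !okTenant || !okUser {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }

    // Create the task.
    task := models.NewTask(agentID, req.Skill, req.Input)
    task.TenantID = tenantID
    task.UserID = userID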
    
    // Persist it.
    if err := h.taskStore.Create(r.Context(), task); err != nil {
        http.Error(w, "Failed to create task", http.StatusInternalServerError)
        return
    }
    
    // Hand it to the worker pool.
    h.workerPool.Submit(task)
    
    // Respond with the created task.
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(task)
}

// GET /a2a/{agentId}/tasks/{taskId}
func (h *TaskHandler) Get(w http.ResponseWriter, r *http.Request) {
    taskID := chi.URLParam(r, "taskId")
    
    task, err := h.taskStore.Get(r.Context(), taskID)
    if err != nil {
        http.Error(w, "Task not found", http.StatusNotFound)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(task)
}

// GET /a2a/{agentId}/tasks
func (h *TaskHandler) List(w http.ResponseWriter, r *http.Request) {
    tenantID, ok := r.Context().Value("tenant_id").(string)
    if !ok {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }
    
    params := store.ListParams{
        Limit:  10,
        Offset: 0,
    }
    
    // Parse query parameters.
    if status := r.URL.Query().Get("status"); status != "" {
        params.Status = models.TaskStatus(status)
    }
    
    tasks, err := h.taskStore.List(r.Context(), tenantID, params)
    if err != nil {
        http.Error(w, "Failed to list tasks", http.StatusInternalServerError)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]interface{}{
        "tasks": tasks,
    })
}
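
The handlers read `tenant_id` and `user_id` from the request context using plain string keys. Go's `context` documentation recommends an unexported key type instead, so that keys from different packages cannot collide. A minimal sketch of that approach follows; `ctxKey`, `withTenant`, `tenantFrom`, and `roundTrip` are hypothetical names, and the auth middleware (not shown in this post) would have to use the same helpers.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported type, so these keys cannot collide with keys
// defined in other packages.
type ctxKey string

const tenantKey ctxKey = "tenant_id"

// withTenant returns a context that carries the tenant ID.
func withTenant(ctx context.Context, tenantID string) context.Context {
	return context.WithValue(ctx, tenantKey, tenantID)
}

// tenantFrom returns the tenant ID and whether a non-empty one was set.
func tenantFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(tenantKey).(string)
	return id, ok && id != ""
}

// roundTrip stores and reads back a tenant ID (demo helper).
func roundTrip(tenantID string) (string, bool) {
	return tenantFrom(withTenant(context.Background(), tenantID))
}

func main() {
	id, ok := roundTrip("tenant-a")
	fmt.Println(id, ok) // tenant-a true

	_, ok = tenantFrom(context.Background())
	fmt.Println(ok) // false
}
```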

SSE Event Handler

// internal/handlers/events.go
package handlers

import (
    "encoding/json"
    "fmt"
    "net/http"
    
    "github.com/go-chi/chi/v5"
    
    "a2a-server/internal/executor"
)

type EventHandler struct {
    eventBus *executor.EventBus
}

func NewEventHandler(eventBus *executor.EventBus) *EventHandler {
    return &EventHandler{eventBus: eventBus}
}

// GET /a2a/{agentId}/tasks/{taskId}/events
func (h *EventHandler) Stream(w http.ResponseWriter, r *http.Request) {
    taskID := chi.URLParam(r, "taskId")
    
    // Set the SSE headers.
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")
    
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }
    
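    // Subscribe to the task's events. If the task finished before this
    // point, no further events arrive and the stream stays open until
    // the client disconnects.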
    eventChan := h.eventBus.Subscribe(taskID)
    defer h.eventBus.Unsubscribe(taskID, eventChan)
    
    // Stream events until the task ends or the client disconnects.
    for {
        select {
        case event, ok := <-eventChan:
            if !ok {
                return
            }
            
            // Serialize the payload as JSON.
            data, err := json.Marshal(event.Data)
            if err != nil {
                return
            }
            
            // Write the event in SSE format.
            fmt.Fprintf(w, "event: %s\n", event.Type)
            fmt.Fprintf(w, "data: %s\n\n", data)
            flusher.Flush()
            
            // Close the stream on completion or error.
            if event.Type == "completed" || event.Type == "error" {
                return
            }
            
        case <-r.Context().Done():
            return
        }
    }
}
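
On the wire, each event written by this handler is an `event:` line, a `data:` line, and a blank line. The sketch below parses that shape on the client side. It is deliberately simplified: `parseSSE` and `sseEvent` are hypothetical, and it handles only single-line `data:` fields with a space after the colon, which is what the handler above emits, not the full SSE grammar.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// sseEvent is one parsed event.
type sseEvent struct {
	Type string
	Data string
}

// parseSSE reads "event:" and "data:" fields; a blank line ends an event.
func parseSSE(stream string) []sseEvent {
	var events []sseEvent
	var cur sseEvent
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// A blank line dispatches the event collected so far.
			if cur.Type != "" || cur.Data != "" {
				events = append(events, cur)
			}
			cur = sseEvent{}
		case strings.HasPrefix(line, "event: "):
			cur.Type = strings.TrimPrefix(line, "event: ")
		case strings.HasPrefix(line, "data: "):
			cur.Data = strings.TrimPrefix(line, "data: ")
		}
	}
	return events
}

func main() {
	stream := "event: started\ndata: {\"taskId\":\"t1\"}\n\n" +
		"event: completed\ndata: {\"taskId\":\"t1\"}\n\n"
	for _, e := range parseSSE(stream) {
		fmt.Println(e.Type, e.Data)
	}
}
```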

main.go: Putting It All Together

// cmd/server/main.go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
    "github.com/jackc/pgx/v5/pgxpool"
    
    "a2a-server/internal/executor"
    "a2a-server/internal/handlers"
    custommw "a2a-server/internal/middleware"
    "a2a-server/internal/models"
    "a2a-server/internal/skills"
    "a2a-server/internal/store"
)

func main() {
    // Environment variables
    dbURL := os.Getenv("DATABASE_URL")
    port := os.Getenv("PORT")
    if port == "" {
        port = "8081"
    }
    
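    // Connect to the database. Ping verifies that it is reachable
    // before success is logged.
    pool, err := pgxpool.New(context.Background(), dbURL)
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    if err := pool.Ping(context.Background()); err != nil {
        log.Fatal(err)
    }
    log.Println("✅ Database connected")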
    
    // Create the store.
    taskStore := store.NewPostgresStore(pool)
    
    // Event Bus
    eventBus := executor.NewEventBus()
    
    // Worker Pool
    workerPool := executor.NewWorkerPool(taskStore, 10, eventBus)
    
    // Register skills.
    workerPool.RegisterSkill(skills.NewResearchSkill())
    workerPool.RegisterSkill(skills.NewAnalysisSkill())
    
    log.Println("✅ Skills registered")
    
    // Start the worker pool.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    workerPool.Start(ctx)
    log.Println("✅ Worker pool started")
    
    // Agent Card
    agentCard := models.NewAgentCard(
        "Research Assistant",
        "Multi-step research and analysis agent",
        "https://api.example.com/a2a/research-assistant",
        "1.0.0",
    )
    agentCard.AddSkill(models.Skill{
        Name:        "research",
        Description: "Comprehensive research on a topic",
        InputModes:  []string{"text"},
        OutputModes: []string{"text", "data"},
    })
    
    // Handlers
    agentCardHandler := handlers.NewAgentCardHandler(agentCard)
    taskHandler := handlers.NewTaskHandler(taskStore, workerPool)
    eventHandler := handlers.NewEventHandler(eventBus)
    
    // Router
    r := chi.NewRouter()
    
    // Middleware
    r.Use(middleware.RequestID)
    r.Use(middleware.RealIP)
    r.Use(middleware.Logger)
    r.Use(middleware.Recoverer)
    
    // Agent Card (public)
    r.Get("/.well-known/agent-card.json", agentCardHandler.Get)
    
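    // A2A endpoints (authentication required).
    // jwtValidator is not defined in this listing; create it to match
    // your AuthMiddleware before registering the routes.
    r.Group(func(r chi.Router) {
        r.Use(custommw.AuthMiddleware(jwtValidator))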
        
        r.Post("/a2a/{agentId}/tasks", taskHandler.Create)
        r.Get("/a2a/{agentId}/tasks/{taskId}", taskHandler.Get)
        r.Get("/a2a/{agentId}/tasks", taskHandler.List)
        r.Get("/a2a/{agentId}/tasks/{taskId}/events", eventHandler.Stream)
    })
    
    // Start the server.
    srv := &http.Server{
        Addr:    ":" + port,
        Handler: r,
    }
    
    go func() {
        log.Printf("🚀 A2A Server listening on :%s", port)
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()
    
    // Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")

    // Stop accepting HTTP requests first so that no handler can submit
    // to a closed task queue, then stop the worker pool.
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer shutdownCancel()

    if err := srv.Shutdown(shutdownCtx); err != nil {
        log.Printf("HTTP shutdown error: %v", err)
    }

    cancel() // stops the polling goroutine
    workerPool.Stop()

    log.Println("✅ Server stopped")
}

Key Takeaways

A2A Server Components

  • Models: Task, Artifact, AgentCard
  • Store: PostgreSQL-backed persistence
  • Executor: Worker Pool + Event Bus
  • Handlers: Task CRUD + SSE streaming
  • Skills: business logic implementations

Key Features

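  • Asynchronous: tasks run on a background Worker Pool
  • Scalable: the number of workers is configurable
  • Real-time: progress is streamed over SSE
  • Stateful: tasks are stored in PostgreSQL
  • Multi-tenant: tenants are isolated with RLS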

Published: 2024-12-13

This post is licensed under CC BY 4.0 by the author.