[MCP&A2A] 11. A2A 서버 개발
[MCP&A2A] 11. A2A 서버 개발
Agent-to-Agent Protocol (A2A) 서버
A2A 서버는 AI 에이전트 간 비동기 협업을 가능하게 하는 인프라입니다. MCP가 동기식 도구 실행에 집중한다면, A2A는 장기 실행 태스크와 에이전트 간 통신에 최적화되어 있습니다.
A2A vs MCP 비교
| 특성 | A2A | MCP |
|---|---|---|
| 통신 패턴 | 비동기 (Task 기반) | 동기 (Request-Response) |
| 실행 시간 | 초~분~시간 | 밀리초~초 |
| 상태 관리 | Stateful (태스크 추적) | Stateless |
| 진행 상황 | SSE 스트리밍 | 없음 |
| 용도 | 에이전트 협업, 워크플로우 | 도구 실행, 데이터 조회 |
| 복잡도 | 높음 | 낮음 |
A2A가 필요한 시나리오
1
2
3
4
5
6
7
8
9
10
✅ A2A 사용:
- 멀티스텝 리서치 (검색 → 분석 → 보고서 생성)
- 코드 리뷰 (분석 → 개선 제안 → 리팩토링)
- 데이터 파이프라인 (수집 → 변환 → 저장)
- 에이전트 팀 협업 (조율자 → 작업자 → 검증자)
❌ A2A 불필요 (MCP 사용):
- 단일 문서 검색
- 간단한 계산
- 빠른 API 호출
프로젝트 구조
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
a2a-server/
├── cmd/
│ └── server/
│ └── main.go # 진입점
├── internal/
│ ├── models/
│ │ ├── task.go # 태스크 모델
│ │ ├── agent_card.go # 에이전트 카드
│ │ └── artifact.go # 결과물
│ ├── store/
│ │ ├── task_store.go # 태스크 저장소
│ │ └── postgres_store.go # PostgreSQL 구현
│ ├── executor/
│ │ ├── executor.go # 태스크 실행기
│ │ ├── worker_pool.go # 워커 풀
│ │ └── event_bus.go # 이벤트 버스 (SSE용)
│ ├── handlers/
│ │ ├── agent_card.go # Agent Card 핸들러
│ │ ├── tasks.go # 태스크 CRUD
│ │ ├── messages.go # 메시지 핸들러
│ │ └── events.go # SSE 스트리밍
│ ├── middleware/
│ │ ├── auth.go # JWT 인증
│ │ └── cors.go # CORS
│ └── skills/
│ ├── skill.go # Skill 인터페이스
│ ├── research.go # 리서치 스킬
│ └── analysis.go # 분석 스킬
├── go.mod
└── README.md
핵심 모델
1. Task 모델
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// internal/models/task.go
package models
import (
"time"
"github.com/google/uuid"
)
// TaskStatus is the string-backed lifecycle state of a Task.
type TaskStatus string

// Known task states. The string values are what gets serialized and stored.
const (
	StatusPending   TaskStatus = "pending"   // waiting
	StatusRunning   TaskStatus = "running"   // executing
	StatusCompleted TaskStatus = "completed" // completed
	StatusFailed    TaskStatus = "failed"    // failed
	StatusCancelled TaskStatus = "cancelled" // cancelled
)
// Task describes one unit of work requested from an agent skill, together
// with its state, results and timing information.
type Task struct {
	ID        string                 `json:"taskId"`
	AgentID   string                 `json:"agentId"`
	Skill     string                 `json:"skill"`
	Input     map[string]interface{} `json:"input"`
	Status    TaskStatus             `json:"status"`
	Artifacts []Artifact             `json:"artifacts,omitempty"`
	Error     *TaskError             `json:"error,omitempty"`
	// Metadata. StartedAt and EndedAt are pointers so that an unset value
	// is omitted from the JSON output.
	CreatedAt time.Time  `json:"createdAt"`
	UpdatedAt time.Time  `json:"updatedAt"`
	StartedAt *time.Time `json:"startedAt,omitempty"`
	EndedAt   *time.Time `json:"endedAt,omitempty"`
	// Progress information; nil until UpdateProgress is called.
	Progress *TaskProgress `json:"progress,omitempty"`
	// Internal fields, excluded from JSON by the "-" tag.
	TenantID string `json:"-"`
	UserID   string `json:"-"`
}
// TaskProgress is a progress snapshot for a task: a current/total counter
// pair plus a free-form message.
type TaskProgress struct {
	Current int    `json:"current"`
	Total   int    `json:"total"`
	Message string `json:"message"`
}
// TaskError is the error information attached to a failed task.
// Details is optional and omitted from JSON when empty.
type TaskError struct {
	Code    string                 `json:"code"`
	Message string                 `json:"message"`
	Details map[string]interface{} `json:"details,omitempty"`
}
// NewTask returns a new pending Task for the given agent and skill.
//
// The ID is a freshly generated UUID string, and CreatedAt and UpdatedAt
// share one timestamp taken at call time. TenantID and UserID are left
// empty for the caller to fill in.
func NewTask(agentID, skill string, input map[string]interface{}) *Task {
	createdAt := time.Now()

	task := new(Task)
	task.ID = uuid.New().String()
	task.AgentID = agentID
	task.Skill = skill
	task.Input = input
	task.Status = StatusPending
	task.CreatedAt = createdAt
	task.UpdatedAt = createdAt
	return task
}
// Start marks the task as running and records the current time as both
// StartedAt and UpdatedAt.
func (t *Task) Start() {
	startedAt := time.Now()
	t.StartedAt = &startedAt
	t.UpdatedAt = startedAt
	t.Status = StatusRunning
}
// Complete marks the task as completed, attaches the produced artifacts and
// records the current time as both EndedAt and UpdatedAt.
func (t *Task) Complete(artifacts []Artifact) {
	finishedAt := time.Now()
	t.EndedAt = &finishedAt
	t.UpdatedAt = finishedAt
	t.Artifacts = artifacts
	t.Status = StatusCompleted
}
// Fail marks the task as failed and records err as an EXECUTION_ERROR.
//
// EndedAt and UpdatedAt are set to the current time. A nil err is tolerated:
// the task is still marked failed, with a generic message, instead of
// panicking on err.Error().
func (t *Task) Fail(err error) {
	message := "unknown error"
	if err != nil {
		message = err.Error()
	}

	now := time.Now()
	t.Status = StatusFailed
	t.Error = &TaskError{
		Code:    "EXECUTION_ERROR",
		Message: message,
	}
	t.EndedAt = &now
	t.UpdatedAt = now
}
// UpdateProgress replaces the task's progress snapshot with the given
// values and refreshes UpdatedAt.
func (t *Task) UpdateProgress(current, total int, message string) {
	snapshot := TaskProgress{Current: current, Total: total, Message: message}
	t.Progress = &snapshot
	t.UpdatedAt = time.Now()
}
// Duration reports how long the task ran, measured from StartedAt to
// EndedAt. It returns 0 when either timestamp has not been set.
func (t *Task) Duration() time.Duration {
	if t.StartedAt != nil && t.EndedAt != nil {
		return t.EndedAt.Sub(*t.StartedAt)
	}
	return 0
}
2. Artifact 모델
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// internal/models/artifact.go
package models
import (
"time"
"github.com/google/uuid"
)
// Artifact is a named result produced by a task, made up of one or more
// content parts.
type Artifact struct {
	ID        string    `json:"artifactId"`
	Name      string    `json:"name"`
	Parts     []Part    `json:"parts"`
	CreatedAt time.Time `json:"createdAt"`
}
// Part is a single content block inside an Artifact. Kind names the content
// type; the remaining fields are optional and omitted from JSON when empty.
type Part struct {
	Kind     string                 `json:"kind"` // text, data, file, image
	Text     string                 `json:"text,omitempty"`
	Data     interface{}            `json:"data,omitempty"`
	URL      string                 `json:"url,omitempty"`
	MimeType string                 `json:"mimeType,omitempty"`
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// NewArtifact returns an Artifact with the given name and parts, a freshly
// generated UUID string as its ID, and CreatedAt set to the current time.
func NewArtifact(name string, parts []Part) *Artifact {
	artifact := new(Artifact)
	artifact.ID = uuid.New().String()
	artifact.Name = name
	artifact.Parts = parts
	artifact.CreatedAt = time.Now()
	return artifact
}
// NewTextArtifact returns an Artifact holding a single part of kind "text"
// whose Text field is the given string.
func NewTextArtifact(name, text string) *Artifact {
	textPart := Part{Kind: "text", Text: text}
	return NewArtifact(name, []Part{textPart})
}
// NewDataArtifact returns an Artifact holding a single part of kind "data"
// whose Data field is the given value.
func NewDataArtifact(name string, data interface{}) *Artifact {
	dataPart := Part{Kind: "data", Data: data}
	return NewArtifact(name, []Part{dataPart})
}
// NewFileArtifact returns an Artifact holding a single part of kind "file"
// that carries the given URL and MIME type.
func NewFileArtifact(name, url, mimeType string) *Artifact {
	filePart := Part{Kind: "file", URL: url, MimeType: mimeType}
	return NewArtifact(name, []Part{filePart})
}
3. Agent Card 모델
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// internal/models/agent_card.go
package models
// AgentCard is the published description of an agent: its identity, URL,
// version and the skills it offers. Authentication is optional and omitted
// from JSON when nil.
type AgentCard struct {
	Name           string              `json:"name"`
	Description    string              `json:"description"`
	URL            string              `json:"url"`
	Version        string              `json:"version"`
	Skills         []Skill             `json:"skills"`
	Authentication *AuthenticationInfo `json:"authentication,omitempty"`
}

// Skill describes one capability listed on an AgentCard.
type Skill struct {
	Name        string   `json:"name"`
	Description string   `json:"description"`
	InputModes  []string `json:"inputModes"` // text, data, file, image
	OutputModes []string `json:"outputModes"`
}

// AuthenticationInfo describes the authentication scheme of an agent.
type AuthenticationInfo struct {
	Type        string `json:"type"` // bearer, basic, oauth2
	Description string `json:"description,omitempty"`
}

// NewAgentCard returns an AgentCard with the given identity fields and an
// empty, non-nil skill list. Authentication is left nil.
func NewAgentCard(name, description, url, version string) *AgentCard {
	card := new(AgentCard)
	card.Name = name
	card.Description = description
	card.URL = url
	card.Version = version
	card.Skills = make([]Skill, 0)
	return card
}

// AddSkill appends skill to the card's skill list.
func (ac *AgentCard) AddSkill(skill Skill) {
	extended := append(ac.Skills, skill)
	ac.Skills = extended
}
Task Store 구현
인터페이스
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// internal/store/task_store.go
package store
import (
"context"
"time"
"a2a-server/internal/models"
)
// TaskStore is the persistence interface for tasks.
type TaskStore interface {
	// Create stores a new task.
	Create(ctx context.Context, task *models.Task) error
	// Get fetches a single task by ID.
	Get(ctx context.Context, taskID string) (*models.Task, error)
	// List returns the tasks of one tenant, filtered, sorted and paginated
	// according to params.
	List(ctx context.Context, tenantID string, params ListParams) ([]*models.Task, error)
	// Update stores the current state of an existing task.
	Update(ctx context.Context, task *models.Task) error
	// Delete removes a task by ID.
	Delete(ctx context.Context, taskID string) error
	// GetByStatus returns up to limit tasks that have the given status.
	GetByStatus(ctx context.Context, status models.TaskStatus, limit int) ([]*models.Task, error)
}
// ListParams holds the filter, pagination and sort options for
// TaskStore.List.
type ListParams struct {
	Status    models.TaskStatus
	AgentID   string
	Limit     int
	Offset    int
	SortBy    string // created_at, updated_at
	SortOrder string // asc, desc
}
PostgreSQL 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
// internal/store/postgres_store.go
package store
import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"a2a-server/internal/models"
)
// PostgresStore is a task store backed by a pgx connection pool.
type PostgresStore struct {
	pool *pgxpool.Pool
}
// NewPostgresStore returns a PostgresStore that runs its queries on pool.
func NewPostgresStore(pool *pgxpool.Pool) *PostgresStore {
	store := new(PostgresStore)
	store.pool = pool
	return store
}
// Create inserts a new row for task into the tasks table.
//
// The task input is stored as JSON. Only the identity, input, status and
// created/updated timestamp columns are written; the remaining columns are
// not part of the insert.
func (s *PostgresStore) Create(ctx context.Context, task *models.Task) error {
	payload, marshalErr := json.Marshal(task.Input)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal input: %w", marshalErr)
	}

	const insertTask = `
INSERT INTO tasks (
id, tenant_id, user_id, agent_id, skill,
input, status, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
	values := []interface{}{
		task.ID,
		task.TenantID,
		task.UserID,
		task.AgentID,
		task.Skill,
		payload,
		task.Status,
		task.CreatedAt,
		task.UpdatedAt,
	}

	_, execErr := s.pool.Exec(ctx, insertTask, values...)
	return execErr
}
// Get loads one task by ID, decoding its JSON-encoded input, artifacts,
// error and progress columns.
//
// The nullable started_at and ended_at columns are scanned directly into the
// task's pointer fields, so this function does not need the time package
// (which this file does not import). Scan errors are returned unchanged;
// JSON decode errors are wrapped with the column name.
func (s *PostgresStore) Get(ctx context.Context, taskID string) (*models.Task, error) {
	const query = `
		SELECT
			id, tenant_id, user_id, agent_id, skill,
			input, status, artifacts, error,
			created_at, updated_at, started_at, ended_at,
			progress
		FROM tasks
		WHERE id = $1
	`

	var (
		task                                              models.Task
		inputJSON, artifactsJSON, errorJSON, progressJSON []byte
	)
	err := s.pool.QueryRow(ctx, query, taskID).Scan(
		&task.ID,
		&task.TenantID,
		&task.UserID,
		&task.AgentID,
		&task.Skill,
		&inputJSON,
		&task.Status,
		&artifactsJSON,
		&errorJSON,
		&task.CreatedAt,
		&task.UpdatedAt,
		&task.StartedAt,
		&task.EndedAt,
		&progressJSON,
	)
	if err != nil {
		return nil, err
	}

	// decode unmarshals one JSON column; an empty (NULL) column is skipped.
	decode := func(column string, raw []byte, dst interface{}) error {
		if len(raw) == 0 {
			return nil
		}
		if err := json.Unmarshal(raw, dst); err != nil {
			return fmt.Errorf("decoding %s of task %s: %w", column, taskID, err)
		}
		return nil
	}

	if err := decode("input", inputJSON, &task.Input); err != nil {
		return nil, err
	}
	if err := decode("artifacts", artifactsJSON, &task.Artifacts); err != nil {
		return nil, err
	}
	if err := decode("error", errorJSON, &task.Error); err != nil {
		return nil, err
	}
	if err := decode("progress", progressJSON, &task.Progress); err != nil {
		return nil, err
	}

	return &task, nil
}
// Update writes the mutable columns of task (status, input, artifacts,
// error, progress and timestamps) to the row with the same ID.
//
// It returns an error if any JSON field cannot be encoded, if the statement
// fails, or if no row matched the task ID. Without the last check an update
// that changed nothing would be reported as success.
func (s *PostgresStore) Update(ctx context.Context, task *models.Task) error {
	inputJSON, err := json.Marshal(task.Input)
	if err != nil {
		return fmt.Errorf("encoding input of task %s: %w", task.ID, err)
	}
	artifactsJSON, err := json.Marshal(task.Artifacts)
	if err != nil {
		return fmt.Errorf("encoding artifacts of task %s: %w", task.ID, err)
	}
	errorJSON, err := json.Marshal(task.Error)
	if err != nil {
		return fmt.Errorf("encoding error of task %s: %w", task.ID, err)
	}
	progressJSON, err := json.Marshal(task.Progress)
	if err != nil {
		return fmt.Errorf("encoding progress of task %s: %w", task.ID, err)
	}

	const query = `
		UPDATE tasks SET
			status = $2,
			input = $3,
			artifacts = $4,
			error = $5,
			updated_at = $6,
			started_at = $7,
			ended_at = $8,
			progress = $9
		WHERE id = $1
	`
	tag, err := s.pool.Exec(
		ctx, query,
		task.ID,
		task.Status,
		inputJSON,
		artifactsJSON,
		errorJSON,
		task.UpdatedAt,
		task.StartedAt,
		task.EndedAt,
		progressJSON,
	)
	if err != nil {
		return err
	}
	if tag.RowsAffected() == 0 {
		return fmt.Errorf("task %s not found", task.ID)
	}
	return nil
}
// List returns summary rows (ID, agent, skill, status, created/updated
// timestamps) for the tasks of one tenant.
//
// params.Status and params.AgentID add equality filters when non-empty.
// Limit and Offset are applied only when positive. SortBy accepts
// "created_at" (default) or "updated_at"; SortOrder accepts "desc" (default)
// or "asc". The sort fields are spliced into the SQL text, so they are
// checked against this fixed allow-list and anything else is rejected with
// an error rather than reaching the database.
func (s *PostgresStore) List(ctx context.Context, tenantID string, params ListParams) ([]*models.Task, error) {
	var sortBy string
	switch params.SortBy {
	case "", "created_at":
		sortBy = "created_at"
	case "updated_at":
		sortBy = "updated_at"
	default:
		return nil, fmt.Errorf("unsupported sort column %q", params.SortBy)
	}

	var sortOrder string
	switch params.SortOrder {
	case "", "desc", "DESC":
		sortOrder = "desc"
	case "asc", "ASC":
		sortOrder = "asc"
	default:
		return nil, fmt.Errorf("unsupported sort order %q", params.SortOrder)
	}

	query := `
		SELECT
			id, agent_id, skill, status,
			created_at, updated_at
		FROM tasks
		WHERE tenant_id = $1
	`
	args := []interface{}{tenantID}
	argIndex := 2

	// Optional filters, always bound as parameters.
	if params.Status != "" {
		query += fmt.Sprintf(" AND status = $%d", argIndex)
		args = append(args, params.Status)
		argIndex++
	}
	if params.AgentID != "" {
		query += fmt.Sprintf(" AND agent_id = $%d", argIndex)
		args = append(args, params.AgentID)
		argIndex++
	}

	// Safe to interpolate: both values come from the allow-list above.
	query += fmt.Sprintf(" ORDER BY %s %s", sortBy, sortOrder)

	// Pagination.
	if params.Limit > 0 {
		query += fmt.Sprintf(" LIMIT $%d", argIndex)
		args = append(args, params.Limit)
		argIndex++
	}
	if params.Offset > 0 {
		query += fmt.Sprintf(" OFFSET $%d", argIndex)
		args = append(args, params.Offset)
	}

	rows, err := s.pool.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var tasks []*models.Task
	for rows.Next() {
		var task models.Task
		if err := rows.Scan(
			&task.ID,
			&task.AgentID,
			&task.Skill,
			&task.Status,
			&task.CreatedAt,
			&task.UpdatedAt,
		); err != nil {
			return nil, err
		}
		tasks = append(tasks, &task)
	}
	// An error that ends iteration early is only visible through rows.Err.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return tasks, nil
}
// GetByStatus returns up to limit tasks with the given status, oldest first
// by created_at.
//
// Only id, tenant_id, agent_id, skill, input, status and created_at are
// loaded; the other Task fields keep their zero values. An input column that
// cannot be decoded is reported as an error, so callers never receive a task
// with silently missing input.
func (s *PostgresStore) GetByStatus(ctx context.Context, status models.TaskStatus, limit int) ([]*models.Task, error) {
	const query = `
		SELECT id, tenant_id, agent_id, skill, input, status, created_at
		FROM tasks
		WHERE status = $1
		ORDER BY created_at ASC
		LIMIT $2
	`
	rows, err := s.pool.Query(ctx, query, status, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var tasks []*models.Task
	for rows.Next() {
		var (
			task      models.Task
			inputJSON []byte
		)
		if err := rows.Scan(
			&task.ID,
			&task.TenantID,
			&task.AgentID,
			&task.Skill,
			&inputJSON,
			&task.Status,
			&task.CreatedAt,
		); err != nil {
			return nil, err
		}
		if err := json.Unmarshal(inputJSON, &task.Input); err != nil {
			return nil, fmt.Errorf("decoding input of task %s: %w", task.ID, err)
		}
		tasks = append(tasks, &task)
	}
	// An error that ends iteration early is only visible through rows.Err.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return tasks, nil
}
데이터베이스 스키마
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- tasks table: one row per task.
CREATE TABLE tasks (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
user_id TEXT NOT NULL,
agent_id TEXT NOT NULL,
skill TEXT NOT NULL,
-- Input / output payloads, stored as JSONB.
input JSONB NOT NULL,
artifacts JSONB,
error JSONB,
-- State
status TEXT NOT NULL,
progress JSONB,
-- Timestamps; started_at and ended_at are nullable.
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
started_at TIMESTAMPTZ,
ended_at TIMESTAMPTZ
);
-- Indexes. idx_tasks_status is a partial index that only covers rows whose
-- status is 'pending' or 'running'.
CREATE INDEX idx_tasks_tenant_id ON tasks(tenant_id);
CREATE INDEX idx_tasks_status ON tasks(status) WHERE status IN ('pending', 'running');
CREATE INDEX idx_tasks_agent_id ON tasks(agent_id);
CREATE INDEX idx_tasks_created_at ON tasks(created_at DESC);
-- Enable row level security: rows are visible only when tenant_id equals the
-- session setting app.current_tenant_id (read with missing_ok = true).
-- NOTE(review): the setting must be set on each database session for any row
-- to match; confirm where the application sets it.
-- NOTE(review): PostgreSQL normally exempts the table owner from row
-- security unless FORCE ROW LEVEL SECURITY is used; confirm the application
-- connects as a non-owner role.
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON tasks
USING (tenant_id = current_setting('app.current_tenant_id', true)::uuid);
Task Executor 구현
Executor 인터페이스
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// internal/executor/executor.go
package executor
import (
"context"
"a2a-server/internal/models"
)
// Executor runs a task.
type Executor interface {
	// Execute runs task and reports any failure as an error.
	Execute(ctx context.Context, task *models.Task) error
}
// SkillExecutor is the implementation of a single skill.
type SkillExecutor interface {
	// Name returns the skill name; the worker pool uses it as the
	// registration key.
	Name() string
	// Execute runs the skill on input and returns the produced artifacts.
	Execute(ctx context.Context, input map[string]interface{}) ([]models.Artifact, error)
}
Worker Pool 구현
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// internal/executor/worker_pool.go
package executor
import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"a2a-server/internal/models"
	"a2a-server/internal/store"
)
// WorkerPool runs tasks on a fixed number of worker goroutines fed from a
// buffered task queue.
type WorkerPool struct {
	taskStore store.TaskStore          // persistence for task state changes
	skills    map[string]SkillExecutor // registered skills keyed by Name()
	workers   int                      // number of worker goroutines
	taskQueue chan *models.Task        // tasks waiting for a worker
	wg        sync.WaitGroup           // tracks running workers for Stop
	eventBus  *EventBus                // receives task lifecycle events
}
// NewWorkerPool returns a WorkerPool that persists task state through
// taskStore and publishes lifecycle events on eventBus.
//
// The task queue is buffered to twice the worker count. A workers value
// below 1 is raised to 1: a negative value would make the channel allocation
// panic, and zero would leave a pool that never drains its queue.
func NewWorkerPool(
	taskStore store.TaskStore,
	workers int,
	eventBus *EventBus,
) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	return &WorkerPool{
		taskStore: taskStore,
		skills:    make(map[string]SkillExecutor),
		workers:   workers,
		taskQueue: make(chan *models.Task, workers*2),
		eventBus:  eventBus,
	}
}
// RegisterSkill adds skill to the pool under the key returned by its Name
// method, replacing any skill already registered under that name.
// NOTE(review): the skills map is written here without locking; confirm all
// registration happens before Start.
func (wp *WorkerPool) RegisterSkill(skill SkillExecutor) {
	key := skill.Name()
	wp.skills[key] = skill
}
// Start launches the configured number of worker goroutines plus one
// goroutine that polls the store for pending tasks. It returns immediately.
func (wp *WorkerPool) Start(ctx context.Context) {
	log.Printf("Starting worker pool with %d workers", wp.workers)

	for workerID := 0; workerID < wp.workers; workerID++ {
		wp.wg.Add(1)
		go wp.worker(ctx, workerID)
	}

	// The poller is not tracked by the WaitGroup; it exits when ctx is done.
	go wp.pollPendingTasks(ctx)
}
// Stop closes the task queue, waits for every worker goroutine to return,
// and then logs that the pool has stopped.
// NOTE(review): sending on a closed channel panics in Go, so neither Submit
// nor the pending-task poller may send after this call; confirm callers
// stop both first.
func (wp *WorkerPool) Stop() {
	close(wp.taskQueue)
	wp.wg.Wait()
	log.Println("Worker pool stopped")
}
// Submit places task on the worker queue. The send is a plain channel send,
// so it blocks while the queue buffer is full.
func (wp *WorkerPool) Submit(task *models.Task) {
	wp.taskQueue <- task
}
// worker is the body of one worker goroutine. It takes tasks from the queue
// and executes them until the queue is closed, logging each failure, and
// signals the WaitGroup when it returns.
func (wp *WorkerPool) worker(ctx context.Context, id int) {
	defer wp.wg.Done()
	log.Printf("Worker %d started", id)

	for {
		next, open := <-wp.taskQueue
		if !open {
			break
		}
		log.Printf("Worker %d processing task %s", id, next.ID)
		err := wp.executeTask(ctx, next)
		if err == nil {
			continue
		}
		log.Printf("Worker %d failed task %s: %v", id, next.ID, err)
	}

	log.Printf("Worker %d stopped", id)
}
// executeTask runs a single task through its registered skill.
//
// It marks the task running, executes the skill, then marks the task
// completed or failed. Each state change is persisted and published as a
// "started", "completed" or "error" event. The returned error is the skill
// lookup or execution error. Persistence failures are logged rather than
// returned, so a store outage does not hide the task's own outcome.
func (wp *WorkerPool) executeTask(ctx context.Context, task *models.Task) error {
	// Look up the skill.
	skill, ok := wp.skills[task.Skill]
	if !ok {
		err := fmt.Errorf("skill not found: %s", task.Skill)
		wp.failTask(ctx, task, err, "Skill not found")
		return err
	}

	// Mark the task as started.
	task.Start()
	wp.persistTask(ctx, task)
	wp.eventBus.Publish(task.ID, Event{
		Type: "started",
		Data: map[string]interface{}{
			"taskId": task.ID,
		},
	})

	// Run the skill.
	artifacts, err := skill.Execute(ctx, task.Input)
	if err != nil {
		wp.failTask(ctx, task, err, err.Error())
		return err
	}

	// Mark the task as completed.
	task.Complete(artifacts)
	wp.persistTask(ctx, task)
	wp.eventBus.Publish(task.ID, Event{
		Type: "completed",
		Data: map[string]interface{}{
			"taskId":    task.ID,
			"artifacts": artifacts,
		},
	})
	log.Printf("Task %s completed in %v", task.ID, task.Duration())
	return nil
}

// failTask marks task as failed with cause, persists it, and publishes an
// "error" event carrying message.
func (wp *WorkerPool) failTask(ctx context.Context, task *models.Task, cause error, message string) {
	task.Fail(cause)
	wp.persistTask(ctx, task)
	wp.eventBus.Publish(task.ID, Event{
		Type: "error",
		Data: map[string]interface{}{
			"message": message,
		},
	})
}

// persistTask stores the current task state and logs a failure to do so.
func (wp *WorkerPool) persistTask(ctx context.Context, task *models.Task) {
	if err := wp.taskStore.Update(ctx, task); err != nil {
		log.Printf("Failed to persist task %s (status %s): %v", task.ID, task.Status, err)
	}
}
// pollPendingTasks asks the store every five seconds for up to wp.workers
// pending tasks and offers each to the task queue without blocking. Tasks
// that do not fit are logged and skipped. The loop ends when ctx is done.
// NOTE(review): a task stays "pending" until a worker starts it, so a task
// that is already queued may be returned by a later poll and queued again;
// confirm whether duplicates are prevented elsewhere.
func (wp *WorkerPool) pollPendingTasks(ctx context.Context) {
	const pollInterval = 5 * time.Second

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		pending, err := wp.taskStore.GetByStatus(ctx, models.StatusPending, wp.workers)
		if err != nil {
			log.Printf("Failed to poll pending tasks: %v", err)
			continue
		}

		for _, candidate := range pending {
			select {
			case wp.taskQueue <- candidate:
				log.Printf("Queued pending task %s", candidate.ID)
			default:
				log.Printf("Queue full, skipping task %s", candidate.ID)
			}
		}
	}
}
Event Bus (SSE용)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// internal/executor/event_bus.go
package executor
import (
"sync"
)
// Event is one notification about a task: a type name plus a free-form
// payload.
type Event struct {
	Type string
	Data map[string]interface{}
}

// EventBus fans task events out to per-task subscriber channels. The
// subscriber map is guarded by mu.
type EventBus struct {
	subscribers map[string][]chan Event
	mu          sync.RWMutex
}

// NewEventBus returns an EventBus with no subscribers.
func NewEventBus() *EventBus {
	bus := new(EventBus)
	bus.subscribers = map[string][]chan Event{}
	return bus
}

// Subscribe registers a new subscriber for taskID and returns its channel,
// which is buffered for 10 events.
func (eb *EventBus) Subscribe(taskID string) chan Event {
	stream := make(chan Event, 10)

	eb.mu.Lock()
	eb.subscribers[taskID] = append(eb.subscribers[taskID], stream)
	eb.mu.Unlock()

	return stream
}

// Unsubscribe removes ch from the subscribers of taskID and closes it. A
// channel that is not registered is left untouched. The taskID entry is
// deleted once it has no subscribers left.
func (eb *EventBus) Unsubscribe(taskID string, ch chan Event) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	current := eb.subscribers[taskID]
	found := -1
	for idx := range current {
		if current[idx] == ch {
			found = idx
			break
		}
	}

	if found >= 0 {
		// Shift the tail left over the removed slot, then drop the last slot.
		copy(current[found:], current[found+1:])
		eb.subscribers[taskID] = current[:len(current)-1]
		close(ch)
	}

	if len(eb.subscribers[taskID]) == 0 {
		delete(eb.subscribers, taskID)
	}
}

// Publish delivers event to every subscriber of taskID without blocking. A
// subscriber whose buffer is full does not receive the event.
func (eb *EventBus) Publish(taskID string, event Event) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	listeners := eb.subscribers[taskID]
	for idx := range listeners {
		select {
		case listeners[idx] <- event:
		default:
			// Buffer full: drop the event for this subscriber.
		}
	}
}
HTTP 핸들러
Agent Card 핸들러
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// internal/handlers/agent_card.go
package handlers
import (
"encoding/json"
"net/http"
"a2a-server/internal/models"
)
// AgentCardHandler serves a fixed agent card over HTTP.
type AgentCardHandler struct {
	agentCard *models.AgentCard
}
// NewAgentCardHandler returns a handler that serves agentCard.
func NewAgentCardHandler(agentCard *models.AgentCard) *AgentCardHandler {
	handler := new(AgentCardHandler)
	handler.agentCard = agentCard
	return handler
}
// Get handles GET /.well-known/agent-card.json by writing the agent card as
// a JSON document.
func (h *AgentCardHandler) Get(w http.ResponseWriter, r *http.Request) {
	header := w.Header()
	header.Set("Content-Type", "application/json")

	encoder := json.NewEncoder(w)
	_ = encoder.Encode(h.agentCard) // result deliberately unchecked, as before
}
Task 핸들러
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// internal/handlers/tasks.go
package handlers
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"a2a-server/internal/executor"
"a2a-server/internal/models"
"a2a-server/internal/store"
)
// TaskHandler exposes task creation, lookup and listing over HTTP.
type TaskHandler struct {
	taskStore  store.TaskStore      // persistence for tasks
	workerPool *executor.WorkerPool // receives newly created tasks
}
// NewTaskHandler returns a TaskHandler that stores tasks in taskStore and
// submits new tasks to workerPool.
func NewTaskHandler(taskStore store.TaskStore, workerPool *executor.WorkerPool) *TaskHandler {
	handler := new(TaskHandler)
	handler.taskStore = taskStore
	handler.workerPool = workerPool
	return handler
}
// Create handles POST /a2a/{agentId}/tasks.
//
// The JSON body supplies the skill name and its input. The tenant and user
// are read from the request context under the "tenant_id" and "user_id"
// keys; if either is missing or not a string the request is rejected with
// 401 instead of panicking on the type assertion. A missing skill is
// rejected with 400. On success the task is stored, submitted to the worker
// pool and returned with status 201.
func (h *TaskHandler) Create(w http.ResponseWriter, r *http.Request) {
	agentID := chi.URLParam(r, "agentId")

	tenantID, hasTenant := r.Context().Value("tenant_id").(string)
	userID, hasUser := r.Context().Value("user_id").(string)
	if !hasTenant || !hasUser {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	var req struct {
		Skill string                 `json:"skill"`
		Input map[string]interface{} `json:"input"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request", http.StatusBadRequest)
		return
	}
	if req.Skill == "" {
		http.Error(w, "Missing skill", http.StatusBadRequest)
		return
	}

	// Build the task.
	task := models.NewTask(agentID, req.Skill, req.Input)
	task.TenantID = tenantID
	task.UserID = userID

	// Persist it before it becomes visible to the workers.
	if err := h.taskStore.Create(r.Context(), task); err != nil {
		http.Error(w, "Failed to create task", http.StatusInternalServerError)
		return
	}

	// Hand it to the worker pool. Submit is a plain channel send and blocks
	// while the queue is full.
	h.workerPool.Submit(task)

	// Respond with the created task.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(task)
}
// Get handles GET /a2a/{agentId}/tasks/{taskId}.
//
// The task is returned only when it belongs to the tenant found in the
// request context under "tenant_id". A task owned by another tenant is
// reported as 404, the same as a missing task, so task IDs cannot be probed
// across tenants. A request without a tenant in its context gets 401.
func (h *TaskHandler) Get(w http.ResponseWriter, r *http.Request) {
	tenantID, ok := r.Context().Value("tenant_id").(string)
	if !ok {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	taskID := chi.URLParam(r, "taskId")
	task, err := h.taskStore.Get(r.Context(), taskID)
	if err != nil || task.TenantID != tenantID {
		http.Error(w, "Task not found", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(task)
}
// List handles GET /a2a/{agentId}/tasks.
//
// It returns at most 10 tasks of the calling tenant, restricted to the agent
// named in the URL and optionally filtered by the "status" query parameter.
// The response is {"tasks": [...]}; an empty result is encoded as an empty
// array rather than null. A request without a "tenant_id" string in its
// context is rejected with 401 instead of panicking.
func (h *TaskHandler) List(w http.ResponseWriter, r *http.Request) {
	tenantID, ok := r.Context().Value("tenant_id").(string)
	if !ok {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	params := store.ListParams{
		AgentID: chi.URLParam(r, "agentId"),
		Limit:   10,
		Offset:  0,
	}
	// Parse query parameters.
	if status := r.URL.Query().Get("status"); status != "" {
		params.Status = models.TaskStatus(status)
	}

	tasks, err := h.taskStore.List(r.Context(), tenantID, params)
	if err != nil {
		http.Error(w, "Failed to list tasks", http.StatusInternalServerError)
		return
	}
	if tasks == nil {
		// A nil slice would be encoded as JSON null.
		tasks = []*models.Task{}
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"tasks": tasks,
	})
}
SSE 이벤트 핸들러
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// internal/handlers/events.go
package handlers
import (
"encoding/json"
"fmt"
"net/http"
"github.com/go-chi/chi/v5"
"a2a-server/internal/executor"
)
// EventHandler streams task events to HTTP clients as Server-Sent Events.
type EventHandler struct {
	eventBus *executor.EventBus
}
// NewEventHandler returns an EventHandler that reads events from eventBus.
func NewEventHandler(eventBus *executor.EventBus) *EventHandler {
	handler := new(EventHandler)
	handler.eventBus = eventBus
	return handler
}
// Stream handles GET /a2a/{agentId}/tasks/{taskId}/events as a Server-Sent
// Events stream.
//
// It subscribes to the event bus for the task and forwards each event as an
// "event:"/"data:" frame with a JSON payload. The stream ends after a
// "completed" or "error" event, when the client disconnects, when a write
// fails, or when the subscription channel is closed.
//
// Response headers are flushed right after subscribing, so the client sees
// the stream open before the first event arrives. An event whose payload
// cannot be encoded ends the stream with an "error" frame instead of an
// empty data line.
// NOTE(review): only events published after the subscription are delivered;
// confirm how clients learn about tasks that finished earlier.
func (h *EventHandler) Stream(w http.ResponseWriter, r *http.Request) {
	// Check streaming support before committing to SSE headers.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	taskID := chi.URLParam(r, "taskId")

	// SSE headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Subscribe first, then open the stream.
	eventChan := h.eventBus.Subscribe(taskID)
	defer h.eventBus.Unsubscribe(taskID, eventChan)

	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	ctx := r.Context()
	for {
		select {
		case event, ok := <-eventChan:
			if !ok {
				return
			}

			data, err := json.Marshal(event.Data)
			if err != nil {
				fmt.Fprintf(w, "event: error\ndata: %s\n\n", `{"message":"failed to encode event"}`)
				flusher.Flush()
				return
			}

			// Send one SSE frame; stop if the client is gone.
			if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.Type, data); err != nil {
				return
			}
			flusher.Flush()

			// Terminal events end the stream.
			if event.Type == "completed" || event.Type == "error" {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
main.go - 전체 조합
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// cmd/server/main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/jackc/pgx/v5/pgxpool"
"a2a-server/internal/executor"
"a2a-server/internal/handlers"
custommw "a2a-server/internal/middleware"
"a2a-server/internal/models"
"a2a-server/internal/skills"
"a2a-server/internal/store"
)
// main wires the A2A server together: the database pool, task store, event
// bus, worker pool, HTTP routes, and signal-driven shutdown.
//
// On shutdown the HTTP server is stopped first and the worker pool second.
// Stopping the pool closes its task queue, and a request handler that
// submitted a task after that point would send on a closed channel and
// panic.
func main() {
	// Environment.
	dbURL := os.Getenv("DATABASE_URL")
	port := os.Getenv("PORT")
	if port == "" {
		port = "8081"
	}

	// Database pool. Ping before reporting success so the "connected" log
	// line reflects a real round trip.
	pool, err := pgxpool.New(context.Background(), dbURL)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	pingCtx, pingCancel := context.WithTimeout(context.Background(), 5*time.Second)
	err = pool.Ping(pingCtx)
	pingCancel()
	if err != nil {
		log.Fatalf("database ping failed: %v", err)
	}
	log.Println("✅ Database connected")

	// Store.
	taskStore := store.NewPostgresStore(pool)

	// Event bus.
	eventBus := executor.NewEventBus()

	// Worker pool.
	workerPool := executor.NewWorkerPool(taskStore, 10, eventBus)

	// Skills.
	workerPool.RegisterSkill(skills.NewResearchSkill())
	workerPool.RegisterSkill(skills.NewAnalysisSkill())
	log.Println("✅ Skills registered")

	// Start the workers.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	workerPool.Start(ctx)
	log.Println("✅ Worker pool started")

	// Agent card.
	agentCard := models.NewAgentCard(
		"Research Assistant",
		"Multi-step research and analysis agent",
		"https://api.example.com/a2a/research-assistant",
		"1.0.0",
	)
	agentCard.AddSkill(models.Skill{
		Name:        "research",
		Description: "Comprehensive research on a topic",
		InputModes:  []string{"text"},
		OutputModes: []string{"text", "data"},
	})

	// Handlers.
	agentCardHandler := handlers.NewAgentCardHandler(agentCard)
	taskHandler := handlers.NewTaskHandler(taskStore, workerPool)
	eventHandler := handlers.NewEventHandler(eventBus)

	// Router.
	r := chi.NewRouter()

	// Middleware.
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)

	// Agent card (public).
	r.Get("/.well-known/agent-card.json", agentCardHandler.Get)

	// A2A endpoints (authenticated).
	// NOTE(review): jwtValidator is not defined in this function; confirm it
	// is declared elsewhere in package main.
	r.Group(func(r chi.Router) {
		r.Use(custommw.AuthMiddleware(jwtValidator))
		r.Post("/a2a/{agentId}/tasks", taskHandler.Create)
		r.Get("/a2a/{agentId}/tasks/{taskId}", taskHandler.Get)
		r.Get("/a2a/{agentId}/tasks", taskHandler.List)
		r.Get("/a2a/{agentId}/tasks/{taskId}/events", eventHandler.Stream)
	})

	// HTTP server. Only the header read is bounded: a write timeout would
	// cut off the long-lived SSE responses.
	srv := &http.Server{
		Addr:              ":" + port,
		Handler:           r,
		ReadHeaderTimeout: 10 * time.Second,
	}
	go func() {
		log.Printf("🚀 A2A Server listening on :%s", port)
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatal(err)
		}
	}()

	// Wait for a termination signal.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")

	// 1. Stop accepting requests so nothing submits tasks any more. Open SSE
	// streams can outlast the grace period; in that case force-close them
	// and carry on, so the worker pool is still stopped.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer shutdownCancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("HTTP server shutdown: %v", err)
		srv.Close()
	}

	// 2. Stop the poller and workers, then wait for the workers to exit.
	cancel()
	workerPool.Stop()

	log.Println("✅ Server stopped")
}
핵심 요약
A2A 서버 구성
- ✅ Models: Task, Artifact, AgentCard
- ✅ Store: PostgreSQL 기반 영속성
- ✅ Executor: Worker Pool + Event Bus
- ✅ Handlers: Task 생성·조회·목록 + SSE 스트리밍 (수정·삭제 핸들러는 이 예제에 포함되어 있지 않음)
- ✅ Skills: 비즈니스 로직 구현
주요 특징
- ✅ 비동기: 백그라운드 Worker Pool
- ✅ 확장성: 워커 수 조정 가능
- ✅ 실시간: SSE로 진행 상황 스트리밍
- ✅ 상태 관리: PostgreSQL에 태스크 저장
- ✅ 멀티테넌시: RLS로 격리 (단, 정책이 동작하려면 DB 세션마다 `app.current_tenant_id`를 설정해야 하며, 위 예제 코드에는 이 설정 단계가 포함되어 있지 않습니다)
작성일: 2024-12-13
이 기사는 저작권자의 CC BY 4.0 라이선스를 따릅니다.