포스트

[MCP&A2A] 13. 실시간 스트리밍

[MCP&A2A] 13. 실시간 스트리밍

Server-Sent Events (SSE)

SSE는 서버에서 클라이언트로 실시간 데이터를 푸시하는 단방향 통신 프로토콜입니다. AI 에이전트의 진행 상황, 중간 결과, 스트리밍 응답을 실시간으로 전달하는 데 이상적입니다.

SSE vs WebSocket vs Long Polling

특성SSEWebSocketLong Polling
방향성단방향 (서버→클라이언트)양방향단방향
프로토콜HTTPTCPHTTP
재연결✅ 자동❌ 수동❌ 수동
복잡도낮음높음중간
브라우저 지원✅ 대부분✅ 모든 최신 브라우저✅ 모든 브라우저
프록시 호환✅ 좋음⚠️ 설정 필요✅ 좋음
오버헤드낮음낮음높음
용도진행 상황, 알림채팅, 게임레거시

SSE를 선택해야 하는 경우

SSE 사용:

  • AI 응답 스트리밍 (토큰 단위)
  • 태스크 진행 상황 업데이트
  • 실시간 알림
  • 서버 → 클라이언트만 필요
  • 간단한 구현 원함

WebSocket 사용:

  • 실시간 양방향 통신 (채팅)
  • 낮은 지연시간 필수 (게임)
  • 바이너리 데이터 전송
  • 클라이언트 → 서버도 빈번

SSE 프로토콜 이해

SSE 메시지 포맷

1
2
3
4
5
event: message_type
data: {"key": "value"}
id: unique_id
retry: 3000

필드 설명:

  • event: 이벤트 타입 (선택, 기본값: “message”)
  • data: 페이로드 (여러 줄 가능)
  • id: 이벤트 ID (재연결 시 사용)
  • retry: 재연결 대기 시간 (밀리초)
  • 빈 줄: 메시지 종료 표시

여러 줄 데이터

1
2
3
4
data: This is the first line
data: This is the second line
data: This is the third line

클라이언트는 \n으로 연결:

1
2
3
This is the first line
This is the second line
This is the third line

재연결 메커니즘

1
2
3
id: 42
data: Some data

클라이언트가 연결 끊김:

1
2
GET /events HTTP/1.1
Last-Event-ID: 42

서버는 ID 43부터 전송 재개

Go 서버 구현

기본 SSE 핸들러

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// internal/handlers/sse.go
package handlers

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type SSEHandler struct {
    // 클라이언트 관리
    clients   map[string]chan SSEMessage
    register  chan SSEClient
    unregister chan string
    
    // 이벤트 브로드캐스트
    broadcast chan SSEEvent
}

type SSEClient struct {
    ID      string
    Channel chan SSEMessage
}

type SSEMessage struct {
    Event string                 `json:"-"`
    ID    string                 `json:"-"`
    Data  map[string]interface{} `json:"data"`
    Retry int                    `json:"-"` // milliseconds
}

type SSEEvent struct {
    ClientID string
    Message  SSEMessage
}

func NewSSEHandler() *SSEHandler {
    h := &SSEHandler{
        clients:    make(map[string]chan SSEMessage),
        register:   make(chan SSEClient),
        unregister: make(chan string),
        broadcast:  make(chan SSEEvent, 100),
    }
    
    go h.run()
    
    return h
}

// run 클라이언트 관리 루프
func (h *SSEHandler) run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client.ID] = client.Channel
            fmt.Printf("✅ Client registered: %s (total: %d)\n", client.ID, len(h.clients))
            
        case clientID := <-h.unregister:
            if ch, ok := h.clients[clientID]; ok {
                close(ch)
                delete(h.clients, clientID)
                fmt.Printf("❌ Client unregistered: %s (total: %d)\n", clientID, len(h.clients))
            }
            
        case event := <-h.broadcast:
            if ch, ok := h.clients[event.ClientID]; ok {
                select {
                case ch <- event.Message:
                default:
                    // 채널이 가득 차면 클라이언트 제거
                    close(ch)
                    delete(h.clients, event.ClientID)
                    fmt.Printf("⚠️ Client removed (slow consumer): %s\n", event.ClientID)
                }
            }
        }
    }
}

// ServeHTTP SSE 스트림 핸들러
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // SSE 헤더 설정
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")
    
    // Flusher 지원 확인
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }
    
    // 클라이언트 ID (쿼리 파라미터 또는 생성)
    clientID := r.URL.Query().Get("client_id")
    if clientID == "" {
        clientID = fmt.Sprintf("client_%d", time.Now().UnixNano())
    }
    
    // 클라이언트 채널 생성
    messageChan := make(chan SSEMessage, 10)
    
    // 클라이언트 등록
    h.register <- SSEClient{
        ID:      clientID,
        Channel: messageChan,
    }
    
    defer func() {
        h.unregister <- clientID
    }()
    
    // 연결 확인 메시지
    h.sendMessage(w, flusher, SSEMessage{
        Event: "connected",
        Data: map[string]interface{}{
            "clientId": clientID,
            "message":  "Connected to SSE stream",
        },
    })
    
    // 컨텍스트 완료 대기
    ctx := r.Context()
    
    // Heartbeat ticker (30초마다)
    heartbeat := time.NewTicker(30 * time.Second)
    defer heartbeat.Stop()
    
    for {
        select {
        case <-ctx.Done():
            // 클라이언트 연결 종료
            return
            
        case msg := <-messageChan:
            // 메시지 전송
            if err := h.sendMessage(w, flusher, msg); err != nil {
                return
            }
            
        case <-heartbeat.C:
            // Heartbeat (연결 유지)
            h.sendMessage(w, flusher, SSEMessage{
                Event: "heartbeat",
                Data: map[string]interface{}{
                    "timestamp": time.Now().Unix(),
                },
            })
        }
    }
}

// sendMessage SSE 메시지 전송
func (h *SSEHandler) sendMessage(w http.ResponseWriter, flusher http.Flusher, msg SSEMessage) error {
    // Event 타입
    if msg.Event != "" {
        fmt.Fprintf(w, "event: %s\n", msg.Event)
    }
    
    // ID
    if msg.ID != "" {
        fmt.Fprintf(w, "id: %s\n", msg.ID)
    }
    
    // Data (JSON)
    data, err := json.Marshal(msg.Data)
    if err != nil {
        return err
    }
    fmt.Fprintf(w, "data: %s\n", data)
    
    // Retry
    if msg.Retry > 0 {
        fmt.Fprintf(w, "retry: %d\n", msg.Retry)
    }
    
    // 메시지 종료
    fmt.Fprintf(w, "\n")
    
    // 즉시 전송
    flusher.Flush()
    
    return nil
}

// Send 클라이언트에게 메시지 전송
func (h *SSEHandler) Send(clientID string, msg SSEMessage) {
    h.broadcast <- SSEEvent{
        ClientID: clientID,
        Message:  msg,
    }
}

// Broadcast 모든 클라이언트에게 전송
func (h *SSEHandler) Broadcast(msg SSEMessage) {
    for clientID := range h.clients {
        h.Send(clientID, msg)
    }
}

AI 스트리밍 통합

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// internal/handlers/ai_stream.go
package handlers

import (
    "context"
    "fmt"
    "net/http"
    "strings"
    
    "github.com/go-chi/chi/v5"
)

type AIStreamHandler struct {
    sseHandler *SSEHandler
    llmClient  LLMClient
}

func NewAIStreamHandler(sseHandler *SSEHandler, llmClient LLMClient) *AIStreamHandler {
    return &AIStreamHandler{
        sseHandler: sseHandler,
        llmClient:  llmClient,
    }
}

// StreamCompletion AI 응답 스트리밍
func (h *AIStreamHandler) StreamCompletion(w http.ResponseWriter, r *http.Request) {
    var req struct {
        Prompt   string `json:"prompt"`
        ClientID string `json:"clientId"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid request", http.StatusBadRequest)
        return
    }
    
    // 즉시 응답 (비동기 처리)
    w.WriteHeader(http.StatusAccepted)
    json.NewEncoder(w).Encode(map[string]string{
        "status":   "streaming",
        "clientId": req.ClientID,
    })
    
    // 백그라운드에서 스트리밍
    go h.streamResponse(req.ClientID, req.Prompt)
}

func (h *AIStreamHandler) streamResponse(clientID, prompt string) {
    // 시작 이벤트
    h.sseHandler.Send(clientID, SSEMessage{
        Event: "start",
        Data: map[string]interface{}{
            "message": "AI 응답 생성 시작",
        },
    })
    
    // 스트리밍 요청
    ctx := context.Background()
    stream, err := h.llmClient.StreamCompletion(ctx, prompt)
    
    if err != nil {
        h.sseHandler.Send(clientID, SSEMessage{
            Event: "error",
            Data: map[string]interface{}{
                "message": err.Error(),
            },
        })
        return
    }
    
    // 토큰 스트리밍
    var fullResponse strings.Builder
    tokenCount := 0
    
    for token := range stream {
        tokenCount++
        fullResponse.WriteString(token)
        
        // 토큰 이벤트
        h.sseHandler.Send(clientID, SSEMessage{
            Event: "token",
            Data: map[string]interface{}{
                "token": token,
                "count": tokenCount,
            },
        })
        
        // 10 토큰마다 진행 상황
        if tokenCount%10 == 0 {
            h.sseHandler.Send(clientID, SSEMessage{
                Event: "progress",
                Data: map[string]interface{}{
                    "tokens": tokenCount,
                },
            })
        }
    }
    
    // 완료 이벤트
    h.sseHandler.Send(clientID, SSEMessage{
        Event: "complete",
        Data: map[string]interface{}{
            "response":    fullResponse.String(),
            "tokenCount":  tokenCount,
            "message":     "AI 응답 생성 완료",
        },
    })
}

Task 진행 상황 스트리밍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// internal/handlers/task_stream.go
package handlers

import (
    "context"
    "net/http"
    
    "github.com/go-chi/chi/v5"
)

type TaskStreamHandler struct {
    sseHandler *SSEHandler
    taskStore  TaskStore
}

func NewTaskStreamHandler(sseHandler *SSEHandler, taskStore TaskStore) *TaskStreamHandler {
    return &TaskStreamHandler{
        sseHandler: sseHandler,
        taskStore:  taskStore,
    }
}

// StreamTaskProgress 태스크 진행 상황 스트리밍
func (h *TaskStreamHandler) StreamTaskProgress(w http.ResponseWriter, r *http.Request) {
    taskID := chi.URLParam(r, "taskId")
    clientID := r.URL.Query().Get("client_id")
    
    // 태스크 조회
    task, err := h.taskStore.Get(r.Context(), taskID)
    if err != nil {
        http.Error(w, "Task not found", http.StatusNotFound)
        return
    }
    
    // 현재 상태 전송
    h.sseHandler.Send(clientID, SSEMessage{
        Event: "task_status",
        Data: map[string]interface{}{
            "taskId": task.ID,
            "status": task.Status,
            "step":   task.Progress.Current,
            "total":  task.Progress.Total,
        },
    })
    
    // 완료된 태스크는 즉시 종료
    if task.Status == "completed" || task.Status == "failed" {
        h.sseHandler.Send(clientID, SSEMessage{
            Event: task.Status,
            Data: map[string]interface{}{
                "taskId": task.ID,
                "result": task.Result,
            },
        })
        return
    }
    
    // 태스크 완료 대기
    h.waitForTaskCompletion(r.Context(), clientID, taskID)
}

func (h *TaskStreamHandler) waitForTaskCompletion(ctx context.Context, clientID, taskID string) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
            
        case <-ticker.C:
            task, err := h.taskStore.Get(ctx, taskID)
            if err != nil {
                h.sseHandler.Send(clientID, SSEMessage{
                    Event: "error",
                    Data: map[string]interface{}{
                        "message": "Failed to get task",
                    },
                })
                return
            }
            
            // 진행 상황 업데이트
            if task.Progress != nil {
                h.sseHandler.Send(clientID, SSEMessage{
                    Event: "progress",
                    Data: map[string]interface{}{
                        "step":    task.Progress.Current,
                        "total":   task.Progress.Total,
                        "message": task.Progress.Message,
                    },
                })
            }
            
            // 완료 확인
            if task.Status == "completed" {
                h.sseHandler.Send(clientID, SSEMessage{
                    Event: "completed",
                    Data: map[string]interface{}{
                        "taskId": task.ID,
                        "result": task.Result,
                    },
                })
                return
            }
            
            if task.Status == "failed" {
                h.sseHandler.Send(clientID, SSEMessage{
                    Event: "failed",
                    Data: map[string]interface{}{
                        "taskId": task.ID,
                        "error":  task.Error,
                    },
                })
                return
            }
        }
    }
}

JavaScript 클라이언트

기본 EventSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// client/sse-client.js

class SSEClient {
    constructor(url) {
        this.url = url;
        this.eventSource = null;
        this.listeners = new Map();
        this.clientId = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
    }
    
    connect() {
        // 클라이언트 ID 생성
        this.clientId = `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
        
        const url = `${this.url}?client_id=${this.clientId}`;
        
        this.eventSource = new EventSource(url);
        
        // 연결 이벤트
        this.eventSource.addEventListener('connected', (event) => {
            const data = JSON.parse(event.data);
            console.log('✅ Connected:', data);
            this.reconnectAttempts = 0;
            this.emit('connected', data);
        });
        
        // Heartbeat
        this.eventSource.addEventListener('heartbeat', (event) => {
            const data = JSON.parse(event.data);
            console.log('💓 Heartbeat:', new Date(data.timestamp * 1000));
        });
        
        // 에러 처리
        this.eventSource.onerror = (error) => {
            console.error('❌ SSE Error:', error);
            
            if (this.eventSource.readyState === EventSource.CLOSED) {
                this.handleReconnect();
            }
        };
        
        return this;
    }
    
    handleReconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error('Max reconnect attempts reached');
            this.emit('max_reconnect_reached');
            return;
        }
        
        this.reconnectAttempts++;
        const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
        
        console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
        
        setTimeout(() => {
            this.connect();
        }, delay);
    }
    
    on(event, callback) {
        if (!this.listeners.has(event)) {
            this.listeners.set(event, []);
            
            // EventSource 리스너 등록
            this.eventSource.addEventListener(event, (e) => {
                const data = JSON.parse(e.data);
                this.emit(event, data);
            });
        }
        
        this.listeners.get(event).push(callback);
        return this;
    }
    
    emit(event, data) {
        const callbacks = this.listeners.get(event) || [];
        callbacks.forEach(callback => callback(data));
    }
    
    close() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
    
    getClientId() {
        return this.clientId;
    }
}

// 사용 예제
const client = new SSEClient('/api/sse/stream');

client
    .on('connected', (data) => {
        console.log('Connected:', data);
    })
    .on('token', (data) => {
        console.log('Token:', data.token);
        // UI 업데이트
        appendToken(data.token);
    })
    .on('progress', (data) => {
        console.log('Progress:', data);
        updateProgressBar(data.step, data.total);
    })
    .on('complete', (data) => {
        console.log('Complete:', data);
        showCompleteMessage(data.response);
    })
    .on('error', (data) => {
        console.error('Error:', data);
        showError(data.message);
    })
    .connect();

// 정리
window.addEventListener('beforeunload', () => {
    client.close();
});

React 통합

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// client/hooks/useSSE.js
import { useEffect, useState, useRef } from 'react';

export function useSSE(url) {
    const [status, setStatus] = useState('disconnected');
    const [data, setData] = useState(null);
    const [error, setError] = useState(null);
    const eventSourceRef = useRef(null);
    const listenersRef = useRef(new Map());
    
    useEffect(() => {
        const clientId = `client_${Date.now()}`;
        const eventSource = new EventSource(`${url}?client_id=${clientId}`);
        eventSourceRef.current = eventSource;
        
        eventSource.addEventListener('connected', (event) => {
            setStatus('connected');
            setData(JSON.parse(event.data));
        });
        
        eventSource.onerror = (event) => {
            setStatus('error');
            setError(event);
        };
        
        // 등록된 리스너 연결
        listenersRef.current.forEach((callback, eventType) => {
            eventSource.addEventListener(eventType, (event) => {
                callback(JSON.parse(event.data));
            });
        });
        
        return () => {
            eventSource.close();
        };
    }, [url]);
    
    const on = (eventType, callback) => {
        listenersRef.current.set(eventType, callback);
        
        if (eventSourceRef.current) {
            eventSourceRef.current.addEventListener(eventType, (event) => {
                callback(JSON.parse(event.data));
            });
        }
    };
    
    return { status, data, error, on };
}

// 사용 예제
function StreamingComponent() {
    const [tokens, setTokens] = useState([]);
    const [progress, setProgress] = useState({ current: 0, total: 0 });
    const { status, on } = useSSE('/api/sse/stream');
    
    useEffect(() => {
        on('token', (data) => {
            setTokens(prev => [...prev, data.token]);
        });
        
        on('progress', (data) => {
            setProgress({ current: data.step, total: data.total });
        });
        
        on('complete', (data) => {
            console.log('Complete:', data);
        });
    }, []);
    
    return (
        <div>
            <div>Status: {status}</div>
            <div>Progress: {progress.current}/{progress.total}</div>
            <div>{tokens.join('')}</div>
        </div>
    );
}

Vue 통합

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// client/composables/useSSE.js
import { ref, onMounted, onUnmounted } from 'vue';

export function useSSE(url) {
    const status = ref('disconnected');
    const data = ref(null);
    const error = ref(null);
    
    let eventSource = null;
    const listeners = new Map();
    
    const connect = () => {
        const clientId = `client_${Date.now()}`;
        eventSource = new EventSource(`${url}?client_id=${clientId}`);
        
        eventSource.addEventListener('connected', (event) => {
            status.value = 'connected';
            data.value = JSON.parse(event.data);
        });
        
        eventSource.onerror = (event) => {
            status.value = 'error';
            error.value = event;
        };
        
        // 등록된 리스너 연결
        listeners.forEach((callback, eventType) => {
            eventSource.addEventListener(eventType, (event) => {
                callback(JSON.parse(event.data));
            });
        });
    };
    
    const on = (eventType, callback) => {
        listeners.set(eventType, callback);
        
        if (eventSource) {
            eventSource.addEventListener(eventType, (event) => {
                callback(JSON.parse(event.data));
            });
        }
    };
    
    const close = () => {
        if (eventSource) {
            eventSource.close();
            eventSource = null;
        }
    };
    
    onMounted(() => {
        connect();
    });
    
    onUnmounted(() => {
        close();
    });
    
    return { status, data, error, on, close };
}

// 사용 예제
<script setup>
import { ref } from 'vue';
import { useSSE } from './composables/useSSE';

const tokens = ref([]);
const { status, on } = useSSE('/api/sse/stream');

on('token', (data) => {
    tokens.value.push(data.token);
});

on('complete', (data) => {
    console.log('Complete:', data);
});
</script>

<template>
    <div>
        <div>Status: </div>
        <div></div>
    </div>
</template>

프로덕션 고려사항

1. 연결 제한

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// internal/handlers/sse_limited.go
type LimitedSSEHandler struct {
    *SSEHandler
    maxClients    int
    clientsByUser map[string][]string
    mu            sync.RWMutex
}

func (h *LimitedSSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    userID := r.Context().Value("user_id").(string)
    
    h.mu.Lock()
    currentClients := len(h.clientsByUser[userID])
    h.mu.Unlock()
    
    if currentClients >= 3 {
        http.Error(w, "Too many connections", http.StatusTooManyRequests)
        return
    }
    
    // 나머지 로직
    h.SSEHandler.ServeHTTP(w, r)
}

2. 타임아웃

1
2
3
4
5
6
7
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 최대 1시간
    ctx, cancel := context.WithTimeout(r.Context(), 1*time.Hour)
    defer cancel()
    
    // ...
}

3. 버퍼 관리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 느린 클라이언트 탐지
const maxBufferSize = 100

func (h *SSEHandler) Send(clientID string, msg SSEMessage) {
    if ch, ok := h.clients[clientID]; ok {
        if len(ch) >= maxBufferSize {
            // 버퍼 가득 - 클라이언트 제거
            close(ch)
            delete(h.clients, clientID)
            return
        }
        
        ch <- msg
    }
}

4. 메모리 관리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 정기적 정리
func (h *SSEHandler) cleanup() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for range ticker.C {
        h.mu.Lock()
        for clientID, ch := range h.clients {
            select {
            case ch <- SSEMessage{Event: "ping"}:
                // 살아있음
            default:
                // 응답 없음 - 제거
                close(ch)
                delete(h.clients, clientID)
            }
        }
        h.mu.Unlock()
    }
}

핵심 요약

SSE 장점

  • 간단함: HTTP 기반, 복잡한 프로토콜 불필요
  • 재연결: 브라우저가 자동 재연결
  • 호환성: 프록시, 방화벽 친화적
  • 효율성: WebSocket보다 오버헤드 낮음
  • 표준: W3C 표준, 브라우저 네이티브 지원

실무 체크리스트

  • 헤더 설정: Content-Type, Cache-Control, Connection
  • Flusher: 즉시 전송 확인
  • Heartbeat: 연결 유지 (30초)
  • 에러 처리: 재연결, 백오프
  • 제한: 사용자당 최대 연결 수
  • 타임아웃: 장시간 연결 방지
  • 정리: 끊긴 연결 제거

작성일: 2024-12-13

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.