深入浅出 GoFrame SSE:从入门到分布式实践

前言

SSE(Server-Sent Events)是一种服务器推送技术,它允许服务器向客户端实时推送数据。与WebSocket不同,SSE是单向的,只能由服务器向客户端推送数据。SSE基于HTTP协议,实现简单,特别适合需要服务器实时推送数据的场景。

GoFrame中实现SSE的步骤

1. 实现SSE Handler

func SseHandler(r *ghttp.Request) {
    // 设置SSE相关的响应头
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")
    r.Response.Header().Set("Access-Control-Allow-Origin", "*")

    // 创建一个通道用于发送数据
    messageChan := make(chan string)

    // 清理函数
    defer func() {
       close(messageChan)
    }()

    // 启动一个goroutine用于生成消息
    go func() {
       for {
          // 这里可以是你的业务逻辑
          message := fmt.Sprintf("当前时间: %s", gtime.Now().String())
          messageChan <- message
          time.Sleep(time.Second * 2)
       }
    }()

    // 向客户端写入数据
    for {
       select {
       case message := <-messageChan:
          // SSE消息格式
          r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", message)))
          r.Response.Flush()
       case <-r.Context().Done():
          return
       }
    }
}

2. 注册路由

func main() {
    s := g.Server()
    
    // 注册SSE处理路由
    s.BindHandler("/sse", SseHandler)
    
    s.Run()
}

3. 前端实现

const eventSource = new EventSource('http://localhost:8080/sse');

eventSource.onmessage = function(event) {
    console.log('收到消息:', event.data);
};

eventSource.onerror = function(error) {
    console.error('SSE错误:', error);
    eventSource.close();
};

实现具体业务场景

下面以实现一个实时股票行情推送为例:

type StockPrice struct {
    Symbol string  `json:"symbol"`
    Price  float64 `json:"price"`
    Time   string  `json:"time"`
}

func StockPriceHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    messageChan := make(chan StockPrice)
    defer close(messageChan)

    // 模拟股票行情更新
    go func() {
       symbols := []string{"AAPL", "GOOGL", "MSFT"}
       for {
          for _, symbol := range symbols {
             price := StockPrice{
                Symbol: symbol,
                Price:  rand.Float64() * 1000,
                Time:   gtime.Now().String(),
             }
             messageChan <- price
          }
          time.Sleep(time.Second * 3)
       }
    }()

    for {
       select {
       case price := <-messageChan:
          data, err := gjson.Encode(price)
          if err != nil {
             g.Log().Error(r.Context(), err)
             return
          }

          r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
          r.Response.Flush()

       case <-r.Context().Done():
          return
       }
    }
}

最佳实践

  1. 错误处理: 要始终检查写入操作的错误,并正确处理连接断开的情况。

  2. 心跳机制: 建议实现心跳机制来保持连接:

func sendHeartbeat(r *ghttp.Request) {
    ticker := time.NewTicker(time.Second * 30)
    defer ticker.Stop()

    for {
       select {
       case <-ticker.C:
          r.Response.Write([]byte(": heartbeat\n\n"))
          r.Response.Flush()
       case <-r.Context().Done():
          return
       }
    }
}
  1. 资源清理: 确保在连接断开时正确清理资源:
func SseHandler(r *ghttp.Request) {
    // ... 其他代码 ...
    
    cleanup := func() {
        // 清理资源
        close(messageChan)
        // 其他清理操作
    }
    
    defer cleanup()
    
    // ... 其他代码 ...
}
  1. 重试机制: 在前端实现重试机制:
function connectSSE() {
    const eventSource = new EventSource('http://localhost:8080/sse');
    
    eventSource.onmessage = function(event) {
        console.log('收到消息:', event.data);
    };
    
    eventSource.onerror = function(error) {
        console.error('SSE错误:', error);
        eventSource.close();
        
        // 3秒后重试
        setTimeout(connectSSE, 3000);
    };
    
    return eventSource;
}

SSE消息分类与事件处理

在实际应用中,我们常常需要处理不同类型的消息。SSE协议支持自定义事件类型,我们可以充分利用这个特性:

type Message struct {
    Type string      `json:"type"`
    Data interface{} `json:"data"`
    ID   string      `json:"id"`
}

func EnhancedSseHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    messageChan := make(chan Message)
    defer close(messageChan)

    go func() {
       id := 0
       for {
          // 模拟不同类型的消息
          switch id % 3 {
          case 0:
             messageChan <- Message{
                Type: "notification",
                Data: "系统通知消息",
                ID:   fmt.Sprintf("%d", id),
             }
          case 1:
             messageChan <- Message{
                Type: "alert",
                Data: map[string]interface{}{
                   "level": "warning",
                   "msg":   "系统负载过高",
                },
                ID: fmt.Sprintf("%d", id),
             }
          case 2:
             messageChan <- Message{
                Type: "metrics",
                Data: map[string]interface{}{
                   "cpu": 75,
                   "mem": 80,
                },
                ID: fmt.Sprintf("%d", id),
             }
          }
          id++
          time.Sleep(time.Second * 2)
       }
    }()

    for {
       select {
       case msg := <-messageChan:
          data, err := gjson.Encode(msg.Data)
          if err != nil {
             g.Log().Error(r.Context(), err)
             continue
          }

          // 使用SSE的完整格式
          response := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n",
             msg.ID, msg.Type, data)

          r.Response.Write([]byte(response))
          r.Response.Flush()

       case <-r.Context().Done():
          return
       }
    }
}

前端处理不同类型的事件:

const eventSource = new EventSource('/sse');

// 处理通知类消息
eventSource.addEventListener('notification', function(e) {
    console.log('收到通知:', e.data);
    showNotification(e.data);
});

// 处理告警类消息
eventSource.addEventListener('alert', function(e) {
    const alert = JSON.parse(e.data);
    console.log('收到告警:', alert);
    handleAlert(alert);
});

// 处理指标类消息
eventSource.addEventListener('metrics', function(e) {
    const metrics = JSON.parse(e.data);
    console.log('收到指标:', metrics);
    updateDashboard(metrics);
});

// 处理连接状态
eventSource.onopen = function() {
    console.log('SSE连接已建立');
};

eventSource.onerror = function(error) {
    console.error('SSE连接错误:', error);
    handleConnectionError();
};

使用Redis实现分布式SSE

在分布式环境中,我们可以使用Redis的发布订阅功能来实现SSE消息的分发:

type SseManager struct {
    redis *gredis.Redis
    topic string
}

func NewSseManager(redis *gredis.Redis, topic string) *SseManager {
    return &SseManager{
       redis: redis,
       topic: topic,
    }
}

func (sm *SseManager) HandleSSE(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")
    ctx := r.Context()
    // 创建Redis订阅
    pubSub, _, err := sm.redis.Subscribe(ctx, sm.topic)
    if err != nil {
       g.Log().Error(ctx, err)
       return
    }
    defer func() {
       err := pubSub.Close(ctx)
       if err != nil {
          g.Log().Error(ctx, err)
       }
    }()

    // 处理消息
    for {
       msg, err := pubSub.ReceiveMessage(ctx)
       if err != nil {
          g.Log().Error(ctx, err)
          return
       }
       r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", msg.Payload)))
       r.Response.Flush()
    }
}

// 发布消息的方法
func (sm *SseManager) PublishMessage(ctx context.Context, message interface{}) error {
    data, err := gjson.Encode(message)
    if err != nil {
       return err
    }
    _, err = sm.redis.Publish(ctx, sm.topic, string(data))
    return err
}

性能优化

  1. 消息合并:当消息频率很高时,可以合并多条消息一次性发送:
func BatchSseHandler(r *ghttp.Request) {
    const batchSize = 10
    const batchTimeout = time.Second

    messageChan := make(chan interface{}, batchSize)
    batchMessages := make([]interface{}, 0, batchSize)
    ticker := time.NewTicker(batchTimeout)
    defer ticker.Stop()

    for {
       select {
       case msg := <-messageChan:
          batchMessages = append(batchMessages, msg)
          if len(batchMessages) >= batchSize {
             sendBatch(r, batchMessages)
             batchMessages = batchMessages[:0]
          }

       case <-ticker.C:
          if len(batchMessages) > 0 {
             sendBatch(r, batchMessages)
             batchMessages = batchMessages[:0]
          }

       case <-r.Context().Done():
          return
       }
    }
}

func sendBatch(r *ghttp.Request, messages []interface{}) {
    data, err := gjson.Encode(messages)
    if err != nil {
       g.Log().Error(r.Context(), err)
       return
    }

    r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
    r.Response.Flush()
}
  1. 连接限制与负载均衡:
type SseService struct {
    connections int32
    maxConn     int32
}

func (s *SseService) HandleSSE(r *ghttp.Request) {
    // 检查连接数
    if atomic.LoadInt32(&s.connections) >= s.maxConn {
       r.Response.WriteStatus(http.StatusServiceUnavailable)
       return
    }

    // 增加连接计数
    atomic.AddInt32(&s.connections, 1)
    defer atomic.AddInt32(&s.connections, -1)

    // 处理SSE...
}

完整的生产级示例

最后,让我们来看一个结合了以上所有特性的完整示例:

SSE客户端实现与管理

首先,让我们详细实现SSE客户端的管理:

// SseClient 表示一个SSE客户端连接
type SseClient struct {
    ID           string
    Request      *ghttp.Request
    MessageChan  chan interface{}
    LastActivity time.Time
    Ctx          context.Context
    Cancel       context.CancelFunc
    mu           sync.RWMutex
}

// NewSseClient 创建新的SSE客户端
func NewSseClient(r *ghttp.Request) *SseClient {
    ctx, cancel := context.WithCancel(r.Context())
    return &SseClient{
        ID:           gmd5.MustEncryptString(fmt.Sprintf("%d-%s", gtime.TimestampNano(), r.GetClientIp())),
        Request:      r,
        MessageChan:  make(chan interface{}, 100),  // 设置合适的缓冲区大小
        LastActivity: time.Now(),
        Ctx:          ctx,
        Cancel:       cancel,
    }
}

// UpdateActivity 更新最后活动时间
func (c *SseClient) UpdateActivity() {
    c.mu.Lock()
    c.LastActivity = time.Now()
    c.mu.Unlock()
}

// IsExpired 检查连接是否过期
func (c *SseClient) IsExpired(timeout time.Duration) bool {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return time.Since(c.LastActivity) > timeout
}

// Close 关闭客户端连接
func (c *SseClient) Close() {
    c.Cancel()
    close(c.MessageChan)
}

客户端连接池管理

// SsePool 管理SSE客户端连接池
type SsePool struct {
    clients    sync.Map
    maxClients int32
    timeout    time.Duration
}

func NewSsePool(maxClients int32, timeout time.Duration) *SsePool {
    pool := &SsePool{
        maxClients: maxClients,
        timeout:    timeout,
    }
    
    // 启动清理过期连接的goroutine
    go pool.cleanupExpired()
    return pool
}

// AddClient 添加新客户端到连接池
func (p *SsePool) AddClient(client *SseClient) error {
    count := p.GetClientCount()
    if count >= p.maxClients {
        return gerror.New("达到最大连接数限制")
    }
    
    p.clients.Store(client.ID, client)
    return nil
}

// RemoveClient 从连接池移除客户端
func (p *SsePool) RemoveClient(clientID string) {
    if client, ok := p.clients.LoadAndDelete(clientID); ok {
        if c, ok := client.(*SseClient); ok {
            c.Close()
        }
    }
}

// GetClientCount 获取当前连接数
func (p *SsePool) GetClientCount() int32 {
    var count int32
    p.clients.Range(func(key, value interface{}) bool {
        count++
        return true
    })
    return count
}

// BroadcastMessage 广播消息给所有客户端
func (p *SsePool) BroadcastMessage(message interface{}) {
    p.clients.Range(func(key, value interface{}) bool {
        if client, ok := value.(*SseClient); ok {
            select {
            case client.MessageChan <- message:
                // 消息发送成功
            default:
                // 客户端消息队列已满,可能需要处理
                g.Log().Warning(context.Background(), "客户端消息队列已满", client.ID)
            }
        }
        return true
    })
}

// cleanupExpired 清理过期连接
func (p *SsePool) cleanupExpired() {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    
    for {
        <-ticker.C
        p.clients.Range(func(key, value interface{}) bool {
            if client, ok := value.(*SseClient); ok {
                if client.IsExpired(p.timeout) {
                    p.RemoveClient(client.ID)
                    g.Log().Info(context.Background(), "清理过期连接", client.ID)
                }
            }
            return true
        })
    }
}

消息分组与订阅

实现消息分组功能,允许客户端订阅特定主题:

// TopicManager 管理消息主题
type TopicManager struct {
    topics sync.Map  // map[string]map[string]*SseClient
    mu     sync.RWMutex
}

func NewTopicManager() *TopicManager {
    return &TopicManager{}
}

// Subscribe 订阅主题
func (tm *TopicManager) Subscribe(topic string, client *SseClient) {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    subscribers, _ := tm.topics.LoadOrStore(topic, make(map[string]*SseClient))
    if m, ok := subscribers.(map[string]*SseClient); ok {
        m[client.ID] = client
    }
}

// Unsubscribe 取消订阅
func (tm *TopicManager) Unsubscribe(topic string, clientID string) {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    if subscribers, ok := tm.topics.Load(topic); ok {
        if m, ok := subscribers.(map[string]*SseClient); ok {
            delete(m, clientID)
        }
    }
}

// PublishToTopic 发布消息到特定主题
func (tm *TopicManager) PublishToTopic(topic string, message interface{}) {
    if subscribers, ok := tm.topics.Load(topic); ok {
        if m, ok := subscribers.(map[string]*SseClient); ok {
            for _, client := range m {
                select {
                case client.MessageChan <- message:
                    // 消息发送成功
                default:
                    // 处理消息队列满的情况
                }
            }
        }
    }
}

注意事项

  • SSE连接会占用服务器资源,需要合理控制并发连接数。
  • 对于大规模部署,建议使用Redis等消息队列来处理消息的发布与订阅。
  • 要注意处理跨域问题,必要时配置正确的CORS头。
  • 在生产环境中,建议添加适当的认证机制。

总结

我们详细介绍了如何在GoFrame中实现和使用SSE,包括基础实现、高级特性、性能优化等多个方面。相信读者通过本文的学习,能够掌握SSE的核心原理和实现方法,在实际项目中熟练运用这项技术。

SSE技术虽然简单,但在实际应用中仍然需要考虑很多细节问题。只有充分理解其原理,并结合实际场景做好完善的架构设计,才能构建出稳定可靠的实时推送服务。

关于我
loading