深入浅出 GoFrame SSE:从入门到分布式实践
前言
SSE(Server-Sent Events)是一种服务器推送技术,它允许服务器向客户端实时推送数据。与WebSocket不同,SSE是单向的,只能由服务器向客户端推送数据。SSE基于HTTP协议,实现简单,特别适合需要服务器实时推送数据的场景。
GoFrame中实现SSE的步骤
1. 实现SSE Handler
func SseHandler(r *ghttp.Request) {
// 设置SSE相关的响应头
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
r.Response.Header().Set("Access-Control-Allow-Origin", "*")
// 创建一个通道用于发送数据
messageChan := make(chan string)
// 清理函数
defer func() {
close(messageChan)
}()
// 启动一个goroutine用于生成消息
go func() {
for {
// 这里可以是你的业务逻辑
message := fmt.Sprintf("当前时间: %s", gtime.Now().String())
messageChan <- message
time.Sleep(time.Second * 2)
}
}()
// 向客户端写入数据
for {
select {
case message := <-messageChan:
// SSE消息格式
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", message)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
2. 注册路由
func main() {
s := g.Server()
// 注册SSE处理路由
s.BindHandler("/sse", SseHandler)
s.Run()
}
3. 前端实现
const eventSource = new EventSource('http://localhost:8080/sse');
eventSource.onmessage = function(event) {
console.log('收到消息:', event.data);
};
eventSource.onerror = function(error) {
console.error('SSE错误:', error);
eventSource.close();
};
实现具体业务场景
下面以实现一个实时股票行情推送为例:
type StockPrice struct {
Symbol string `json:"symbol"`
Price float64 `json:"price"`
Time string `json:"time"`
}
func StockPriceHandler(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
messageChan := make(chan StockPrice)
defer close(messageChan)
// 模拟股票行情更新
go func() {
symbols := []string{"AAPL", "GOOGL", "MSFT"}
for {
for _, symbol := range symbols {
price := StockPrice{
Symbol: symbol,
Price: rand.Float64() * 1000,
Time: gtime.Now().String(),
}
messageChan <- price
}
time.Sleep(time.Second * 3)
}
}()
for {
select {
case price := <-messageChan:
data, err := gjson.Encode(price)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
最佳实践
-
错误处理: 要始终检查写入操作的错误,并正确处理连接断开的情况。
-
心跳机制: 建议实现心跳机制来保持连接:
func sendHeartbeat(r *ghttp.Request) {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.Response.Write([]byte(": heartbeat\n\n"))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
- 资源清理: 确保在连接断开时正确清理资源:
func SseHandler(r *ghttp.Request) {
// ... 其他代码 ...
cleanup := func() {
// 清理资源
close(messageChan)
// 其他清理操作
}
defer cleanup()
// ... 其他代码 ...
}
- 重试机制: 在前端实现重试机制:
function connectSSE() {
const eventSource = new EventSource('http://localhost:8080/sse');
eventSource.onmessage = function(event) {
console.log('收到消息:', event.data);
};
eventSource.onerror = function(error) {
console.error('SSE错误:', error);
eventSource.close();
// 3秒后重试
setTimeout(connectSSE, 3000);
};
return eventSource;
}
SSE消息分类与事件处理
在实际应用中,我们常常需要处理不同类型的消息。SSE协议支持自定义事件类型,我们可以充分利用这个特性:
type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
ID string `json:"id"`
}
func EnhancedSseHandler(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
messageChan := make(chan Message)
defer close(messageChan)
go func() {
id := 0
for {
// 模拟不同类型的消息
switch id % 3 {
case 0:
messageChan <- Message{
Type: "notification",
Data: "系统通知消息",
ID: fmt.Sprintf("%d", id),
}
case 1:
messageChan <- Message{
Type: "alert",
Data: map[string]interface{}{
"level": "warning",
"msg": "系统负载过高",
},
ID: fmt.Sprintf("%d", id),
}
case 2:
messageChan <- Message{
Type: "metrics",
Data: map[string]interface{}{
"cpu": 75,
"mem": 80,
},
ID: fmt.Sprintf("%d", id),
}
}
id++
time.Sleep(time.Second * 2)
}
}()
for {
select {
case msg := <-messageChan:
data, err := gjson.Encode(msg.Data)
if err != nil {
g.Log().Error(r.Context(), err)
continue
}
// 使用SSE的完整格式
response := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n",
msg.ID, msg.Type, data)
r.Response.Write([]byte(response))
r.Response.Flush()
case <-r.Context().Done():
return
}
}
}
前端处理不同类型的事件:
const eventSource = new EventSource('/sse');
// 处理通知类消息
eventSource.addEventListener('notification', function(e) {
console.log('收到通知:', e.data);
showNotification(e.data);
});
// 处理告警类消息
eventSource.addEventListener('alert', function(e) {
const alert = JSON.parse(e.data);
console.log('收到告警:', alert);
handleAlert(alert);
});
// 处理指标类消息
eventSource.addEventListener('metrics', function(e) {
const metrics = JSON.parse(e.data);
console.log('收到指标:', metrics);
updateDashboard(metrics);
});
// 处理连接状态
eventSource.onopen = function() {
console.log('SSE连接已建立');
};
eventSource.onerror = function(error) {
console.error('SSE连接错误:', error);
handleConnectionError();
};
使用Redis实现分布式SSE
在分布式环境中,我们可以使用Redis的发布订阅功能来实现SSE消息的分发:
type SseManager struct {
redis *gredis.Redis
topic string
}
func NewSseManager(redis *gredis.Redis, topic string) *SseManager {
return &SseManager{
redis: redis,
topic: topic,
}
}
func (sm *SseManager) HandleSSE(r *ghttp.Request) {
r.Response.Header().Set("Content-Type", "text/event-stream")
r.Response.Header().Set("Cache-Control", "no-cache")
r.Response.Header().Set("Connection", "keep-alive")
ctx := r.Context()
// 创建Redis订阅
pubSub, _, err := sm.redis.Subscribe(ctx, sm.topic)
if err != nil {
g.Log().Error(ctx, err)
return
}
defer func() {
err := pubSub.Close(ctx)
if err != nil {
g.Log().Error(ctx, err)
}
}()
// 处理消息
for {
msg, err := pubSub.ReceiveMessage(ctx)
if err != nil {
g.Log().Error(ctx, err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", msg.Payload)))
r.Response.Flush()
}
}
// 发布消息的方法
func (sm *SseManager) PublishMessage(ctx context.Context, message interface{}) error {
data, err := gjson.Encode(message)
if err != nil {
return err
}
_, err = sm.redis.Publish(ctx, sm.topic, string(data))
return err
}
性能优化
- 消息合并:当消息频率很高时,可以合并多条消息一次性发送:
func BatchSseHandler(r *ghttp.Request) {
const batchSize = 10
const batchTimeout = time.Second
messageChan := make(chan interface{}, batchSize)
batchMessages := make([]interface{}, 0, batchSize)
ticker := time.NewTicker(batchTimeout)
defer ticker.Stop()
for {
select {
case msg := <-messageChan:
batchMessages = append(batchMessages, msg)
if len(batchMessages) >= batchSize {
sendBatch(r, batchMessages)
batchMessages = batchMessages[:0]
}
case <-ticker.C:
if len(batchMessages) > 0 {
sendBatch(r, batchMessages)
batchMessages = batchMessages[:0]
}
case <-r.Context().Done():
return
}
}
}
func sendBatch(r *ghttp.Request, messages []interface{}) {
data, err := gjson.Encode(messages)
if err != nil {
g.Log().Error(r.Context(), err)
return
}
r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
r.Response.Flush()
}
- 连接限制与负载均衡:
type SseService struct {
connections int32
maxConn int32
}
func (s *SseService) HandleSSE(r *ghttp.Request) {
// 检查连接数
if atomic.LoadInt32(&s.connections) >= s.maxConn {
r.Response.WriteStatus(http.StatusServiceUnavailable)
return
}
// 增加连接计数
atomic.AddInt32(&s.connections, 1)
defer atomic.AddInt32(&s.connections, -1)
// 处理SSE...
}
完整的生产级示例
最后,让我们来看一个结合了以上所有特性的完整示例:
SSE客户端实现与管理
首先,让我们详细实现SSE客户端的管理:
// SseClient 表示一个SSE客户端连接
type SseClient struct {
ID string
Request *ghttp.Request
MessageChan chan interface{}
LastActivity time.Time
Ctx context.Context
Cancel context.CancelFunc
mu sync.RWMutex
}
// NewSseClient 创建新的SSE客户端
func NewSseClient(r *ghttp.Request) *SseClient {
ctx, cancel := context.WithCancel(r.Context())
return &SseClient{
ID: gmd5.MustEncryptString(fmt.Sprintf("%d-%s", gtime.TimestampNano(), r.GetClientIp())),
Request: r,
MessageChan: make(chan interface{}, 100), // 设置合适的缓冲区大小
LastActivity: time.Now(),
Ctx: ctx,
Cancel: cancel,
}
}
// UpdateActivity 更新最后活动时间
func (c *SseClient) UpdateActivity() {
c.mu.Lock()
c.LastActivity = time.Now()
c.mu.Unlock()
}
// IsExpired 检查连接是否过期
func (c *SseClient) IsExpired(timeout time.Duration) bool {
c.mu.RLock()
defer c.mu.RUnlock()
return time.Since(c.LastActivity) > timeout
}
// Close 关闭客户端连接
func (c *SseClient) Close() {
c.Cancel()
close(c.MessageChan)
}
客户端连接池管理
// SsePool 管理SSE客户端连接池
type SsePool struct {
clients sync.Map
maxClients int32
timeout time.Duration
}
func NewSsePool(maxClients int32, timeout time.Duration) *SsePool {
pool := &SsePool{
maxClients: maxClients,
timeout: timeout,
}
// 启动清理过期连接的goroutine
go pool.cleanupExpired()
return pool
}
// AddClient 添加新客户端到连接池
func (p *SsePool) AddClient(client *SseClient) error {
count := p.GetClientCount()
if count >= p.maxClients {
return gerror.New("达到最大连接数限制")
}
p.clients.Store(client.ID, client)
return nil
}
// RemoveClient 从连接池移除客户端
func (p *SsePool) RemoveClient(clientID string) {
if client, ok := p.clients.LoadAndDelete(clientID); ok {
if c, ok := client.(*SseClient); ok {
c.Close()
}
}
}
// GetClientCount 获取当前连接数
func (p *SsePool) GetClientCount() int32 {
var count int32
p.clients.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
// BroadcastMessage 广播消息给所有客户端
func (p *SsePool) BroadcastMessage(message interface{}) {
p.clients.Range(func(key, value interface{}) bool {
if client, ok := value.(*SseClient); ok {
select {
case client.MessageChan <- message:
// 消息发送成功
default:
// 客户端消息队列已满,可能需要处理
g.Log().Warning(context.Background(), "客户端消息队列已满", client.ID)
}
}
return true
})
}
// cleanupExpired 清理过期连接
func (p *SsePool) cleanupExpired() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
<-ticker.C
p.clients.Range(func(key, value interface{}) bool {
if client, ok := value.(*SseClient); ok {
if client.IsExpired(p.timeout) {
p.RemoveClient(client.ID)
g.Log().Info(context.Background(), "清理过期连接", client.ID)
}
}
return true
})
}
}
消息分组与订阅
实现消息分组功能,允许客户端订阅特定主题:
// TopicManager 管理消息主题
type TopicManager struct {
topics sync.Map // map[string]map[string]*SseClient
mu sync.RWMutex
}
func NewTopicManager() *TopicManager {
return &TopicManager{}
}
// Subscribe 订阅主题
func (tm *TopicManager) Subscribe(topic string, client *SseClient) {
tm.mu.Lock()
defer tm.mu.Unlock()
subscribers, _ := tm.topics.LoadOrStore(topic, make(map[string]*SseClient))
if m, ok := subscribers.(map[string]*SseClient); ok {
m[client.ID] = client
}
}
// Unsubscribe 取消订阅
func (tm *TopicManager) Unsubscribe(topic string, clientID string) {
tm.mu.Lock()
defer tm.mu.Unlock()
if subscribers, ok := tm.topics.Load(topic); ok {
if m, ok := subscribers.(map[string]*SseClient); ok {
delete(m, clientID)
}
}
}
// PublishToTopic 发布消息到特定主题
func (tm *TopicManager) PublishToTopic(topic string, message interface{}) {
if subscribers, ok := tm.topics.Load(topic); ok {
if m, ok := subscribers.(map[string]*SseClient); ok {
for _, client := range m {
select {
case client.MessageChan <- message:
// 消息发送成功
default:
// 处理消息队列满的情况
}
}
}
}
}
注意事项
- SSE连接会占用服务器资源,需要合理控制并发连接数。
- 对于大规模部署,建议使用Redis等消息队列来处理消息的发布与订阅。
- 要注意处理跨域问题,必要时配置正确的CORS头。
- 在生产环境中,建议添加适当的认证机制。
总结
我们详细介绍了如何在GoFrame中实现和使用SSE,包括基础实现、高级特性、性能优化等多个方面。相信读者通过本文的学习,能够掌握SSE的核心原理和实现方法,在实际项目中熟练运用这项技术。
SSE技术虽然简单,但在实际应用中仍然需要考虑很多细节问题。只有充分理解其原理,并结合实际场景做好完善的架构设计,才能构建出稳定可靠的实时推送服务。