GatewayWorker-Go 代码开发指南
本文档面向 使用 GatewayWorker-Go 编写业务逻辑 的开发者,涵盖两大核心用法:
- Worker 被动事件回调 — 在 Worker 进程中响应客户端连接/消息/断开等事件
- GatewaySDK 主动调用 — 从任意外部 Go 进程(HTTP 服务、定时任务等)向客户端推送消息
目录
架构概览
┌──────────────────────────────────┐
│ Register │
│ 服务发现 & 地址广播 │
└──────┬───────────────┬───────────┘
│ │
┌────────────▼──┐ ┌───▼────────────┐
│ Gateway │◄──────►│ Worker │
│ 持有客户端连接 │ │ 业务逻辑处理 │
└───────▲──────┘ └────────────────┘
│ ▲
┌───────┴──────┐ │
│ 客户端 │ ┌────┴───────────┐
│ (浏览器/App) │ │ GatewaySDK │
└──────────────┘ │ (HTTP/定时任务) │
└────────────────┘
数据流向:
- 被动事件:客户端 → Gateway → Worker(触发
OnConnect/OnMessage/OnClose回调) - Worker 内主动推送:Worker 回调中调用
gateway_api.SendToXxx()→ Gateway → 客户端 - 外部主动推送:外部进程创建
GatewaySDK→ 连接 Register 发现 Gateway → 发送指令 → Gateway → 客户端
Worker 被动事件开发
Worker 是业务逻辑的核心。你只需要创建一个 BusinessWorker,注册事件回调函数,然后在回调中通过 gateway_api 包与客户端交互。
最小示例
package main
import (
"fmt"
"gatewayworker-go/pkg/gateway_api"
"gatewayworker-go/pkg/worker"
"log"
)
func main() {
// 1. 创建 Worker
bw := worker.New(
"my-worker", // Worker 名称
0, // Worker ID(多实例时需不同)
[]string{"127.0.0.1:51234"}, // Register 地址
"my-secret-key", // 认证密钥
)
// 2. 注册事件回调
bw.OnMessage = func(clientID string, message []byte) {
// 收到消息后回显
gateway_api.SendToClient(clientID, []byte(fmt.Sprintf("echo: %s", message)))
}
// 3. 绑定 Worker 到 gateway_api(必须)
gateway_api.SetBusinessWorker(bw)
// 4. 启动(阻塞运行)
if err := bw.Run(); err != nil {
log.Fatal(err)
}
}
关键步骤:调用
gateway_api.SetBusinessWorker(bw)是必须的,否则gateway_api包中的所有函数都无法工作。
事件回调一览
| 回调 | 签名 | 触发时机 |
|---|---|---|
OnWorkerStart |
func() |
Worker 进程启动完成 |
OnWorkerStop |
func() |
Worker 进程停止时 |
OnConnect |
func(clientID string) |
客户端 TCP 连接建立(尚未完成协议握手) |
OnWebSocketConnect |
func(clientID string, data []byte) |
WebSocket 握手完成(data 包含 HTTP 头信息) |
OnMessage |
func(clientID string, message []byte) |
客户端发送消息 |
OnClose |
func(clientID string) |
客户端断开连接 |
触发顺序(以 WebSocket 为例):
OnConnect → OnWebSocketConnect → OnMessage (N次) → OnClose
触发顺序(以 TCP 为例):
OnConnect → OnMessage (N次) → OnClose
OnWorkerStart / OnWorkerStop
用于 Worker 生命周期管理,适合做初始化和清理工作。
bw.OnWorkerStart = func() {
log.Println("Worker 启动,初始化数据库连接...")
// 初始化数据库、缓存等资源
}
bw.OnWorkerStop = func() {
log.Println("Worker 停止,清理资源...")
// 关闭数据库连接、刷新缓存等
}
OnConnect
客户端 TCP 连接建立时触发。此时客户端尚未发送任何数据。
bw.OnConnect = func(clientID string) {
log.Printf("新连接: %s", clientID)
// 注意:WebSocket 客户端在这里还没完成握手,
// 建议在 OnWebSocketConnect 中做欢迎消息等操作
}
clientID 是全局唯一的 20 字节十六进制字符串(如
7f000001d43100000001),编码了 Gateway 的 IP、端口和连接序号。
OnWebSocketConnect
WebSocket 协议握手完成后触发,data 包含 HTTP 请求头信息(JSON 格式)。
bw.OnWebSocketConnect = func(clientID string, data []byte) {
log.Printf("WebSocket 就绪: %s", clientID)
// data 包含 HTTP 头,可用于鉴权
// 例如解析 Cookie、Token 等
// 发送欢迎消息
gateway_api.SendToClient(clientID, []byte("Welcome! Your ID: "+clientID))
}
OnMessage
客户端发送消息时触发,这是最核心的业务回调。
bw.OnMessage = func(clientID string, message []byte) {
// message 就是客户端发送的原始数据(经过协议层 Decode 之后的)
// 示例:解析 JSON 消息并分发处理
var msg struct {
Type string `json:"type"`
Data string `json:"data"`
}
if err := json.Unmarshal(message, &msg); err != nil {
gateway_api.SendToClient(clientID, []byte(`{"error":"invalid json"}`))
return
}
switch msg.Type {
case "chat":
// 广播给所有人
gateway_api.SendToAll(message, nil, nil)
case "ping":
// 回复 pong
gateway_api.SendToClient(clientID, []byte(`{"type":"pong"}`))
}
}
OnClose
客户端断开连接时触发,适合做清理工作。
bw.OnClose = func(clientID string) {
log.Printf("客户端断开: %s", clientID)
// 从业务层面清理该用户的状态
// 注意:UID 绑定和 Group 关系会被 Gateway 自动清理,无需手动处理
}
gateway_api 函数列表
在 Worker 回调函数内部,通过 gateway_api 包进行消息推送和连接管理:
import "gatewayworker-go/pkg/gateway_api"
消息发送
// 向指定客户端发送消息
gateway_api.SendToClient(clientID string, message []byte)
// 广播给所有在线客户端
// clientIDs: 限定目标列表(nil 表示所有人)
// excludeIDs: 排除列表(nil 表示不排除)
gateway_api.SendToAll(message []byte, clientIDs []string, excludeIDs []string)
// 向指定 UID 发送(支持单个 string 或 []interface{})
gateway_api.SendToUID(uid interface{}, message []byte)
// 向指定分组广播(支持单个 string 或 []interface{})
gateway_api.SendToGroup(group interface{}, message []byte)
UID 绑定
// 绑定 UID(一个 UID 可以绑定多个 clientID,实现多端登录)
gateway_api.BindUID(clientID string, uid string)
// 解除绑定
gateway_api.UnbindUID(clientID string, uid string)
分组操作
// 加入分组
gateway_api.JoinGroup(clientID string, group string)
// 离开分组
gateway_api.LeaveGroup(clientID string, group string)
// 解散整个分组(移除所有成员)
gateway_api.Ungroup(group string)
Session 操作
// 覆盖 session(整个替换)
gateway_api.SetSession(clientID string, session map[string]interface{})
// 合并 session(只更新传入的 key)
gateway_api.UpdateSession(clientID string, session map[string]interface{})
连接管理
// 踢出客户端(先发送 message 再断开)
gateway_api.CloseClient(clientID string, message []byte)
// 直接销毁连接(不发送任何消息)
gateway_api.DestroyClient(clientID string)
完整示例:聊天室
package main
import (
"encoding/json"
"fmt"
"gatewayworker-go/pkg/gateway_api"
"gatewayworker-go/pkg/worker"
"log"
"os"
"os/signal"
"syscall"
)
// ChatMsg 聊天消息格式
type ChatMsg struct {
Type string `json:"type"` // login | chat | private
Name string `json:"name,omitempty"` // 用户名(login 时)
To string `json:"to,omitempty"` // 目标 UID(private 时)
Msg string `json:"msg,omitempty"` // 消息内容
From string `json:"from,omitempty"` // 发送者(服务端填充)
}
func main() {
bw := worker.New("chat", 0, []string{"127.0.0.1:51234"}, "my-key")
bw.OnWorkerStart = func() {
log.Println("聊天室 Worker 已启动")
}
bw.OnWebSocketConnect = func(clientID string, data []byte) {
welcome, _ := json.Marshal(ChatMsg{
Type: "system",
Msg: fmt.Sprintf("连接成功,你的 ID 是 %s,请发送 login 消息设置昵称", clientID),
})
gateway_api.SendToClient(clientID, welcome)
}
bw.OnMessage = func(clientID string, message []byte) {
var msg ChatMsg
if err := json.Unmarshal(message, &msg); err != nil {
errResp, _ := json.Marshal(ChatMsg{Type: "error", Msg: "无效的 JSON 格式"})
gateway_api.SendToClient(clientID, errResp)
return
}
switch msg.Type {
case "login":
// 绑定 UID 并加入默认聊天室
gateway_api.BindUID(clientID, msg.Name)
gateway_api.JoinGroup(clientID, "chatroom")
// 设置 session 存储用户信息
gateway_api.SetSession(clientID, map[string]interface{}{
"name": msg.Name,
})
// 通知全房间
notice, _ := json.Marshal(ChatMsg{
Type: "system",
Msg: fmt.Sprintf("'%s' 加入了聊天室", msg.Name),
})
gateway_api.SendToGroup("chatroom", notice)
case "chat":
// 群聊:广播给房间所有人
broadcast, _ := json.Marshal(ChatMsg{
Type: "chat",
From: msg.Name,
Msg: msg.Msg,
})
gateway_api.SendToGroup("chatroom", broadcast)
case "private":
// 私聊:发给指定 UID
pm, _ := json.Marshal(ChatMsg{
Type: "private",
From: msg.Name,
Msg: msg.Msg,
})
gateway_api.SendToUID(msg.To, pm)
}
}
bw.OnClose = func(clientID string) {
log.Printf("用户断开: %s", clientID)
}
gateway_api.SetBusinessWorker(bw)
// 优雅退出
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
bw.Stop()
os.Exit(0)
}()
if err := bw.Run(); err != nil {
log.Fatal(err)
}
}
客户端交互流程(JavaScript):
const ws = new WebSocket('ws://127.0.0.1:7272');
ws.onopen = () => {
// 登录
ws.send(JSON.stringify({ type: 'login', name: 'Alice' }));
};
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
console.log(`[${msg.type}] ${msg.from || 'system'}: ${msg.msg}`);
};
// 发送群聊消息
ws.send(JSON.stringify({ type: 'chat', name: 'Alice', msg: '大家好!' }));
// 发送私聊消息
ws.send(JSON.stringify({ type: 'private', name: 'Alice', to: 'Bob', msg: '你好 Bob' }));
GatewaySDK 主动调用
GatewaySDK 用于从 Worker 外部的任意 Go 进程 主动向客户端推送消息,无需运行在 Worker 进程内。
创建客户端
import "gatewayworker-go/pkg/gateway_sdk"
// 创建 GatewaySDK(自动连接 Register 发现 Gateway 地址)
client := gateway_sdk.New(
[]string{"127.0.0.1:51234"}, // Register 地址(支持多个)
"my-secret-key", // 认证密钥(必须与 Gateway/Register 一致)
)
defer client.Close() // 用完记得关闭
内部机制:
- 连接 Register 获取所有 Gateway 内部地址
- 维护到每个 Gateway 的 TCP 连接池(自动重连,50 秒自动刷新)
- 所有通信通过 AES 加密
API 一览
消息发送
// 向指定客户端发送
err := client.SendToClient(clientID string, message []byte)
// 广播给所有在线客户端
err := client.SendToAll(message []byte)
// 向指定 UID 发送
err := client.SendToUID(uid interface{}, message []byte)
// 向指定分组广播
err := client.SendToGroup(group interface{}, message []byte)
UID / Group 操作
err := client.BindUID(clientID, uid string)
err := client.UnbindUID(clientID, uid string)
err := client.JoinGroup(clientID, group string)
err := client.LeaveGroup(clientID, group string)
Session 操作
err := client.SetSession(clientID string, session map[string]interface{})
err := client.UpdateSession(clientID string, session map[string]interface{})
连接管理
// 踢出(先发 message 再断开)
err := client.CloseClient(clientID string, message []byte)
// 直接销毁
err := client.DestroyClient(clientID string)
查询(仅 GatewaySDK 可用)
// 判断客户端是否在线
online, err := client.IsOnline(clientID string)
// 获取所有在线连接总数
count, err := client.GetAllClientCount()
注意:
IsOnline和GetAllClientCount是 GatewaySDK 独有的,gateway_api包中没有这些方法。因为它们需要从 Gateway 查询并等待响应,而 Worker 内部的gateway_api是单向发送。
完整示例:HTTP 推送服务
在 HTTP API 中接收请求,通过 GatewaySDK 向客户端推送实时通知:
package main
import (
"encoding/json"
"fmt"
"gatewayworker-go/pkg/gateway_sdk"
"log"
"net/http"
)
var gwClient *gateway_sdk.GatewaySDK
func main() {
// 创建 GatewaySDK(全局复用)
gwClient = gateway_sdk.New(
[]string{"127.0.0.1:51234"},
"my-secret-key",
)
defer gwClient.Close()
http.HandleFunc("/api/notify", handleNotify)
http.HandleFunc("/api/broadcast", handleBroadcast)
http.HandleFunc("/api/online-count", handleOnlineCount)
http.HandleFunc("/api/kick", handleKick)
log.Println("HTTP 推送服务启动在 :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
// POST /api/notify { "uid": "user_100", "msg": "你有新订单" }
func handleNotify(w http.ResponseWriter, r *http.Request) {
var req struct {
UID string `json:"uid"`
Msg string `json:"msg"`
}
json.NewDecoder(r.Body).Decode(&req)
payload, _ := json.Marshal(map[string]string{
"type": "notification",
"msg": req.Msg,
})
if err := gwClient.SendToUID(req.UID, payload); err != nil {
http.Error(w, err.Error(), 500)
return
}
fmt.Fprintln(w, `{"status":"ok"}`)
}
// POST /api/broadcast { "msg": "系统将于 22:00 维护" }
func handleBroadcast(w http.ResponseWriter, r *http.Request) {
var req struct {
Msg string `json:"msg"`
}
json.NewDecoder(r.Body).Decode(&req)
payload, _ := json.Marshal(map[string]string{
"type": "announcement",
"msg": req.Msg,
})
if err := gwClient.SendToAll(payload); err != nil {
http.Error(w, err.Error(), 500)
return
}
fmt.Fprintln(w, `{"status":"ok"}`)
}
// GET /api/online-count
func handleOnlineCount(w http.ResponseWriter, r *http.Request) {
count, err := gwClient.GetAllClientCount()
if err != nil {
http.Error(w, err.Error(), 500)
return
}
fmt.Fprintf(w, `{"count":%d}`, count)
}
// POST /api/kick { "client_id": "7f000001d43100000001", "reason": "违规操作" }
func handleKick(w http.ResponseWriter, r *http.Request) {
var req struct {
ClientID string `json:"client_id"`
Reason string `json:"reason"`
}
json.NewDecoder(r.Body).Decode(&req)
msg, _ := json.Marshal(map[string]string{
"type": "kicked",
"msg": req.Reason,
})
if err := gwClient.CloseClient(req.ClientID, msg); err != nil {
http.Error(w, err.Error(), 500)
return
}
fmt.Fprintln(w, `{"status":"ok"}`)
}
测试:
# 推送给指定用户
curl -X POST http://localhost:8080/api/notify \
-d '{"uid":"user_100","msg":"你有新订单"}'
# 全服广播
curl -X POST http://localhost:8080/api/broadcast \
-d '{"msg":"系统将于 22:00 维护"}'
# 查询在线人数
curl http://localhost:8080/api/online-count
# 踢出指定客户端
curl -X POST http://localhost:8080/api/kick \
-d '{"client_id":"7f000001d43100000001","reason":"违规操作"}'
完整示例:管理后台踢人
package main
import (
"fmt"
"gatewayworker-go/pkg/gateway_sdk"
"log"
)
func main() {
client := gateway_sdk.New(
[]string{"127.0.0.1:51234"},
"my-secret-key",
)
defer client.Close()
// 查看在线人数
count, err := client.GetAllClientCount()
if err != nil {
log.Fatal(err)
}
fmt.Printf("当前在线: %d 人\n", count)
// 检查某用户是否在线
targetID := "7f000001d43100000001"
online, err := client.IsOnline(targetID)
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户 %s 在线: %v\n", targetID, online)
// 踢出并发送提示
if online {
client.CloseClient(targetID, []byte(`{"type":"kicked","msg":"您的账号在其他设备登录"}`))
fmt.Println("已踢出")
}
}
gateway_api vs GatewaySDK 对比
| 维度 | gateway_api |
GatewaySDK |
|---|---|---|
| 所在进程 | Worker 进程内部 | 任意外部 Go 进程 |
| 使用位置 | 事件回调函数中 | HTTP Handler、定时任务、CLI 工具等 |
| 连接方式 | 复用 Worker 已有的 Gateway 连接 | 自建连接池(连 Register → 发现 Gateway) |
| 初始化 | gateway_api.SetBusinessWorker(bw) |
gateway_sdk.New(addrs, key) |
| 函数风格 | 包级函数,无 error 返回 | 实例方法,返回 error |
| 查询能力 | ❌ 无 | ✅ IsOnline、GetAllClientCount |
| 适用场景 | 响应客户端事件 | 主动推送、管理操作 |
选择原则:
- 在 Worker 回调里处理事件 → 用
gateway_api - 在其他进程里主动推送 → 用
GatewaySDK
常见模式
1. 多端登录
一个 UID 绑定多个 clientID,发送到 UID 时所有终端都能收到:
// Worker 中
bw.OnWebSocketConnect = func(clientID string, data []byte) {
// 假设从 HTTP 头中解析出了 token,查到了 userID
userID := parseUserID(data)
gateway_api.BindUID(clientID, userID)
}
// 外部推送时
client.SendToUID("user_100", payload) // user_100 的所有终端都会收到
2. 房间/频道
用 Group 实现聊天室、游戏房间:
// 加入房间
gateway_api.JoinGroup(clientID, "room_123")
// 房间内广播
gateway_api.SendToGroup("room_123", message)
// 离开房间
gateway_api.LeaveGroup(clientID, "room_123")
// 解散房间(所有成员被移出)
gateway_api.Ungroup("room_123")
3. Session 存储用户状态
// 设置用户信息
gateway_api.SetSession(clientID, map[string]interface{}{
"name": "Alice",
"role": "admin",
"level": 5,
})
// 更新部分字段(不影响其他 key)
gateway_api.UpdateSession(clientID, map[string]interface{}{
"level": 6, // 只更新 level
})
4. 定时广播
配合 GatewaySDK 在独立进程中做定时推送:
client := gateway_sdk.New(addrs, key)
defer client.Close()
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
msg, _ := json.Marshal(map[string]interface{}{
"type": "heartbeat",
"server_time": time.Now().Unix(),
})
client.SendToAll(msg)
}