为 Kratos 微服务框架打造的统一传输层与消息代理扩展集
一套抽象,30+ 传输协议全覆盖,开箱即用
- 30+ 传输协议与消息中间件适配:RabbitMQ、Kafka、RocketMQ、Pulsar、NATS、NSQ、MQTT、Redis Stream、Azure Service Bus、GCP Pub/Sub、AWS SQS、WebSocket、HTTP/3、WebTransport、SSE、SignalR、Socket.IO、MCP、KCP、WebRTC…… 一站式覆盖主流消息队列、云消息服务、RPC 框架与实时通信协议
- 双模式接入:
transport.Server实现可直接注册到 Kratos 服务生命周期;独立broker.Broker接口支持纯消息代理场景,按需选用 - 泛型类型安全:基于 Go 1.18+ 泛型提供
TypedHandler[T]、Subscribe[T]、RegisterSubscriber[S, T]等类型安全 API,告别interface{}运行时恐慌 - 统一消息抽象:
broker.Message统一封装 Headers / Body / Metadata / Partition / Offset,屏蔽底层协议差异 - 可观测性就绪:内置 OpenTelemetry 链路追踪集成,支持 OTLP gRPC/HTTP、Jaeger、Zipkin 等主流 Exporter,发布 / 订阅全链路 Trace
- 中间件链:Publish / Subscribe 双向中间件机制,可灵活注入日志、指标、链路追踪、限流等横切关注点
- 高可靠性消息投递:RabbitMQ 支持 Publisher Confirms(发布确认)、Publisher Returns(消息退回)、多 Exchange 路由,确保消息不丢失
- 模块化按需引入:每个 transport / broker 实现独立 Go Module,只引入你需要的依赖,避免依赖膨胀
graph TB
App["Kratos Application<br/>kratos.New()"]
subgraph Transport["transport.Server 扩展层"]
MQ["消息队列 Server<br/>Kafka · RabbitMQ · RocketMQ · Pulsar<br/>NATS · NSQ · MQTT · Redis · ActiveMQ<br/>Azure SB · GCP Pub/Sub · SQS"]
RPC["RPC / Web 扩展<br/>Thrift · GraphQL · FastHttp<br/>Gin · Go-Zero · Hertz · Iris"]
RT["实时通信 Server<br/>WebSocket · HTTP/3 · WebTransport<br/>SSE · SignalR · Socket.IO · MCP"]
NET["网络协议 Server<br/>KCP · WebRTC · TCP"]
TASK["分布式任务队列<br/>Asynq · Machinery · Cron"]
end
subgraph Broker["broker.Broker 消息代理层"]
BK["Broker 实现<br/>Kafka · MQTT · NATS · NSQ · Pulsar<br/>RabbitMQ · Redis · RocketMQ · STOMP<br/>Azure SB · GCP Pub/Sub · SQS"]
end
subgraph Core["核心抽象层"]
BI["Broker Interface<br/>Publish · Subscribe · Request"]
MSG["Message<br/>Headers · Body · Metadata · Key"]
MID["Middleware<br/>PublishMiddleware · SubscriberMiddleware"]
TRACE["Tracing<br/>OpenTelemetry 集成"]
end
App --> Transport
App --> Broker
Transport --> Core
Broker --> Core
| 中间件 | 说明 | 文档 |
|---|---|---|
| RabbitMQ | AMQP 0-9-1 协议,广泛用于企业异步消息 | README |
| Kafka | 高吞吐分布式事件流平台 | README |
| RocketMQ | 阿里云级分布式消息中间件 | README |
| ActiveMQ | STOMP 协议接入 ActiveMQ / Apollo | README |
| Pulsar | Apache Pulsar 云原生消息平台 | README |
| NATS | 轻量高性能消息系统 | README |
| NSQ | 实时分布式消息平台 | README |
| Redis | Redis Stream 消息消费 | README |
| MQTT | 物联网 MQTT v3.1.1 / v5.0 协议 | README |
| Azure Service Bus | Azure 云消息队列服务 | README |
| GCP Pub/Sub | Google Cloud 消息发布/订阅服务 | README |
| AWS SQS | Amazon Simple Queue Service | README |
| 框架 | 说明 | 文档 |
|---|---|---|
| Thrift | Apache Thrift RPC 协议 | README |
| GraphQL | GraphQL 查询语言 | README |
| FastHttp | 高性能 HTTP 框架 fasthttp | README |
| Gin | Gin Web 框架 | README |
| Go-Zero | go-zero 微服务框架 | README |
| Hertz | 字节跳动 CloudWeGo Hertz HTTP 框架 | README |
| Iris | Iris Web 框架 | README |
| tRPC | 腾讯 tRPC 微服务框架 | README |
| 框架 | 说明 | 文档 |
|---|---|---|
| Asynq | 基于 Redis 的异步任务队列 | README |
| Machinery | 分布式任务处理框架 | README |
| Cron | 定时任务调度 | README |
| HPTimer | 高精度定时器 | README |
| 协议 | 说明 | 文档 |
|---|---|---|
| WebSocket | 全双工实时通信 | README |
| HTTP/3 | 基于 QUIC 的下一代 HTTP 协议 | README |
| WebTransport | 基于 QUIC 的 Web 传输协议 | README |
| SSE | Server-Sent Events 服务端推送 | README |
| SignalR | ASP.NET SignalR 协议 | README |
| Socket.IO | Socket.IO 实时通信协议 | README |
| MCP | Model Context Protocol (AI Agent 通信) | README |
| 协议 | 说明 | 文档 |
|---|---|---|
| KCP | 可靠 UDP 协议 | README |
| WebRTC | 点对点实时通信 | README |
| TCP | 原始 TCP 长连接 | README |
| 中间件 | 说明 | 文档 |
|---|---|---|
| Kafka | 高吞吐事件流 | README |
| MQTT | 物联网消息协议 | README |
| NATS | 轻量级消息系统 | README |
| NSQ | 实时消息平台 | README |
| Pulsar | 云原生消息平台 | README |
| RabbitMQ | AMQP 消息中间件 | README |
| Redis | Redis Stream 消息 | README |
| RocketMQ | 阿里分布式消息中间件 | README |
| STOMP | STOMP 协议消息中间件 | README |
| Azure Service Bus | Azure 云消息队列服务 | README |
| GCP Pub/Sub | Google Cloud 消息发布/订阅服务 | README |
| AWS SQS | Amazon Simple Queue Service | README |
| 层级 | 技术 | 说明 |
|---|---|---|
| 语言 | Go 1.24+ | 高性能编译型语言 |
| 框架 | go-kratos v2 | B 站开源微服务框架 |
| 链路追踪 | OpenTelemetry | 统一可观测性标准 |
| Exporter | OTLP / Jaeger / Zipkin | 多种 Trace 导出后端 |
| 编解码 | JSON / Protobuf | 灵活的序列化方案 |
| TLS | crypto/tls | 安全传输层支持 |
根据需要按模块引入:
# Transport Server
go get github.com/tx7do/kratos-transport/transport/kafka
go get github.com/tx7do/kratos-transport/transport/rabbitmq
go get github.com/tx7do/kratos-transport/transport/websocket
go get github.com/tx7do/kratos-transport/transport/sse
# Broker
go get github.com/tx7do/kratos-transport/broker/kafka
go get github.com/tx7do/kratos-transport/broker/redispackage main
import (
"context"
"log"
"github.com/go-kratos/kratos/v2"
kfk "github.com/tx7do/kratos-transport/transport/kafka"
)
type Event struct {
Message string `json:"message"`
}
func main() {
ctx := context.Background()
kafkaSrv := kfk.NewServer(
kfk.WithAddress("localhost:9092"),
kfk.WithSubscribe("test-topic", "test-group", handleMessage),
)
app := kratos.New(
kratos.Name("my-service"),
kratos.Server(kafkaSrv),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}
func handleMessage(ctx context.Context, topic string, headers broker.Headers, msg *Event) error {
log.Printf("received: %s", msg.Message)
return nil
}package main
import (
"context"
"log"
"github.com/tx7do/kratos-transport/broker"
kfk "github.com/tx7do/kratos-transport/broker/kafka"
)
func main() {
ctx := context.Background()
b := kfk.NewBroker(
broker.WithAddress("localhost:9092"),
)
if err := b.Connect(); err != nil {
log.Fatal(err)
}
defer b.Disconnect()
// 发布消息
_ = b.Publish(ctx, "test-topic", broker.NewMessage([]byte(`{"hello":"world"}`)))
// 订阅消息
_, _ = broker.Subscribe[[]byte](b, "test-topic",
func(ctx context.Context, topic string, headers broker.Headers, msg *[]byte) error {
log.Printf("received: %s", string(*msg))
return nil
},
)
}broker.Broker 是所有消息代理实现的顶层接口:
type Broker interface {
Name() string
Options() Options
Address() string
Init(...Option) error
Connect() error
Disconnect() error
Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error
Subscribe(topic string, handler Handler, binder Binder, opts ...SubscribeOption) (Subscriber, error)
Request(ctx context.Context, topic string, msg *Message, opts ...RequestOption) (*Message, error)
}统一消息模型,屏蔽底层协议差异:
type Message struct {
ID string // 消息 ID
Headers Headers // 消息头
Body any // 消息体
Key string // 分区键(Kafka Key / RabbitMQ RoutingKey)
Metadata Metadata // 元数据
Partition int // 分区号
Offset int64 // 偏移量
Msg any // 原始消息
}利用 Go 泛型实现编译期类型安全:
// 泛型订阅(Broker 层)
broker.Subscribe[MyEvent](b, "topic", handler)
// 泛型注册(Transport 层)
transport.RegisterSubscriber[MyServer](srv, ctx, "topic", "group", false, handler)支持 Publish / Subscribe 双向中间件链:
// 发布中间件
b := kfk.NewBroker(
broker.WithPublishMiddlewares(loggingMiddleware, tracingMiddleware),
)
// 订阅中间件
b := kfk.NewBroker(
broker.WithSubscriberMiddlewares(metricsMiddleware, recoveryMiddleware),
)kratos-transport/
├── broker/ # 消息代理抽象与多实现
│ ├── kafka/ # Kafka Broker
│ ├── mqtt/ # MQTT Broker
│ ├── nats/ # NATS Broker
│ ├── nsq/ # NSQ Broker
│ ├── pulsar/ # Pulsar Broker
│ ├── rabbitmq/ # RabbitMQ Broker
│ ├── redis/ # Redis Broker
│ ├── rocketmq/ # RocketMQ Broker
│ ├── azuresb/ # Azure Service Bus Broker
│ ├── gcpubsub/ # GCP Pub/Sub Broker
│ ├── sqs/ # AWS SQS Broker
│ ├── stomp/ # STOMP Broker
│ ├── broker.go # Broker 接口定义
│ ├── message.go # 统一消息模型
│ ├── options.go # Broker 全局配置
│ ├── publish.go # 发布中间件链
│ ├── subscriber.go # 订阅者管理(线程安全)
│ └── typed_handler.go # 泛型 Handler
├── transport/ # Transport Server 扩展
│ ├── activemq/ # ActiveMQ Transport
│ ├── asynq/ # Asynq 异步任务队列
│ ├── azuresb/ # Azure Service Bus Transport
│ ├── cron/ # 定时任务调度
│ ├── fasthttp/ # FastHttp Transport
│ ├── gcpubsub/ # GCP Pub/Sub Transport
│ ├── gin/ # Gin Transport
│ ├── gozero/ # Go-Zero Transport
│ ├── graphql/ # GraphQL Transport
│ ├── hertz/ # Hertz Transport
│ ├── hptimer/ # 高精度定时器
│ ├── http3/ # HTTP/3 + QUIC Transport
│ ├── iris/ # Iris Transport
│ ├── kafka/ # Kafka Transport
│ ├── kcp/ # KCP Transport
│ ├── keepalive/ # Keep-Alive Transport
│ ├── machinery/ # Machinery 任务队列
│ ├── mcp/ # MCP (Model Context Protocol)
│ ├── mqtt/ # MQTT Transport
│ ├── nats/ # NATS Transport
│ ├── nsq/ # NSQ Transport
│ ├── pulsar/ # Pulsar Transport
│ ├── rabbitmq/ # RabbitMQ Transport
│ ├── redis/ # Redis Transport
│ ├── rocketmq/ # RocketMQ Transport
│ ├── signalr/ # SignalR Transport
│ ├── socketio/ # Socket.IO Transport
│ ├── sqs/ # AWS SQS Transport
│ ├── sse/ # SSE Transport
│ ├── tcp/ # TCP Transport
│ ├── thrift/ # Thrift RPC Transport
│ ├── trpc/ # tRPC Transport
│ ├── webrtc/ # WebRTC Transport
│ ├── websocket/ # WebSocket Transport
│ ├── webtransport/ # WebTransport Transport
│ ├── register.go # 泛型订阅注册器
│ ├── options.go # Transport 全局配置
│ └── utils.go # 网络工具函数
├── tracing/ # 链路追踪扩展
│ ├── provider.go # TracerProvider 工厂
│ ├── exporter.go # 多后端 Exporter
│ ├── tracer.go # Trace 注入 / 提取
│ └── options.go # 追踪配置
├── _example/ # 示例工程
│ ├── broker/ # Broker 使用示例
│ └── server/ # Server 使用示例
├── testing/ # 测试相关
├── script/ # 辅助脚本
├── Makefile # 构建脚本
└── LICENSE # MIT 开源协议
| 项目 | 说明 |
|---|---|
| kratos-chatroom | WebSocket 实时聊天室 |
| kratos-cqrs | CQRS 架构示例(Kafka + MongoDB) |
| kratos-realtimemap | 物联网实时地图(MQTT + WebSocket) |
| go-wind-uba | 企业级用户行为分析系统 |
| go-wind-admin | 中后台管理系统脚手架 |
以上项目均收录于 Kratos 官方 Examples。
- 消息队列接入:需要将 Kafka / RabbitMQ / RocketMQ 等消息队列统一纳入 Kratos 微服务框架管理
- 实时通信服务:需要 WebSocket / SSE / SignalR / Socket.IO 等实时通信能力的微服务
- 物联网后端:MQTT 协议接入物联网设备,配合实时推送
- AI Agent 接入:通过 MCP 协议为 AI Agent 提供工具调用能力
- 异步任务处理:基于 Asynq / Machinery 构建分布式任务队列
- 多协议网关:在同一服务内同时支持 HTTP / gRPC / Thrift / GraphQL 等多种协议
- 纯消息代理:仅需消息发布 / 订阅能力,无需 Kratos 框架依赖
欢迎提交 Issue 和 Pull Request!
- Fork 本仓库
- 创建特性分支 (
git checkout -b feature/amazing-feature) - 提交变更 (
git commit -m 'Add some amazing feature') - 推送到分支 (
git push origin feature/amazing-feature) - 发起 Pull Request
本项目基于 MIT License 开源。