什么是NSQite:Go语言轻量级消息队列解决方案
在现代软件开发中,消息队列是一个至关重要的工具。它可以实现服务之间的解耦,提高系统的伸缩性和可靠性。对于项目初期,开发团队可能不需要复杂的分布式消息队列系统,如NSQ、NATs或Pulsar。此时,NSQite就成为了理想的选择。它是一款用Go语言实现的轻量级消息队列,支持SQLite、PostgreSQL和ORM作为持久化存储。
NSQite的优势
1. 简单可靠
NSQite的设计理念是提供一个简单且可靠的解决方案,满足项目初期的基本消息队列需求。它的API设计与go-nsq类似,这意味着在未来项目需要升级到更高并发的NSQ时,开发人员可以轻松过渡。
2. 多种存储支持
NSQite支持多种存储方式,包括SQLite和PostgreSQL。这种灵活性使得开发人员可以根据项目需求选择最合适的存储方案。
3. 消息传递保证
NSQite保证消息至少被传递一次。这意味着在某些情况下可能会出现重复消息。因此,消费者需要实现去重或幂等操作,以确保消息处理的正确性。
快速开始:事件总线
事件总线是NSQite的一个重要功能,特别适用于单体架构中的业务场景。它基于内存实现,支持发布者与订阅者之间的1:N关系,并包含任务失败延迟重试机制。
适用场景
事件总线适用于以下场景:
-
单体架构 -
需要实时通知订阅者 -
支持服务重启后的消息补偿 -
支持泛型消息体
示例场景
假设系统发生告警时,需要同时记录到数据库并通过WebSocket通知客户端。在这种情况下:
-
数据库记录模块订阅告警主题。 -
WebSocket通知模块订阅告警主题。 -
告警发生时,发布者发送告警消息。 -
两个订阅者分别处理消息。
事件总线的作用是解耦模块,将命令式编程改为事件驱动架构。
关于消息顺序
当订阅者协程数为1且处理函数始终返回nil时,NSQite保证消息有序。但在其他情况下(并发处理或消息重试),NSQite无法保证消息顺序。
快速开始:事务消息
事务消息是NSQite的另一个强大功能。它基于数据库实现,支持GORM,并由生产者与消费者组成。事务消息确保消息与数据库事务绑定,事务回滚时消息可撤销。
适用场景
事务消息适用于以下场景:
-
单体架构或分布式架构 -
消息与数据库事务绑定,事务回滚时消息可撤销 -
单体架构中消息快速处理 -
分布式架构中消息延迟100~5000毫秒
示例场景
假设需要删除用户时,同时删除用户相关的数据。这种情况下:
-
用户资料模块订阅删除用户主题。 -
在删除用户的事务中,发布事务消息。 -
事务提交后,消费者收到消息开始处理。 -
如果处理过程中服务器崩溃,重启后消费者会重新收到消息并处理。
注意:消息可能会被多次触发,因此消费者需要实现幂等性处理。
代码示例
基本使用
type Reader1 struct{}
// HandleMessage implements Handler.
func (r *Reader1) HandleMessage(message *EventMessage[string]) error {
fmt.Println("reader one :", message.Body)
return nil
}
// 模拟一个作者疯狂写书,出版社派出 5 个编辑,每个编辑每秒只能处理一本书
func main() {
// 1. SetGorm
nsqite.SetGorm(db)
const topic = "a-book"
p := NewProducer[string]()
// 限制任务失败重试次数 10 次
c := NewConsumer(topic, "comsumer1", WithMaxAttempts(10))
c.AddConcurrentHandlers(&Reader1{}, 5)
for i := 0; i < 5; i++ {
p.Publish(topic, fmt.Sprintf("a >> hello %d", i))
}
time.Sleep(2 * time.Second)
}
手动完成
type Reader3 struct {
receivedMessages sync.Map
attemptCount int32
}
// HandleMessage implements Handler.
func (r *Reader3) HandleMessage(message *EventMessage[string]) error {
// 禁用自动完成
message.DisableAutoResponse()
if message.Body == "hello" || message.Attempts > 3 {
// 手动完成
r.receivedMessages.Store(message.Body, true)
message.Finish()
return nil
}
// 手动延迟 1 秒后重试
atomic.AddInt32(&r.attemptCount, 1)
message.Requeue(time.Second)
return nil
}
维护与优化
NSQite使用slog记录日志。如果出现以下警告日志,需要及时优化参数:
-
[NSQite] publish message timeout
:表示发布速度过快,消费者处理不过来。可以通过以下方式优化:-
增加缓存队列长度 -
增加并发处理协程数 -
优化消费者处理函数性能
-
默认超时时间为3秒。如果频繁出现超时,可以通过WithCheckTimeout(10*time.Second)
调整超时时间。
Benchmark
事件总线
一个发布者,一个订阅者,每秒并发300百万:

事务消息队列
一个生产者,一个消费者,基于SQLite数据库:

使用PostgreSQL会有更好的表现。
下一步开发任务
-
事件总线支持Redis作为持久化存储,支持分布式。 -
事务消息队列支持分布式,其消费者收到消息后要更新一下数据库即可。
常见问题解答(QA)
问题1:a、b、c三个订阅者,当b阻塞时,会发生什么?
-
a正常收到消息。 -
b阻塞,导致c收不到消息。 -
b阻塞,导致发布者也阻塞。
解决方法:
-
使用 WithDiscardOnBlocking(true)
丢弃消息。 -
使用 PublicWithContext(ctx, topic, message)
来限制发布超时时间。 -
使用 WithQueueSize(1024)
来设置缓存队列长度。 -
调整回调,使消费者更快处理任务。
问题2:使用事务消息,消息已发布,a、c已完成任务,此时服务重启,b未完成会怎样?
-
服务重启,b会重新收到消息继续处理。 -
a、c因为已完成,不会收到消息。
问题3:任务执行失败,可以自定义延迟执行时间吗?
可以,参考案例:案例
问题4:如果任务一直失败,达到最大超时次数会怎样?
任务结束有2个判定标准:
-
任务执行成功。 -
任务达到最大执行次数。
如果需要无限次执行,可以使用WithMaxAttempts(0)
。默认重试10次,可以改为更多次数WithMaxAttempts(100)
。
问题5:WithMaxAttempts(10)
表示重试10次,如果一直失败,问回调一共执行了多少次?
-
10次。
问题6:事务消息会在数据库存储多久?
-
自动删除15天前的全部消息。 -
自动删除7天前已完成的消息。 -
当表数据超过1万条时,自动删除3天前的已完成消息。
需要自定义时间?请提pr或issues。
问题7:在事件总线中,回调一直失败会阻塞队列吗?
-
不会,会进入优先队列中,延迟处理。 -
大量任务失败,会导致消息堆积在内存中,达到最大尝试次数时释放。
问题8:在事件总线中,发布的某个主题阻塞了,会影响其它主题发布吗?
-
不会,主题之间互不影响。
通过以上内容,我们对NSQite有了全面的了解。它是一个强大的轻量级消息队列解决方案,适用于多种场景。无论是事件总线还是事务消息,NSQite都能提供高效、可靠的性能。希望这篇博客文章能帮助你更好地理解和使用NSQite。