什么是NSQite:Go语言轻量级消息队列解决方案

在现代软件开发中,消息队列是一个至关重要的工具。它可以实现服务之间的解耦,提高系统的伸缩性和可靠性。对于项目初期,开发团队可能不需要复杂的分布式消息队列系统,如NSQ、NATs或Pulsar。此时,NSQite就成为了理想的选择。它是一款用Go语言实现的轻量级消息队列,支持SQLite、PostgreSQL和ORM作为持久化存储。

NSQite的优势

1. 简单可靠

NSQite的设计理念是提供一个简单且可靠的解决方案,满足项目初期的基本消息队列需求。它的API设计与go-nsq类似,这意味着在未来项目需要升级到更高并发的NSQ时,开发人员可以轻松过渡。

2. 多种存储支持

NSQite支持多种存储方式,包括SQLite和PostgreSQL。这种灵活性使得开发人员可以根据项目需求选择最合适的存储方案。

3. 消息传递保证

NSQite保证消息至少被传递一次。这意味着在某些情况下可能会出现重复消息。因此,消费者需要实现去重或幂等操作,以确保消息处理的正确性。

快速开始:事件总线

事件总线是NSQite的一个重要功能,特别适用于单体架构中的业务场景。它基于内存实现,支持发布者与订阅者之间的1:N关系,并包含任务失败延迟重试机制。

适用场景

事件总线适用于以下场景:

  • 单体架构
  • 需要实时通知订阅者
  • 支持服务重启后的消息补偿
  • 支持泛型消息体

示例场景

假设系统发生告警时,需要同时记录到数据库并通过WebSocket通知客户端。在这种情况下:

  1. 数据库记录模块订阅告警主题。
  2. WebSocket通知模块订阅告警主题。
  3. 告警发生时,发布者发送告警消息。
  4. 两个订阅者分别处理消息。

事件总线的作用是解耦模块,将命令式编程改为事件驱动架构。

关于消息顺序

当订阅者协程数为1且处理函数始终返回nil时,NSQite保证消息有序。但在其他情况下(并发处理或消息重试),NSQite无法保证消息顺序。

快速开始:事务消息

事务消息是NSQite的另一个强大功能。它基于数据库实现,支持GORM,并由生产者与消费者组成。事务消息确保消息与数据库事务绑定,事务回滚时消息可撤销。

适用场景

事务消息适用于以下场景:

  • 单体架构或分布式架构
  • 消息与数据库事务绑定,事务回滚时消息可撤销
  • 单体架构中消息快速处理
  • 分布式架构中消息延迟100~5000毫秒

示例场景

假设需要删除用户时,同时删除用户相关的数据。这种情况下:

  1. 用户资料模块订阅删除用户主题。
  2. 在删除用户的事务中,发布事务消息。
  3. 事务提交后,消费者收到消息开始处理。
  4. 如果处理过程中服务器崩溃,重启后消费者会重新收到消息并处理。

注意:消息可能会被多次触发,因此消费者需要实现幂等性处理。

代码示例

基本使用

type Reader1 struct{}

// HandleMessage implements Handler.
func (r *Reader1) HandleMessage(message *EventMessage[string]) error {
 fmt.Println("reader one :", message.Body)
 return nil
}

// 模拟一个作者疯狂写书,出版社派出 5 个编辑,每个编辑每秒只能处理一本书
func main() {
 // 1. SetGorm
 nsqite.SetGorm(db)

 const topic = "a-book"
 p := NewProducer[string]()
 // 限制任务失败重试次数 10 次
 c := NewConsumer(topic, "comsumer1", WithMaxAttempts(10))
 c.AddConcurrentHandlers(&Reader1{}, 5)
 for i := 0; i < 5; i++ {
  p.Publish(topic, fmt.Sprintf("a >> hello %d", i))
 }
 time.Sleep(2 * time.Second)
}

手动完成

type Reader3 struct {
 receivedMessages sync.Map
 attemptCount     int32
}

// HandleMessage implements Handler.
func (r *Reader3) HandleMessage(message *EventMessage[string]) error {
 // 禁用自动完成
 message.DisableAutoResponse()
 if message.Body == "hello" || message.Attempts > 3 {
  // 手动完成
  r.receivedMessages.Store(message.Body, true)
  message.Finish()
  return nil
 }
 // 手动延迟 1 秒后重试
 atomic.AddInt32(&r.attemptCount, 1)
 message.Requeue(time.Second)
 return nil
}

维护与优化

NSQite使用slog记录日志。如果出现以下警告日志,需要及时优化参数:

  • [NSQite] publish message timeout:表示发布速度过快,消费者处理不过来。可以通过以下方式优化:

    • 增加缓存队列长度
    • 增加并发处理协程数
    • 优化消费者处理函数性能

默认超时时间为3秒。如果频繁出现超时,可以通过WithCheckTimeout(10*time.Second)调整超时时间。

Benchmark

事件总线

一个发布者,一个订阅者,每秒并发300百万:

事件总线Benchmark
事件总线Benchmark

事务消息队列

一个生产者,一个消费者,基于SQLite数据库:

事务消息队列Benchmark
事务消息队列Benchmark

使用PostgreSQL会有更好的表现。

下一步开发任务

  • 事件总线支持Redis作为持久化存储,支持分布式。
  • 事务消息队列支持分布式,其消费者收到消息后要更新一下数据库即可。

常见问题解答(QA)

问题1:a、b、c三个订阅者,当b阻塞时,会发生什么?

  • a正常收到消息。
  • b阻塞,导致c收不到消息。
  • b阻塞,导致发布者也阻塞。

解决方法:

  • 使用WithDiscardOnBlocking(true)丢弃消息。
  • 使用PublicWithContext(ctx, topic, message)来限制发布超时时间。
  • 使用WithQueueSize(1024)来设置缓存队列长度。
  • 调整回调,使消费者更快处理任务。

问题2:使用事务消息,消息已发布,a、c已完成任务,此时服务重启,b未完成会怎样?

  • 服务重启,b会重新收到消息继续处理。
  • a、c因为已完成,不会收到消息。

问题3:任务执行失败,可以自定义延迟执行时间吗?

可以,参考案例:案例

问题4:如果任务一直失败,达到最大超时次数会怎样?

任务结束有2个判定标准:

  • 任务执行成功。
  • 任务达到最大执行次数。

如果需要无限次执行,可以使用WithMaxAttempts(0)。默认重试10次,可以改为更多次数WithMaxAttempts(100)

问题5:WithMaxAttempts(10)表示重试10次,如果一直失败,问回调一共执行了多少次?

  • 10次。

问题6:事务消息会在数据库存储多久?

  • 自动删除15天前的全部消息。
  • 自动删除7天前已完成的消息。
  • 当表数据超过1万条时,自动删除3天前的已完成消息。

需要自定义时间?请提pr或issues。

问题7:在事件总线中,回调一直失败会阻塞队列吗?

  • 不会,会进入优先队列中,延迟处理。
  • 大量任务失败,会导致消息堆积在内存中,达到最大尝试次数时释放。

问题8:在事件总线中,发布的某个主题阻塞了,会影响其它主题发布吗?

  • 不会,主题之间互不影响。

通过以上内容,我们对NSQite有了全面的了解。它是一个强大的轻量级消息队列解决方案,适用于多种场景。无论是事件总线还是事务消息,NSQite都能提供高效、可靠的性能。希望这篇博客文章能帮助你更好地理解和使用NSQite。