What is NSQite: A Lightweight Message Queue Solution in Go

Message queues play a vital role in building robust and scalable applications. They decouple services, improve system resilience, and enable asynchronous communication between components. Large-scale distributed message queue systems such as NSQ, NATS, and Pulsar are popular, but they can be overkill for early-stage projects. This is where NSQite comes in. NSQite is a lightweight message queue implemented in Go that supports SQLite and PostgreSQL for persistent storage and works with GORM, offering a simple yet reliable solution for basic message queue needs.

Advantages of NSQite

Simplicity and Reliability

NSQite is designed to provide a straightforward and dependable solution for projects in their initial stages. Its API is similar to go-nsq, which means developers can easily transition to NSQ if higher concurrency is required in the future.

Multiple Storage Options

NSQite offers flexibility with support for multiple storage methods, including SQLite and PostgreSQL. This allows developers to choose the most suitable storage solution based on their project requirements.

Message Delivery Guarantee

NSQite ensures that messages are delivered at least once. However, this also means that duplicate messages may occur. To handle this, consumers need to implement deduplication or idempotent operations.
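As an illustration of consumer-side deduplication, here is a minimal sketch that does not use NSQite at all. The Deduper type and the message IDs are hypothetical, and the in-memory map is an assumption made for brevity: a real consumer would usually record processed IDs in durable storage so that deduplication survives a restart.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers which message IDs have already been processed.
// It is a hypothetical helper for illustration, not part of NSQite.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// Once runs fn only the first time id is seen and reports whether fn ran.
// If fn fails, the ID is forgotten so that a later redelivery can retry.
func (d *Deduper) Once(id string, fn func() error) (bool, error) {
	d.mu.Lock()
	if _, dup := d.seen[id]; dup {
		d.mu.Unlock()
		return false, nil
	}
	d.seen[id] = struct{}{}
	d.mu.Unlock()

	if err := fn(); err != nil {
		d.mu.Lock()
		delete(d.seen, id)
		d.mu.Unlock()
		return true, err
	}
	return true, nil
}

func main() {
	d := NewDeduper()
	sideEffects := 0
	// "msg-1" is delivered twice, but its side effect happens only once.
	for _, id := range []string{"msg-1", "msg-1", "msg-2"} {
		ran, _ := d.Once(id, func() error { sideEffects++; return nil })
		fmt.Println(id, "processed:", ran)
	}
	fmt.Println("side effects:", sideEffects) // side effects: 2
}
```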

Getting Started with NSQite

Event Bus

The event bus is a key feature of NSQite, ideal for business scenarios in monolithic architectures. It is implemented in memory and supports a 1:N relationship between publishers and subscribers. It also includes a task failure delay retry mechanism.

Use Cases:

  • Monolithic architecture

  • Real-time notification to subscribers

  • Message compensation after service restart

  • Support for generic message bodies

Example Scenario:

When a system alert occurs, it must be recorded in the database and pushed to clients via WebSocket. In this case:

  1. The database logging module subscribes to the alert topic.

  2. The WebSocket notification module subscribes to the alert topic.

  3. When an alert occurs, the publisher sends the alert message.

  4. Both subscribers process the message accordingly.

The event bus decouples the modules: instead of the alerting code calling each module directly, it publishes an event and every subscriber reacts to it.
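The 1:N fan-out in this scenario can be sketched with plain channels. This is a toy model under stated assumptions, not NSQite's implementation: the Bus type and its Subscribe, Publish, and Close methods are hypothetical names, and retries are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a toy in-memory event bus: every subscriber of a topic
// receives its own copy of each published message.
type Bus struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func NewBus() *Bus { return &Bus{subs: make(map[string][]chan string)} }

// Subscribe registers a buffered queue for the topic and returns it.
func (b *Bus) Subscribe(topic string, size int) <-chan string {
	ch := make(chan string, size)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers msg to every subscriber of the topic.
// It blocks if a subscriber's queue is full.
func (b *Bus) Publish(topic, msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}

// Close closes all subscriber queues so that range loops terminate.
func (b *Bus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	b.subs = make(map[string][]chan string)
}

func main() {
	bus := NewBus()
	dbLog := bus.Subscribe("alert", 8)
	ws := bus.Subscribe("alert", 8)

	var wg sync.WaitGroup
	consume := func(name string, ch <-chan string) {
		defer wg.Done()
		for m := range ch {
			fmt.Printf("%s handled %q\n", name, m)
		}
	}
	wg.Add(2)
	go consume("db-logger", dbLog)
	go consume("websocket", ws)

	bus.Publish("alert", "disk usage above 90%")
	bus.Close()
	wg.Wait()
}
```

The publisher knows only the topic name, so adding a third subscriber requires no change to the publishing code.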

About Message Ordering:

  • When the subscriber goroutine count is 1 and the handler always returns nil, NSQite guarantees message ordering.

  • In other cases (concurrent processing or message retry), NSQite cannot guarantee message order.
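To see why a retry breaks ordering even with a single goroutine, consider this small simulation. It is a simplification made for illustration: a failed message is placed at the back of the queue, which stands in for a delayed retry, and completionOrder is a hypothetical function.

```go
package main

import "fmt"

// completionOrder simulates a single worker draining a FIFO queue.
// A message listed in failOnce fails its first attempt and is requeued
// at the back, a simplified stand-in for a delayed retry.
func completionOrder(msgs []string, failOnce map[string]bool) []string {
	queue := append([]string(nil), msgs...)
	failed := make(map[string]bool)
	var done []string
	for len(queue) > 0 {
		m := queue[0]
		queue = queue[1:]
		if failOnce[m] && !failed[m] {
			failed[m] = true
			queue = append(queue, m) // retry later
			continue
		}
		done = append(done, m)
	}
	return done
}

func main() {
	msgs := []string{"m1", "m2", "m3"}
	// Handler always succeeds: order is preserved.
	fmt.Println(completionOrder(msgs, nil)) // [m1 m2 m3]
	// m1 fails once: it completes after m2 and m3.
	fmt.Println(completionOrder(msgs, map[string]bool{"m1": true})) // [m2 m3 m1]
}
```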

Transactional Messages

Transactional messages are another powerful feature of NSQite. They are implemented on top of a database, support GORM, and consist of producers and consumers. A message is written as part of a database transaction, so if the transaction fails, the message is rolled back along with the rest of the changes.

Use Cases:

  • Monolithic or distributed architectures

  • Messages bound to database transactions, where messages can be rolled back if a transaction fails

  • Fast message processing in monolithic architectures

  • Message delays of 100-5000 milliseconds in distributed architectures

Example Scenario:

When deleting a user, you also need to delete data related to that user. In this case:

  1. The user profile module subscribes to the user deletion topic.

  2. During the user deletion transaction, a transactional message is published.

  3. After the transaction is committed, the consumer receives the message and begins processing.

  4. If the server crashes during processing, after restart, the consumer will receive the message again and continue processing.

Note that messages may be triggered multiple times, so consumers need to handle them in an idempotent manner.
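The idea of binding a message to a transaction can be modeled without a real database. In the sketch below, Store, Tx, and RemoveUser are hypothetical names, and the in-memory "transaction" is only a stand-in for a real one: the point is that the data change and the message become visible together or not at all.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is an in-memory stand-in for a database with a users table
// and a messages table.
type Store struct {
	Users    map[int]string
	Messages []string
}

// Tx buffers changes until Commit; nothing is visible before that.
type Tx struct {
	store       *Store
	deleteUsers []int
	messages    []string
}

func (s *Store) Begin() *Tx { return &Tx{store: s} }

func (tx *Tx) DeleteUser(id int)          { tx.deleteUsers = append(tx.deleteUsers, id) }
func (tx *Tx) Publish(topic, body string) { tx.messages = append(tx.messages, topic+":"+body) }

// Commit applies the data change and the message together.
func (tx *Tx) Commit() {
	for _, id := range tx.deleteUsers {
		delete(tx.store.Users, id)
	}
	tx.store.Messages = append(tx.store.Messages, tx.messages...)
}

// Rollback discards both the data change and the message.
func (tx *Tx) Rollback() { tx.deleteUsers, tx.messages = nil, nil }

// RemoveUser deletes a user and publishes a message in one transaction.
// The fail flag simulates an error that occurs before commit.
func RemoveUser(s *Store, id int, fail bool) error {
	tx := s.Begin()
	tx.DeleteUser(id)
	tx.Publish("user-deleted", fmt.Sprint(id))
	if fail {
		tx.Rollback()
		return errors.New("simulated failure")
	}
	tx.Commit()
	return nil
}

func main() {
	s := &Store{Users: map[int]string{1: "alice", 2: "bob"}}

	_ = RemoveUser(s, 1, true) // rolled back: no deletion, no message
	fmt.Println(len(s.Users), len(s.Messages)) // 2 0

	_ = RemoveUser(s, 2, false) // committed: deletion and message together
	fmt.Println(len(s.Users), len(s.Messages)) // 1 1
}
```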

Code Examples

Basic Usage

type Reader1 struct{}

// HandleMessage implements Handler.
func (r *Reader1) HandleMessage(message *EventMessage[string]) error {
 fmt.Println("reader one :", message.Body)
 return nil
}

// Simulate an author writing books frantically, with 5 editors processing one book per second
func main() {
 // 1. SetGorm
 nsqite.SetGorm(db)

 const topic = "a-book"
 p := NewProducer[string]()
 // Limit task failure retry attempts to 10 times
 c := NewConsumer(topic, "consumer1", WithMaxAttempts(10))
 c.AddConcurrentHandlers(&Reader1{}, 5)
 for i := 0; i < 5; i++ {
  p.Publish(topic, fmt.Sprintf("a >> hello %d", i))
 }
 time.Sleep(2 * time.Second)
}

Manual Message Control

type Reader3 struct {
 receivedMessages sync.Map
 attemptCount     int32
}

// HandleMessage implements Handler.
func (r *Reader3) HandleMessage(message *EventMessage[string]) error {
 // Disable auto-completion
 message.DisableAutoResponse()
 if message.Body == "hello" || message.Attempts > 3 {
  // Manual completion
  r.receivedMessages.Store(message.Body, true)
  message.Finish()
  return nil
 }
 // Manual retry after 1 second delay
 atomic.AddInt32(&r.attemptCount, 1)
 message.Requeue(time.Second)
 return nil
}

Maintenance and Optimization

NSQite uses slog for logging. If you see the following warning in the logs, tune the parameters promptly:

  • [NSQite] publish message timeout: This indicates that publishing is too fast for consumers to handle. You can optimize by:

    • Increasing cache queue length

    • Increasing concurrent processing goroutines

    • Optimizing consumer handler performance

The default timeout is 3 seconds. If timeouts occur frequently, you can adjust the timeout using WithCheckTimeout(10*time.Second).
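The relationship between queue length, consumer speed, and the timeout can be sketched with a buffered channel. This is an illustration of the general mechanism, not NSQite's code: the publish function and ErrPublishTimeout are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrPublishTimeout is a hypothetical error used only in this sketch.
var ErrPublishTimeout = errors.New("publish message timeout")

// publish tries to enqueue msg and gives up after timeout
// if the queue stays full for that long.
func publish(queue chan<- string, msg string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case queue <- msg:
		return nil
	case <-timer.C:
		return ErrPublishTimeout
	}
}

func main() {
	// A queue of length 1 with no consumer draining it.
	queue := make(chan string, 1)

	fmt.Println(publish(queue, "first", 50*time.Millisecond))  // <nil>
	fmt.Println(publish(queue, "second", 50*time.Millisecond)) // publish message timeout
}
```

A longer queue absorbs bursts, while more or faster consumers free up slots sooner. A longer timeout only postpones the warning if consumers cannot keep up on average.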

Benchmark

Event Bus

One publisher and one subscriber, with a throughput of about 3 million messages per second:

[Figure: Event Bus Benchmark]

Transactional Message Queue

One producer and one consumer, based on SQLite database:

[Figure: Transactional Message Queue Benchmark]

Using PostgreSQL offers better performance.

Next Development Tasks

  • Event Bus support for Redis as persistent storage, enabling distributed deployment.

  • Transactional Message Queue support for distributed deployment, where consumers update the database after receiving messages.

Frequently Asked Questions (FAQ)

Given subscribers a, b, and c, what happens when subscriber b blocks?

  • a receives messages normally.

  • Because b is blocked, c does not receive messages.

  • Because b is blocked, the publisher blocks as well.

Solutions:

  • Use WithDiscardOnBlocking(true) to discard messages.

  • Use PublishWithContext(ctx, topic, message) to limit publishing timeout.

  • Use WithQueueSize(1024) to set cache queue length.

  • Optimize callbacks to make consumers process tasks faster.
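The discard option can be pictured as a non-blocking send to each subscriber's queue. The sketch below is an assumption about the general technique, not NSQite's internals: the sub type and fanOut function are hypothetical.

```go
package main

import "fmt"

// sub is a hypothetical subscriber with its own bounded queue.
type sub struct {
	name  string
	queue chan string
}

// fanOut sends msg to every subscriber without blocking.
// It returns the names of subscribers whose queue was full,
// meaning the message was discarded for them.
func fanOut(subs []sub, msg string) (dropped []string) {
	for _, s := range subs {
		select {
		case s.queue <- msg:
		default:
			dropped = append(dropped, s.name)
		}
	}
	return dropped
}

func main() {
	a := sub{"a", make(chan string, 4)}
	b := sub{"b", make(chan string, 1)}
	c := sub{"c", make(chan string, 4)}
	b.queue <- "stuck" // b is blocked: its queue is already full

	dropped := fanOut([]sub{a, b, c}, "alert")
	// b misses the message, but a and c still receive it
	// and the publisher is never stalled.
	fmt.Println(dropped, len(a.queue), len(c.queue)) // [b] 1 1
}
```

The trade-off is that b loses messages, so discarding suits notifications that can tolerate gaps rather than work that must be done.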

With transactional messages, suppose a message has been published and subscribers a and c have finished processing it. What happens if the service restarts before b has finished?

  • After service restart, b will receive the message again and continue processing.

  • a and c won’t receive the message again as they have already completed.

Can I customize the delay time when a task fails?

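Yes. See the Manual Message Control example above, which disables the automatic response and calls message.Requeue with a custom delay.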

What happens when a task keeps failing and reaches the maximum retry count?

A task ends under two conditions:

  • Task execution succeeds.

  • Task reaches the maximum execution count.

For unlimited retries, use WithMaxAttempts(0). By default, it retries 10 times, but you can increase it with WithMaxAttempts(100).

If WithMaxAttempts(10) means 10 retries, how many times will the callback be executed if it keeps failing?

  • 10 times.

How long will transactional messages be stored in the database?

  • Automatically deletes all messages older than 15 days.

  • Automatically deletes completed messages older than 7 days.

  • When table data exceeds 10,000 rows, automatically deletes completed messages older than 3 days.

Need to customize these times? Please submit a PR or issue.
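Read as a decision rule, the three retention conditions look like the following. The function shouldDelete and the helper days are hypothetical and only restate the rules listed above. How NSQite actually runs the cleanup is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// days converts a number of days to a time.Duration.
func days(n int) time.Duration { return time.Duration(n) * 24 * time.Hour }

// shouldDelete restates the retention rules as a predicate:
//   - any message older than 15 days is deleted;
//   - a completed message older than 7 days is deleted;
//   - if the table has more than 10,000 rows, a completed message
//     older than 3 days is deleted.
func shouldDelete(age time.Duration, completed bool, tableRows int) bool {
	switch {
	case age > days(15):
		return true
	case completed && age > days(7):
		return true
	case completed && tableRows > 10000 && age > days(3):
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldDelete(days(16), false, 100))  // true
	fmt.Println(shouldDelete(days(8), true, 100))    // true
	fmt.Println(shouldDelete(days(8), false, 100))   // false
	fmt.Println(shouldDelete(days(4), true, 20000))  // true
	fmt.Println(shouldDelete(days(4), true, 100))    // false
}
```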

In the event bus, will continuous callback failures block the queue?

  • No, failed tasks will enter a priority queue for delayed processing.

  • A large number of failing tasks causes messages to accumulate in memory. Each message is released once it reaches the maximum number of retry attempts.
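A delayed-retry priority queue of this kind can be built with the standard container/heap package. The sketch below shows the general data structure and is not taken from NSQite's source: retryItem, retryQueue, schedule, popReady, and at are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// retryItem is a failed task waiting for its next attempt.
type retryItem struct {
	body    string
	readyAt time.Time
}

// retryQueue is a min-heap ordered by readyAt (earliest first).
type retryQueue []retryItem

func (q retryQueue) Len() int           { return len(q) }
func (q retryQueue) Less(i, j int) bool { return q[i].readyAt.Before(q[j].readyAt) }
func (q retryQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *retryQueue) Push(x any)        { *q = append(*q, x.(retryItem)) }
func (q *retryQueue) Pop() any {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// schedule adds a failed task that becomes ready at readyAt.
func (q *retryQueue) schedule(body string, readyAt time.Time) {
	heap.Push(q, retryItem{body: body, readyAt: readyAt})
}

// popReady removes and returns every task whose delay has elapsed at now.
func popReady(q *retryQueue, now time.Time) []string {
	var ready []string
	for q.Len() > 0 && !(*q)[0].readyAt.After(now) {
		ready = append(ready, heap.Pop(q).(retryItem).body)
	}
	return ready
}

// at returns a fixed instant sec seconds after the Unix epoch,
// which keeps the example deterministic.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	q := &retryQueue{}
	q.schedule("slow", at(5))
	q.schedule("fast", at(1))

	// New messages are not held up: only tasks whose delay
	// has elapsed are handed back to the worker.
	fmt.Println(popReady(q, at(2))) // [fast]
	fmt.Println(popReady(q, at(6))) // [slow]
}
```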

In the event bus, if publishing to one topic is blocked, will it affect publishing to other topics?

  • No, topics are independent of each other.

By leveraging NSQite, you can efficiently manage messages in your Go applications. Its lightweight nature and powerful features make it an excellent choice for both monolithic and distributed architectures. Whether you’re handling real-time notifications or ensuring data consistency across transactions, NSQite provides the tools you need to build robust and scalable systems.