NexusMQ
NexusMQ is a lightweight, in-process message broker for Go: a fast, thread-safe pub/sub broker that runs without any external services.
Installation
NexusMQ is a pure Go package with zero external dependencies. Install it via go get:
go get github.com/luponetn/nexusmq
Quick Start
Here's a minimal example showing how to create a broker, subscribe to a topic, and publish a message using the implemented API.
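package main

import (
	"fmt"
	"time"

	"github.com/luponetn/nexusmq/pkg/broker"
)

func main() {
	b := broker.NewBroker()
	defer b.Shutdown()

	if err := b.CreateTopic("orders"); err != nil {
		panic(err)
	}

	sub, err := b.Subscribe("orders")
	if err != nil {
		panic(err)
	}

	// Consumer runs in the background and signals each handled message
	handled := make(chan struct{})
	go func() {
		for {
			msg, err := sub.Receive()
			if err != nil {
				// channel closed due to shutdown
				return
			}
			fmt.Printf("Received payload: %s\n", msg.Payload)
			handled <- struct{}{}
		}
	}()

	if err := b.Publish("orders", &broker.Message{
		Payload:   []byte("order-123"),
		Timestamp: time.Now(),
	}); err != nil {
		panic(err)
	}

	// Wait for the consumer to print the message; otherwise main could
	// return (and shut the broker down) before the goroutine is scheduled.
	<-handled
}

Architecture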
The project is structured around three core structs/interfaces: the Broker, Topics, and Subscriptions.
- Broker (Brk): Maintains a global map of topics under a sync.RWMutex and uses a sync.Once for graceful shutdown execution.
- Topic: Manages its own internal map of subscribers with a separate sync.RWMutex. Granular locking ensures that publishing to "topic A" doesn't block "topic B".
- Subscription: Backed by a buffered channel (capacity 100). The consumer owns the goroutine and calls Receive() to pull messages from the internal channel buffer.
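To make the Subscription bullet concrete, here is a minimal sketch of a channel-backed subscriber. Only the buffer capacity of 100 and the shape of Receive() come from the description above; the subscription type, its ch field, the newSubscription helper, and the errClosed sentinel are illustrative assumptions, not NexusMQ's actual internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors broker.Message, minus the timestamp, to keep the sketch short.
type Message struct {
	Payload []byte
}

// errClosed is a hypothetical sentinel; the real package may return a different error.
var errClosed = errors.New("subscription closed")

// subscription is an assumed internal type backed by a buffered channel.
type subscription struct {
	id string
	ch chan *Message
}

// newSubscription is a hypothetical constructor using the documented capacity of 100.
func newSubscription(id string) *subscription {
	return &subscription{id: id, ch: make(chan *Message, 100)}
}

// Receive blocks until a message arrives or the channel is closed.
// Messages buffered before the close are still delivered first.
func (s *subscription) Receive() (*Message, error) {
	msg, ok := <-s.ch
	if !ok {
		return nil, errClosed
	}
	return msg, nil
}

func main() {
	s := newSubscription("sub-1")
	s.ch <- &Message{Payload: []byte("order-123")}
	close(s.ch) // stands in for what a broker would do on DeleteTopic or Shutdown

	for {
		msg, err := s.Receive()
		if err != nil {
			fmt.Println("done:", err) // done: subscription closed
			return
		}
		fmt.Printf("received: %s\n", msg.Payload) // received: order-123
	}
}
```

Because the consumer pulls from the channel on its own goroutine, the broker never has to manage consumer lifecycles; closing the channel is enough to end the loop.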
Topics
b := broker.NewBroker()
err := b.CreateTopic("payments")
// Returns ErrTopicAlreadyExists if the topic is already registered
err = b.DeleteTopic("payments")
// Closes all subscriber channels associated with the topic and removes it
topics := b.Topics()
// Returns []*broker.Topic (a snapshot of active topics)

Subscribers
The Subscriber interface is the main consumer API, which abstracts away the internal buffered channels.
type Subscriber interface {
	Receive() (*Message, error)
}

sub, err := b.Subscribe("payments")
if err != nil {
	// Handle ErrTopicNotFound or ErrBrokerShutdown
	return
}
// Blocks until a message is available or the channel closes
msg, err := sub.Receive()
if err != nil {
	return
}

Publishing
Publishing never waits on a slow subscriber. Each delivery is a constant-time, non-blocking channel send using a select/default pattern, so a publish costs O(n) in the number of subscribers on the topic. If a subscriber's channel buffer is full, we record an overflow and continue delivering the message to the remaining subscribers.
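The select/default pattern can be sketched in isolation as follows. This is an illustration under assumptions, not the broker's actual code: the fanOut function and the map of raw channels are hypothetical, and the tiny buffer sizes exist only to force an overflow in the demo.

```go
package main

import "fmt"

// Message mirrors broker.Message, minus the timestamp, to keep the sketch short.
type Message struct {
	Payload []byte
}

// fanOut (hypothetical) attempts a non-blocking send to every subscriber
// channel and returns the IDs of subscribers whose buffers were full.
func fanOut(subs map[string]chan *Message, msg *Message) []string {
	var overflowed []string
	for id, ch := range subs {
		select {
		case ch <- msg:
			// delivered into the subscriber's buffer
		default:
			// buffer full: record the drop and move on to the next subscriber
			overflowed = append(overflowed, id)
		}
	}
	return overflowed
}

func main() {
	fast := make(chan *Message, 2)
	slow := make(chan *Message, 1)
	slow <- &Message{Payload: []byte("backlog")} // the slow subscriber is already full

	subs := map[string]chan *Message{"fast": fast, "slow": slow}
	dropped := fanOut(subs, &Message{Payload: []byte("event-data")})

	fmt.Println("overflowed:", dropped)      // overflowed: [slow]
	fmt.Println("fast buffered:", len(fast)) // fast buffered: 1
}
```

Callers never see this loop directly. From the outside, a publish is a single call whose error reports any overflow: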
msg := &broker.Message{
	Payload:   []byte("event-data"),
	Timestamp: time.Now(),
}
err := b.Publish("orders", msg)
if err != nil {
	switch e := err.(type) {
	case *broker.OverflowError:
		fmt.Printf("Slow subscriber %s on topic %s dropped a message\n",
			e.SubscriberID, e.Topic)
	default:
		// Handle ErrTopicNotFound or ErrBrokerShutdown
	}
}

Broker API & Errors
type Broker interface {
	CreateTopic(topic string) error
	DeleteTopic(topic string) error
	Topics() []*Topic
	Subscribe(topic string) (Subscriber, error)
	Unsubscribe(topicName string, subID string) error
	Publish(topic string, message *Message) error
	Shutdown() error
}

type Message struct {
	Payload   []byte
	Timestamp time.Time
}

Defined Errors
var (
	ErrTopicAlreadyExists = errors.New("topic already exists")
	ErrTopicNotFound      = errors.New("topic not found")
	ErrPublishTimeout     = errors.New("publish timeout on one or more subscribers")
	ErrBrokerShutdown     = errors.New("broker is shut down")
)

type OverflowError struct {
	SubscriberID string
	Topic        string
	DroppedCount int
}

Note: ErrPublishTimeout is a legacy error from an older goroutine-based publish implementation and is no longer actively returned.
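As a hedged illustration of how a caller might branch on these errors with the standard errors package, the sketch below redeclares ErrTopicNotFound and OverflowError locally so it runs on its own. The Error() method's message format and the classify helper are assumptions made for the example, not part of the documented API.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented error values, so the sketch is self-contained.
var ErrTopicNotFound = errors.New("topic not found")

type OverflowError struct {
	SubscriberID string
	Topic        string
	DroppedCount int
}

// Error makes *OverflowError satisfy the error interface.
// The message format here is an assumption.
func (e *OverflowError) Error() string {
	return fmt.Sprintf("subscriber %s on topic %s dropped %d message(s)",
		e.SubscriberID, e.Topic, e.DroppedCount)
}

// classify is a hypothetical helper showing one way to branch on publish errors.
// errors.As and errors.Is also match errors wrapped with %w.
func classify(err error) string {
	var overflow *OverflowError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &overflow):
		return "overflow:" + overflow.SubscriberID
	case errors.Is(err, ErrTopicNotFound):
		return "missing-topic"
	default:
		return "other"
	}
}

func main() {
	wrapped := fmt.Errorf("publish: %w",
		&OverflowError{SubscriberID: "sub-1", Topic: "orders", DroppedCount: 1})

	fmt.Println(classify(wrapped))          // overflow:sub-1
	fmt.Println(classify(ErrTopicNotFound)) // missing-topic
	fmt.Println(classify(nil))              // ok
}
```

Compared with a plain type switch, errors.As keeps working if a caller's own layers wrap the broker's error before inspecting it.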
Concurrency Model
Every operation acquires the broker lock before any topic lock, never the reverse. This strict lock ordering is what prevents deadlocks:
| Operation | Lock Sequence | Reasoning |
|---|---|---|
| Publish | b.mu.RLock() → topic.mu.RLock() | Read-only path. Allows massive parallel throughput on the same topic. |
| Subscribe | b.mu.RLock() → topic.mu.Lock() | Read broker to find topic, write-lock the topic to register the subscriber. |
| CreateTopic | b.mu.Lock() | Global write lock required to mutate the broker's topic map. |
| DeleteTopic | b.mu.Lock() → topic.mu.Lock() | Global write lock to remove from map, then topic write lock to safely close channels. |
| Shutdown | b.once.Do → b.mu.Lock() → topic.mu.Lock() | sync.Once ensures cleanup executes exactly once. Both broker and all topics are locked sequentially for safe teardown. |
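The lock sequences in the table can be sketched with simplified stand-in types. The lowercase topic and brk structs, their fields, and the raw []byte payloads are assumptions made for brevity; the sketch also reports a missing topic after shutdown rather than a dedicated shutdown error, which differs from the documented ErrBrokerShutdown.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("topic not found")

// topic and brk are simplified stand-ins for the real Topic and Brk types.
type topic struct {
	mu   sync.RWMutex
	subs map[string]chan []byte
}

type brk struct {
	mu     sync.RWMutex
	once   sync.Once
	topics map[string]*topic
}

// publish: b.mu.RLock() then topic.mu.RLock().
func (b *brk) publish(name string, payload []byte) error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	t, ok := b.topics[name]
	if !ok {
		return errNotFound
	}
	t.mu.RLock()
	defer t.mu.RUnlock()
	for _, ch := range t.subs {
		select {
		case ch <- payload:
		default: // buffer full: drop instead of blocking
		}
	}
	return nil
}

// subscribe: b.mu.RLock() then topic.mu.Lock().
func (b *brk) subscribe(name, id string) (<-chan []byte, error) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	t, ok := b.topics[name]
	if !ok {
		return nil, errNotFound
	}
	ch := make(chan []byte, 100)
	t.mu.Lock()
	t.subs[id] = ch
	t.mu.Unlock()
	return ch, nil
}

// shutdown: once.Do, then b.mu.Lock(), then each topic.mu.Lock() in turn.
func (b *brk) shutdown() {
	b.once.Do(func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		for name, t := range b.topics {
			t.mu.Lock()
			for id, ch := range t.subs {
				close(ch)
				delete(t.subs, id)
			}
			t.mu.Unlock()
			delete(b.topics, name)
		}
	})
}

func main() {
	b := &brk{topics: map[string]*topic{"orders": {subs: map[string]chan []byte{}}}}
	ch, _ := b.subscribe("orders", "sub-1")
	_ = b.publish("orders", []byte("order-123"))
	b.shutdown()
	b.shutdown() // no-op: sync.Once already ran the cleanup

	for p := range ch {
		fmt.Printf("%s\n", p) // order-123
	}
	fmt.Println(b.publish("orders", nil)) // topic not found
}
```

Because channels are only closed while the topic's write lock is held, and sends only happen under its read lock, a publish can never send on a closed channel in this sketch.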