NexusMQ

A lightweight, in-process message broker for Go. It is a fast, thread-safe pub/sub broker that runs inside your application and requires no external services.

Installation

NexusMQ is a pure Go package with zero external dependencies. Install it via go get:

bash
go get github.com/luponetn/nexusmq

Requirements
Go 1.21 or later. The package relies strictly on standard library concurrency primitives (sync.RWMutex, sync.Once) and channels.

Quick Start

Here's a minimal example showing how to create a broker, subscribe to a topic, and publish a message using the implemented API.

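main.go
package main

import (
    "fmt"
    "time"

    "github.com/luponetn/nexusmq/pkg/broker"
)

func main() {
    b := broker.NewBroker()
    defer b.Shutdown()

    if err := b.CreateTopic("orders"); err != nil {
        panic(err)
    }

    sub, err := b.Subscribe("orders")
    if err != nil {
        panic(err)
    }

    // Consumer runs in the background and signals each message it handles.
    received := make(chan struct{}, 1)
    go func() {
        for {
            msg, err := sub.Receive()
            if err != nil {
                // Channel closed due to shutdown or topic deletion.
                return
            }
            fmt.Printf("Received payload: %s\n", msg.Payload)
            received <- struct{}{}
        }
    }()

    if err := b.Publish("orders", &broker.Message{
        Payload:   []byte("order-123"),
        Timestamp: time.Now(),
    }); err != nil {
        panic(err)
    }

    // Wait for the consumer. Without this, main could return (and shut the
    // broker down) before the message is printed.
    <-received
}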

Architecture

The project is structured around three core structs/interfaces: the Broker, Topics, and Subscriptions.

  • Broker (Brk): Maintains a global map of topics under a sync.RWMutex and uses a sync.Once for graceful shutdown execution.
  • Topic: Manages its own internal map of subscribers with a separate sync.RWMutex. Granular locking ensures that publishing to "topic A" doesn't block "topic B".
  • Subscription: Backed by a buffered channel (capacity 100). The consumer owns the goroutine and calls Receive() to pull messages from the internal channel buffer.
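
As a rough illustration of the layout described above, the three pieces might be arranged like the sketch below. The type and field names (brokerSketch, topicSketch, subscription, ch, subscriberBuffer) are assumptions made for this example and are not taken from the NexusMQ source. Only the lock types, the sync.Once, and the buffer capacity of 100 come from the description.

```go
package main

import (
    "fmt"
    "sync"
)

// message stands in for broker.Message in this sketch.
type message struct {
    Payload []byte
}

// subscriberBuffer matches the documented channel capacity.
const subscriberBuffer = 100

// subscription is backed by a buffered channel that the consumer drains.
type subscription struct {
    id string
    ch chan *message
}

// topicSketch guards its own subscriber map with its own lock,
// so work on one topic does not contend with another.
type topicSketch struct {
    mu   sync.RWMutex
    subs map[string]*subscription
}

// brokerSketch guards the topic map with a separate lock and
// keeps a sync.Once for one-time shutdown.
type brokerSketch struct {
    mu     sync.RWMutex
    once   sync.Once
    topics map[string]*topicSketch
}

// newSubscription is a hypothetical constructor for this sketch.
func newSubscription(id string) *subscription {
    return &subscription{id: id, ch: make(chan *message, subscriberBuffer)}
}

func main() {
    b := &brokerSketch{topics: map[string]*topicSketch{}}
    t := &topicSketch{subs: map[string]*subscription{}}

    // Registering a topic mutates the broker map: broker write lock.
    b.mu.Lock()
    b.topics["orders"] = t
    b.mu.Unlock()

    // Registering a subscriber mutates only the topic: topic write lock.
    s := newSubscription("sub-1")
    t.mu.Lock()
    t.subs[s.id] = s
    t.mu.Unlock()

    fmt.Println(cap(s.ch)) // prints 100
}
```

The point of the two-level layout is that the broker lock is held only long enough to find or change a topic, while per-subscriber work happens under the topic's own lock.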

Topics

go
b := broker.NewBroker()

err := b.CreateTopic("payments")
// Returns ErrTopicAlreadyExists if the topic is already registered

err = b.DeleteTopic("payments")
// Closes all subscriber channels associated with the topic and removes it

topics := b.Topics() 
// Returns []*broker.Topic (a snapshot of active topics)

Topic Deletion
Calling DeleteTopic immediately closes all subscriber channels on that topic. Consumers blocking on Receive() will unblock and receive an error.
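
The unblocking behaviour follows from how Go channels work: a receive on a closed channel returns immediately. The sketch below shows one way a Receive() method could turn that into an error. The subscription type and the errClosed value are hypothetical. This page does not say which error NexusMQ actually returns here.

```go
package main

import (
    "errors"
    "fmt"
)

// errClosed is a hypothetical error used only in this sketch.
var errClosed = errors.New("subscription closed")

type message struct {
    Payload []byte
}

type subscription struct {
    ch chan *message
}

// Receive blocks until a message arrives or the channel is closed.
func (s *subscription) Receive() (*message, error) {
    msg, ok := <-s.ch
    if !ok {
        // The channel was closed and its buffer is empty.
        return nil, errClosed
    }
    return msg, nil
}

func main() {
    s := &subscription{ch: make(chan *message, 100)}

    result := make(chan error, 1)
    go func() {
        _, err := s.Receive() // blocks: nothing has been published
        result <- err
    }()

    close(s.ch) // stands in for what deleting the topic does
    fmt.Println(<-result) // prints "subscription closed"
}
```

With the two-value receive form, messages already sitting in the buffer are still delivered before the closed state is reported, so in this sketch a consumer would drain any backlog before seeing the error.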

Subscribers

The Subscriber interface is the main consumer API, which abstracts away the internal buffered channels.

pkg/broker/types.go
type Subscriber interface {
    Receive() (*Message, error)
}

go
sub, err := b.Subscribe("payments")
if err != nil {
    // Handle ErrTopicNotFound or ErrBrokerShutdown
    return
}

// Blocks until a message is available or the channel closes
msg, err := sub.Receive()
if err != nil {
    return
}

Unsubscribing Limitation
The Broker.Unsubscribe(topicName string, subID string) method exists, but the current Subscriber interface does not expose the internal subID generated during Subscribe(). If you need to unsubscribe dynamically, you will need to extend the Subscriber interface to return its ID.
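
One possible shape for that extension is sketched below. IdentifiedSubscriber, its ID() method, the subscription struct, and the subscriberID helper are all hypothetical names introduced for illustration. None of them exist in the package as documented here.

```go
package main

import "fmt"

// Message stands in for broker.Message in this sketch.
type Message struct {
    Payload []byte
}

// Subscriber mirrors the interface documented above.
type Subscriber interface {
    Receive() (*Message, error)
}

// IdentifiedSubscriber is a hypothetical extension that exposes the ID.
type IdentifiedSubscriber interface {
    Subscriber
    ID() string
}

// subscription is a stand-in for the concrete subscriber type.
type subscription struct {
    id string
    ch chan *Message
}

func (s *subscription) Receive() (*Message, error) {
    msg, ok := <-s.ch
    if !ok {
        return nil, fmt.Errorf("subscription %s closed", s.id)
    }
    return msg, nil
}

// ID returns the identifier assigned when the subscription was created.
func (s *subscription) ID() string { return s.id }

// subscriberID reports the ID if the subscriber exposes one.
func subscriberID(s Subscriber) (string, bool) {
    if ident, ok := s.(IdentifiedSubscriber); ok {
        return ident.ID(), true
    }
    return "", false
}

func main() {
    var sub Subscriber = &subscription{id: "sub-42", ch: make(chan *Message, 100)}

    if id, ok := subscriberID(sub); ok {
        // With the ID in hand, b.Unsubscribe("payments", id) becomes callable.
        fmt.Println("subscriber id:", id) // prints "subscriber id: sub-42"
    }
}
```

Embedding the existing interface keeps current callers of Receive() working, and the type assertion lets code opt in to the ID only where it needs to unsubscribe.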

Publishing

Publishing never blocks on a slow consumer. For each subscriber on the topic, we use a select/default pattern to attempt a channel send, so each send is O(1) and a publish costs O(n) in the number of subscribers. If a subscriber's channel buffer is full, we record an overflow and continue delivering the message to the remaining subscribers.

go
msg := &broker.Message{
    Payload:   []byte("event-data"),
    Timestamp: time.Now(),
}

err := b.Publish("orders", msg)
if err != nil {
    switch e := err.(type) {
    case *broker.OverflowError:
        fmt.Printf("Slow subscriber %s on topic %s dropped a message\n",
            e.SubscriberID, e.Topic)
    default:
        // Handle ErrTopicNotFound or ErrBrokerShutdown
    }
}
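
The select/default pattern itself can be sketched in a few lines. This is a minimal stand-alone illustration, not the NexusMQ implementation. The fanOut function and the map of raw channels are assumptions made for the example.

```go
package main

import "fmt"

type message struct {
    Payload []byte
}

// fanOut attempts a non-blocking send to every subscriber channel and
// returns the IDs of subscribers whose buffers were full.
func fanOut(subs map[string]chan *message, msg *message) []string {
    var overflowed []string
    for id, ch := range subs {
        select {
        case ch <- msg:
            // Delivered into the subscriber's buffer.
        default:
            // Buffer full: record the overflow and move on.
            overflowed = append(overflowed, id)
        }
    }
    return overflowed
}

func main() {
    fast := make(chan *message, 2)
    slow := make(chan *message, 1)
    slow <- &message{Payload: []byte("backlog")} // slow's buffer is now full

    subs := map[string]chan *message{"fast": fast, "slow": slow}
    dropped := fanOut(subs, &message{Payload: []byte("event-data")})

    fmt.Println(dropped, len(fast)) // prints "[slow] 1"
}
```

Because the default branch runs whenever a send would block, a stalled consumer costs the publisher one failed attempt rather than an indefinite wait.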

Broker API & Errors

pkg/broker/types.go
type Broker interface {
    CreateTopic(topic string) error
    DeleteTopic(topic string) error
    Topics() []*Topic
    Subscribe(topic string) (Subscriber, error)
    Unsubscribe(topicName string, subID string) error
    Publish(topic string, message *Message) error
    Shutdown() error
}

type Message struct {
    Payload   []byte
    Timestamp time.Time
}

Defined Errors

pkg/broker/errors.go
var (
    ErrTopicAlreadyExists = errors.New("topic already exists")
    ErrTopicNotFound      = errors.New("topic not found")
    ErrPublishTimeout     = errors.New("publish timeout on one or more subscribers")
    ErrBrokerShutdown     = errors.New("broker is shut down")
)

type OverflowError struct {
    SubscriberID string
    Topic        string
    DroppedCount int
}

Note: ErrPublishTimeout is a legacy error from an older goroutine-based publish implementation and is no longer actively returned.

Concurrency Model

The locking strategy relies on strict lock ordering to prevent deadlocks:

| Operation | Lock Sequence | Reasoning |
| --- | --- | --- |
| Publish | b.mu.RLock() → topic.mu.RLock() | Read-only path. Allows massive parallel throughput on the same topic. |
| Subscribe | b.mu.RLock() → topic.mu.Lock() | Read broker to find topic, write-lock the topic to register the subscriber. |
| CreateTopic | b.mu.Lock() | Global write lock required to mutate the broker's topic map. |
| DeleteTopic | b.mu.Lock() → topic.mu.Lock() | Global write lock to remove from map, then topic write lock to safely close channels. |
| Shutdown | b.once.Do → b.mu.Lock() → topic.mu.Lock() | sync.Once ensures cleanup executes exactly once. Both broker and all topics are locked sequentially for safe teardown. |
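
The ordering rule (broker lock first, topic lock second) and the sync.Once guard can be illustrated with a stripped-down sketch. The broker and topic types, the field names, and the lowercase publish and shutdown methods below are assumptions for this example and are not the package's actual code.

```go
package main

import (
    "fmt"
    "sync"
)

type topic struct {
    mu   sync.RWMutex
    subs map[string]chan []byte
}

type broker struct {
    mu     sync.RWMutex
    once   sync.Once
    topics map[string]*topic
}

// publish takes the broker read lock, then the topic read lock.
func (b *broker) publish(name string, payload []byte) bool {
    b.mu.RLock()
    defer b.mu.RUnlock()
    t, ok := b.topics[name]
    if !ok {
        return false
    }
    t.mu.RLock()
    defer t.mu.RUnlock()
    for _, ch := range t.subs {
        select {
        case ch <- payload:
        default: // buffer full, skip this subscriber
        }
    }
    return true
}

// shutdown takes the broker write lock, then each topic write lock.
// sync.Once makes repeated calls harmless (no double close).
func (b *broker) shutdown() {
    b.once.Do(func() {
        b.mu.Lock()
        defer b.mu.Unlock()
        for name, t := range b.topics {
            t.mu.Lock()
            for id, ch := range t.subs {
                close(ch)
                delete(t.subs, id)
            }
            t.mu.Unlock()
            delete(b.topics, name)
        }
    })
}

func main() {
    ch := make(chan []byte, 100)
    b := &broker{topics: map[string]*topic{
        "orders": {subs: map[string]chan []byte{"sub-1": ch}},
    }}

    fmt.Println(b.publish("orders", []byte("order-123"))) // prints "true"
    b.shutdown()
    b.shutdown()                                     // second call is a no-op
    fmt.Println(b.publish("orders", []byte("late"))) // prints "false"
    fmt.Println(string(<-ch))                        // prints "order-123"
}
```

Because every path acquires the broker lock before any topic lock, two goroutines can never hold the locks in opposite orders. Closing channels only under the topic write lock also means a publisher holding the read lock can never send on a closed channel.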