Events

λ Cosmos includes a publish/subscribe event system with a common interface and five broker implementations. Events enable decoupled communication between components of your application and across distributed systems.

type Events interface {
	Publish(ctx context.Context, event string, payload any) error
	Subscribe(ctx context.Context, event string, handler EventHandler) (EventUnsubscribeFunc, error)
	Close() error
}

type EventPayload = func(dest any) error
type EventHandler = func(payload EventPayload)
type EventUnsubscribeFunc = func() error

Payloads are serialized to JSON. The EventPayload function passed to handlers is a JSON unmarshaler — call it with a pointer to your target type:

events.Subscribe(ctx, "user.created", func(payload contract.EventPayload) {
	var user User
	if err := payload(&user); err != nil {
		log.Error("failed to decode event", "err", err)
		return
	}
	// handle user
})

events.Publish(ctx, "user.created", User{
	ID:    "42",
	Name:  "Alice",
	Email: "alice@example.com",
})

The payload is JSON-encoded before being sent to subscribers. Any JSON-serializable value works.
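
To make the encode/decode round trip concrete, here is a minimal sketch of how a value could be turned into an EventPayload-style unmarshaler. It is not the Cosmos implementation: `newPayload` and the `User` struct tags are hypothetical names introduced for illustration, and the sketch assumes the standard `encoding/json` package does the work.

```go
package main

import "encoding/json"

// EventPayload mirrors the alias shown above: a function that decodes
// the event body into dest.
type EventPayload = func(dest any) error

// User is an illustrative payload type (the JSON tags are an assumption).
type User struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

// newPayload is a hypothetical helper, not part of the Cosmos API.
// It JSON-encodes value once and returns a closure that decodes those
// bytes into whatever pointer the handler supplies.
func newPayload(value any) (EventPayload, error) {
	raw, err := json.Marshal(value)
	if err != nil {
		return nil, err
	}
	return func(dest any) error {
		return json.Unmarshal(raw, dest)
	}, nil
}
```

Under this model, passing a non-pointer to the payload function returns an error from `json.Unmarshal`, which is why handlers decode into `&user` rather than `user`.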

unsubscribe, err := events.Subscribe(ctx, "user.created", func(payload contract.EventPayload) {
	var user User
	if err := payload(&user); err != nil {
		return
	}
	sendWelcomeEmail(user)
})

// Later, to stop receiving events:
err = unsubscribe()

All brokers support wildcard subscriptions with a consistent syntax:

  • * matches a single token: user.*.created matches user.42.created but not user.created
  • # matches zero or more tokens: logs.# matches logs, logs.error, logs.error.database

events.Subscribe(ctx, "user.*", handler)  // all user events with one sub-token
events.Subscribe(ctx, "audit.#", handler) // all audit events at any depth

The brokers automatically convert these wildcards to their native format (e.g., > for NATS, +/# for MQTT).
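
The matching rules above can be sketched as a small token matcher. This is an illustration of the documented semantics only, not the code any Cosmos broker uses; `matches` and `matchTokens` are hypothetical names.

```go
package main

import "strings"

// matchTokens reports whether the event tokens satisfy the pattern tokens.
func matchTokens(pattern, event []string) bool {
	if len(pattern) == 0 {
		return len(event) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" may absorb zero tokens, or absorb one token and try again.
		if matchTokens(pattern[1:], event) {
			return true
		}
		return len(event) > 0 && matchTokens(pattern, event[1:])
	case "*":
		// "*" must consume exactly one token.
		return len(event) > 0 && matchTokens(pattern[1:], event[1:])
	default:
		// A literal token must match exactly.
		return len(event) > 0 && pattern[0] == event[0] &&
			matchTokens(pattern[1:], event[1:])
	}
}

// matches is a hypothetical helper that applies the "*" and "#" rules
// to dot-separated event names.
func matches(pattern, event string) bool {
	return matchTokens(strings.Split(pattern, "."), strings.Split(event, "."))
}
```

For example, `matches("user.*.created", "user.42.created")` is true and `matches("user.*.created", "user.created")` is false, in line with the bullets above.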

An in-memory broker with zero dependencies. Ideal for testing, development, and single-instance applications:

import "github.com/studiolambda/cosmos/framework/event"
broker := event.NewMemoryBroker()
defer broker.Close()

Messages are delivered asynchronously in separate goroutines with panic recovery, so one handler’s failure doesn’t affect others. Since everything is in-memory, events are lost if the process restarts.
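
The isolation described above can be sketched as follows. This is a simplified model rather than the memory broker's actual code: `handler` and `deliver` are hypothetical names, and the function waits for all handlers only so that the sketch is deterministic (a real broker would probably not block the publisher).

```go
package main

import "sync"

// handler is a simplified stand-in for an event handler.
type handler func(message string)

// deliver runs every handler in its own goroutine and recovers from
// panics, so one failing handler does not stop the others. It returns
// the number of handlers that panicked.
func deliver(message string, handlers []handler) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		panics int
	)
	for _, h := range handlers {
		wg.Add(1)
		go func(h handler) {
			defer wg.Done()
			defer func() {
				// Runs before wg.Done, so the count is final once Wait returns.
				if r := recover(); r != nil {
					mu.Lock()
					panics++
					mu.Unlock()
				}
			}()
			h(message)
		}(h)
	}
	wg.Wait()
	return panics
}
```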

Uses Redis Pub/Sub for distributed applications:

import "github.com/studiolambda/cosmos/framework/event"
broker := event.NewRedisBroker(&event.RedisBrokerOptions{
	Addr:     "localhost:6379",
	Password: "",
	DB:       0,
})
defer broker.Close()

Or wrap an existing Redis client:

broker := event.NewRedisBrokerFrom(existingRedisClient)

Redis Pub/Sub provides at-most-once delivery — messages are delivered to connected subscribers but not persisted. Subscribers that are offline when a message is published will not receive it.
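
To illustrate what at-most-once delivery means in practice, here is a toy in-process model. It does not talk to Redis and is not how the Redis broker is built; `hub`, `subscribe`, and `publish` are hypothetical names used only to show that nothing is stored for late subscribers.

```go
package main

import "sync"

// hub is a toy stand-in for a fire-and-forget Pub/Sub channel: it keeps
// no history, so a message only reaches subscribers that are registered
// at the moment it is published.
type hub struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribe registers a buffered channel and returns its receive side.
func (h *hub) subscribe() <-chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan string, 8)
	h.subs = append(h.subs, ch)
	return ch
}

// publish hands the message to the current subscribers and returns how
// many received it. Nothing is kept for subscribers that arrive later.
func (h *hub) publish(msg string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- msg:
			delivered++
		default:
			// Subscriber buffer is full: the message is dropped.
		}
	}
	return delivered
}
```

In this model, a message published before `subscribe` is called is simply gone. If consumers must not miss events while offline, a broker with queued or persistent delivery is the safer choice.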

High-performance messaging with NATS:

import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewNATSBroker("nats://localhost:4222")
if err != nil {
	return err // do not use broker when the connection failed
}
defer broker.Close()

With full configuration:

broker, err := event.NewNATSBrokerWith(&event.NATSBrokerOptions{
	URLs:          []string{"nats://server1:4222", "nats://server2:4222"},
	Name:          "my-service",
	MaxReconnects: -1, // unlimited reconnection attempts
	ReconnectWait: 2 * time.Second,
	Username:      "user",
	Password:      "pass",
	TLSConfig:     tlsConfig,
})

NATS supports multiple authentication methods:

  • Username/password
  • Token-based
  • NKey (cryptographic)
  • Credentials file (JWT + NKey)

The # wildcard is automatically converted to NATS's > (multi-level wildcard). The single-level * wildcard is already native NATS syntax, so it passes through unchanged.
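
A minimal sketch of that conversion, assuming it is a plain token-by-token rewrite: `toNATSSubject` is a hypothetical name, not a Cosmos function. Note that in NATS the > wildcard is only valid as the last token and matches one or more tokens, so how the broker handles a # in the middle of a pattern, or the zero-token case, is outside what this sketch shows.

```go
package main

import "strings"

// toNATSSubject is a hypothetical helper that rewrites the "#" token to
// the NATS multi-level wildcard ">" and leaves every other token,
// including "*", untouched.
func toNATSSubject(event string) string {
	tokens := strings.Split(event, ".")
	for i, token := range tokens {
		if token == "#" {
			tokens[i] = ">"
		}
	}
	return strings.Join(tokens, ".")
}
```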

For applications using RabbitMQ:

import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewAMQPBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
	return err // do not use broker when the connection failed
}
defer broker.Close()

With custom exchange name:

broker, err := event.NewAMQPBrokerWith(&event.AMQPBrokerOptions{
	URL:      "amqp://guest:guest@localhost:5672/",
	Exchange: "my-events", // default: "cosmos.events"
})

The AMQP broker uses:

  • A topic exchange for wildcard routing.
  • Exclusive, auto-delete queues per subscriber for fan-out.
  • A dedicated publish channel with mutex protection.
  • Separate channels per subscriber following RabbitMQ best practices.

For IoT and edge computing scenarios using MQTT v5:

import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewMQTTBroker("mqtt://localhost:1883")
if err != nil {
	return err // do not use broker when the connection failed
}
defer broker.Close()

With full configuration:

broker, err := event.NewMQTTBrokerWith(&event.MQTTBrokerOptions{
	URLs:      []string{"mqtt://broker1:1883", "mqtt://broker2:1883"},
	QoS:       1,  // at-least-once delivery (default)
	KeepAlive: 30, // seconds
	Username:  "user",
	Password:  "pass",
})

Event names are automatically converted to MQTT topic format:

  • Dots become slashes: user.created becomes user/created
  • * becomes + (single-level wildcard)
  • # stays # (multi-level wildcard)
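
The three rules above can be sketched as a single function. This is an illustration under the assumption that the conversion is a straightforward token rewrite; `toMQTTTopic` is a hypothetical name, not part of the Cosmos API.

```go
package main

import "strings"

// toMQTTTopic is a hypothetical helper that applies the conversion rules
// listed above: "*" becomes "+", "#" is kept, and the dot separators
// become slashes.
func toMQTTTopic(event string) string {
	tokens := strings.Split(event, ".")
	for i, token := range tokens {
		if token == "*" {
			tokens[i] = "+"
		}
	}
	return strings.Join(tokens, "/")
}
```

With this sketch, `user.*.created` would map to `user/+/created` and `logs.#` to `logs/#`.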

QoS levels:

  • 0 — At most once (fire and forget)
  • 1 — At least once (recommended default)
  • 2 — Exactly once (highest overhead)

| Broker | Dependencies | Persistence | Best For |
|--------|--------------|-------------|----------|
| Memory | None | No | Testing, single instance |
| Redis | Redis server | No (Pub/Sub) | Simple distributed apps |
| NATS | NATS server | No (core NATS) | High-throughput microservices |
| AMQP | RabbitMQ | Yes (queued) | Enterprise messaging |
| MQTT | MQTT broker | Configurable | IoT, edge computing |

All brokers share the same contract.Events interface, so you can swap implementations without changing application code.
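
One way to take advantage of this is to have application code depend only on the interface. The sketch below redeclares the contract types locally so that it compiles on its own; `recordingEvents`, `registerUser`, and `example` are hypothetical names introduced for illustration, and the recording type is a test double, not one of the shipped brokers.

```go
package main

import (
	"context"
	"sync"
)

// Local copies of the contract types shown at the top of this page.
type EventPayload = func(dest any) error
type EventHandler = func(payload EventPayload)
type EventUnsubscribeFunc = func() error

type Events interface {
	Publish(ctx context.Context, event string, payload any) error
	Subscribe(ctx context.Context, event string, handler EventHandler) (EventUnsubscribeFunc, error)
	Close() error
}

// recordingEvents is a hypothetical test double that only remembers the
// names of published events.
type recordingEvents struct {
	mu        sync.Mutex
	published []string
}

func (r *recordingEvents) Publish(_ context.Context, event string, _ any) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.published = append(r.published, event)
	return nil
}

func (r *recordingEvents) Subscribe(context.Context, string, EventHandler) (EventUnsubscribeFunc, error) {
	return func() error { return nil }, nil
}

func (r *recordingEvents) Close() error { return nil }

// registerUser is illustrative application code. It depends only on the
// Events interface, so any broker can be passed in.
func registerUser(ctx context.Context, events Events, name string) error {
	return events.Publish(ctx, "user.created", map[string]string{"name": name})
}

// example wires the application code to the test double and returns the
// event names it recorded.
func example() ([]string, error) {
	rec := &recordingEvents{}
	if err := registerUser(context.Background(), rec, "Alice"); err != nil {
		return nil, err
	}
	return rec.published, nil
}
```

In production the same `registerUser` call would receive one of the brokers described above instead of the recording double.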