Events
λ Cosmos includes a publish/subscribe event system with a common interface and five broker implementations. Events enable decoupled communication between components of your application and across distributed systems.
type Events interface { Publish(ctx context.Context, event string, payload any) error Subscribe(ctx context.Context, event string, handler EventHandler) (EventUnsubscribeFunc, error) Close() error}
type EventPayload = func(dest any) errortype EventHandler = func(payload EventPayload)type EventUnsubscribeFunc = func() errorPayloads are serialized to JSON. The EventPayload function passed to handlers is a JSON unmarshaler — call it with a pointer to your target type:
events.Subscribe(ctx, "user.created", func(payload contract.EventPayload) { var user User if err := payload(&user); err != nil { log.Error("failed to decode event", "err", err) return } // handle user})Publishing Events
Section titled “Publishing Events”events.Publish(ctx, "user.created", User{ ID: "42", Name: "Alice", Email: "alice@example.com",})The payload is JSON-encoded before being sent to subscribers. Any JSON-serializable value works.
Subscribing to Events
Section titled “Subscribing to Events”unsubscribe, err := events.Subscribe(ctx, "user.created", func(payload contract.EventPayload) { var user User if err := payload(&user); err != nil { return } sendWelcomeEmail(user)})
// Later, to stop receiving events:err = unsubscribe()Wildcard Patterns
Section titled “Wildcard Patterns”All brokers support wildcard subscriptions with a consistent syntax:
*matches a single token:user.*.createdmatchesuser.42.createdbut notuser.created#matches zero or more tokens:logs.#matcheslogs,logs.error,logs.error.database
events.Subscribe(ctx, "user.*", handler) // all user events with one sub-tokenevents.Subscribe(ctx, "audit.#", handler) // all audit events at any depthThe brokers automatically convert these wildcards to their native format (e.g., > for NATS, +/# for MQTT).
Memory Broker
Section titled “Memory Broker”An in-memory broker with zero dependencies. Ideal for testing, development, and single-instance applications:
import "github.com/studiolambda/cosmos/framework/event"
broker := event.NewMemoryBroker()defer broker.Close()Messages are delivered asynchronously in separate goroutines with panic recovery, so one handler’s failure doesn’t affect others. Since everything is in-memory, events are lost if the process restarts.
Redis Broker
Section titled “Redis Broker”Uses Redis Pub/Sub for distributed applications:
import "github.com/studiolambda/cosmos/framework/event"
broker := event.NewRedisBroker(&event.RedisBrokerOptions{ Addr: "localhost:6379", Password: "", DB: 0,})defer broker.Close()Or wrap an existing Redis client:
broker := event.NewRedisBrokerFrom(existingRedisClient)Redis Pub/Sub provides at-most-once delivery — messages are delivered to connected subscribers but not persisted. Subscribers that are offline when a message is published will not receive it.
NATS Broker
Section titled “NATS Broker”High-performance messaging with NATS:
import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewNATSBroker("nats://localhost:4222")defer broker.Close()With full configuration:
broker, err := event.NewNATSBrokerWith(&event.NATSBrokerOptions{ URLs: []string{"nats://server1:4222", "nats://server2:4222"}, Name: "my-service", MaxReconnects: -1, // unlimited reconnection attempts ReconnectWait: 2 * time.Second, Username: "user", Password: "pass", TLSConfig: tlsConfig,})NATS supports multiple authentication methods:
- Username/password
- Token-based
- NKey (cryptographic)
- Credentials file (JWT + NKey)
The NATS # wildcard is automatically converted to NATS’s > (multi-level wildcard). Single-level * wildcards are natively compatible.
AMQP Broker (RabbitMQ)
Section titled “AMQP Broker (RabbitMQ)”For applications using RabbitMQ:
import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewAMQPBroker("amqp://guest:guest@localhost:5672/")defer broker.Close()With custom exchange name:
broker, err := event.NewAMQPBrokerWith(&event.AMQPBrokerOptions{ URL: "amqp://guest:guest@localhost:5672/", Exchange: "my-events", // default: "cosmos.events"})The AMQP broker uses:
- A topic exchange for wildcard routing.
- Exclusive, auto-delete queues per subscriber for fan-out.
- A dedicated publish channel with mutex protection.
- Separate channels per subscriber following RabbitMQ best practices.
MQTT Broker
Section titled “MQTT Broker”For IoT and edge computing scenarios using MQTT v5:
import "github.com/studiolambda/cosmos/framework/event"
broker, err := event.NewMQTTBroker("mqtt://localhost:1883")defer broker.Close()With full configuration:
broker, err := event.NewMQTTBrokerWith(&event.MQTTBrokerOptions{ URLs: []string{"mqtt://broker1:1883", "mqtt://broker2:1883"}, QoS: 1, // at-least-once delivery (default) KeepAlive: 30, // seconds Username: "user", Password: "pass",})Event names are automatically converted to MQTT topic format:
- Dots become slashes:
user.createdbecomesuser/created *becomes+(single-level wildcard)#stays#(multi-level wildcard)
QoS levels:
- 0 — At most once (fire and forget)
- 1 — At least once (recommended default)
- 2 — Exactly once (highest overhead)
Choosing a Broker
Section titled “Choosing a Broker”| Broker | Dependencies | Persistence | Best For |
|---|---|---|---|
| Memory | None | No | Testing, single instance |
| Redis | Redis server | No (Pub/Sub) | Simple distributed apps |
| NATS | NATS server | No (core NATS) | High-throughput microservices |
| AMQP | RabbitMQ | Yes (queued) | Enterprise messaging |
| MQTT | MQTT broker | Configurable | IoT, edge computing |
All brokers share the same contract.Events interface, so you can swap implementations without changing application code.