Internal API

Pub/Sub events are internal to the service mesh. They are not directly accessible via the API gateway. Use WebSocket subscriptions or webhook integrations for external event consumption.

Pub/Sub Events Reference

Complete reference for the event-driven architecture using Google Cloud Pub/Sub in the Olympus Cloud platform.

Overview

The Olympus Cloud platform uses Google Cloud Pub/Sub as the primary message bus for asynchronous event communication between services. Events flow from publishers through topics to subscribers, enabling loose coupling and scalable event-driven architectures.

Key Concepts

| Concept | Description |
|---|---|
| Topic | Named resource to which messages are sent |
| Subscription | Named resource representing a stream of messages from a topic |
| Message | Data and attributes published to a topic |
| DomainEvent | Canonical event schema used across all services |
| Publisher | Service that publishes events to topics |
| Subscriber | Service that receives events from subscriptions |

Topics

Production Topics

| Topic | Description | Event Types |
|---|---|---|
| platform-events | Platform-wide events and system notifications | Platform, system, admin events |
| restaurant-events | Restaurant operations and order lifecycle | Orders, payments, tables, kitchen |
| analytics-events | Business intelligence and metrics | Analytics, reports, dashboards |
| notification-events | User notifications and alerts | Push, email, SMS, in-app |
| cache-invalidation-events | Cache busting across services | Cache invalidation signals |
| ml-predictions | Machine learning predictions and forecasts | AI/ML inference results |
| anomaly-alerts | Anomaly detection and alerting | System anomalies, fraud detection |

Topic Configuration

Topics are configured via environment variables:

PLATFORM_EVENTS_TOPIC=platform-events
RESTAURANT_EVENTS_TOPIC=restaurant-events
ANALYTICS_EVENTS_TOPIC=analytics-events
NOTIFICATION_EVENTS_TOPIC=notification-events
CACHE_INVALIDATION_EVENTS_TOPIC=cache-invalidation-events
ML_PREDICTIONS_EVENTS_TOPIC=ml-predictions
ANOMALY_ALERTS_EVENTS_TOPIC=anomaly-alerts

DomainEvent Schema

All events follow the canonical DomainEvent schema (v3.0):

Event Structure

{
  "id": "evt_abc123def456",
  "type": "order.created",
  "aggregate_id": "order_12345",
  "aggregate_type": "order",
  "restaurant_id": "rest_789",
  "tenant_id": "tenant_xyz",
  "user_id": "user_456",
  "timestamp": "2026-01-19T14:30:00Z",
  "data": {
    "order_total": 45.99,
    "item_count": 3,
    "table_number": 12
  },
  "metadata": {
    "source": "api-gateway",
    "version": "1.0.0",
    "ip_address": "192.168.1.1",
    "user_agent": "OlympusStaff/2.0",
    "causation": "req_abc123",
    "topic": "restaurant-events"
  }
}

Field Reference

| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Unique event identifier (UUID) |
| type | string | Yes | Event type in domain.action format |
| aggregate_id | string | Yes | ID of the affected entity |
| aggregate_type | string | No | Type of the affected entity |
| restaurant_id | string | No | Restaurant context |
| tenant_id | string | No | Tenant context for multi-tenancy |
| user_id | string | No | User who triggered the event |
| timestamp | ISO 8601 | Yes | Event creation timestamp |
| data | object | Yes | Event-specific payload |
| metadata | object | Yes | Event metadata and tracing |

Metadata Fields

| Field | Type | Description |
|---|---|---|
| source | string | Publishing service name |
| version | string | Event schema version |
| ip_address | string | Client IP address |
| user_agent | string | Client user agent |
| causation | string | Causing request/event ID |
| topic | string | Target Pub/Sub topic |
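
To make the field reference concrete, the following sketch decodes an event from JSON and rejects it when a field marked as required is missing. The `DomainEvent` and `Metadata` types here are stand-ins written from the tables above; the real types in the events package may differ, and `decodeEvent` and `validate` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Metadata mirrors the documented metadata fields (illustrative stand-in).
type Metadata struct {
	Source    string `json:"source"`
	Version   string `json:"version"`
	IPAddress string `json:"ip_address,omitempty"`
	UserAgent string `json:"user_agent,omitempty"`
	Causation string `json:"causation,omitempty"`
	Topic     string `json:"topic,omitempty"`
}

// DomainEvent mirrors the documented event fields (illustrative stand-in).
type DomainEvent struct {
	ID            string                 `json:"id"`
	Type          string                 `json:"type"`
	AggregateID   string                 `json:"aggregate_id"`
	AggregateType string                 `json:"aggregate_type,omitempty"`
	RestaurantID  string                 `json:"restaurant_id,omitempty"`
	TenantID      string                 `json:"tenant_id,omitempty"`
	UserID        string                 `json:"user_id,omitempty"`
	Timestamp     time.Time              `json:"timestamp"`
	Data          map[string]interface{} `json:"data"`
	Metadata      *Metadata              `json:"metadata"`
}

// validate checks the fields the reference table marks as required.
func (e *DomainEvent) validate() error {
	switch {
	case e.ID == "":
		return errors.New("id is required")
	case e.Type == "":
		return errors.New("type is required")
	case e.AggregateID == "":
		return errors.New("aggregate_id is required")
	case e.Timestamp.IsZero():
		return errors.New("timestamp is required")
	case e.Data == nil:
		return errors.New("data is required")
	case e.Metadata == nil:
		return errors.New("metadata is required")
	}
	return nil
}

// decodeEvent parses raw JSON and validates the result.
func decodeEvent(raw []byte) (*DomainEvent, error) {
	var e DomainEvent
	if err := json.Unmarshal(raw, &e); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	if err := e.validate(); err != nil {
		return nil, err
	}
	return &e, nil
}

func main() {
	raw := []byte(`{"id":"evt_abc123def456","type":"order.created",
		"aggregate_id":"order_12345","timestamp":"2026-01-19T14:30:00Z",
		"data":{"order_total":45.99},
		"metadata":{"source":"api-gateway","version":"1.0.0"}}`)
	e, err := decodeEvent(raw)
	if err != nil {
		fmt.Println("invalid event:", err)
		return
	}
	fmt.Println(e.Type, e.AggregateID, e.Metadata.Source)
}
```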

Event Types

Order Lifecycle Events

| Event Type | Description | Data Fields |
|---|---|---|
| order.created | New order placed | order_total, items, table_number |
| order.confirmed | Order confirmed by restaurant | confirmed_at, estimated_time |
| order.preparing | Kitchen started preparation | started_at, cook_id |
| order.ready | Order ready for service | ready_at, expediter_id |
| order.served | Order served to customer | served_at, server_id |
| order.delivering | Out for delivery | driver_id, eta |
| order.delivered | Delivery completed | delivered_at, signature |
| order.completed | Order fully completed | completed_at, duration |
| order.cancelled | Order cancelled | cancelled_by, reason, refund |
| order.status_updated | Generic status change | old_status, new_status |

Payment Events

| Event Type | Description | Data Fields |
|---|---|---|
| payment.started | Payment initiated | amount, method, provider |
| payment.completed | Payment successful | transaction_id, amount, tip |
| payment.failed | Payment declined | error_code, reason |
| payment.refunded | Refund processed | refund_amount, reason |

Kitchen Events

| Event Type | Description | Data Fields |
|---|---|---|
| kitchen.item_started | Item prep started | item_id, station, cook_id |
| kitchen.item_completed | Item prep completed | item_id, completion_time |
| kitchen.order_ready | Full order ready | order_id, items |
| kitchen.delayed | Order delayed | order_id, reason, new_eta |

Table Events

| Event Type | Description | Data Fields |
|---|---|---|
| table.assigned | Table assigned to server | table_id, server_id |
| table.seated | Guests seated | table_id, party_size |
| table.cleaned | Table cleaned | table_id, cleaned_by |
| table.available | Table available | table_id |

Menu Events

| Event Type | Description | Data Fields |
|---|---|---|
| menu.item.86ed | Item marked unavailable | item_id, reason |
| menu.item.restored | Item back in stock | item_id |
| menu.item.auto_86ed | Auto 86'd by inventory | item_id, threshold |
| menu.item.low_stock | Low stock warning | item_id, current_qty |
| menu.item.updated | Item details changed | item_id, changes |
| menu.category.updated | Category changed | category_id, changes |

Reservation Events

| Event Type | Description | Data Fields |
|---|---|---|
| reservation.created | New reservation | party_size, time, name |
| reservation.cancelled | Reservation cancelled | reason, cancelled_by |
| reservation.seated | Guests seated | table_id, seated_at |
| waitlist.added | Added to waitlist | party_size, wait_time |

Topic Routing

Events are automatically routed to appropriate topics based on aggregate type and event type.

Routing Rules

import "strings"

// Aggregate type routing.
// Function names in this listing are illustrative.
func topicForAggregateType(aggregateType string) string {
	switch aggregateType {
	case "analytics":
		return "analytics-events"
	case "notification":
		return "notification-events"
	case "cache":
		return "cache-invalidation-events"
	case "ml", "prediction":
		return "ml-predictions"
	case "anomaly", "alert":
		return "anomaly-alerts"
	case "restaurant", "order":
		return "restaurant-events"
	default:
		return "platform-events"
	}
}

// Event type prefix routing.
func topicForEventType(eventType string) string {
	switch {
	case strings.HasPrefix(eventType, "analytics."):
		return "analytics-events"
	case strings.HasPrefix(eventType, "notification."):
		return "notification-events"
	case strings.HasPrefix(eventType, "cache."):
		return "cache-invalidation-events"
	case strings.HasPrefix(eventType, "ml."):
		return "ml-predictions"
	case strings.HasPrefix(eventType, "anomaly."):
		return "anomaly-alerts"
	case strings.HasPrefix(eventType, "platform."):
		return "platform-events"
	default:
		return "restaurant-events"
	}
}

Explicit Topic Override

Set metadata.topic to force routing to a specific topic:

{
  "metadata": {
    "topic": "analytics-events"
  }
}
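
One way to picture the override is as a check that runs before the automatic rules. The sketch below assumes that precedence (the source only says the override forces routing); the `event` type, `resolveTopic`, and the placeholder `route` function are hypothetical and stand in for the platform's own router.

```go
package main

import "fmt"

// event carries only the fields needed for routing (hypothetical stand-in).
type event struct {
	Type          string
	AggregateType string
	TopicOverride string // corresponds to metadata.topic
}

// resolveTopic returns the explicit override when one is set and otherwise
// defers to route. Checking the override first is an assumption.
func resolveTopic(e event, route func(event) string) string {
	if e.TopicOverride != "" {
		return e.TopicOverride
	}
	return route(e)
}

func main() {
	// Placeholder for the automatic routing rules.
	route := func(event) string { return "restaurant-events" }

	fmt.Println(resolveTopic(event{Type: "order.created"}, route))
	fmt.Println(resolveTopic(event{Type: "order.created", TopicOverride: "analytics-events"}, route))
}
```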

Publishing Events

Publisher Configuration

type PubSubPublisherConfig struct {
	ProjectID        string        // GCP project ID (olympuscloud-dev/staging/prod)
	RequestTimeout   time.Duration // Per-publish timeout (default: 10s)
	MaxRetries       int           // Retry attempts (default: 3)
	InitialBackoff   time.Duration // Initial retry delay (default: 200ms)
	EnableOrdering   bool          // Enable message ordering
	PublishBatchSize int           // Batch size (default: 100)
}

Publishing Example

// Create publisher
publisher, err := events.NewPubSubPublisher(ctx, events.PubSubPublisherConfig{
	ProjectID:      "olympuscloud-dev",
	RequestTimeout: 10 * time.Second,
	MaxRetries:     3,
	EnableOrdering: true,
})
if err != nil {
	return err
}
defer publisher.Close()

// Create event manager with topic router
router := events.TopicRouter{
	DefaultTopic:    "platform-events",
	RestaurantTopic: "restaurant-events",
	AnalyticsTopic:  "analytics-events",
}
eventManager := events.NewEventManager(publisher, router)

// Publish event
event := &events.DomainEvent{
	Type:          events.OrderCreated,
	AggregateID:   orderID,
	AggregateType: "order",
	RestaurantID:  restaurantID,
	TenantID:      tenantID,
	UserID:        userID,
	Data: map[string]interface{}{
		"order_total": 45.99,
		"item_count":  3,
	},
}

// Check the publish result instead of discarding it
if err := eventManager.Publish(ctx, event); err != nil {
	return err
}

Message Attributes

Published messages include these attributes for filtering and routing:

| Attribute | Source | Description |
|---|---|---|
| event_type | Type | Event type string |
| event_id | ID | Unique event ID |
| aggregate_id | AggregateID | Entity ID |
| aggregate_type | AggregateType | Entity type |
| restaurant_id | RestaurantID | Restaurant context |
| tenant_id | TenantID | Tenant context |
| user_id | UserID | User context |
| source | Metadata.Source | Publishing service |
| version | Metadata.Version | Schema version |
| causation_id | Metadata.Causation | Causing request ID |
| topic | Metadata.Topic | Target topic |
| payload_checksum | Computed | SHA-256 of payload |
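
As a rough illustration of how the computed attribute could be produced and checked, the sketch below hashes the serialized payload with SHA-256 and compares it on receipt. Hex encoding of the digest, the subset of attributes shown, and the names `payloadChecksum`, `buildAttributes`, and `verifyPayload` are all assumptions; the source states only that the checksum is a SHA-256 of the payload.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// payloadChecksum returns the SHA-256 digest of payload.
// Hex encoding is an assumption; the source does not state the encoding.
func payloadChecksum(payload []byte) string {
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:])
}

// buildAttributes assembles a subset of the documented attributes,
// skipping values that are empty. Hypothetical helper.
func buildAttributes(eventType, eventID, aggregateID string, payload []byte) map[string]string {
	attrs := map[string]string{"payload_checksum": payloadChecksum(payload)}
	optional := map[string]string{
		"event_type":   eventType,
		"event_id":     eventID,
		"aggregate_id": aggregateID,
	}
	for k, v := range optional {
		if v != "" {
			attrs[k] = v
		}
	}
	return attrs
}

// verifyPayload reports whether payload matches the checksum attribute.
func verifyPayload(attrs map[string]string, payload []byte) bool {
	return attrs["payload_checksum"] == payloadChecksum(payload)
}

func main() {
	payload := []byte(`{"order_total":45.99,"item_count":3}`)
	attrs := buildAttributes("order.created", "evt_abc123def456", "order_12345", payload)
	fmt.Println(attrs["event_type"], verifyPayload(attrs, payload))
}
```

A subscriber that recomputes the digest can detect a payload that was truncated or altered in transit before handing it to a handler.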

Message Ordering

When EnableOrdering is true, messages are ordered by:

  1. AggregateID (if present) - Orders events for the same entity
  2. TenantID (fallback) - Orders events within a tenant

// Ordering key selection
if event.AggregateID != "" {
	message.OrderingKey = event.AggregateID
} else if event.TenantID != "" {
	message.OrderingKey = event.TenantID
}

Subscribing to Events

Subscriber Configuration

type PubSubSubscriberConfig struct {
	ProjectID              string        // GCP project ID (olympuscloud-dev/staging/prod)
	RequestTimeout         time.Duration // Operation timeout (default: 10s)
	ChannelBuffer          int           // Event channel size (default: 100)
	AckDeadline            time.Duration // Message ack deadline (default: 30s)
	RetentionDuration      time.Duration // Message retention (default: 24h)
	ReceiveGoroutines      int           // Parallel receivers (default: 4)
	MaxOutstandingMessages int           // Max unacked messages (default: 200)
	MaxReconnectAttempts   int           // Reconnect attempts (0 = unlimited)
	InitialReconnectDelay  time.Duration // Initial retry delay (default: 200ms)
	MaxReconnectDelay      time.Duration // Max retry delay (default: 5s)
	DeduplicationWindow    time.Duration // Dedup window (default: 5m)
	DeduplicationCacheSize int           // Dedup cache size (default: 256)
}

Subscription Example

// Create subscriber
subscriber, err := events.NewPubSubSubscriber(ctx, events.PubSubSubscriberConfig{
	ProjectID:         "olympuscloud-dev",
	ChannelBuffer:     100,
	AckDeadline:       30 * time.Second,
	ReceiveGoroutines: 4,
})
if err != nil {
	return err
}
defer subscriber.Close()

// Subscribe to topic
eventChan, err := subscriber.Subscribe(ctx, "restaurant-events", "my-service-subscription")
if err != nil {
	return err
}

// Process events
for event := range eventChan {
	switch event.Type {
	case events.OrderCreated:
		handleOrderCreated(event)
	case events.PaymentCompleted:
		handlePaymentCompleted(event)
	default:
		log.Printf("Unhandled event type: %s", event.Type)
	}
}

Subscription Features

Automatic Deduplication

Messages are deduplicated within a configurable window:

DeduplicationWindow:    5 * time.Minute, // Time window
DeduplicationCacheSize: 256,             // Cache capacity

Automatic Reconnection

Subscribers automatically reconnect with exponential backoff:

MaxReconnectAttempts:  0, // 0 = unlimited
InitialReconnectDelay: 200 * time.Millisecond,
MaxReconnectDelay:     5 * time.Second,

Message Ordering

Messages are delivered in order only when message ordering is enabled on the subscription and the publisher sets an ordering key (EnableOrdering on the publisher).


Retry and Error Handling

Publisher Retries

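Publishers retry with exponential backoff. With the defaults, the initial attempt is followed by up to three retries, and each delay doubles the previous one up to a 5s cap:

Attempt 1: Immediate
Attempt 2: 200ms delay
Attempt 3: 400ms delay
Attempt 4: 800ms delay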

Configuration:

MaxRetries:     3,
InitialBackoff: 200 * time.Millisecond,

Subscriber Retries

Failed message processing results in:

  1. Nack - Message requeued for redelivery
  2. Backoff - Exponential delay before retry
  3. Dead Letter - After max attempts, sent to DLQ

Dead Letter Queues

Configure dead letter topics for failed messages:

subscription:
  name: my-service-subscription
  topic: restaurant-events
  dead_letter_topic: restaurant-events-dlq
  max_delivery_attempts: 5

Monitoring and Observability

Metrics

Events publish metrics via the observability package:

observability.GlobalMetrics.RecordEventPublished(eventType, restaurantID)

Logging

All event operations are logged with structured fields:

{
  "component": "event_manager",
  "event_id": "evt_abc123",
  "event_type": "order.created",
  "topics": ["restaurant-events"],
  "msg": "published domain event"
}

Tracing

Events propagate trace context via:

  • X-Request-ID header
  • causation_id attribute
  • OpenTelemetry span context

Development Environment

Using the Dev Project

All development uses the live olympuscloud-dev GCP project. There is no local Pub/Sub emulator.

publisher, err := events.NewPubSubPublisher(ctx, events.PubSubPublisherConfig{
	ProjectID:      "olympuscloud-dev",
	RequestTimeout: 10 * time.Second,
	MaxRetries:     3,
})

Creating Topics in Dev

# Ensure you are targeting the dev project
gcloud config set project olympuscloud-dev

# Create topics
gcloud pubsub topics create restaurant-events
gcloud pubsub topics create platform-events
gcloud pubsub topics create analytics-events
gcloud pubsub topics create notification-events

Best Practices

Event Design

  1. Immutable events - Events record facts that already happened; never modify them after publishing
  2. Self-contained - Include all data needed to process the event
  3. Idempotent handlers - Design subscribers to handle duplicates
  4. Versioned schemas - Use metadata.version for evolution

Publishing

  1. Batch when possible - Use batch publishing for bulk operations
  2. Set timeouts - Configure appropriate request timeouts
  3. Handle failures - Implement proper error handling and retries
  4. Include context - Always set tenant_id, user_id, restaurant_id

Subscribing

  1. Process quickly - Ack messages promptly to avoid redelivery
  2. Use dead letters - Configure DLQ for poison messages
  3. Monitor lag - Track subscription backlog metrics
  4. Scale receivers - Adjust ReceiveGoroutines based on load