Pub/Sub events are internal to the service mesh. They are not directly accessible via the API gateway. Use WebSocket subscriptions or webhook integrations for external event consumption.
Pub/Sub Events Reference
Complete reference for the event-driven architecture using Google Cloud Pub/Sub in the Olympus Cloud platform.
Overview
The Olympus Cloud platform uses Google Cloud Pub/Sub as the primary message bus for asynchronous event communication between services. Events flow from publishers through topics to subscribers, enabling loose coupling and scalable event-driven architectures.
Key Concepts
| Concept | Description |
|---|---|
| Topic | Named resource to which messages are sent |
| Subscription | Named resource representing a stream of messages from a topic |
| Message | Data and attributes published to a topic |
| DomainEvent | Canonical event schema used across all services |
| Publisher | Service that publishes events to topics |
| Subscriber | Service that receives events from subscriptions |
Topics
Production Topics
| Topic | Description | Event Types |
|---|---|---|
platform-events | Platform-wide events and system notifications | Platform, system, admin events |
restaurant-events | Restaurant operations and order lifecycle | Orders, payments, tables, kitchen |
analytics-events | Business intelligence and metrics | Analytics, reports, dashboards |
notification-events | User notifications and alerts | Push, email, SMS, in-app |
cache-invalidation-events | Cache busting across services | Cache invalidation signals |
ml-predictions | Machine learning predictions and forecasts | AI/ML inference results |
anomaly-alerts | Anomaly detection and alerting | System anomalies, fraud detection |
Topic Configuration
Topics are configured via environment variables:
PLATFORM_EVENTS_TOPIC=platform-events
RESTAURANT_EVENTS_TOPIC=restaurant-events
ANALYTICS_EVENTS_TOPIC=analytics-events
NOTIFICATION_EVENTS_TOPIC=notification-events
CACHE_INVALIDATION_EVENTS_TOPIC=cache-invalidation-events
ML_PREDICTIONS_EVENTS_TOPIC=ml-predictions
ANOMALY_ALERTS_EVENTS_TOPIC=anomaly-alerts
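A service might read these variables once at startup. The sketch below assumes each unset or empty variable falls back to the production topic name from the table above; this reference does not say what happens when a variable is missing. `topicConfig`, `loadTopicConfig`, and `loadTopicConfigFromEnv` are hypothetical names.

```go
package main

import "os"

// topicConfig is a hypothetical holder for the seven topic names; the platform's real config type may differ.
type topicConfig struct {
	Platform, Restaurant, Analytics, Notification   string
	CacheInvalidation, MLPredictions, AnomalyAlerts string
}

// lookupFunc has the same shape as os.LookupEnv, so tests can pass a map-backed fake.
type lookupFunc func(key string) (string, bool)

// envOr returns the variable's value, or fallback when it is unset or empty.
func envOr(lookup lookupFunc, key, fallback string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return fallback
}

// loadTopicConfig reads each documented variable, falling back to the production
// topic name (the fallback behavior is an assumption).
func loadTopicConfig(lookup lookupFunc) topicConfig {
	return topicConfig{
		Platform:          envOr(lookup, "PLATFORM_EVENTS_TOPIC", "platform-events"),
		Restaurant:        envOr(lookup, "RESTAURANT_EVENTS_TOPIC", "restaurant-events"),
		Analytics:         envOr(lookup, "ANALYTICS_EVENTS_TOPIC", "analytics-events"),
		Notification:      envOr(lookup, "NOTIFICATION_EVENTS_TOPIC", "notification-events"),
		CacheInvalidation: envOr(lookup, "CACHE_INVALIDATION_EVENTS_TOPIC", "cache-invalidation-events"),
		MLPredictions:     envOr(lookup, "ML_PREDICTIONS_EVENTS_TOPIC", "ml-predictions"),
		AnomalyAlerts:     envOr(lookup, "ANOMALY_ALERTS_EVENTS_TOPIC", "anomaly-alerts"),
	}
}

// loadTopicConfigFromEnv reads from the real process environment.
func loadTopicConfigFromEnv() topicConfig {
	return loadTopicConfig(os.LookupEnv)
}
```

Passing the lookup function in, rather than calling `os.Getenv` directly, keeps the loader testable without mutating the process environment.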
DomainEvent Schema
All events follow the canonical DomainEvent schema (v3.0):
Event Structure
{
"id": "evt_abc123def456",
"type": "order.created",
"aggregate_id": "order_12345",
"aggregate_type": "order",
"restaurant_id": "rest_789",
"tenant_id": "tenant_xyz",
"user_id": "user_456",
"timestamp": "2026-01-19T14:30:00Z",
"data": {
"order_total": 45.99,
"item_count": 3,
"table_number": 12
},
"metadata": {
"source": "api-gateway",
"version": "1.0.0",
"ip_address": "192.168.1.1",
"user_agent": "OlympusStaff/2.0",
"causation": "req_abc123",
"topic": "restaurant-events"
}
}
Field Reference
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique event identifier (UUID) |
type | string | Yes | Event type in domain.action format |
aggregate_id | string | Yes | ID of the affected entity |
aggregate_type | string | No | Type of the affected entity |
restaurant_id | string | No | Restaurant context |
tenant_id | string | No | Tenant context for multi-tenancy |
user_id | string | No | User who triggered the event |
timestamp | string (ISO 8601) | Yes | Event creation timestamp
data | object | Yes | Event-specific payload |
metadata | object | Yes | Event metadata and tracing |
Metadata Fields
| Field | Type | Description |
|---|---|---|
source | string | Publishing service name |
version | string | Event schema version |
ip_address | string | Client IP address |
user_agent | string | Client user agent |
causation | string | Causing request/event ID |
topic | string | Target Pub/Sub topic |
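The required/optional split above can be made concrete with a local Go mirror of the schema. This is a sketch, not the `events.DomainEvent` type: the JSON tags are assumed to match the example payload, and `domainEvent`, `eventMetadata`, and `parseDomainEvent` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"strings"
	"time"
)

// eventMetadata mirrors the Metadata Fields table (tags assumed from the example payload).
type eventMetadata struct {
	Source    string `json:"source"`
	Version   string `json:"version"`
	IPAddress string `json:"ip_address,omitempty"`
	UserAgent string `json:"user_agent,omitempty"`
	Causation string `json:"causation,omitempty"`
	Topic     string `json:"topic,omitempty"`
}

// domainEvent is a hypothetical local mirror of the DomainEvent schema.
type domainEvent struct {
	ID            string                 `json:"id"`
	Type          string                 `json:"type"`
	AggregateID   string                 `json:"aggregate_id"`
	AggregateType string                 `json:"aggregate_type,omitempty"`
	RestaurantID  string                 `json:"restaurant_id,omitempty"`
	TenantID      string                 `json:"tenant_id,omitempty"`
	UserID        string                 `json:"user_id,omitempty"`
	Timestamp     time.Time              `json:"timestamp"` // RFC 3339 / ISO 8601
	Data          map[string]interface{} `json:"data"`
	Metadata      *eventMetadata         `json:"metadata"`
}

// validate reports every field the Field Reference table marks as required but is missing.
func (e *domainEvent) validate() error {
	checks := []struct {
		name    string
		missing bool
	}{
		{"id", e.ID == ""},
		{"type", e.Type == ""},
		{"aggregate_id", e.AggregateID == ""},
		{"timestamp", e.Timestamp.IsZero()},
		{"data", e.Data == nil},
		{"metadata", e.Metadata == nil},
	}
	var missing []string
	for _, c := range checks {
		if c.missing {
			missing = append(missing, c.name)
		}
	}
	if len(missing) > 0 {
		return errors.New("missing required fields: " + strings.Join(missing, ", "))
	}
	return nil
}

// parseDomainEvent decodes a JSON payload and rejects events missing required fields.
func parseDomainEvent(raw []byte) (*domainEvent, error) {
	var e domainEvent
	if err := json.Unmarshal(raw, &e); err != nil {
		return nil, err
	}
	if err := e.validate(); err != nil {
		return nil, err
	}
	return &e, nil
}
```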
Event Types
Order Lifecycle Events
| Event Type | Description | Data Fields |
|---|---|---|
order.created | New order placed | order_total, items, table_number |
order.confirmed | Order confirmed by restaurant | confirmed_at, estimated_time |
order.preparing | Kitchen started preparation | started_at, cook_id |
order.ready | Order ready for service | ready_at, expediter_id |
order.served | Order served to customer | served_at, server_id |
order.delivering | Out for delivery | driver_id, eta |
order.delivered | Delivery completed | delivered_at, signature |
order.completed | Order fully completed | completed_at, duration |
order.cancelled | Order cancelled | cancelled_by, reason, refund |
order.status_updated | Generic status change | old_status, new_status |
Payment Events
| Event Type | Description | Data Fields |
|---|---|---|
payment.started | Payment initiated | amount, method, provider |
payment.completed | Payment successful | transaction_id, amount, tip |
payment.failed | Payment declined | error_code, reason |
payment.refunded | Refund processed | refund_amount, reason |
Kitchen Events
| Event Type | Description | Data Fields |
|---|---|---|
kitchen.item_started | Item prep started | item_id, station, cook_id |
kitchen.item_completed | Item prep completed | item_id, completion_time |
kitchen.order_ready | Full order ready | order_id, items |
kitchen.delayed | Order delayed | order_id, reason, new_eta |
Table Events
| Event Type | Description | Data Fields |
|---|---|---|
table.assigned | Table assigned to server | table_id, server_id |
table.seated | Guests seated | table_id, party_size |
table.cleaned | Table cleaned | table_id, cleaned_by |
table.available | Table available | table_id |
Menu Events
| Event Type | Description | Data Fields |
|---|---|---|
menu.item.86ed | Item marked unavailable | item_id, reason |
menu.item.restored | Item back in stock | item_id |
menu.item.auto_86ed | Auto 86'd by inventory | item_id, threshold |
menu.item.low_stock | Low stock warning | item_id, current_qty |
menu.item.updated | Item details changed | item_id, changes |
menu.category.updated | Category changed | category_id, changes |
Reservation Events
| Event Type | Description | Data Fields |
|---|---|---|
reservation.created | New reservation | party_size, time, name |
reservation.cancelled | Reservation cancelled | reason, cancelled_by |
reservation.seated | Guests seated | table_id, seated_at |
waitlist.added | Added to waitlist | party_size, wait_time |
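Event types follow the `domain.action` format, but some actions span several segments (for example `menu.item.86ed`). The sketch below splits on the first dot only. The constants are a local stand-in for the `events` package constants referenced later (such as `events.OrderCreated`); their string values are assumed to match the tables above, and `splitEventType` is a hypothetical helper.

```go
package main

import "strings"

// Local stand-ins; values assumed to match the event type tables.
const (
	OrderCreated     = "order.created"
	OrderCancelled   = "order.cancelled"
	PaymentCompleted = "payment.completed"
	MenuItem86ed     = "menu.item.86ed"
)

// splitEventType separates the domain (first segment) from the action (everything after it).
// Multi-segment types such as "menu.item.86ed" keep "item.86ed" as the action.
func splitEventType(eventType string) (domain, action string, ok bool) {
	domain, action, ok = strings.Cut(eventType, ".")
	if !ok || domain == "" || action == "" {
		return "", "", false
	}
	return domain, action, true
}
```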
Topic Routing
Events are automatically routed to appropriate topics based on aggregate type and event type.
Routing Rules
// Aggregate type routing (aggregateType holds event.AggregateType)
switch aggregateType {
case "analytics":
topic = "analytics-events"
case "notification":
topic = "notification-events"
case "cache":
topic = "cache-invalidation-events"
case "ml", "prediction":
topic = "ml-predictions"
case "anomaly", "alert":
topic = "anomaly-alerts"
case "restaurant", "order":
topic = "restaurant-events"
default:
topic = "platform-events"
}
// Event type prefix routing (eventType holds event.Type; "type" is a reserved word in Go)
switch {
case strings.HasPrefix(eventType, "analytics."):
topic = "analytics-events"
case strings.HasPrefix(eventType, "notification."):
topic = "notification-events"
case strings.HasPrefix(eventType, "cache."):
topic = "cache-invalidation-events"
case strings.HasPrefix(eventType, "ml."):
topic = "ml-predictions"
case strings.HasPrefix(eventType, "anomaly."):
topic = "anomaly-alerts"
case strings.HasPrefix(eventType, "platform."):
topic = "platform-events"
default:
topic = "restaurant-events"
}
Explicit Topic Override
Set metadata.topic to force routing to a specific topic:
{
"metadata": {
"topic": "analytics-events"
}
}
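The routing rules and the override can be combined into one function. This reference does not state how the two switches interact, so the sketch assumes a precedence: an explicit `metadata.topic` wins, then the aggregate type when it is set (including its `platform-events` default), and only then the event type prefix. `resolveTopic` and the lookup tables are hypothetical names.

```go
package main

import "strings"

// aggregateTopics restates the aggregate type switch as a lookup table.
var aggregateTopics = map[string]string{
	"analytics":    "analytics-events",
	"notification": "notification-events",
	"cache":        "cache-invalidation-events",
	"ml":           "ml-predictions",
	"prediction":   "ml-predictions",
	"anomaly":      "anomaly-alerts",
	"alert":        "anomaly-alerts",
	"restaurant":   "restaurant-events",
	"order":        "restaurant-events",
}

// prefixTopics restates the event type prefix switch; none of these prefixes overlap.
var prefixTopics = []struct{ prefix, topic string }{
	{"analytics.", "analytics-events"},
	{"notification.", "notification-events"},
	{"cache.", "cache-invalidation-events"},
	{"ml.", "ml-predictions"},
	{"anomaly.", "anomaly-alerts"},
	{"platform.", "platform-events"},
}

// resolveTopic applies an ASSUMED precedence: override, then aggregate type (if set), then prefix.
func resolveTopic(metadataTopic, aggregateType, eventType string) string {
	if metadataTopic != "" {
		return metadataTopic
	}
	if aggregateType != "" {
		if t, ok := aggregateTopics[aggregateType]; ok {
			return t
		}
		return "platform-events" // aggregate type default
	}
	for _, p := range prefixTopics {
		if strings.HasPrefix(eventType, p.prefix) {
			return p.topic
		}
	}
	return "restaurant-events" // event type default
}
```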
Publishing Events
Publisher Configuration
type PubSubPublisherConfig struct {
ProjectID string // GCP project ID (olympuscloud-dev/staging/prod)
RequestTimeout time.Duration // Per-publish timeout (default: 10s)
MaxRetries int // Retry attempts (default: 3)
InitialBackoff time.Duration // Initial retry delay (default: 200ms)
EnableOrdering bool // Enable message ordering
PublishBatchSize int // Batch size (default: 100)
}
Publishing Example
// Create publisher
publisher, err := events.NewPubSubPublisher(ctx, events.PubSubPublisherConfig{
ProjectID: "olympuscloud-dev",
RequestTimeout: 10 * time.Second,
MaxRetries: 3,
EnableOrdering: true,
})
if err != nil {
return err
}
defer publisher.Close()
// Create event manager with topic router
router := events.TopicRouter{
DefaultTopic: "platform-events",
RestaurantTopic: "restaurant-events",
AnalyticsTopic: "analytics-events",
}
eventManager := events.NewEventManager(publisher, router)
// Publish event
event := &events.DomainEvent{
Type: events.OrderCreated,
AggregateID: orderID,
AggregateType: "order",
RestaurantID: restaurantID,
TenantID: tenantID,
UserID: userID,
Data: map[string]interface{}{
"order_total": 45.99,
"item_count": 3,
},
}
err = eventManager.Publish(ctx, event)
if err != nil {
return err
}
Message Attributes
Published messages include these attributes for filtering and routing:
| Attribute | Source | Description |
|---|---|---|
event_type | Type | Event type string |
event_id | ID | Unique event ID |
aggregate_id | AggregateID | Entity ID |
aggregate_type | AggregateType | Entity type |
restaurant_id | RestaurantID | Restaurant context |
tenant_id | TenantID | Tenant context |
user_id | UserID | User context |
source | Metadata.Source | Publishing service |
version | Metadata.Version | Schema version |
causation_id | Metadata.Causation | Causing request ID |
topic | Metadata.Topic | Target topic |
payload_checksum | Computed | SHA-256 of payload |
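Building the attribute map and the checksum might look like the sketch below. Two assumptions are labeled: `payload_checksum` is taken to be the hex-encoded SHA-256 of the serialized message body, and empty fields are omitted rather than sent as empty strings. `eventFields`, `buildAttributes`, and `verifyChecksum` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// eventFields is a hypothetical flat view of the DomainEvent fields that feed attributes.
type eventFields struct {
	ID, Type, AggregateID, AggregateType string
	RestaurantID, TenantID, UserID       string
	Source, Version, Causation, Topic    string
}

// buildAttributes maps event fields to the attribute names in the table above.
// Empty values are skipped (assumption); the checksum is always set.
func buildAttributes(e eventFields, payload []byte) map[string]string {
	sum := sha256.Sum256(payload)
	attrs := map[string]string{"payload_checksum": hex.EncodeToString(sum[:])}
	pairs := []struct{ key, value string }{
		{"event_type", e.Type},
		{"event_id", e.ID},
		{"aggregate_id", e.AggregateID},
		{"aggregate_type", e.AggregateType},
		{"restaurant_id", e.RestaurantID},
		{"tenant_id", e.TenantID},
		{"user_id", e.UserID},
		{"source", e.Source},
		{"version", e.Version},
		{"causation_id", e.Causation},
		{"topic", e.Topic},
	}
	for _, p := range pairs {
		if p.value != "" {
			attrs[p.key] = p.value
		}
	}
	return attrs
}

// verifyChecksum lets a subscriber confirm the payload matches the checksum attribute.
func verifyChecksum(attrs map[string]string, payload []byte) bool {
	sum := sha256.Sum256(payload)
	return attrs["payload_checksum"] == hex.EncodeToString(sum[:])
}
```

Subscribers can filter on these attributes without decoding the payload, and can call a check like `verifyChecksum` before trusting the body.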
Message Ordering
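When EnableOrdering is true, each message is assigned an ordering key:
- AggregateID (if present) - orders events for the same entity
- TenantID (fallback) - orders events within a tenant
- Neither set - no ordering key, so the message is published without ordering guarantees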
// Ordering key selection
if event.AggregateID != "" {
message.OrderingKey = event.AggregateID
} else if event.TenantID != "" {
message.OrderingKey = event.TenantID
}
Subscribing to Events
Subscriber Configuration
type PubSubSubscriberConfig struct {
ProjectID string // GCP project ID (olympuscloud-dev/staging/prod)
RequestTimeout time.Duration // Operation timeout (default: 10s)
ChannelBuffer int // Event channel size (default: 100)
AckDeadline time.Duration // Message ack deadline (default: 30s)
RetentionDuration time.Duration // Message retention (default: 24h)
ReceiveGoroutines int // Parallel receivers (default: 4)
MaxOutstandingMessages int // Max unacked messages (default: 200)
MaxReconnectAttempts int // Reconnect attempts (0 = unlimited)
InitialReconnectDelay time.Duration // Initial retry delay (default: 200ms)
MaxReconnectDelay time.Duration // Max retry delay (default: 5s)
DeduplicationWindow time.Duration // Dedup window (default: 5m)
DeduplicationCacheSize int // Dedup cache size (default: 256)
}
Subscription Example
// Create subscriber
subscriber, err := events.NewPubSubSubscriber(ctx, events.PubSubSubscriberConfig{
ProjectID: "olympuscloud-dev",
ChannelBuffer: 100,
AckDeadline: 30 * time.Second,
ReceiveGoroutines: 4,
})
if err != nil {
return err
}
defer subscriber.Close()
// Subscribe to topic
eventChan, err := subscriber.Subscribe(ctx, "restaurant-events", "my-service-subscription")
if err != nil {
return err
}
// Process events
for event := range eventChan {
switch event.Type {
case events.OrderCreated:
handleOrderCreated(event)
case events.PaymentCompleted:
handlePaymentCompleted(event)
default:
log.Printf("Unhandled event type: %s", event.Type)
}
}
Subscription Features
Automatic Deduplication
Messages are deduplicated within a configurable window:
DeduplicationWindow: 5 * time.Minute, // Time window
DeduplicationCacheSize: 256, // Cache capacity
Automatic Reconnection
Subscribers automatically reconnect with exponential backoff:
MaxReconnectAttempts: 0, // 0 = unlimited
InitialReconnectDelay: 200 * time.Millisecond,
MaxReconnectDelay: 5 * time.Second,
Message Ordering
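Pub/Sub delivers messages that share an ordering key in order only when message ordering is enabled on the subscription and the publisher sets ordering keys (EnableOrdering: true).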
Retry and Error Handling
Publisher Retries
Publishers retry with exponential backoff:
Attempt 1: Immediate
Attempt 2: 200ms delay
Attempt 3: 400ms delay
Attempt 4: 800ms delay (each delay is capped at 5s)
Configuration:
MaxRetries: 3,
InitialBackoff: 200 * time.Millisecond,
Subscriber Retries
Failed message processing results in:
- Nack - Message requeued for redelivery
- Backoff - Exponential delay before retry
- Dead Letter - After max attempts, sent to DLQ
Dead Letter Queues
Configure dead letter topics for failed messages:
subscription:
name: my-service-subscription
topic: restaurant-events
dead_letter_topic: restaurant-events-dlq
max_delivery_attempts: 5
Monitoring and Observability
Metrics
Event publishing records metrics through the observability package:
observability.GlobalMetrics.RecordEventPublished(eventType, restaurantID)
Logging
All event operations are logged with structured fields:
{
"component": "event_manager",
"event_id": "evt_abc123",
"event_type": "order.created",
"topics": ["restaurant-events"],
"msg": "published domain event"
}
Tracing
Events propagate trace context via:
- X-Request-ID header
- causation_id attribute
- OpenTelemetry span context
Development Environment
Using the Dev Project
All development uses the live olympuscloud-dev GCP project; the platform does not use a local Pub/Sub emulator.
publisher, err := events.NewPubSubPublisher(ctx, events.PubSubPublisherConfig{
ProjectID: "olympuscloud-dev",
RequestTimeout: 10 * time.Second,
MaxRetries: 3,
})
Creating Topics in Dev
# Ensure you are targeting the dev project
gcloud config set project olympuscloud-dev
# Create topics
gcloud pubsub topics create restaurant-events
gcloud pubsub topics create platform-events
gcloud pubsub topics create analytics-events
gcloud pubsub topics create notification-events
Best Practices
Event Design
- Immutable events - Events record facts that have already happened; never modify them after publishing
- Self-contained - Include all data needed to process the event
- Idempotent handlers - Design subscribers to handle duplicate deliveries
- Versioned schemas - Use metadata.version for schema evolution
Publishing
- Batch when possible - Use batch publishing for bulk operations
- Set timeouts - Configure appropriate request timeouts
- Handle failures - Implement proper error handling and retries
- Include context - Always set tenant_id, user_id, restaurant_id
Subscribing
- Process quickly - Ack messages promptly to avoid redelivery
- Use dead letters - Configure DLQ for poison messages
- Monitor lag - Track subscription backlog metrics
- Scale receivers - Adjust ReceiveGoroutines based on load
Related Documentation
- WebSocket Events - Real-time client events
- Webhooks - External event delivery
- API Gateway Architecture - Gateway overview
- Event-Driven Architecture - System design