AI Agents Architecture
The Olympus Cloud platform uses a sophisticated AI agent architecture built on LangGraph for stateful multi-step workflows with human-in-the-loop (HITL) capabilities.
Overview
The AI system is designed around three core principles:
- Cost Optimization: Route 70%+ of queries to free or low-cost models
- Human-in-the-Loop: Require approval for sensitive operations
- Graceful Degradation: Automatic fallbacks when providers fail
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Client Applications │
│ Staff Shell │ Platform Portal │ Drive-Thru │ Customer App │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ Go API Gateway │
│ GraphQL │ REST │ WebSocket │ gRPC │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ Python AI Service │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ LangGraph Orchestrator │ │
│ │ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌────────────┐ │ │
│ │ │ Intent │─▶│ Planner │─▶│ Approval │─▶│ Executor │ │ │
│ │ │ Router │ │ │ │ Checker │ │ │ │ │
│ │ └─────────┘ └─────────┘ └──────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼──────────────────────────────┐ │
│ │ ACP AI Router │ │
│ │ Tier Selection │ Caching │ Fallbacks │ Cost Tracking │ │
│ └───────────────────────────┬──────────────────────────────┘ │
│ │ │
└──────────────────────────────┼──────────────────────────────────┘
│
┌──────────────────────────────▼──────────────────────────────────┐
│ Cloudflare AI Gateway │
│ Workers AI (FREE) │ Anthropic │ Google │ OpenAI │ ElevenLabs │
└─────────────────────────────────────────────────────────────────┘
Agent Types
The platform implements specialized AI agents for different domains.
Restaurant Operations Agent
Handles day-to-day restaurant tasks:
- Menu recommendations
- Order modifications
- Complaint resolution
- Upselling suggestions
Model Tiers Used: T1 (simple), T2 (standard), T4 (complex)
Inventory Agent
Manages inventory and ordering:
- Par level monitoring
- Reorder suggestions
- Waste tracking analysis
- Supplier recommendations
Model Tiers Used: T2 (analysis), T4 (planning)
HITL Required: Order placement, par level changes
Analytics Agent
Provides business intelligence:
- Natural language report queries
- Trend analysis
- Anomaly detection
- Forecasting
Model Tiers Used: T3 (analysis), T5 (complex queries)
Voice AI Agent
Powers voice ordering and "Hey Maximus":
- Speech-to-text processing
- Intent detection
- Order parsing
- Response generation
Model Tiers Used: T1 (greetings), T2 (orders), T4 (disambiguation)
Customer Service Agent
Handles customer interactions:
- FAQ responses
- Order status queries
- Complaint routing
- Feedback collection
Model Tiers Used: T1 (FAQ), T2 (status), T4 (complaints)
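The agents above can be dispatched from a simple intent table. A minimal sketch; the intent and agent names here are illustrative, not the platform's actual identifiers:

```python
# Hypothetical intent -> agent routing table mirroring the agent list above.
AGENT_FOR_INTENT = {
    "menu_recommendation": "restaurant_ops",
    "reorder_suggestion": "inventory",
    "report_query": "analytics",
    "voice_order": "voice",
    "faq": "customer_service",
}

def route_to_agent(intent: str) -> str:
    """Fall back to the customer service agent for unrecognized intents."""
    return AGENT_FOR_INTENT.get(intent, "customer_service")
```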
LangGraph Orchestration
Each agent is implemented as a LangGraph state machine with checkpointing.
State Graph Structure
from typing import Any, Dict, List, Optional, TypedDict

from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: List[Message]
    intent: str
    entities: Dict[str, Any]
    requires_approval: bool
    plan: Optional[List[str]]
    result: Optional[Any]
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("intent_router", detect_intent)
graph.add_node("planner", create_plan)
graph.add_node("approval_checker", check_approval)
graph.add_node("executor", execute_action)
graph.add_node("responder", generate_response)
# Add edges
graph.add_edge("intent_router", "planner")
graph.add_conditional_edges(
    "planner",
    requires_approval,
    {
        True: "approval_checker",
        False: "executor"
    }
)
graph.add_edge("approval_checker", "executor")
graph.add_edge("executor", "responder")
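The `requires_approval` function passed to `add_conditional_edges` can be a plain predicate over the state. A minimal sketch, with a simplified `AgentState` (`messages` typed as `List[Any]`) and a hypothetical `HITL_ACTIONS` set:

```python
from typing import Any, Dict, List, Optional, TypedDict

class AgentState(TypedDict):
    messages: List[Any]  # simplified; the real type is List[Message]
    intent: str
    entities: Dict[str, Any]
    requires_approval: bool
    plan: Optional[List[str]]
    result: Optional[Any]

# Hypothetical set of HITL-protected plan steps.
HITL_ACTIONS = {"order_inventory", "adjust_par_levels"}

def requires_approval(state: AgentState) -> bool:
    """Branch predicate: True routes the graph to approval_checker,
    False routes straight to executor."""
    plan = state.get("plan") or []
    return any(step in HITL_ACTIONS for step in plan)
```

After the planner node runs, the graph calls this predicate with the current state and follows the matching edge from the mapping.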
Checkpointing
State is persisted at each node for:
- Conversation continuity
- Error recovery
- Audit logging
- HITL pause/resume
# Save checkpoint
await graph.save_checkpoint(state, thread_id=session_id)
# Resume from checkpoint
state = await graph.load_checkpoint(thread_id=session_id)
result = await graph.invoke(state, {"input": new_message})
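The `save_checkpoint`/`load_checkpoint` helpers above are this codebase's own wrappers; conceptually they behave like a state store keyed by thread ID. A toy in-memory version (production would persist to a durable store instead):

```python
import copy
from typing import Dict, Optional

class InMemoryCheckpointer:
    """Toy checkpoint store keyed by thread_id. Deep-copies state so
    later mutations don't corrupt saved checkpoints."""

    def __init__(self) -> None:
        self._store: Dict[str, dict] = {}

    def save(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = copy.deepcopy(state)

    def load(self, thread_id: str) -> Optional[dict]:
        saved = self._store.get(thread_id)
        return copy.deepcopy(saved) if saved is not None else None
```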
Human-in-the-Loop (HITL)
All actions with financial, safety, or data-modification implications must go through the HITL approval flow: sensitive operations pause for explicit human approval before execution. Bypassing HITL for protected actions is a policy violation and will trigger a security audit.
HITL-Protected Actions
| Action | Agent | Approval Required |
|---|---|---|
| Place inventory order | Inventory | Manager |
| Adjust par levels | Inventory | Manager |
| Process refund > $50 | Customer Service | Manager |
| Modify employee schedule | Scheduling | Manager |
| Change menu prices | Menu | Owner |
| Send marketing campaign | Marketing | Manager |
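The table above reduces to a small policy lookup. A sketch; the action identifiers below are illustrative, not the platform's actual names:

```python
from typing import Optional

# Action -> required approver role, mirroring the table above.
APPROVAL_POLICY = {
    "place_inventory_order": "manager",
    "adjust_par_levels": "manager",
    "modify_employee_schedule": "manager",
    "change_menu_prices": "owner",
    "send_marketing_campaign": "manager",
}

def required_approver(action: str) -> Optional[str]:
    """Return the role that must approve `action`, or None if unprotected."""
    return APPROVAL_POLICY.get(action)

def refund_requires_approval(amount: float) -> bool:
    """Refunds over $50 need manager approval."""
    return amount > 50.0
```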
Approval Flow
┌─────────┐ ┌─────────┐ ┌──────────┐ ┌──────────┐
│ Request │────▶│ Plan │────▶│ Pause │────▶│ Approval │
│ Intent │ │ Created │ │ for HITL │ │ Granted │
└─────────┘ └─────────┘ └──────────┘ └────┬─────┘
│
┌────────────────┘
▼
┌──────────┐ ┌──────────┐
│ Execute │────▶│ Complete │
│ Action │ │ │
└──────────┘ └──────────┘
Implementation
class InventoryAgent:
    HITL_ACTIONS = ["order_inventory", "adjust_par_levels"]

    async def check_approval(self, state: AgentState) -> AgentState:
        if state["plan"] and state["plan"][0] in self.HITL_ACTIONS:
            state["requires_approval"] = True

            # Send approval request
            await self.send_approval_request(
                action=state["plan"][0],
                context=state["entities"],
                approvers=["manager", "owner"]
            )

            # Checkpoint and pause
            await self.save_checkpoint(state)
            raise HITLPauseException("Awaiting manager approval")

        return state
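When the approver responds, the paused state is reloaded from its checkpoint and the graph resumes from the executor node. A hypothetical resume hook; the function name and result shape are illustrative:

```python
def resume_after_approval(state: dict, approved: bool) -> dict:
    """Apply an approval decision to a checkpointed state so the graph
    can continue (or stop, if the request was rejected)."""
    state["requires_approval"] = False
    if not approved:
        # Drop the plan so the executor has nothing to run.
        state["plan"] = None
        state["result"] = {"status": "rejected"}
    return state
```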
Model Tier Routing
The ACP AI Router intelligently selects model tiers based on task complexity.
Routing Logic
async def select_tier(
    intent: str,
    complexity: float,
    context: Dict[str, Any]
) -> str:
    # Simple intents -> T1 (FREE)
    if intent in ["greeting", "faq", "status_check"]:
        return "T1"

    # Standard operations -> T2
    if intent in ["order", "lookup", "simple_query"]:
        return "T2"

    # Complex reasoning -> T4
    if intent in ["complaint", "disambiguation", "negotiation"]:
        return "T4"

    # Strategic planning -> T5/T6
    if intent in ["forecast", "strategy", "analysis"]:
        if complexity > 0.8:
            return "T6"
        return "T5"

    # Default to T2
    return "T2"
Tier Escalation
Agents can escalate to higher tiers when needed:
async def process_with_escalation(self, input: str) -> str:
    # Try T1 first
    result = await self.router.generate(input, tier="T1")

    if result.confidence < 0.7:
        # Escalate to T2
        result = await self.router.generate(
            input,
            tier="T2",
            context=result.partial_context
        )

    if result.requires_reasoning:
        # Escalate to T4 for complex reasoning
        result = await self.router.generate(
            input,
            tier="T4",
            context=result.full_context
        )

    return result.response
RAG Integration
Agents use Retrieval-Augmented Generation for domain knowledge.
Knowledge Bases
| Knowledge Base | Content | Index |
|---|---|---|
| docs-rag | Product documentation | Vectorize |
| menu-rag | Menu items, ingredients | Vectorize |
| policy-rag | Business policies | Vectorize |
| faq-rag | Common questions | Vectorize |
RAG Query Flow
from app.clients.vectorize_client import VectorizeClient
async def query_knowledge(
    query: str,
    index: str = "docs-rag",
    top_k: int = 5
) -> List[Document]:
    client = VectorizeClient()

    # Generate embedding
    embedding = await client.embed(query)

    # Query vector store
    results = await client.query(
        index=index,
        vector=embedding,
        top_k=top_k,
        min_score=0.7
    )

    return results
Context Injection
async def generate_with_rag(
    query: str,
    tier: str = "T2"
) -> str:
    # Retrieve relevant documents
    docs = await query_knowledge(query)

    # Build context
    context = "\n".join([d.content for d in docs])

    # Generate with context
    return await router.generate(
        prompt=query,
        system_prompt=f"Use this context to answer:\n{context}",
        tier=tier
    )
Safety & Guardrails
Never disable safety validators in production. All agent outputs pass through input validation, output guardrails, and PII detection before being returned to users; skipping these checks exposes the platform to prompt injection, data leakage, and harmful content risks.
Input Validation
from app.ai.safety.validators import InputValidator

validator = InputValidator()

# Check for prompt injection
if not validator.is_safe(user_input):
    raise UnsafeInputError("Potential prompt injection detected")

# Check for PII
pii_detected = validator.detect_pii(user_input)
if pii_detected:
    user_input = validator.redact_pii(user_input)
Output Guardrails
from app.ai.safety.guardrails import OutputGuardrails

guardrails = OutputGuardrails()

# Generate, then validate the response
response = await agent.generate(input)

# Check for harmful content
if not guardrails.is_safe(response):
    response = guardrails.filter(response)

# Check for hallucinations (with RAG context)
confidence = guardrails.check_groundedness(response, context)
if confidence < 0.8:
    response = await regenerate_with_constraints(input)
Rate Limiting
# Per-tenant rate limits
RATE_LIMITS = {
"T1": 1000, # requests per minute
"T2": 500,
"T3": 200,
"T4": 100,
"T5": 50,
"T6": 20,
}
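Enforcement can be a sliding-window counter per (tenant, tier). A toy sketch against limits in the shape above; a real deployment would typically back this with Redis:

```python
import time
from typing import Dict, List, Optional, Tuple

class TierRateLimiter:
    """Toy sliding-window limiter enforcing per-tier requests per minute."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self.limits = limits
        self._hits: Dict[Tuple[str, str], List[float]] = {}

    def allow(self, tenant: str, tier: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        key = (tenant, tier)
        # Keep only hits inside the last 60 seconds.
        window = [t for t in self._hits.get(key, []) if now - t < 60.0]
        if len(window) >= self.limits[tier]:
            self._hits[key] = window
            return False
        window.append(now)
        self._hits[key] = window
        return True
```

Usage: `TierRateLimiter(RATE_LIMITS).allow(tenant_id, "T4")` before dispatching a request.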
Monitoring & Observability
Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| ai.requests.total | Total AI requests | - |
| ai.requests.success_rate | Success percentage | < 99% |
| ai.requests.latency_p99 | P99 latency | > 2s |
| ai.cache.hit_rate | Cache hit percentage | < 50% |
| ai.fallback.count | Fallback occurrences | > 10/min |
| ai.cost.daily | Daily AI costs | > budget |
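The alert thresholds above reduce to a small predicate. A sketch with assumed units (rates as fractions, latency in seconds, fallbacks per minute); the daily-cost check is omitted since it depends on each tenant's budget:

```python
def should_alert(metric: str, value: float) -> bool:
    """Evaluate the alert thresholds from the metrics table."""
    thresholds = {
        "ai.requests.success_rate": lambda v: v < 0.99,
        "ai.requests.latency_p99": lambda v: v > 2.0,   # seconds
        "ai.cache.hit_rate": lambda v: v < 0.50,
        "ai.fallback.count": lambda v: v > 10,          # per minute
    }
    check = thresholds.get(metric)
    return check(value) if check else False
```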
Tracing
All requests include trace IDs for debugging:
response = await router.generate(
    prompt="...",
    trace_id="req-abc123-xyz789"
)
# View traces in Cockpit
# cockpit.olympuscloud.ai/traces/req-abc123-xyz789
Logging
logger.info(
    "AI request completed",
    extra={
        "trace_id": trace_id,
        "tenant_id": tenant_id,
        "tier": tier,
        "model": model,
        "latency_ms": latency,
        "cache_status": cache_status,
        "input_tokens": usage.prompt_tokens,
        "output_tokens": usage.completion_tokens,
    }
)
Best Practices
1. Start Simple
Always try the lowest tier first and escalate only when needed.
2. Use Caching
Cache identical queries to reduce costs and latency.
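A minimal exact-match cache keyed on (tier, prompt) illustrates the idea; a real deployment would add TTLs and tenant scoping:

```python
import hashlib
from typing import Dict, Optional

class QueryCache:
    """Toy exact-match response cache with hit/miss counters."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def _key(self, tier: str, prompt: str) -> str:
        return hashlib.sha256(f"{tier}:{prompt}".encode()).hexdigest()

    def get(self, tier: str, prompt: str) -> Optional[str]:
        value = self._store.get(self._key(tier, prompt))
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    def put(self, tier: str, prompt: str, response: str) -> None:
        self._store[self._key(tier, prompt)] = response
```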
3. Implement HITL
Require human approval for any action with financial or safety implications.
4. Monitor Costs
Set up alerts for unexpected cost increases.
5. Test Fallbacks
Regularly test that fallback chains work correctly.
6. Validate Outputs
Always run outputs through safety guardrails before returning to users.