LangGraph Agent Workflows
Olympus Cloud uses LangGraph to build stateful, multi-step AI agent workflows with human-in-the-loop approval and production-grade checkpointing.
Overview
LangGraph agents power key platform capabilities:
| Agent | Purpose | Use Case |
|---|---|---|
| Restaurant Graph | Enterprise operations | Multi-domain queries, planning |
| Voice Manager | Manager assistant | "Hey Maximus" voice commands |
| Voice Ordering | Customer ordering | Drive-thru, phone orders |
| Phone Graph | Phone orders | Inbound call handling |
| Support Agent | Customer support | Ticket triage, RAG search |
| Content Suggestion | Creator tools | Content ideation |
| Inventory Graph | Stock management | Demand forecasting |
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ LANGGRAPH AGENT SYSTEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Input │───▶│ State │───▶│ Nodes │ │
│ │ Message │ │ Graph │ │ (Functions) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │Checkpointer │ │ Tools │ │
│ │(Redis/PG) │ │ (Actions) │ │
│ └─────────────┘ └─────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ HITL │ │ AI Router │ │
│ │ Approval │ │ (ACP) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Restaurant Graph
The main enterprise agent for restaurant operations.
Graph Structure
┌─────────────┐
│ START │
└──────┬──────┘
│
▼
┌─────────────┐
│intent_router│ ← Classify query complexity
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ simple │ │ rag │ │ planner │
│ handler │ │ retriever │ │ agent │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
│ └─────┬──────┘
│ │
│ ▼
│ ┌───────────┐
│ │ approval │ ← Check if HITL needed
│ │ checker │
│ └─────┬─────┘
│ │
│ ┌────────┴────────┐
│ │ │
│ ▼ ▼
│ ┌───────────┐ ┌───────────┐
│ │ human │ │ executor │
│ │ approval │ ◀──│ dispatch │
│ └─────┬─────┘ └─────┬─────┘
│ │ │
│ └────────┬────────┘
│ │
▼ ▼
┌───────────────────────────┐
│ response_generator │
└─────────────┬─────────────┘
│
▼
┌─────┐
│ END │
└─────┘
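The three-way branch out of intent_router maps naturally onto a LangGraph conditional edge. A minimal, illustrative selector sketch — the thresholds and the `needs_knowledge` flag are assumptions for illustration, not the production routing rules:

```python
# Hypothetical conditional-edge selector for the branch shown above.
# Thresholds and the "needs_knowledge" flag are illustrative only.
def route_intent(state: dict) -> str:
    complexity = state.get("complexity", 1)   # 1-5 scale set by intent_router
    if complexity <= 2:
        return "simple_handler"               # direct answer, no tools
    if state.get("needs_knowledge"):
        return "rag_retriever"                # knowledge-base lookup first
    return "planner_agent"                    # multi-step plan + tool calls

# Registered on the graph roughly like:
#   graph.add_conditional_edges("intent_router", route_intent)
```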
State Schema
class RestaurantState(TypedDict):
    # Message history
    messages: Annotated[list, add_messages]

    # Classification
    intent: str                 # Detected intent
    domain: AgentDomain         # INVENTORY, SCHEDULING, etc.
    complexity: int             # 1-5 scale

    # RAG context
    rag_context: Optional[str]  # Retrieved documents
    rag_sources: list[str]      # Source references

    # Planning
    plan_steps: list[PlanStep]  # Execution plan
    current_step: int           # Current step index

    # Approval workflow
    requires_approval: bool     # HITL needed?
    approval_status: ApprovalStatus
    approval_reason: str

    # Execution
    tool_results: list[ToolResult]
    execution_errors: list[str]

    # Metadata
    conversation_context: ConversationContext
    metadata: dict
Domains
class AgentDomain(str, Enum):
    INVENTORY = "inventory"     # Stock, 86 items, reordering
    SCHEDULING = "scheduling"   # Shifts, time off, coverage
    ANALYTICS = "analytics"     # Sales, labor, trends
    SUPPORT = "support"         # Help, troubleshooting
    ORDERS = "orders"           # Order management
    KITCHEN = "kitchen"         # KDS, prep, timing
    GENERAL = "general"         # General questions
    CONTENT = "content"         # Content creation
    SOCIAL = "social"           # Social media
    AUDIENCE = "audience"       # Audience engagement
Voice Manager Graph
Natural language interface for restaurant managers ("Hey Maximus").
Graph Structure
┌───────────┐
│parse_input│ ← Extract intent from voice transcript
└─────┬─────┘
│
▼
┌───────────────────────────────────────────────────┐
│ INTENT ROUTING │
├───────────────────────────────────────────────────┤
│ sales_query → get_sales_for_period() │
│ labor_query → get_labor_metrics() │
│ performance → get_top_performers() │
│ inventory → check_inventory_alerts() │
│ forecast → get_revenue_target_status() │
│ pricing → execute_pricing_command() │
│ special → add_daily_special() │
│ message → send_team_message() │
│ general → general_response() │
└───────────────────────────────────────────────────┘
│
▼
┌─────────────────┐
│generate_response│ ← SSML for TTS
└─────────────────┘
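The intent-routing box above is essentially a dispatch table. A sketch mirroring it in code — handler names are taken from the diagram; the fallback to `general_response` for unknown intents is an assumption:

```python
# Dispatch table mirroring the intent-routing box above.
INTENT_HANDLERS = {
    "sales_query": "get_sales_for_period",
    "labor_query": "get_labor_metrics",
    "performance": "get_top_performers",
    "inventory":   "check_inventory_alerts",
    "forecast":    "get_revenue_target_status",
    "pricing":     "execute_pricing_command",
    "special":     "add_daily_special",
    "message":     "send_team_message",
}

def resolve_handler(intent: str) -> str:
    # Unknown or "general" intents fall through to the general handler
    return INTENT_HANDLERS.get(intent, "general_response")
```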
Voice Commands
| Intent | Example Command | Action |
|---|---|---|
| Sales Query | "How are sales today?" | Fetch daily sales summary |
| Labor Query | "Who's in overtime?" | Check labor metrics |
| Performance | "Who are my top servers?" | Get employee rankings |
| Inventory | "Any 86'd items?" | Check inventory alerts |
| Forecast | "Will we hit target?" | Revenue prediction |
| Pricing | "Start happy hour" | Activate price tier |
| Special | "Add salmon special at $24" | Create menu special |
| Message | "Tell kitchen we're busy" | Team notification |
Proactive Alerts
The Voice Manager generates autonomous alerts:
class ProactiveAlertGenerator:
    """Generate alerts without explicit user query"""

    async def check_alerts(self, context: ConversationContext) -> list[VoiceAlert]:
        alerts = []

        # Check inventory
        low_stock = await self.inventory_service.get_low_stock()
        if low_stock:
            alerts.append(VoiceAlert(
                type="inventory",
                priority="high",
                message=f"{len(low_stock)} items running low"
            ))

        # Check labor
        overtime = await self.labor_service.get_overtime_employees()
        if overtime:
            alerts.append(VoiceAlert(
                type="labor",
                priority="medium",
                message=f"{len(overtime)} employees approaching overtime"
            ))

        return alerts
Voice Ordering Graph
Conversational ordering for drive-thru and phone orders.
Ordering Phases
class OrderingPhase(str, Enum):
    GREETING = "greeting"           # Welcome message
    TAKING_ORDER = "taking_order"   # Active order building
    CONFIRMING_ITEM = "confirming"  # Item confirmation
    UPSELLING = "upselling"         # Suggestion phase
    ORDER_REVIEW = "order_review"   # Full order confirmation
    CHECKOUT = "checkout"           # Payment initiation
    PAYMENT = "payment"             # Processing payment
    COMPLETED = "completed"         # Order complete
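Phases advance in a constrained order. The transition map below is a sketch inferred from the phase names — the real graph may permit additional transitions (for example, reopening the cart after review):

```python
# Illustrative phase-transition map; NOT the authoritative production rules.
ALLOWED_TRANSITIONS = {
    "greeting":     {"taking_order"},
    "taking_order": {"confirming", "upselling", "order_review"},
    "confirming":   {"taking_order", "upselling"},
    "upselling":    {"taking_order", "order_review"},
    "order_review": {"taking_order", "checkout"},
    "checkout":     {"payment"},
    "payment":      {"completed"},
    "completed":    set(),
}

def transition(current: str, target: str) -> str:
    """Advance the ordering phase, rejecting illegal jumps."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {target}")
    return target
```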
State Management
class VoiceOrderingState(TypedDict):
    messages: Annotated[list, add_messages]

    # NLU extraction
    nlu_result: Optional[NLUExtractionResult]
    current_intent: OrderingIntent
    confidence_score: float

    # Cart
    cart_items: list[CartItem]
    cart_total: float
    pending_item: Optional[PendingItem]

    # Phase tracking
    current_phase: OrderingPhase
    phase_history: list[str]

    # Clarification
    clarification_needed: bool
    clarification_options: list[str]
    allergen_warnings: list[str]

    # Upsell
    upsell_offered: bool
    upsell_item: Optional[str]

    # Context
    location_id: str
    menu_context: dict
Intent Classification
class OrderingIntent(str, Enum):
    GREETING = "greeting"
    ADD_ITEM = "add_item"
    REMOVE_ITEM = "remove_item"
    MODIFY_ITEM = "modify_item"
    CONFIRM_ORDER = "confirm_order"
    CHECKOUT = "checkout"
    CANCEL_ORDER = "cancel_order"
    ASK_QUESTION = "ask_question"
    CHECK_PRICE = "check_price"
    REPEAT_ORDER = "repeat_order"
    UNCLEAR = "unclear"
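When the primary NLU extraction returns a low confidence_score, a cheap keyword fallback can still pick a plausible intent before giving up with UNCLEAR. A sketch — the keyword lists are illustrative, not the production vocabulary:

```python
# Hypothetical keyword fallback used when NLU confidence is low.
FALLBACK_KEYWORDS = {
    "add_item":     ("i'll have", "can i get", "add a"),
    "remove_item":  ("remove", "take off", "no more"),
    "checkout":     ("that's all", "that's it", "checkout"),
    "cancel_order": ("cancel", "never mind"),
    "repeat_order": ("repeat my order", "what do i have"),
}

def fallback_intent(utterance: str) -> str:
    text = utterance.lower()
    for intent, keywords in FALLBACK_KEYWORDS.items():
        if any(k in text for k in keywords):
            return intent
    return "unclear"   # defer to a clarification turn
```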
Checkpointing System
Production-grade state persistence for multi-turn conversations.
Checkpointer Options
| Backend | Use Case | Latency | Durability |
|---|---|---|---|
| Memory | Development/testing | <1ms | None |
| Redis | Production (fast) | <5ms | Session |
| Cloud Spanner | Production (durable) | ~20ms | Persistent |
| Hybrid | Production (best of both) | <5ms | Persistent |
Configuration
# Environment-based configuration
CHECKPOINT_BACKEND=hybrid
REDIS_URL=rediss://${REDIS_HOST}:6378/0 # Memorystore with TLS (all environments)
DATABASE_URL=${SPANNER_CONNECTION_STRING} # Cloud Spanner via env var
CHECKPOINT_TTL=86400 # 24 hours
CHECKPOINT_RETENTION_DAYS=30 # Spanner retention
In production, Redis URLs use the rediss:// scheme (note the double "s") for TLS connections to GCP Memorystore. All services skip CA verification because Memorystore presents a certificate from a private CA that is not in any public trust store; the connection itself remains TLS-encrypted.
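At startup, services read these variables into a config object. A minimal sketch — the default values shown here are illustrative fallbacks, not the documented production defaults:

```python
import os

# Minimal startup loader for the checkpoint variables above.
# Defaults are illustrative; production always sets these explicitly.
def load_checkpoint_config(env=os.environ) -> dict:
    return {
        "backend": env.get("CHECKPOINT_BACKEND", "memory"),
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
        "database_url": env.get("DATABASE_URL"),            # None if unset
        "ttl_seconds": int(env.get("CHECKPOINT_TTL", "86400")),
        "retention_days": int(env.get("CHECKPOINT_RETENTION_DAYS", "30")),
    }
```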
Hybrid Checkpointer
class HybridCheckpointer(BaseCheckpointSaver):
    """Redis for speed + Cloud Spanner for durability"""

    def __init__(self, redis_client, spanner_db):
        self.redis = RedisCheckpointer(redis_client)
        self.spanner = SpannerCheckpointer(spanner_db)
        self._pending: set[asyncio.Task] = set()

    async def put(self, config, checkpoint, metadata):
        # Write to Redis immediately (hot path)
        await self.redis.put(config, checkpoint, metadata)
        # Fire-and-forget write to Cloud Spanner. Hold a reference to the
        # task so it is not garbage-collected before it completes.
        task = asyncio.create_task(
            self.spanner.put(config, checkpoint, metadata)
        )
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def get(self, config):
        # Try Redis first
        checkpoint = await self.redis.get(config)
        if checkpoint:
            return checkpoint
        # Fall back to Cloud Spanner on a cache miss
        return await self.spanner.get(config)
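The read-through behavior — hot reads served from cache, misses falling through to the durable store — can be exercised with dict-backed fakes. A self-contained sketch, not the production classes:

```python
import asyncio

# Dict-backed stand-ins for the Redis and Spanner checkpointers, used
# only to demonstrate the hybrid read path.
class FakeStore:
    def __init__(self):
        self.data = {}

    async def put(self, key, value):
        self.data[key] = value

    async def get(self, key):
        return self.data.get(key)

async def hybrid_get(cache: FakeStore, durable: FakeStore, key):
    hit = await cache.get(key)
    if hit is not None:
        return hit                    # served from the fast tier
    return await durable.get(key)     # cache miss: fall through

async def demo():
    cache, durable = FakeStore(), FakeStore()
    await durable.put("thread-1", {"step": 3})   # only in the durable tier
    await cache.put("thread-2", {"step": 7})     # hot in the cache tier
    return (await hybrid_get(cache, durable, "thread-1"),
            await hybrid_get(cache, durable, "thread-2"))
```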
Human-in-the-Loop (HITL)
Approval workflow for sensitive operations.
Approval Flow
# Compile graph with interrupt points
graph = StateGraph(RestaurantState)
# ... add nodes and edges ...

compiled = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_approval"]  # Pause here for approval
)
Approval Check
def approval_checker(state: RestaurantState) -> dict:
    """Determine if human approval is required"""
    requires_approval = False
    reason = ""

    # Check plan steps for sensitive operations
    for step in state.get("plan_steps", []):
        if step.tool in SENSITIVE_TOOLS:
            requires_approval = True
            reason = f"Action '{step.tool}' requires manager approval"
            break
        if step.estimated_cost and step.estimated_cost > 100:
            requires_approval = True
            reason = f"Estimated cost ${step.estimated_cost} exceeds threshold"
            break

    return {
        "requires_approval": requires_approval,
        "approval_status": ApprovalStatus.PENDING if requires_approval
                           else ApprovalStatus.NOT_REQUIRED,
        "approval_reason": reason
    }
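The same threshold logic can be shown end-to-end in a self-contained form. The tool names in SENSITIVE_TOOLS below are drawn from the tool tables in this document but the exact production set, and the $100 ceiling, should be treated as illustrative:

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative sensitive-tool set and cost ceiling.
SENSITIVE_TOOLS = {"create_po", "modify_shift", "execute_pricing_command"}
COST_THRESHOLD = 100

@dataclass
class PlanStep:
    tool: str
    estimated_cost: Optional[float] = None

def needs_approval(steps: list[PlanStep]) -> tuple[bool, str]:
    """Return (requires_approval, reason) for a plan, first match wins."""
    for step in steps:
        if step.tool in SENSITIVE_TOOLS:
            return True, f"Action '{step.tool}' requires manager approval"
        if step.estimated_cost and step.estimated_cost > COST_THRESHOLD:
            return True, (f"Estimated cost ${step.estimated_cost} "
                          "exceeds threshold")
    return False, ""
```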
API Integration
# Process initial message
@router.post("/agent/chat")
async def agent_chat(request: ChatRequest):
    config = {"configurable": {"thread_id": request.session_id}}
    result = await graph.ainvoke(  # compiled graphs expose ainvoke for async use
        {"messages": [HumanMessage(content=request.message)]},
        config
    )
    return {
        "response": result["messages"][-1].content,
        "session_id": request.session_id,
        "paused_for_approval": result.get("requires_approval", False)
    }
# Handle approval
@router.post("/agent/approval")
async def agent_approval(request: ApprovalRequest):
    config = {"configurable": {"thread_id": request.session_id}}

    # Record the approval decision in the checkpointed state
    status = (ApprovalStatus.APPROVED if request.approved
              else ApprovalStatus.REJECTED)
    await graph.aupdate_state(config, {"approval_status": status})

    # Resume from the interrupt point: passing None as input continues
    # the paused run instead of starting a new turn
    result = await graph.ainvoke(None, config)
    return {"response": result["messages"][-1].content}
Tool System
Tool Categories
| Category | Tools | Example |
|---|---|---|
| Inventory | check_stock, suggest_reorder, create_po | Check ribeye stock |
| Scheduling | view_schedule, modify_shift, optimize | Add closer tonight |
| Analytics | sales_summary, trends, forecasting | Last week's sales |
| Support | kb_search, create_ticket | Help with login |
| Orders | order_status, modify_order | Cancel order #123 |
| Kitchen | prep_status, queue_management | Check ticket times |
Tool Definition
from langchain_core.tools import tool
from pydantic import BaseModel

class InventoryCheckInput(BaseModel):
    item_name: str
    location_id: str

@tool(args_schema=InventoryCheckInput)
async def check_stock(item_name: str, location_id: str) -> dict:
    """Check current stock level for an item"""
    result = await inventory_service.get_stock_level(
        item_name=item_name,
        location_id=location_id
    )
    return {
        "item": item_name,
        "current_stock": result.quantity,
        "unit": result.unit,
        "par_level": result.par_level,
        "status": "low" if result.quantity < result.par_level else "ok"
    }
AI Router Integration
Agents use the ACP Router for cost-optimized model selection:
Tier Mapping
| Tier | Model | Cost | Use Case |
|---|---|---|---|
| T1 | Llama 4 Scout | FREE | Simple classification |
| T2 | Gemini 2.0 Flash | $0.10/M | Voice responses |
| T3 | Gemini 3 Flash | $0.50/M | Complex conversation |
| T4 | Claude Haiku 4.5 | $1/M | Fast reasoning |
| T5 | Claude Sonnet 4.5 | $3/M | High-quality analysis |
| T6 | Claude Opus 4.5 | $5/M | Strategic planning |
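The per-million-token rates in the table make spend estimates straightforward. A quick sketch — treating each listed price as a flat per-token rate is a simplification (real billing usually splits input and output tokens):

```python
# Per-million-token rates from the tier table above (T1 is free).
TIER_RATES = {"t1": 0.0, "t2": 0.10, "t3": 0.50, "t4": 1.0, "t5": 3.0, "t6": 5.0}

def estimate_cost(tier: str, tokens: int) -> float:
    """Rough dollar estimate for `tokens` tokens at the given tier."""
    return tokens / 1_000_000 * TIER_RATES[tier]
```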
Usage in Nodes
async def intent_router(state: RestaurantState) -> dict:
    """Classify intent using T1 (free) tier"""
    response = await call_ai_gateway(
        messages=[{"role": "user", "content": state["messages"][-1].content}],
        tier="t1",  # Free Llama 4 Scout
        system_prompt=CLASSIFICATION_PROMPT
    )
    return {
        "intent": response.intent,
        "domain": response.domain,
        "complexity": response.complexity
    }

async def response_generator(state: RestaurantState) -> dict:
    """Generate response using appropriate tier based on complexity"""
    tier = "t2" if state["complexity"] <= 2 else "t4"
    response = await call_ai_gateway(
        messages=state["messages"],
        tier=tier,
        system_prompt=RESPONSE_PROMPT
    )
    return {"messages": [AIMessage(content=response.content)]}
Testing
Unit Tests
def test_restaurant_graph_compiles():
    """Verify graph compiles without errors"""
    graph = create_restaurant_graph()
    compiled = graph.compile()
    assert compiled is not None
@pytest.mark.asyncio
async def test_intent_routing():
    """Test intent classification routes correctly"""
    state = RestaurantState(
        messages=[HumanMessage("How are sales today?")],
        intent="",
        domain=AgentDomain.GENERAL,
        complexity=1
    )
    result = await intent_router(state)  # intent_router is async
    assert result["domain"] == AgentDomain.ANALYTICS
Integration Tests
@pytest.mark.asyncio
async def test_full_conversation():
    """Test multi-turn conversation with checkpointing"""
    graph = create_restaurant_graph()
    checkpointer = MemorySaver()
    compiled = graph.compile(checkpointer=checkpointer)

    # First turn
    config = {"configurable": {"thread_id": "test-123"}}
    result1 = await compiled.ainvoke(
        {"messages": [HumanMessage("What items are low?")]},
        config
    )

    # Second turn (continues conversation)
    result2 = await compiled.ainvoke(
        {"messages": [HumanMessage("Order more of the first one")]},
        config
    )
    assert "reorder" in result2["messages"][-1].content.lower()
Best Practices
Graph Design
- Keep nodes focused: Single responsibility per node
- Use conditional edges: Route based on state, not in nodes
- Handle errors gracefully: Catch exceptions, return error state
- Log state transitions: Track flow for debugging
State Management
- Minimize state size: Only store what's needed
- Use typed schemas: TypedDict with annotations
- Clear sensitive data: Remove PII after use
- Version state schemas: Plan for migrations
Checkpointing
- Choose appropriate backend: Redis for speed, Cloud Spanner for durability
- Set reasonable TTLs: Balance memory vs conversation length
- Handle checkpoint failures: Fail gracefully, log errors
- Clean up old checkpoints: Implement retention policies
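A retention sweep only needs an age predicate against the configured window. A minimal sketch, assuming checkpoints carry a `created_at` timestamp (a field name assumed here for illustration):

```python
from datetime import datetime, timedelta, timezone

# Illustrative retention predicate: checkpoints older than the retention
# window (CHECKPOINT_RETENTION_DAYS, default 30) are eligible for deletion.
def expired(created_at: datetime, retention_days: int = 30) -> bool:
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    return created_at < cutoff
```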
HITL Workflow
- Be selective: Only require approval for sensitive operations
- Provide context: Include reason and estimated impact
- Set timeouts: Auto-reject after reasonable period
- Audit all decisions: Log approval/rejection with user
Related Documentation
- ACP AI Router - Model routing
- AI Gateway - AI API reference
- Voice AI - Voice processing