LangGraph Agent Workflows

Olympus Cloud uses LangGraph to build stateful, multi-step AI agent workflows with human-in-the-loop approval and production-grade checkpointing.

Overview

LangGraph agents power key platform capabilities:

| Agent | Purpose | Use Case |
| --- | --- | --- |
| Restaurant Graph | Enterprise operations | Multi-domain queries, planning |
| Voice Manager | Manager assistant | "Hey Maximus" voice commands |
| Voice Ordering | Customer ordering | Drive-thru, phone orders |
| Phone Graph | Phone orders | Inbound call handling |
| Support Agent | Customer support | Ticket triage, RAG search |
| Content Suggestion | Creator tools | Content ideation |
| Inventory Graph | Stock management | Demand forecasting |

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     LANGGRAPH AGENT SYSTEM                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │   Input     │───▶│   State     │───▶│   Nodes     │          │
│  │   Message   │    │   Graph     │    │ (Functions) │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                            │                  │                 │
│                            ▼                  ▼                 │
│                     ┌─────────────┐    ┌─────────────┐          │
│                     │Checkpointer │    │   Tools     │          │
│                     │(Redis/PG)   │    │  (Actions)  │          │
│                     └─────────────┘    └─────────────┘          │
│                            │                  │                 │
│                            ▼                  ▼                 │
│                     ┌─────────────┐    ┌─────────────┐          │
│                     │    HITL     │    │  AI Router  │          │
│                     │  Approval   │    │   (ACP)     │          │
│                     └─────────────┘    └─────────────┘          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Restaurant Graph

The main enterprise agent for restaurant operations.

Graph Structure

             ┌─────────────┐
             │    START    │
             └──────┬──────┘
                    │
                    ▼
             ┌─────────────┐
             │intent_router│ ← Classify query complexity
             └──────┬──────┘
                    │
      ┌─────────────┼─────────────┐
      │             │             │
      ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│  simple   │ │    rag    │ │  planner  │
│  handler  │ │ retriever │ │   agent   │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
      │             │             │
      │             └──────┬──────┘
      │                    │
      │                    ▼
      │              ┌───────────┐
      │              │ approval  │ ← Check if HITL needed
      │              │  checker  │
      │              └─────┬─────┘
      │                    │
      │           ┌────────┴────────┐
      │           │                 │
      │           ▼                 ▼
      │     ┌───────────┐     ┌───────────┐
      │     │   human   │     │ executor  │
      │     │ approval  │ ◀───│ dispatch  │
      │     └─────┬─────┘     └─────┬─────┘
      │           │                 │
      │           └────────┬────────┘
      │                    │
      ▼                    ▼
  ┌───────────────────────────┐
  │    response_generator     │
  └─────────────┬─────────────┘
                │
                ▼
             ┌─────┐
             │ END │
             └─────┘

State Schema

from typing import Annotated, Optional, TypedDict

from langgraph.graph.message import add_messages

# AgentDomain, PlanStep, ApprovalStatus, ToolResult, and ConversationContext
# are project types defined elsewhere.
class RestaurantState(TypedDict):
    # Message history
    messages: Annotated[list, add_messages]

    # Classification
    intent: str                     # Detected intent
    domain: AgentDomain             # INVENTORY, SCHEDULING, etc.
    complexity: int                 # 1-5 scale

    # RAG context
    rag_context: Optional[str]      # Retrieved documents
    rag_sources: list[str]          # Source references

    # Planning
    plan_steps: list[PlanStep]      # Execution plan
    current_step: int               # Current step index

    # Approval workflow
    requires_approval: bool         # HITL needed?
    approval_status: ApprovalStatus
    approval_reason: str

    # Execution
    tool_results: list[ToolResult]
    execution_errors: list[str]

    # Metadata
    conversation_context: ConversationContext
    metadata: dict
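
The `complexity` field is what lets the graph branch after `intent_router`. A minimal sketch of such a routing function follows. The thresholds (1-2 simple, 3 RAG, 4-5 planner), the name `route_by_complexity`, and the node names it returns are assumptions for illustration; the source does not state them.

```python
from typing import TypedDict


class RoutingState(TypedDict, total=False):
    """Only the fields the router reads; a subset of RestaurantState."""
    intent: str
    complexity: int  # 1-5 scale


def route_by_complexity(state: RoutingState) -> str:
    """Return the name of the next node (thresholds are assumed, not documented)."""
    complexity = state.get("complexity", 1)
    if complexity <= 2:
        return "simple_handler"
    if complexity == 3:
        return "rag_retriever"
    return "planner_agent"


print(route_by_complexity({"intent": "sales_query", "complexity": 1}))
print(route_by_complexity({"intent": "reorder", "complexity": 5}))
```

In LangGraph, a function of this shape would typically be passed to `add_conditional_edges` so that routing stays out of the node bodies.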

Domains

from enum import Enum

class AgentDomain(str, Enum):
    INVENTORY = "inventory"      # Stock, 86 items, reordering
    SCHEDULING = "scheduling"    # Shifts, time off, coverage
    ANALYTICS = "analytics"      # Sales, labor, trends
    SUPPORT = "support"          # Help, troubleshooting
    ORDERS = "orders"            # Order management
    KITCHEN = "kitchen"          # KDS, prep, timing
    GENERAL = "general"          # General questions
    CONTENT = "content"          # Content creation
    SOCIAL = "social"            # Social media
    AUDIENCE = "audience"        # Audience engagement

Voice Manager Graph

Natural language interface for restaurant managers ("Hey Maximus").

Graph Structure

                    ┌───────────┐
                    │parse_input│ ← Extract intent from voice transcript
                    └─────┬─────┘
                          │
                          ▼
┌───────────────────────────────────────────────────┐
│                  INTENT ROUTING                   │
├───────────────────────────────────────────────────┤
│  sales_query → get_sales_for_period()             │
│  labor_query → get_labor_metrics()                │
│  performance → get_top_performers()               │
│  inventory   → check_inventory_alerts()           │
│  forecast    → get_revenue_target_status()        │
│  pricing     → execute_pricing_command()          │
│  special     → add_daily_special()                │
│  message     → send_team_message()                │
│  general     → general_response()                 │
└───────────────────────────────────────────────────┘
                          │
                          ▼
                 ┌─────────────────┐
                 │generate_response│ ← SSML for TTS
                 └─────────────────┘

Voice Commands

| Intent | Example Command | Action |
| --- | --- | --- |
| Sales Query | "How are sales today?" | Fetch daily sales summary |
| Labor Query | "Who's in overtime?" | Check labor metrics |
| Performance | "Who are my top servers?" | Get employee rankings |
| Inventory | "Any 86'd items?" | Check inventory alerts |
| Forecast | "Will we hit target?" | Revenue prediction |
| Pricing | "Start happy hour" | Activate price tier |
| Special | "Add salmon special at $24" | Create menu special |
| Message | "Tell kitchen we're busy" | Team notification |
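
The intent-to-handler routing can be expressed as a lookup table. The sketch below covers three of the intents; the handler names come from the routing diagram, but their signatures and return values are stubs assumed for illustration, and `dispatch` is a hypothetical helper.

```python
from typing import Callable, Dict

Handler = Callable[[str], str]


def get_sales_for_period(transcript: str) -> str:
    return "stub: daily sales summary"


def check_inventory_alerts(transcript: str) -> str:
    return "stub: inventory alerts"


def general_response(transcript: str) -> str:
    return "stub: general answer"


# Intent name -> handler, mirroring part of the routing table.
INTENT_HANDLERS: Dict[str, Handler] = {
    "sales_query": get_sales_for_period,
    "inventory": check_inventory_alerts,
    "general": general_response,
}


def dispatch(intent: str, transcript: str) -> str:
    """Call the handler for `intent`, falling back to the general handler."""
    handler = INTENT_HANDLERS.get(intent, general_response)
    return handler(transcript)


print(dispatch("sales_query", "How are sales today?"))
```

Falling back to `general_response` for unknown intents keeps a misclassified transcript from raising an error mid-conversation.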

Proactive Alerts

The Voice Manager also raises alerts proactively, without waiting for a query:

class ProactiveAlertGenerator:
    """Generate alerts without an explicit user query."""

    async def check_alerts(self, context: ConversationContext) -> list[VoiceAlert]:
        alerts = []

        # Check inventory
        low_stock = await self.inventory_service.get_low_stock()
        if low_stock:
            alerts.append(VoiceAlert(
                type="inventory",
                priority="high",
                message=f"{len(low_stock)} items running low"
            ))

        # Check labor
        overtime = await self.labor_service.get_overtime_employees()
        if overtime:
            alerts.append(VoiceAlert(
                type="labor",
                priority="medium",
                message=f"{len(overtime)} employees approaching overtime"
            ))

        return alerts

Voice Ordering Graph

Conversational ordering for drive-thru and phone orders.

Ordering Phases

class OrderingPhase(str, Enum):
    GREETING = "greeting"            # Welcome message
    TAKING_ORDER = "taking_order"    # Active order building
    CONFIRMING_ITEM = "confirming"   # Item confirmation
    UPSELLING = "upselling"          # Suggestion phase
    ORDER_REVIEW = "order_review"    # Full order confirmation
    CHECKOUT = "checkout"            # Payment initiation
    PAYMENT = "payment"              # Processing payment
    COMPLETED = "completed"          # Order complete
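
The phases imply an order of progression, which can be enforced with a transition table. The sketch below is one possible layout; the `ALLOWED_TRANSITIONS` map and the `advance` helper are assumptions for illustration, since the source lists the phases but not which moves are legal.

```python
from enum import Enum


class OrderingPhase(str, Enum):
    GREETING = "greeting"
    TAKING_ORDER = "taking_order"
    CONFIRMING_ITEM = "confirming"
    UPSELLING = "upselling"
    ORDER_REVIEW = "order_review"
    CHECKOUT = "checkout"
    PAYMENT = "payment"
    COMPLETED = "completed"


P = OrderingPhase

# Assumed transition rules; adjust to the real conversation design.
ALLOWED_TRANSITIONS = {
    P.GREETING: {P.TAKING_ORDER},
    P.TAKING_ORDER: {P.CONFIRMING_ITEM, P.ORDER_REVIEW},
    P.CONFIRMING_ITEM: {P.TAKING_ORDER, P.UPSELLING},
    P.UPSELLING: {P.TAKING_ORDER, P.ORDER_REVIEW},
    P.ORDER_REVIEW: {P.TAKING_ORDER, P.CHECKOUT},
    P.CHECKOUT: {P.PAYMENT},
    P.PAYMENT: {P.COMPLETED},
    P.COMPLETED: set(),
}


def advance(current: OrderingPhase, target: OrderingPhase) -> OrderingPhase:
    """Return `target` if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Cannot move from {current.value} to {target.value}")
    return target


print(advance(P.GREETING, P.TAKING_ORDER).value)
```

Rejecting illegal jumps (for example, greeting straight to payment) makes phase bugs surface in one place instead of in downstream nodes.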

State Management

from typing import Annotated, Optional, TypedDict

from langgraph.graph.message import add_messages

class VoiceOrderingState(TypedDict):
    messages: Annotated[list, add_messages]

    # NLU extraction
    nlu_result: Optional[NLUExtractionResult]
    current_intent: OrderingIntent
    confidence_score: float

    # Cart
    cart_items: list[CartItem]
    cart_total: float
    pending_item: Optional[PendingItem]

    # Phase tracking
    current_phase: OrderingPhase
    phase_history: list[str]

    # Clarification
    clarification_needed: bool
    clarification_options: list[str]
    allergen_warnings: list[str]

    # Upsell
    upsell_offered: bool
    upsell_item: Optional[str]

    # Context
    location_id: str
    menu_context: dict

Intent Classification

class OrderingIntent(str, Enum):
    GREETING = "greeting"
    ADD_ITEM = "add_item"
    REMOVE_ITEM = "remove_item"
    MODIFY_ITEM = "modify_item"
    CONFIRM_ORDER = "confirm_order"
    CHECKOUT = "checkout"
    CANCEL_ORDER = "cancel_order"
    ASK_QUESTION = "ask_question"
    CHECK_PRICE = "check_price"
    REPEAT_ORDER = "repeat_order"
    UNCLEAR = "unclear"
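
The `UNCLEAR` intent pairs naturally with the `confidence_score` and clarification fields in the state. A minimal sketch of how a low-confidence classification might be turned into a clarification request, assuming a threshold of 0.6 and a hypothetical helper `resolve_intent` (neither is given in the source):

```python
from typing import List, TypedDict

CONFIDENCE_THRESHOLD = 0.6  # assumed value; tune against real NLU data


class ClarificationUpdate(TypedDict):
    current_intent: str
    clarification_needed: bool
    clarification_options: List[str]


def resolve_intent(
    intent: str, confidence: float, candidates: List[str]
) -> ClarificationUpdate:
    """Accept the intent if confident enough, otherwise ask the customer."""
    if confidence < CONFIDENCE_THRESHOLD:
        return {
            "current_intent": "unclear",
            "clarification_needed": True,
            "clarification_options": candidates,
        }
    return {
        "current_intent": intent,
        "clarification_needed": False,
        "clarification_options": [],
    }


print(resolve_intent("add_item", 0.42, ["cheeseburger", "chicken burger"]))
```

The returned dict has the shape of a partial state update, so a node could return it directly.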

Checkpointing System

Production-grade state persistence for multi-turn conversations.

Checkpointer Options

| Backend | Use Case | Latency | Durability |
| --- | --- | --- | --- |
| Memory | Development/testing | <1 ms | None |
| Redis | Production (fast) | <5 ms | Session |
| Cloud Spanner | Production (durable) | ~20 ms | Persistent |
| Hybrid | Production (best of both) | <5 ms | Persistent |

Configuration

# Environment-based configuration
CHECKPOINT_BACKEND=hybrid
REDIS_URL=rediss://${REDIS_HOST}:6378/0 # Memorystore with TLS (all environments)
DATABASE_URL=${SPANNER_CONNECTION_STRING} # Cloud Spanner via env var
CHECKPOINT_TTL=86400 # 24 hours
CHECKPOINT_RETENTION_DAYS=30 # Spanner retention

Memorystore TLS

In production, Redis URLs use rediss:// (double s) for TLS connections to GCP Memorystore. CA verification is automatically skipped by all services because Memorystore uses a private CA not in any public trust store. TLS encryption remains active.
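
A service would read these variables at startup. The sketch below shows one way to parse them with the standard library only. `CheckpointConfig`, `load_checkpoint_config`, and the backend names other than `hybrid` are assumptions for illustration; the defaults mirror the values shown in the configuration.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Only "hybrid" appears in the source; the other names are assumed.
VALID_BACKENDS = {"memory", "redis", "spanner", "hybrid"}


@dataclass(frozen=True)
class CheckpointConfig:
    backend: str
    redis_url: str
    ttl_seconds: int
    retention_days: int
    redis_tls: bool


def load_checkpoint_config(env: Mapping[str, str] = os.environ) -> CheckpointConfig:
    """Build a config object from environment-style key/value pairs."""
    backend = env.get("CHECKPOINT_BACKEND", "memory").lower()
    if backend not in VALID_BACKENDS:
        raise ValueError(f"Unknown CHECKPOINT_BACKEND: {backend!r}")
    redis_url = env.get("REDIS_URL", "")
    return CheckpointConfig(
        backend=backend,
        redis_url=redis_url,
        ttl_seconds=int(env.get("CHECKPOINT_TTL", "86400")),
        retention_days=int(env.get("CHECKPOINT_RETENTION_DAYS", "30")),
        # The rediss:// scheme (double s) signals a TLS connection.
        redis_tls=redis_url.startswith("rediss://"),
    )


cfg = load_checkpoint_config(
    {"CHECKPOINT_BACKEND": "hybrid", "REDIS_URL": "rediss://10.0.0.5:6378/0"}
)
print(cfg)
```

Passing the environment as a mapping keeps the loader easy to test without mutating `os.environ`.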

Hybrid Checkpointer

import asyncio

class HybridCheckpointer(BaseCheckpointSaver):
    """Redis for speed + Cloud Spanner for durability"""

    def __init__(self, redis_client, spanner_db):
        self.redis = RedisCheckpointer(redis_client)
        self.spanner = SpannerCheckpointer(spanner_db)
        # Hold references so background writes are not garbage-collected mid-flight
        self._pending_writes = set()

    async def put(self, config, checkpoint, metadata):
        # Write to Redis immediately
        await self.redis.put(config, checkpoint, metadata)

        # Async write to Cloud Spanner (fire-and-forget)
        task = asyncio.create_task(
            self.spanner.put(config, checkpoint, metadata)
        )
        self._pending_writes.add(task)
        task.add_done_callback(self._pending_writes.discard)

    async def get(self, config):
        # Try Redis first
        checkpoint = await self.redis.get(config)
        if checkpoint:
            return checkpoint

        # Fallback to Cloud Spanner
        return await self.spanner.get(config)
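
The fallback read path can be exercised without Redis or Spanner. The sketch below uses in-memory stand-ins; `InMemoryStore`, `HybridStore`, and `flush` are hypothetical names, and the key/value interface is simplified relative to a real checkpointer.

```python
import asyncio
from typing import Dict, Optional, Set


class InMemoryStore:
    """Stand-in for a Redis or Spanner checkpointer (illustration only)."""

    def __init__(self) -> None:
        self._data: Dict[str, dict] = {}

    async def put(self, key: str, checkpoint: dict) -> None:
        self._data[key] = checkpoint

    async def get(self, key: str) -> Optional[dict]:
        return self._data.get(key)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)


class HybridStore:
    """Write to the fast store first, then to the durable store in the background."""

    def __init__(self, fast: InMemoryStore, durable: InMemoryStore) -> None:
        self.fast = fast
        self.durable = durable
        self._pending: Set[asyncio.Task] = set()

    async def put(self, key: str, checkpoint: dict) -> None:
        await self.fast.put(key, checkpoint)
        task = asyncio.create_task(self.durable.put(key, checkpoint))
        self._pending.add(task)  # keep a reference until the write finishes
        task.add_done_callback(self._pending.discard)

    async def get(self, key: str) -> Optional[dict]:
        checkpoint = await self.fast.get(key)
        if checkpoint is not None:
            return checkpoint
        return await self.durable.get(key)

    async def flush(self) -> None:
        """Wait for outstanding durable writes (useful in tests and on shutdown)."""
        if self._pending:
            await asyncio.gather(*self._pending)


async def demo() -> Optional[dict]:
    fast, durable = InMemoryStore(), InMemoryStore()
    store = HybridStore(fast, durable)
    await store.put("thread-1", {"step": 3})
    await store.flush()
    await fast.delete("thread-1")  # simulate the Redis TTL expiring
    return await store.get("thread-1")  # served from the durable store


print(asyncio.run(demo()))
```

One trade-off of fire-and-forget writes is that a crash between the fast write and the durable write loses that checkpoint from durable storage; a `flush` on shutdown narrows but does not close that window.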

Human-in-the-Loop (HITL)

Approval workflow for sensitive operations.

Approval Flow

from langgraph.graph import StateGraph

# Compile graph with interrupt points
graph = StateGraph(RestaurantState)
# ... add nodes and edges ...

compiled = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_approval"]  # Pause here for approval
)

Approval Check

def approval_checker(state: RestaurantState) -> dict:
    """Determine if human approval is required"""

    requires_approval = False
    reason = ""

    # Check plan steps for sensitive operations
    for step in state.get("plan_steps", []):
        if step.tool in SENSITIVE_TOOLS:
            requires_approval = True
            reason = f"Action '{step.tool}' requires manager approval"
            break

        if step.estimated_cost and step.estimated_cost > 100:
            requires_approval = True
            reason = f"Estimated cost ${step.estimated_cost} exceeds threshold"
            break

    return {
        "requires_approval": requires_approval,
        "approval_status": ApprovalStatus.PENDING if requires_approval
                           else ApprovalStatus.NOT_REQUIRED,
        "approval_reason": reason
    }

API Integration

# Process initial message
@router.post("/agent/chat")
async def agent_chat(request: ChatRequest):
    config = {"configurable": {"thread_id": request.session_id}}

    # `compiled` is the compiled graph; ainvoke is its async entry point
    result = await compiled.ainvoke(
        {"messages": [HumanMessage(content=request.message)]},
        config
    )

    return {
        "response": result["messages"][-1].content,
        "session_id": request.session_id,
        "paused_for_approval": result.get("requires_approval", False)
    }

# Handle approval
@router.post("/agent/approval")
async def agent_approval(request: ApprovalRequest):
    config = {"configurable": {"thread_id": request.session_id}}

    # Update state with approval decision
    if request.approved:
        state_update = {"approval_status": ApprovalStatus.APPROVED}
    else:
        state_update = {"approval_status": ApprovalStatus.REJECTED}

    # Record the decision on the paused thread, then resume graph execution.
    # Passing None as input continues from the checkpoint instead of starting a new run.
    await compiled.aupdate_state(config, state_update)
    result = await compiled.ainvoke(None, config)

    return {"response": result["messages"][-1].content}

Tool System

Tool Categories

| Category | Tools | Example |
| --- | --- | --- |
| Inventory | check_stock, suggest_reorder, create_po | Check ribeye stock |
| Scheduling | view_schedule, modify_shift, optimize | Add closer tonight |
| Analytics | sales_summary, trends, forecasting | Last week's sales |
| Support | kb_search, create_ticket | Help with login |
| Orders | order_status, modify_order | Cancel order #123 |
| Kitchen | prep_status, queue_management | Check ticket times |

Tool Definition

from langchain_core.tools import tool
from pydantic import BaseModel

class InventoryCheckInput(BaseModel):
    item_name: str
    location_id: str

@tool
async def check_stock(input: InventoryCheckInput) -> dict:
    """Check current stock level for an item"""

    result = await inventory_service.get_stock_level(
        item_name=input.item_name,
        location_id=input.location_id
    )

    return {
        "item": input.item_name,
        "current_stock": result.quantity,
        "unit": result.unit,
        "par_level": result.par_level,
        "status": "low" if result.quantity < result.par_level else "ok"
    }

AI Router Integration

Agents use the ACP Router for cost-optimized model selection:

Tier Mapping

| Tier | Model | Cost (per 1M tokens) | Use Case |
| --- | --- | --- | --- |
| T1 | Llama 4 Scout | Free | Simple classification |
| T2 | Gemini 2.0 Flash | $0.10 | Voice responses |
| T3 | Gemini 3 Flash | $0.50 | Complex conversation |
| T4 | Claude Haiku 4.5 | $1 | Fast reasoning |
| T5 | Claude Sonnet 4.5 | $3 | High-quality analysis |
| T6 | Claude Opus 4.5 | $5 | Strategic planning |

Usage in Nodes

from langchain_core.messages import AIMessage

async def intent_router(state: RestaurantState) -> dict:
    """Classify intent using T1 (free) tier"""

    response = await call_ai_gateway(
        messages=[{"role": "user", "content": state["messages"][-1].content}],
        tier="t1",  # Free Llama 4 Scout
        system_prompt=CLASSIFICATION_PROMPT
    )

    return {
        "intent": response.intent,
        "domain": response.domain,
        "complexity": response.complexity
    }

async def response_generator(state: RestaurantState) -> dict:
    """Generate response using appropriate tier based on complexity"""

    tier = "t2" if state["complexity"] <= 2 else "t4"

    response = await call_ai_gateway(
        messages=state["messages"],
        tier=tier,
        system_prompt=RESPONSE_PROMPT
    )

    return {"messages": [AIMessage(content=response.content)]}
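
To see what tier selection means for spend, the tier prices can be combined with the complexity rule. The sketch below is a rough estimator that treats each listed price as a flat rate per million tokens, which is an assumption (the table does not distinguish input from output tokens). `select_tier` and `estimate_cost_usd` are hypothetical helpers.

```python
# USD per million tokens, copied from the tier table; T1 is listed as free.
TIER_PRICE_PER_MILLION = {
    "t1": 0.0,
    "t2": 0.10,
    "t3": 0.50,
    "t4": 1.0,
    "t5": 3.0,
    "t6": 5.0,
}


def select_tier(complexity: int) -> str:
    """Same rule as response_generator: cheap tier for simple queries."""
    return "t2" if complexity <= 2 else "t4"


def estimate_cost_usd(tier: str, tokens: int) -> float:
    """Flat-rate cost estimate for `tokens` tokens on the given tier."""
    return TIER_PRICE_PER_MILLION[tier] * tokens / 1_000_000


tier = select_tier(complexity=4)
print(tier, estimate_cost_usd(tier, 250_000))
```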

Testing

Unit Tests

import pytest
from langchain_core.messages import HumanMessage

def test_restaurant_graph_compiles():
    """Verify graph compiles without errors"""
    graph = create_restaurant_graph()
    compiled = graph.compile()
    assert compiled is not None

@pytest.mark.asyncio
async def test_intent_routing():
    """Test intent classification routes correctly"""
    state = RestaurantState(
        messages=[HumanMessage("How are sales today?")],
        intent="",
        domain=AgentDomain.GENERAL,
        complexity=1
    )

    # intent_router is a coroutine, so it must be awaited
    result = await intent_router(state)
    assert result["domain"] == AgentDomain.ANALYTICS

Integration Tests

import pytest
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver

@pytest.mark.asyncio
async def test_full_conversation():
    """Test multi-turn conversation with checkpointing"""

    graph = create_restaurant_graph()
    checkpointer = MemorySaver()
    compiled = graph.compile(checkpointer=checkpointer)

    # First turn
    config = {"configurable": {"thread_id": "test-123"}}
    result1 = await compiled.ainvoke(
        {"messages": [HumanMessage("What items are low?")]},
        config
    )

    # Second turn (same thread_id, so the conversation continues)
    result2 = await compiled.ainvoke(
        {"messages": [HumanMessage("Order more of the first one")]},
        config
    )

    assert "reorder" in result2["messages"][-1].content.lower()

Best Practices

Graph Design

  1. Keep nodes focused: Single responsibility per node
  2. Use conditional edges: Route based on state, not in nodes
  3. Handle errors gracefully: Catch exceptions, return error state
  4. Log state transitions: Track flow for debugging
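
Points 3 and 4 can be combined in a small wrapper around each node. The sketch below assumes nodes are async functions that take a state dict and return a partial update; `safe_node` is a hypothetical decorator, and `execution_errors` is the field from the state schema.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("agent.nodes")

Node = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


def safe_node(node: Node) -> Node:
    """Log entry to the node and turn exceptions into an error-state update."""

    @functools.wraps(node)
    async def wrapper(state: Dict[str, Any]) -> Dict[str, Any]:
        logger.info("entering node %s", node.__name__)
        try:
            return await node(state)
        except Exception as exc:
            logger.exception("node %s failed", node.__name__)
            errors = list(state.get("execution_errors", []))
            errors.append(f"{node.__name__}: {exc}")
            return {"execution_errors": errors}

    return wrapper


@safe_node
async def flaky_node(state: Dict[str, Any]) -> Dict[str, Any]:
    raise RuntimeError("inventory service unavailable")


result = asyncio.run(flaky_node({"execution_errors": []}))
print(result)
```

Returning the error as state lets a conditional edge decide whether to retry, skip, or report the failure, instead of aborting the whole run.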

State Management

  1. Minimize state size: Only store what's needed
  2. Use typed schemas: TypedDict with annotations
  3. Clear sensitive data: Remove PII after use
  4. Version state schemas: Plan for migrations

Checkpointing

  1. Choose appropriate backend: Redis for speed, Cloud Spanner for durability
  2. Set reasonable TTLs: Balance memory vs conversation length
  3. Handle checkpoint failures: Fail gracefully, log errors
  4. Clean up old checkpoints: Implement retention policies

HITL Workflow

  1. Be selective: Only require approval for sensitive operations
  2. Provide context: Include reason and estimated impact
  3. Set timeouts: Auto-reject after reasonable period
  4. Audit all decisions: Log approval/rejection with user
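
The timeout rule in point 3 reduces to a pure function over the approval status and the request time. A minimal sketch follows. The 15-minute window, the enum values, and the `resolve_status` helper are assumptions; the source names the statuses but not their values or a timeout length.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class ApprovalStatus(str, Enum):
    # Member names come from the document; the string values are assumed.
    NOT_REQUIRED = "not_required"
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"


APPROVAL_TIMEOUT = timedelta(minutes=15)  # assumed window


def resolve_status(
    status: ApprovalStatus, requested_at: datetime, now: datetime
) -> ApprovalStatus:
    """Auto-reject a pending approval once the timeout has elapsed."""
    if status is ApprovalStatus.PENDING and now - requested_at > APPROVAL_TIMEOUT:
        return ApprovalStatus.REJECTED
    return status


requested = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_status(ApprovalStatus.PENDING, requested, requested + timedelta(minutes=20)))
```

Taking `now` as a parameter rather than reading the clock inside the function keeps the rule deterministic and easy to test.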