Examples
Real-world integration examples
Copy-paste examples for common AI agent patterns: LangChain, CrewAI, RAG pipelines, and more.
LangChain Agent
Track every LLM call, tool use, and chain execution in a LangChain agent.
LangChain · OpenAI · Agent
Python
from langchain.callbacks import EmpressCallback
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.chat_models import ChatOpenAI
# Initialize callback
callback = EmpressCallback(api_key="...")
# Create agent with tracking
llm = ChatOpenAI(model="gpt-4", callbacks=[callback])
agent = create_openai_functions_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# All actions automatically tracked
result = agent_executor.invoke({"input": "Analyze customer churn data"})
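If you prefer not to attach the callback to the model constructor, LangChain also accepts callbacks per invocation through the runnable config; a minimal sketch, reusing the callback and agent_executor objects defined above.

# Scope tracking to a single invocation instead of the whole LLM
result = agent_executor.invoke(
    {"input": "Analyze customer churn data"},
    config={"callbacks": [callback]},
)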
Customer Support Agent
Track ticket resolution, escalations, and customer satisfaction.
Support · Tickets · CSAT
Python
from empress import Empress
empress = Empress(api_key="...")
def resolve_ticket(ticket_id, customer_id, resolution):
    # Your agent logic here
    result = support_agent.resolve(ticket_id, resolution)
    # Track the resolution
    empress.track(
        actor="support-agent-v2",
        verb="resolved",
        object=f"ticket-{ticket_id}",
        result={
            "success": result.success,
            "resolution_type": resolution.type,
            "time_to_resolve_minutes": result.duration,
            "csat_score": result.csat
        },
        context={
            "customer_id": customer_id,
            "customer_tier": result.customer.tier,
            "ticket_priority": result.priority,
            "escalated": result.was_escalated
        }
    )
    return result
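Escalations can be recorded with the same track() call; a minimal sketch, where escalate_ticket and the reason field are hypothetical names rather than part of the SDK.

def escalate_ticket(ticket_id, customer_id, reason):
    # Hypothetical escalation path; only empress.track() comes from the SDK
    empress.track(
        actor="support-agent-v2",
        verb="escalated",
        object=f"ticket-{ticket_id}",
        result={"success": True, "escalated_to": "human-support"},
        context={"customer_id": customer_id, "reason": reason}
    )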
Finance Agent
Track loan approvals, risk assessments, and compliance decisions.
Finance · Risk · Compliance
Python
from empress import Empress
empress = Empress(api_key="...")
def process_loan_application(application):
    # Run risk assessment
    risk = risk_model.assess(application)
    decision = make_decision(application, risk)
    # Track for compliance
    empress.track(
        actor="finance-agent-v2.3",
        verb="approved" if decision.approved else "assessed",
        object=f"loan-{application.id}",
        result={
            "success": True,
            "approved": decision.approved,
            "amount": application.amount,
            "interest_rate": decision.rate,
            "risk_score": risk.score,
            "confidence": decision.confidence
        },
        context={
            "applicant_id": application.customer_id,
            "credit_score": application.credit_score,
            "debt_ratio": application.debt_ratio,
            "reasoning": decision.factors,
            "model_version": "v2.3.1",
            "human_override": False
        }
    )
    return decision
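The human_override flag above can also capture reviewer decisions; a minimal sketch, where record_override and its arguments are hypothetical and only empress.track() comes from the SDK.

def record_override(application, reviewer_id, final_approved):
    # Hypothetical human-review path; mirrors the fields used in the example above
    empress.track(
        actor="finance-agent-v2.3",
        verb="approved" if final_approved else "rejected",
        object=f"loan-{application.id}",
        result={"success": True, "approved": final_approved},
        context={
            "applicant_id": application.customer_id,
            "human_override": True,
            "reviewer_id": reviewer_id,
            "model_version": "v2.3.1"
        }
    )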
Multi-Agent Workflow
Track actions across multiple coordinated agents with correlation IDs.
Multi-Agent · Workflow · CrewAI
Python
from empress import Empress
import uuid
empress = Empress(api_key="...")
def process_order(order):
    # Create correlation ID for the workflow
    workflow_id = str(uuid.uuid4())
    # Agent 1: Validate order
    empress.track(
        actor="validation-agent",
        verb="validated",
        object=f"order-{order.id}",
        result={"valid": True},
        context={"workflow_id": workflow_id, "step": 1}
    )
    # Agent 2: Check inventory
    empress.track(
        actor="inventory-agent",
        verb="checked",
        object=f"inventory-{order.product_id}",
        result={"in_stock": True, "quantity": 47},
        context={"workflow_id": workflow_id, "step": 2}
    )
    # Agent 3: Process payment
    empress.track(
        actor="payment-agent",
        verb="processed",
        object=f"payment-{order.payment_id}",
        result={"success": True, "amount": order.total},
        context={"workflow_id": workflow_id, "step": 3}
    )
    # Full workflow traceable via workflow_id
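Failed steps are worth recording under the same correlation ID so broken workflows stay traceable; a minimal sketch that would live inside process_order, with the card_declined error code as a hypothetical example.

    # Hypothetical failure path: reuse workflow_id so the failed step appears in the trace
    empress.track(
        actor="payment-agent",
        verb="failed",
        object=f"payment-{order.payment_id}",
        result={"success": False, "error": "card_declined"},
        context={"workflow_id": workflow_id, "step": 3}
    )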
RAG Pipeline
Track retrieval, context assembly, and generation in RAG systems.
RAG · LlamaIndex · Vector DB
Python
from empress import Empress
from llama_index import VectorStoreIndex
import hashlib
empress = Empress(api_key="...")
def query_knowledge_base(query, user_id):
    # Track retrieval
    results = index.query(query, top_k=5)
    empress.track(
        actor="retrieval-agent",
        verb="retrieved",
        object=f"query-{hashlib.sha256(query.encode()).hexdigest()[:8]}",
        result={
            "num_results": len(results),
            "top_score": results[0].score,
            "latency_ms": results.latency
        },
        context={"user_id": user_id, "query": query[:100]}
    )
    # Track generation
    response = llm.generate(query, context=results)
    empress.track(
        actor="generation-agent",
        verb="generated",
        object=f"response-{response.id}",
        result={
            "tokens": response.token_count,
            "cost": response.cost,
            "latency_ms": response.latency
        },
        context={
            "user_id": user_id,
            "sources": [r.doc_id for r in results],
            "model": "gpt-4"
        }
    )
    return response
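Empty retrievals are useful to track as well, so fallback answers show up alongside normal generations; a minimal sketch that would sit inside query_knowledge_base right after the retrieval call, with the fallback message as a hypothetical placeholder.

    # Hypothetical guard: record the miss and bail out before generation
    if not results:
        empress.track(
            actor="retrieval-agent",
            verb="retrieved",
            object=f"query-{hashlib.sha256(query.encode()).hexdigest()[:8]}",
            result={"num_results": 0},
            context={"user_id": user_id, "query": query[:100], "fallback": True}
        )
        return "No relevant documents found."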
Async Batch Processing
Track high-volume batch operations efficiently with the async SDK.
Async · Batch · High Volume
Python
from empress import AsyncEmpress
import asyncio
empress = AsyncEmpress(api_key="...")
async def process_batch(items, batch_id):
    tasks = []
    for item in items:
        # Process item
        result = await process_item(item)
        # Track asynchronously (non-blocking)
        task = empress.track_async(
            actor="batch-processor",
            verb="processed",
            object=f"item-{item.id}",
            result={"success": result.success},
            context={"batch_id": batch_id}
        )
        tasks.append(task)
    # Wait for all tracking to complete
    await asyncio.gather(*tasks)
    # SDK handles batching and retries automatically
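A minimal driver for the batch example above; load_items() is a hypothetical loader that returns objects with an id attribute.

async def main():
    items = load_items()  # hypothetical loader
    await process_batch(items, batch_id="nightly-run")

asyncio.run(main())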
More examples on GitHub
Full working examples, integration tests, and deployment templates.
View on GitHub