Appearance
Deploying to Production
Guide for running Elsai agents in production environments.
FastAPI REST API
The most common deployment pattern — wrap your agent in a REST API:
python
# app.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from elsai import Agent
from elsai_model.openai import OpenAIConnector
from elsai.session import FileSessionManager
import json
# Application object; the route decorators in this file register against it.
app = FastAPI(title="Elsai Agent API")
# One session manager for the whole process, constructed with the path
# "./sessions" and handed to every per-request Agent.
session_manager = FileSessionManager("./sessions")
class ChatRequest(BaseModel):
    """Request body accepted by the chat endpoints."""

    # Text passed to the agent as the prompt for this turn.
    message: str
    # Used as the Agent's agent_id. Callers that omit it all end up with the
    # same id, "default".
    session_id: str = "default"
class ChatResponse(BaseModel):
    """Response body returned by POST /chat."""

    # The agent result converted with str().
    response: str
    # Echo of the session_id the request was handled under.
    session_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Run a single agent turn and return the reply as JSON.

    A new Agent is built for every request; the module-level
    session_manager and the request's session_id tie the turn to a session.
    """
    session_id = request.session_id
    connector = OpenAIConnector(model_name="gpt-4o")
    agent = Agent(
        model=connector,
        agent_id=session_id,
        session_manager=session_manager,
        callback_handler=None,  # Suppress stdout output
    )
    reply = await agent.invoke_async(request.message)
    return ChatResponse(response=str(reply), session_id=session_id)
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream the agent's output to the client as server-sent events.

    Each event that carries a "data" key is forwarded as one
    ``data: {"text": ...}`` frame; a final ``data: [DONE]`` frame marks the
    end of the stream.
    """
    connector = OpenAIConnector(model_name="gpt-4o")
    agent = Agent(
        model=connector,
        agent_id=request.session_id,
        session_manager=session_manager,
        callback_handler=None,
    )

    async def generate():
        async for event in agent.stream_async(request.message):
            # Events without a "data" key are not forwarded.
            if "data" not in event:
                continue
            payload = json.dumps({"text": event["data"]})
            yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
bash
uvicorn app:app --host 0.0.0.0 --port 8080
AWS Lambda
Deploy as a serverless function:
python
# lambda_handler.py
import json
from elsai import Agent
from elsai.session import S3SessionManager
# Created once at import time (module scope), not inside the handler, so every
# invocation served by this module instance shares the same manager object.
session_manager = S3SessionManager(bucket_name="my-agent-sessions")
def _json_response(status_code, payload):
    """Build an API Gateway proxy-style response with a JSON body."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }


def handler(event, context):
    """Lambda entry point: run one agent turn for a proxy-style HTTP event.

    Args:
        event: Invocation event. Its "body" entry, when present, is expected
            to be a JSON object string with optional "message" and
            "session_id" keys.
        context: Lambda context object (unused).

    Returns:
        A dict with "statusCode", "headers" and a JSON-encoded "body".
        Status 400 is returned when the body is not a JSON object.
    """
    # Proxy events carry `"body": null` when the request has no payload, so a
    # .get() default is not enough: the key exists and its value is None.
    raw_body = event.get("body") or "{}"
    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers a body
        # that is neither str nor bytes.
        return _json_response(400, {"error": "Request body must be valid JSON"})
    if not isinstance(body, dict):
        return _json_response(400, {"error": "Request body must be a JSON object"})

    message = body.get("message", "")
    session_id = body.get("session_id", "default")

    # A fresh Agent per invocation; only the session manager is shared.
    agent = Agent(
        agent_id=session_id,
        session_manager=session_manager,
        callback_handler=None,
    )
    result = agent(message)
    return _json_response(200, {"response": str(result), "session_id": session_id})
Docker
dockerfile
FROM python:3.12-slim
WORKDIR /app
# Copy and install dependencies before the source so this layer stays cached
# until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Do not run the server as root. /app is handed to the unprivileged user
# because the application writes its session files under ./sessions.
RUN useradd --create-home --shell /usr/sbin/nologin appuser \
    && chown -R appuser:appuser /app
USER appuser
EXPOSE 8080
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
text
# requirements.txt
elsai-agents
elsai-model
fastapi
uvicorn
Environment variables
Never hard-code secrets. Use environment variables:
bash
# .env (do not commit this file)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_DEFAULT_REGION=us-west-2
python
import os
from dotenv import load_dotenv
from elsai import Agent
from elsai_model.openai import OpenAIConnector
# Load variables from the .env file before anything reads os.environ.
load_dotenv()
agent = Agent(
    model=OpenAIConnector(
        # Indexing os.environ (rather than .get) raises KeyError right here
        # if the variable is missing, instead of failing later at call time.
        openai_api_key=os.environ["OPENAI_API_KEY"],
        model_name="gpt-4o",
    )
)
Concurrency
Agents are not thread-safe by default. Each request should use its own agent instance:
python
# ✅ Correct: new agent per request
@app.post("/chat")
async def chat(request: ChatRequest):
    # The Agent is a local variable, so no two requests ever touch the same
    # instance.
    agent = Agent(agent_id=request.session_id, session_manager=session_manager)
    result = await agent.invoke_async(request.message)
    return {"response": str(result)}
# ❌ Wrong: shared agent instance across requests
# Anti-pattern, shown on purpose: one module-level Agent reused by every request.
shared_agent = Agent() # Race condition!
@app.post("/chat")
async def chat(request: ChatRequest):
    # Anti-pattern, shown on purpose: overlapping requests all await on the
    # same shared_agent object.
    result = await shared_agent.invoke_async(request.message) # Not safe!
    return {"response": str(result)}
Observability
OpenTelemetry
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# Set up OTLP exporter
provider = TracerProvider()
# Spans are batched and sent over OTLP/HTTP to the collector endpoint below.
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4318/v1/traces"))
)
# Register the provider globally so tracers obtained elsewhere use it.
trace.set_tracer_provider(provider)
# Agents automatically emit spans
# NOTE(review): in OpenTelemetry, "service.name" is conventionally a Resource
# attribute set on the TracerProvider. Confirm that passing it through
# trace_attributes is what the tracing backend uses to name the service.
agent = Agent(
    trace_attributes={"service.name": "my-agent-service", "environment": "production"}
)
Logging
python
import logging
# Root logger: INFO and above, each record prefixed with timestamp, logger
# name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s"
)
# Reduce verbosity of internal SDK logs
# Only WARNING and above from the "elsai" logger hierarchy get through.
logging.getLogger("elsai").setLevel(logging.WARNING)
Health checks
python
@app.get("/health")
async def health():
    """Liveness probe: return a static payload without calling the model."""
    return {"status": "ok", "sdk": "elsai-agents"}
@app.get("/health/model")
async def health_model():
try:
agent = Agent(callback_handler=None)
result = await agent.invoke_async("Say 'OK'")
return {"status": "ok", "model_reachable": True}
except Exception as e:
return {"status": "error", "detail": str(e)}, 503Rate limiting
python
from collections import defaultdict
from time import time
class RateLimiter:
    """In-memory sliding-window limiter keyed by an arbitrary string.

    A key may make at most ``max_per_minute`` allowed calls within any
    ``WINDOW_SECONDS`` span. Rejected calls are not recorded, so they do not
    extend the time a key stays blocked.

    State lives in a plain dict inside this object, so it is neither shared
    between processes nor preserved across restarts.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60.0

    def __init__(self, max_per_minute: int = 10):
        """Create a limiter allowing ``max_per_minute`` calls per key per window."""
        self.max = max_per_minute
        # key -> timestamps of allowed calls, oldest first.
        self.calls: dict[str, list[float]] = defaultdict(list)
        # Time at which idle keys are next purged; 0.0 forces a purge on the
        # first call.
        self._next_sweep = 0.0

    def is_allowed(self, user_id: str) -> bool:
        """Return True and record the call if ``user_id`` is under its limit."""
        now = time()

        # Without this, a key that is seen once and never again would keep
        # its entry forever and the dict would grow without bound.
        if now >= self._next_sweep:
            self._evict_idle(now)
            self._next_sweep = now + self.WINDOW_SECONDS

        recent = [
            stamp
            for stamp in self.calls.get(user_id, ())
            if now - stamp < self.WINDOW_SECONDS
        ]
        allowed = len(recent) < self.max
        if allowed:
            recent.append(now)
        self.calls[user_id] = recent
        return allowed

    def _evict_idle(self, now: float) -> None:
        """Drop every key whose most recent call has left the window."""
        idle = [
            key
            for key, stamps in self.calls.items()
            # Timestamps are appended in call order, so the last is the newest.
            if not stamps or now - stamps[-1] >= self.WINDOW_SECONDS
        ]
        for key in idle:
            del self.calls[key]
# One limiter per process. Its counters live in this object's memory, so each
# worker process enforces the limit independently of the others.
rate_limiter = RateLimiter(max_per_minute=10)
@app.post("/chat")
async def chat(request: ChatRequest):
    # Reject before any agent work is done.
    # NOTE(review): the limiter key is the client-supplied session_id, so a
    # caller can avoid the limit by sending a different session_id. Confirm
    # whether an authenticated user id or client address should be used.
    if not rate_limiter.is_allowed(request.session_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # ... rest of handler