Skip to content

Deploying to Production

Guide for running Elsai agents in production environments.

FastAPI REST API

The most common deployment pattern — wrap your agent in a REST API:

python
# app.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from elsai import Agent
from elsai_model.openai import OpenAIConnector
from elsai.session import FileSessionManager
import json

# ASGI application object; `uvicorn app:app` serves this instance.
app = FastAPI(title="Elsai Agent API")

# Created once at import time and handed to every per-request Agent below.
# Presumably persists session state under ./sessions — confirm against the
# elsai.session documentation.
session_manager = FileSessionManager("./sessions")

class ChatRequest(BaseModel):
    """Request body accepted by the chat endpoints."""

    # User message forwarded to the agent.
    message: str
    # Conversation identifier, passed to Agent as agent_id. Requests that omit
    # it all share the "default" session.
    # NOTE(review): this value is client-supplied and unvalidated — confirm the
    # session manager sanitises it before deriving any storage path from it.
    session_id: str = "default"

class ChatResponse(BaseModel):
    """Response body returned by POST /chat."""

    # String form of the agent's result.
    response: str
    # Echo of the session_id the request was processed under.
    session_id: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Run one agent turn for the request's session and return the full reply."""
    session_id = request.session_id

    # A fresh Agent is built for every request; conversation state is carried
    # by the shared session manager, keyed on the session ID.
    turn_agent = Agent(
        model=OpenAIConnector(model_name="gpt-4o"),
        agent_id=session_id,
        session_manager=session_manager,
        callback_handler=None,  # Suppress stdout output
    )

    reply = await turn_agent.invoke_async(request.message)
    return ChatResponse(response=str(reply), session_id=session_id)

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream the agent's reply to the client as server-sent events.

    Every agent event that carries a "data" key is forwarded as one SSE frame
    whose payload is a JSON object with a single "text" field. A final
    "[DONE]" frame marks the end of the stream.
    """
    streaming_agent = Agent(
        model=OpenAIConnector(model_name="gpt-4o"),
        agent_id=request.session_id,
        session_manager=session_manager,
        callback_handler=None,
    )

    async def sse_frames():
        async for chunk in streaming_agent.stream_async(request.message):
            # Events without a "data" key are not forwarded to the client.
            if "data" not in chunk:
                continue
            payload = json.dumps({'text': chunk['data']})
            yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(sse_frames(), media_type="text/event-stream")
bash
uvicorn app:app --host 0.0.0.0 --port 8080

AWS Lambda

Deploy as a serverless function:

python
# lambda_handler.py
import json
from elsai import Agent
from elsai.session import S3SessionManager

# Built at import time, outside the handler, so every invocation served by this
# module instance reuses it. Presumably stores session state in the named S3
# bucket — confirm against the elsai.session documentation.
session_manager = S3SessionManager(bucket_name="my-agent-sessions")

def handler(event, context):
    """AWS Lambda entry point: run one agent turn for an HTTP-style event.

    Args:
        event: Invocation payload. Its "body" entry, when present and
            non-empty, must be a JSON object string with optional "message"
            and "session_id" keys.
        context: Lambda runtime context (unused).

    Returns:
        A response dict with "statusCode", "headers" and a JSON "body":
        200 with the agent's reply and the session ID, or 400 when the body
        is not a JSON object.
    """
    json_headers = {"Content-Type": "application/json"}

    # Proxy events may carry "body": null, in which case the key exists and a
    # .get() default would not apply. `or` covers a missing key, None and "".
    raw_body = event.get("body") or "{}"
    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError):
        # Malformed JSON, or a body that is not a string at all.
        body = None

    if not isinstance(body, dict):
        return {
            "statusCode": 400,
            "headers": json_headers,
            "body": json.dumps({"error": "Request body must be a JSON object"}),
        }

    message = body.get("message", "")
    session_id = body.get("session_id", "default")

    # New agent per invocation; state is carried by the shared session manager.
    agent = Agent(
        agent_id=session_id,
        session_manager=session_manager,
        callback_handler=None,
    )

    result = agent(message)

    return {
        "statusCode": 200,
        "headers": json_headers,
        "body": json.dumps({"response": str(result), "session_id": session_id}),
    }

Docker

dockerfile
# Container image for the FastAPI agent service (app.py).
FROM python:3.12-slim

WORKDIR /app
# Copy the dependency manifest on its own first so the install layer below can
# be reused from cache when only application code changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image.
RUN pip install --no-cache-dir -r requirements.txt

# Application source.
COPY . .
# Documents the port uvicorn listens on (see CMD); it does not publish it.
EXPOSE 8080

# Bind to 0.0.0.0 so the server is reachable from outside the container.
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
text
# requirements.txt
elsai-agents
elsai-model
fastapi
uvicorn

Environment variables

Never hard-code secrets. Use environment variables:

bash
# .env (do not commit this file)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_DEFAULT_REGION=us-west-2
python
import os
from dotenv import load_dotenv
from elsai import Agent
from elsai_model.openai import OpenAIConnector

# Load variables from a local .env file, if one exists, into os.environ.
load_dotenv()

agent = Agent(
    model=OpenAIConnector(
        # Indexing os.environ (rather than .get) raises KeyError at startup
        # when the key is missing, instead of failing later on the first call.
        openai_api_key=os.environ["OPENAI_API_KEY"],
        model_name="gpt-4o",
    )
)

Concurrency

Agents are not thread-safe by default. Each request should use its own agent instance:

python
# ✅ Correct: new agent per request
@app.post("/chat")
async def chat(request: ChatRequest):
    """Handle one chat turn with an Agent that belongs to this request only."""
    # Nothing is shared between concurrent requests except the session manager.
    per_request_agent = Agent(
        agent_id=request.session_id,
        session_manager=session_manager,
    )
    outcome = await per_request_agent.invoke_async(request.message)
    return {"response": str(outcome)}

# ❌ Wrong: shared agent instance across requests
# This Agent is built once at import time, so every in-flight request uses the
# same object. Agents are not thread-safe by default.
shared_agent = Agent()  # Race condition!

@app.post("/chat")
async def chat(request: ChatRequest):
    # All concurrent requests call into the one module-level agent. Note that
    # request.session_id is never used here, so callers are not kept apart.
    result = await shared_agent.invoke_async(request.message)  # Not safe!
    return {"response": str(result)}

Observability

OpenTelemetry

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

# Set up OTLP exporter
# Spans go over OTLP/HTTP to the collector URL below. BatchSpanProcessor
# buffers finished spans and exports them in batches rather than one by one.
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4318/v1/traces"))
)
# Install it as the process-wide tracer provider.
trace.set_tracer_provider(provider)

# Agents automatically emit spans
# trace_attributes are presumably attached to those spans — confirm against the
# Agent API reference.
agent = Agent(
    trace_attributes={"service.name": "my-agent-service", "environment": "production"}
)

Logging

python
import logging

# Root logger: INFO and above, with timestamp, logger name and level on each line.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s"
)

# Reduce verbosity of internal SDK logs
# Only WARNING and above from the "elsai" logger and its children is emitted.
logging.getLogger("elsai").setLevel(logging.WARNING)

Health checks

python
@app.get("/health")
async def health():
    """Liveness probe: report that the process is up, without calling the model."""
    status_payload = {"status": "ok", "sdk": "elsai-agents"}
    return status_payload

@app.get("/health/model")
async def health_model():
    """Deep health check: verify the model backend answers a trivial prompt.

    Returns:
        HTTP 200 with {"status": "ok", "model_reachable": True} when the agent
        call succeeds, or HTTP 503 with {"status": "error", "detail": ...}
        when it raises.
    """
    # Imported locally so this snippet is self-contained.
    from fastapi.responses import JSONResponse

    try:
        probe_agent = Agent(callback_handler=None)
        # Only success or failure matters; the reply text is discarded.
        await probe_agent.invoke_async("Say 'OK'")
    except Exception as exc:
        # Top-level boundary: any failure means "model not reachable".
        # FastAPI does not treat a (body, status) tuple as a status code, so
        # the 503 has to be set on an explicit response object.
        # NOTE(review): str(exc) is returned to the caller — confirm it cannot
        # leak credentials or internal hostnames.
        return JSONResponse(
            status_code=503,
            content={"status": "error", "detail": str(exc)},
        )
    return {"status": "ok", "model_reachable": True}

Rate limiting

python
from collections import defaultdict
from time import time

class RateLimiter:
    """In-memory sliding-window rate limiter keyed by caller ID.

    Each caller may make at most ``max_per_minute`` allowed calls within any
    60-second window. State is held in this object only, so limits are not
    shared across processes or hosts.

    Callers that have been idle for a full window are purged periodically so
    that memory stays bounded when IDs are client-supplied.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, max_per_minute: int = 10):
        """Create a limiter allowing ``max_per_minute`` calls per caller."""
        self.max = max_per_minute
        # user_id -> timestamps (seconds since the epoch) of allowed calls that
        # are still inside the window, oldest first.
        self.calls: dict[str, list[float]] = defaultdict(list)
        # Time at which idle callers are next purged.
        self._next_sweep = time() + self.WINDOW_SECONDS

    def _sweep(self, now: float) -> None:
        """Drop every caller whose most recent call has left the window."""
        expired = [
            user_id
            for user_id, stamps in self.calls.items()
            if not stamps or now - stamps[-1] >= self.WINDOW_SECONDS
        ]
        for user_id in expired:
            del self.calls[user_id]

    def is_allowed(self, user_id: str) -> bool:
        """Record a call for ``user_id`` and report whether it is within quota.

        Returns:
            True if the call is allowed (and has been counted), False if the
            caller already used its quota for the current window.
        """
        now = time()

        # Amortised cleanup: at most one full scan per window.
        if now >= self._next_sweep:
            self._sweep(now)
            self._next_sweep = now + self.WINDOW_SECONDS

        recent = [t for t in self.calls[user_id] if now - t < self.WINDOW_SECONDS]
        self.calls[user_id] = recent
        if len(recent) >= self.max:
            return False
        recent.append(now)
        return True

# One limiter instance for the whole process; its counters live in memory only.
rate_limiter = RateLimiter(max_per_minute=10)

@app.post("/chat")
async def chat(request: ChatRequest):
    # Reject before any agent work is done once the caller is over quota.
    # NOTE(review): the limit is keyed on the client-supplied session_id, so a
    # caller can evade it by rotating IDs — consider keying on an authenticated
    # identity or client address instead.
    if not rate_limiter.is_allowed(request.session_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # ... rest of handler

Copyright © 2026 Elsai Foundry.