Elsai Chat History#
A flexible and extensible Python package for managing chat conversation histories with multiple storage backends and memory management strategies.
Overview#
The Elsai Chat History package provides a robust solution for storing, retrieving, and managing conversational data in chat applications. It offers:
Pluggable storage backends (JSON, SQLite, PostgreSQL, in-memory)
Intelligent memory management strategies (summarization, trimming, LRU, TTL, similarity search)
Vector database integration (Pinecone, ChromaDB, Weaviate) for semantic search
Async support
Multi-session tracking
LLM-friendly interfaces
Advanced metadata filtering and message management
Key Features#
Multiple Storage Backends: JSON, SQLite, PostgreSQL, in-memory
Memory Management Strategies: Summarization, trimming, LRU (Least Recently Used), TTL (Time To Live), and similarity search
Vector Database Integration: Automatic embedding generation and storage with Pinecone, ChromaDB, and Weaviate
Async Support: Fully async/await compatible
Flexible Architecture: Abstract base classes for easy extension
Session Management: Track multiple user sessions with stats
Advanced Message Operations: Update, delete, and retrieve individual messages
Metadata Filtering: Filter messages by metadata with single values or list-based queries
LLM Integration: Works with LangChain and other LLM tools
Caching: In-memory caching with configurable auto-save
Prerequisites#
System Requirements
Python >= 3.11
Installation#
To install the elsai-chat-history package:
pip install --extra-index-url https://elsai-core-package.optisolbusiness.com/root/elsai-chat-history/ elsai-chat-history==1.0.0
Components & Usage#
1. Core Manager#
Basic Usage#
import asyncio
from elsai_chat_history.manager.chat_manager import ChatHistoryManager
from elsai_chat_history.stores.json_store import JSONStore
async def basic_usage():
store = JSONStore(storage_dir="./chat_data")
manager = ChatHistoryManager(store=store, auto_save=True)
session_id = "user_123_session"
await manager.add_message(session_id, role="user", content="Hello, how are you?")
await manager.add_message(session_id, role="assistant", content="I'm good! How can I help?")
history = await manager.get_history(session_id)
print(f"Conversation has {len(history)} messages")
context = await manager.get_context(session_id)
print("Context for LLM:", context)
asyncio.run(basic_usage())
Advanced Usage#
from elsai_chat_history.strategies.trimming import TrimmingStrategy
async def advanced_usage():
store = JSONStore(storage_dir="./chat_data")
trimming = TrimmingStrategy(max_messages=50, max_tokens=4000, preserve_recent=5)
manager = ChatHistoryManager(store=store, strategy=trimming, auto_save=True)
await manager.add_message(
session_id="session_123",
role="user",
content="What's the weather like?",
metadata={"timestamp": "2024-01-15", "user_id": "user_456"}
)
stats = await manager.get_session_stats("session_123")
print("Session stats:", stats)
asyncio.run(advanced_usage())
Message Management#
Update Messages#
Update existing messages by ID:
async def update_example():
manager = ChatHistoryManager(store=json_store)
# Update message content
updated = await manager.update_message(
session_id="session_123",
message_id="msg_456",
content="Updated message content"
)
# Update role and metadata
updated = await manager.update_message(
session_id="session_123",
message_id="msg_456",
role="assistant",
metadata={"updated": True, "version": 2}
)
asyncio.run(update_example())
Delete Messages#
Delete specific messages by ID:
async def delete_example():
manager = ChatHistoryManager(store=json_store)
deleted = await manager.delete_message(
session_id="session_123",
message_id="msg_456"
)
if deleted:
print("Message deleted successfully")
asyncio.run(delete_example())
Get Individual Messages#
Retrieve a specific message by ID:
async def get_message_example():
manager = ChatHistoryManager(store=json_store)
message = await manager.get_message(
session_id="session_123",
message_id="msg_456"
)
if message:
print(f"Message: {message.content}")
print(f"Role: {message.role}")
print(f"Metadata: {message.metadata}")
asyncio.run(get_message_example())
Get Context with Filtering#
Get formatted context with role filtering and message limits:
async def context_example():
manager = ChatHistoryManager(store=json_store)
# Get last 10 messages
context = await manager.get_context(
session_id="session_123",
max_messages=10
)
# Get only user and assistant messages
context = await manager.get_context(
session_id="session_123",
roles=["user", "assistant"]
)
asyncio.run(context_example())
Force Reload from Storage#
Force reload history from storage, bypassing cache:
async def reload_example():
manager = ChatHistoryManager(store=json_store)
# Force reload from storage
history = await manager.get_history(
session_id="session_123",
force_reload=True
)
asyncio.run(reload_example())
2. Storage Backends#
JSON Store#
from elsai_chat_history.stores.json_store import JSONStore
json_store = JSONStore(storage_dir="./conversations")
manager = ChatHistoryManager(store=json_store)
SQLite Store#
from elsai_chat_history.stores.sqlite_store import SQLiteStore
sqlite_store = SQLiteStore(db_path="./chat_history.db")
manager = ChatHistoryManager(store=sqlite_store)
PostgreSQL Store#
from elsai_chat_history.stores.postgres_store import PostgresStore
async def postgres_example():
postgres_store = PostgresStore(
connection_string="postgresql://user:password@localhost/chatdb"
)
manager = ChatHistoryManager(store=postgres_store)
await manager.add_message("session_1", "user", "Hello database!")
await postgres_store.close()
asyncio.run(postgres_example())
Memory Store#
from elsai_chat_history.stores.memory_store import MemoryStore
memory_store = MemoryStore()
manager = ChatHistoryManager(store=memory_store)
Note
Data is held only in memory and is lost when the process exits.
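Custom backends plug in through the store abstraction (see Flexible Architecture above). The sketch below is purely illustrative: the base-class import path and method names here are assumptions, not the confirmed interface, so check the package source before subclassing:
from elsai_chat_history.stores.base import BaseStore  # assumed module path

class RedisStore(BaseStore):  # hypothetical custom backend
    async def load_messages(self, session_id):  # assumed method name
        ...  # fetch and deserialize this session's messages from Redis

    async def save_messages(self, session_id, messages):  # assumed method name
        ...  # serialize and persist the session's messages to Redis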
3. Memory Management Strategies#
Trimming Strategy#
Limits messages based on count or token limits:
from elsai_chat_history.strategies.trimming import TrimmingStrategy
trimming = TrimmingStrategy(
max_messages=30, # Maximum number of messages to keep
max_tokens=3000, # Maximum tokens (optional, uses TokenCounter)
preserve_system=True, # Always preserve system messages
preserve_recent=3 # Always preserve the N most recent messages
)
manager = ChatHistoryManager(store=json_store, strategy=trimming)
# Get trimmed messages with metadata filtering
trimmed = await manager.get_trimmed_messages(
"session_123",
metadata_filter={"user_id": "user_456"}
)
Summarization Strategy#
Incrementally summarizes old messages using an LLM to save space:
from elsai_chat_history.strategies.summarization import SummarizationStrategy
summarization = SummarizationStrategy(
summarizer_llm=llm_model, # elsai-model instance
trigger_count=25, # Trigger summarization after 25 messages
preserve_system=True # Always preserve system messages
)
manager = ChatHistoryManager(store=json_store, strategy=summarization)
Note
When messages are updated or deleted, summaries containing those messages are automatically updated via LLM to maintain consistency.
LRU Strategy#
Evicts least recently used messages based on access patterns:
from elsai_chat_history.strategies.lru import LRUStrategy
lru = LRUStrategy(
max_messages=30, # Maximum number of messages to keep
preserve_system=True, # Always preserve system messages
preserve_recent=5 # Always preserve the 5 most recent messages
)
manager = ChatHistoryManager(store=json_store, strategy=lru)
Note
Access times are automatically tracked when messages are retrieved. Messages without access times are treated as least recently used.
TTL Strategy#
Evicts messages older than a specified time-to-live:
from elsai_chat_history.strategies.ttl import TTLStrategy
ttl = TTLStrategy(
ttl_seconds=3600, # Messages expire after 1 hour
preserve_system=True, # Always preserve system messages
preserve_recent=5, # Always preserve the 5 most recent messages
use_last_accessed=False # Use message timestamp (True = use last_accessed time)
)
manager = ChatHistoryManager(store=json_store, strategy=ttl)
Note
TTL metadata is persisted in message metadata, allowing TTL to work across application restarts. When use_last_accessed=True, the TTL timer resets each time a message is accessed.
Similarity Search Strategy#
Retrieves semantically similar messages using vector databases:
from elsai_chat_history.strategies.similarity_search import SimilaritySearchStrategy
# Configuration for ChromaDB
similarity_config = {
"vector_database": {
"name": "chroma",
"client": chroma_client, # ChromaVectorDb instance
"collection_name": "chat_history" # Required for ChromaDB
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model # Required for ChromaDB
},
"top_k": 5 # Number of similar messages to retrieve
}
# Configuration for Pinecone
similarity_config = {
"vector_database": {
"name": "pinecone",
"client": pinecone_client, # PineconeVectorDb instance
"namespace": "chat_history" # Required for Pinecone
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model # Required for Pinecone
},
"top_k": 5
}
# Configuration for Weaviate
similarity_config = {
"vector_database": {
"name": "weaviate",
"client": weaviate_client, # WeaviateVectorDb instance
"collection_name": "ChatMessage", # Optional for Weaviate
"use_default_vectorizer": False, # Use Weaviate's default vectorizer
"distance": 0.75 # Optional distance threshold
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model # Required unless use_default_vectorizer=True
},
"top_k": 5
}
similarity = SimilaritySearchStrategy(similarity_config)
manager = ChatHistoryManager(store=json_store, strategy=similarity)
Note
Messages are automatically embedded and stored in the vector database when added. The embedding model must support the embed_query() method.
4. Vector Database Integration#
The ChatHistoryManager integrates with vector databases automatically whenever a vector-backed strategy such as SimilaritySearchStrategy is configured: every message is embedded and stored in the vector database as it is added.
Automatic Operations#
When a vector database is configured, the following operations happen automatically:
On Message Add: Message content is embedded and stored in the vector database
On Message Update: Embedding is regenerated and the vector database entry is updated
On Message Delete: The vector database entry is deleted
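For instance, with a manager configured for similarity search (setup shown below), these operations need no extra calls to stay in sync; a brief sketch, run inside an async function:
# Assumes `manager` uses a SimilaritySearchStrategy (see setup below)
msg = await manager.add_message("session_123", role="user", content="Hello")
# -> content embedded and stored in the vector database
await manager.update_message("session_123", msg.message_id, content="Hi there")
# -> embedding regenerated, vector database entry updated
await manager.delete_message("session_123", msg.message_id)
# -> vector database entry deleted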
Vector Database Setup#
Pinecone Setup#
from elsai_vectordb import PineconeVectorDb
from elsai_embeddings import AzureOpenAIEmbeddings
# Initialize Pinecone client
pinecone_client = PineconeVectorDb(
api_key="your-api-key",
environment="us-west1-gcp",
index_name="chat-history"
)
# Initialize embedding model
embedding_model = AzureOpenAIEmbeddings(
api_key="your-key",
azure_endpoint="https://your-endpoint.openai.azure.com/",
api_version="2024-02-15-preview",
deployment="text-embedding-ada-002"
)
# Configure SimilaritySearchStrategy
similarity_config = {
"vector_database": {
"name": "pinecone",
"client": pinecone_client,
"namespace": "session_namespace" # Optional, defaults to session_id
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model
},
"top_k": 5
}
ChromaDB Setup#
from elsai_vectordb import ChromaVectorDb
from elsai_embeddings import AzureOpenAIEmbeddings
# Initialize ChromaDB client
chroma_client = ChromaVectorDb(
persist_directory="./chroma_db"
)
# Initialize embedding model
embedding_model = AzureOpenAIEmbeddings(
api_key="your-key",
azure_endpoint="https://your-endpoint.openai.azure.com/",
api_version="2024-02-15-preview",
deployment="text-embedding-ada-002"
)
# Configure SimilaritySearchStrategy
similarity_config = {
"vector_database": {
"name": "chroma",
"client": chroma_client,
"collection_name": "chat_history" # Optional, defaults to session_id
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model
},
"top_k": 5
}
Weaviate Setup#
from elsai_vectordb import WeaviateVectorDb
from elsai_embeddings import AzureOpenAIEmbeddings
# Initialize Weaviate client
weaviate_client = WeaviateVectorDb(
url="http://localhost:8080",
api_key="your-api-key"
)
# Initialize embedding model (optional if using default vectorizer)
embedding_model = AzureOpenAIEmbeddings(
api_key="your-key",
azure_endpoint="https://your-endpoint.openai.azure.com/",
api_version="2024-02-15-preview",
deployment="text-embedding-ada-002"
)
# Configure SimilaritySearchStrategy with custom embedding
similarity_config = {
"vector_database": {
"name": "weaviate",
"client": weaviate_client,
"collection_name": "ChatMessage", # Optional
"use_default_vectorizer": False, # Use custom embedding model
"distance": 0.75 # Optional distance threshold
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model
},
"top_k": 5
}
# Or use Weaviate's default vectorizer
similarity_config = {
"vector_database": {
"name": "weaviate",
"client": weaviate_client,
"use_default_vectorizer": True # Use Weaviate's built-in vectorizer
},
"embedding_model": {
"name": "weaviate-default",
"client": None # Not needed with default vectorizer
},
"top_k": 5
}
Embedding Model Requirements#
The embedding model must implement the embed_query(text: str) -> List[float] method that returns a vector embedding for the input text. Supported embedding models include:
Azure OpenAI Embeddings (from elsai-embeddings)
OpenAI Embeddings (from elsai-embeddings)
Any LangChain-compatible embedding model
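Any object exposing that method works. Below is a minimal sketch of a custom adapter; the wrapped client and its embed() call are hypothetical placeholders:
from typing import List

class CustomEmbeddings:
    """Adapter exposing the embed_query interface expected by the strategy."""
    def __init__(self, client):
        self.client = client  # hypothetical underlying embedding client

    def embed_query(self, text: str) -> List[float]:
        # Return a vector embedding for the input text
        return self.client.embed(text)  # hypothetical client call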
Metadata in Vector Database#
Message metadata is automatically stored in the vector database, including:
session_id: Session identifier
role: Message role (user, assistant, system)
content: Message content
timestamp: Message timestamp (RFC3339 format)
message_id: Unique message identifier
Custom metadata fields (e.g., user_id, file_id)
This metadata can be used for filtering during similarity search.
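For example, to restrict a similarity search to one user's messages (run inside an async function; get_similar_messages is described under Advanced Features):
similar = await manager.get_similar_messages(
    user_query="project status update",
    session_id="session_123",
    metadata_filter={"user_id": "user_456"}
)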
5. Session Management#
# List sessions
sessions = await manager.list_sessions()
print("Active sessions:", sessions)
# Clear a session
await manager.clear_session("session_to_delete")
# Manually save session
await manager.save_session("session_123")
# Get session statistics
stats = await manager.get_session_stats("session_123")
print(f"Total messages: {stats['total_messages']}")
print(f"Role distribution: {stats['role_distribution']}")
6. Advanced Features#
Metadata Filtering#
Filter messages by metadata using single values or list-based queries:
Single Value Filtering#
# Filter by single metadata value
trimmed = await manager.get_trimmed_messages(
"session_123",
metadata_filter={"user_id": "user_456"}
)
# Filter by multiple metadata fields (AND condition)
trimmed = await manager.get_trimmed_messages(
"session_123",
metadata_filter={
"user_id": "user_456",
"status": "active"
}
)
List-Based Filtering ($in style)#
Filter messages where metadata value matches any value in a list:
# Filter where file_id is in the list
trimmed = await manager.get_trimmed_messages(
"session_123",
metadata_filter={"file_id": ["file_1", "file_2", "file_3"]}
)
# Combine single values and lists
trimmed = await manager.get_trimmed_messages(
"session_123",
metadata_filter={
"user_id": "user_456", # Exact match
"file_id": ["file_1", "file_2"] # Match any in list
}
)
Metadata filtering is supported by all strategy-specific retrieval methods:
get_trimmed_messages()
get_lru_messages()
get_ttl_messages()
get_similar_messages()
Message Metadata Tracking#
The manager automatically tracks metadata for messages:
Last Accessed Timestamp#
The last_accessed timestamp is automatically updated when messages are retrieved:
# Accessing a message updates its last_accessed timestamp
message = await manager.get_message("session_123", "msg_456")
print(message.metadata.get("last_accessed")) # ISO format timestamp
# get_history() also updates access times for all messages
history = await manager.get_history("session_123")
for msg in history:
print(msg.metadata.get("last_accessed"))
This is used by LRUStrategy to determine which messages to evict.
TTL Metadata#
When using TTLStrategy, TTL metadata is automatically stored in message metadata:
ttl = TTLStrategy(ttl_seconds=3600, use_last_accessed=False)
manager = ChatHistoryManager(store=json_store, strategy=ttl)
# After adding a message, TTL metadata is stored
await manager.add_message("session_123", "user", "Hello")
message = await manager.get_message("session_123", message_id)
ttl_metadata = message.metadata.get("ttl")
# {
# "ttl_seconds": 3600,
# "ttl_expires_at": "2024-01-15T10:00:00",
# "use_last_accessed": False
# }
This allows TTL to persist across application restarts.
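To retrieve only the messages still within their TTL, use the TTL-aware retrieval method (a short sketch, run inside an async function):
live = await manager.get_ttl_messages(
    "session_123",
    metadata_filter={"user_id": "user_456"}  # optional metadata filtering
)
print(f"{len(live)} messages have not expired")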
Summary Management#
When using SummarizationStrategy, summaries are stored separately from regular messages:
Automatic Summary Creation#
Summaries are automatically created when the message count exceeds trigger_count:
summarization = SummarizationStrategy(
summarizer_llm=llm,
trigger_count=20,
preserve_system=True
)
manager = ChatHistoryManager(store=json_store, strategy=summarization)
# After adding 20+ messages, summaries are automatically created
for i in range(25):
await manager.add_message("session_123", "user", f"Message {i}")
# Get summaries and remaining messages
summary_history = await manager.get_summary("session_123")
Summary Updates on Message Changes#
When messages are updated or deleted, summaries containing those messages are automatically updated via LLM:
# Update a message that's in a summary
await manager.update_message(
"session_123",
"msg_456",
content="Updated content"
)
# Summary is automatically re-generated via LLM
# Delete a message that's in a summary
await manager.delete_message("session_123", "msg_456")
# Summary is automatically updated to remove references to deleted message
Caching and Auto-Save#
In-Memory Caching#
The manager maintains an in-memory cache for performance:
# First access loads from storage
history1 = await manager.get_history("session_123")
# Subsequent accesses use cache
history2 = await manager.get_history("session_123") # Uses cache
# Force reload from storage
history3 = await manager.get_history("session_123", force_reload=True)
Auto-Save Configuration#
Control automatic persistence:
# Auto-save enabled (default)
manager = ChatHistoryManager(store=json_store, auto_save=True)
# Messages are automatically saved after each operation
# Auto-save disabled
manager = ChatHistoryManager(store=json_store, auto_save=False)
# Must manually save
await manager.save_session("session_123")
Note
When auto_save=True, access times and TTL metadata are automatically persisted. When disabled, you must call save_session() to persist changes.
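Disabling auto-save is useful for bulk writes: buffer many messages in the cache, then persist once. A minimal sketch (run inside an async function):
manager = ChatHistoryManager(store=json_store, auto_save=False)
for i in range(100):
    await manager.add_message("bulk_session", role="user", content=f"Message {i}")
await manager.save_session("bulk_session")  # single write at the end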
7. Complete Examples#
Complete Workflow Example#
A complete example showing message management with trimming:
import asyncio
from elsai_chat_history.manager.chat_manager import ChatHistoryManager
from elsai_chat_history.stores.json_store import JSONStore
from elsai_chat_history.strategies.trimming import TrimmingStrategy
async def complete_workflow():
# Setup
store = JSONStore(storage_dir="./chat_data")
trimming = TrimmingStrategy(max_messages=10, preserve_recent=3)
manager = ChatHistoryManager(store=store, strategy=trimming, auto_save=True)
session_id = "user_123_session"
# Add messages with metadata
msg1 = await manager.add_message(
session_id,
role="user",
content="Hello!",
metadata={"user_id": "user_123", "file_id": "file_1"}
)
msg2 = await manager.add_message(
session_id,
role="assistant",
content="Hi there! How can I help?",
metadata={"user_id": "user_123"}
)
# Update a message
updated = await manager.update_message(
session_id,
msg1.message_id,
content="Hello, updated!"
)
# Get full history
history = await manager.get_history(session_id)
print(f"Total messages: {len(history)}")
# Get trimmed messages with metadata filter
trimmed = await manager.get_trimmed_messages(
session_id,
metadata_filter={"user_id": "user_123"}
)
# Get context for LLM
context = await manager.get_context(session_id, max_messages=5)
# Get session stats
stats = await manager.get_session_stats(session_id)
print(f"Stats: {stats}")
asyncio.run(complete_workflow())
Vector Database Integration Example#
Complete example with similarity search:
import asyncio
from elsai_chat_history.manager.chat_manager import ChatHistoryManager
from elsai_chat_history.stores.postgres_store import PostgresStore
from elsai_chat_history.strategies.similarity_search import SimilaritySearchStrategy
from elsai_vectordb import ChromaVectorDb
from elsai_embeddings import AzureOpenAIEmbeddings
async def vector_db_example():
# Setup storage
store = PostgresStore(
connection_string="postgresql://user:password@localhost/chatdb"
)
# Setup vector database
chroma_client = ChromaVectorDb(persist_directory="./chroma_db")
# Setup embedding model
embedding_model = AzureOpenAIEmbeddings(
api_key="your-key",
azure_endpoint="https://your-endpoint.openai.azure.com/",
api_version="2024-02-15-preview",
deployment="text-embedding-ada-002"
)
# Configure similarity search
similarity_config = {
"vector_database": {
"name": "chroma",
"client": chroma_client,
"collection_name": "chat_history"
},
"embedding_model": {
"name": "azure-ada",
"client": embedding_model
},
"top_k": 5
}
similarity = SimilaritySearchStrategy(similarity_config)
manager = ChatHistoryManager(store=store, strategy=similarity)
session_id = "session_123"
# Add messages (automatically embedded and stored in vector DB)
await manager.add_message(
session_id,
role="user",
content="What's the weather like today?",
metadata={"user_id": "user_456"}
)
await manager.add_message(
session_id,
role="assistant",
content="It's sunny and 75 degrees.",
metadata={"user_id": "user_456"}
)
# Search for similar messages
similar = await manager.get_similar_messages(
user_query="weather forecast",
session_id=session_id
)
print(f"Found {len(similar)} similar messages")
# With metadata filtering
similar = await manager.get_similar_messages(
user_query="weather",
session_id=session_id,
metadata_filter={"user_id": "user_456"}
)
await store.close()
asyncio.run(vector_db_example())
Summarization with LLM Updates Example#
Example showing automatic summary updates:
import asyncio
from elsai_chat_history.manager.chat_manager import ChatHistoryManager
from elsai_chat_history.stores.json_store import JSONStore
from elsai_chat_history.strategies.summarization import SummarizationStrategy
from langchain_openai import ChatOpenAI
async def summarization_example():
store = JSONStore(storage_dir="./chat_data")
llm = ChatOpenAI(model="gpt-3.5-turbo")
summarization = SummarizationStrategy(
summarizer_llm=llm,
trigger_count=10,
preserve_system=True
)
manager = ChatHistoryManager(store=store, strategy=summarization)
session_id = "session_123"
# Add many messages (triggers summarization after 10)
for i in range(15):
await manager.add_message(
session_id,
role="user",
content=f"Message {i}"
)
# Get summarized history
summary_history = await manager.get_summary(session_id)
print(f"Summary history has {len(summary_history)} items")
# Update a message that's in a summary
# Summary is automatically updated via LLM
await manager.update_message(
session_id,
message_id="msg_5",
content="Updated message content"
)
# Delete a message that's in a summary
# Summary is automatically updated to remove references
await manager.delete_message(session_id, "msg_6")
asyncio.run(summarization_example())
Metadata Filtering Example#
Example showing various metadata filtering patterns:
import asyncio
from elsai_chat_history.manager.chat_manager import ChatHistoryManager
from elsai_chat_history.stores.json_store import JSONStore
from elsai_chat_history.strategies.lru import LRUStrategy
async def metadata_filtering_example():
store = JSONStore(storage_dir="./chat_data")
lru = LRUStrategy(max_messages=20, preserve_recent=5)
manager = ChatHistoryManager(store=store, strategy=lru)
session_id = "session_123"
# Add messages with different metadata
await manager.add_message(
session_id,
role="user",
content="Message 1",
metadata={"user_id": "user_1", "file_id": "file_a"}
)
await manager.add_message(
session_id,
role="user",
content="Message 2",
metadata={"user_id": "user_1", "file_id": "file_b"}
)
await manager.add_message(
session_id,
role="user",
content="Message 3",
metadata={"user_id": "user_2", "file_id": "file_a"}
)
# Filter by single value
filtered = await manager.get_lru_messages(
session_id,
metadata_filter={"user_id": "user_1"}
)
print(f"Messages for user_1: {len(filtered)}")
# Filter by list (match any in list)
filtered = await manager.get_lru_messages(
session_id,
metadata_filter={"file_id": ["file_a", "file_b"]}
)
print(f"Messages for file_a or file_b: {len(filtered)}")
# Combine filters (AND condition)
filtered = await manager.get_lru_messages(
session_id,
metadata_filter={
"user_id": "user_1",
"file_id": ["file_a", "file_b"]
}
)
print(f"Messages matching both filters: {len(filtered)}")
asyncio.run(metadata_filtering_example())