Elsai Cloud Connectors#
The Elsai Cloud Connectors package provides utilities to interact with various cloud storage and notification services. It currently supports:
AWS S3
Azure Blob Storage
Microsoft Graph Webhooks
Microsoft OneDrive
Microsoft SharePoint
Elastic Search Connector
Prerequisites#
Python >= 3.9
.env file with appropriate API keys and configuration variables
Installation#
To install the elsai-cloud-connectors package:
pip install --index-url https://elsai-core-package.optisolbusiness.com/root/elsai-cloud-connectors/ elsai-cloud==0.1.0
Components#
1. AwsS3Connector#
AwsS3Connector is a class for interacting with AWS S3 buckets, enabling secure upload, download, and deletion of files.
from elsai_cloud.aws import AwsS3Connector
s3_client = AwsS3Connector(
access_key="your_access_key",
secret_key="your_secret_key",
session_token="your_session_token"
) # Or use environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
# Upload a local file to the specified S3 bucket and key (path in S3)
s3_client.upload_file_to_s3(
bucket_name="your_bucket_name",
s3_key="your_s3_key",
file_path="path/to/your/file.txt"
)
# Download a file from the specified S3 bucket and key to a local path
s3_client.download_file_from_s3(
bucket_name="your_bucket_name",
file_name="your_s3_key",
download_path="path/to/download/file.txt"
)
# Delete a file from the specified S3 bucket
s3_client.delete_file_from_s3(
bucket_name="your_bucket_name",
s3_key="your_s3_key"
)
Required Environment Variables:
AWS_ACCESS_KEY_ID
– Your AWS access key IDAWS_SECRET_ACCESS_KEY
– Your AWS secret access keyAWS_SESSION_TOKEN
– Temporary session token for authentication
2. AzureBlobStorage#
AzureBlobStorage is a class for downloading files from Azure Blob containers.
from elsai_cloud.azureblobstorage import AzureBlobStorage
azure_blob_client = AzureBlobStorage(connection_string="your_connection_string")
azure_blob_client.download_file(
container_name="your_container_name",
blob_name="your_blob_name",
target_folder_path="path/to/download/folder"
)
3. MSGraphWebhooks#
MSGraphWebhooks helps manage webhook subscriptions for Microsoft Graph API resources.
from elsai_cloud.microsoft_webhooks import MSGraphWebhooks
ms_graph = MSGraphWebhooks(
client_id="your_client_id",
tenant_id="your_tenant_id",
client_secret="your_client_secret"
) # Or use environment variables: CLIENT_ID, TENANT_ID, CLIENT_SECRET
# Create a new subscription (webhook) to listen for changes on a resource
ms_graph.create_subscription(
change_type="change_type",
notification_url="https://webhook.example.com/send-notifications",
resource="me/mailFolders('Inbox')/messages",
expiration_date_time="2016-11-20T18:23:45.9356913Z", # datetime format
client_state="custom_state_123"
)
# Retrieve details of a specific subscription by its ID
subscriptions = ms_graph.get_subscription(subscription_id="your_subscription_id")
# List all current active subscriptions for the authenticated app
subs_list = ms_graph.list_subscriptions()
# Update an existing subscription's expiration or notification URL
ms_graph.update_subscription(
subscription_id="subscription_id",
notification_url="https://webhook.example.com/send-notifications",
expiration_date_time="2016-11-20T18:23:45.9356913Z"
)
# Delete a subscription by its ID to stop receiving notifications
ms_graph.delete_subscription(subscription_id="subscription_id")
Required Environment Variables:
TENANT_ID
– Azure Active Directory tenant identifier used for authentication.CLIENT_ID
– Application (client) ID registered in Azure AD.CLIENT_SECRET
– Client secret generated for the Azure AD app to authorize access.
4. OneDriveService#
from elsai_cloud.onedrive import OneDriveService
one_drive_service = OneDriveService(
tenant_id="your_tenant_id",
client_id="your_client_id",
client_secret="your_client_secret"
) # Or set in environment variables
# Get the unique Microsoft user ID associated with the email address
user_id = one_drive_service.get_user_id(email="your_mail_address")
# Upload a local file to a specified folder in the OneDrive of the given user
one_drive_service.upload_file_to_onedrive(
email="your_mail_address",
local_file_path="path/to/your/file.txt",
folder_path="path/to/folder/in/onedrive"
)
# Retrieve a list of files from a specific folder in the user's OneDrive
files = one_drive_service.retrieve_onedrive_files_from_folder(
email="your_mail_address",
folder_path="path/to/folder/in/onedrive"
)
# Download a specific file from the user's OneDrive to a local directory
one_drive_service.download_file_from_onedrive(
email="your_mail_address",
file_id="your_file_id",
target_folder="path/to/download/folder"
)
Environment Variables:
TENANT_ID
– Azure Active Directory tenant identifier used for authentication.CLIENT_ID
– Application (client) ID registered in Azure AD.CLIENT_SECRET
– Client secret generated for the Azure AD app to authorize access.
6. ElasticSearch#
ElasticSearchConnector is used for interacting with an Elasticsearch index using API keys.
from elsai_cloud.elastic_search import ElasticSearchConnector
es_connector = ElasticSearchConnector(
cloud_url="your_cloud_url",
api_key="your_api_key"
) # Or use environment variables: ELASTIC_SEARCH_URL, ELASTIC_SEARCH_API_KEY
# Add a document
doc = {
"text": "This is a sample document to be indexed in Elasticsearch.",
}
es_connector.add_document(index_name="sampleindex", document=doc, doc_id="1")
# Retrieve a document
retrieved_doc = es_connector.get_document(index_name="sampleindex", doc_id="1")
# Search documents
search_doc = es_connector.search_documents(
index_name="sampleindex",
query={"match": {"text": "sample"}}
)
Environment Variables:
ELASTIC_SEARCH_URL
– URL of the Elasticsearch cloud instanceELASTIC_SEARCH_API_KEY
– API key for authenticating with Elasticsearch
5. MSGraphDeltaService#
MSGraphDeltaService provides an asynchronous, enterprise‑grade interface to the Microsoft Graph API’s Delta Query feature. It allows you to efficiently track incremental changes (new, updated, or deleted messages) in a user’s mailbox without repeatedly fetching the entire dataset. This helps build scalable, real‑time sync solutions with minimal API calls and optimized performance.
from elsai_cloud.microsoft_delta import MSGraphDeltaService, DeltaConfig, DeltaStrategy
from datetime import datetime, timezone, timedelta
import asyncio
async def example_usage():
# Custom configuration
config = DeltaConfig(
timeout=45.0,
max_retries=3,
page_size=50,
establishment_strategies=[
DeltaStrategy.CURRENT_TIME,
DeltaStrategy.FUTURE_FILTER,
DeltaStrategy.SKIP_TOKEN,
DeltaStrategy.STANDARD
]
)
# Initialize the service
delta_service = MSGraphDeltaService(
config=config, # Optional, defaults will be used if not provided
client_id="your_client_id",
client_secret="your_client_secret",
tenant_id="your_tenant_id"
)
user_id = "user@example.com"
folder = "inbox"
# Establish a new delta link
delta_link = await delta_service.establish_delta_link(user_id, folder)
print(f"Delta link: {delta_link}")
# Validate delta link
valid = await delta_service.validate_delta_link(delta_link)
print(f"Delta link valid: {valid}")
# Fetch initial delta (without date filter)
result = await delta_service.get_initial_delta(user_id, folder)
print(f"Messages: {result.total_count}, Delta link: {result.delta_link}")
# Fetch initial delta (with date filter)
start_date = datetime.now(timezone.utc) - timedelta(days=7)
filtered_result = await delta_service.get_initial_delta(user_id, folder, start_date)
print(f"Filtered messages: {filtered_result.total_count}")
# Fetch incremental changes
if result.delta_link:
changes = await delta_service.get_delta_changes(result.delta_link)
print(f"Changes retrieved: {changes.total_count}")
# Fallback in case delta link expired
safe_changes = await delta_service.get_delta_changes_with_fallback(
delta_link=result.delta_link,
user_id=user_id,
folder_name=folder
)
print(f"Fallback changes: {safe_changes.total_count}")
# Get delta statistics
stats = await delta_service.get_delta_statistics(result.delta_link)
print(f"Delta statistics: {stats}")
# Update configuration dynamically
new_config = DeltaConfig(
timeout=60.0,
max_retries=5,
page_size=100,
establishment_strategies=[DeltaStrategy.CURRENT_TIME, DeltaStrategy.STANDARD]
)
delta_service.update_config(new_config)
print(f"Updated config: {delta_service.get_config().timeout}")
asyncio.run(example_usage())
Environment Variables:
TENANT_ID
– Azure Active Directory tenant identifier.CLIENT_ID
– Azure AD application (client) ID.CLIENT_SECRET
– Client secret for the Azure AD app.
Key Methods:
establish_delta_link(user_id, folder_name="inbox")
– Establish a new delta link using multiple strategies.validate_delta_link(delta_link)
– Validate if an existing delta link is still valid.get_initial_delta(user_id, folder_name="inbox", start_date=None)
– Fetch initial messages and delta link.get_delta_changes(delta_link)
– Get incremental changes since the last delta link.get_delta_changes_with_fallback(delta_link, user_id, folder_name="inbox")
– Fetch changes and auto re‑establish delta link if expired.get_delta_statistics(delta_link)
– Retrieve quick statistics for a delta link.get_config()
/update_config(config)
– Retrieve or update runtime configuration.