Elsai Cloud Connectors v1.0.0
The Elsai Cloud Connectors package provides utilities for working with cloud storage, search, email, and notification services. Version 1.0.0 covers the following services:
AWS S3
Azure Blob Storage
Microsoft Graph Webhooks
Microsoft OneDrive
Microsoft SharePoint
Microsoft Delta API (New)
Microsoft Outlook (New)
Elasticsearch
Prerequisites
Python >= 3.9
.env file with appropriate API keys and configuration variables
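The connectors described below can read credentials from environment variables when they are not passed explicitly. As a minimal sketch, the following loads a `.env` file and reports missing variables using only the standard library. `load_env_file` and `missing_vars` are hypothetical helpers, not part of elsai-cloud, and the parser handles only simple `KEY=VALUE` lines; many projects use the python-dotenv package for this instead.

```python
import os
from pathlib import Path


def load_env_file(path=".env"):
    """Copy simple KEY=VALUE lines from a .env file into os.environ.

    Hypothetical helper: skips blank lines and comments, strips optional
    surrounding quotes, and never overrides variables that are already set.
    """
    env_path = Path(path)
    if not env_path.exists():
        return
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        os.environ.setdefault(key.strip(), value.strip().strip("'\""))


def missing_vars(names):
    """Return the names from `names` that are unset or empty."""
    return [name for name in names if not os.environ.get(name)]
```

Calling `missing_vars(["TENANT_ID", "CLIENT_ID", "CLIENT_SECRET"])` before constructing a Microsoft connector gives an early, readable failure instead of an authentication error later on.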
Installation
To install the elsai-cloud-connectors package:
pip install --index-url https://elsai-core-package.optisolbusiness.com/root/elsai-cloud-connectors/ elsai-cloud==1.0.0
Components
1. AwsS3Connector
AwsS3Connector is a class for interacting with AWS S3 buckets, enabling secure upload, download, and deletion of files.
from elsai_cloud.aws import AwsS3Connector

s3_client = AwsS3Connector(
    access_key="your_access_key",
    secret_key="your_secret_key",
    session_token="your_session_token"
)  # Or use environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN

# Upload a local file to the specified S3 bucket and key (path in S3)
s3_client.upload_file_to_s3(
    bucket_name="your_bucket_name",
    s3_key="your_s3_key",
    file_path="path/to/your/file.txt"
)

# Download a file from the specified S3 bucket and key to a local path
s3_client.download_file_from_s3(
    bucket_name="your_bucket_name",
    file_name="your_s3_key",
    download_path="path/to/download/file.txt"
)

# Delete a file from the specified S3 bucket
s3_client.delete_file_from_s3(
    bucket_name="your_bucket_name",
    s3_key="your_s3_key"
)
Required Environment Variables:
AWS_ACCESS_KEY_ID: Your AWS access key ID
AWS_SECRET_ACCESS_KEY: Your AWS secret access key
AWS_SESSION_TOKEN: Temporary session token for authentication
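Uploading a whole directory means pairing each local file with an S3 key. The sketch below shows one way to build those pairs; `plan_uploads` is a hypothetical helper, not part of elsai-cloud, and the choice to mirror the local folder layout under a key prefix is an assumption.

```python
from pathlib import Path


def plan_uploads(local_dir, key_prefix=""):
    """Return sorted (file_path, s3_key) pairs for every file under local_dir.

    Hypothetical helper: S3 keys use forward slashes, so the relative path is
    converted with as_posix() regardless of the local operating system.
    """
    root = Path(local_dir)
    prefix = key_prefix.strip("/")
    pairs = []
    for path in sorted(root.rglob("*")):
        if path.is_file():
            relative = path.relative_to(root).as_posix()
            s3_key = f"{prefix}/{relative}" if prefix else relative
            pairs.append((str(path), s3_key))
    return pairs
```

Each pair can then be passed to `upload_file_to_s3` as the `file_path` and `s3_key` arguments, with the same `bucket_name` for every call.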
2. AzureBlobStorage
AzureBlobStorage is a class for downloading files from Azure Blob containers.
from elsai_cloud.azureblobstorage import AzureBlobStorage

azure_blob_client = AzureBlobStorage(connection_string="your_connection_string")

# Download a blob from the container into a local folder
azure_blob_client.download_file(
    container_name="your_container_name",
    blob_name="your_blob_name",
    target_folder_path="path/to/download/folder"
)
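A malformed connection string is a common cause of failed connections. The following sketch checks one before it is passed to `AzureBlobStorage`. Both functions are hypothetical helpers, not part of elsai-cloud, and the default required fields assume account-key authentication; a string based on a shared access signature would need a different `required` tuple.

```python
def parse_connection_string(connection_string):
    """Split an Azure Storage connection string into a dict of its fields."""
    fields = {}
    for part in connection_string.split(";"):
        if not part.strip():
            continue
        # Split on the first "=" only: base64 account keys end with "=" padding.
        key, sep, value = part.partition("=")
        if not sep:
            raise ValueError("Malformed connection string segment (missing '=')")
        fields[key.strip()] = value.strip()
    return fields


def missing_fields(connection_string, required=("AccountName", "AccountKey")):
    """Return the required field names that are absent or empty."""
    fields = parse_connection_string(connection_string)
    return [name for name in required if not fields.get(name)]
```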
3. MSGraphWebhooks
MSGraphWebhooks helps manage webhook subscriptions for Microsoft Graph API resources.
from elsai_cloud.microsoft_webhooks import MSGraphWebhooks

ms_graph = MSGraphWebhooks(
    client_id="your_client_id",
    tenant_id="your_tenant_id",
    client_secret="your_client_secret"
)  # Or use environment variables: CLIENT_ID, TENANT_ID, CLIENT_SECRET

# Create a new subscription (webhook) to listen for changes on a resource
ms_graph.create_subscription(
    change_type="change_type",  # Microsoft Graph change types are "created", "updated", and "deleted"
    notification_url="https://webhook.example.com/send-notifications",
    resource="me/mailFolders('Inbox')/messages",
    expiration_date_time="2016-11-20T18:23:45.9356913Z",  # ISO 8601 UTC format; use a future time
    client_state="custom_state_123"
)

# Retrieve details of a specific subscription by its ID
subscription = ms_graph.get_subscription(subscription_id="your_subscription_id")

# List all current active subscriptions for the authenticated app
subs_list = ms_graph.list_subscriptions()

# Update an existing subscription's expiration or notification URL
ms_graph.update_subscription(
    subscription_id="subscription_id",
    notification_url="https://webhook.example.com/send-notifications",
    expiration_date_time="2016-11-20T18:23:45.9356913Z"  # ISO 8601 UTC format; use a future time
)

# Delete a subscription by its ID to stop receiving notifications
ms_graph.delete_subscription(subscription_id="subscription_id")
Required Environment Variables:
TENANT_ID: Azure Active Directory tenant identifier used for authentication.
CLIENT_ID: Application (client) ID registered in Azure AD.
CLIENT_SECRET: Client secret generated for the Azure AD app to authorize access.
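The endpoint behind `notification_url` has two jobs. When a subscription is created, Microsoft Graph sends a validation request carrying a `validationToken` query parameter and expects the token echoed back as plain text with a 200 response. Later change notifications carry the `client_state` value supplied at creation, and the endpoint can use it to discard forged requests. The sketch below covers both jobs and adds a helper for producing a future `expiration_date_time`. The function names are hypothetical and independent of any web framework. Microsoft Graph caps the subscription lifetime per resource type, so check its documentation before choosing a duration.

```python
from datetime import datetime, timedelta, timezone


def expiration_in(minutes):
    """Return an ISO 8601 UTC timestamp `minutes` from now."""
    expires = datetime.now(timezone.utc) + timedelta(minutes=minutes)
    return expires.strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z"


def handle_graph_request(query_params, body, expected_client_state):
    """Decide how a notification endpoint should answer a Microsoft Graph POST.

    Returns (status_code, content_type, response_text, trusted_notifications).
    `query_params` is a dict of query-string values and `body` is the parsed
    JSON payload (or None for the validation handshake).
    """
    token = query_params.get("validationToken")
    if token is not None:
        # Validation handshake: echo the token back as plain text.
        return 200, "text/plain", token, []
    notifications = (body or {}).get("value", [])
    # Keep only notifications whose clientState matches what we registered.
    trusted = [n for n in notifications if n.get("clientState") == expected_client_state]
    # Acknowledge quickly with 202 and process the trusted items afterwards.
    return 202, "text/plain", "", trusted
```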
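4. OneDriveService
OneDriveService looks up Microsoft user IDs and uploads, lists, and downloads files in a user's OneDrive.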
from elsai_cloud.onedrive import OneDriveService

one_drive_service = OneDriveService(
    tenant_id="your_tenant_id",
    client_id="your_client_id",
    client_secret="your_client_secret"
)  # Or set in environment variables

# Get the unique Microsoft user ID associated with the email address
user_id = one_drive_service.get_user_id(email="your_mail_address")

# Upload a local file to a specified folder in the OneDrive of the given user
one_drive_service.upload_file_to_onedrive(
    email="your_mail_address",
    local_file_path="path/to/your/file.txt",
    folder_path="path/to/folder/in/onedrive"
)

# Retrieve a list of files from a specific folder in the user's OneDrive
files = one_drive_service.retrieve_onedrive_files_from_folder(
    email="your_mail_address",
    folder_path="path/to/folder/in/onedrive"
)

# Download a specific file from the user's OneDrive to a local directory
one_drive_service.download_file_from_onedrive(
    email="your_mail_address",
    file_id="your_file_id",
    target_folder="path/to/download/folder"
)
Environment Variables:
TENANT_ID: Azure Active Directory tenant identifier used for authentication.
CLIENT_ID: Application (client) ID registered in Azure AD.
CLIENT_SECRET: Client secret generated for the Azure AD app to authorize access.
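Downloading requires a `file_id`, which usually has to come from a folder listing. The sketch below picks an ID by file name. It rests on an assumption this page does not confirm: that `retrieve_onedrive_files_from_folder` returns a list of dicts shaped like Microsoft Graph driveItem resources, with `id` and `name` keys and a `folder` key only on folders. `find_file_id` is a hypothetical helper; adjust it to the actual return value.

```python
def find_file_id(items, file_name):
    """Return the id of the first non-folder item named file_name, or None.

    Assumption: each item is a dict shaped like a Microsoft Graph driveItem.
    """
    for item in items:
        if item.get("name") == file_name and "folder" not in item:
            return item.get("id")
    return None
```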
6. ElasticSearch
ElasticSearchConnector is used for interacting with an Elasticsearch index using API keys.
from elsai_cloud.elastic_search import ElasticSearchConnector

es_connector = ElasticSearchConnector(
    cloud_url="your_cloud_url",
    api_key="your_api_key"
)  # Or use environment variables: ELASTIC_SEARCH_URL, ELASTIC_SEARCH_API_KEY

# Add a document
doc = {
    "text": "This is a sample document to be indexed in Elasticsearch.",
}
es_connector.add_document(index_name="sampleindex", document=doc, doc_id="1")

# Retrieve a document
retrieved_doc = es_connector.get_document(index_name="sampleindex", doc_id="1")

# Search documents
search_doc = es_connector.search_documents(
    index_name="sampleindex",
    query={"match": {"text": "sample"}}
)
Environment Variables:
ELASTIC_SEARCH_URL: URL of the Elasticsearch cloud instance
ELASTIC_SEARCH_API_KEY: API key for authenticating with Elasticsearch
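The `query` argument in the search example is a fragment of the Elasticsearch query DSL. The sketch below builds a slightly richer `bool` query that combines a full-text match with exact-value filters. `build_bool_query` is a hypothetical helper, and it assumes `search_documents` forwards the dict to Elasticsearch unchanged, which this page does not state.

```python
def build_bool_query(text, filters=None, field="text"):
    """Build a bool query: full-text match on `field` plus exact-value filters.

    `filters` maps field names to the exact values they must hold.
    """
    query = {"bool": {"must": [{"match": {field: text}}]}}
    if filters:
        query["bool"]["filter"] = [
            {"term": {name: value}} for name, value in filters.items()
        ]
    return query
```

For instance, `build_bool_query("sample", {"lang": "en"})` matches documents whose `text` field contains "sample" and whose (hypothetical) `lang` field equals "en".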
5. MSGraphDeltaService
MSGraphDeltaService provides an asynchronous interface to the Delta Query feature of the Microsoft Graph API. It tracks incremental changes (new, updated, or deleted messages) in a user's mailbox without repeatedly fetching the entire dataset, which keeps synchronization jobs to a small number of API calls.
from elsai_cloud.microsoft_delta import MSGraphDeltaService, DeltaConfig, DeltaStrategy
from datetime import datetime, timezone, timedelta
import asyncio


async def delta_service_usage():
    """Example usage of MSGraphDeltaService for email synchronization."""
    # Create custom configuration
    config = DeltaConfig(
        timeout=45.0,
        max_retries=3,
        page_size=50,
        establishment_strategies=[
            DeltaStrategy.CURRENT_TIME,
            DeltaStrategy.FUTURE_FILTER,
            DeltaStrategy.SKIP_TOKEN,
            DeltaStrategy.STANDARD
        ]
    )

    # Initialize the service
    service = MSGraphDeltaService(
        config=config,
        client_id="your_client_id",
        client_secret="your_client_secret",
        tenant_id="your_tenant_id"
    )
    user_id = "user@example.com"
    folder = "inbox"

    # Establish a new delta link
    delta_link = await service.establish_delta_link(user_id, folder)
    print(f"Delta link established: {delta_link[:100]}...")

    # Validate delta link
    is_valid = await service.validate_delta_link(delta_link)
    print(f"Delta link valid: {is_valid}")

    # Fetch initial delta (without date filter)
    initial_result = await service.get_initial_delta(user_id, folder)
    print(f"Initial messages: {initial_result.total_count}")
    print(f"Has delta link: {bool(initial_result.delta_link)}")

    # Fetch initial delta (with date filter)
    start_date = datetime.now(timezone.utc) - timedelta(days=7)
    filtered_result = await service.get_initial_delta(user_id, folder, start_date)
    print(f"Filtered messages (last 7 days): {filtered_result.total_count}")

    # Fetch incremental changes
    if initial_result.delta_link:
        changes_result = await service.get_delta_changes(initial_result.delta_link)
        print(f"Changes detected: {changes_result.total_count}")

    # Get delta statistics
    if initial_result.delta_link:
        stats = await service.get_delta_statistics(initial_result.delta_link)
        print(f"Delta statistics: {stats}")

    # Update configuration dynamically
    new_config = DeltaConfig(
        timeout=60.0,
        max_retries=5,
        page_size=100,
        establishment_strategies=[DeltaStrategy.CURRENT_TIME, DeltaStrategy.STANDARD]
    )
    service.update_config(new_config)
    print(f"Updated timeout: {service.get_config().timeout}s")


# Run the example
asyncio.run(delta_service_usage())
Environment Variables:
TENANT_ID: Azure Active Directory tenant identifier.
CLIENT_ID: Azure AD application (client) ID.
CLIENT_SECRET: Client secret for the Azure AD app.
Key Methods:
establish_delta_link(user_id, folder_name="inbox"): Establish a new delta link using multiple strategies.
validate_delta_link(delta_link): Validate if an existing delta link is still valid.
get_initial_delta(user_id, folder_name="inbox", start_date=None): Fetch initial messages and delta link.
get_delta_changes(delta_link): Get incremental changes since the last delta link.
get_delta_changes_with_fallback(delta_link, user_id, folder_name="inbox"): Fetch changes and automatically re-establish the delta link if it has expired.
get_delta_statistics(delta_link): Retrieve quick statistics for a delta link.
get_config() / update_config(config): Retrieve or update runtime configuration.
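Incremental sync only pays off if the delta link survives between runs. The sketch below stores one link per user and folder in a JSON file and chooses between `get_initial_delta` and `get_delta_changes_with_fallback` accordingly. `DeltaLinkStore` and `sync_once` are hypothetical names, not part of elsai-cloud. The code assumes that the result of `get_delta_changes_with_fallback` exposes `delta_link` and `total_count` in the same way as the result of `get_initial_delta`.

```python
import json
from pathlib import Path


class DeltaLinkStore:
    """Persist one delta link per (user, folder) pair in a JSON file."""

    def __init__(self, path):
        self.path = Path(path)

    def _read(self):
        if self.path.exists():
            return json.loads(self.path.read_text(encoding="utf-8"))
        return {}

    def load(self, user_id, folder):
        return self._read().get(f"{user_id}/{folder}")

    def save(self, user_id, folder, delta_link):
        data = self._read()
        data[f"{user_id}/{folder}"] = delta_link
        self.path.write_text(json.dumps(data, indent=2), encoding="utf-8")


async def sync_once(service, store, user_id, folder="inbox"):
    """Run one sync pass and return the number of messages reported."""
    delta_link = store.load(user_id, folder)
    if delta_link is None:
        # First run: fetch the initial state and the first delta link.
        result = await service.get_initial_delta(user_id, folder)
    else:
        # Later runs: fetch only what changed since the stored link.
        result = await service.get_delta_changes_with_fallback(
            delta_link, user_id, folder
        )
    # Assumption: every result carries the link to use next time.
    if result.delta_link:
        store.save(user_id, folder, result.delta_link)
    return result.total_count
```

A scheduled job could call `asyncio.run(sync_once(service, store, "user@example.com"))` on each run, with `service` being an `MSGraphDeltaService` instance.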
7. Microsoft Outlook Integration (New Feature)
OutlookService sends email through the Microsoft Graph API, with support for CC recipients, file attachments, and HTML bodies.
from elsai_cloud.outlook import OutlookService


def example_send_simple_email():
    """Example of sending a simple email without attachments."""
    # Initialize the service with your Azure AD credentials
    outlook_service = OutlookService(
        tenant_id="your_tenant_id",
        client_id="your_client_id",
        client_secret="your_client_secret"
    )
    # Send a simple email
    result = outlook_service.send_email(
        sender_email="sender@example.com",
        to_recipients=["recipient@example.com"],
        subject="Test Email from Outlook Service",
        body="This is a test email sent using the Outlook service.",
        cc_recipients=["cc@example.com"],
        is_html=False
    )
    print("Email send result:", result)


def example_send_email_with_attachments():
    """Example of sending an email with attachments."""
    outlook_service = OutlookService(
        tenant_id="your_tenant_id",
        client_id="your_client_id",
        client_secret="your_client_secret"
    )
    # Send email with attachments
    result = outlook_service.send_email(
        sender_email="sender@example.com",
        to_recipients=["recipient@example.com"],
        subject="Email with Attachments",
        body="Please find the attached files.",
        attachments=["/path/to/your/file.pdf"],
        is_html=False
    )
    print("Email with attachments result:", result)


def example_send_html_email():
    """Example of sending an HTML formatted email."""
    outlook_service = OutlookService(
        tenant_id="your_tenant_id",
        client_id="your_client_id",
        client_secret="your_client_secret"
    )
    html_body = """
    <html>
      <body>
        <h1>Welcome to Our Service!</h1>
        <p>This is an <strong>HTML formatted</strong> email.</p>
        <p>You can include:</p>
        <ul>
          <li>Bold text</li>
          <li>Lists</li>
          <li>Links</li>
        </ul>
        <p>Best regards,<br>The Team</p>
      </body>
    </html>
    """
    result = outlook_service.send_email(
        sender_email="sender@example.com",
        to_recipients=["recipient@example.com"],
        subject="HTML Email Test",
        body=html_body,
        is_html=True
    )
    print("HTML email result:", result)


if __name__ == "__main__":
    # Run the examples
    example_send_simple_email()
    example_send_email_with_attachments()
    example_send_html_email()
Required Environment Variables:
TENANT_ID: Azure Active Directory tenant identifier
CLIENT_ID: Azure AD application (client) ID
CLIENT_SECRET: Client secret for the Azure AD app
Key Features:
Simple Email Sending: Send basic text emails with recipients and CC
Attachment Support: Send emails with file attachments
HTML Email Support: Send rich HTML formatted emails
Flexible Configuration: Support for multiple recipients and CC
Error Handling: Comprehensive error handling and result reporting
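Attachment problems are easier to diagnose before the send call than from an API error. The sketch below checks that each path exists and stays under a size threshold. `check_attachments` is a hypothetical helper, not part of elsai-cloud. The 3 MB default reflects the size below which Microsoft Graph documents attaching a file directly to a message; how OutlookService handles larger files is not covered here, so treat the threshold as an assumption.

```python
import os


def check_attachments(paths, max_bytes=3 * 1024 * 1024):
    """Return a list of human-readable problems; an empty list means all clear."""
    problems = []
    for path in paths:
        if not os.path.isfile(path):
            problems.append(f"{path}: file not found")
        elif os.path.getsize(path) > max_bytes:
            problems.append(f"{path}: larger than {max_bytes} bytes")
    return problems
```

Calling it on the list passed as `attachments` and sending only when the result is empty keeps failed sends out of the normal path.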