Python SDK
Official Flow Myna SDK for Python 3.8+
The official Python SDK for Flow Myna provides a simple, Pythonic interface for recording process events and managing objects. It includes automatic retries, batch operations, and comprehensive type hints.
Installation
Install with pip
pip install flowmyna
Quick Start
Basic Usage
from flowmyna import FlowMyna
# Initialize the client
client = FlowMyna(api_key="fm_live_your_key_here")
# Record an event
client.record_event(
event="Order Placed",
objects=[
{"type": "Order", "id": "ORD-123"},
{"type": "Customer", "id": "CUST-456"}
],
properties={"total": 149.99}
)
# Upsert an object
client.upsert_object(
type="Customer",
id="CUST-456",
properties={
"name": "Jane Doe",
"email": "jane@example.com"
}
)
Configuration
- Name: api_key
- Type: str
- Required: required
- Description: Your Flow Myna API key (starts with fm_live_)
- Name: base_url
- Type: str
- Description: API base URL. Defaults to https://api.flowmyna.com/api/public/v1
- Name: timeout
- Type: float
- Description: Request timeout in seconds. Default: 30
- Name: max_retries
- Type: int
- Description: Maximum number of retry attempts. Default: 3
Configuration Options
import os
from flowmyna import FlowMyna
# Using environment variable (recommended)
client = FlowMyna(api_key=os.environ["FLOWMYNA_API_KEY"])
# With custom configuration
client = FlowMyna(
api_key="fm_live_xxx",
timeout=60, # 60 second timeout
max_retries=5, # Retry up to 5 times
)
# For testing/development
client = FlowMyna(
api_key="fm_live_xxx",
base_url="https://staging-api.flowmyna.com/api/public/v1"
)
record_event()
Record a process event with associated objects.
- Name: event
- Type: str
- Required: required
- Description: Event name (1-200 characters)
- Name: objects
- Type: List[dict]
- Required: required
- Description: List of objects involved in the event
- Name: timestamp
- Type: str | datetime
- Description: When the event occurred (ISO 8601 or datetime object)
- Name: properties
- Type: dict
- Description: Additional event properties
record_event() Examples
from datetime import datetime, timezone
# Simple event
client.record_event(
event="Order Created",
objects=[{"type": "Order", "id": "ORD-123"}]
)
# With timestamp (datetime object)
client.record_event(
event="Order Shipped",
objects=[
{"type": "Order", "id": "ORD-123"},
{"type": "Shipment", "id": "SHIP-456"}
],
timestamp=datetime.now(timezone.utc)
)
# With timestamp (ISO string)
client.record_event(
event="Order Delivered",
objects=[{"type": "Order", "id": "ORD-123"}],
timestamp="2024-01-15T10:30:00Z"
)
# With properties
client.record_event(
event="Payment Processed",
objects=[
{"type": "Order", "id": "ORD-123"},
{"type": "Payment", "id": "PAY-789"}
],
properties={
"amount": 149.99,
"currency": "USD",
"method": "credit_card"
}
)
# With object properties (merged on upsert)
client.record_event(
event="Customer Login",
objects=[{
"type": "Customer",
"id": "CUST-456",
"properties": {
"last_login": datetime.now(timezone.utc).isoformat(),
"login_count": 42
}
}]
)
upsert_object()
Create or update an object with properties (upsert semantics).
- Name: type
- Type: str
- Required: required
- Description: Object type (1-100 characters)
- Name: id
- Type: str
- Required: required
- Description: Object ID (1-500 characters)
- Name: properties
- Type: dict
- Description: Object properties (merged with existing)
upsert_object() Examples
# Upsert a customer
result = client.upsert_object(
type="Customer",
id="CUST-456",
properties={
"name": "Jane Doe",
"email": "jane@example.com",
"tier": "gold",
"signup_date": "2023-01-15"
}
)
print(f"Object {'created' if result.created else 'updated'}")
print(f"Internal ID: {result.object_id}")
# Update properties (properties are merged)
client.upsert_object(
type="Customer",
id="CUST-456",
properties={
"lifetime_value": 5420.00, # Added
"tier": "platinum" # Updated
}
)
# Upsert without properties (just ensures object exists)
client.upsert_object(type="Order", id="ORD-123")
Batch Operations
Send multiple events or objects efficiently in a single request.
Batch Examples
# Batch record events (up to 100 per call)
events = [
{
"event": "Case Opened",
"timestamp": "2024-01-01T09:00:00Z",
"objects": [{"type": "Case", "id": "CASE-001"}],
"properties": {"priority": "high"}
},
{
"event": "Case Assigned",
"timestamp": "2024-01-01T09:15:00Z",
"objects": [
{"type": "Case", "id": "CASE-001"},
{"type": "Agent", "id": "AGENT-A"}
]
},
{
"event": "Case Resolved",
"timestamp": "2024-01-01T14:30:00Z",
"objects": [{"type": "Case", "id": "CASE-001"}]
}
]
result = client.record_event_batch(events)
print(f"Processed: {result.processed}, Failed: {result.failed}")
# Batch upsert objects
customers = [
{"type": "Customer", "id": f"CUST-{i:03d}", "properties": {"index": i}}
for i in range(100)
]
result = client.upsert_object_batch(customers)
print(f"Upserted {result.processed} objects")
health()
Verify API key and get workspace/dataset information.
Health Check
# Check API key and connection
health = client.health()
print(f"Status: {health.status}")
print(f"Workspace: {health.workspace_name}")
print(f"Dataset: {health.dataset_name}")
print(f"Key Name: {health.api_key_name}")
# Use in startup validation
def validate_flowmyna_connection():
try:
health = client.health()
print(f"✓ Connected to Flow Myna")
print(f" Workspace: {health.workspace_name}")
print(f" Dataset: {health.dataset_name}")
return True
except Exception as e:
print(f"✗ Flow Myna connection failed: {e}")
return False
Error Handling
Error Handling
from flowmyna import FlowMyna
from flowmyna.exceptions import (
FlowMynaError,
AuthenticationError,
ValidationError,
RateLimitError,
ServerError
)
client = FlowMyna(api_key="fm_live_xxx")
try:
client.record_event(
event="Order Placed",
objects=[{"type": "Order", "id": "ORD-123"}]
)
except AuthenticationError as e:
# Invalid, revoked, or expired API key
print(f"Authentication failed: {e}")
except ValidationError as e:
# Invalid request data
print(f"Validation error: {e}")
except RateLimitError as e:
# Rate limited (429)
print(f"Rate limited. Retry after: {e.retry_after}")
except ServerError as e:
# Server error (5xx)
print(f"Server error: {e}")
except FlowMynaError as e:
# Base exception for all SDK errors
print(f"Flow Myna error: {e}")
Async Usage
The SDK also provides an async client for use with asyncio:
Async Client
import asyncio
from flowmyna import AsyncFlowMyna
async def main():
    """Record one event and a 100-event batch with the async client.

    The client is closed in a ``finally`` clause, so ``close()`` still runs
    when one of the requests raises.
    """
    client = AsyncFlowMyna(api_key="fm_live_xxx")
    try:
        # Record event asynchronously
        await client.record_event(
            event="Order Placed",
            objects=[{"type": "Order", "id": "ORD-123"}]
        )
        # Batch operations
        events = [
            {"event": f"Event {i}", "objects": [{"type": "Test", "id": f"T-{i}"}]}
            for i in range(100)
        ]
        result = await client.record_event_batch(events)
    finally:
        # Close the client when done, even if a request above failed
        await client.close()
asyncio.run(main())
# Or use as context manager
async def with_context_manager():
async with AsyncFlowMyna(api_key="fm_live_xxx") as client:
await client.record_event(
event="Order Placed",
objects=[{"type": "Order", "id": "ORD-123"}]
)
HTTP/2 support: For HTTP/2 connections, install with: pip install flowmyna[async]
AI-Assisted Integration
Want an AI assistant to help integrate Flow Myna into your codebase? We provide a comprehensive prompt that guides the AI through analyzing your code, suggesting events to track, and implementing the integration step-by-step.
Complete Example
E-commerce Integration
import os
from datetime import datetime, timezone
from flowmyna import FlowMyna
from flowmyna.exceptions import FlowMynaError
# Initialize client
client = FlowMyna(api_key=os.environ["FLOWMYNA_API_KEY"])
# Validate connection on startup
health = client.health()
print(f"Connected to {health.workspace_name}")
def process_order(order):
    """Track an order: upsert its customer, then record creation and items.

    A FlowMynaError raised by any of the client calls is caught and reported
    with print, so it does not propagate to the caller.
    """
    try:
        # Make sure the customer exists with its contact details
        customer_key = order.customer_id
        contact = {
            "email": order.customer_email,
            "name": order.customer_name
        }
        client.upsert_object(type="Customer", id=customer_key, properties=contact)

        # Order creation: the order carries its own properties, the customer is linked
        order_object = {
            "type": "Order",
            "id": order.id,
            "properties": {
                "total": order.total,
                "currency": order.currency,
                "items_count": len(order.items)
            }
        }
        customer_object = {"type": "Customer", "id": order.customer_id}
        client.record_event(
            event="Order Created",
            objects=[order_object, customer_object],
            properties={"channel": order.channel}
        )

        # One "Item Added" event per line item, linked to the order and the product
        for line_item in order.items:
            involved = [
                {"type": "Order", "id": order.id},
                {"type": "Product", "id": line_item.product_id}
            ]
            details = {
                "quantity": line_item.quantity,
                "price": line_item.price
            }
            client.record_event(event="Item Added", objects=involved, properties=details)
    except FlowMynaError as err:
        print(f"Failed to record order {order.id}: {err}")
def import_historical_orders(orders):
    """Import past orders as "Order Created" events, sent in batches.

    Events are buffered and passed to client.record_event_batch each time the
    buffer reaches 100 entries; anything left after the loop is sent in one
    final call.
    """
    pending = []
    for past_order in orders:
        occurred_at = past_order.created_at.isoformat()
        involved = [
            {"type": "Order", "id": past_order.id},
            {"type": "Customer", "id": past_order.customer_id}
        ]
        pending.append({
            "event": "Order Created",
            "timestamp": occurred_at,
            "objects": involved
        })
        # Keep buffering until a full batch of 100 is available
        if len(pending) < 100:
            continue
        outcome = client.record_event_batch(pending)
        print(f"Imported batch: {outcome.processed} events")
        pending = []
    # Send whatever is left over after the last full batch
    if pending:
        outcome = client.record_event_batch(pending)
        print(f"Imported final batch: {outcome.processed} events")