Flow Myna

Python SDK

Official Flow Myna SDK for Python 3.8+

The official Python SDK for Flow Myna provides a simple, Pythonic interface for recording process events and managing objects. It includes automatic retries, batch operations, and comprehensive type hints.

Installation

Install with pip

pip install flowmyna

Quick Start

Basic Usage

from flowmyna import FlowMyna


# Initialize the client
client = FlowMyna(api_key="fm_live_your_key_here")


# Record an event
client.record_event(
    event="Order Placed",
    objects=[
        {"type": "Order", "id": "ORD-123"},
        {"type": "Customer", "id": "CUST-456"}
    ],
    properties={"total": 149.99}
)


# Upsert an object
client.upsert_object(
    type="Customer",
    id="CUST-456",
    properties={
        "name": "Jane Doe",
        "email": "jane@example.com"
    }
)

Configuration

  • Name
    api_key
    Type
    str
    Required
    required
    Description
    Your Flow Myna API key (starts with fm_live_)
  • Name
    base_url
    Type
    str
    Description
    API base URL. Defaults to https://api.flowmyna.com/api/public/v1
  • Name
    timeout
    Type
    float
    Description
    Request timeout in seconds. Default: 30
  • Name
    max_retries
    Type
    int
    Description
    Maximum number of retry attempts. Default: 3

Configuration Options

import os
from flowmyna import FlowMyna


# Using environment variable (recommended)
client = FlowMyna(api_key=os.environ["FLOWMYNA_API_KEY"])


# With custom configuration
client = FlowMyna(
    api_key="fm_live_xxx",
    timeout=60,          # 60 second timeout
    max_retries=5,       # Retry up to 5 times
)


# For testing/development
client = FlowMyna(
    api_key="fm_live_xxx",
    base_url="https://staging-api.flowmyna.com/api/public/v1"
)

record_event()

Record a process event with associated objects.

  • Name
    event
    Type
    str
    Required
    required
    Description
    Event name (1-200 characters)
  • Name
    objects
    Type
    List[dict]
    Required
    required
    Description
    List of objects involved in the event
  • Name
    timestamp
    Type
    str | datetime
    Description
    When the event occurred (ISO 8601 or datetime object)
  • Name
    properties
    Type
    dict
    Description
    Additional event properties

record_event() Examples

from datetime import datetime, timezone


# Simple event
client.record_event(
    event="Order Created",
    objects=[{"type": "Order", "id": "ORD-123"}]
)


# With timestamp (datetime object)
client.record_event(
    event="Order Shipped",
    objects=[
        {"type": "Order", "id": "ORD-123"},
        {"type": "Shipment", "id": "SHIP-456"}
    ],
    timestamp=datetime.now(timezone.utc)
)


# With timestamp (ISO string)
client.record_event(
    event="Order Delivered",
    objects=[{"type": "Order", "id": "ORD-123"}],
    timestamp="2024-01-15T10:30:00Z"
)


# With properties
client.record_event(
    event="Payment Processed",
    objects=[
        {"type": "Order", "id": "ORD-123"},
        {"type": "Payment", "id": "PAY-789"}
    ],
    properties={
        "amount": 149.99,
        "currency": "USD",
        "method": "credit_card"
    }
)


# With object properties (merged on upsert)
client.record_event(
    event="Customer Login",
    objects=[{
        "type": "Customer",
        "id": "CUST-456",
        "properties": {
            "last_login": datetime.now(timezone.utc).isoformat(),
            "login_count": 42
        }
    }]
)

upsert_object()

Create or update an object with properties (upsert semantics).

  • Name
    type
    Type
    str
    Required
    required
    Description
    Object type (1-100 characters)
  • Name
    id
    Type
    str
    Required
    required
    Description
    Object ID (1-500 characters)
  • Name
    properties
    Type
    dict
    Description
    Object properties (merged with existing)

upsert_object() Examples

# Upsert a customer
result = client.upsert_object(
    type="Customer",
    id="CUST-456",
    properties={
        "name": "Jane Doe",
        "email": "jane@example.com",
        "tier": "gold",
        "signup_date": "2023-01-15"
    }
)


print(f"Object {'created' if result.created else 'updated'}")
print(f"Internal ID: {result.object_id}")


# Update properties (properties are merged)
client.upsert_object(
    type="Customer",
    id="CUST-456",
    properties={
        "lifetime_value": 5420.00,  # Added
        "tier": "platinum"          # Updated
    }
)


# Upsert without properties (just ensures object exists)
client.upsert_object(type="Order", id="ORD-123")

Batch Operations

Send multiple events or objects efficiently in a single request.

Batch Examples

# Batch record events (up to 100 per call)
events = [
    {
        "event": "Case Opened",
        "timestamp": "2024-01-01T09:00:00Z",
        "objects": [{"type": "Case", "id": "CASE-001"}],
        "properties": {"priority": "high"}
    },
    {
        "event": "Case Assigned",
        "timestamp": "2024-01-01T09:15:00Z",
        "objects": [
            {"type": "Case", "id": "CASE-001"},
            {"type": "Agent", "id": "AGENT-A"}
        ]
    },
    {
        "event": "Case Resolved",
        "timestamp": "2024-01-01T14:30:00Z",
        "objects": [{"type": "Case", "id": "CASE-001"}]
    }
]


result = client.record_event_batch(events)
print(f"Processed: {result.processed}, Failed: {result.failed}")


# Batch upsert objects
customers = [
    {"type": "Customer", "id": f"CUST-{i:03d}", "properties": {"index": i}}
    for i in range(100)
]


result = client.upsert_object_batch(customers)
print(f"Upserted {result.processed} objects")

health()

Verify API key and get workspace/dataset information.

Health Check

# Check API key and connection
health = client.health()


print(f"Status: {health.status}")
print(f"Workspace: {health.workspace_name}")
print(f"Dataset: {health.dataset_name}")
print(f"Key Name: {health.api_key_name}")


# Use in startup validation
def validate_flowmyna_connection():
    try:
        health = client.health()
        print(f"✓ Connected to Flow Myna")
        print(f"  Workspace: {health.workspace_name}")
        print(f"  Dataset: {health.dataset_name}")
        return True
    except Exception as e:
        print(f"✗ Flow Myna connection failed: {e}")
        return False

Error Handling

Error Handling

from flowmyna import FlowMyna
from flowmyna.exceptions import (
    FlowMynaError,
    AuthenticationError,
    ValidationError,
    RateLimitError,
    ServerError
)


client = FlowMyna(api_key="fm_live_xxx")


try:
    client.record_event(
        event="Order Placed",
        objects=[{"type": "Order", "id": "ORD-123"}]
    )
except AuthenticationError as e:
    # Invalid, revoked, or expired API key
    print(f"Authentication failed: {e}")
    
except ValidationError as e:
    # Invalid request data
    print(f"Validation error: {e}")
    
except RateLimitError as e:
    # Rate limited (429)
    print(f"Rate limited. Retry after: {e.retry_after}")
    
except ServerError as e:
    # Server error (5xx)
    print(f"Server error: {e}")
    
except FlowMynaError as e:
    # Base exception for all SDK errors
    print(f"Flow Myna error: {e}")

Async Usage

The SDK also provides an async client for use with asyncio:

Async Client

import asyncio
from flowmyna import AsyncFlowMyna


async def main():
    client = AsyncFlowMyna(api_key="fm_live_xxx")
    
    # Record event asynchronously
    await client.record_event(
        event="Order Placed",
        objects=[{"type": "Order", "id": "ORD-123"}]
    )
    
    # Batch operations
    events = [
        {"event": f"Event {i}", "objects": [{"type": "Test", "id": f"T-{i}"}]}
        for i in range(100)
    ]
    result = await client.record_event_batch(events)
    
    # Close the client when done
    await client.close()


asyncio.run(main())


# Or use as context manager
async def with_context_manager():
    async with AsyncFlowMyna(api_key="fm_live_xxx") as client:
        await client.record_event(
            event="Order Placed",
            objects=[{"type": "Order", "id": "ORD-123"}]
        )

AI-Assisted Integration

Want an AI assistant to help integrate Flow Myna into your codebase? We provide a comprehensive prompt that guides the AI through analyzing your code, suggesting events to track, and implementing the integration step-by-step.

Complete Example

E-commerce Integration

import os
from datetime import datetime, timezone
from flowmyna import FlowMyna
from flowmyna.exceptions import FlowMynaError


# Initialize client
client = FlowMyna(api_key=os.environ["FLOWMYNA_API_KEY"])


# Validate connection on startup
health = client.health()
print(f"Connected to {health.workspace_name}")


def process_order(order):
    """Record an order through its lifecycle"""
    try:
        # Upsert customer with details
        client.upsert_object(
            type="Customer",
            id=order.customer_id,
            properties={
                "email": order.customer_email,
                "name": order.customer_name
            }
        )
        
        # Record order creation
        client.record_event(
            event="Order Created",
            objects=[
                {"type": "Order", "id": order.id, "properties": {
                    "total": order.total,
                    "currency": order.currency,
                    "items_count": len(order.items)
                }},
                {"type": "Customer", "id": order.customer_id}
            ],
            properties={"channel": order.channel}
        )
        
        # Record each item
        for item in order.items:
            client.record_event(
                event="Item Added",
                objects=[
                    {"type": "Order", "id": order.id},
                    {"type": "Product", "id": item.product_id}
                ],
                properties={
                    "quantity": item.quantity,
                    "price": item.price
                }
            )
        
    except FlowMynaError as e:
        print(f"Failed to record order {order.id}: {e}")


def import_historical_orders(orders):
    """Batch import historical orders"""
    events = []
    
    for order in orders:
        events.append({
            "event": "Order Created",
            "timestamp": order.created_at.isoformat(),
            "objects": [
                {"type": "Order", "id": order.id},
                {"type": "Customer", "id": order.customer_id}
            ]
        })
        
        # Send in batches of 100
        if len(events) >= 100:
            result = client.record_event_batch(events)
            print(f"Imported batch: {result.processed} events")
            events = []
    
    # Send remaining events
    if events:
        result = client.record_event_batch(events)
        print(f"Imported final batch: {result.processed} events")