Installation
- TypeScript
- Python
Copy
Ask AI
npm install simplex-ts
Copy
Ask AI
pip install simplex
Initializing the client
- TypeScript
- Python
Copy
Ask AI
import { SimplexClient } from 'simplex-ts';
const client = new SimplexClient({
apiKey: process.env.SIMPLEX_API_KEY
});
Configuration options
Copy
Ask AI
const client = new SimplexClient({
apiKey: process.env.SIMPLEX_API_KEY,
timeout: 30000, // Request timeout in ms (default: 30000)
maxRetries: 3, // Number of retry attempts (default: 3)
retryDelay: 1000 // Delay between retries in ms (default: 1000)
});
Copy
Ask AI
from simplex import SimplexClient
import os
client = SimplexClient(api_key=os.environ["SIMPLEX_API_KEY"])
Configuration options
Copy
Ask AI
client = SimplexClient(
api_key=os.environ["SIMPLEX_API_KEY"],
timeout=30, # Request timeout in seconds (default: 30)
max_retries=3, # Retry attempts for 429/5xx (default: 3)
retry_delay=1.0, # Seconds between retries (default: 1.0)
)
Running workflows
- TypeScript
- Python
Basic workflow execution
Copy
Ask AI
const result = await client.workflows.run('workflow_id', {
variables: {
username: 'user@example.com',
search_query: 'order #12345'
}
});
console.log('Session ID:', result.session_id);
With webhook callback
Copy
Ask AI
const result = await client.workflows.run('workflow_id', {
variables: {
patient_id: 'P-12345'
},
webhookUrl: 'https://59d9f4dbc.ngrok-free.app/api/webhook',
metadata: 'req-789'
});
Basic workflow execution
Copy
Ask AI
result = client.run_workflow(
"wf_prior_auth_aetna",
variables={
"patient_id": "P-12345",
"insurance_provider": "Aetna",
"policy_number": "AET-9876543",
"procedure_code": "27447",
},
metadata="dr-smith-knee-replacement",
webhook_url="https://your-domain.com/api/webhook",
)
session_id = result["session_id"]
print(f"Session started: {session_id}")
print(f"VNC viewer: {result['vnc_url']}")
print(f"Logs stream: {result['logs_url']}")
Handling webhook responses
- TypeScript
- Python
Next.js App Router
Copy
Ask AI
// app/api/simplex-webhook/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { verifySimplexWebhook, WebhookPayload } from 'simplex-ts';
/**
 * Next.js App Router handler for Simplex webhook deliveries.
 *
 * Verifies the signature against the raw request body, then dispatches the
 * parsed payload to `processResult` or `handleFailure` based on
 * `payload.success`.
 *
 * Responses:
 * - 401 when `verifySimplexWebhook` throws.
 * - 400 when the verified body is not valid JSON.
 * - 200 `{ received: true }` once the payload has been handled.
 *
 * Errors thrown by `processResult` / `handleFailure` are deliberately NOT
 * caught here, so a processing failure is not misreported to the sender as an
 * invalid signature.
 */
export async function POST(request: NextRequest) {
  // Read the body as text: verification must run on the raw, unparsed bytes.
  const body = await request.text();

  // Flatten the Headers object into the plain record passed to the verifier.
  const headers: Record<string, string> = {};
  request.headers.forEach((value, key) => {
    headers[key] = value;
  });

  // Only the verification step maps to a 401.
  // NOTE(review): assumes verifySimplexWebhook throws synchronously on a bad
  // signature, as the original call (no await) implies - confirm against the SDK.
  try {
    verifySimplexWebhook(body, headers, process.env.SIMPLEX_WEBHOOK_SECRET!);
  } catch {
    return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
  }

  let payload: WebhookPayload;
  try {
    payload = JSON.parse(body);
  } catch {
    return NextResponse.json({ error: 'Invalid JSON payload' }, { status: 400 });
  }

  if (payload.success) {
    await processResult(payload);
  } else {
    await handleFailure(payload);
  }

  return NextResponse.json({ received: true });
}
/**
 * Handles a successful workflow webhook: logs the session id and, when the
 * payload carries a truthy `structured_output`, passes it to `saveToDatabase`.
 */
async function processResult(payload: WebhookPayload) {
  const { session_id, structured_output } = payload;
  console.log('Processing session:', session_id);
  // Nothing to persist when the payload has no structured output.
  if (structured_output) {
    await saveToDatabase(structured_output);
  }
}
async function handleFailure(payload: WebhookPayload) {
console.error('Workflow failed:', payload.session_id);
}
Express
Copy
Ask AI
import express from 'express';
import { verifySimplexWebhook, WebhookPayload } from 'simplex-ts';
const app = express();
app.post('/webhook', express.raw({ type: 'application/json' }), (req, res) => {
const body = req.body.toString('utf8');
try {
verifySimplexWebhook(body, req.headers, process.env.SIMPLEX_WEBHOOK_SECRET!);
const payload: WebhookPayload = JSON.parse(body);
processWebhook(payload).catch(console.error);
res.json({ received: true });
} catch (error) {
res.status(401).json({ error: 'Invalid signature' });
}
});
app.listen(3000);
Flask
Copy
Ask AI
import os
import json
from flask import Flask, request, jsonify
from simplex import verify_simplex_webhook, WebhookVerificationError
app = Flask(__name__)
WEBHOOK_SECRET = os.environ["SIMPLEX_WEBHOOK_SECRET"]
@app.route("/api/webhook", methods=["POST"])
def simplex_webhook():
    """Flask endpoint that receives Simplex webhook deliveries.

    The raw request body is verified first; a WebhookVerificationError yields
    a 401 response. The verified body is then parsed as JSON and routed to
    process_result or handle_failure depending on payload["success"].
    """
    # Verification runs on the raw text, before any JSON parsing.
    body = request.get_data(as_text=True)
    headers = dict(request.headers)
    try:
        verify_simplex_webhook(body, headers, WEBHOOK_SECRET)
    except WebhookVerificationError:
        return jsonify({"error": "Invalid signature"}), 401

    payload = json.loads(body)
    session_id = payload["session_id"]

    # Failure path first, so the success path reads straight through.
    if not payload["success"]:
        print(f"Session {session_id} failed")
        handle_failure(payload)
        return jsonify({"received": True})

    structured_output = payload.get("structured_output", {})
    file_metadata = payload.get("file_metadata", [])
    print(f"Session {session_id} succeeded")
    print(f" Approval status: {structured_output.get('approval_status')}")
    print(f" Files: {len(file_metadata)}")
    process_result(payload)
    return jsonify({"received": True})
def process_result(payload):
"""Handle a successful workflow completion."""
pass
def handle_failure(payload):
"""Handle a failed workflow."""
pass
if __name__ == "__main__":
app.run(port=3000)
Always use the raw request body for signature verification. Do not parse it as JSON before verifying.
Polling session status
- TypeScript
- Python
Copy
Ask AI
const status = await client.getSessionStatus('session_id');
if (status.paused) {
console.log('Session paused, pause key:', status.paused_key);
await client.resume('session_id');
} else if (status.in_progress) {
console.log('Session still running...');
} else if (status.success) {
console.log('Session completed successfully');
console.log('Files:', status.file_metadata);
console.log('Scraper outputs:', status.scraper_outputs);
} else {
console.log('Session failed');
}
Copy
Ask AI
import time
def poll_until_complete(client, session_id, timeout=600, interval=5):
    """Poll session status until completion or timeout.

    Args:
        client: Object exposing ``get_session_status(session_id)``.
        session_id: Identifier of the session to poll.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to wait between polls.

    Returns:
        The first status whose ``"in_progress"`` entry is falsy.

    Raises:
        TimeoutError: If no such status is seen within ``timeout`` seconds.
    """
    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_session_status(session_id)
        if not status["in_progress"]:
            return status
        if status["paused"]:
            print(f"Session paused at: {status['paused_key']}")
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(interval, remaining)))
    raise TimeoutError(f"Session {session_id} did not complete within {timeout}s")
# Usage
result = client.run_workflow(
"wf_insurance_quote",
variables={"policy_number": "POL-5678", "coverage_type": "auto"},
)
status = poll_until_complete(client, result["session_id"])
if status["success"]:
print(f"Completed: {status['final_message']}")
print(f"Output: {status['structured_output']}")
print(f"Files: {status['file_metadata']}")
else:
print("Session failed")
Resuming paused flows
- TypeScript
- Python
When a flow script calls `pause()`, you can resume it programmatically:
Copy
Ask AI
const result = await client.resume('session_id');
if (result.succeeded) {
console.log('Flow resumed successfully');
} else {
console.log('Failed to resume:', result.error);
}
Detect when a workflow pauses for human review and resume:
Copy
Ask AI
import time
def run_with_human_review(client, workflow_id, variables):
    """Start a workflow and interactively handle pauses until it finishes.

    The session is polled in a loop. When a status is no longer in progress it
    is returned. When a status reports a pause, the operator is asked on stdin
    whether to resume: "y" (any case) resumes the session and polling
    continues; any other answer returns the paused status as-is.
    """
    result = client.run_workflow(workflow_id, variables=variables)
    session_id = result["session_id"]

    while True:
        status = client.get_session_status(session_id)
        if not status["in_progress"]:
            return status

        if status["paused"]:
            pause_key = status["paused_key"]
            print(f"\n--- Session paused: {pause_key} ---")
            print(f"Current output: {status.get('structured_output')}")

            user_input = input("Resume session? (y/n): ")
            if user_input.lower() != "y":
                # Operator declined: hand back the paused status untouched.
                print("Session left paused.")
                return status

            client.resume(session_id)
            print("Session resumed.")

        time.sleep(5)
# Usage
status = run_with_human_review(
client,
"wf_prior_auth_review",
variables={
"patient_id": "P-12345",
"procedure_code": "27447",
"insurance_provider": "UnitedHealthcare",
},
)
if status["success"]:
auth_result = status["structured_output"]
print(f"Authorization: {auth_result.get('approval_status')}")
print(f"Reference: {auth_result.get('reference_number')}")
Streaming live events
- TypeScript
- Python
Use `stream_session()` to watch agent actions in real time:
Copy
Ask AI
result = client.run_workflow(
"wf_security_questionnaire",
variables={
"company_name": "Acme Corp",
"contact_email": "security@acme.com",
"soc2_certified": "yes",
},
)
logs_url = result["logs_url"]
for event in client.stream_session(logs_url):
event_type = event.get("type")
if event_type == "RunContent":
print(f"[Agent] {event['content']}")
elif event_type == "ToolCallStarted":
print(f"[Tool] Starting: {event['tool_name']}")
elif event_type == "ToolCallCompleted":
print(f"[Tool] Completed: {event['tool_name']}")
elif event_type == "FlowPaused":
print("[Paused] Session paused, waiting for input...")
elif event_type == "AskUserQuestion":
print(f"[Question] {event['question']}")
client.send_message(
event.get("message_url", ""),
"Use the SOC 2 Type II report from last quarter",
)
elif event_type == "RunCompleted":
print(f"[Done] Success: {event.get('success')}")
break
elif event_type == "Error":
print(f"[Error] {event['message']}")
break
Retrieving session data
- TypeScript
- Python
Get session replay
Copy
Ask AI
import fs from 'fs';
const replay = await client.retrieveSessionReplay('session_id');
fs.writeFileSync('session-replay.mp4', Buffer.from(replay));
Get session logs
Copy
Ask AI
const logs = await client.retrieveSessionLogs('session_id');
if (logs === null) {
console.log('Session is still running, logs not yet available');
} else {
console.log('Session logs:', JSON.stringify(logs, null, 2));
}
Download session files
Copy
Ask AI
const allFiles = await client.downloadSessionFiles('session_id');
Get session replay
Copy
Ask AI
import os
session_id = "sess_xyz789"
video_bytes = client.retrieve_session_replay(session_id)
os.makedirs("replays", exist_ok=True)
output_path = os.path.join("replays", f"{session_id}.mp4")
with open(output_path, "wb") as f:
f.write(video_bytes)
print(f"Replay saved to {output_path} ({len(video_bytes)} bytes)")
Get session logs
Copy
Ask AI
logs = client.retrieve_session_logs(session_id)
if logs is None:
print("Session is still running, logs not yet available")
else:
print(f"Session logs: {logs}")
Download session files
Copy
Ask AI
import os
zip_bytes = client.download_session_files("sess_xyz789")
output_path = os.path.join("downloads", "sess_xyz789_files.zip")
os.makedirs("downloads", exist_ok=True)
with open(output_path, "wb") as f:
f.write(zip_bytes)
print(f"Downloaded {len(zip_bytes)} bytes to {output_path}")
Download a specific file
Copy
Ask AI
pdf_bytes = client.download_session_files(
"sess_xyz789",
filename="prior_auth_confirmation.pdf",
)
with open("prior_auth_confirmation.pdf", "wb") as f:
f.write(pdf_bytes)
Batch processing
- TypeScript
- Python
Running multiple workflows
Copy
Ask AI
const patients = ['P-001', 'P-002', 'P-003', 'P-004', 'P-005'];
const results = await Promise.all(
patients.map(async (patientId) => {
try {
return await client.workflows.run('verification_workflow', {
variables: { patient_id: patientId },
webhookUrl: 'https://your-domain.com/webhook',
metadata: patientId
});
} catch (error) {
console.error(`Failed to start workflow for ${patientId}:`, error);
return null;
}
})
);
const successful = results.filter(Boolean);
console.log(`Started ${successful.length}/${patients.length} workflows`);
With rate limiting
Copy
Ask AI
/**
 * Applies `fn` to every item in fixed-size batches, pausing between batches.
 *
 * Items within a batch run concurrently via `Promise.all`; batches run one
 * after another, with a `delayMs` pause between consecutive batches (no pause
 * after the last one). Results are returned in the same order as `items`.
 *
 * @param items - Inputs to process.
 * @param fn - Async worker invoked once per item.
 * @param concurrency - Maximum items per batch; must be at least 1.
 * @param delayMs - Milliseconds to wait between batches.
 * @returns The resolved values of `fn`, in input order.
 * @throws RangeError if `concurrency` is less than 1 or NaN. Without this
 *   guard the loop index would never advance and the call would never finish.
 */
async function runWithRateLimit<T, R = unknown>(
  items: T[],
  fn: (item: T) => Promise<R>,
  concurrency: number = 5,
  delayMs: number = 1000
): Promise<R[]> {
  // Written as a negated comparison so that NaN is rejected as well.
  if (!(concurrency >= 1)) {
    throw new RangeError(`concurrency must be >= 1, got ${concurrency}`);
  }

  const results: R[] = [];
  for (let i = 0; i < items.length; i += concurrency) {
    const batch = items.slice(i, i + concurrency);
    const batchResults = await Promise.all(batch.map((item) => fn(item)));
    results.push(...batchResults);

    // Only pause when another batch is still to come.
    if (i + concurrency < items.length) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return results;
}
await runWithRateLimit(
patients,
(patientId) => client.workflows.run('workflow_id', {
variables: { patient_id: patientId }
}),
5,
1000
);
Copy
Ask AI
import time
def run_batch(client, workflow_id, patient_records, delay=2.0):
    """Start one workflow run per patient record, spaced out by ``delay``.

    Each record is passed as the workflow variables, with its ``"patient_id"``
    used as the run metadata. A record whose start raises is reported and
    recorded with a session id of ``None``; the batch keeps going.

    Returns:
        A list of ``(patient_id, session_id_or_None)`` tuples in input order.
    """
    sessions = []
    last_index = len(patient_records) - 1

    for i, record in enumerate(patient_records):
        patient_id = record["patient_id"]
        try:
            result = client.run_workflow(
                workflow_id,
                variables=record,
                metadata=patient_id,
            )
            sessions.append((patient_id, result["session_id"]))
            print(f"[{i+1}/{len(patient_records)}] Started {patient_id}: {result['session_id']}")
        except Exception as e:
            print(f"[{i+1}/{len(patient_records)}] Failed {patient_id}: {e}")
            sessions.append((patient_id, None))

        # Space out the starts; no wait is needed after the final record.
        if i < last_index:
            time.sleep(delay)

    return sessions
def collect_results(client, sessions, timeout=600, interval=10):
    """Poll all sessions until they complete or timeout.

    Args:
        client: Object exposing ``get_session_status(session_id)``.
        sessions: Iterable of ``(patient_id, session_id)`` pairs. Pairs whose
            session id is ``None`` are skipped and do not appear in the result.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to wait between polling rounds.

    Returns:
        A dict mapping patient id to the final status. Sessions still pending
        at the deadline map to ``{"success": None, "error": "timeout"}``.
    """
    results = {}
    pending = {pid: sid for pid, sid in sessions if sid is not None}

    # A monotonic clock keeps the timeout correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while pending and time.monotonic() < deadline:
        # Iterate over a snapshot because finished entries are deleted below.
        for patient_id, session_id in list(pending.items()):
            status = client.get_session_status(session_id)
            if not status["in_progress"]:
                results[patient_id] = status
                del pending[patient_id]
                print(f"Completed {patient_id}: success={status['success']}")
        if pending:
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(interval, remaining)))

    for patient_id in pending:
        results[patient_id] = {"success": None, "error": "timeout"}
    return results
# Usage
patients = [
{"patient_id": "P-001", "insurance_provider": "Aetna", "policy_number": "AET-111"},
{"patient_id": "P-002", "insurance_provider": "Cigna", "policy_number": "CIG-222"},
{"patient_id": "P-003", "insurance_provider": "Aetna", "policy_number": "AET-333"},
]
sessions = run_batch(client, "wf_prior_auth", patients, delay=2.0)
results = collect_results(client, sessions, timeout=900)
succeeded = sum(1 for r in results.values() if r.get("success") is True)
failed = sum(1 for r in results.values() if r.get("success") is False)
print(f"\nBatch complete: {succeeded} succeeded, {failed} failed")
Searching workflows
- TypeScript
- Python
Copy
Ask AI
const results = await client.workflows.search({
workflowName: 'insurance'
});
for (const workflow of results.workflows) {
console.log(`${workflow.workflow_name}: ${workflow.workflow_id}`);
}
Copy
Ask AI
results = client.search_workflows(workflow_name="prior auth")
print(f"Found {results['count']} prior auth workflows:")
for wf in results["workflows"]:
print(f" {wf['workflow_id']}: {wf['workflow_name']} ({wf.get('metadata', '')})")
# Combine filters
aetna_workflows = client.search_workflows(
workflow_name="aetna",
metadata="production",
)
Error handling
- TypeScript
- Python
Copy
Ask AI
import { SimplexClient,
WorkflowError,
ValidationError,
AuthenticationError,
RateLimitError,
NetworkError
} from 'simplex-ts';
/**
 * Runs a workflow and reports the failure category before rethrowing.
 *
 * A `RateLimitError` triggers one retry after a fixed 5-second wait; the
 * retry's outcome is returned (or thrown) as-is. Every other error is logged
 * under its category, when recognised, and then rethrown unchanged.
 */
async function runWorkflowSafely(workflowId: string, variables: Record<string, string>) {
  const startRun = () => client.workflows.run(workflowId, { variables });

  try {
    return await startRun();
  } catch (error) {
    // Branch order matches the original checks in case the error classes
    // share a hierarchy.
    if (error instanceof AuthenticationError) {
      console.error('Invalid API key');
    } else if (error instanceof RateLimitError) {
      console.log('Rate limited, retrying after delay...');
      await new Promise(resolve => setTimeout(resolve, 5000));
      return startRun();
    } else if (error instanceof ValidationError) {
      console.error('Invalid parameters:', error.message);
    } else if (error instanceof WorkflowError) {
      console.error('Workflow execution failed:', error.message);
    } else if (error instanceof NetworkError) {
      console.error('Network issue:', error.message);
    }
    throw error;
  }
}
Copy
Ask AI
from simplex import (
SimplexClient,
SimplexError,
AuthenticationError,
ValidationError,
RateLimitError,
WorkflowError,
NetworkError,
)
import time
client = SimplexClient()
def run_workflow_safely(workflow_id, variables, max_retries=3):
    """Run a workflow with error handling and retry logic.

    Rate-limit and network errors are retried up to ``max_retries`` attempts
    in total; authentication, validation, workflow and other Simplex errors
    are reported and re-raised immediately.

    Args:
        workflow_id: Identifier of the workflow to run.
        variables: Variables passed through to ``client.run_workflow``.
        max_retries: Total number of attempts before giving up.

    Returns:
        Whatever ``client.run_workflow`` returns on the first successful attempt.

    Raises:
        RuntimeError: If every attempt was rate limited (the last
            RateLimitError is attached as the cause), or if ``max_retries``
            is zero.
        NetworkError: If the final attempt fails with a network error.
        AuthenticationError, ValidationError, WorkflowError, SimplexError:
            Re-raised as soon as they occur.
    """
    last_rate_limit = None
    for attempt in range(max_retries):
        final_attempt = attempt == max_retries - 1
        try:
            return client.run_workflow(workflow_id, variables=variables)
        except AuthenticationError:
            print("Authentication failed. Check your SIMPLEX_API_KEY.")
            raise
        except ValidationError as e:
            print(f"Invalid request: {e}")
            raise
        except RateLimitError as e:
            last_rate_limit = e
            if final_attempt:
                # No retry follows, so waiting would only delay the failure.
                print(f"Rate limited (attempt {attempt + 1}/{max_retries}). Giving up.")
                break
            wait = e.retry_after or 5
            print(f"Rate limited (attempt {attempt + 1}/{max_retries}). Waiting {wait}s...")
            time.sleep(wait)
        except WorkflowError as e:
            print(f"Workflow {e.workflow_id} failed in session {e.session_id}: {e}")
            raise
        except NetworkError as e:
            if final_attempt:
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            delay = 2 ** attempt
            print(f"Network error (attempt {attempt + 1}/{max_retries}). Retrying in {delay}s...")
            time.sleep(delay)
        except SimplexError as e:
            print(f"Unexpected Simplex error: {e}")
            raise
    # Chain the last rate-limit error so callers can see why the retries ran out.
    raise RuntimeError(f"Failed after {max_retries} attempts") from last_rate_limit
Environment configuration
- TypeScript
- Python
Copy
Ask AI
// config.ts
/**
 * Simplex settings resolved from environment variables.
 *
 * - `apiKey` comes from SIMPLEX_API_KEY; the module throws at load time when
 *   it is missing.
 * - `webhookSecret` comes from SIMPLEX_WEBHOOK_SECRET.
 * - `webhookUrl` is the fixed production URL when NODE_ENV is "production".
 *   Otherwise it is built from NGROK_URL, falling back to
 *   http://localhost:3000 so an unset variable does not produce the literal
 *   string "undefined/api/webhook".
 */
export const simplexConfig = {
  apiKey: process.env.SIMPLEX_API_KEY!,
  webhookSecret: process.env.SIMPLEX_WEBHOOK_SECRET!,
  webhookUrl: process.env.NODE_ENV === 'production'
    ? 'https://your-domain.com/api/webhook'
    : `${process.env.NGROK_URL ?? 'http://localhost:3000'}/api/webhook`
};

// The `!` above only satisfies the type checker; enforce the key at runtime.
if (!simplexConfig.apiKey) {
  throw new Error('SIMPLEX_API_KEY is required');
}
Copy
Ask AI
import os
config = {
"api_key": os.environ.get("SIMPLEX_API_KEY"),
"webhook_secret": os.environ.get("SIMPLEX_WEBHOOK_SECRET"),
"webhook_url": (
os.environ.get("SIMPLEX_WEBHOOK_URL", "https://your-domain.com/api/webhook")
if os.environ.get("SIMPLEX_ENV") == "production"
else os.environ.get("NGROK_URL", "http://localhost:3000") + "/api/webhook"
),
}
if not config["api_key"]:
raise ValueError("SIMPLEX_API_KEY environment variable is required")
client = SimplexClient(api_key=config["api_key"])
Copy
Ask AI
# .env
SIMPLEX_API_KEY=sk-your-api-key
SIMPLEX_WEBHOOK_SECRET=whsec-your-webhook-secret
SIMPLEX_ENV=development
NGROK_URL=https://abc123.ngrok-free.app
Next steps
- Webhooks — Set up webhook handlers
- Variables — Use dynamic data in workflows
- Structured Outputs — Extract specific data
- API Reference — Full API documentation
- CLI Reference — Full reference for all CLI commands
