Sync and async Python SDK with full API coverage, authoring DSL, and composition helpers.
The Python SDK (strait-python) provides both synchronous and asynchronous clients with full API coverage, an authoring DSL, and composition helpers.
Requires Python 3.11+.
Installation
pip install strait-python
Client Creation
From strait.json (recommended)
from strait import Client
client = Client.from_file()
See Configuration for strait.json schema and options.
From environment variables
# Reads STRAIT_BASE_URL, STRAIT_API_KEY, STRAIT_AUTH_TYPE, STRAIT_TIMEOUT_MS
client = Client.from_env()
Inline
client = Client(base_url="https://api.strait.dev", api_key="sk_live_...")
Async client
All configuration methods work with AsyncClient:
from strait import AsyncClient
async with AsyncClient.from_file() as client:
    jobs = await client.jobs.list()
Calling Operations
All 186 API operations are available through 19 typed service classes:
# List jobs
jobs = client.jobs.list()
# Create a job
job = client.jobs.create({
"name": "Sync Inventory",
"slug": "sync-inventory",
"project_id": "proj_1",
"endpoint_url": "https://worker.example.com/run",
})
# Trigger a job
run = client.jobs.trigger(job["id"], {"payload": {"sku": "ABC-123"}})
# Get run status
run = client.runs.get(run["id"])
# Workflow operations
wf = client.workflows.create({...})
client.workflow_runs.approve_step(wf_run_id, step_ref)
Available services
client.jobs, client.runs, client.workflows, client.workflow_runs, client.deployments, client.environments, client.secrets, client.api_keys, client.webhooks, client.event_triggers, client.event_sources, client.batch_operations, client.stats, client.analytics, client.log_drains, client.sdk_runs, client.rbac, client.job_groups, client.health
Authoring DSL
Defining jobs
from strait.authoring import define_job, JobOptions
job = define_job(JobOptions(
name="Process Order",
slug="process-order",
endpoint_url="https://worker.example.com/run",
project_id="proj-1",
max_concurrency=10,
timeout_secs=300,
))
Defining workflows with DAG validation
from strait.authoring import define_workflow, WorkflowOptions
from strait.authoring import job_step, approval_step, sleep_step
wf = define_workflow(WorkflowOptions(
name="Order Pipeline",
slug="order-pipeline",
project_id="proj-1",
steps=[
job_step("validate", "validate-job"),
job_step("charge", "charge-job", depends_on=["validate"]),
approval_step("approve", depends_on=["charge"]),
job_step("ship", "ship-job", depends_on=["approve"]),
],
))
# DAG is validated at definition time — cycles and missing refs raise DagValidationError
Composition Helpers
from strait.composition import with_retry, wait_for_run, paginate, RetryOptions
# Retry with exponential backoff
result = with_retry(
lambda: client.jobs.trigger("j1", payload),
RetryOptions(attempts=5),
)
# Wait for a run to complete
run = wait_for_run(
get_run=lambda rid: client.runs.get(rid),
get_status=lambda r: r["status"],
run_id="run-1",
)
# Paginate through results
from strait.composition import PaginatedResponse
for item in paginate(lambda q: PaginatedResponse(
    data=client.jobs.list(query={"cursor": q.cursor})["data"]
)):
    print(item)
FSM State Machines
from strait.fsm import transition_run, RunStatus, RunEvent, is_terminal_run_status
next_status = transition_run(RunStatus.EXECUTING, RunEvent.COMPLETE)
assert next_status == RunStatus.COMPLETED
assert is_terminal_run_status(next_status)
Middleware
from strait import Client, Middleware
mw = Middleware(
on_request=lambda ctx: print(f"-> {ctx.method} {ctx.url}"),
on_response=lambda ctx: print(f"<- {ctx.status} ({ctx.duration_ms}ms)"),
on_error=lambda ctx: print(f"!! {ctx.error}"),
)
client = Client(base_url="...", api_key="...", middleware=[mw])
Custom HTTP Client
Inject your own httpx.Client or httpx.AsyncClient:
import httpx
custom = httpx.Client(timeout=60.0)
client = Client(base_url="...", api_key="...", http_client=custom)
# Async
custom_async = httpx.AsyncClient(timeout=60.0)
async_client = AsyncClient(base_url="...", api_key="...", http_client=custom_async)
Error Handling
All errors inherit from StraitError:
from strait import NotFoundError, UnauthorizedError, RateLimitedError
try:
    job = client.jobs.get("nonexistent")
except NotFoundError as e:
    print(f"Not found: {e} (status={e.status})")
except UnauthorizedError as e:
    print(f"Auth error: {e}")
except RateLimitedError as e:
    print(f"Rate limited: {e}")
| Exception | HTTP Status | Description |
|---|---|---|
| TransportError | — | Network failure |
| DecodeError | — | JSON decode failure |
| ValidationError | — | Config/input validation |
| UnauthorizedError | 401, 403 | Auth failure |
| NotFoundError | 404 | Resource not found |
| ConflictError | 409 | Duplicate/conflict |
| RateLimitedError | 429 | Rate limit exceeded |
| ApiError | other | Generic HTTP error |
| StraitTimeoutError | — | Polling timeout |
| DagValidationError | — | Invalid workflow DAG |