The Python SDK (strait-python) provides both synchronous and asynchronous clients with full API coverage, an authoring DSL, and composition helpers.

Requires Python 3.11+.

Installation

pip install strait-python

Client Creation

From a config file

from strait import Client

client = Client.from_file()

See Configuration for strait.json schema and options.

From environment variables

# Reads STRAIT_BASE_URL, STRAIT_API_KEY, STRAIT_AUTH_TYPE, STRAIT_TIMEOUT_MS
client = Client.from_env()
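
To illustrate what loading configuration from the environment involves, here is a minimal stdlib-only sketch. Only the four variable names come from the comment above. The `EnvConfig` dataclass, the `load_env_config` function, the choice of which variables are required, and the integer-millisecond timeout are all assumptions made for illustration; the real `Client.from_env()` may parse or validate these values differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EnvConfig:
    """Hypothetical container for the four settings; not part of strait-python."""
    base_url: str
    api_key: str
    auth_type: Optional[str]
    timeout_ms: Optional[int]


def load_env_config(env: Mapping[str, str] = os.environ) -> EnvConfig:
    """Read the STRAIT_* variables, failing early if a required one is missing."""
    # Assumption: base URL and API key are mandatory, the other two optional.
    missing = [k for k in ("STRAIT_BASE_URL", "STRAIT_API_KEY") if not env.get(k)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    raw_timeout = env.get("STRAIT_TIMEOUT_MS")
    return EnvConfig(
        base_url=env["STRAIT_BASE_URL"].rstrip("/"),
        api_key=env["STRAIT_API_KEY"],
        auth_type=env.get("STRAIT_AUTH_TYPE"),
        # Assumption: the timeout is an integer number of milliseconds.
        timeout_ms=int(raw_timeout) if raw_timeout else None,
    )
```

Passing a plain dict instead of `os.environ` makes the loader easy to exercise in tests without touching the process environment.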

Inline

client = Client(base_url="https://api.strait.dev", api_key="sk_live_...")

Async client

All configuration methods work with AsyncClient:

import asyncio
from strait import AsyncClient

async def main():
    async with AsyncClient.from_file() as client:
        jobs = await client.jobs.list()

asyncio.run(main())
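
One practical reason to reach for an async client is issuing independent calls concurrently. The sketch below shows the pattern with `asyncio.gather`, using a hypothetical `FakeJobs` stand-in rather than the real service so that it runs without network access. It assumes the async services expose awaitable counterparts of the sync methods (for example `jobs.get`).

```python
import asyncio


class FakeJobs:
    """Stand-in for an async service; a real call would await an HTTP request."""

    async def get(self, job_id: str) -> dict:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return {"id": job_id}


async def fetch_many(jobs: FakeJobs, job_ids: list[str]) -> list[dict]:
    # gather() schedules all coroutines at once and preserves input order.
    return await asyncio.gather(*(jobs.get(j) for j in job_ids))
```

With a real client, `jobs` would be something like `client.jobs` inside the `async with` block.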

Calling Operations

All 186 API operations are available through 19 typed service classes:

# List jobs
jobs = client.jobs.list()

# Create a job
job = client.jobs.create({
    "name": "Sync Inventory",
    "slug": "sync-inventory",
    "project_id": "proj_1",
    "endpoint_url": "https://worker.example.com/run",
})

# Trigger a job
run = client.jobs.trigger(job["id"], {"payload": {"sku": "ABC-123"}})

# Get run status
run = client.runs.get(run["id"])

# Workflow operations
wf = client.workflows.create({...})
client.workflow_runs.approve_step(wf_run_id, step_ref)

Available services

client.jobs, client.runs, client.workflows, client.workflow_runs, client.deployments, client.environments, client.secrets, client.api_keys, client.webhooks, client.event_triggers, client.event_sources, client.batch_operations, client.stats, client.analytics, client.log_drains, client.sdk_runs, client.rbac, client.job_groups, client.health

Authoring DSL

Defining jobs

from strait.authoring import define_job, JobOptions

job = define_job(JobOptions(
    name="Process Order",
    slug="process-order",
    endpoint_url="https://worker.example.com/run",
    project_id="proj-1",
    max_concurrency=10,
    timeout_secs=300,
))

Defining workflows with DAG validation

from strait.authoring import define_workflow, WorkflowOptions
from strait.authoring import job_step, approval_step, sleep_step

wf = define_workflow(WorkflowOptions(
    name="Order Pipeline",
    slug="order-pipeline",
    project_id="proj-1",
    steps=[
        job_step("validate", "validate-job"),
        job_step("charge", "charge-job", depends_on=["validate"]),
        approval_step("approve", depends_on=["charge"]),
        job_step("ship", "ship-job", depends_on=["approve"]),
    ],
))
# DAG is validated at definition time — cycles and missing refs raise DagValidationError
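
To make the validation step concrete, here is a self-contained sketch of the two checks named in the comment above: missing references and cycles. It uses Kahn's algorithm for topological sorting, which is one common way to detect cycles and not necessarily what the SDK does internally. `validate_dag` and `DagSketchError` are hypothetical names for this example.

```python
from collections import deque


class DagSketchError(ValueError):
    """Hypothetical error type for this sketch (the SDK raises DagValidationError)."""


def validate_dag(deps: dict[str, list[str]]) -> list[str]:
    """Return step refs in a valid execution order, or raise DagSketchError.

    `deps` maps each step ref to the refs it depends on.
    """
    for step, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise DagSketchError(f"step {step!r} depends on unknown step {parent!r}")

    # Kahn's algorithm: repeatedly emit steps whose dependencies are all satisfied.
    remaining = {step: len(parents) for step, parents in deps.items()}
    children: dict[str, list[str]] = {step: [] for step in deps}
    for step, parents in deps.items():
        for parent in parents:
            children[parent].append(step)

    ready = deque(step for step, count in remaining.items() if count == 0)
    order: list[str] = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in children[step]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Steps that never became ready must sit on (or behind) a cycle.
    if len(order) != len(deps):
        stuck = sorted(set(deps) - set(order))
        raise DagSketchError(f"cycle detected among steps: {stuck}")
    return order
```

For the pipeline above, `validate_dag({"validate": [], "charge": ["validate"], "approve": ["charge"], "ship": ["approve"]})` returns `["validate", "charge", "approve", "ship"]`.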

Composition Helpers

from strait.composition import with_retry, wait_for_run, paginate, RetryOptions

# Retry with exponential backoff
result = with_retry(
    lambda: client.jobs.trigger("j1", payload),
    RetryOptions(attempts=5),
)

# Wait for a run to complete
run = wait_for_run(
    get_run=lambda rid: client.runs.get(rid),
    get_status=lambda r: r["status"],
    run_id="run-1",
)

# Paginate through results
from strait.composition import PaginatedResponse

for item in paginate(lambda q: PaginatedResponse(
    data=client.jobs.list(query={"cursor": q.cursor})["data"]
)):
    print(item)
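
The three helpers above follow well-known patterns. The sketch below shows a generic version of each: retry with exponential backoff and jitter, polling until a condition holds, and cursor-based pagination. The function names, default values, and the `next_cursor` field are assumptions for this example and are not taken from `strait.composition`.

```python
import random
import time
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))  # jitter spreads out retries
    raise ValueError("attempts must be at least 1")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Fetch repeatedly until is_done(value) is true or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if is_done(value):
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError("polling timed out")
        sleep(interval)


def iter_pages(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield items from every page, following the cursor until it is empty."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        yield from page["data"]
        cursor = page.get("next_cursor")  # hypothetical field name
        if not cursor:
            return
```

A production retry helper would normally retry only transient failures (timeouts, rate limits) instead of every exception. The `sleep` parameter is injectable so that tests can skip the real delays.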

FSM State Machines

from strait.fsm import transition_run, RunStatus, RunEvent, is_terminal_run_status

next_status = transition_run(RunStatus.EXECUTING, RunEvent.COMPLETE)
assert next_status == RunStatus.COMPLETED
assert is_terminal_run_status(next_status)
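
A state machine like this is typically backed by a transition table. The following sketch shows the idea with a deliberately small set of states and events. Only `EXECUTING`, `COMPLETED`, and `COMPLETE` appear in the example above; the other members, and the names `Status`, `Event`, `transition`, and `is_terminal`, are hypothetical and do not reflect the full `strait.fsm` definitions.

```python
from enum import Enum


class Status(Enum):
    QUEUED = "queued"        # hypothetical
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"        # hypothetical


class Event(Enum):
    START = "start"          # hypothetical
    COMPLETE = "complete"
    FAIL = "fail"            # hypothetical


# Every legal (state, event) pair maps to its next state; anything else is rejected.
TRANSITIONS: dict[tuple[Status, Event], Status] = {
    (Status.QUEUED, Event.START): Status.EXECUTING,
    (Status.EXECUTING, Event.COMPLETE): Status.COMPLETED,
    (Status.EXECUTING, Event.FAIL): Status.FAILED,
}

# Terminal states have no outgoing transitions.
TERMINAL = frozenset({Status.COMPLETED, Status.FAILED})


def transition(status: Status, event: Event) -> Status:
    """Return the next state, or raise ValueError for an illegal transition."""
    try:
        return TRANSITIONS[(status, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {status.name} on {event.name}") from None


def is_terminal(status: Status) -> bool:
    return status in TERMINAL
```

Keeping the rules in a single table means an illegal transition, such as completing a run that is already completed, fails loudly instead of silently corrupting state.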

Middleware

from strait import Client, Middleware

mw = Middleware(
    on_request=lambda ctx: print(f"-> {ctx.method} {ctx.url}"),
    on_response=lambda ctx: print(f"<- {ctx.status} ({ctx.duration_ms}ms)"),
    on_error=lambda ctx: print(f"!! {ctx.error}"),
)

client = Client(base_url="...", api_key="...", middleware=[mw])
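
For readers wondering when each hook fires, the sketch below shows one plausible dispatch order around a single request. It is not the SDK's implementation: `Hooks` and `send_with_hooks` are hypothetical, the context is a plain dict rather than the attribute-style object used above, and `send` is any callable that returns a status code.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Hooks:
    """Hypothetical stand-in for a middleware object with three optional callbacks."""
    on_request: Optional[Callable[[dict], None]] = None
    on_response: Optional[Callable[[dict], None]] = None
    on_error: Optional[Callable[[dict], None]] = None


def send_with_hooks(
    method: str,
    url: str,
    send: Callable[[str, str], int],
    hooks: list[Hooks],
) -> int:
    """Run send(method, url), notifying each hook before and after the call."""
    ctx = {"method": method, "url": url}
    for h in hooks:
        if h.on_request:
            h.on_request(ctx)
    start = time.perf_counter()
    try:
        status = send(method, url)
    except Exception as exc:
        # Report the failure to every hook, then let the caller handle it.
        for h in hooks:
            if h.on_error:
                h.on_error({**ctx, "error": exc})
        raise
    duration_ms = round((time.perf_counter() - start) * 1000)
    for h in hooks:
        if h.on_response:
            h.on_response({**ctx, "status": status, "duration_ms": duration_ms})
    return status
```

In this sketch a request triggers either `on_response` or `on_error`, never both, and the original exception still propagates to the caller.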

Custom HTTP Client

Inject your own httpx.Client or httpx.AsyncClient:

import httpx
from strait import AsyncClient, Client

custom = httpx.Client(timeout=60.0)
client = Client(base_url="...", api_key="...", http_client=custom)

# Async
custom_async = httpx.AsyncClient(timeout=60.0)
async_client = AsyncClient(base_url="...", api_key="...", http_client=custom_async)

Error Handling

All errors inherit from StraitError:

from strait import NotFoundError, UnauthorizedError, RateLimitedError

try:
    job = client.jobs.get("nonexistent")
except NotFoundError as e:
    print(f"Not found: {e} (status={e.status})")
except UnauthorizedError as e:
    print(f"Auth error: {e}")
except RateLimitedError as e:
    print(f"Rate limited: {e}")

| Exception | HTTP Status | Description |
| --- | --- | --- |
| TransportError | - | Network failure |
| DecodeError | - | JSON decode failure |
| ValidationError | - | Config/input validation |
| UnauthorizedError | 401, 403 | Auth failure |
| NotFoundError | 404 | Resource not found |
| ConflictError | 409 | Duplicate/conflict |
| RateLimitedError | 429 | Rate limit exceeded |
| ApiError | other | Generic HTTP error |
| StraitTimeoutError | - | Polling timeout |
| DagValidationError | - | Invalid workflow DAG |
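
The status-code rows of the table amount to a lookup from HTTP status to exception class, with a generic fallback. The sketch below illustrates that mapping with stand-in classes. All names here (`SketchError`, `error_for_status`, and the subclasses) are hypothetical, and the SDK's actual class hierarchy and constructor signatures may differ.

```python
from typing import Optional


class SketchError(Exception):
    """Hypothetical base class standing in for StraitError."""

    def __init__(self, message: str, status: Optional[int] = None) -> None:
        super().__init__(message)
        self.status = status


class Unauthorized(SketchError): ...
class NotFound(SketchError): ...
class Conflict(SketchError): ...
class RateLimited(SketchError): ...
class GenericApiError(SketchError): ...


# Mirrors the status-code rows of the table; unlisted codes fall back to the generic error.
STATUS_TO_ERROR: dict[int, type[SketchError]] = {
    401: Unauthorized,
    403: Unauthorized,
    404: NotFound,
    409: Conflict,
    429: RateLimited,
}


def error_for_status(status: int, message: str) -> SketchError:
    """Pick the most specific error class for a status, else the generic one."""
    cls = STATUS_TO_ERROR.get(status, GenericApiError)
    return cls(message, status)
```

Because every class shares one base, a caller can catch specific errors first and finish with a single `except` on the base class to handle everything else.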