AsyncSTX is a drop-in async variant of STX. Same operation names, same Selection, same exceptions — just await every call.
Basic usage
```python
import asyncio

from stx import AsyncSTX, Selection

async def main():
    async with AsyncSTX(
        region="ontario", env="staging",
        email="you@example.com", password="...",
    ) as client:
        markets = await client.marketInfos(
            selections=Selection("marketId", "status"),
        )
        for m in markets:
            print(m.marketId, m.status)

asyncio.run(main())
```
When to use which client
| Use case | Sync STX | Async AsyncSTX |
| --- | --- | --- |
| Scripts, notebooks, cron jobs | ✅ | — |
| Single-threaded bots that make one call at a time | ✅ | — |
| Bots with concurrent market feeds | — | ✅ |
| Long-running services with many markets in flight | — | ✅ |
| Pairing with STXWebSocket | — | ✅ (same event loop) |
| FastAPI / aiohttp / Starlette handlers | — | ✅ |
If you’re reaching for `asyncio.to_thread(sync_client.marketInfos, ...)` repeatedly, that’s the signal to switch to AsyncSTX.
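For reference, that thread-offloading workaround looks like the sketch below. This is a generic stdlib pattern, not STX-specific — `blocking_call` is a stand-in for any synchronous SDK method:

```python
import asyncio

def blocking_call(market_id):
    # Stand-in for a synchronous SDK call (e.g. a sync client method).
    return f"book:{market_id}"

async def main():
    # Each sync call ties up a worker thread for its full duration;
    # with many markets this scales far worse than a native async client.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_call, mid) for mid in ["mkt_1", "mkt_2"])
    )

books = asyncio.run(main())
print(books)
```

It works, but every in-flight call costs a thread, so the thread pool — not the network — becomes the bottleneck.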
Context manager
AsyncSTX cleans up its aiohttp session on exit. Use `async with` whenever possible:
```python
async with AsyncSTX(region="ontario", env="staging") as client:
    await client.marketInfos()
# Session closed here.
```
Without `async with`, call `close()` manually:
```python
client = AsyncSTX(region="ontario", env="staging")
try:
    await client.marketInfos()
finally:
    await client.close()
```
Letting the client go out of scope without closing leaks the aiohttp session — you’ll see `Unclosed client session` warnings.
Parallelizing requests
The async client’s killer feature — fanning out over markets:
```python
import asyncio

from stx import AsyncSTX, Selection

async def fetch_orderbook(client, market_id):
    return await client.marketInfo(
        params={"marketId": market_id},
        selections=Selection(
            "marketId",
            orderbook=Selection("bids", "asks"),
        ),
    )

async def main():
    async with AsyncSTX(region="ontario", env="staging") as client:
        market_ids = ["mkt_1", "mkt_2", "mkt_3", "mkt_4", "mkt_5"]
        markets = await asyncio.gather(
            *(fetch_orderbook(client, mid) for mid in market_ids)
        )
        for m in markets:
            print(m.marketId, m.orderbook)

asyncio.run(main())
```
Concurrency limits
AsyncSTX shares one aiohttp connection pool. The default pool is generous but not infinite — for huge fan-outs, bound concurrency with a semaphore:
```python
sem = asyncio.Semaphore(20)  # at most 20 in flight

async def bounded(coro):
    async with sem:
        return await coro

books = await asyncio.gather(
    *(bounded(fetch_orderbook(client, mid)) for mid in huge_list)
)
```
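The bounding behaviour is easy to verify with stub coroutines. A self-contained sketch (no STX calls; `demo` and `fake_fetch` are made-up names for illustration) that checks in-flight work never exceeds the semaphore's limit:

```python
import asyncio

async def demo(limit=3, total=10):
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch(i):
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulate network latency
            in_flight -= 1
        return i

    # gather preserves input order even though completion order varies.
    results = await asyncio.gather(*(fake_fetch(i) for i in range(total)))
    return results, peak

results, peak = asyncio.run(demo())
print(f"peak concurrency: {peak}")
```

The same shape applies to real fetches: wrap each coroutine so the semaphore is acquired before the request starts, not before the coroutine is created.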
Sharing auth with sync STX and STXWebSocket
All three clients read and write the same User singleton — one login covers them all:
```python
# In a single process / event loop:
sync = STX(region="ontario", env="staging", email="...", password="...")
async_ = AsyncSTX(region="ontario", env="staging")  # re-uses JWT from singleton
ws = STXWebSocket(region="ontario", env="staging")  # re-uses JWT

sync.marketInfos()          # triggers the one-time login
await async_.marketInfos()  # uses cached JWT

async with ws:
    await ws.join(Channels.MARKET_INFO, on_message=...)
```
Error handling
Identical to sync — same exceptions, same retry policy. See Errors & retries .
```python
from stx.exceptions import STXRateLimitException

try:
    resp = await client.userOrders()
except STXRateLimitException as e:
    await asyncio.sleep(e.retry_after)
    resp = await client.userOrders()
```
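If rate limits come up in more than one place, a small wrapper keeps the retry out of your business logic. A generic sketch — `with_rate_limit_retry` is a hypothetical helper, and `RateLimited` is a local stand-in for `STXRateLimitException` so the example runs standalone:

```python
import asyncio

class RateLimited(Exception):
    # Stand-in for STXRateLimitException, which carries retry_after.
    def __init__(self, retry_after):
        self.retry_after = retry_after

async def with_rate_limit_retry(fn, *, max_attempts=3):
    """Call fn(), sleeping for retry_after between rate-limited attempts."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except RateLimited as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: propagate to the caller
            await asyncio.sleep(e.retry_after)

# Simulated endpoint: fails twice, then succeeds.
calls = 0
async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise RateLimited(retry_after=0.01)
    return "ok"

print(asyncio.run(with_rate_limit_retry(flaky)))
```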
Pairing with STXWebSocket
The common pattern: WebSocket for live feeds, AsyncSTX for control-plane calls (place/cancel orders, fetch positions). One event loop, both clients:
```python
import asyncio

from stx import AsyncSTX, STXWebSocket
from stx.enums import Channels

async def run():
    async with AsyncSTX(region="ontario", env="staging",
                        email="you@example.com", password="...") as client, \
            STXWebSocket(region="ontario", env="staging") as ws:

        async def on_market(msg):
            if msg.event == "market_update":
                # Act on the signal — async call into the same event loop.
                await client.confirmOrder(params=..., selections=...)

        await ws.join(Channels.MARKET_INFO, on_message=on_market)
        await asyncio.Future()  # run forever

asyncio.run(run())
```
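`await asyncio.Future()` blocks forever; if you want a clean shutdown path instead, an `asyncio.Event` does the same job but can be released. A generic sketch (`run_until_stopped` and `trigger_stop` are illustrative names; in a real service the event would be set from a signal handler or an admin endpoint):

```python
import asyncio

async def run_until_stopped():
    stop = asyncio.Event()

    async def trigger_stop():
        # Placeholder for a signal handler or admin call setting the event.
        await asyncio.sleep(0.01)
        stop.set()

    task = asyncio.create_task(trigger_stop())
    await stop.wait()  # replaces `await asyncio.Future()  # run forever`
    await task
    return "stopped cleanly"

print(asyncio.run(run_until_stopped()))
```

When the event is set, control falls out of the `async with` blocks, so both clients close their sessions normally.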
FastAPI example
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from stx import AsyncSTX, Selection

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.stx = AsyncSTX(
        region="ontario", env="production",
        email="...", password="...",
    )
    yield
    await app.state.stx.close()

app = FastAPI(lifespan=lifespan)

@app.get("/markets")
async def list_markets():
    markets = await app.state.stx.marketInfos(
        selections=Selection("marketId", "title", "status"),
    )
    # FastAPI serializes Pydantic models automatically — no .model_dump() needed.
    return markets
```
One shared AsyncSTX across all requests — its aiohttp session is connection-pool-backed and designed for concurrent use.
Next
- WebSockets: live order and market updates over Phoenix channels.
- Errors & retries: exception hierarchy and the automatic retry policy.