Skip to main content
STXWebSocket is the async-native WebSocket client. One TCP connection, many Phoenix channels, reconnect + heartbeat + resubscribe handled for you.

Quick start

import asyncio
from stx import STX, STXWebSocket
from stx.enums import Channels

async def main():
    # Seed the JWT singleton (one-time login).
    STX(region="ontario", env="staging",
        email="you@example.com", password="...")

    async def on_market_update(msg):
        print(f"{msg.event}: {msg.payload}")

    async with STXWebSocket(region="ontario", env="staging") as ws:
        await ws.join(Channels.MARKET_INFO, on_message=on_market_update)
        await asyncio.sleep(60)   # listen for a minute

asyncio.run(main())
That’s the full picture. Connection lifecycle, heartbeats (every 30 s by default), reconnect-on-drop, and resubscribe-after-reconnect happen under the hood.

What you get for free

Reconnect

Connection drops trigger exponential-backoff reconnect. Your handlers keep firing on the new socket — no code change needed.

Resubscribe

Every join() is cached. After a reconnect the SDK replays all phx_join frames before dispatching any user messages.

Heartbeat

Phoenix heartbeat frames every 30 s. Servers consider a socket stale without them and drop the connection.

Clean close

async with ws: (or await ws.close()) sends phx_leave for each active subscription before closing the socket.

Channels

Import from stx.enums.Channels:
ChannelWhat it pushes
MARKET_INFOMarket-level price and status updates
ACTIVE_TRADESPublic trade tape
ACTIVE_ORDERSYour order status changes
ACTIVE_POSITIONSYour position updates
ACTIVE_SETTLEMENTSSettlement events
PORTFOLIOBalance and PnL changes
USER_INFOAccount-level updates

Market info

Public — anyone with a valid JWT can join:
async def on_msg(msg):
    if msg.event == "market_update":
        print(msg.payload)

await ws.join(Channels.MARKET_INFO, on_message=on_msg)

Active orders

Per-user — receives your order-status transitions in real time. Replaces polling userOrders.
async def on_order(msg):
    order = msg.payload
    print(f"order {order['orderId']}: {order['status']}")

await ws.join(Channels.ACTIVE_ORDERS, on_message=on_order)

Portfolio

Per-user — balance updates and PnL tick updates.
async def on_portfolio(msg):
    print("portfolio:", msg.payload)

await ws.join(Channels.PORTFOLIO, on_message=on_portfolio)
Channels that include user-scoped data (ACTIVE_ORDERS, PORTFOLIO, etc.) rely on the JWT to identify you. The WS client reuses the same User singleton as STX / AsyncSTX — call any authenticated sync or async method once before opening the socket, or pass email/password to STXWebSocket directly.

Message shape

Every handler receives a ChannelMessage:
@dataclass
class ChannelMessage:
    channel:  str                  # "market_info", "portfolio", ...
    event:    str                  # "phx_reply", "market_update", "order_update", ...
    payload:  Optional[dict]       # None for control events
    join_ref: Optional[str]
    msg_ref:  Optional[str]

    @property
    def is_reply(self) -> bool: ...     # True for phx_reply
    @property
    def is_error(self) -> bool: ...     # True for phx_error
    @property
    def reply_status(self) -> Optional[str]: ...   # "ok" | "error" | None
Control events come through too (phx_reply, phx_error) — filter for them in your handler if you don’t want to see them:
async def on_msg(msg):
    if msg.event == "market_update":
        ...

Multiple subscriptions

One socket, many channels:
async with STXWebSocket(region="ontario", env="staging") as ws:
    await ws.join(Channels.MARKET_INFO,   on_message=handle_market)
    await ws.join(Channels.ACTIVE_ORDERS, on_message=handle_order)
    await ws.join(Channels.PORTFOLIO,     on_message=handle_portfolio)
    await asyncio.Future()   # run forever
Each join() gets its own handler. The SDK routes inbound frames to the right one by channel topic.

Reconnect behaviour

The reconnect loop follows the SDK’s standard RetryPolicy — exponential backoff with jitter, up to max_attempts. Override it:
from stx._retry import RetryPolicy

ws = STXWebSocket(
    region="ontario", env="staging",
    retry=RetryPolicy(max_attempts=10, initial_backoff=0.5),
)
During a reconnect the SDK:
  1. Drops the old socket.
  2. Backs off (up to retry.max_attempts).
  3. Opens a new socket to the same host.
  4. Replays every phx_join it sent before the drop.
  5. Resumes dispatching user frames.
Your handlers see no gap — though frames published during the disconnect are lost (Phoenix doesn’t replay on reconnect). For strict delivery, reconcile via HTTP on reconnect.

Custom heartbeat interval

Default is 30 s, matching the server’s idle timeout. For tests you might want faster:
STXWebSocket(region="ontario", env="staging", heartbeat_interval=5)

Close and leave

async with STXWebSocket(...) as ws:
    await ws.join(Channels.MARKET_INFO, on_message=on_msg)
    ...
# Exiting the context sends phx_leave for each subscription, then closes the socket.
Manually:
ws = STXWebSocket(...)
await ws.connect()
await ws.join(Channels.MARKET_INFO, on_message=on_msg)
...
await ws.close()

Handler exceptions

An exception in a handler won’t kill the reader loop — it’s caught, logged at WARNING, and the next frame proceeds. This means a buggy handler can silently swallow messages. Instrument your handlers if that matters to you.

Example: market-maker skeleton

import asyncio
from stx import STX, STXWebSocket, Selection
from stx.enums import Channels

async def run():
    client = STX(region="ontario", env="staging",
                 email="you@example.com", password="...")

    async def on_market(msg):
        if msg.event != "market_update":
            return
        payload = msg.payload
        # Your quoting logic here.
        # e.g., client.confirmOrder(...) to (re)post a quote.

    async def on_order(msg):
        if msg.event != "order_update":
            return
        # Track fills, adjust inventory.

    async with STXWebSocket(region="ontario", env="staging") as ws:
        await ws.join(Channels.MARKET_INFO,   on_message=on_market)
        await ws.join(Channels.ACTIVE_ORDERS, on_message=on_order)
        await asyncio.Future()   # run until cancelled

asyncio.run(run())

Next

Async HTTP

Pair STXWebSocket with AsyncSTX for event-driven bots.

Errors & retries

Reconnect policy and how it interacts with the retry system.