Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.sportsxapp.com/llms.txt

Use this file to discover all available pages before exploring further.

STXWebSocket is the async-native WebSocket client. One TCP connection, many Phoenix channels, reconnect + heartbeat + resubscribe handled for you.

Quick start

import asyncio
from stx import STX, STXWebSocket
from stx.enums import Channels

async def main():
    # Seed the JWT singleton (one-time login).
    STX(region="ontario", env="staging",
        email="you@example.com", password="...")

    async def on_market(msg):
        print(f"{msg.event}: {msg.payload}")

    async with STXWebSocket(region="ontario", env="staging") as ws:
        await ws.join(Channels.MARKETS, on_message=on_market)
        await asyncio.sleep(60)   # listen for a minute

asyncio.run(main())
That’s the full picture. Connection lifecycle, heartbeats (every 30 s by default), reconnect-on-drop, and resubscribe-after-reconnect happen under the hood.

What you get for free

Reconnect

Connection drops trigger exponential-backoff reconnect. Your handlers keep firing on the new socket — no code change needed.

Resubscribe

Every join() is cached. After a reconnect the SDK replays all phx_join frames before dispatching any user messages.

Heartbeat

Phoenix heartbeat frames every 30 s. Servers consider a socket stale without them and drop the connection.

Clean close

async with ws: (or await ws.close()) sends phx_leave for each active subscription before closing the socket.

Channels

Import from stx.enums.Channels:
ChannelScopeWhat it pushes
MARKETSbroadcastMarket-level price and status updates, with optional server-side filtering
TRADESper-userYour trade fills
ORDERSper-userYour order status changes
POSITIONSper-userYour position updates
SETTLEMENTSper-userSettlement events for your account
PORTFOLIOper-userBalance and PnL changes
USER_INFOper-userAccount-level updates
Per-user channels are auto-scoped to the logged-in user’s uid when you pass the Channels enum — Channels.PORTFOLIO becomes the wire topic portfolio:<your-uid> automatically.

Markets

Broadcast — anyone with a valid JWT can join:
async def on_msg(msg):
    if msg.event == "market_updated":
        print(msg.payload)

await ws.join(Channels.MARKETS, on_message=on_msg)
Two events flow on this channel:
  • market_updated — price, status, or order-book change for an existing market
  • market_created — a new market was listed

Filter at join time

The server supports three filter axes via join_params. All three are optional and combinable.
FieldEffect
fieldsWhitelist of fields to include in each frame. market_id, timestamp, unix_timestamp are always sent. Omit or empty list = all fields.
rule_filtersWhitelist of rule names (e.g. spread, home_winner). Omit, empty, or null = all markets.
message_typesSubset of ["market_updated", "market_created"]. Omit = both.
Narrow to bids/offers for spread markets only:
await ws.join(
    Channels.MARKETS,
    on_message=on_msg,
    join_params={
        "fields": ["bids", "offers"],
        "rule_filters": ["spread"],
    },
)
The join reply (a phx_reply frame) echoes the effective config:
{
    "selected_fields": ["bids", "offers"],
    "mandatory_fields": ["market_id", "timestamp", "unix_timestamp"],
    "available_rules": [...],         # full list valid on this env
    "selected_rule_filters": ["spread"],
    "selected_message_types": ["market_updated", "market_created"],
}
Invalid field/rule/message names are dropped silently — the join still succeeds with whatever was valid.

Change filters mid-stream

You don’t have to re-join to swap filters. Use ws.push() to send a control message on an already-joined channel:
# Swap the field selection.
await ws.push(Channels.MARKETS, "select_fields",
              {"fields": ["title", "price", "recent_trades"]})

# Disable rule filtering.
await ws.push(Channels.MARKETS, "select_rule_filters",
              {"rule_filters": None})

# Subscribe to updates only — no new-listing notifications.
await ws.push(Channels.MARKETS, "select_message_types",
              {"message_types": ["market_updated"]})
EventPayloadEffect
select_fields{"fields": [...]}Replace the field selection
select_rule_filters{"rule_filters": [...] or null}Replace or disable rule filtering
select_message_types{"message_types": [...]}Replace message-type filter
Each push triggers a phx_reply echoing the new config — handle it in your on_message like any other frame.

Orders

Per-user — receives your order-status transitions in real time. Replaces polling client.orders(). The Channels.ORDERS enum auto-scopes to your uid:
async def on_order(msg):
    # Payload is the raw frame from the server; shape depends on event.
    print(f"event={msg.event} payload={msg.payload}")

await ws.join(Channels.ORDERS, on_message=on_order)

Portfolio

Per-user — balance updates and PnL tick updates.
async def on_portfolio(msg):
    print("portfolio:", msg.payload)

await ws.join(Channels.PORTFOLIO, on_message=on_portfolio)
Per-user channels (ORDERS, PORTFOLIO, TRADES, …) rely on the JWT to identify you. The WS client reuses the same User singleton as STX / AsyncSTX — call any authenticated sync or async method once before opening the socket, or pass email/password to STXWebSocket directly.For admin tools that need to watch another user’s channel, pass a raw string topic ("portfolio:other-uid") instead of the enum — the auto-scoping is skipped.

Message shape

Every handler receives a ChannelMessage:
@dataclass
class ChannelMessage:
    channel:  str                  # "markets", "portfolio", ...
    event:    str                  # "phx_reply", "market_updated", "order_update", ...
    payload:  Optional[dict]       # None for control events
    join_ref: Optional[str]
    msg_ref:  Optional[str]

    @property
    def is_reply(self) -> bool: ...     # True for phx_reply
    @property
    def is_error(self) -> bool: ...     # True for phx_error
    @property
    def reply_status(self) -> Optional[str]: ...   # "ok" | "error" | None
Control events come through too (phx_reply, phx_error) — filter for them in your handler if you don’t want to see them:
async def on_msg(msg):
    if msg.event == "market_updated":
        ...

Multiple subscriptions

One socket, many channels:
async with STXWebSocket(region="ontario", env="staging") as ws:
    await ws.join(Channels.MARKETS,    on_message=handle_market)
    await ws.join(Channels.ORDERS,     on_message=handle_order)
    await ws.join(Channels.PORTFOLIO,  on_message=handle_portfolio)
    await asyncio.Future()   # run forever
Each join() gets its own handler. The SDK routes inbound frames to the right one by channel topic.

Reconnect behaviour

The reconnect loop follows the SDK’s standard RetryPolicy — exponential backoff with jitter, up to max_attempts. Override it:
from stx._retry import RetryPolicy

ws = STXWebSocket(
    region="ontario", env="staging",
    retry=RetryPolicy(max_attempts=10, initial_backoff=0.5),
)
During a reconnect the SDK:
  1. Drops the old socket.
  2. Backs off (up to retry.max_attempts).
  3. Opens a new socket to the same host.
  4. Replays every phx_join it sent before the drop.
  5. Resumes dispatching user frames.
Your handlers see no gap — though frames published during the disconnect are lost (Phoenix doesn’t replay on reconnect). For strict delivery, reconcile via HTTP on reconnect.

Custom heartbeat interval

Default is 30 s, matching the server’s idle timeout. For tests you might want faster:
STXWebSocket(region="ontario", env="staging", heartbeat_interval=5)

Close and leave

async with STXWebSocket(...) as ws:
    await ws.join(Channels.MARKETS, on_message=on_msg)
    ...
# Exiting the context sends phx_leave for each subscription, then closes the socket.
Manually:
ws = STXWebSocket(...)
await ws.connect()
await ws.join(Channels.MARKETS, on_message=on_msg)
...
await ws.close()

Handler exceptions

An exception in a handler won’t kill the reader loop — it’s caught, logged at WARNING, and the next frame proceeds. This means a buggy handler can silently swallow messages. Instrument your handlers if that matters to you.

Example: market-maker skeleton

import asyncio
from stx import STX, STXWebSocket, Selection
from stx.enums import Channels

async def run():
    client = STX(region="ontario", env="staging",
                 email="you@example.com", password="...")

    async def on_market(msg):
        if msg.event != "market_updated":
            return
        payload = msg.payload
        # Your quoting logic here.
        # e.g., client.place_order(...) to (re)post a quote.

    async def on_order(msg):
        if msg.event != "order_update":
            return
        # Track fills, adjust inventory.

    async with STXWebSocket(region="ontario", env="staging") as ws:
        await ws.join(Channels.MARKETS,   on_message=on_market)
        await ws.join(Channels.ORDERS, on_message=on_order)
        await asyncio.Future()   # run until cancelled

asyncio.run(run())

Next

Async HTTP

Pair STXWebSocket with AsyncSTX for event-driven bots.

Errors & retries

Reconnect policy and how it interacts with the retry system.