Skip to main content

Install

pip install websockets

Firehose (all events)

import asyncio
import json
import websockets

API_KEY = "pn_live_your_key_here"
URL = f"wss://hyper.polynode.dev/ws?key={API_KEY}"

async def firehose():
    async with websockets.connect(URL) as ws:
        await ws.send(json.dumps({"action": "subscribe"}))
        confirmation = await ws.recv()
        print(f"Subscribed: {confirmation}")

        async for message in ws:
            event = json.loads(message)
            user = (event.get("user") or "")[:12]
            print(
                f"[{event['block']}] {event['type']:20s} | {user:14s} | "
                f"{event.get('asset', '')}"
            )

asyncio.run(firehose())

Filtered subscription

async def filtered():
    async with websockets.connect(URL) as ws:
        await ws.send(json.dumps({
            "action": "subscribe",
            "id": "eth-cancels",
            "filters": {
                "action_types": ["cancel", "cancelByCloid"],
                "assets": ["ETH"],
            },
        }))
        confirmation = await ws.recv()
        print(f"Subscribed: {confirmation}")

        async for message in ws:
            event = json.loads(message)
            print(f"{event['type']} | {event.get('user', '?')[:14]}")

Track a specific address

async def track_address(address: str):
    async with websockets.connect(URL) as ws:
        await ws.send(json.dumps({
            "action": "subscribe",
            "filters": {"addresses": [address]},
        }))
        await ws.recv()

        async for message in ws:
            event = json.loads(message)
            print(json.dumps(event, indent=2))

Prediction markets (testnet)

async def prediction_markets():
    testnet = f"wss://hyper-testnet.polynode.dev/ws?key={API_KEY}"

    async with websockets.connect(testnet) as ws:
        await ws.send(json.dumps({
            "action": "subscribe",
            "filters": {
                "action_types": [
                    "userOutcome", "splitOutcome", "mergeOutcome",
                    "mergeQuestion", "negateOutcome",
                ],
            },
        }))
        await ws.recv()

        async for message in ws:
            event = json.loads(message)
            data = event.get("data", {})
            print(
                f"[{event['type']}] "
                f"{data.get('prediction_question', '?')} / "
                f"{data.get('prediction_name', '?')}"
            )