Running a reserve stream in production comes down to five rules: trust events, not HTTP status codes; watch the heartbeat; know that state self-heals but ledgers don’t; classify errors before retrying; and multiplex deliberately. This guide covers all five with tested code, drawn from running a 58-pool monitor against /sse/reserves around the clock.

One lesson cost us a night of data: treating a transient “stream limit exceeded” error as fatal silenced 8 pools until morning. The patterns below exist so you don’t repeat it.

If you haven’t streamed reserves before, start with the drain detection guide. This one assumes you have a working consumer and need it to survive real networks, real restarts, and real error states.

The five rules

| Rule | Why it exists |
| --- | --- |
| Reset backoff only after a parsed event | Proxies can return HTTP 200 and close instantly. Resetting on 200 turns backoff into a 1-second hammer. |
| Treat 30s of silence as a dead connection | Pings arrive every ~15s. A socket can die without an error ever surfacing. |
| State self-heals, ledgers don’t | Every event carries absolute reserves. After a gap you’re current again, but the per-block change log has a hole. Detect it with previous_block. |
| Classify errors before retrying | “Pool not found” never fixes itself. “Stream limit exceeded” always does. Retrying the first wastes a connection slot forever; abandoning the second loses a working subscription. |
| Multiplex deliberately | 25 subscriptions per connection, 10 connections per IP, and one invalid entry rejects the whole batch. |

Rule 1: Trust events, not status codes

A reconnect loop usually looks like: connect, reset backoff, consume, on failure wait and retry with doubled delay. The subtle bug is where the backoff resets. If it resets when the HTTP response arrives, any failure mode that returns 200 OK and then closes (load balancer deploys, buffering proxies, overload shedding) defeats the exponential entirely: every cycle reconnects at the floor delay, forever, from every client you’ve shipped. Reset backoff only after the first parsed SSE frame. A ping counts; it proves the stream actually streams.
let sawEvent = false;
for await (const frame of frames) {
  sawEvent = true; // proof the stream works, not just HTTP 200
  // ... handle the frame
}
// in the outer loop:
if (sawEvent) backoff = 1_000; // reset only after a parsed frame
await sleep(backoff + Math.random() * 1_000); // jitter spreads reconnect storms
backoff = Math.min(backoff * 2, 30_000); // double for the next attempt
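To make the difference concrete, here is a minimal sketch of the same idea as a small helper. The name `createBackoff` and its options are illustrative assumptions, not part of any API; the floor and ceiling mirror the values used in this guide.

```javascript
// Sketch: backoff state that resets only on proof of a working stream.
// `createBackoff` and `nextDelay` are hypothetical names for illustration.
function createBackoff({ floorMs = 1_000, ceilingMs = 30_000 } = {}) {
  let current = floorMs;
  return {
    // Call once per finished session, before sleeping.
    // sawEvent must be true only if at least one SSE frame was parsed.
    nextDelay(sawEvent) {
      if (sawEvent) current = floorMs;
      const delay = current;
      current = Math.min(current * 2, ceilingMs); // double for the next attempt
      return delay; // the caller adds jitter: delay + Math.random() * 1_000
    },
  };
}

// A proxy that answers 200 and closes immediately never sets sawEvent,
// so the delays keep growing instead of sticking at the floor.
const backoff = createBackoff();
const delays = [false, false, false, false, false, false].map((saw) => backoff.nextDelay(saw));
console.log(delays); // [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

Had the reset been keyed on the HTTP status instead, every entry in that array would be 1000.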

Rule 2: Watch the heartbeat

The server sends a ping every ~15 seconds. If nothing (events or pings) arrives for 30 seconds, assume the socket is dead even though no error fired, and reconnect. In JavaScript that’s a small watchdog timer that aborts the fetch. In Python, requests gives you this for free: set the read timeout to 30 seconds and the staleness check becomes an exception your retry loop already handles.
response = requests.post(url, json=subs, stream=True, timeout=(10, 30))
# 10s to connect, and any 30s silence raises ReadTimeout: your reconnect path
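For the JavaScript side, the watchdog described above might look like the following sketch. `createWatchdog` is a hypothetical helper; the injectable `now` clock and the exposed `check` method are assumptions added so the logic can be exercised without waiting 30 seconds.

```javascript
// Sketch of a staleness watchdog. `createWatchdog` is a hypothetical helper.
function createWatchdog({ staleMs = 30_000, checkMs = 5_000, onStale, now = Date.now }) {
  let lastActivity = now();
  const check = () => {
    if (now() - lastActivity > staleMs) onStale();
  };
  const timer = setInterval(check, checkMs);
  return {
    touch() { lastActivity = now(); }, // call on every chunk, pings included
    check,                             // exposed so it can be driven manually
    stop() { clearInterval(timer); },  // always call this in a finally block
  };
}

// Usage (commented out so the sketch has no side effects):
// const controller = new AbortController();
// const watchdog = createWatchdog({ onStale: () => controller.abort() });
// const res = await fetch(url, { method: "POST", body, signal: controller.signal });
// ...call watchdog.touch() for each chunk read, watchdog.stop() when done.
```

The important detail is that the fetch is started with the same controller's signal that the watchdog aborts; otherwise the abort never reaches the socket.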

Rule 3: State self-heals, ledgers don’t

This is the property that makes reserve streaming forgiving in production. Every pool_reserves event carries absolute values: total_reserve_usd is the pool’s liquidity right now, not a cumulative sum you must maintain. Disconnect for ten minutes, reconnect, and your first event makes your state current again. There is nothing to resync and no REST snapshot to fetch. What does not self-heal is the change log. If you aggregate deltas over time (net flow over a window, cumulative volume of liquidity changes), the blocks you missed are holes in that ledger. The stream gives you exactly what you need to know when that happened: each event’s previous_block names the last block in which the pool’s reserves changed. If it doesn’t match the block of the previous event you received, you missed something.
const lastBlock = new Map();

function checkContinuity(idx, update) {
  const prev = lastBlock.get(idx);
  if (prev !== undefined && update.previous_block !== prev) {
    // a hole in the ledger: reset windowed aggregations for this subscription
    resetWindows(idx);
  }
  lastBlock.set(idx, update.block);
}
previous_block chains event to event, not block to block. Pools only emit when reserves change, so consecutive events can legitimately be many chain blocks apart while previous_block still matches perfectly. A mismatch means missed events, not quiet markets.
If you need the missed changes themselves, the pool transactions endpoint backfills the gap from REST.
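As a sketch of what "resetting windowed aggregations" can mean in practice, the helper below keeps a running net flow per subscription and discards it when the chain of previous_block values breaks. `createLedger` and its method names are hypothetical; the event fields (block, previous_block, total_delta_usd) are the ones described in this guide.

```javascript
// Sketch: a per-subscription delta ledger that knows where its holes are.
function createLedger() {
  const lastBlock = new Map(); // subscription index -> block of the last event seen
  const netFlow = new Map();   // subscription index -> summed total_delta_usd
  let gaps = 0;

  return {
    // Returns true when this event revealed a gap.
    record(idx, update) {
      const prev = lastBlock.get(idx);
      const gap = prev !== undefined && update.previous_block !== prev;
      if (gap) {
        gaps += 1;
        netFlow.set(idx, 0); // the running sum is no longer trustworthy
      }
      netFlow.set(idx, (netFlow.get(idx) ?? 0) + update.total_delta_usd);
      lastBlock.set(idx, update.block);
      return gap;
    },
    netFlow: (idx) => netFlow.get(idx) ?? 0,
    gapCount: () => gaps,
  };
}

const ledger = createLedger();
ledger.record(0, { block: "100", previous_block: "90", total_delta_usd: 50 });
// 40 chain blocks later, but previous_block matches: a quiet market, not a gap.
ledger.record(0, { block: "140", previous_block: "100", total_delta_usd: -20 });
// previous_block is 180, which we never saw: events were missed.
const sawGap = ledger.record(0, { block: "200", previous_block: "180", total_delta_usd: 5 });
console.log(sawGap, ledger.netFlow(0), ledger.gapCount()); // true 5 1
```

The block values here are made up for illustration. Whether to zero the sum or backfill it from REST depends on how much the ledger matters to you.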

Rule 4: Classify errors before retrying

Two error channels exist, and both carry a mix of permanent and transient conditions. Before the stream starts, a bad request gets an HTTP error with a JSON body. Mid-stream, the server sends an error event and closes the connection. Either way, read the message and split:
| Error | Channel | Class | Correct response |
| --- | --- | --- | --- |
| unsupported chain: … | HTTP 400 or error event | Permanent | Fix the subscription. Retrying can never succeed. |
| pool not found: … / asset not found | HTTP 400 or error event | Permanent | Same. The message names the bad entry. |
| too many subscriptions | HTTP 400 | Permanent | Your batch exceeds 25. Chunk it. |
| ip stream limit exceeded | HTTP 429 or error event | Transient | Back off and retry. A slot will free up. |
| 5xx, network failures, timeouts | HTTP / socket | Transient | Back off and retry. |
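For mid-stream error events, a practical classifier needs one line. Apply it only to those messages and let the HTTP status decide pre-stream failures, because the pattern also matches the permanent “too many subscriptions” message that arrives with HTTP 400: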
const isRetryable = (message) => /limit|rate|too many|capacity|timeout|temporar/i.test(message);
Getting this wrong in either direction hurts. Retrying a permanent error burns one of your 10 connection slots on a request that can never succeed. Treating a transient error as permanent is how we lost 8 pools for a night: the limit cleared within minutes, but the code had already given up.
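If you want a single function that handles both channels, one possible shape is sketched below. `classifyFailure` is a hypothetical helper; its patterns are taken from the table above and may not cover every message the server can send. It is meant for server-reported failures only; socket errors and read timeouts carry no server message and are transient by definition.

```javascript
// Sketch: classify a server-reported failure as "permanent" or "transient".
const PERMANENT = /unsupported chain|not found|too many subscriptions/i;
const TRANSIENT = /limit|rate|too many|capacity|timeout|temporar/i;

function classifyFailure({ status, message = "" }) {
  // Known-permanent messages win, so "too many subscriptions" is never retried
  // even though it also matches the transient pattern.
  if (PERMANENT.test(message)) return "permanent";
  if (status !== undefined) {
    if (status === 408 || status === 429 || status >= 500) return "transient";
    if (status >= 400) return "permanent"; // the request itself is wrong
  }
  // No decisive status: a mid-stream error event. Unknown messages are treated
  // as permanent, the same default the full client in this guide uses.
  return TRANSIENT.test(message) ? "transient" : "permanent";
}

console.log(classifyFailure({ status: 400, message: "too many subscriptions" })); // permanent
console.log(classifyFailure({ message: "ip stream limit exceeded" }));            // transient
```

Defaulting unknown messages to permanent fails loudly, which suits a supervised process; a long-running unattended monitor might prefer the opposite default with a retry cap.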
One invalid entry rejects the whole multiplexed connection. The server sends a single error event naming the bad asset, then closes, and your other 24 subscriptions go down with it. Validate addresses against the REST API before subscribing, and treat “not found” errors as a signal to drop the named entry and resubscribe the rest.
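Dropping the named entry could be sketched as follows. This assumes the error message contains the offending address verbatim (for example after "pool not found: "); the exact message format is not specified here, so treat `dropNamedEntry` as a hypothetical helper and verify the matching against real messages.

```javascript
// Sketch: remove the subscription named in a "not found" error message.
// Assumption: the message includes the bad address as a substring.
function dropNamedEntry(subscriptions, message) {
  const haystack = message.toLowerCase();
  // Case-insensitive match, since hex addresses may be echoed in another case.
  const isNamed = (sub) => haystack.includes(sub.address.toLowerCase());
  return {
    kept: subscriptions.filter((sub) => !isNamed(sub)),
    dropped: subscriptions.filter(isNamed),
  };
}

// Placeholder addresses, for illustration only.
const subs = [
  { chain: "base", address: "0xAAA", method: "pool_reserves" },
  { chain: "base", address: "0xbbb", method: "pool_reserves" },
];
const result = dropNamedEntry(subs, "pool not found: 0xBBB");
console.log(result.kept.length, result.dropped[0].address); // 1 0xbbb
```

Two cautions: if nothing was dropped, treat the error as fatal rather than resubscribing the same list in a loop; and because request_id is an array index, any per-subscription state keyed by index has to be re-keyed after an entry is removed.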

Rule 5: Multiplex deliberately

The limits: 25 subscriptions per POST connection, 10 concurrent streams per IP. That’s 250 pools from one machine, if you chunk correctly.
  • Route events to subscriptions with request_id: it’s the index of the entry in your POSTed array.
  • Keep a fallback route on (chain, pool_id/token_id) for defensive code; both fields are in every payload.
  • Chunk watchlists into groups of 25 and run one consumer loop per chunk. Let chunks fail independently: a permanent error in one chunk shouldn’t stop the other nine.
  • Mixing chains and both methods (pool_reserves, token_reserves) in one connection works fine.
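The chunking and routing rules above can be sketched like this. The two limits come from this guide; `chunkWatchlist` and `routeEvent` are hypothetical helpers, and the fallback assumes the payload's pool_id or token_id equals the address you subscribed with.

```javascript
const MAX_SUBS_PER_CONNECTION = 25;
const MAX_CONNECTIONS_PER_IP = 10;

// Split a watchlist into per-connection batches, refusing lists one IP cannot serve.
function chunkWatchlist(watchlist) {
  const chunks = [];
  for (let i = 0; i < watchlist.length; i += MAX_SUBS_PER_CONNECTION) {
    chunks.push(watchlist.slice(i, i + MAX_SUBS_PER_CONNECTION));
  }
  if (chunks.length > MAX_CONNECTIONS_PER_IP) {
    throw new Error(
      `${watchlist.length} subscriptions need ${chunks.length} connections; ` +
        `one IP allows ${MAX_CONNECTIONS_PER_IP}`,
    );
  }
  return chunks;
}

// Resolve an event to its subscription: request_id first, then (chain, id).
function routeEvent(chunk, requestId, payload) {
  const index = Number.parseInt(requestId, 10);
  if (Number.isInteger(index) && chunk[index]) return chunk[index];
  const id = payload.pool_id ?? payload.token_id; // assumed to match `address`
  return chunk.find((sub) => sub.chain === payload.chain && sub.address === id);
}

// A 58-entry watchlist becomes three connections.
const watchlist = Array.from({ length: 58 }, (_, i) => ({ chain: "base", address: `pool-${i}` }));
console.log(chunkWatchlist(watchlist).map((c) => c.length)); // [ 25, 25, 8 ]
```

Note that request_id is relative to the chunk that was POSTed on that connection, not to the full watchlist, so each consumer loop should route against its own chunk.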

The full client

Both versions implement all five rules and were run against production as written. Bad-address handling verified live: the client exits with the server’s message instead of looping.
// resilient-stream.mjs: production-grade reserve streaming. Node 18+, no deps.
// Reconnects with real backoff, detects missed blocks, classifies errors,
// and watches its own heartbeat.

const SUBSCRIPTIONS = [
  { chain: "base", address: "0xb2cc224c1c9fee385f8ad6a55b4d94e92359dc59", method: "pool_reserves" },
  { chain: "solana", address: "8sjV1AqBFvFuADBCQHhotaRq5DFFYSjjg1jMyVWMqXvZ", method: "pool_reserves" },
];

const STALE_MS = 30_000; // two missed pings = reconnect
const MAX_BACKOFF_MS = 30_000;

// Capacity errors clear on their own; subscription errors never will.
function isRetryable(message) {
  return /limit|rate|too many|capacity|timeout|temporar/i.test(message);
}

const lastBlock = new Map(); // subscription index -> last seen block

function handleEvent(idx, update) {
  // Absolute state: total_reserve_usd is the pool's liquidity NOW, so state
  // self-heals after any gap. Only the per-block ledger can have holes.
  const prev = lastBlock.get(idx);
  if (prev !== undefined && update.previous_block !== prev) {
    console.warn(
      `[gap] sub ${idx}: missed blocks ${prev} -> ${update.previous_block}. ` +
        "Reset any delta-window aggregation for this subscription.",
    );
  }
  lastBlock.set(idx, update.block);

  console.log(
    `sub ${idx} block ${update.block}: $${Math.round(update.total_reserve_usd).toLocaleString("en-US")} ` +
      `(${update.total_delta_usd >= 0 ? "+" : "-"}$${Math.abs(update.total_delta_usd).toFixed(2)})`,
  );
}

async function connectOnce(signal) {
  // One controller owns this connection. The fetch must use ITS signal,
  // otherwise the watchdog's abort() below would never reach the socket.
  const controller = new AbortController();
  signal.addEventListener("abort", () => controller.abort(), { once: true });

  const res = await fetch("https://streaming.dexpaprika.com/sse/reserves", {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(SUBSCRIPTIONS),
    signal: controller.signal,
  });
  if (!res.ok) {
    const body = await res.text();
    // 4xx other than 408/429 means the request itself is wrong: do not retry.
    const fatal = res.status >= 400 && res.status < 500 && ![408, 429].includes(res.status);
    throw Object.assign(new Error(`${res.status}: ${body}`), { fatal });
  }

  let sawEvent = false;
  let lastActivity = Date.now();
  const watchdog = setInterval(() => {
    if (Date.now() - lastActivity > STALE_MS) {
      console.warn("[watchdog] no events or pings for 30s, forcing reconnect");
      controller.abort(); // tears down the fetch, the outer loop reconnects
    }
  }, 5_000);

  try {
    let buffer = "";
    for await (const chunk of res.body.pipeThrough(new TextDecoderStream())) {
      lastActivity = Date.now();
      buffer += chunk;
      const frames = buffer.split("\n\n");
      buffer = frames.pop();

      for (const frame of frames) {
        const event = frame.match(/^event: (.*)$/m)?.[1];
        const id = frame.match(/^request_id: (.*)$/m)?.[1];
        const data = frame.match(/^data: (.*)$/m)?.[1];
        if (!data) continue;
        sawEvent = true; // proof the stream works, not just HTTP 200

        if (event === "error") {
          const message = JSON.parse(data).message ?? data;
          throw Object.assign(new Error(message), { fatal: !isRetryable(message) });
        }
        if (event === "warning") {
          console.warn(`[server] ${JSON.parse(data).message ?? data}`);
          continue;
        }
        if (event === "ping") continue; // counts as activity, nothing else
        if (event === "pool_reserves" || event === "token_reserves") {
          handleEvent(Number(id), JSON.parse(data));
        }
      }
    }
  } finally {
    clearInterval(watchdog);
  }
  return sawEvent;
}

let backoff = 1_000;
while (true) {
  const session = new AbortController();
  try {
    console.log("connecting…");
    const healthy = await connectOnce(session.signal);
    if (healthy) backoff = 1_000; // reset only after a proven-working stream
  } catch (err) {
    if (err.fatal) {
      console.error(`fatal, fix the subscription list: ${err.message}`);
      process.exit(1);
    }
    console.error(`transient: ${err.message}`);
  }
  await new Promise((r) => setTimeout(r, backoff + Math.random() * 1_000)); // jitter
  backoff = Math.min(backoff * 2, MAX_BACKOFF_MS);
}
Big numbers arrive as strings. The raw amounts (reserve, delta) and block are JSON strings because raw amounts can exceed Number.MAX_SAFE_INTEGER (2^53 − 1), past which JavaScript numbers lose integer precision. Use BigInt (or Python’s native ints, which handle them automatically) if you work with raw amounts. The USD fields are regular floats, computed server-side, and are all you need for most monitoring.
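A short illustration of why the string encoding matters. The values below are made up, and reading delta as "new reserve minus old reserve" is an assumption of this sketch rather than something stated above.

```javascript
// Hypothetical raw values, larger than Number.MAX_SAFE_INTEGER.
const rawReserve = "123456789012345678901";
const rawDelta = "-1000000000000000001";

// Number() silently rounds values this large.
console.log(Number.isSafeInteger(Number(rawReserve))); // false

// BigInt keeps every digit. Assuming delta = new - old, the old reserve is:
const previousReserve = BigInt(rawReserve) - BigInt(rawDelta);
console.log(previousReserve.toString()); // 124456789012345678902

// Blocks are strings too: order them numerically, not lexicographically.
console.log("99" < "100");                 // false (string comparison)
console.log(BigInt("99") < BigInt("100")); // true
```

Equality checks on block strings, as in the gap detection above, are safe as they are; only ordering and arithmetic need the conversion.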

Don’t alert on plumbing

If your production consumer feeds alerts, remember that liquidity moves for boring reasons too. JIT liquidity bots on major pools add and remove six-figure positions around single swaps; we’ve measured ±4% of an $8M pool every block. Threshold accordingly, or net deltas over a window so the add/remove cycles cancel. The drain detection guide covers thresholds that survive this.
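Netting deltas over a window could be sketched as below. `createNetWindow`, the 60-second window, and the 10% threshold are placeholder assumptions, not recommended values; total_delta_usd and total_reserve_usd are the event fields used elsewhere in this guide.

```javascript
// Sketch: net USD deltas over a sliding time window so add/remove cycles cancel.
function createNetWindow({ windowMs = 60_000, thresholdPct = 10 } = {}) {
  const entries = []; // { at, deltaUsd }, oldest first
  return {
    // Returns the net move, its size relative to current reserves, and
    // whether it crosses the threshold.
    push(update, at = Date.now()) {
      entries.push({ at, deltaUsd: update.total_delta_usd });
      while (entries.length && at - entries[0].at > windowMs) entries.shift();
      const netUsd = entries.reduce((sum, entry) => sum + entry.deltaUsd, 0);
      const netPct = (netUsd / update.total_reserve_usd) * 100;
      return { netUsd, netPct, alert: Math.abs(netPct) >= thresholdPct };
    },
    reset() { entries.length = 0; }, // call when a ledger gap is detected
  };
}

const win = createNetWindow();
// A JIT add/remove cycle on an $8M pool: 4% each way, net zero.
const add = win.push({ total_delta_usd: 320_000, total_reserve_usd: 8_320_000 }, 0);
const remove = win.push({ total_delta_usd: -320_000, total_reserve_usd: 8_000_000 }, 1_000);
// A one-way $1M withdrawal does not cancel.
const drain = win.push({ total_delta_usd: -1_000_000, total_reserve_usd: 7_000_000 }, 2_000);
console.log(add.alert, remove.alert, drain.alert); // false false true
```

A per-event threshold at 3% would have fired twice on the JIT cycle; the netted window stays quiet until the move is one-directional.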

Monitor the monitor

A streaming consumer that silently stops is worse than one that crashes. The minimum viable health surface, all derivable from this guide’s client:
  • Last event age: if lastActivity is older than a minute, something is wrong even if the process is alive.
  • Per-chunk status: which subscriptions are live, which died fatally and why (keep the server’s error message; it names the bad entry).
  • Gap count: how many ledger holes you’ve detected since start.
For a worked example, LiquidityRadar (open source) runs this exact stack as a Cloudflare Worker with a public status page showing event freshness, per-subscription failures, and every alert it gated.
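The three signals above fit in one small function. This is a sketch under assumptions: `healthSnapshot`, the shape of the `chunks` array, and the `fatalError` field are hypothetical, and the one-minute limit is the one suggested in the list.

```javascript
// Sketch of a health summary built from state the client already tracks.
function healthSnapshot({ lastActivity, chunks, gapCount }, now = Date.now()) {
  const lastEventAgeMs = now - lastActivity;
  const dead = chunks.filter((chunk) => chunk.fatalError);
  return {
    healthy: lastEventAgeMs <= 60_000 && dead.length === 0,
    lastEventAgeMs,
    liveChunks: chunks.length - dead.length,
    // Keep the server's message: it names the bad entry.
    deadChunks: dead.map((chunk) => ({ id: chunk.id, reason: chunk.fatalError })),
    gapCount,
  };
}

const snapshot = healthSnapshot(
  {
    lastActivity: 1_000,
    chunks: [{ id: 0 }, { id: 1, fatalError: "pool not found: …" }],
    gapCount: 2,
  },
  5_000,
);
console.log(snapshot.healthy, snapshot.liveChunks, snapshot.deadChunks[0].reason);
// false 1 pool not found: …
```

Exposing this object on an HTTP endpoint or writing it to a log line on an interval is enough for an external check to notice a consumer that is alive but silent.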

FAQ

Do I need to resync or backfill after a disconnect?
Not for state. Every event carries absolute reserve values, so your first event after reconnecting makes you current. You only need REST backfill if you maintain a gapless per-block ledger; detect the holes by comparing each event’s previous_block to the last block you saw.

Does the stream replay events I missed while disconnected?
No. The stream has no replay; a reconnect starts fresh from the next change. That’s why the absolute-values property matters: it makes resume-from-now safe for state tracking.

How many pools can I monitor from one machine?
250: 25 subscriptions per POST connection, 10 concurrent streams per IP. Beyond that, distribute across machines or egress IPs.

What happens if one subscription in a batch is invalid?
The whole connection is rejected with one error event naming the bad asset. Drop that entry and resubscribe the rest. Validating addresses against REST before subscribing avoids the round trip.

Next steps

  • Detect Liquidity Drains: the detection logic this guide keeps alive (thresholds, JIT noise, alerts).
  • Reserve Streaming Overview: payload schemas, both subscription methods, limits.
  • Error Handling Reference: the complete error catalog for REST and streaming.
  • LiquidityRadar on GitHub: all five rules as a deployable, open-source Cloudflare Worker.
