How It Works

A deep dive into the technical architecture of the arbitrage detection engine — from WebSocket connections to opportunity detection algorithms.

Table of Contents
  1. Project Overview
  2. Triangular Arbitrage
  3. Cross-Exchange Arbitrage
  4. Code Architecture
  5. WebSocket Connections
  6. Opportunity Detection
  7. Real-Time Dashboard
  8. Tech Stack

Project Overview

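This arbitrage detection engine is a high-frequency trading research tool built to identify price inefficiencies across cryptocurrency markets. It demonstrates two types of arbitrage strategies:

  • Triangular arbitrage: trading through three related pairs on a single exchange
  • Cross-exchange arbitrage: buying an asset on one exchange and selling it on another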

The engine connects to real exchange WebSocket APIs to receive live market data, processes thousands of price updates per second, and identifies profitable opportunities in real-time.

Note: This is a detection and monitoring tool for educational purposes. Actual execution of trades requires API keys, proper risk management, and consideration of trading fees, slippage, and withdrawal times.

Triangular Arbitrage

Triangular arbitrage exploits pricing inefficiencies between three related trading pairs on a single exchange. The idea is to start with one currency, trade through two others, and end up with more of the original currency than you started with.

How It Works

Example: USDT → BTC → ETH → USDT
Start: 1000 USDT

Step 1: Buy BTC with USDT → 0.01025 BTC (at $97,560)
Step 2: Buy ETH with BTC → 0.28955 ETH (at 0.0354 BTC per ETH)
Step 3: Sell ETH for USDT → 1000.69 USDT (at $3,456)

Profit before fees: +$0.69 (0.069%)

The Mathematics

For a triangle path A → B → C → A, each leg either buys (paying the ask) or sells (receiving the bid). For the buy → buy → sell path in the example above, the return is:

Profit Calculation Formula
gross_return = (1 / ask₁) × (1 / ask₂) × bid₃
net_return = gross_return × (1 - fee)³
profit_pct = (net_return - 1) × 100

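The formula accounts for:

  • Crossing the spread: each buy pays the ask and each sell receives the bid
  • A trading fee deducted on each of the three legs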
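To sanity-check the formula, the sketch below plugs the worked example's prices into it. `triangle_return` is a hypothetical helper, not a function from the project, and the flat 0.1% fee per leg is an assumed taker rate.

```python
def triangle_return(ask1: float, ask2: float, bid3: float, fee: float) -> tuple[float, float]:
    """Gross and net return of a buy -> buy -> sell triangle, per 1 unit of the start currency."""
    gross = (1 / ask1) * (1 / ask2) * bid3
    net = gross * (1 - fee) ** 3  # the fee is paid once on each of the three legs
    return gross, net


# USDT -> BTC -> ETH -> USDT: ask1 in USDT per BTC, ask2 in BTC per ETH, bid3 in USDT per ETH
gross, net = triangle_return(ask1=97_560, ask2=0.0354, bid3=3_456, fee=0.001)
print(f"gross: {(gross - 1) * 100:+.3f}%")  # gross: +0.069%
print(f"net:   {(net - 1) * 100:+.3f}%")    # net:   -0.231%
```

With these prices the gross edge is smaller than three 0.1% fees combined, so the cycle loses money once fees are included.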

Implementation in Code

# From strategy/calculator.py
def calculate(self, path: TrianglePath, orderbook: OrderbookManager) -> CalcResult:
    amount = 1.0  # Start with 1 unit

    for leg in path.legs:
        bbo = orderbook.get(leg.symbol)  # Get best bid/offer

        if leg.side == OrderSide.BUY:
            price = bbo.ask_price  # Pay the ask when buying
            amount = (amount / price) * (1 - self.fee_rate)
        else:
            price = bbo.bid_price  # Receive the bid when selling
            amount = (amount * price) * (1 - self.fee_rate)

    return CalcResult(net_return=amount)

Cross-Exchange Arbitrage

Cross-exchange arbitrage exploits price differences for the same asset across different exchanges. If Bitcoin is trading at $97,500 on Binance but $97,600 on Kraken, you could theoretically buy on Binance and sell on Kraken for a $100 profit per BTC, before fees and transfer costs.

Connected Exchanges

  • Binance (WebSocket)
  • Kraken (WebSocket)
  • Coinbase (WebSocket)
  • OKX (WebSocket)
  • Bybit (WebSocket)

How It Works

Cross-Exchange Opportunity Detection
BTC/USDT Prices:

Binance: $97,480 bid / $97,485 ask
Kraken: $97,520 bid / $97,530 ask
Coinbase: $97,495 bid / $97,500 ask

Best Ask (buy): Binance @ $97,485
Best Bid (sell): Kraken @ $97,520

Spread: +$35 (0.036%, before fees)

Cross-Exchange Profit Formula
profit_pct = ((best_bid - best_ask) / best_ask) × 100

Real-World Considerations

In practice, cross-exchange arbitrage must account for:

  • Trading fees on both exchanges (~0.1% each)
  • Withdrawal fees and transfer times
  • Slippage (price movement during execution)
  • Capital requirements (funds on multiple exchanges)
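A rough way to see how trading fees interact with the spread: the sketch below charges a fee on both legs and ignores withdrawal fees, transfer time, and slippage. The function name and the flat 0.1% rates are assumptions for illustration; the prices come from the BTC/USDT example above.

```python
def cross_exchange_net_pct(best_ask: float, best_bid: float,
                           buy_fee: float, sell_fee: float) -> float:
    """Net % return for buying 1 unit at best_ask and selling it at best_bid.

    Simplification: each fee is charged as a fraction of that leg's notional value;
    withdrawal fees, transfer time and slippage are ignored.
    """
    cost = best_ask * (1 + buy_fee)       # quote currency spent, including fee
    proceeds = best_bid * (1 - sell_fee)  # quote currency received, net of fee
    return (proceeds - cost) / cost * 100


# BTC/USDT example: buy on Binance at the ask, sell on Kraken at the bid
print(f"{cross_exchange_net_pct(97_485, 97_520, 0.0, 0.0):+.3f}%")      # +0.036%
print(f"{cross_exchange_net_pct(97_485, 97_520, 0.001, 0.001):+.3f}%")  # -0.164%
```

A $35 spread on a ~$97,500 asset is far smaller than two 0.1% fees, which is why gross spreads alone overstate cross-exchange opportunities.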

Code Architecture

The project follows a modular, layered architecture designed for maintainability and performance. Each module has a single responsibility and communicates through well-defined interfaces.

src/arbitrage/
├── config/ # Settings & constants
│ ├── settings.py # Pydantic settings with env vars
│ └── constants.py # Fee rates, limits, thresholds
├── core/ # Core engine & types
│ ├── engine.py # Main orchestrator
│ ├── event_bus.py # Internal pub/sub system
│ └── types.py # TypedDicts, dataclasses
├── market/ # Market data handling
│ ├── websocket.py # WebSocket connection manager
│ ├── orderbook.py # O(1) BBO cache
│ └── symbols.py # Symbol filtering & metadata
├── strategy/ # Arbitrage logic
│ ├── graph.py # Triangle path discovery
│ ├── calculator.py # Profit calculation engine
│ └── opportunity.py # Opportunity data structures
├── execution/ # Order execution
│ ├── executor.py # Async order dispatcher
│ ├── signer.py # HMAC-SHA256 request signing
│ └── risk.py # Position limits & guards
├── dashboard/ # Web interface
│ ├── server.py # FastAPI server & WebSocket
│ ├── live_feed.py # Binance live data feed
│ └── multi_exchange_feed.py # Multi-exchange feed
└── telemetry/ # Monitoring & logging
  ├── metrics.py # Latency tracking
  └── reporter.py # CLI stats reporter
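The tree lists `core/event_bus.py` as an internal pub/sub system, but its API is not shown here. A minimal asyncio pub/sub along those lines might look like the sketch below; `EventBus`, `subscribe`, and `publish` are hypothetical names, not the project's actual interface.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

# A handler receives the event payload and is awaited by the bus
Handler = Callable[[dict[str, Any]], Awaitable[None]]


class EventBus:
    """Minimal in-process async pub/sub: topic name -> list of async handlers."""

    def __init__(self) -> None:
        self._subscribers: defaultdict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, payload: dict[str, Any]) -> None:
        # Run all handlers for the topic concurrently; return_exceptions=True
        # collects handler errors instead of raising the first one into the publisher.
        handlers = self._subscribers.get(topic, [])
        await asyncio.gather(*(h(payload) for h in handlers), return_exceptions=True)


async def main() -> None:
    bus = EventBus()
    received: list[dict[str, Any]] = []

    async def on_opportunity(event: dict[str, Any]) -> None:
        received.append(event)

    bus.subscribe("opportunity", on_opportunity)
    await bus.publish("opportunity", {"profit_pct": 0.12})
    print(received)  # [{'profit_pct': 0.12}]


asyncio.run(main())
```

Swallowing handler errors keeps the detection loop alive, but a real bus would log the returned exceptions rather than discard them.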

WebSocket Connections

The engine maintains persistent WebSocket connections to exchange APIs for real-time price updates. Each exchange has its own message format, so we normalize the data into a common structure.

Exchange WebSocket URLs

# Binance - Combined stream for multiple symbols
wss://stream.binance.com:9443/stream?streams=btcusdt@bookTicker/ethusdt@bookTicker

# Kraken - Ticker subscription
wss://ws.kraken.com

# Coinbase - Product ticker channel
wss://ws-feed.exchange.coinbase.com

# OKX - Public ticker channel
wss://ws.okx.com:8443/ws/v5/public

# Bybit - Spot tickers
wss://stream.bybit.com/v5/public/spot
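The Binance combined-stream URL is a `/`-separated list of lowercase `<symbol>@bookTicker` stream names, so it can be generated from the symbol list. `binance_book_ticker_url` is a hypothetical helper. Binance also caps how many streams a single connection may carry, so a large symbol set may need several connections (check Binance's documentation for the current limit).

```python
BINANCE_STREAM_BASE = "wss://stream.binance.com:9443/stream?streams="


def binance_book_ticker_url(symbols: list[str]) -> str:
    """Combined-stream URL subscribing to bookTicker for each symbol."""
    # Stream names use the lowercase symbol: <symbol>@bookTicker
    streams = "/".join(f"{s.lower()}@bookTicker" for s in symbols)
    return BINANCE_STREAM_BASE + streams


print(binance_book_ticker_url(["BTCUSDT", "ETHUSDT"]))
# wss://stream.binance.com:9443/stream?streams=btcusdt@bookTicker/ethusdt@bookTicker
```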

Message Handling

Each exchange sends data in a different format. Here's how we handle Binance's bookTicker messages:

# Binance bookTicker message format
{
    "s": "BTCUSDT",     # Symbol
    "b": "97485.20",   # Best bid price
    "B": "2.5",        # Best bid quantity
    "a": "97485.50",   # Best ask price
    "A": "1.2"         # Best ask quantity
}

# We normalize to our BBO (Best Bid/Offer) structure
from dataclasses import dataclass

@dataclass
class BBO:
    symbol: str
    bid_price: float
    bid_qty: float
    ask_price: float
    ask_qty: float
    timestamp_us: int
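A sketch of the normalization step, assuming each message arrives as raw JSON text. Binance's combined stream wraps each payload as `{"stream": ..., "data": {...}}`, so the parser unwraps `data` when present. `parse_binance_book_ticker` is hypothetical, and filling `timestamp_us` with the local receive time is an assumption, since the message format shown above has no timestamp field. Keeping each BBO in a dict keyed by symbol gives the kind of O(1) latest-quote cache that `market/orderbook.py` is described as.

```python
import json
import time
from dataclasses import dataclass


@dataclass
class BBO:
    symbol: str
    bid_price: float
    bid_qty: float
    ask_price: float
    ask_qty: float
    timestamp_us: int


def parse_binance_book_ticker(raw: str) -> BBO:
    """Normalize a Binance bookTicker message (plain or combined-stream wrapped)."""
    msg = json.loads(raw)
    data = msg.get("data", msg)  # combined streams nest the payload under "data"
    return BBO(
        symbol=data["s"],
        bid_price=float(data["b"]),  # prices and quantities arrive as strings
        bid_qty=float(data["B"]),
        ask_price=float(data["a"]),
        ask_qty=float(data["A"]),
        timestamp_us=time.time_ns() // 1_000,  # assumption: local receive time
    )


cache: dict[str, BBO] = {}
raw = ('{"stream":"btcusdt@bookTicker","data":{"u":1,"s":"BTCUSDT",'
       '"b":"97485.20","B":"2.5","a":"97485.50","A":"1.2"}}')
bbo = parse_binance_book_ticker(raw)
cache[bbo.symbol] = bbo  # O(1) overwrite: only the latest quote per symbol is kept
print(cache["BTCUSDT"].ask_price)  # 97485.5
```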

Reconnection Logic

WebSocket connections can drop due to network issues or exchange maintenance. The engine reconnects automatically, waiting a fixed 3 seconds between attempts:

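import asyncio

async def _run_binance(self):
    # Reconnect for as long as the engine is running
    while self._state.running:
        try:
            # session and url are created elsewhere (not shown)
            async with session.ws_connect(url, heartbeat=30) as ws:
                async for msg in ws:
                    await self._handle_message(msg)
            # A clean close ends the loop above; we reconnect right away
        except Exception:
            # Connection error or dropped socket: wait, then retry
            if self._state.running:
                await asyncio.sleep(3)  # Fixed reconnect delay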
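If exponential backoff is wanted instead of a fixed delay, one common approach is "full jitter": after each failure, wait a random time between zero and `base * 2**attempt`, capped at a maximum. The sketch below is a hypothetical generic wrapper (`run_with_backoff`, `connect_once`, and `is_running` are illustrative names), not the engine's code.

```python
import asyncio
import random
from typing import Awaitable, Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter exponential backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    # Clamp the exponent so the float product can never overflow
    return random.uniform(0, min(cap, base * 2 ** min(attempt, 30)))


async def run_with_backoff(
    connect_once: Callable[[], Awaitable[None]],
    is_running: Callable[[], bool],
    base: float = 1.0,
    cap: float = 60.0,
) -> None:
    """Call connect_once until is_running() is False, backing off after failures."""
    attempt = 0
    while is_running():
        try:
            await connect_once()  # e.g. open the socket and consume messages
            attempt = 0           # stream ended without an exception: reset
        except Exception:
            if not is_running():
                break
            await asyncio.sleep(backoff_delay(attempt, base, cap))
            attempt += 1
```

In `_run_binance`, the `async with session.ws_connect(...)` block would become the body of `connect_once`.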

Opportunity Detection

The core of the engine is the opportunity detection loop. Every time a price updates, we check if any arbitrage opportunities have emerged.

Triangular Detection Flow

  1. Receive price update for symbol (e.g., BTCUSDT)
  2. Find all triangle paths that include this symbol
  3. For each path, check if we have prices for all 3 legs
  4. Calculate potential profit using the formula
  5. If profit > threshold, emit opportunity event

async def _check_opportunities(self, updated_symbol: str):
    for triangle in self._triangles:
        # Only re-evaluate triangles that contain the symbol that just changed
        if updated_symbol not in triangle.symbols:
            continue

        # Skip the triangle until we have a BBO for all three legs
        if not all(self._orderbook.get(leg.symbol) for leg in triangle.legs):
            continue

        # Calculate and emit if profitable
        result = self._calculator.calculate(triangle, self._orderbook)
        profit_pct = (result.net_return - 1) * 100

        if profit_pct > self._min_profit_threshold:
            await self._emit("opportunity", {...})
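The loop above scans every triangle on each update and skips those that don't contain the symbol. Because the set of triangles is fixed once paths are discovered, an index from symbol to triangles can be built once so each update only touches the relevant paths. `Triangle` and `index_by_symbol` are hypothetical, and the symbols are illustrative.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Triangle:
    symbols: tuple[str, str, str]


def index_by_symbol(triangles: list[Triangle]) -> dict[str, list[Triangle]]:
    """Map each symbol to the triangles that contain it (built once at startup)."""
    index: defaultdict[str, list[Triangle]] = defaultdict(list)
    for tri in triangles:
        for symbol in set(tri.symbols):  # set() guards against duplicate symbols
            index[symbol].append(tri)
    return dict(index)


triangles = [
    Triangle(("BTCUSDT", "ETHBTC", "ETHUSDT")),
    Triangle(("BTCUSDT", "SOLBTC", "SOLUSDT")),
    Triangle(("ETHUSDT", "BNBETH", "BNBUSDT")),
]
index = index_by_symbol(triangles)
print(len(index["BTCUSDT"]))  # 2
print(len(index["ETHUSDT"]))  # 2
```

The detection loop would then iterate over `index.get(updated_symbol, [])` instead of every triangle.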

Cross-Exchange Detection

For cross-exchange arbitrage, we compare the best bid across all exchanges with the best ask across all exchanges:

async def _check_opportunities(self):
    for symbol, exchanges in self._state.prices.items():
        # Need quotes from at least two exchanges to compare
        if len(exchanges) < 2:
            continue

        # Find best bid (highest) and best ask (lowest)
        best_bid, best_bid_ex = max(
            ((p["bid"], ex) for ex, p in exchanges.items()),
            key=lambda x: x[0]
        )
        best_ask, best_ask_ex = min(
            ((p["ask"], ex) for ex, p in exchanges.items()),
            key=lambda x: x[0]
        )
        # Buying and selling on the same exchange is not a cross-exchange trade
        if best_bid_ex == best_ask_ex:
            continue

        # Calculate profit: buy at best_ask, sell at best_bid
        profit_pct = ((best_bid - best_ask) / best_ask) * 100

Real-Time Dashboard

The dashboard uses WebSockets for bidirectional communication between the server and browser. This enables instant updates without polling.

Server → Client Events

Client → Server Commands

  • {"action": "start"}: start the engine
  • {"action": "stop"}: stop the engine

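import orjson
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
connected_clients: list[WebSocket] = []  # Browsers currently connected

# FastAPI WebSocket endpoint (start_bot/stop_bot are defined elsewhere)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connected_clients.append(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            msg = orjson.loads(data)

            # Dispatch client commands; unknown actions are ignored
            if msg.get("action") == "start":
                await start_bot()
            elif msg.get("action") == "stop":
                await stop_bot()
    except WebSocketDisconnect:
        pass  # Browser closed the page or lost its connection
    finally:
        connected_clients.remove(websocket)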
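The endpoint above only receives commands. For the server-to-client direction, a broadcast helper could push each event to every connected browser. This sketch is an assumption about how that might work: the `broadcast` name and the `{"type": ..., "data": ...}` envelope are hypothetical, and it uses the standard `json` module so it runs without orjson (`orjson.dumps` returns bytes, so it would need `.decode()` before `send_text`). Any object with an async `send_text(str)` method works, which includes FastAPI's `WebSocket`.

```python
import asyncio
import json
from typing import Any, Protocol


class TextSocket(Protocol):
    """Anything with an async send_text(str), e.g. FastAPI's WebSocket."""

    async def send_text(self, data: str) -> None: ...


async def broadcast(clients: list[TextSocket], event: str, payload: dict[str, Any]) -> int:
    """Send one event to every client; drop clients whose send fails. Returns clients left."""
    message = json.dumps({"type": event, "data": payload})
    snapshot = list(clients)  # iterate over a copy while removing from the original
    results = await asyncio.gather(
        *(client.send_text(message) for client in snapshot),
        return_exceptions=True,
    )
    for client, result in zip(snapshot, results):
        if isinstance(result, Exception) and client in clients:
            clients.remove(client)  # e.g. the browser disconnected
    return len(clients)
```

Sending concurrently means one slow browser does not delay delivery to the others.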

Tech Stack

  • Python 3.11+: modern async/await, type hints, and interpreter performance improvements
  • FastAPI: high-performance async web framework with WebSocket support
  • aiohttp: async HTTP client, used for the WebSocket connections to exchanges
  • orjson: fast JSON library, substantially faster than the standard library's json module
  • uvloop: drop-in replacement for asyncio's default event loop (its authors report 2-4x speedups)
  • Pydantic: data validation and settings management with type safety
  • NetworkX: graph library for discovering triangular arbitrage paths
  • Poetry: dependency management and packaging
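NetworkX is listed for discovering triangle paths. The enumeration itself is small enough to sketch without it: treat each trading pair as two directed edges (holding the quote currency you can BUY the base; holding the base you can SELL it for the quote) and look for three-hop cycles back to the starting currency. `find_triangles` is hypothetical, and building symbols as `base + quote` follows Binance's naming, which other exchanges don't share.

```python
from collections import defaultdict


def find_triangles(pairs: list[tuple[str, str]], start: str) -> list[list[tuple[str, str]]]:
    """Enumerate start -> X -> Y -> start cycles; each leg is (symbol, side)."""
    # Directed edges: currency held -> list of (currency received, symbol, side)
    edges: defaultdict[str, list[tuple[str, str, str]]] = defaultdict(list)
    for base, quote in pairs:
        symbol = base + quote
        edges[quote].append((base, symbol, "BUY"))   # spend quote, receive base
        edges[base].append((quote, symbol, "SELL"))  # spend base, receive quote

    paths = []
    for x, s1, side1 in edges[start]:
        for y, s2, side2 in edges[x]:
            if y == start:
                continue  # a 2-leg round trip, not a triangle
            for z, s3, side3 in edges[y]:
                if z == start:
                    paths.append([(s1, side1), (s2, side2), (s3, side3)])
    return paths


pairs = [("BTC", "USDT"), ("ETH", "USDT"), ("ETH", "BTC")]
for path in find_triangles(pairs, "USDT"):
    print(path)
# [('BTCUSDT', 'BUY'), ('ETHBTC', 'BUY'), ('ETHUSDT', 'SELL')]
# [('ETHUSDT', 'BUY'), ('ETHBTC', 'SELL'), ('BTCUSDT', 'SELL')]
```

The first path is the USDT → BTC → ETH → USDT route from the triangular arbitrage example, with the buy, buy, sell sides that the profit formula assumes; the second is the same triangle traversed in the opposite direction.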

Performance Optimizations

Want to see it in action?

Go to the dashboard and start the engine to see real-time arbitrage detection. Try both simulated and live modes to understand how the system works.