import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, List

import aiohttp
import clickhouse_connect
import pandas as pd
# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 7 # Fetch 7 days of OHLCV data
TOP_POOLS_LIMIT = 250 # Focus on top 250 pools by volume
BATCH_SIZE = 15 # Number of pools processed per batch
CONCURRENT_REQUESTS = 4 # Max concurrent API requests per batch
OHLCV_API_LIMIT = 100 # Max OHLCV records returned per request
INTERVAL = "15m" # 15-minute intervals
# ClickHouse Configuration
CLICKHOUSE_HOST = "localhost" # or your ClickHouse Cloud host
CLICKHOUSE_PORT = 8123
CLICKHOUSE_USER = "default"
CLICKHOUSE_PASSWORD = "" # Set if using ClickHouse Cloud
CLICKHOUSE_DATABASE = "crypto_analytics"
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
async def fetch_with_retry(session, url, params=None, retries=5, backoff_factor=0.5):
"""Generic fetch function with exponential backoff."""
for attempt in range(retries):
try:
            async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=30)) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == retries - 1:
logging.error(f"Final attempt failed for {url}: {e}")
raise
sleep_time = backoff_factor * (2 ** attempt)
logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
await asyncio.sleep(sleep_time)
class ClickHouseETL:
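    """Extracts top Uniswap v3 pools and their OHLCV history from DexPaprika and loads them into ClickHouse."""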
def __init__(self):
        # Connect without selecting a database so it can be created if it doesn't exist yet
with clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD
) as client:
client.command(f"CREATE DATABASE IF NOT EXISTS {CLICKHOUSE_DATABASE}")
# Now, connect to the specific database for table operations
self.client = clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
database=CLICKHOUSE_DATABASE
)
self.api_semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
self.setup_database()
def setup_database(self):
"""Create tables optimized for 15-minute interval data."""
logging.info("Setting up ClickHouse tables...")
# Create pools table with ReplacingMergeTree to handle duplicates
pools_schema = """
CREATE TABLE IF NOT EXISTS pools (
address String,
dex_name String,
volume_24h_usd Float64,
created_at DateTime,
token0_symbol Nullable(String),
token1_symbol Nullable(String),
pair Nullable(String) MATERIALIZED if(isNotNull(token0_symbol) AND isNotNull(token1_symbol), concat(token0_symbol, '-', token1_symbol), NULL),
created_date Date MATERIALIZED toDate(created_at),
volume_rank UInt32
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY (address, volume_24h_usd, created_at)
PARTITION BY toYYYYMM(created_date)
"""
self.client.command(pools_schema)
# Create OHLCV table optimized for time-series analytics
ohlcv_schema = """
CREATE TABLE IF NOT EXISTS pool_ohlcv (
timestamp DateTime,
network String,
pool_address String,
open Float64,
high Float64,
low Float64,
close Float64,
volume_usd Float64,
date Date MATERIALIZED toDate(timestamp),
hour UInt8 MATERIALIZED toHour(timestamp),
minute UInt8 MATERIALIZED toMinute(timestamp),
quarter_hour UInt8 MATERIALIZED intDiv(toMinute(timestamp), 15)
) ENGINE = ReplacingMergeTree(timestamp)
ORDER BY (pool_address, timestamp)
PARTITION BY (date, network)
"""
self.client.command(ohlcv_schema)
logging.info("Database and tables setup complete.")
async def fetch_top_pools(self) -> List[Dict]:
"""Fetch top pools by volume from the specified DEX, handling pagination."""
logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {DEX_ID} on {NETWORK}...")
all_pools = []
page = 0
async with aiohttp.ClientSession() as session:
while len(all_pools) < TOP_POOLS_LIMIT:
url = f"{API_BASE_URL}/networks/{NETWORK}/dexes/{DEX_ID}/pools"
params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
try:
data = await fetch_with_retry(session, url, params=params)
pools = data.get('pools', [])
if not pools:
break
all_pools.extend(pools)
logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
page += 1
if len(all_pools) >= TOP_POOLS_LIMIT:
all_pools = all_pools[:TOP_POOLS_LIMIT]
break
await asyncio.sleep(0.5) # Be respectful to the API
except Exception as e:
logging.error(f"Error fetching page {page}: {e}")
break
logging.info(f"Finished fetching pools. Total: {len(all_pools)}")
return all_pools
async def fetch_pool_ohlcv_paginated(self, session: aiohttp.ClientSession, pool_address: str) -> List[Dict]:
"""Fetch complete OHLCV data for a pool using intelligent, dynamic windowing."""
async with self.api_semaphore:
            final_end_time = datetime.now(timezone.utc)
current_start_time = final_end_time - timedelta(days=HISTORY_DAYS)
all_ohlcv = []
try:
interval_minutes = int(INTERVAL.replace('m', ''))
minutes_per_call = OHLCV_API_LIMIT * interval_minutes
time_delta_per_call = timedelta(minutes=minutes_per_call)
except ValueError:
logging.error(f"Invalid INTERVAL format: {INTERVAL}. Defaulting to 15 minutes.")
interval_minutes = 15
time_delta_per_call = timedelta(minutes=OHLCV_API_LIMIT * 15)
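            # Each request returns at most OHLCV_API_LIMIT candles, so one window covers
            # OHLCV_API_LIMIT * interval minutes (100 * 15m = 25h); 7 days takes ~7 calls per pool.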
while current_start_time < final_end_time:
batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
params = {
"start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"interval": INTERVAL,
"limit": OHLCV_API_LIMIT
}
try:
batch_data = await fetch_with_retry(session, url, params=params)
if batch_data:
for record in batch_data:
record['network'] = NETWORK
record['pool_address'] = pool_address
if 'volume_usd' not in record:
avg_price = (record.get('open', 0) + record.get('close', 0)) / 2
record['volume_usd'] = record.get('volume', 0) * avg_price if avg_price > 0 else 0
all_ohlcv.extend(batch_data)
except Exception as e:
logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
current_start_time = batch_end_time
await asyncio.sleep(0.75) # Crucial delay to prevent rate-limiting
logging.info(f"Pool {pool_address}: collected {len(all_ohlcv)} OHLCV records.")
return all_ohlcv
async def fetch_pool_ohlcv_batch(self, pool_addresses: List[str]) -> List[Dict]:
"""Fetch OHLCV data for multiple pools concurrently."""
logging.info(f"Fetching {INTERVAL} OHLCV for {len(pool_addresses)} pools...")
all_ohlcv = []
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_pool_ohlcv_paginated(session, addr) for addr in pool_addresses]
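            # return_exceptions=True keeps one failing pool from cancelling the whole batch;
            # failures are logged below and results from the remaining pools are still returned.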
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, list):
all_ohlcv.extend(result)
elif isinstance(result, Exception):
logging.warning(f"OHLCV fetch failed for pool {pool_addresses[i]}: {result}")
return all_ohlcv
def load_pools_data(self, pools: List[Dict]):
"""Load pools data into ClickHouse with volume ranking."""
if not pools: return
logging.info("Processing and loading pools data...")
for i, pool in enumerate(pools):
tokens = pool.get('tokens', [])
pool['token0_symbol'] = tokens[0]['symbol'] if len(tokens) > 0 else None
pool['token1_symbol'] = tokens[1]['symbol'] if len(tokens) > 1 else None
pool['volume_rank'] = i + 1
df = pd.DataFrame(pools)
df = df[['id', 'dex_name', 'volume_usd', 'created_at', 'token0_symbol', 'token1_symbol', 'volume_rank']]
df = df.rename(columns={'id': 'address', 'volume_usd': 'volume_24h_usd'})
df['created_at'] = pd.to_datetime(df['created_at'])
self.client.insert_df('pools', df)
logging.info(f"Loaded {len(df)} pools into 'pools' table.")
def load_ohlcv_data(self, ohlcv_data: List[Dict]):
"""Load OHLCV data into ClickHouse."""
if not ohlcv_data: return
logging.info(f"Processing and loading {len(ohlcv_data)} OHLCV records...")
df = pd.DataFrame(ohlcv_data)
df['timestamp'] = pd.to_datetime(df['time_close'])
df = df[['timestamp', 'network', 'pool_address', 'open', 'high', 'low', 'close', 'volume_usd']]
self.client.insert_df('pool_ohlcv', df)
logging.info(f"Loaded {len(df)} records into 'pool_ohlcv' table.")
async def run_etl(self):
"""Run the complete ETL process."""
logging.info(f"Starting ClickHouse ETL process for top {TOP_POOLS_LIMIT} pools...")
pools = await self.fetch_top_pools()
if pools:
self.load_pools_data(pools)
pool_addresses = [pool['id'] for pool in pools if pool.get('id')]
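            # Work through the pools in batches so each ClickHouse insert stays small
            # and the API sees a bounded number of requests at a time.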
for i in range(0, len(pool_addresses), BATCH_SIZE):
batch_addresses = pool_addresses[i:i + BATCH_SIZE]
batch_num = (i // BATCH_SIZE) + 1
total_batches = (len(pool_addresses) + BATCH_SIZE - 1) // BATCH_SIZE
logging.info(f"Processing OHLCV batch {batch_num}/{total_batches} ({len(batch_addresses)} pools)")
ohlcv_data = await self.fetch_pool_ohlcv_batch(batch_addresses)
self.load_ohlcv_data(ohlcv_data)
if i + BATCH_SIZE < len(pool_addresses):
logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
await asyncio.sleep(10)
logging.info("ETL process completed!")
pool_count = self.client.command("SELECT COUNT() FROM pools")
ohlcv_count = self.client.command("SELECT COUNT() FROM pool_ohlcv")
unique_pools_with_data = self.client.command("SELECT COUNT(DISTINCT pool_address) FROM pool_ohlcv")
avg_records = ohlcv_count / unique_pools_with_data if unique_pools_with_data > 0 else 0
logging.info(f"Final counts - Pools: {pool_count}, OHLCV records: {ohlcv_count:,}")
logging.info(f"Coverage - {unique_pools_with_data} pools with data, avg {avg_records:.1f} records/pool.")
async def main():
etl = ClickHouseETL()
await etl.run_etl()
if __name__ == "__main__":
    # pip install clickhouse-connect aiohttp pandas
asyncio.run(main())