import duckdb
import pandas as pd
from datetime import datetime, timedelta, timezone
import logging
import os
import asyncio
import aiohttp
from typing import Any, Dict, List, Optional
# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 14 # Default days of OHLCV data to fetch
DB_FILE = "dbs/uniswap_v3.db"
INTERVAL = "1h" # 1-hour intervals
OHLCV_API_LIMIT = 100 # Max records per API call
TOP_POOLS_LIMIT = 500 # Focus on top 500 pools by volume
CONCURRENT_REQUESTS = 3 # Number of concurrent API requests
BATCH_SIZE = 15 # Number of pools to process in each batch
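# Note: CONCURRENT_REQUESTS and BATCH_SIZE are deliberately conservative so the
# public API is not overwhelmed; tune them with care if you see rate-limit errors.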
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
async def fetch_with_retry(session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None, retries: int = 5, backoff_factor: float = 1.0) -> Any:
"""Generic async fetch function with exponential backoff."""
for attempt in range(retries):
try:
async with session.get(url, params=params, timeout=30) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == retries - 1:
logging.error(f"Final attempt failed for {url}: {e}")
raise
sleep_time = backoff_factor * (2 ** attempt)
logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
await asyncio.sleep(sleep_time)
return {}
async def get_top_dex_pools(session: aiohttp.ClientSession, network: str, dex_id: str) -> List[Dict]:
"""Fetches top pools for a given DEX, handling pagination asynchronously."""
logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {dex_id} on {network}...")
all_pools = []
page = 1
while len(all_pools) < TOP_POOLS_LIMIT:
url = f"{API_BASE_URL}/networks/{network}/dexes/{dex_id}/pools"
params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
try:
data = await fetch_with_retry(session, url, params=params)
pools = data.get('pools', [])
if not pools:
break
all_pools.extend(pools)
logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
page += 1
if len(all_pools) >= TOP_POOLS_LIMIT:
all_pools = all_pools[:TOP_POOLS_LIMIT]
break
await asyncio.sleep(0.5) # Be respectful to the API
except Exception as e:
logging.error(f"Error fetching page {page} for {dex_id} pools: {e}")
break
logging.info(f"Finished fetching pools. Total found: {len(all_pools)}")
return all_pools
async def get_pool_ohlcv(session: aiohttp.ClientSession, pool_address: str, pool_created_at: str, semaphore: asyncio.Semaphore) -> List[Dict]:
"""
Fetches 1-hour OHLCV data for a pool using an intelligent date range and dynamic windowing.
"""
async with semaphore:
logging.info(f"Fetching OHLCV for pool {pool_address}...")
        final_end_time = datetime.now(timezone.utc)  # timezone-aware "now"; datetime.utcnow() is deprecated
# Use the later of: pool creation date or the default history window
start_time = final_end_time - timedelta(days=HISTORY_DAYS)
if pool_created_at:
try:
                pool_creation = datetime.strptime(pool_created_at, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
if pool_creation > start_time:
start_time = pool_creation
except (ValueError, TypeError):
logging.warning(f"Could not parse creation date '{pool_created_at}', using default {HISTORY_DAYS} days.")
all_ohlcv = []
current_start_time = start_time
# Calculate how much time each API call can cover
interval_hours = 1 # Based on "1h" interval
time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT * interval_hours)
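        # With OHLCV_API_LIMIT=100 candles per call at 1-hour granularity, each
        # window covers ~4.2 days, so a full 14-day backfill takes 4 calls per pool.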
while current_start_time < final_end_time:
batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
params = {
"start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"interval": INTERVAL,
"limit": OHLCV_API_LIMIT
}
try:
batch_data = await fetch_with_retry(session, url, params=params)
                # The OHLCV endpoint returns a JSON array of candles; guard the
                # type before mutating records in place.
                if isinstance(batch_data, list):
                    for record in batch_data:
                        record['network'] = NETWORK
                        record['pool_address'] = pool_address
                        # Raw volume is denominated in token units, so approximate
                        # USD volume with the open/close midpoint price.
                        avg_price = (record.get('open', 0) + record.get('close', 0)) / 2
                        record['volume_usd'] = record.get('volume', 0) * avg_price if avg_price > 0 else 0
                    all_ohlcv.extend(batch_data)
except Exception as e:
logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
current_start_time = batch_end_time
await asyncio.sleep(0.75) # Small delay to be respectful
logging.info(f"Successfully fetched {len(all_ohlcv)} OHLCV records for {pool_address}")
return all_ohlcv
async def main():
"""Main ETL function to build the local DuckDB database."""
    os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)
async with aiohttp.ClientSession() as session:
pools = await get_top_dex_pools(session, NETWORK, DEX_ID)
all_ohlcv_data = []
semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
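        # The semaphore caps in-flight OHLCV requests at CONCURRENT_REQUESTS,
        # while BATCH_SIZE bounds how many coroutines each gather() schedules.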
        total_batches = (len(pools) + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division
        for i in range(0, len(pools), BATCH_SIZE):
            batch_num = (i // BATCH_SIZE) + 1
            logging.info(f"--- Processing batch {batch_num}/{total_batches} ---")
            batch = pools[i:i+BATCH_SIZE]
            tasks = [get_pool_ohlcv(session, p.get('id'), p.get('created_at'), semaphore) for p in batch if p.get('id')]
            results = await asyncio.gather(*tasks)
for res in results:
all_ohlcv_data.extend(res)
if i + BATCH_SIZE < len(pools):
logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
await asyncio.sleep(10)
logging.info("ETL process finished. Loading data into DuckDB.")
con = duckdb.connect(database=DB_FILE, read_only=False)
if pools:
for pool in pools:
tokens = pool.get('tokens', [])
            pool['token0_symbol'] = tokens[0].get('symbol') if len(tokens) > 0 else None
            pool['token1_symbol'] = tokens[1].get('symbol') if len(tokens) > 1 else None
pools_df = pd.DataFrame(pools)
pools_df = pools_df[['id', 'dex_name', 'volume_usd', 'created_at', 'token0_symbol', 'token1_symbol']]
pools_df = pools_df.rename(columns={'id': 'address', 'volume_usd': 'volume_24h_usd'})
con.execute("CREATE OR REPLACE TABLE pools AS SELECT * FROM pools_df")
logging.info(f"Loaded {len(pools_df)} records into 'pools' table.")
if all_ohlcv_data:
ohlcv_df = pd.DataFrame(all_ohlcv_data)
ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['time_close'])
ohlcv_df = ohlcv_df[['timestamp', 'network', 'pool_address', 'open', 'high', 'low', 'close', 'volume_usd']]
con.execute("CREATE OR REPLACE TABLE pool_ohlcv AS SELECT * FROM ohlcv_df")
logging.info(f"Loaded {len(ohlcv_df)} records into 'pool_ohlcv' table.")
logging.info("Database build complete. Summary:")
print(con.execute("SHOW TABLES").fetchdf())
print("\nPools Sample:")
print(con.execute("SELECT * FROM pools LIMIT 5").fetchdf())
print("\nOHLCV Sample:")
print(con.execute("SELECT * FROM pool_ohlcv ORDER BY timestamp DESC LIMIT 5").fetchdf())
con.close()
if __name__ == "__main__":
    # Ensure you have the required libraries:
    # pip install aiohttp pandas duckdb
asyncio.run(main())
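
# Once the build completes, the database can be explored from any DuckDB client.
# A minimal sketch (assumes the script above ran to completion, so that
# dbs/uniswap_v3.db contains the 'pools' and 'pool_ohlcv' tables it creates):
#
#   import duckdb
#   con = duckdb.connect("dbs/uniswap_v3.db", read_only=True)
#   top = con.execute("""
#       SELECT p.token0_symbol, p.token1_symbol, SUM(o.volume_usd) AS total_volume_usd
#       FROM pool_ohlcv o
#       JOIN pools p ON o.pool_address = p.address
#       GROUP BY 1, 2
#       ORDER BY total_volume_usd DESC
#       LIMIT 10
#   """).fetchdf()
#   print(top)
#   con.close()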