import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta, timezone
import logging
import asyncio
import aiohttp
from typing import List, Dict
import math
# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 14 # Fetch 14 days of OHLCV data
TOP_POOLS_LIMIT = 500 # Focus on top 500 pools by volume
BATCH_SIZE = 15 # Number of pools processed per OHLCV batch
CONCURRENT_REQUESTS = 3 # Max concurrent OHLCV requests (enforced via a semaphore)
OHLCV_API_LIMIT = 100 # Max records returned per OHLCV request
INTERVAL = "1h" # 1-hour intervals
# InfluxDB Configuration
INFLUX_URL = "http://localhost:8087"
INFLUX_TOKEN = "my-super-secret-token"
INFLUX_ORG = "my-org"
INFLUX_BUCKET = "crypto-data"
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
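
# The connection settings above are hard-coded for a local quick start. In a real
# deployment they would more likely come from the environment; a minimal sketch
# (the variable names are illustrative, nothing else in this script requires them):
#
#   import os
#   INFLUX_URL = os.environ.get("INFLUX_URL", INFLUX_URL)
#   INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", INFLUX_TOKEN)
#   INFLUX_ORG = os.environ.get("INFLUX_ORG", INFLUX_ORG)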
async def fetch_with_retry(session: aiohttp.ClientSession, url: str, params: Dict = None, retries=5, backoff_factor=1.0):
"""Generic async fetch function with exponential backoff."""
for attempt in range(retries):
try:
async with session.get(url, params=params, timeout=30) as response:
response.raise_for_status()
return await response.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == retries - 1:
logging.error(f"Final attempt failed for {url}: {e}")
raise
sleep_time = backoff_factor * (2 ** attempt)
logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
await asyncio.sleep(sleep_time)
return {}
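
# A minimal standalone sketch of how fetch_with_retry can be used on its own; the
# endpoint and parameters are the same ones fetch_top_pools uses below. With the
# defaults (retries=5, backoff_factor=1.0) a failing request is retried after roughly
# 1s, 2s, 4s and 8s before the last error is re-raised. This helper is illustrative
# and is not called by the ETL itself.
async def preview_top_pools(limit: int = 5) -> List[Dict]:
    """Fetch a single page of top pools for a quick sanity check."""
    async with aiohttp.ClientSession() as session:
        url = f"{API_BASE_URL}/networks/{NETWORK}/dexes/{DEX_ID}/pools"
        params = {"page": 0, "limit": limit, "order_by": "volume_usd", "sort": "desc"}
        data = await fetch_with_retry(session, url, params=params)
        return data.get('pools', [])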
class InfluxDBETL:
def __init__(self):
self.client = influxdb_client.InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.api_semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Checks if the bucket exists and creates it if not."""
logging.info(f"Ensuring bucket '{INFLUX_BUCKET}' exists...")
buckets_api = self.client.buckets_api()
bucket = buckets_api.find_bucket_by_name(INFLUX_BUCKET)
if not bucket:
logging.warning(f"Bucket '{INFLUX_BUCKET}' not found. Creating it...")
buckets_api.create_bucket(bucket_name=INFLUX_BUCKET, org=INFLUX_ORG)
logging.info(f"Bucket '{INFLUX_BUCKET}' created successfully.")
else:
logging.info(f"Bucket '{INFLUX_BUCKET}' already exists.")
def clear_bucket_data(self):
"""Deletes all data from the 'ohlcv' measurement in the bucket."""
logging.info(f"Clearing existing data from measurement 'ohlcv' in bucket '{INFLUX_BUCKET}'...")
try:
delete_api = self.client.delete_api()
start = "1970-01-01T00:00:00Z"
stop = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
delete_api.delete(start, stop, '_measurement="ohlcv"', bucket=INFLUX_BUCKET, org=INFLUX_ORG)
logging.info("Existing data cleared successfully.")
except Exception as e:
logging.error(f"Could not clear data from bucket: {e}")
async def fetch_top_pools(self) -> List[Dict]:
"""Fetch top pools by volume from the specified DEX, handling pagination."""
logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {DEX_ID} on {NETWORK}...")
all_pools = []
page = 0
async with aiohttp.ClientSession() as session:
while len(all_pools) < TOP_POOLS_LIMIT:
url = f"{API_BASE_URL}/networks/{NETWORK}/dexes/{DEX_ID}/pools"
params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
try:
data = await fetch_with_retry(session, url, params=params)
pools = data.get('pools', [])
if not pools:
break
all_pools.extend(pools)
logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
page += 1
if len(all_pools) >= TOP_POOLS_LIMIT:
all_pools = all_pools[:TOP_POOLS_LIMIT]
break
await asyncio.sleep(0.5) # Be respectful to the API
except Exception as e:
logging.error(f"Error fetching page {page}: {e}")
break
logging.info(f"Finished fetching pools. Total: {len(all_pools)}")
return all_pools
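
    # Downstream steps only rely on a small subset of each pool record; roughly the
    # shape below (inferred from how run_etl and load_ohlcv_data use the records,
    # values are illustrative, not the full DexPaprika schema):
    #
    #   {
    #       "id": "0x88e6...5640",                                # pool address, used as the pool_id tag
    #       "tokens": [{"symbol": "USDC"}, {"symbol": "WETH"}],   # first two symbols form the pair tag
    #   }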
async def fetch_pool_ohlcv_paginated(self, session: aiohttp.ClientSession, pool_address: str) -> List[Dict]:
"""Fetch complete OHLCV data for a pool using intelligent, dynamic windowing."""
async with self.api_semaphore:
final_end_time = datetime.now(timezone.utc)
current_start_time = final_end_time - timedelta(days=HISTORY_DAYS)
all_ohlcv = []
try:
if 'h' in INTERVAL:
interval_value = int(INTERVAL.replace('h', ''))
time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT * interval_value)
elif 'm' in INTERVAL:
interval_value = int(INTERVAL.replace('m', ''))
time_delta_per_call = timedelta(minutes=OHLCV_API_LIMIT * interval_value)
else:
raise ValueError(f"Unsupported interval format: {INTERVAL}")
            except ValueError as e:
                logging.error(f"Invalid INTERVAL format: {e}. Falling back to windows sized for a 1h interval.")
                time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT)
total_expected_calls = math.ceil((final_end_time - current_start_time) / time_delta_per_call) if time_delta_per_call.total_seconds() > 0 else 0
call_count = 0
while current_start_time < final_end_time:
call_count += 1
batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
logging.info(f" [Pool {pool_address}] Fetching window {call_count}/{total_expected_calls}: {current_start_time.date()} to {batch_end_time.date()}")
url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
params = {
"start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
"interval": INTERVAL,
"limit": OHLCV_API_LIMIT
}
try:
batch_data = await fetch_with_retry(session, url, params=params)
                    if isinstance(batch_data, list) and batch_data:  # the OHLCV endpoint returns a list of candles
for record in batch_data:
record['network'] = NETWORK
record['pool_address'] = pool_address
all_ohlcv.extend(batch_data)
except Exception as e:
logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
current_start_time = batch_end_time
await asyncio.sleep(0.75) # Crucial delay to prevent rate-limiting
logging.info(f"Pool {pool_address}: collected {len(all_ohlcv)} OHLCV records.")
return all_ohlcv
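
    # Worked example of the windowing above with this script's defaults:
    # INTERVAL = "1h" and OHLCV_API_LIMIT = 100 give a 100-hour window per call, and
    # HISTORY_DAYS = 14 is 336 hours, so each pool needs ceil(336 / 100) = 4 requests.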
async def fetch_pool_ohlcv_batch(self, pool_addresses: List[str]) -> List[Dict]:
"""Fetch OHLCV data for multiple pools concurrently."""
logging.info(f"Fetching {INTERVAL} OHLCV for {len(pool_addresses)} pools...")
all_ohlcv = []
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_pool_ohlcv_paginated(session, addr) for addr in pool_addresses]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, list):
all_ohlcv.extend(result)
elif isinstance(result, Exception):
logging.warning(f"OHLCV fetch failed for pool {pool_addresses[i]}: {result}")
return all_ohlcv
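
    # Note on concurrency: up to BATCH_SIZE (15) pagination tasks are created per batch,
    # but api_semaphore caps in-flight work at CONCURRENT_REQUESTS (3), and each task
    # holds the semaphore for all of its windows, so at most 3 pools are fetched at once.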
def load_ohlcv_data(self, ohlcv_data: List[Dict], pools_map: Dict):
"""Load OHLCV data into InfluxDB."""
if not ohlcv_data:
logging.warning("No OHLCV data to load.")
return
points = []
for record in ohlcv_data:
pool_id = record.get('pool_address')
pair = pools_map.get(pool_id, "Unknown/Unknown")
point = (
influxdb_client.Point("ohlcv")
.tag("pool_id", pool_id)
.tag("pair", pair)
.field("open", float(record['open']))
.field("high", float(record['high']))
.field("low", float(record['low']))
.field("close", float(record['close']))
.field("volume", float(record.get('volume', 0)))
.time(record['time_close'])
)
points.append(point)
if points:
self.write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=points)
logging.info(f"Wrote {len(points)} data points to InfluxDB.")
async def run_etl(self):
"""Run the complete ETL process."""
self.clear_bucket_data()
logging.info(f"Starting InfluxDB ETL process for top {TOP_POOLS_LIMIT} pools...")
pools = await self.fetch_top_pools()
if pools:
pools_map = {
pool['id']: f"{pool['tokens'][0]['symbol']}/{pool['tokens'][1]['symbol']}"
for pool in pools if len(pool.get('tokens', [])) >= 2
}
pool_addresses = [pool['id'] for pool in pools if pool.get('id')]
for i in range(0, len(pool_addresses), BATCH_SIZE):
batch_addresses = pool_addresses[i:i + BATCH_SIZE]
batch_num = (i // BATCH_SIZE) + 1
total_batches = (len(pool_addresses) + BATCH_SIZE - 1) // BATCH_SIZE
logging.info(f"Processing OHLCV batch {batch_num}/{total_batches} ({len(batch_addresses)} pools)")
ohlcv_data = await self.fetch_pool_ohlcv_batch(batch_addresses)
self.load_ohlcv_data(ohlcv_data, pools_map)
if i + BATCH_SIZE < len(pool_addresses):
logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
await asyncio.sleep(10)
logging.info("ETL process completed!")
async def main():
etl = InfluxDBETL()
await etl.run_etl()
if __name__ == "__main__":
    # pip install influxdb-client aiohttp
asyncio.run(main())