Build a powerful, local crypto analytics database with DuckDB. This tutorial guides you through creating an ETL pipeline for Uniswap v3 data so you can run complex SQL queries instantly.
The local powerhouse: A core pattern for on-chain analysis
Why make thousands of slow, rate-limited API calls when you can run complex SQL queries instantly on your own machine? This tutorial introduces the most effective pattern for crypto data analysis: creating a local, high-performance copy of a complete on-chain dataset. By fetching the data once, you unlock unlimited, high-speed analytical capabilities.
Looking for other analytics solutions? Check out our full list of API Tutorials for more step-by-step guides.
Our “free tier” isn’t about delayed or incomplete data; it’s about providing a full, live, but scope-limited dataset for you to master. We’re giving you the tools, but it’s up to you to use them in the way that makes the most sense for your project.
The goal:
By the end of this guide, you will have a local uniswap_v3.db file containing the top Uniswap v3 pools on Ethereum and their recent trading history. You will be able to:
Run a robust, high-performance ETL script that pulls a complete dataset from the DexPaprika API.
Perform complex SQL queries on this data instantly, without rate limits.
Understand a professional workflow for acquiring and analyzing on-chain data.
Why this is a foundational skill:
Eliminates Rate Limiting: Instead of thousands of small, repetitive API calls, you perform one efficient batch download.
Unlocks True Analytical Power: Run complex joins, aggregations, and window functions that are impractical to perform through individual API calls (see the example after this list).
Creates a Foundation: The skills you learn here can be applied to any data source, preparing you for more advanced, real-time analysis.
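To make that concrete, here is the kind of query that becomes trivial once the data is local: a 24-hour moving average of hourly volume per pool, computed with a window function. This is a minimal sketch that assumes the pool_ohlcv table created by the ETL script later in this tutorial.

```python
import duckdb

# Minimal sketch: assumes the pool_ohlcv table built later in this tutorial.
con = duckdb.connect("dbs/uniswap_v3.db", read_only=True)
df = con.execute("""
    SELECT
        pool_address,
        timestamp,
        volume_usd,
        -- 24-hour moving average of hourly volume, per pool
        AVG(volume_usd) OVER (
            PARTITION BY pool_address
            ORDER BY timestamp
            ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
        ) AS volume_ma_24h
    FROM pool_ohlcv
    ORDER BY pool_address, timestamp
""").fetchdf()
print(df.head())
con.close()
```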
First, let’s create a Python script to act as our ETL (Extract, Transform, Load) pipeline. This script will fetch the top Uniswap v3 pools on Ethereum (by volume) along with their recent trading history, then load everything into a local DuckDB database file. It leverages two key endpoints: the Top Pools on a DEX endpoint to discover pools, and the Pool OHLCV Data endpoint to fetch historical price data.
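If you want to see what these endpoints return before running the full pipeline, a quick synchronous request is enough. This is a minimal sketch using the requests library; it prints the same fields (id, tokens, volume_usd) that the ETL script below consumes.

```python
import requests

# Preview one page of top Uniswap v3 pools on Ethereum, sorted by 24h volume.
resp = requests.get(
    "https://api.dexpaprika.com/networks/ethereum/dexes/uniswap_v3/pools",
    params={"page": 1, "limit": 5, "order_by": "volume_usd", "sort": "desc"},
    timeout=30,
)
resp.raise_for_status()
for pool in resp.json().get("pools", []):
    tokens = pool.get("tokens", [])
    pair = "/".join(t.get("symbol", "?") for t in tokens[:2])
    print(pool.get("id"), pair, pool.get("volume_usd"))
```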
Create a new file named build_uniswap_db.py.
build_uniswap_db.py
```python
import duckdb
import pandas as pd
from datetime import datetime, timedelta
import logging
import os
import asyncio
import aiohttp
from typing import List, Dict

# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 14           # Default days of OHLCV data to fetch
DB_FILE = "dbs/uniswap_v3.db"
INTERVAL = "1h"             # 1-hour intervals
OHLCV_API_LIMIT = 100       # Max records per API call
TOP_POOLS_LIMIT = 500       # Focus on top 500 pools by volume
CONCURRENT_REQUESTS = 3     # Number of concurrent API requests
BATCH_SIZE = 15             # Number of pools to process in each batch

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


async def fetch_with_retry(session: aiohttp.ClientSession, url: str, params: Dict = None, retries=5, backoff_factor=1.0) -> Dict:
    """Generic async fetch function with exponential backoff."""
    for attempt in range(retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                logging.error(f"Final attempt failed for {url}: {e}")
                raise
            sleep_time = backoff_factor * (2 ** attempt)
            logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
            await asyncio.sleep(sleep_time)
    return {}


async def get_top_dex_pools(session: aiohttp.ClientSession, network: str, dex_id: str) -> List[Dict]:
    """Fetches top pools for a given DEX, handling pagination asynchronously."""
    logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {dex_id} on {network}...")
    all_pools = []
    page = 1
    while len(all_pools) < TOP_POOLS_LIMIT:
        url = f"{API_BASE_URL}/networks/{network}/dexes/{dex_id}/pools"
        params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
        try:
            data = await fetch_with_retry(session, url, params=params)
            pools = data.get('pools', [])
            if not pools:
                break
            all_pools.extend(pools)
            logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
            page += 1
            if len(all_pools) >= TOP_POOLS_LIMIT:
                all_pools = all_pools[:TOP_POOLS_LIMIT]
                break
            await asyncio.sleep(0.5)  # Be respectful to the API
        except Exception as e:
            logging.error(f"Error fetching page {page} for {dex_id} pools: {e}")
            break
    logging.info(f"Finished fetching pools. Total found: {len(all_pools)}")
    return all_pools


async def get_pool_ohlcv(session: aiohttp.ClientSession, pool_address: str, pool_created_at: str, semaphore: asyncio.Semaphore) -> List[Dict]:
    """
    Fetches 1-hour OHLCV data for a pool using an intelligent date range and dynamic windowing.
    """
    async with semaphore:
        logging.info(f"Fetching OHLCV for pool {pool_address}...")
        final_end_time = datetime.utcnow()
        # Use the later of: pool creation date or the default history window
        start_time = final_end_time - timedelta(days=HISTORY_DAYS)
        if pool_created_at:
            try:
                pool_creation = datetime.strptime(pool_created_at, '%Y-%m-%dT%H:%M:%SZ')
                if pool_creation > start_time:
                    start_time = pool_creation
            except (ValueError, TypeError):
                logging.warning(f"Could not parse creation date '{pool_created_at}', using default {HISTORY_DAYS} days.")

        all_ohlcv = []
        current_start_time = start_time

        # Calculate how much time each API call can cover
        interval_hours = 1  # Based on "1h" interval
        time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT * interval_hours)

        while current_start_time < final_end_time:
            batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
            url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
            params = {
                "start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "interval": INTERVAL,
                "limit": OHLCV_API_LIMIT
            }
            try:
                batch_data = await fetch_with_retry(session, url, params=params)
                if batch_data:
                    for record in batch_data:
                        record['network'] = NETWORK
                        record['pool_address'] = pool_address
                        avg_price = (record.get('open', 0) + record.get('close', 0)) / 2
                        record['volume_usd'] = record.get('volume', 0) * avg_price if avg_price > 0 else 0
                    all_ohlcv.extend(batch_data)
            except Exception as e:
                logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
            current_start_time = batch_end_time
            await asyncio.sleep(0.75)  # Small delay to be respectful

        logging.info(f"Successfully fetched {len(all_ohlcv)} OHLCV records for {pool_address}")
        return all_ohlcv


async def main():
    """Main ETL function to build the local DuckDB database."""
    os.makedirs("dbs", exist_ok=True)

    async with aiohttp.ClientSession() as session:
        pools = await get_top_dex_pools(session, NETWORK, DEX_ID)

        all_ohlcv_data = []
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
        for i in range(0, len(pools), BATCH_SIZE):
            batch = pools[i:i+BATCH_SIZE]
            tasks = [get_pool_ohlcv(session, p.get('id'), p.get('created_at'), semaphore) for p in batch if p.get('id')]
            batch_num = (i // BATCH_SIZE) + 1
            total_batches = (len(pools) + BATCH_SIZE - 1) // BATCH_SIZE
            logging.info(f"--- Processing batch {batch_num}/{total_batches} ---")
            results = await asyncio.gather(*tasks)
            for res in results:
                all_ohlcv_data.extend(res)
            if i + BATCH_SIZE < len(pools):
                logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
                await asyncio.sleep(10)

    logging.info("ETL process finished. Loading data into DuckDB.")
    con = duckdb.connect(database=DB_FILE, read_only=False)

    if pools:
        for pool in pools:
            tokens = pool.get('tokens', [])
            pool['token0_symbol'] = tokens[0]['symbol'] if len(tokens) > 0 else None
            pool['token1_symbol'] = tokens[1]['symbol'] if len(tokens) > 1 else None
        pools_df = pd.DataFrame(pools)
        pools_df = pools_df[['id', 'dex_name', 'volume_usd', 'created_at', 'token0_symbol', 'token1_symbol']]
        pools_df = pools_df.rename(columns={'id': 'address', 'volume_usd': 'volume_24h_usd'})
        con.execute("CREATE OR REPLACE TABLE pools AS SELECT * FROM pools_df")
        logging.info(f"Loaded {len(pools_df)} records into 'pools' table.")

    if all_ohlcv_data:
        ohlcv_df = pd.DataFrame(all_ohlcv_data)
        ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['time_close'])
        ohlcv_df = ohlcv_df[['timestamp', 'network', 'pool_address', 'open', 'high', 'low', 'close', 'volume_usd']]
        con.execute("CREATE OR REPLACE TABLE pool_ohlcv AS SELECT * FROM ohlcv_df")
        logging.info(f"Loaded {len(ohlcv_df)} records into 'pool_ohlcv' table.")

    logging.info("Database build complete. Summary:")
    print(con.execute("SHOW TABLES").fetchdf())
    print("\nPools Sample:")
    print(con.execute("SELECT * FROM pools LIMIT 5").fetchdf())
    print("\nOHLCV Sample:")
    print(con.execute("SELECT * FROM pool_ohlcv ORDER BY timestamp DESC LIMIT 5").fetchdf())
    con.close()


if __name__ == "__main__":
    # Ensure you have the required libraries:
    # pip install requests pandas duckdb aiohttp
    asyncio.run(main())
```
A simple, sequential script is fine for learning, but real-world data fetching requires a more robust approach. Here are the techniques this script uses to run reliably:
Asynchronous Operations: By using asyncio and aiohttp, the script makes many API requests concurrently instead of one at a time, which significantly shortens the total run time.
Dynamic Windowing: The get_pool_ohlcv function splits each pool’s history into time windows sized to the API’s 100-record limit, so the full history is fetched without silent truncation (the arithmetic is sketched after this list).
Concurrency Control & Throttling: An asyncio.Semaphore, combined with carefully tuned BATCH_SIZE and asyncio.sleep() calls, keeps the request rate within the API’s limits.
Resiliency: The fetch_with_retry function automatically retries failed requests with an exponential backoff delay, making the pipeline resilient to temporary network issues.
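To make the dynamic-windowing point concrete, here is the arithmetic behind the default settings (14 days of 1-hour candles, at most 100 records per call). This is an illustrative sketch, not part of the pipeline:

```python
import math
from datetime import timedelta

HISTORY_DAYS = 14
OHLCV_API_LIMIT = 100   # max OHLCV records per API call
INTERVAL_HOURS = 1      # "1h" candles

total_candles = HISTORY_DAYS * 24                                    # 336 hourly candles
window_per_call = timedelta(hours=OHLCV_API_LIMIT * INTERVAL_HOURS)  # each call spans ~4.2 days
calls_per_pool = math.ceil(total_candles / OHLCV_API_LIMIT)          # 4 calls per pool

print(window_per_call, calls_per_pool)  # -> 4 days, 4:00:00  4
```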
Now, execute the script from your terminal. It will fetch the top Uniswap v3 pools on Ethereum and their recent trading history, then create a uniswap_v3.db file in a new dbs directory. This may take several minutes, but it is significantly faster than a purely sequential script.
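Assuming a recent Python 3 environment, install the dependencies and run the pipeline:

```bash
pip install requests pandas duckdb aiohttp
python build_uniswap_db.py
```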
Once the script completes, you have a powerful local database at your fingertips. You can now use any SQL client that supports DuckDB, or Python itself, to perform instant, complex analysis. In step 3, we will connect the database to an AI assistant for natural language queries.
If you want to query the database with a Python script, create a new file named query_duckdb.py and paste the following code into it.
query_duckdb.py
```python
import duckdb
import pandas as pd
import time

# Connect to the DuckDB database file
con = duckdb.connect(database='dbs/uniswap_v3.db', read_only=True)

print("=== DuckDB Uniswap v3 Analytics ===\n")

# --- Query 1: Database Summary ---
print("--- 1. Database Summary ---")
pool_count = con.execute("SELECT COUNT(*) FROM pools").fetchone()[0]
ohlcv_count = con.execute("SELECT COUNT(*) FROM pool_ohlcv").fetchone()[0]
print(f"Total Pools Loaded: {pool_count}")
print(f"Total OHLCV Records: {ohlcv_count:,}\n")

# --- Query 2: Top 10 Pools by 24h Volume ---
print("--- 2. Top 10 Pools by 24h Volume ---")
start_time = time.time()
top_pools_df = con.execute("""
    SELECT
        address,
        token0_symbol,
        token1_symbol,
        volume_24h_usd
    FROM pools
    ORDER BY volume_24h_usd DESC
    LIMIT 10
""").fetchdf()
print(top_pools_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")

# --- Query 3: Peak Trading Hours ---
print("--- 3. Peak Trading Hours (UTC) ---")
start_time = time.time()
hourly_volume_df = con.execute("""
    SELECT
        EXTRACT(hour FROM timestamp) AS hour_of_day,
        SUM(volume_usd) AS total_volume_usd
    FROM pool_ohlcv
    WHERE volume_usd > 0
      AND volume_usd < 1000000000  -- Defensive filter against outliers
    GROUP BY hour_of_day
    ORDER BY total_volume_usd DESC
""").fetchdf()
# Format the volume for better readability
hourly_volume_df['total_volume_usd'] = hourly_volume_df['total_volume_usd'].map('${:,.2f}'.format)
print(hourly_volume_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")

con.close()
```
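These bundled queries only scratch the surface. Because both tables live in the same file, you can also join them, for example to rank trading pairs by total volume across the fetched history. This is a sketch against the schema created by the ETL script:

```python
import duckdb

con = duckdb.connect(database='dbs/uniswap_v3.db', read_only=True)
pair_volume_df = con.execute("""
    SELECT
        p.token0_symbol || '/' || p.token1_symbol AS pair,
        COUNT(DISTINCT o.pool_address)            AS pools,
        SUM(o.volume_usd)                         AS total_volume_usd
    FROM pool_ohlcv o
    JOIN pools p ON o.pool_address = p.address
    GROUP BY pair
    ORDER BY total_volume_usd DESC
    LIMIT 10
""").fetchdf()
print(pair_volume_df)
con.close()
```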
While you can use any SQL client to query your database, the real power comes from connecting it to an AI assistant. By using a Model Context Protocol (MCP) server, you can enable your assistant to directly query the uniswap_v3.db file you created. This allows you to ask for insights in plain English instead of writing SQL.
For this, we will use mcp-server-duckdb, an open-source MCP server for DuckDB.
Next, you need to tell your AI assistant how to run the server. Add the following to your claude_desktop_config.json file.
If you see a “Server disconnected” error after restarting your AI assistant, it means the application cannot find the uvx or npx command. This happens because the application doesn’t share the same PATH environment variable as your terminal.
To fix this, you must use the full, absolute path to the command.
Find the absolute path by running which uvx or which npx in your terminal.
Copy the output (e.g., /Users/yourname/.local/bin/uvx or /opt/homebrew/bin/npx).
Use that full path as the command value in the JSON configuration below.
The example below uses uvx, which is recommended. Make sure to replace </path/to/your/project> with the actual absolute path to your project directory.
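For reference, a typical configuration looks like the sketch below. The exact argument names depend on the version of mcp-server-duckdb you install, so check its README if the server fails to start; the database path here assumes the dbs/uniswap_v3.db file created earlier.

```json
{
  "mcpServers": {
    "duckdb": {
      "command": "uvx",
      "args": [
        "mcp-server-duckdb",
        "--db-path",
        "</path/to/your/project>/dbs/uniswap_v3.db"
      ]
    }
  }
}
```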
What you’ve built: From API calls to analytics powerhouse
By completing this tutorial, you have successfully transitioned from being a passive data consumer to an active data analyst. You’ve replaced the slow, restrictive pattern of making individual API calls with a fast, powerful, and scalable local analytics workflow.
Key achievements:
Built a professional ETL pipeline: You have a reusable, high-performance Python script that can create a comprehensive local database from any supported DEX and network.
Unlocked high-speed SQL: You can now perform complex analytical queries on a rich dataset in milliseconds, directly on your machine.
Mastered a foundational workflow: This “local-first” data strategy is a cornerstone of professional data analysis. It enables deeper exploration, from high-level market trends down to individual wallet behaviors.
Created a reusable asset: Your uniswap_v3.db file can feed any future analysis, dashboarding, or AI integration project.
When your project grows and you need to explore other data solutions, check out our full list of API Tutorials for more advanced guides.