The local powerhouse: A core pattern for on-chain analysis

Why make thousands of slow, rate-limited API calls when you can run complex SQL queries instantly on your own machine? This tutorial introduces the most effective pattern for crypto data analysis: creating a local, high-performance copy of a complete on-chain dataset. By fetching the data once, you unlock unlimited, high-speed analytical capabilities.

Looking for other analytics solutions? Check out our full list of API Tutorials for more step-by-step guides.

Our “free tier” isn’t about delayed or incomplete data; it’s about providing a full, live, but scope-limited dataset for you to master. We’re giving you the tools, but it’s up to you to use them in the way that makes the most sense for your project.

The goal: By the end of this guide, you will have a local uniswap_v3.db file containing the top Uniswap v3 pools on Ethereum (up to 500, ranked by volume) and their recent hourly trading history. You will be able to:

  1. Run a robust, high-performance ETL script that pulls a complete dataset from the DexPaprika API.
  2. Perform complex SQL queries on this data instantly, without rate limits.
  3. Understand a professional workflow for acquiring and analyzing on-chain data.

Why this is a foundational skill:

  • Sidesteps Rate Limiting: Instead of thousands of small, repetitive API calls at analysis time, you perform one throttled batch download and run every later query locally.
  • Unlocks True Analytical Power: Run complex joins, aggregations, and window functions that are impractical to reproduce through individual API calls.
  • Creates a Foundation: The skills you learn here can be applied to any data source, preparing you for more advanced, real-time analysis.

Step 1: Build your local data pipeline

First, let’s create a Python script to act as our ETL (Extract, Transform, Load) pipeline. This script fetches the top 500 Uniswap v3 pools on Ethereum by volume, together with up to 14 days of hourly OHLCV history for each, and loads everything into a local DuckDB database file. It uses two endpoints: the Top Pools on a DEX endpoint to discover pools, and the Pool OHLCV Data endpoint to fetch historical price data.

Create a new file named build_uniswap_db.py.

build_uniswap_db.py
import duckdb
import pandas as pd
from datetime import datetime, timedelta
import logging
import os
import asyncio
import aiohttp
from typing import List, Dict

# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 14       # Default days of OHLCV data to fetch
DB_FILE = "dbs/uniswap_v3.db"
INTERVAL = "1h"         # 1-hour intervals
OHLCV_API_LIMIT = 100   # Max records per API call
TOP_POOLS_LIMIT = 500   # Focus on top 500 pools by volume
CONCURRENT_REQUESTS = 3 # Number of concurrent API requests
BATCH_SIZE = 15         # Number of pools to process in each batch

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

async def fetch_with_retry(session: aiohttp.ClientSession, url: str, params: Dict = None, retries=5, backoff_factor=1.0) -> Dict:
    """Generic async fetch function with exponential backoff."""
    for attempt in range(retries):
        try:
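            # Pass an explicit ClientTimeout; a bare number for timeout is deprecated in aiohttp
            async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=30)) as response: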
                response.raise_for_status()
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                logging.error(f"Final attempt failed for {url}: {e}")
                raise
            sleep_time = backoff_factor * (2 ** attempt)
            logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
            await asyncio.sleep(sleep_time)
    return {}

async def get_top_dex_pools(session: aiohttp.ClientSession, network: str, dex_id: str) -> List[Dict]:
    """Fetches top pools for a given DEX, handling pagination asynchronously."""
    logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {dex_id} on {network}...")
    all_pools = []
    page = 1
    while len(all_pools) < TOP_POOLS_LIMIT:
        url = f"{API_BASE_URL}/networks/{network}/dexes/{dex_id}/pools"
        params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
        try:
            data = await fetch_with_retry(session, url, params=params)
            pools = data.get('pools', [])
            if not pools:
                break
            all_pools.extend(pools)
            logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
            page += 1
            if len(all_pools) >= TOP_POOLS_LIMIT:
                all_pools = all_pools[:TOP_POOLS_LIMIT]
                break
            await asyncio.sleep(0.5) # Be respectful to the API
        except Exception as e:
            logging.error(f"Error fetching page {page} for {dex_id} pools: {e}")
            break
    logging.info(f"Finished fetching pools. Total found: {len(all_pools)}")
    return all_pools

async def get_pool_ohlcv(session: aiohttp.ClientSession, pool_address: str, pool_created_at: str, semaphore: asyncio.Semaphore) -> List[Dict]:
    """
    Fetches 1-hour OHLCV data for a pool using an intelligent date range and dynamic windowing.
    """
    async with semaphore:
        logging.info(f"Fetching OHLCV for pool {pool_address}...")
        final_end_time = datetime.utcnow()
        
        # Use the later of: pool creation date or the default history window
        start_time = final_end_time - timedelta(days=HISTORY_DAYS)
        if pool_created_at:
            try:
                pool_creation = datetime.strptime(pool_created_at, '%Y-%m-%dT%H:%M:%SZ')
                if pool_creation > start_time:
                    start_time = pool_creation
            except (ValueError, TypeError):
                logging.warning(f"Could not parse creation date '{pool_created_at}', using default {HISTORY_DAYS} days.")

        all_ohlcv = []
        current_start_time = start_time
        
        # Calculate how much time each API call can cover
        interval_hours = 1 # Based on "1h" interval
        time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT * interval_hours)
        
        while current_start_time < final_end_time:
            batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
            url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
            params = {
                "start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "interval": INTERVAL,
                "limit": OHLCV_API_LIMIT
            }
            try:
                batch_data = await fetch_with_retry(session, url, params=params)
                # Only process the response if it is a list of candles
                if isinstance(batch_data, list):
                    for record in batch_data:
                        record['network'] = NETWORK
                        record['pool_address'] = pool_address
                        # Approximate USD volume as volume times the mean of open and close
                        avg_price = (record.get('open', 0) + record.get('close', 0)) / 2
                        record['volume_usd'] = record.get('volume', 0) * avg_price if avg_price > 0 else 0
                    all_ohlcv.extend(batch_data)
            except Exception as e:
                logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
            
            current_start_time = batch_end_time
            await asyncio.sleep(0.75) # Small delay to be respectful

        logging.info(f"Successfully fetched {len(all_ohlcv)} OHLCV records for {pool_address}")
        return all_ohlcv

async def main():
    """Main ETL function to build the local DuckDB database."""
    os.makedirs("dbs", exist_ok=True)
    
    async with aiohttp.ClientSession() as session:
        pools = await get_top_dex_pools(session, NETWORK, DEX_ID)
        
        all_ohlcv_data = []
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
        
        for i in range(0, len(pools), BATCH_SIZE):
            batch = pools[i:i+BATCH_SIZE]
            tasks = [get_pool_ohlcv(session, p.get('id'), p.get('created_at'), semaphore) for p in batch if p.get('id')]
            
            batch_num = (i // BATCH_SIZE) + 1
            total_batches = (len(pools) + BATCH_SIZE - 1) // BATCH_SIZE
            logging.info(f"--- Processing batch {batch_num}/{total_batches} ---")

            results = await asyncio.gather(*tasks)
            for res in results:
                all_ohlcv_data.extend(res)
            
            if i + BATCH_SIZE < len(pools):
                logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
                await asyncio.sleep(10)

    logging.info("ETL process finished. Loading data into DuckDB.")

    con = duckdb.connect(database=DB_FILE, read_only=False)

    if pools:
        for pool in pools:
            tokens = pool.get('tokens', [])
            pool['token0_symbol'] = tokens[0]['symbol'] if len(tokens) > 0 else None
            pool['token1_symbol'] = tokens[1]['symbol'] if len(tokens) > 1 else None
        
        pools_df = pd.DataFrame(pools)
        pools_df = pools_df[['id', 'dex_name', 'volume_usd', 'created_at', 'token0_symbol', 'token1_symbol']]
        pools_df = pools_df.rename(columns={'id': 'address', 'volume_usd': 'volume_24h_usd'})
        con.execute("CREATE OR REPLACE TABLE pools AS SELECT * FROM pools_df")
        logging.info(f"Loaded {len(pools_df)} records into 'pools' table.")

    if all_ohlcv_data:
        ohlcv_df = pd.DataFrame(all_ohlcv_data)
        ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['time_close'])
        ohlcv_df = ohlcv_df[['timestamp', 'network', 'pool_address', 'open', 'high', 'low', 'close', 'volume_usd']]
        con.execute("CREATE OR REPLACE TABLE pool_ohlcv AS SELECT * FROM ohlcv_df")
        logging.info(f"Loaded {len(ohlcv_df)} records into 'pool_ohlcv' table.")

    logging.info("Database build complete. Summary:")
    print(con.execute("SHOW TABLES").fetchdf())
    print("\nPools Sample:")
    print(con.execute("SELECT * FROM pools LIMIT 5").fetchdf())
    print("\nOHLCV Sample:")
    print(con.execute("SELECT * FROM pool_ohlcv ORDER BY timestamp DESC LIMIT 5").fetchdf())
    con.close()

if __name__ == "__main__":
    # Ensure you have the required libraries:
    # pip install pandas duckdb aiohttp
    asyncio.run(main())

A simple, sequential script is great for learning, but real-world data fetching requires a more robust approach. Here is what we’ve used to make sure it runs reliably:

  • Asynchronous Operations: By using asyncio and aiohttp, the script makes several API requests concurrently instead of one at a time, which shortens the total run time.
  • Dynamic Windowing: Each OHLCV call is limited to OHLCV_API_LIMIT (100) records, so get_pool_ohlcv splits the requested date range into consecutive 100-hour windows and requests them one after another until the pool's full history window is covered.
  • Concurrency Control & Throttling: An asyncio.Semaphore caps the number of pools fetched at the same time at CONCURRENT_REQUESTS, while BATCH_SIZE and the asyncio.sleep() pauses space requests out to help keep the script under the API rate limit.
  • Resiliency: The fetch_with_retry function automatically retries failed requests with an exponential backoff delay, making the pipeline resilient to temporary network issues.
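
To make the windowing and backoff numbers concrete, here is a minimal sketch that reproduces the same arithmetic outside the pipeline. The helper names split_into_windows and backoff_delays are illustrative and do not appear in build_uniswap_db.py; the constants are copied from its configuration, and the fixed date is arbitrary.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

OHLCV_API_LIMIT = 100  # records per call, as in the script's configuration
HISTORY_DAYS = 14      # history window, as in the script's configuration


def split_into_windows(start: datetime, end: datetime,
                       limit: int = OHLCV_API_LIMIT,
                       interval_hours: int = 1) -> List[Tuple[datetime, datetime]]:
    """Split the range from start to end into consecutive windows of at most `limit` candles."""
    step = timedelta(hours=limit * interval_hours)
    windows = []
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        windows.append((cursor, window_end))
        cursor = window_end
    return windows


def backoff_delays(retries: int = 5, backoff_factor: float = 1.0) -> List[float]:
    """Sleep times between attempts; the final attempt raises instead of sleeping."""
    return [backoff_factor * (2 ** attempt) for attempt in range(retries - 1)]


end = datetime(2024, 1, 15)                 # arbitrary fixed date for a reproducible example
start = end - timedelta(days=HISTORY_DAYS)  # 14 days = 336 hours
windows = split_into_windows(start, end)

print(len(windows))      # 4 calls per pool: 100 + 100 + 100 + 36 hours
print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0]
```

Consecutive windows share a boundary timestamp. Whether that produces a duplicate candle depends on how the API treats the start and end bounds, which this tutorial does not specify, so deduplicating on (pool_address, timestamp) after loading is a reasonable safeguard.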

Required libraries

Before running the script, make sure you have the necessary Python libraries installed.

pip install pandas duckdb aiohttp

Step 2: Run the pipeline and query with SQL

Now, execute the script from your terminal. It will fetch the top Uniswap v3 pools on Ethereum and their recent trading history, then create a uniswap_v3.db file in a new dbs directory. This may take several minutes or more, but it will be significantly faster than a purely sequential script.

python build_uniswap_db.py
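
For a rough sense of how long the run takes, the sketch below adds up only the deliberate pauses in the script. It is a lower bound under stated assumptions: every pool needs four OHLCV calls (a full 14-day window), network latency and retries are ignored, and the short pool-listing phase is left out. The function name min_runtime_seconds is illustrative.

```python
import math

TOP_POOLS_LIMIT = 500
BATCH_SIZE = 15
CONCURRENT_REQUESTS = 3
CALLS_PER_POOL = 4          # 14 days of 1h candles at 100 records per call
SLEEP_PER_CALL = 0.75       # asyncio.sleep after each OHLCV call
SLEEP_BETWEEN_BATCHES = 10  # asyncio.sleep between batches


def min_runtime_seconds(pools: int = TOP_POOLS_LIMIT) -> float:
    """Lower bound on the OHLCV phase, counting only the script's sleeps."""
    total = 0.0
    remaining = pools
    batches = 0
    while remaining > 0:
        size = min(BATCH_SIZE, remaining)
        # The semaphore lets CONCURRENT_REQUESTS pools run at once, so a batch
        # takes ceil(size / CONCURRENT_REQUESTS) rounds of per-pool sleeps.
        rounds = math.ceil(size / CONCURRENT_REQUESTS)
        total += rounds * CALLS_PER_POOL * SLEEP_PER_CALL
        remaining -= size
        batches += 1
    # The script sleeps between batches, but not after the last one.
    total += max(batches - 1, 0) * SLEEP_BETWEEN_BATCHES
    return total


print(min_runtime_seconds())  # 831.0
```

With the default settings that is roughly 14 minutes before any network time is added. Lowering TOP_POOLS_LIMIT or HISTORY_DAYS is the simplest way to shorten a first test run.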

Querying your new database

Once the script completes, you have a powerful local database at your fingertips. You can now use any SQL client that supports DuckDB, or Python itself, to run complex analysis locally. In Step 3, we will connect the database to an AI assistant for natural language queries.

If you want to query the database with a Python script, create a new file named query_duckdb.py and paste the following code into it.

query_duckdb.py
import duckdb
import pandas as pd
import time

# Connect to the DuckDB database file
con = duckdb.connect(database='dbs/uniswap_v3.db', read_only=True)

print("=== DuckDB Uniswap v3 Analytics ===\n")

# --- Query 1: Database Summary ---
print("--- 1. Database Summary ---")
pool_count = con.execute("SELECT COUNT(*) FROM pools").fetchone()[0]
ohlcv_count = con.execute("SELECT COUNT(*) FROM pool_ohlcv").fetchone()[0]
print(f"Total Pools Loaded: {pool_count}")
print(f"Total OHLCV Records: {ohlcv_count:,}\n")

# --- Query 2: Top 10 Pools by 24h Volume ---
print("--- 2. Top 10 Pools by 24h Volume ---")
start_time = time.time()
top_pools_df = con.execute("""
    SELECT 
        address,
        token0_symbol,
        token1_symbol,
        volume_24h_usd
    FROM pools
    ORDER BY volume_24h_usd DESC
    LIMIT 10
""").fetchdf()
print(top_pools_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")


# --- Query 3: Peak Trading Hours ---
print("--- 3. Peak Trading Hours (UTC) ---")
start_time = time.time()
hourly_volume_df = con.execute("""
    SELECT 
        EXTRACT(hour FROM timestamp) AS hour_of_day,
        SUM(volume_usd) AS total_volume_usd
    FROM pool_ohlcv
    WHERE volume_usd > 0 AND volume_usd < 1000000000 -- Defensive filter against outliers
    GROUP BY hour_of_day
    ORDER BY total_volume_usd DESC
""").fetchdf()

# Format the volume for better readability
hourly_volume_df['total_volume_usd'] = hourly_volume_df['total_volume_usd'].map('${:,.2f}'.format)

print(hourly_volume_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")


con.close()

Now, execute the script from your terminal:

python query_duckdb.py
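
The two queries in query_duckdb.py are simple aggregations. As a sketch of the joins and window functions mentioned earlier, the following computes a 24-row moving average of the close price for the five highest-volume pools. The table and column names come from the build script; the file layout, the MOVING_AVERAGE_SQL constant, and the moving_average function are illustrative assumptions.

```python
import os

DB_PATH = "dbs/uniswap_v3.db"  # same file the build script writes

# Moving average of the close price for the five highest-volume pools.
MOVING_AVERAGE_SQL = """
    WITH top_pools AS (
        SELECT address, token0_symbol, token1_symbol
        FROM pools
        ORDER BY volume_24h_usd DESC
        LIMIT 5
    )
    SELECT
        o.pool_address,
        p.token0_symbol || '/' || p.token1_symbol AS pair,
        o.timestamp,
        o.close,
        AVG(o.close) OVER (
            PARTITION BY o.pool_address
            ORDER BY o.timestamp
            ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
        ) AS close_ma_24h
    FROM pool_ohlcv AS o
    JOIN top_pools AS p ON p.address = o.pool_address
    ORDER BY o.pool_address, o.timestamp
"""


def moving_average(db_path: str = DB_PATH):
    """Run the query against the local database and return a DataFrame."""
    import duckdb  # imported here so the module loads even without DuckDB installed
    con = duckdb.connect(database=db_path, read_only=True)
    try:
        return con.execute(MOVING_AVERAGE_SQL).fetchdf()
    finally:
        con.close()


if __name__ == "__main__":
    if os.path.exists(DB_PATH):
        print(moving_average().tail(10))
    else:
        print(f"{DB_PATH} not found; run build_uniswap_db.py first.")
```

The window covers the current row and the 23 rows before it, which equals 24 hours only when a pool has a candle for every hour. Pools with gaps in their history would need a time-based window instead.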

Step 3: AI-powered analysis with an MCP server

While you can use any SQL client to query your database, the real power comes from connecting it to an AI assistant. By using a Model Context Protocol (MCP) server, you can enable your assistant to directly query the uniswap_v3.db file you created. This allows you to ask for insights in plain English instead of writing SQL.

For this, we will use mcp-server-duckdb, an open-source MCP server for DuckDB.

Install the DuckDB MCP server

You can install the server easily using npx:

npx -y @smithery/cli install mcp-server-duckdb --client claude

Configure your AI assistant

Next, you need to tell your AI assistant how to run the server. Add the JSON configuration shown further down in this section to your claude_desktop_config.json file.

If you see a “Server disconnected” error after restarting your AI assistant, it means the application cannot find the uvx or npx command. This happens because the application doesn’t share the same PATH environment variable as your terminal.

To fix this, you must use the full, absolute path to the command.

  1. Find the absolute path by running which uvx or which npx in your terminal.
  2. Copy the output (e.g., /Users/yourname/.local/bin/uvx or /opt/homebrew/bin/npx).
  3. Use that full path as the command value in the JSON configuration below.

The example below uses uvx, which is recommended. Make sure to replace </path/to/your/project> with the actual absolute path to your project directory.

{
  "mcpServers": {
    "duckdb-crypto": {
      "command": "/Users/<yourname>/.local/bin/uvx",
      "args": [
        "mcp-server-duckdb",
        "--db-path",
        "</path/to/your/project>/dbs/uniswap_v3.db",
        "--readonly"
      ]
    }
  }
}
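
If you would rather not edit the paths by hand, a small helper can fill them in. This is a sketch, not part of mcp-server-duckdb: the function name build_mcp_config and the fallback path /usr/local/bin/uvx are hypothetical, and it only uses the arguments shown in the configuration above.

```python
import json
import os
import shutil
from typing import Optional


def build_mcp_config(db_path: str, command: Optional[str] = None) -> dict:
    """Build the mcpServers entry with absolute paths for the command and database."""
    # shutil.which searches PATH much like `which uvx` does in a terminal.
    command_path = command or shutil.which("uvx")
    if command_path is None:
        raise FileNotFoundError("uvx not found on PATH; pass its absolute path explicitly")
    return {
        "mcpServers": {
            "duckdb-crypto": {
                "command": os.path.abspath(command_path),
                "args": [
                    "mcp-server-duckdb",
                    "--db-path",
                    os.path.abspath(db_path),
                    "--readonly",
                ],
            }
        }
    }


if __name__ == "__main__":
    # The explicit command path is a placeholder so the example prints even without uvx.
    config = build_mcp_config("dbs/uniswap_v3.db", command="/usr/local/bin/uvx")
    print(json.dumps(config, indent=2))
```

Run it from your project directory so the relative database path resolves correctly, then merge the printed entry into your existing claude_desktop_config.json instead of overwriting the file, since it may already define other servers.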

Now, when you start your AI assistant, it will have the tools to query your local Uniswap v3 database. You can ask it things like:

  • “Using the duckdb-crypto tool, find the 5 pools with the highest 24-hour volume.”
  • “What was the hourly volatility for the top pool yesterday?”

What you’ve built: From API calls to analytics powerhouse

By completing this tutorial, you have successfully transitioned from being a passive data consumer to an active data analyst. You’ve replaced the slow, restrictive pattern of making individual API calls with a fast, powerful, and scalable local analytics workflow.

Key achievements:

  • Built a professional ETL pipeline: You have a reusable, high-performance Python script that can create a comprehensive local database from any supported DEX and network.
  • Unlocked high-speed SQL: You can now perform complex analytical queries on a rich dataset in milliseconds, directly on your machine.
  • Mastered a foundational workflow: This “local-first” data strategy is a cornerstone of professional data analysis. It enables deeper exploration, from high-level market trends down to the hour-by-hour behavior of individual pools.
  • Created a Reusable Asset: Your uniswap_v3.db file is a valuable, reusable asset for any future analysis, dashboarding, or AI integration project.

When your project grows and you need to explore other data solutions, check out our full list of API Tutorials for more advanced guides.
