> ## Documentation Index
> Fetch the complete documentation index at: https://docs.dexpaprika.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Local crypto analytics with DuckDB

> Build a powerful, local crypto analytics database with DuckDB. This tutorial guides you through creating an ETL pipeline for Uniswap v3 data to run complex SQL queries instantly.

## Tutorial overview

Create a local DuckDB analytics database from DEX pools and OHLCV for instant SQL over on‑chain data.

## The local powerhouse: A core pattern for on-chain analysis

Why make thousands of slow, rate-limited API calls when you can run complex SQL queries instantly on your own machine? This tutorial introduces the most effective pattern for crypto data analysis: creating a local, high-performance copy of a complete on-chain dataset. By fetching the data once, you unlock unlimited, high-speed analytical capabilities.

<Note>
  Looking for other analytics solutions? Check out our full list of [API Tutorials](/tutorials/tutorial_intro) for more step-by-step guides.
</Note>

Our "free tier" isn't about delayed or incomplete data; it's about providing a **full, live, but scope-limited dataset** for you to master. We're giving you the tools, but it's up to you to use them in the way that makes the most sense for your project.

```mermaid theme={null}
graph TD;
    subgraph "Step 1: Data Pipeline";
        A["Run Python ETL Script"] --> B{"Fetches Pool & OHLCV Data"};
        B --> C["DexPaprika API"];
        C --> B;
        B --> D["Local DuckDB File<br/>(uniswap_v3.db)"];
    end

    subgraph "Step 2 & 3: Analysis";
        D --> E{"Query the Database"};
        E --> F["<b>Option A:</b><br/>Directly with SQL"];
        E --> G["<b>Option B:</b><br/>AI Assistant via MCP"];
    end
```

**The goal:**
By the end of this guide, you will have a local `uniswap_v3.db` file containing all pools and their recent trading history from Uniswap v3 on Ethereum. You will be able to:

1. Run a robust, high-performance ETL script that pulls a complete dataset from the DexPaprika API.
2. Perform complex SQL queries on this data instantly, without rate limits.
3. Understand a professional workflow for acquiring and analyzing on-chain data.

**Why this is a foundational skill:**

* **Eliminates Rate Limiting:** Instead of thousands of small, repetitive API calls, you perform one efficient batch download.
* **Unlocks True Analytical Power:** Run complex joins, aggregations, and window functions that are impossible with a simple API.
* **Creates a Foundation:** The skills you learn here can be applied to any data source, preparing you for more advanced, real-time analysis.

<CardGroup cols={3}>
  <Card title="Step 1: ETL pipeline" icon="code" href="#step-1-build-your-local-data-pipeline">
    Create a Python script to fetch complete Uniswap v3 pool and OHLCV data.
  </Card>

  <Card title="Step 2: Instant SQL analysis" icon="chart-simple" href="#step-2-run-the-pipeline-and-query-with-sql">
    Populate your database and run complex SQL queries to find insights.
  </Card>

  <Card title="Step 3: AI-powered analysis with an MCP server" icon="robot" href="#step-3-ai-powered-analysis-with-an-mcp-server">
    Connect your database to an AI assistant for natural language queries.
  </Card>
</CardGroup>

### FAQs

<AccordionGroup>
  <Accordion title="Why use DuckDB for crypto analytics?">
    It’s an embedded analytics database--zero server setup, very fast columnar engine, perfect for local SQL over large datasets.
  </Accordion>

  <Accordion title="How big can the local DB get?">
    Depends on pools and history length; tens of millions of rows are feasible on a laptop with DuckDB’s compression.
  </Accordion>

  <Accordion title="How often should I refresh data?">
    Append new OHLCV daily or hourly depending on your use case; the pipeline is designed for incremental runs.
  </Accordion>

  <Accordion title="Can I join multiple networks or DEXes?">
    Yes--create separate tables per network/DEX and union them or annotate rows with `network`/`dex` columns.
  </Accordion>
</AccordionGroup>

<script type="application/ld+json">
  {JSON.stringify({
      "@context": "https://schema.org",
      "@type": "FAQPage",
      "mainEntity": [
        {"@type": "Question","name": "Why use DuckDB for crypto analytics?","acceptedAnswer": {"@type": "Answer","text": "Embedded, fast columnar engine ideal for local analytics."}},
        {"@type": "Question","name": "How big can the local DB get?","acceptedAnswer": {"@type": "Answer","text": "Tens of millions of rows are feasible on a laptop with compression."}},
        {"@type": "Question","name": "How often should I refresh data?","acceptedAnswer": {"@type": "Answer","text": "Append new OHLCV daily or hourly; pipeline supports incremental runs."}},
        {"@type": "Question","name": "Can I join multiple networks or DEXes?","acceptedAnswer": {"@type": "Answer","text": "Yes--separate tables per network/DEX; union or annotate rows with network/dex."}}
      ]
    })}
</script>

***

## Step 1: Build your local data pipeline

First, let's create a Python script to act as our ETL (Extract, Transform, Load) pipeline. This script will fetch all pool data for **Uniswap v3 on Ethereum** and their recent trading history, then load it into a local DuckDB database file. It leverages two key endpoints: the [Top Pools on a DEX endpoint](/api-reference/pools/get-top-x-pools-on-a-networks-dex) to discover pools, and the [Pool OHLCV Data endpoint](/api-reference/pools/get-ohlcv-data-for-a-pool-pair) to fetch historical price data.

Create a new file named `build_uniswap_db.py`.

```python build_uniswap_db.py [expandable] theme={null}
import duckdb
import pandas as pd
from datetime import datetime, timedelta
import logging
import os
import asyncio
import aiohttp
from typing import List, Dict

# --- Configuration ---
API_BASE_URL = "https://api.dexpaprika.com"
NETWORK = "ethereum"
DEX_ID = "uniswap_v3"
HISTORY_DAYS = 14       # Default days of OHLCV data to fetch
DB_FILE = "dbs/uniswap_v3.db"
INTERVAL = "1h"         # 1-hour intervals
OHLCV_API_LIMIT = 100   # Max records per API call
TOP_POOLS_LIMIT = 500   # Focus on top 500 pools by volume
CONCURRENT_REQUESTS = 3 # Number of concurrent API requests
BATCH_SIZE = 15         # Number of pools to process in each batch

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

async def fetch_with_retry(session: aiohttp.ClientSession, url: str, params: Dict = None, retries=5, backoff_factor=1.0) -> Dict:
    """Generic async fetch function with exponential backoff."""
    for attempt in range(retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == retries - 1:
                logging.error(f"Final attempt failed for {url}: {e}")
                raise
            sleep_time = backoff_factor * (2 ** attempt)
            logging.warning(f"Request to {url} failed: {e}. Retrying in {sleep_time:.2f}s...")
            await asyncio.sleep(sleep_time)
    return {}

async def get_top_dex_pools(session: aiohttp.ClientSession, network: str, dex_id: str) -> List[Dict]:
    """Fetches top pools for a given DEX, handling pagination asynchronously."""
    logging.info(f"Fetching top {TOP_POOLS_LIMIT} pools for {dex_id} on {network}...")
    all_pools = []
    page = 1
    while len(all_pools) < TOP_POOLS_LIMIT:
        url = f"{API_BASE_URL}/networks/{network}/dexes/{dex_id}/pools"
        params = {"page": page, "limit": 100, "order_by": "volume_usd", "sort": "desc"}
        try:
            data = await fetch_with_retry(session, url, params=params)
            pools = data.get('pools', [])
            if not pools:
                break
            all_pools.extend(pools)
            logging.info(f"Fetched page {page}, got {len(pools)} pools. Total: {len(all_pools)}")
            page += 1
            if len(all_pools) >= TOP_POOLS_LIMIT:
                all_pools = all_pools[:TOP_POOLS_LIMIT]
                break
            await asyncio.sleep(0.5) # Be respectful to the API
        except Exception as e:
            logging.error(f"Error fetching page {page} for {dex_id} pools: {e}")
            break
    logging.info(f"Finished fetching pools. Total found: {len(all_pools)}")
    return all_pools

async def get_pool_ohlcv(session: aiohttp.ClientSession, pool_address: str, pool_created_at: str, semaphore: asyncio.Semaphore) -> List[Dict]:
    """
    Fetches 1-hour OHLCV data for a pool using an intelligent date range and dynamic windowing.
    """
    async with semaphore:
        logging.info(f"Fetching OHLCV for pool {pool_address}...")
        final_end_time = datetime.utcnow()
        
        # Use the later of: pool creation date or the default history window
        start_time = final_end_time - timedelta(days=HISTORY_DAYS)
        if pool_created_at:
            try:
                pool_creation = datetime.strptime(pool_created_at, '%Y-%m-%dT%H:%M:%SZ')
                if pool_creation > start_time:
                    start_time = pool_creation
            except (ValueError, TypeError):
                logging.warning(f"Could not parse creation date '{pool_created_at}', using default {HISTORY_DAYS} days.")

        all_ohlcv = []
        current_start_time = start_time
        
        # Calculate how much time each API call can cover
        interval_hours = 1 # Based on "1h" interval
        time_delta_per_call = timedelta(hours=OHLCV_API_LIMIT * interval_hours)
        
        while current_start_time < final_end_time:
            batch_end_time = min(current_start_time + time_delta_per_call, final_end_time)
            url = f"{API_BASE_URL}/networks/{NETWORK}/pools/{pool_address}/ohlcv"
            params = {
                "start": current_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "end": batch_end_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "interval": INTERVAL,
                "limit": OHLCV_API_LIMIT
            }
            try:
                batch_data = await fetch_with_retry(session, url, params=params)
                if batch_data:
                    for record in batch_data:
                        record['network'] = NETWORK
                        record['pool_address'] = pool_address
                        avg_price = (record.get('open', 0) + record.get('close', 0)) / 2
                        record['volume_usd'] = record.get('volume', 0) * avg_price if avg_price > 0 else 0
                    all_ohlcv.extend(batch_data)
            except Exception as e:
                logging.warning(f"Could not fetch OHLCV batch for {pool_address}: {e}")
            
            current_start_time = batch_end_time
            await asyncio.sleep(0.75) # Small delay to be respectful

        logging.info(f"Successfully fetched {len(all_ohlcv)} OHLCV records for {pool_address}")
        return all_ohlcv

async def main():
    """Main ETL function to build the local DuckDB database."""
    os.makedirs("dbs", exist_ok=True)
    
    async with aiohttp.ClientSession() as session:
        pools = await get_top_dex_pools(session, NETWORK, DEX_ID)
        
        all_ohlcv_data = []
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
        
        for i in range(0, len(pools), BATCH_SIZE):
            batch = pools[i:i+BATCH_SIZE]
            tasks = [get_pool_ohlcv(session, p.get('id'), p.get('created_at'), semaphore) for p in batch if p.get('id')]
            
            batch_num = (i // BATCH_SIZE) + 1
            total_batches = (len(pools) + BATCH_SIZE - 1) // BATCH_SIZE
            logging.info(f"--- Processing batch {batch_num}/{total_batches} ---")

            results = await asyncio.gather(*tasks)
            for res in results:
                all_ohlcv_data.extend(res)
            
            if i + BATCH_SIZE < len(pools):
                logging.info(f"--- Finished batch {batch_num}, sleeping for 10 seconds ---")
                await asyncio.sleep(10)

    logging.info("ETL process finished. Loading data into DuckDB.")

    con = duckdb.connect(database=DB_FILE, read_only=False)

    if pools:
        for pool in pools:
            tokens = pool.get('tokens', [])
            pool['token0_symbol'] = tokens[0]['symbol'] if len(tokens) > 0 else None
            pool['token1_symbol'] = tokens[1]['symbol'] if len(tokens) > 1 else None
        
        pools_df = pd.DataFrame(pools)
        pools_df = pools_df[['id', 'dex_name', 'volume_usd', 'created_at', 'token0_symbol', 'token1_symbol']]
        pools_df = pools_df.rename(columns={'id': 'address', 'volume_usd': 'volume_24h_usd'})
        con.execute("CREATE OR REPLACE TABLE pools AS SELECT * FROM pools_df")
        logging.info(f"Loaded {len(pools_df)} records into 'pools' table.")

    if all_ohlcv_data:
        ohlcv_df = pd.DataFrame(all_ohlcv_data)
        ohlcv_df['timestamp'] = pd.to_datetime(ohlcv_df['time_close'])
        ohlcv_df = ohlcv_df[['timestamp', 'network', 'pool_address', 'open', 'high', 'low', 'close', 'volume_usd']]
        con.execute("CREATE OR REPLACE TABLE pool_ohlcv AS SELECT * FROM ohlcv_df")
        logging.info(f"Loaded {len(ohlcv_df)} records into 'pool_ohlcv' table.")

    logging.info("Database build complete. Summary:")
    print(con.execute("SHOW TABLES").fetchdf())
    print("\nPools Sample:")
    print(con.execute("SELECT * FROM pools LIMIT 5").fetchdf())
    print("\nOHLCV Sample:")
    print(con.execute("SELECT * FROM pool_ohlcv ORDER BY timestamp DESC LIMIT 5").fetchdf())
    con.close()

if __name__ == "__main__":
    # Ensure you have the required libraries:
    # pip install requests pandas duckdb aiohttp
    asyncio.run(main())
```

<Note title="Key insight: Building a robust ETL pipeline">
  A simple, sequential script is great for learning, but real-world data fetching requires a more robust approach. Here is what we've used to make sure it runs reliably:

  * **Asynchronous Operations:** By using `asyncio` and `aiohttp`, the script can make many API requests concurrently instead of one by one. This means shorter time for completion.
  * **Dynamic Windowing:** The `get_pool_ohlcv` function calculates how much data to request per API call so that it gets all the data for each pool.
  * **Concurrency Control & Throttling:** An `asyncio.Semaphore`, combined with carefully tuned `BATCH_SIZE` and `asyncio.sleep()` calls, makes sure we don't hit the rate limit.
  * **Resiliency:** The `fetch_with_retry` function automatically retries failed requests with an exponential backoff delay, making the pipeline resilient to temporary network issues.
</Note>

### **Required libraries**

Before running the script, make sure you have the necessary Python libraries installed.

```bash theme={null}
pip install requests pandas duckdb aiohttp
```

***

## Step 2: Run the pipeline and query with SQL

Now, execute the script from your terminal. It will fetch all Uniswap v3 pool data from Ethereum and their recent trading history, then create a `uniswap_v3.db` file in a new `dbs` directory. This may take several minutes, but it will be significantly faster than a purely sequential script.

```bash theme={null}
python build_uniswap_db.py
```

### **Querying your new database**

Once the script completes, you have a powerful local database at your fingertips. You can now use any SQL client that supports DuckDB, or Python itself, to perform instant, complex analysis. In step 3, we will connect the database to an AI assistant for natural language queries.

If you want to query the database with a Python script, create a new file named `query_duckdb.py` and paste the following code into it.

```python query_duckdb.py [expandable] theme={null}
import duckdb
import pandas as pd
import time

# Connect to the DuckDB database file
con = duckdb.connect(database='dbs/uniswap_v3.db', read_only=True)

print("=== DuckDB Uniswap v3 Analytics ===\n")

# --- Query 1: Database Summary ---
print("--- 1. Database Summary ---")
pool_count = con.execute("SELECT COUNT(*) FROM pools").fetchone()[0]
ohlcv_count = con.execute("SELECT COUNT(*) FROM pool_ohlcv").fetchone()[0]
print(f"Total Pools Loaded: {pool_count}")
print(f"Total OHLCV Records: {ohlcv_count:,}\n")

# --- Query 2: Top 10 Pools by 24h Volume ---
print("--- 2. Top 10 Pools by 24h Volume ---")
start_time = time.time()
top_pools_df = con.execute("""
    SELECT 
        address,
        token0_symbol,
        token1_symbol,
        volume_24h_usd
    FROM pools
    ORDER BY volume_24h_usd DESC
    LIMIT 10
""").fetchdf()
print(top_pools_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")


# --- Query 3: Peak Trading Hours ---
print("--- 3. Peak Trading Hours (UTC) ---")
start_time = time.time()
hourly_volume_df = con.execute("""
    SELECT 
        EXTRACT(hour FROM timestamp) AS hour_of_day,
        SUM(volume_usd) AS total_volume_usd
    FROM pool_ohlcv
    WHERE volume_usd > 0 AND volume_usd < 1000000000 -- Defensive filter against outliers
    GROUP BY hour_of_day
    ORDER BY total_volume_usd DESC
""").fetchdf()

# Format the volume for better readability
hourly_volume_df['total_volume_usd'] = hourly_volume_df['total_volume_usd'].map('${:,.2f}'.format)

print(hourly_volume_df)
print(f"Query executed in {time.time() - start_time:.4f} seconds\n")


con.close()
```

Now, execute the script from your terminal:

```bash theme={null}
python query_duckdb.py
```

***

## Step 3: AI-powered analysis with an MCP server

While you can use any SQL client to query your database, the real power comes from connecting it to an AI assistant. By using a Model Context Protocol (MCP) server, you can enable your assistant to directly query the `uniswap_v3.db` file you created. This allows you to ask for insights in plain English instead of writing SQL.

For this, we will use `mcp-server-duckdb`, an open-source MCP server for DuckDB.

### **Install the DuckDB MCP server**

You can install the server easily using `npx`:

```bash theme={null}
npx -y @smithery/cli install mcp-server-duckdb --client claude
```

### **Configure your AI assistant**

Next, you need to tell your AI assistant how to run the server. Add the following to your `claude_desktop_config.json` file.

<Note title="Troubleshooting: 'Server disconnected' error">
  If you see a "Server disconnected" error after restarting your AI assistant, it means the application cannot find the `uvx` or `npx` command. This happens because the application doesn't share the same `PATH` environment variable as your terminal.

  **To fix this, you must use the full, absolute path to the command.**

  1. Find the absolute path by running `which uvx` or `which npx` in your terminal.
  2. Copy the output (e.g., `/Users/yourname/.local/bin/uvx` or `/opt/homebrew/bin/npx`).
  3. Use that full path as the `command` value in the JSON configuration below.
</Note>

The example below uses `uvx`, which is recommended. Make sure to replace `</path/to/your/project>` with the actual absolute path to your project directory.

```json theme={null}
{
  "mcpServers": {
    "duckdb-crypto": {
      "command": "/Users/<yourname>/.local/bin/uvx",
      "args": [
        "mcp-server-duckdb",
        "--db-path",
        "</path/to/your/project>/dbs/uniswap_v3.db",
        "--readonly"
      ]
    }
  }
}
```

Now, when you start your AI assistant, it will have the tools to query your local Uniswap V3 database. You can ask it things like:

* *"Using the duckdb-crypto tool, find the 5 pools with the highest 24-hour volume."*
* *"What was the hourly volatility for the top pool yesterday?"*

***

## What you've built: From API calls to analytics powerhouse

By completing this tutorial, you have successfully transitioned from being a passive data consumer to an active data analyst. You've replaced the slow, restrictive pattern of making individual API calls with a fast, powerful, and scalable local analytics workflow.

**Key achievements:**

* **Built a professional ETL pipeline:** You have a reusable, high-performance Python script that can create a comprehensive local database from any supported DEX and network.
* **Unlocked high-speed SQL:** You can now perform complex analytical queries on a rich dataset in milliseconds, directly on your machine.
* **Mastered a foundational workflow:** This "local-first" data strategy is a cornerstone of professional data analysis. It enables deeper exploration, from high-level market trends down to individual wallet behaviors.
* **Created a Reusable Asset:** Your `uniswap_v3.db` file is a valuable, reusable asset for any future analysis, dashboarding, or AI integration project.

When your project grows and you need to explore other data solutions, check out our full list of [API Tutorials](/tutorials/tutorial_intro) for more advanced guides.
