Async Python for Data Engineering: When and How to Use It
Fetching 500 API endpoints sequentially takes 500x longer than it needs to. Async Python solves I/O-bound bottlenecks that threading and multiprocessing cannot address as cleanly.
Data pipelines spend a lot of time waiting. Waiting for HTTP responses, waiting for database queries to return, waiting for S3 reads to complete. In synchronous Python, each wait blocks the entire process. In async Python, the process can move on to other work while waiting, which means you can run dozens or hundreds of concurrent operations with a single thread.
This guide covers the async mental model, practical patterns for concurrent API ingestion, rate limiting, and error handling, and the honest assessment of when async is not the right tool.
The Async Mental Model
Async Python uses cooperative multitasking: coroutines voluntarily yield control when they are waiting for I/O, allowing other coroutines to run. This is different from threading (OS-managed preemptive switching) and multiprocessing (separate processes).
```python
import asyncio

import aiohttp
import requests

# Synchronous: 10 requests take 10 * response_time
def fetch_sync(urls: list[str]) -> list[dict]:
    results = []
    for url in urls:
        response = requests.get(url)  # blocks here
        results.append(response.json())
    return results

# Async: 10 requests take ~1 * response_time (concurrent)
async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Run the async code
results = asyncio.run(fetch_all(urls))
```

The key primitives: `async def` defines a coroutine. `await` yields control while waiting. `asyncio.gather()` runs multiple coroutines concurrently and waits for all of them to complete. `asyncio.run()` is the entry point that creates an event loop and runs a coroutine.
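To see cooperative multitasking concretely, here is a minimal, self-contained demo using `asyncio.sleep` as a stand-in for real I/O (`pretend_fetch` is a hypothetical helper, not part of any library): three one-second waits finish in roughly one second of wall time, because each coroutine yields control at its `await` instead of blocking the thread.

```python
import asyncio
import time

async def pretend_fetch(name: str) -> str:
    await asyncio.sleep(1)  # stands in for waiting on I/O; yields control
    return name

async def main() -> list[str]:
    start = time.perf_counter()
    # All three coroutines wait concurrently on a single thread
    results = await asyncio.gather(
        pretend_fetch("a"), pretend_fetch("b"), pretend_fetch("c")
    )
    print(f"{len(results)} fetches in {time.perf_counter() - start:.1f}s")
    return results

asyncio.run(main())  # wall time is ~1s, not ~3s
```

Run the same three calls in a plain `for` loop with `time.sleep(1)` and the total jumps to three seconds: that difference is the entire value proposition of async for I/O-bound work.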
Concurrent API Ingestion with Rate Limiting
The most common async pattern in data engineering is fetching data from many API endpoints concurrently. Without rate limiting, this will exceed most API quotas immediately. Use a semaphore to limit concurrency.
```python
import asyncio
import logging
from typing import Optional

import aiohttp

logger = logging.getLogger(__name__)

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    max_retries: int = 3,
    backoff_base: float = 1.0,
) -> Optional[dict]:
    """Fetch a URL with semaphore-limited concurrency and exponential backoff."""
    async with semaphore:  # limits concurrent requests
        for attempt in range(max_retries):
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                    if resp.status == 429:  # rate limited
                        retry_after = int(resp.headers.get('Retry-After', 60))
                        logger.warning(f"Rate limited, waiting {retry_after}s")
                        await asyncio.sleep(retry_after)
                        continue
                    resp.raise_for_status()
                    return await resp.json()
            except aiohttp.ClientError as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed after {max_retries} attempts: {url}: {e}")
                    return None
                delay = backoff_base * (2 ** attempt)
                await asyncio.sleep(delay)
    return None

async def ingest_api_endpoints(
    urls: list[str],
    max_concurrent: int = 20,
) -> list[dict]:
    """Fetch all endpoints with controlled concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_retry(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=False)
    # Filter out None results (failed fetches)
    successful = [r for r in results if r is not None]
    logger.info(f"Fetched {len(successful)}/{len(urls)} endpoints successfully")
    return successful
```

Batching with asyncio.gather
For very large sets of URLs, launching all tasks at once and relying solely on a semaphore can accumulate too many coroutines in memory. Batching provides a cleaner alternative.
```python
async def fetch_in_batches(
    urls: list[str],
    batch_size: int = 50,
    delay_between_batches: float = 0.5,
) -> list[dict]:
    """Process URLs in batches with a delay between batches."""
    all_results = []
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            semaphore = asyncio.Semaphore(batch_size)
            tasks = [fetch_with_retry(session, url, semaphore) for url in batch]
            batch_results = await asyncio.gather(*tasks)
            all_results.extend(r for r in batch_results if r is not None)
            if i + batch_size < len(urls):
                await asyncio.sleep(delay_between_batches)
    return all_results
```

Async Database Operations
Standard database drivers (psycopg2, sqlite3) are synchronous and block the event loop when used in async code. Use async drivers instead.
```python
import asyncio

import asyncpg  # async PostgreSQL driver

async def batch_insert(records: list[dict], dsn: str) -> int:
    """Insert records concurrently using a connection pool."""
    pool = await asyncpg.create_pool(dsn, min_size=5, max_size=20)

    async def insert_one(record: dict) -> bool:
        async with pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO events (event_id, user_id, amount, event_date)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (event_id) DO UPDATE SET
                    amount = EXCLUDED.amount
            """, record['event_id'], record['user_id'],
                record['amount'], record['event_date'])
        return True

    tasks = [insert_one(r) for r in records]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    await pool.close()
    successes = sum(1 for r in results if r is True)
    return successes
```

Integrating Async into Dagster
Dagster assets are synchronous by default. Running async code inside a Dagster asset requires asyncio.run().
```python
import asyncio

from dagster import asset

@asset
def api_ingestion_results(context) -> list[dict]:
    """Sync Dagster asset wrapping async fetch logic."""
    urls = generate_api_urls(context)
    context.log.info(f"Fetching {len(urls)} endpoints concurrently")
    results = asyncio.run(fetch_in_batches(urls, batch_size=50))
    context.log.info(f"Fetched {len(results)} records")
    return results
```

When Async Is Not the Right Tool
Async helps with I/O-bound work: HTTP requests, database queries, file reads. It does not help with CPU-bound work: data transformation, compression, encryption, mathematical computation. For CPU-bound work, use multiprocessing or Spark.
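When a pipeline mixes concurrent I/O with a heavy computation step, one common pattern is to push the CPU-bound work into a process pool via `loop.run_in_executor`, so the event loop stays free to service I/O. A minimal sketch (`crunch` and `crunch_all` are illustrative names, not from any library):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # CPU-bound: pure computation that never yields to the event loop
    return sum(i * i for i in range(n))

async def crunch_all(inputs: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offload each CPU-bound call to a worker process; the event
        # loop remains free to run concurrent I/O in the meantime.
        futures = [loop.run_in_executor(pool, crunch, n) for n in inputs]
        return await asyncio.gather(*futures)
```

Note that a thread pool would not help here: the GIL serializes pure-Python computation across threads, which is why CPU-bound work needs separate processes.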
Async adds complexity. The code is harder to read than synchronous code, exceptions propagate differently, and debugging requires understanding the event loop. For a pipeline that makes a handful of API calls, the concurrency gain does not justify the complexity overhead. Use async when you genuinely need to make dozens or hundreds of concurrent I/O calls.
Also avoid async when the downstream system cannot handle the concurrency. A database with 20 connections cannot benefit from 100 concurrent async queries. The connection pool becomes the bottleneck, and you have added complexity without gaining performance. Profile first, optimize second.
For the cases where async applies well (large-scale API ingestion, concurrent file uploads, parallel database reads), the performance improvement is dramatic. A pipeline that fetches 1,000 endpoints sequentially at roughly one second each takes about 16 minutes; with 20-way concurrency the same work finishes in under a minute. That improvement is real and worth the engineering investment when the use case is right.