How I Fixed a Slow Data Pipeline with Python Async – A Hands-On Guide
Hey there!
A couple of weeks ago I hit a classic data-engineering wall. I had a pipeline that needed to:
- Get a list of library IDs from an API
- For every single ID, call another endpoint to fetch the full library details
- Transform the data and upload it to a database
- Run a setup API that calculates embeddings and stores the new data in another database
This process has to run for about 3K libraries. At roughly one minute per library, running them one after another takes about 2 days. The CPU was sitting idle 99% of the time just waiting for HTTP responses. Sound familiar?
So I rewrote it with Python’s asyncio and the runtime dropped to just about 30 minutes. That’s a ~100× speedup with almost zero extra hardware. Magic? Nope — just proper async.
Today I want to walk you through exactly how I did it, and along the way teach you the async tools that every data engineer should have in their toolbox:
- async / await basics
- asyncio.gather for running many coroutines concurrently
- asyncio.Semaphore for rate limiting and resource protection
- asyncio.Queue for producer-consumer pipelines
Let’s go!
1. The Absolute Basics – What Does “async” Actually Mean in Python?
Think of the Python event loop as a super-efficient chef in a kitchen with only one pair of hands (single-threaded).
In synchronous code the chef does:
- Put pasta in boiling water
- Stand there staring at the pot for 10 minutes (blocked!)
- Drain pasta
- Start sauce
In async code the chef does:
- Put pasta in water
- While waiting → chop vegetables, set the table, answer the phone
- When timer dings → instantly come back and drain pasta
The magic words are:
import asyncio

async def cook_pasta():  # "Hey Python, I’m a coroutine"
    print("Start boiling")
    await asyncio.sleep(10)  # "I’m waiting for I/O – do something else!"
    print("Pasta ready")
- async def: marks a function as a coroutine
- await: “I’m happy to pause here and let other tasks run”
Important: await can only be used inside an async def function.
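To see the chef analogy in action, here is a minimal sketch that runs two coroutines on one event loop. The chop_vegetables job, the event labels, and the shortened sleep times are made up for illustration; asyncio.create_task is what schedules each job so they can overlap.

```python
import asyncio

async def cook_pasta(events):
    events.append("pasta: start boiling")
    await asyncio.sleep(0.2)  # stands in for the 10-minute wait
    events.append("pasta: ready")

async def chop_vegetables(events):
    events.append("veggies: start chopping")
    await asyncio.sleep(0.05)  # a shorter job that fits into the waiting time
    events.append("veggies: done")

async def main():
    events = []  # records the order in which things happen
    # create_task schedules each coroutine on the event loop
    pasta = asyncio.create_task(cook_pasta(events))
    veggies = asyncio.create_task(chop_vegetables(events))
    await veggies  # finishes while the pasta is still "boiling"
    await pasta    # come back when the timer dings
    return events

if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The vegetables finish before the pasta even though the pasta started first, and nobody stood around staring at the pot.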
2. Running Many Requests at Once – asyncio.gather
Here’s the first version that already gave me a ~10× speedup:
import asyncio
import aiohttp

async def fetch_library(session, library_id):
    url = f"https://api.example.com/libraries/{library_id}"
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Suppose we already have the list of IDs (placeholder values here)
        library_ids = list(range(1, 3001))
        # Start all requests almost simultaneously!
        tasks = [fetch_library(session, library_id) for library_id in library_ids]
        libraries = await asyncio.gather(*tasks)  # magic happens here
        print(f"Fetched {len(libraries)} libraries")

asyncio.run(main())
asyncio.gather waits for all coroutines to finish, but they all start at more or less the same time, so the total time is roughly the time of the slowest request, not the sum.
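Two details of asyncio.gather are easy to check without touching a real API: results come back in the order the coroutines were passed in, and return_exceptions=True turns failures into return values instead of aborting the whole batch. The sketch below uses a made-up fake_fetch helper with asyncio.sleep standing in for the network call; the IDs and delays are arbitrary.

```python
import asyncio
import time

async def fake_fetch(library_id, delay):
    """Stand-in for an HTTP call: waits `delay` seconds, then returns a dict."""
    await asyncio.sleep(delay)
    if library_id < 0:
        raise ValueError(f"bad id {library_id}")
    return {"id": library_id}

async def main():
    start = time.perf_counter()
    # Delays are deliberately out of order: the first request is the slowest.
    results = await asyncio.gather(
        fake_fetch(1, 0.3),
        fake_fetch(2, 0.1),
        fake_fetch(-1, 0.2),
        return_exceptions=True,  # failures come back as values instead of raising
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)
    print(f"took {elapsed:.2f}s")  # close to the slowest delay, not the sum
```

With 3K real requests, filtering the exceptions out of the result list afterwards is usually nicer than losing the whole batch to one bad ID.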
3. Be Nice to the API – Semaphore (Rate Limiting)
The API started returning 429 Too Many Requests when I went full throttle.
Enter asyncio.Semaphore — your polite bouncer at the door.
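MAX_CONCURRENT = 30  # safe for this API
# On Python < 3.10, create the semaphore inside the running loop (e.g. in main)
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def fetch_library(session, library_id):
    url = f"https://api.example.com/libraries/{library_id}"
    while True:
        async with semaphore:  # only 30 can enter at once
            async with session.get(url) as resp:
                if resp.status != 429:
                    return await resp.json()
                retry_after = int(resp.headers.get("Retry-After", 60))
        # Sleep outside the semaphore so a waiting retry does not hold a slot
        await asyncio.sleep(retry_after)
Now we never send more than 30 requests at the same time, so the API stays happy and we get no bans. Strictly speaking, a semaphore caps how many requests are in flight rather than requests per second, but that was enough here.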
You can have different semaphores for different parts of the pipeline (e.g. 100 for S3 uploads, 5 for PostgreSQL inserts, 1 for Elasticsearch indexing).
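If you want to convince yourself that a semaphore really caps concurrency, a small sketch like this one counts how many jobs are inside the guarded block at once. The limited_job and peak_concurrency names are hypothetical, and asyncio.sleep stands in for the real HTTP call or database insert. The same check works for each stage's own semaphore.

```python
import asyncio

async def limited_job(semaphore, state):
    async with semaphore:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for an HTTP call or DB insert
        state["running"] -= 1

async def peak_concurrency(limit, jobs):
    semaphore = asyncio.Semaphore(limit)  # created inside the running loop
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(limited_job(semaphore, state) for _ in range(jobs)))
    return state["peak"]

if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(limit=5, jobs=50)))
```

All 50 jobs are started at once, but the recorded peak never goes above the limit.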
4. Producer-Consumer Pattern – asyncio.Queue
My pipeline looks like this:
Stage 1: many parallel requests to get list of IDs (fast)
Stage 2: many parallel detail requests (still pretty fast)
Stage 3: heavy transformation + write to Elasticsearch (slow & needs to be ordered)
If stage 2 produces data faster than stage 3 can consume it, we either:
- run out of memory (if we keep everything in a list), or
- lose concurrency (if we wait synchronously)
The perfect solution is asyncio.Queue:
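import asyncio
import aiohttp
from more_itertools import chunked  # assumed source of chunked; any batching helper works

async def producer(queue, library_ids):
    semaphore = asyncio.Semaphore(50)
    async with aiohttp.ClientSession() as session:
        for id_batch in chunked(library_ids, 50):
            # fetch_library here is a variant that takes the semaphore as an argument
            tasks = [fetch_library(session, library_id, semaphore) for library_id in id_batch]
            results = await asyncio.gather(*tasks)
            for data in results:
                await queue.put(data)  # waits here if the queue is full (backpressure)

async def consumer(queue, setup_semaphore):
    while True:
        library_data = await queue.get()  # waits cooperatively if empty
        try:
            async with setup_semaphore:  # shared by all consumers: one setup at a time
                await transform_and_setup(library_data)
        finally:
            queue.task_done()  # super important, even if processing failed

async def main():
    queue = asyncio.Queue(maxsize=1000)  # optional backpressure
    # Created once and passed in: a semaphore made inside each consumer would limit nothing
    setup_semaphore = asyncio.Semaphore(1)
    # Start consumer(s); you can have several if safe
    consumers = [asyncio.create_task(consumer(queue, setup_semaphore)) for _ in range(4)]
    # Start producer
    await producer(queue, all_library_ids)
    # Wait until the queue is fully processed
    await queue.join()
    # Cancel consumers and wait for them to finish shutting down
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)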
Key points:
- await queue.put(item): waits cooperatively when the queue is full (that is the backpressure) and never blocks the event loop
- await queue.get(): yields to other tasks while the queue is empty
- queue.task_done(): tells the queue “I finished processing this item”
- await queue.join(): waits until all queued items are marked as done
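The key points above can be checked with a tiny self-contained pipeline that needs no network. This is only a sketch: the item counts, the queue size of 3, and the sleep that stands in for slow processing are arbitrary choices, and run_pipeline is a hypothetical name.

```python
import asyncio

async def producer(queue, items, sizes):
    for item in items:
        await queue.put(item)        # pauses here while the queue is full
        sizes.append(queue.qsize())  # record how full the queue got

async def consumer(queue, processed):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.005)  # stand-in for slow processing
            processed.append(item)
        finally:
            queue.task_done()

async def run_pipeline(n_items=20, maxsize=3, n_consumers=2):
    queue = asyncio.Queue(maxsize=maxsize)
    sizes, processed = [], []
    workers = [
        asyncio.create_task(consumer(queue, processed))
        for _ in range(n_consumers)
    ]
    await producer(queue, range(n_items), sizes)
    await queue.join()  # returns once every item has been marked done
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return max(sizes), sorted(processed)

if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

The producer is much faster than the consumers, yet the queue never holds more than maxsize items, which is exactly what keeps memory flat.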
This pattern saved my life (and my RAM).
Note that I use Semaphore(1) for the setup consumer because, in my pipeline, running more than one setup at a time could cause a race condition. This didn’t stop the uploading process, though.
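One detail worth checking when several consumers run side by side: a Semaphore(1) only serializes work if every consumer uses the same instance. The sketch below compares the two setups; setup_step, worker, and peak_setups are hypothetical names, and the sleep stands in for the setup API call.

```python
import asyncio

async def setup_step(semaphore, state):
    async with semaphore:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for the setup API call
        state["running"] -= 1

async def worker(jobs, semaphore, state):
    for _ in range(jobs):
        await setup_step(semaphore, state)

async def peak_setups(shared, n_workers=4, jobs=5):
    state = {"running": 0, "peak": 0}
    shared_semaphore = asyncio.Semaphore(1)
    workers = [
        # shared=True: every worker gets the same instance.
        # shared=False: each worker gets its own, so nothing is serialized.
        worker(jobs, shared_semaphore if shared else asyncio.Semaphore(1), state)
        for _ in range(n_workers)
    ]
    await asyncio.gather(*workers)
    return state["peak"]

if __name__ == "__main__":
    print("shared:", asyncio.run(peak_setups(shared=True)))
    print("per worker:", asyncio.run(peak_setups(shared=False)))
```

With a shared instance only one setup runs at a time; with one semaphore per worker, all four run at once and the race condition is back.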
Final Thoughts
Asyncio isn’t black magic — it’s just cooperative multitasking that shines when you’re I/O-bound (APIs, databases, S3, message queues, etc.). It’s built into Python 3.7+, the ecosystem (aiohttp, aioredis, asyncpg, httpx, etc.) is mature, and once you get the mental model, your pipelines will feel like they got a rocket booster.
And yes — the same idea exists in other languages (JavaScript Promises/async-await, Go goroutines, Rust async/await, etc.).
So next time your pipeline is “just waiting”… you know what to do.
Happy (async) coding! Feel free to drop a comment if you want the template GitHub repo of my library-ingestion pipeline or a video about it.
P.S. 30 Minutes vs 2 Days. Still makes me smile every time the cron runs. 😄
