چطور یک پایپلاین دیتای کند رو با Python Async درست کردم – راهنمای عملی
سلام به همه!
چند هفته پیش دقیقاً همون چالشی که سد راه هر مهندس دادهای میشه، جلوم رو گرفت. پایپلاینی داشتم که باید این کارها رو انجام میداد:
- لیست آیدی کتابخانهها رو از یه API میگرفت
- برای هر آیدی، یه درخواست جدا به یه اندپوینت دیگه میزدم تا جزئیات کاملش رو بگیرم
- یه مقدار تبدیل و پردازش میکردم و توی دیتابیس میریختم
- در نهایت یه API دیگه رو صدا میزدم که امبدینگها رو حساب کنه و دادههای جدید رو توی دیتابیس دیگه ذخیره کنه
حدود ۳۰۰۰ تا کتابخانه داشتم. اگر هر کتابخانه فقط ۱ ثانیه طول میکشید، کل فرآیند نزدیک ۲ روز طول میکشید! جالب اینجاست که CPU تقریباً ۹۹٪ وقتش بیکار بود و فقط منتظر جواب HTTP بود. آشنا نیست؟
خب، کلش رو با asyncio پایتون بازنویسی کردم و زمان اجرا از حدود ۲ روز اومد پایین به حدود ۳۰ دقیقه. یعنی تقریباً ۱۰۰ برابر سریعتر، بدون هیچ سختافزار اضافهای. کار خاصی کردم؟ نه، فقط استفاده درست از async بود.
امروز میخوام قدم به قدم دقیقاً همون کاری که کردم رو براتون توضیح بدم و تو راه، ابزارهای async رو که هر دیتا اینجینیری باید بلد باشه بهتون یاد بدم:
- مفاهیم پایه async / await
- asyncio.gather برای اجرای همزمان خیلی از درخواستها
- asyncio.Semaphore برای محدود کردن نرخ درخواست و برخورد خوب با API
- asyncio.Queue برای الگوی تولیدکننده-مصرفکننده
بریم شروع کنیم!
۱. اصلاً async توی پایتون یعنی چی؟
فرض کن لوپ رویداد پایتون یه آشپز فوقالعاده کاربلده که فقط یه جفت دست داره (single-threaded).
توی کد معمولی (همزمان) آشپز این کارا رو میکنه:
- پاستا رو میذاره تو آب جوش
- ۱۰ دقیقه وایمیسته جلوی قابلمه و زل میزنه بهش (بلاک شده!)
- بعد آبکش میکنه
- تازه میره سراغ سس…
اما توی حالت async:
- پاستا رو میذاره تو آب
- تو اون ۱۰ دقیقه سبزیجات خرد میکنه، میز رو میچینه، تلفن جواب میده
- وقتی تایمر زنگ زد فوری برمیگرده و پاستا رو آبکش میکنه
برای استفاده از حالت دوم:
async def cook_pasta(): # "Hey Python, I’m a coroutine"
print("Start boiling")
await asyncio.sleep(10) # "I’m waiting for I/O – do something else!"
print("Pasta ready")
- async def: نشون میده این تابع یه coroutine هست
- await: یعنی «اینجا خوشحال میشم مکث کنم تا بقیه کارها انجام بشن»
مهم: await فقط داخل توابعی که با async def تعریف شدن میتونه استفاده بشه.
۲. اجرای همزمان خیلی از درخواستها – asyncio.gather
اولین نسخهای که نوشتم همین بود و همین به تنهایی حدود ۱۰ برابر سریعتر شد:
import asyncio
import aiohttp
async def fetch_library(session, library_id):
url = f"https://api.example.com/libraries/{library_id}"
async with session.get(url) as resp:
return await resp.json()
async def main():
async with aiohttp.ClientSession() as session:
# Suppose we already have the list of IDs
library_ids = [1, 2, 3, ..., 3000]
# Fire all requests almost simultaneously!
tasks = [fetch_library(session, id) for id in library_ids]
libraries = await asyncio.gather(*tasks) # magic happens here
print(f"Fetched {len(libraries)} libraries")
asyncio.run(main())
asyncio.gather منتظر میمونه تا همه تمام بشن، ولی همه تقریباً همزمان شروع میکنن. پس زمان کل تقریباً برابر با کندترین درخواست میشه، نه مجموع همه.
۳. با API مهربون باشیم – Semaphore (محدود کردن نرخ درخواست)
وقتی گازش رو تا ته باز کردم، API شروع کرد خطای 429 Too Many Requests بده.
اینجاست که asyncio.Semaphore مثل یه دربان مودب وارد میشه:
MAX_CONCURRENT = 30 # safe for this API
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def fetch_library(session, library_id):
async with semaphore: # only 30 can enter at once
async with session.get(url) as resp:
if resp.status == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
return await fetch_library(session, library_id) # retry
return await resp.json()
اینطوری هیچوقت بیشتر از ۳۰ درخواست همزمان نمیفرستیم و API خوشحال میمونه و ما بن نمیشیم.
نکته: میتونی برای بخشهای مختلف پایپلاین سمافورهای جدا داشته باشی.
۴. الگوی تولیدکننده-مصرفکننده – asyncio.Queue
پایپلاین من این شکلی بود:
- مرحله ۱: گرفتن لیست آیدیها (سریع)
- مرحله ۲: گرفتن جزئیات (هنوز نسبتاً سریع)
- مرحله ۳: تبدیل سنگین + نوشتن توی الستیکسرچ (کند و باید ترتیب حفظ بشه)
اگر مرحله ۲ سریعتر از مرحله ۳ داده تولید کنه، دو تا مشکل پیش میاد:
- یا حافظه پر میشه (اگه همه رو تو لیست نگه داریم)
- یا همزمانی رو از دست میدیم (اگه صبر کنیم تا هر کدوم تموم بشه)
راهحل طلایی: asyncio.Queue
async def producer(queue, library_ids):
semaphore = asyncio.Semaphore(50)
async with aiohttp.ClientSession() as session:
for id_batch in chunked(library_ids, 50):
tasks = [fetch_library(session, id, semaphore) for id in id_batch]
results = await asyncio.gather(*tasks)
for data in results:
await queue.put(data) # fire and forget
async def consumer(queue):
semaphore = asyncio.Semaphore(1)
while True:
library_data = await queue.get() # waits cooperatively if empty
async with semaphore:
await transform_and_setup(library_data)
queue.task_done() # super important!
async def main():
queue = asyncio.Queue(maxsize=1000) # optional backpressure
# Start consumer(s) — you can have several if safe
consumers = [asyncio.create_task(consumer(queue)) for _ in range(4)]
# Start producer
await producer(queue, all_library_ids)
# Wait until the queue is fully processed
await queue.join()
# Cancel consumers
for c in consumers:
c.cancel()
نکات طلایی:
- await queue.put(item): هیچوقت لوپ رو بلاک نمیکنه
- await queue.get(): اگه صف خالی بود، همکاری میکنه و میره کنار
- queue.task_done(): به صف میگه «این آیتم تموم شد»
- await queue.join(): صبر میکنه تا همه آیتمها کامل پردازش بشن
این الگو واقعاً زندگی و رم منو نجات داد.
حرف آخر
asyncio یه مدل پیچیده نیست – فقط یه مدل همکاری (cooperative multitasking) هست که وقتی کارتون I/O-bound باشه (API، دیتابیس، S3، صف پیام و…) میدرخشه. از پایتون ۳.۷ به بعد داخل خود زبان هست، اکوسیستمش (aiohttp، asyncpg، httpx، aioredis و…) خیلی پختهست و وقتی با نحوه کارش آشنا بشی، پایپلاینهای حرفهای تر و بهتری مینویسی.
و بله – همین ایده توی زبانهای دیگه هم هست (Promise/async-await در جاوااسکریپت، goroutine در گو، async/await در Rust).
پس دفعه بعد که دیدی پایپلاینت «فقط داره منتظر میمونه»… میدونی چیکار کنی 😉
اگه خواستید قالب کامل این پایپلاین رو به صورت ریپازیتوری گیتهاب یا یه ویدیو براتون بذارم، تو کامنتها بگید.
پ.ن. نیم ساعت در مقابل ۲ روز. هنوز هر بار که کرون ران میشه لبخند میزنم 😄
