async در پایتون

چطور یک پایپ‌لاین دیتای کند رو با Python Async درست کردم – راهنمای عملی

سلام به همه!

چند هفته پیش دقیقاً همون چالشی که سد راه هر مهندس داده‌ای میشه، جلوم رو گرفت. پایپ‌لاینی داشتم که باید این کارها رو انجام می‌داد:

  1. لیست آیدی کتابخانه‌ها رو از یه API می‌گرفت
  2. برای هر آیدی، یه درخواست جدا به یه اندپوینت دیگه می‌زدم تا جزئیات کاملش رو بگیرم
  3. یه مقدار تبدیل و پردازش می‌کردم و توی دیتابیس می‌ریختم
  4. در نهایت یه API دیگه رو صدا می‌زدم که امبدینگ‌ها رو حساب کنه و داده‌های جدید رو توی دیتابیس دیگه ذخیره کنه

حدود ۳۰۰۰ تا کتابخانه داشتم. اگر هر کتابخانه فقط ۱ ثانیه طول می‌کشید، کل فرآیند نزدیک ۲ روز طول می‌کشید! جالب اینجاست که CPU تقریباً ۹۹٪ وقتش بیکار بود و فقط منتظر جواب HTTP بود. آشنا نیست؟

خب، کلش رو با asyncio پایتون بازنویسی کردم و زمان اجرا از حدود ۲ روز اومد پایین به حدود ۳۰ دقیقه. یعنی تقریباً ۱۰۰ برابر سریع‌تر، بدون هیچ سخت‌افزار اضافه‌ای. کار خاصی کردم؟ نه، فقط استفاده درست از async بود.

امروز می‌خوام قدم به قدم دقیقاً همون کاری که کردم رو براتون توضیح بدم و تو راه، ابزارهای async رو که هر دیتا اینجینیری باید بلد باشه بهتون یاد بدم:

  • مفاهیم پایه async / await
  • asyncio.gather برای اجرای همزمان خیلی از درخواست‌ها
  • asyncio.Semaphore برای محدود کردن نرخ درخواست و برخورد خوب با API
  • asyncio.Queue برای الگوی تولیدکننده-مصرف‌کننده

بریم شروع کنیم!

۱. اصلاً async توی پایتون یعنی چی؟

فرض کن لوپ رویداد پایتون یه آشپز فوق‌العاده کاربلده که فقط یه جفت دست داره (single-threaded).

توی کد معمولی (هم‌زمان) آشپز این کارا رو می‌کنه:

  1. پاستا رو می‌ذاره تو آب جوش
  2. ۱۰ دقیقه وایمیسته جلوی قابلمه و زل می‌زنه بهش (بلاک شده!)
  3. بعد آبکش می‌کنه
  4. تازه می‌ره سراغ سس…

اما توی حالت async:

  1. پاستا رو می‌ذاره تو آب
  2. تو اون ۱۰ دقیقه سبزیجات خرد می‌کنه، میز رو می‌چینه، تلفن جواب می‌ده
  3. وقتی تایمر زنگ زد فوری برمی‌گرده و پاستا رو آبکش می‌کنه

برای استفاده از حالت دوم:

async def cook_pasta():   # "Hey Python, I’m a coroutine"
    print("Start boiling")
    await asyncio.sleep(10)  # "I’m waiting for I/O – do something else!"
    print("Pasta ready")
  • async def: نشون می‌ده این تابع یه coroutine هست
  • await: یعنی «اینجا خوشحال میشم مکث کنم تا بقیه کارها انجام بشن»

مهم: await فقط داخل توابعی که با async def تعریف شدن می‌تونه استفاده بشه.

۲. اجرای همزمان خیلی از درخواست‌ها – asyncio.gather

اولین نسخه‌ای که نوشتم همین بود و همین به تنهایی حدود ۱۰ برابر سریع‌تر شد:

import asyncio
import aiohttp

async def fetch_library(session, library_id):
    url = f"https://api.example.com/libraries/{library_id}"
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        # Suppose we already have the list of IDs
        library_ids = [1, 2, 3, ..., 3000]

        # Fire all requests almost simultaneously!
        tasks = [fetch_library(session, id) for id in library_ids]
        libraries = await asyncio.gather(*tasks)  # magic happens here

        print(f"Fetched {len(libraries)} libraries")

asyncio.run(main())

asyncio.gather منتظر می‌مونه تا همه تمام بشن، ولی همه تقریباً همزمان شروع می‌کنن. پس زمان کل تقریباً برابر با کندترین درخواست می‌شه، نه مجموع همه.

۳. با API مهربون باشیم – Semaphore (محدود کردن نرخ درخواست)

وقتی گازش رو تا ته باز کردم، API شروع کرد خطای 429 Too Many Requests بده.

اینجاست که asyncio.Semaphore مثل یه دربان مودب وارد می‌شه:

MAX_CONCURRENT = 30  # safe for this API

semaphore = asyncio.Semaphore(MAX_CONCURRENT)

async def fetch_library(session, library_id):
    async with semaphore:                 # only 30 can enter at once
        async with session.get(url) as resp:
            if resp.status == 429:
                retry_after = int(resp.headers.get("Retry-After", 60))
                await asyncio.sleep(retry_after)
                return await fetch_library(session, library_id)  # retry
            return await resp.json()

اینطوری هیچ‌وقت بیشتر از ۳۰ درخواست همزمان نمی‌فرستیم و API خوشحال می‌مونه و ما بن نمی‌شیم.

نکته: می‌تونی برای بخش‌های مختلف پایپ‌لاین سمافورهای جدا داشته باشی.

۴. الگوی تولیدکننده-مصرف‌کننده – asyncio.Queue

پایپ‌لاین من این شکلی بود:

  • مرحله ۱: گرفتن لیست آیدی‌ها (سریع)
  • مرحله ۲: گرفتن جزئیات (هنوز نسبتاً سریع)
  • مرحله ۳: تبدیل سنگین + نوشتن توی الستیک‌سرچ (کند و باید ترتیب حفظ بشه)

اگر مرحله ۲ سریع‌تر از مرحله ۳ داده تولید کنه، دو تا مشکل پیش میاد:

  • یا حافظه پر می‌شه (اگه همه رو تو لیست نگه داریم)
  • یا هم‌زمانی رو از دست می‌دیم (اگه صبر کنیم تا هر کدوم تموم بشه)

راه‌حل طلایی: asyncio.Queue

async def producer(queue, library_ids):
    semaphore = asyncio.Semaphore(50)
    async with aiohttp.ClientSession() as session:
        for id_batch in chunked(library_ids, 50):
            tasks = [fetch_library(session, id, semaphore) for id in id_batch]
            results = await asyncio.gather(*tasks)
            for data in results:
                await queue.put(data)   # fire and forget

async def consumer(queue):
    semaphore = asyncio.Semaphore(1)
    while True:
        library_data = await queue.get()   # waits cooperatively if empty
        async with semaphore:
            await transform_and_setup(library_data)
        queue.task_done()   # super important!

async def main():
    queue = asyncio.Queue(maxsize=1000)  # optional backpressure

    # Start consumer(s) — you can have several if safe
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(4)]

    # Start producer
    await producer(queue, all_library_ids)

    # Wait until the queue is fully processed
    await queue.join()

    # Cancel consumers
    for c in consumers:
        c.cancel()

نکات طلایی:

  • await queue.put(item): هیچ‌وقت لوپ رو بلاک نمی‌کنه
  • await queue.get(): اگه صف خالی بود، همکاری می‌کنه و می‌ره کنار
  • queue.task_done(): به صف می‌گه «این آیتم تموم شد»
  • await queue.join(): صبر می‌کنه تا همه آیتم‌ها کامل پردازش بشن

این الگو واقعاً زندگی و رم منو نجات داد.

حرف آخر

asyncio یه مدل پیچیده نیست – فقط یه مدل همکاری (cooperative multitasking) هست که وقتی کارتون I/O-bound باشه (API، دیتابیس، S3، صف پیام و…) می‌درخشه. از پایتون ۳.۷ به بعد داخل خود زبان هست، اکوسیستمش (aiohttp، asyncpg، httpx، aioredis و…) خیلی پخته‌ست و وقتی با نحوه کارش آشنا بشی، پایپ‌لاین‌های حرفه‌ای تر و بهتری مینویسی.

و بله – همین ایده توی زبان‌های دیگه هم هست (Promise/async-await در جاوااسکریپت، goroutine در گو، async/await در Rust).

پس دفعه بعد که دیدی پایپ‌لاینت «فقط داره منتظر می‌مونه»… می‌دونی چیکار کنی 😉

اگه خواستید قالب کامل این پایپ‌لاین رو به صورت ریپازیتوری گیت‌هاب یا یه ویدیو براتون بذارم، تو کامنت‌ها بگید.

پ.ن. نیم ساعت در مقابل ۲ روز. هنوز هر بار که کرون ران می‌شه لبخند می‌زنم 😄

نوشته‌های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *