انجام پردازش دسته‌ای در apache airflow
|

پردازش دسته‌ای در Apache Airflow – تجربه‌ای در عصر کلان داده

حتما تا حالا اسم پردازش دسته‌ای یا batch processing به گوشتون خورده. تو این پست قرار هست در مورد این موضوع و استفاده از اون در Apache Airflow صحبت کنیم.
این روزها با این حجم سرسام‌آور داده که هر لحظه هم بیشتر میشه، یکی از مهم‌ترین مهارت‌ها اینه که بلد باشیم چطور این کوه داده رو به شکلی کارآمد پردازش کنیم. ابزاری مثل Apache Airflow برای سازماندهی و هماهنگی کارهای پیچیده عالیه، اما وقتی پای داده‌های خیلی بزرگ وسط میاد، یه سؤال مهم پیش میاد:
چطور کاری کنیم که ورکرهامون از نفس نیفتن، حافظه منفجر نشه و کارهامون هم بی‌نهایت طول نکشه؟

واقعیت اینه که این موضوع تا قبل از تجربه اخیرم برای اضافه کردن یه ویژگی به محصول شرکتی که دراون مشغول هستم، زیاد برای من هم اهمیتی نداشت. می‌خواستم با استفاده از یه سرویس NER که همکاران نوشته بودند، متن‌ها رو بررسی کنم و موجودیت‌های نام دار معنادار برای کسب و کار رو استخراج کنم. طبق برنامه‌ای که توی ذهنم بود با اجرای روزانه این پروسه، می‌تونستم بدون مشکل خاصی پردازش رو پیش ببرم؛ اما ماجرا یکم متفاوت‌تر پیش رفت.

برای شروع لازم بود که پروسه برای تمام چت‌های چندماه اخیر انجام بشه که یه حجم عظیمی داده متنی بود. البته بعد از اجرای اول میشد می‌شد به صورت روزانه صرفا چت‌های همون روز رو بررسی کرد که کمتر هستن. اما برای شروع چالش بزرگی داشتم. این موضوع من رو برد سمت پردازش batch در Apache Airflow و با مفهوم Dynamic Task Mapping در این ابزار آشنا شدم. بعد از انجام کار تصمیم گرفتم این تجربه رو باهاتون به اشتراک بذارم. اما قبل از اینکه جلوتر بریم اگه با Apache Airflow آشنا نیستین توصیه می‌کنم این پست من رو قبلش بخونین.


چرا پردازش دسته‌ای؟ داستان تلخ پردازش “یکباره” داده‌ها

فرض کنید باید یک میلیون رکورد چت مشتری رو پردازش کنید. شاید وسوسه بشید یه تسک توی Airflow بسازید که کل این یک میلیون رکورد رو یکجا بخونه.
برای داده کم، این شاید جواب بده، ولی برای داده‌های بزرگ… فاجعه‌ست!

  • پر شدن حافظه: وقتی همه داده‌ها رو یکجا توی حافظه می‌ریزید، رم پر میشه و تسکتون کرش می‌کنه.
  • تایم‌اوت و دوباره‌کاری: تسک‌های طولانی ممکنه به خاطر محدودیت زمانی قطع بشن و مجبور بشید از اول شروع کنید.
  • عدم استفاده از منابع: یک تسک بزرگ فقط روی یک ورکر اجرا میشه و بقیه منابع بیکار می‌مونن.

راه‌حل ساده‌ست: کار رو به «دسته‌های کوچک‌تر» تقسیم کنید. مثلاً به جای یک تسک با یک میلیون رکورد، ۱۰۰ تسک بسازید که هر کدوم ۱۰هزار رکورد رو پردازش کنه. اینطوری هم موازی‌سازی دارید، هم حافظه و زمان کنترل‌شده‌ست.


قلب ماجرا: Dynamic Task Mapping

با استفاده از قابلیت نگاشت تسک‌های پویا در Airflow شما فقط یه تسک “قالب” تعریف می‌کنید و Airflow به طور پویا، در زمان اجرا و بر اساس لیستی از ورودی‌هایی که بهش میدید، نمونه‌های متعددی از اون تسک رو ایجاد و اجرا می‌کنه.

تصورش کنید یه کارخونه دارید: شما یه نقشه (تسک partial شما) و یه لیست از مواد خام (ورودی‌های expand) بهش میدید، و کارخونه به سرعت و با کارایی بالا، کلی محصول یکسان (نمونه‌های تسک نگاشت شده) رو تولید می‌کنه.

بیایید اجزای اصلی این قابلیت رو بررسی کنیم:

  • .partial(): این متد رو به تعریف یه تسک اضافه می‌کنیم تا بگیم این یه تسک قابل نگاشت هست. با این کار به Airflow سیگنال میدیم که: “این تسک فقط یه الگوئه؛ پارامترهای کامل اجرای اون به صورت پویا تو زمان اجرا فراهم میشه.” هر آرگومانی که به .partial() میدید، به عنوان آرگومان ثابت برای تمام نمونه‌های نگاشت شده در نظر گرفته میشه.
  • .expand(): این متد روی یه تسک که قبلاً با .partial() مشخص شده، صدا زده میشه و لیست ورودی‌هایی رو که قراره نگاشت پویا رو هدایت کنه، به Airflow میده. برای هر آیتم تو این لیستی که به .expand() ارسال می‌کنید، Airflow یه نمونه تسک (task instance) جدید و مجزا (همون “تسک نگاشت شده”) ایجاد می‌کنه. مقادیری که از لیست میان، به عنوان آرگومان به هر نمونه تسک نگاشت شده پاس داده میشن.

ترکیب هوشمندانه partial() و expand() یه تعریف تسک ساده رو به یه مکانیزم قدرتمند برای پردازش موازی تبدیل می‌کنه، که کاملاً برای نیازهای دسته‌بندی ما ایده‌آله.


یک مثال کاربردی: بازنویسی برای مقیاس‌پذیری

بیایید مساله‌ای که برای من پیش اومد رو با هم بررسی کنیم.
فرض کنید شما قرار هست حجم عظیمی از داده‌ها رو پردازش کنین. ممکنه این داده‌ها صرفا برای اولین بار زیاد باشن یا اینکه با گذشت زمان و زیاد شدن داده‌های روزانه و حتی ساعتی این چالش براتون پیش بیاد.
یه DAG که خوب بهینه‌سازی نشده باشه، ممکنه یه تسک واحد داشته باشه که همه چت‌ها رو یکجا واکشی کنه و اینجا یه گلوگاه ایجاد بشه. حالا، نسخه‌ای که با هم بازنویسی می‌کنیم، یه پایپ‌لاین قوی و مقیاس‌پذیر با استفاده از Dynamic Task Mapping رو نشون میده.

۱. تعیین گستره کار: get_timeframe_and_count

اولین قدم خیلی مهم اینه که گستره داده‌هامون رو بفهمیم، اونم بدون اینکه کل داده‌ها رو تو حافظه بارگذاری کنیم. این تسک، تعداد کل رکوردها رو برای یه بازه زمانی مشخص پیدا می‌کنه. در این مثال فرض کردم دارم دیتای رو از یک کالکشن MongoDB به اسم data_collection میخونم.

from airflow.decorators import task

@task
def get_timeframe_and_count(**context):
    start_time = "2025-01-01T00:00:00Z"
    end_time = "2025-01-02T00:00:00Z"

    # This task now ONLY gets the count of documents, not the documents themselves.
    # This is a lightweight database operation.
    chat_count = data_collection.count_documents({
        "created_at": {
            "$gte": start_time,
            "$lt": end_time,
        }
    })
    print(f"Total chats to process: {chat_count}")
    return {"start_time": start_time, "end_time": end_time, "chat_count": chat_count}

این تسک طوری طراحی شده که سبک و سریع باشه. اون اطلاعات ضروری (تعداد کل، محدوده‌های زمانی) رو که برای استراتژی دسته‌بندی‌مون لازم داریم، بدون مصرف بیش از حد منابع، بهمون میده.

در این تسک من تاریخ شروع و پایان رو ثابت گرفتم توی کد واقعی ممکنه زمان‌های نسبی (مثل دیروز، یک ساعت پیش و..) یا زمان آخرین اجرای موفق این تسک رو گرفته باشین. من خودم از روش دوم استفاده کردم ولی اینجا کدش رو نمیذارم که از بحث اصلی خارج نشیم.

۲. آماده‌سازی دسته‌ها: chunk_processing_batches

حالا باید یه لیست از دیکشنری‌های batch درست کنیم. هر دیکشنری شامل پارامترهای لازم (مثل skip و limit برای یک کوئری دیتابیس) برای یه ورکر هست تا یه زیرمجموعه خاص از داده‌ها رو پردازش کنه. این لیست، ورودی اصلی برای Dynamic Task Mapping ما خواهد بود.

برای محاسبه تعداد batchها هم سعی کردم با یک کار ساده یه ceiling division انجام بدم :))

@task
def chunk_processing_batches(timeframe_and_count: dict):
    batch_size = 1000
    total_count = timeframe_and_count["chat_count"]
    start_time = timeframe_and_count["start_time"]
    end_time = timeframe_and_count["end_time"]
    num_batches = (total_count + batch_size - 1) // batch_size
    
    batches = []
    for i in range(num_batches):
        skip = i * batch_size
        batches.append({
            "skip": skip,
            "limit": batch_size,
            "start_time": start_time,
            "end_time": end_time
        })
    print(f"Generated {len(batches)} batches.")
    return batches

۳. پردازش موازی: process_batch_of_chats

حالا میرسیم به اصل ماجرا، یعنی منطق پردازش اصلی. ما تسکی رو تعریف می‌کنیم که یه دسته واحد رو مدیریت می‌کنه، و بعد با استفاده از Dynamic Task Mapping، اون رو به صورت موازی برای همه دسته‌هایی که ساختیم اجرا می‌کنیم.

@task(pool="extract_pool")
def process_batch_of_chats(batch_info: dict):
    skip = batch_info["skip"]
    limit = batch_info["limit"]
    start_time = batch_info["start_time"]
    end_time = batch_info["end_time"]

    print(f"Processing batch: skip={skip}, limit={limit}")

    # This task now performs a LIMITED query on the database, fetching only its batch.
    chat_sessions = data_collection.find(
        { "created_at": { "$gte": start_time, "$lt": end_time } }
    ).skip(skip).limit(limit)
    
    entities_batch = []
    for chat_session in chat_sessions:
        # ... complex logic to extract entities, analyze sentiment, etc. ...
        # Example: just appending a placeholder for demonstration
        entities_batch.append(f"Processed_Chat_{chat_session.get('_id', 'N/A')}")
        
    print(f"Finished batch: {len(entities_batch)} items processed.")
    return entities_batch

در نهایت هم DAG رو تعریف می‌کنیم.

from airflow.models.dag import DAG
from datetime import datetime

with DAG(
    dag_id="scalable_batch_processing_example",
    start_date=datetime(2025, 01, 01),
    schedule=None,
    catchup=False,
    tags=["batch", "dynamic_mapping", "tutorial"],
) as dag:
    timeframe = get_timeframe_and_count()
    batches = chunk_processing_batches(timeframe)
    results = process_batch_of_chats.partial().expand(batch_info=batches)

با صدا زدن .partial().expand()، ما عملاً به Airflow دستور دادیم که چندین نمونه از process_batch_of_chats رو راه‌اندازی کنه، که هر کدومشون یه دیکشنری batch_info منحصر به فرد دریافت می‌کنن. این دقیقاً همون چیزیه که قدرت پردازش موازی ورکرهای Airflow رو مورد استفاده قرار میده.


جمع‌بندی: قدرتمند کردن پایپ‌لاین‌های شما با پردازش دسته‌ای

پردازش دسته‌ای، که با قابلیت Dynamic Task Mapping در Airflow تقویت میشه، یه الگوی ضروری برای ساخت پایپ‌لاین‌های داده‌ای مقیاس‌پذیر، مقاوم و کارآمد هست. این روش به طور مستقیم با چالش‌های رایجی مثل محدودیت‌های حافظه و تسک‌های طولانی‌مدت مقابله می‌کنه و به شما اجازه میده داده‌های واقعاً عظیم رو بدون به خطر انداختن عملکرد یا پایداری، پردازش کنید. توصیه میکنم برای درک بهتر موضوع پردازش دسته‌ای و البته ویژگی‌های دیگه Apache Airflow حتما داکیومنت‌های رسمی مثل این صفحه رو بخونین.

نوشته‌های مشابه

دیدگاهتان را بنویسید

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *