تدفق الهواء: كيفية الاستخدام لجدولة آلاف المهام في لقطة واحدة

في هذا المنشور ، سأناقش كيف يمكننا جدولة آلاف المهام داخل قطعة واحدة. لن أركز على ما هو Airflow وكيف يمكنك تثبيته ، ولكن بدلاً من ذلك سأناقش كيف يمكننا جدولة عدد كبير من المهام داخل قطعة واحدة.

في الأساس ، تم تصميم Airflow لامتلاك عدة DAGs وداخل DAG يمكن أن يكون هناك مئات أو ألف مهمة. لذا ، ماذا يحدث عندما نريد جدولة عدد كبير من المهام دعنا نقول حوالي 60000 أو أكثر من ذلك؟ هذا ما شرحته في هذه المدونة.

أنا أعمل مع Airflow لأتمتة سير العمل. ولكن في شركتي لدينا كمية كبيرة من البيانات وحاولت باستخدام إصدارات مختلفة من Airflow وبسبب البيانات الضخمة حقًا ، لدي ما يقرب من 70000 مهمة داخل DAG واحدة. لقد جربت إصدارات مختلفة من Airflow ويمكن للإصدار الأحدث جدولة 5000 مهمة ، ولكن إذا أردنا جدولة أكثر من ذلك ، يظل المجدول في حالة التشغيل دون جدولة المهام. لقد وجدت كل مشكلة وتحققت من كيفية حلها ما هو السبب الفعلي وأخيرًا على الرغم من كتابة هذه المدونة.

هذه إحدى حالات الاستخدام في Airflow عندما يكون لديك آلاف المهام داخل DAG واحدة. بادئ ذي بدء ، يتعين علينا استخدام إصدار Airflow الإصدار 1.10.3 بعد ذلك لا نركز على عدد كبير من المهام ، لذلك يتعين علينا استخدام الإصدار 1.10.3 من Airflow. لتثبيت هذا الإصدار اتبع الخطوات التالية:

  • أولاً يجب علينا إنشاء بيئة جديدة وتفعيل تلك البيئة باستخدام الأمر التالي:
كوندا إنشاء -n airflow_3
كوندا تنشيط تدفق الهواء
  • لتثبيت Airflow بإصدار محدد من 1.10.3 ، استخدم الأمر التالي:
تثبيت Conda -c Conda-forge airflow == 1.10.3
  • يجب أن تتأكد من بعض المتطلبات المحددة التي لا يعملها هذا الإصدار مع قارورة≥1.0.9 ، لذلك إذا كان لديك قارورة أكبر من هذا الإصدار ، استخدم الأمر التالي:
قارورة تثبيت النقطة = = 1.0.4
pip install funcsigs == 1.0.0 (هذا متطلبات أخرى يجب تثبيتها)
  • ويوصى باستخدام برنامج Celery Executor عندما نعمل مع هذا العدد الكبير من المهام ، حيث يتعين علينا موازنة تلك المهام ويمكن تحقيقها باستخدام برنامج Celery Executor. لتثبيت الكرفس ، استخدم الأمر التالي:
الكرفس نقطة تثبيت
  • عليك استخدام العمال وتعيين وسيط لاستخدام منفذ الكرفس ، وأنا أستخدم RabbitMQ كوسيط. إلى إعداد URL للوسيط يمكن استخدام البنية التالية:
broker_url = amqp: // “username”: “password” @ ”host_name”: “port” /

فمثلا

broker_url = amqp: // ضيف: ضيف @ localhost: 5672 /
  • لرؤية واجهة المستخدم الخاصة بـ Celery Executor ، يمكننا استخدام Flower لتثبيت هذا الأمر باستخدام الأمر التالي:
تثبيت كوندا - زهرة كوندا - صياغة
  • بعد القيام بذلك ، يتعين علينا تغيير بعض التكوينات لتشغيل آلاف المهام بالتوازي وجدولة آلاف المهام في لقطة واحدة.
[النواة]
المنفذ = CeleryExecutor التوازي = 200000 non_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[جدولة]
max_thread = 10 (يمكن استخدام سلاسل الرسائل حسب برنامجك عن طريق زيادتها أو تقليلها)

هذه هي الإعدادات الرئيسية عندما تريد جدولة آلاف المهام في لقطة واحدة. عليك تعديله وفقًا لعدد DAGs الأقصى الذي تريد تشغيله بالتوازي وعدد المهام التي لديك داخل DAG واحدة.

المعلمة الرئيسية هي "Non_pooled_task_slot_count" التي تمت إزالتها من الإصدار 1.10.4 من Airflow ، لذلك أستخدم 1.10.3 ، حيث تلعب هذه المعلمة دورًا مهمًا جدًا في جدولة المهام.

الاختلاف الرئيسي بعد إزالة "Non_pooled_task_slot_count" هو أنه يستخدم default_pool الذي تم تعيينه على 128 افتراضيًا (يمكن زيادته وفقًا للمتطلبات). العمل الرئيسي لـ "Non_pooled_task_slot_count" هو جدولة المهام وهو غير متصل بـ default_pool أو أي عدد آخر من الاتصال من قاعدة البيانات حتى نتمكن من زيادة هذا العدد بقدر ما تريد ولكن إذا قمت بزيادة عدد الفتحات في "default_pool" ثم يتم توصيله أيضًا باتصالات قاعدة البيانات لديك ولا يمكن أن يكون لديك 100000 اتصال قاعدة بيانات في وقت واحد يعمل بالتوازي. بشكل أساسي ، تمت إزالة "Non_pooled_task_slot_count" لصالح "default_pool".

يحتوي هذا المنشور على إجابة للسؤال عن سبب اختناق المجدول ، أو تعليقه ، أو عدم جدولة عدد كبير من المهام أو تشغيله طوال اليوم دون القيام بأي شيء. تحتوي كل هذه الإجابات على إجابة واحدة لاستخدام الإصدار 1.10.3 من Airflow.

عند استخدام Airflow 1.10.3 ، يتعين علينا تحديد التجمع الذي يجب أن تستخدمه مجموعة DAG حيث لا تستخدم "default_pool" افتراضيًا ، لذا أثناء إنشاء المهام ، يتعين علينا تمرير pool mater pool = 'defautl_pool'. يمكنك إنشاء "default_pool" باستخدام واجهة المستخدم (Admin -> pools) أو يمكن القيام بذلك عن طريق سطر الأوامر:

تجمع تدفق الهواء -s default_pool 128 "التجمع الافتراضي".

فيما يلي مثال لعينة DAG:

نظام تشغيل الاستيراد من وقت الاستيراد ، تاريخ الاستيراد ، تدفق الهواء المستورد من تدفق الهواء استيراد DAG من airflow.operators.dummy_operator استيراد DummyOperator
default_args = {'owner': 'Airflow'، 'يعتمد_on_past': خطأ، 'start_date': airflow.utils.dates.days_ago (2)، 'retries': 1، 'retry_delay': timedelta (minutes = 1)،}
dag = DAG ('dummy_try1'، default_args = default_args، Schedule_interval = بلا)
بالنسبة إلى i في النطاق (50000): المهام = DummyOperator (task_id = '{}'. format (i)، dag = dag، pool = 'default_pool)

يمكنك التحقق من الفرق بين جميع الإصدارات على الرابط أدناه:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104