Category: airflow-scheduler

While the scheduler tries to read a new *.py DAG file, I'm getting this error in the log file:

Process DagFileProcessor0-Process:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 187, in _run_file_processor
    callback_requests=callback_requests,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
..

Read more

I am trying to find a way to manage connection pools for the external connections created in Airflow. Airflow version: 2.1.0; Python version: 3.9.5; Airflow metadata DB: SQLite; external connections created: MySQL and Snowflake. I know there are properties in the airflow.cfg file:

sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5

But these properties are for ..
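As the question notes, the sql_alchemy_pool_* settings size only Airflow's own metadata-database engine; they do not pool DAG-level connections to MySQL or Snowflake. One common pattern is a module-level pool that tasks borrow connections from and return to. A minimal, stdlib-only sketch of that borrow/return pattern, using sqlite3 as a stand-in for the external database (SimplePool and its factory argument are hypothetical names for illustration; in practice you would more likely reuse a pooled SQLAlchemy engine):

```python
import queue
import sqlite3


class SimplePool:
    """Illustrative fixed-size connection pool (not Airflow API)."""

    def __init__(self, factory, size=5):
        # Pre-create `size` connections up front; Queue is thread-safe,
        # so parallel task instances can share the pool.
        self._q = queue.Queue(maxsize=size)
        for _ in range(size):
            self._q.put(factory())

    def acquire(self):
        # Blocks until a connection is free, capping concurrent use.
        return self._q.get()

    def release(self, conn):
        # Return the connection for reuse instead of closing it.
        self._q.put(conn)


# Example use inside a task callable:
pool = SimplePool(lambda: sqlite3.connect(":memory:"), size=2)
conn = pool.acquire()
conn.execute("SELECT 1")
pool.release(conn)
```

The same acquire/release discipline is what SQLAlchemy's QueuePool implements for you when you create one engine per external database and share it across tasks.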

Read more

I'm using Airflow version 1.9.0. The DAG runs every minute, but when Airflow goes down for a period of time, we see a lot of backfill jobs running, which stops recent DAG runs. I'm using SubDagOperator.

dag_args = {
    'owner': 'Anish',
    'depends_on_past': False,
    'start_date': datetime.datetime(2017, 3, 20),  # time in past
    'retries': 0,
    'retry_delay': datetime.timedelta(seconds=5),
    'email': Variable.get("failure_email_to"),
    'email_on_failure': ..
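The backfill flood comes from catch-up scheduling: with a past start_date and catchup enabled, Airflow queues one run per missed schedule tick before reaching the current interval. Setting catchup=False on the DAG (supported since Airflow 1.8, or globally via catchup_by_default = False in airflow.cfg) makes the scheduler skip straight to the latest interval. A stdlib sketch of how many runs a 45-minute outage would otherwise enqueue for an every-minute DAG (the outage window is made up for illustration):

```python
from datetime import datetime, timedelta

# Hypothetical outage window for a "* * * * *" (every-minute) DAG.
down_from = datetime(2021, 4, 1, 10, 0)
down_until = datetime(2021, 4, 1, 10, 45)

# With catchup enabled, one run per missed schedule tick gets queued
# ahead of the current interval:
missed_runs = int((down_until - down_from) / timedelta(minutes=1))
print(missed_runs)  # 45 backfill runs before the scheduler catches up
```

With catchup=False, those 45 intervals are skipped and only the most recent one runs.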

Read more

I have a DAG that inserts some records into a MySQL table, and I want to run it every 2 hours. For that I have this code:

from datetime import timedelta, datetime
import pymysql
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MySqlOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 4, 1),
..
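For the two-hour cadence itself, schedule_interval accepts either a cron string ('0 */2 * * *') or a plain timedelta(hours=2); with a timedelta, intervals line up from start_date, and each run is triggered once its interval has elapsed. A stdlib sketch of the resulting execution dates, taking the start date from the snippet above:

```python
from datetime import datetime, timedelta

start_date = datetime(2021, 4, 1)  # from the default_args above
interval = timedelta(hours=2)      # equivalent of schedule_interval=timedelta(hours=2)

# First few execution_dates; the run stamped with each date starts
# only after that two-hour interval has fully passed.
execution_dates = [start_date + i * interval for i in range(3)]
print([d.isoformat() for d in execution_dates])
# ['2021-04-01T00:00:00', '2021-04-01T02:00:00', '2021-04-01T04:00:00']
```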

Read more

I've set up a DAG with the following parameters:

local_tz = pendulum.timezone('US/Eastern')

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    dag_id='some_dag',
    start_date=datetime(2021, 1, 8, tzinfo=local_tz),
    schedule_interval='0 16 8 * *',
    default_args=default_args,
    catchup=True
)

I am expecting the most recent task run to be on May 8th; however, I only see February 8th, ..
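Part of what usually surprises people here is Airflow's interval semantics: the run stamped with a given execution_date only fires once the *following* schedule tick has passed, so for '0 16 8 * *' the run labelled April 8th does not start until May 8th at 16:00. A stdlib sketch of that lag, with dates chosen to match the question:

```python
from datetime import datetime

# "0 16 8 * *" ticks at 16:00 on the 8th of every month.
# The run labelled with execution_date April 8th covers the interval
# April 8th 16:00 -> May 8th 16:00 and is triggered only at its end.
execution_date = datetime(2021, 4, 8, 16, 0)
triggered_at = datetime(2021, 5, 8, 16, 0)

lag_days = (triggered_at - execution_date).days
print(lag_days)  # the run starts 30 days after its execution_date
```

So even when scheduling is healthy, the UI's latest execution_date trails the wall clock by one full interval; a latest run of February 8th when May runs are expected points to an additional problem (for example, the scheduler not running) on top of this labelling lag.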

Read more