Category: airflow-operator

I have a custom operator where I use xcom_pull in the argument list to get values from XCom, but the template is not rendered to the actual value; it remains as the literal string.

    download = CustomSparkSubmitOperator(
        task_id='download',
        env=env,
        application=application,
        args=['--input_file', "{{ ti.xcom_pull('set_parameters', key='input_file') }}",
              '--output_file', "{{ ti.xcom_pull('set_parameters', key='output_file') }}"],
        provide_context=True,
        dag=dag)

The operator returns ..
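Jinja templates are only rendered for fields that an operator declares in template_fields, so a custom operator has to opt its args parameter in. A minimal sketch of that declaration, assuming the operator subclasses BaseOperator (the other parameters here are illustrative, not the asker's actual code):

    # Sketch: Jinja in `args` is only rendered if `args` is listed in template_fields.
    from airflow.models import BaseOperator

    class CustomSparkSubmitOperator(BaseOperator):
        template_fields = ('args',)  # tells Airflow to render Jinja inside `args`

        def __init__(self, application, args=None, env=None, **kwargs):
            super().__init__(**kwargs)
            self.application = application
            self.args = args or []
            self.env = env

        def execute(self, context):
            # by the time execute() runs, self.args holds the rendered XCom values
            print(self.args)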

Read more

I'm trying to build a dynamic workflow, and I got this broken-DAG error about a duplicate task id:

    Broken DAG: [/opt/airflow/dags/academi_dag.py] Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 430, in __init__
        task_group.add(self)
      File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/task_group.py", line 140, in add
        raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
    airflow.exceptions.DuplicateTaskIdFound: Task id 'Review.extract__1' has already been ..
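DuplicateTaskIdFound is raised when two tasks in the same DAG (or TaskGroup) end up with the same task_id, which commonly happens when tasks are generated in a loop without a unique suffix. A hedged sketch of the usual fix; the DAG id matches the traceback, but the callable and inputs are placeholders:

    # Sketch: give each generated task a unique task_id.
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.task_group import TaskGroup

    def extract(source):
        print(f"extracting {source}")

    sources = ['a', 'b', 'c']  # placeholder inputs

    with DAG(dag_id='academi_dag', start_date=datetime(2021, 1, 1),
             schedule_interval=None, catchup=False) as dag:
        with TaskGroup(group_id='Review') as review_group:
            for i, source in enumerate(sources):
                PythonOperator(
                    task_id=f'extract_{i}',  # unique per iteration; reusing one id raises DuplicateTaskIdFound
                    python_callable=extract,
                    op_kwargs={'source': source},
                )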

Read more

    p_utils = imp.load_source('p_utils', join('/efs/com/cm/utilities/cm_integration_python_utils.py'))

    lash_site_init_task = PythonOperator(
        task_id='xyz',
        python_callable=p_utils.initiate_batch,
        op_kwargs={'source': 'lash', 'feed': 'site'},
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=lash_dag)

Using KubernetesPodOperator:

    lash_site_init_task = KubernetesPodOperator(
        namespace=namespace,
        image=image,
        cmds=["/bin/bash", "-c", p_utils.compare_lst_modified_time],
        op_kwargs={'source': 'lash', 'feed': 'prvdr'},
        volume_mounts=[volume_mount],
        volumes=[volume],
        name="table-lash_prvdr_data_load_check",
        task_id='lash_prvdr_data_load_check',
        get_logs=True,
        provide_context=True,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context='aws',
        trigger_rule=TriggerRule.ALL_SUCCESS,
        on_failure_callback=p_utils.update_audit_failed_status('lash', 'prvdr', 'data_load_check'),
        dag=lash_dag)

Source: Python..
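Two details in the KubernetesPodOperator call stand out: op_kwargs belongs to PythonOperator and is not accepted by KubernetesPodOperator, and on_failure_callback here receives the result of calling update_audit_failed_status at parse time rather than a callable (cmds also embeds a Python function object, which the container cannot execute as a shell command). A hedged sketch of one possible shape, reusing the snippet's variables; the script invocation in arguments is hypothetical:

    # Sketch: KubernetesPodOperator takes cmds/arguments rather than op_kwargs,
    # and on_failure_callback needs a callable, not a call result.
    lash_prvdr_check = KubernetesPodOperator(
        namespace=namespace,
        image=image,
        cmds=["/bin/bash", "-c"],
        arguments=["python /efs/com/cm/utilities/check.py --source lash --feed prvdr"],  # hypothetical script
        volume_mounts=[volume_mount],
        volumes=[volume],
        name="table-lash_prvdr_data_load_check",
        task_id='lash_prvdr_data_load_check',
        get_logs=True,
        config_file=kube_config_path,
        in_cluster=False,
        cluster_context='aws',
        # the callback is invoked with the task context; ignore it and record the failure
        on_failure_callback=lambda context: p_utils.update_audit_failed_status('lash', 'prvdr', 'data_load_check'),
        dag=lash_dag)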

Read more

I have installed Airflow on a server running Ubuntu and Python 3.8. I'm trying to import a simple DAG into the Airflow UI that lists the files in a bucket.

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
    from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from datetime import datetime ..
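Those provider import paths only resolve if the Amazon provider package is installed in the environment the scheduler and webserver run in (pip install apache-airflow-providers-amazon). Under that assumption, a minimal sketch of a bucket-listing DAG; the bucket name and connection id are placeholders:

    # Sketch: lists keys in a bucket; requires apache-airflow-providers-amazon.
    from datetime import datetime
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator

    with DAG(dag_id='list_s3_files', start_date=datetime(2021, 1, 1),
             schedule_interval=None, catchup=False) as dag:
        list_files = S3ListOperator(
            task_id='list_files',
            bucket='my-bucket',          # placeholder bucket name
            aws_conn_id='aws_default',   # assumes a configured AWS connection
        )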

Read more

I have a requirement where I need to capture the XCom response of an Airflow SqlSensor and apply a Python transformation to change the data format. Snippet:

    delta_sql_sensor_task = SqlSensor(
        task_id="Verify-Completion-of-Delta-Job",
        poke_interval=10,
        timeout=100,
        conn_id="lct_statedb",
        # gobal_id_list="{{ str(ti.xcom_pull(key='id_list', task_ids=['hook_task'])).replace('[', '') }}",
        sql="SELECT COALESCE ( (SELECT count(*) FROM lct_transformation_history WHERE deltaload_transformation_status is not null AND fullload_transformation_status is not null ..
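On SqlSensor, sql is a templated field, so XCom values can be interpolated into it, but arbitrary Python such as str(...) is not available inside Jinja, and keyword arguments the sensor does not template will never render. A hedged sketch of doing the reformatting in an upstream PythonOperator and templating the cleaned value into the sensor; the task ids, XCom key, and query are illustrative:

    # Sketch: reformat the XCom value in Python, then template it into the sensor's SQL.
    # Assumed to live inside a `with DAG(...)` block.
    from airflow.operators.python import PythonOperator
    from airflow.sensors.sql import SqlSensor

    def format_id_list(ti):
        ids = ti.xcom_pull(task_ids='hook_task', key='id_list')
        # turn e.g. [1, 2, 3] into "1,2,3" for use inside an IN (...) clause
        return ','.join(str(i) for i in ids)

    format_ids = PythonOperator(task_id='format_ids', python_callable=format_id_list)

    verify_delta = SqlSensor(
        task_id='verify_delta_job',
        conn_id='lct_statedb',
        poke_interval=10,
        timeout=100,
        sql="SELECT count(*) FROM lct_transformation_history "
            "WHERE id IN ({{ ti.xcom_pull(task_ids='format_ids') }})",  # illustrative query
    )
    format_ids >> verify_delta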

Read more

How can I run a PythonOperator inside another PythonOperator? The idea is to call a "main" function as a PythonOperator and then create and schedule a few other PythonOperators inside it. The code is:

    def printFunction(value):
        time.sleep(5)
        print(value)

    def main():
        for i in range(10):
            task_2 = PythonOperator(
                task_id='loop_task_2' + str(i),
                python_callable=printFunction,
                op_kwargs={'value': i},
                dag=dag, ..
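Operators instantiated inside a running callable are never registered with the scheduler; tasks have to exist when the DAG file is parsed. A hedged sketch of the usual alternative, building the looped tasks at DAG-definition time; the DAG id and schedule are assumptions, while the task names mirror the question:

    # Sketch: create the looped tasks at parse time instead of inside another task.
    import time
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def printFunction(value):
        time.sleep(5)
        print(value)

    with DAG(dag_id='loop_dag', start_date=datetime(2021, 1, 1),
             schedule_interval=None, catchup=False) as dag:
        for i in range(10):
            PythonOperator(
                task_id='loop_task_2' + str(i),
                python_callable=printFunction,
                op_kwargs={'value': i},
            )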

Read more