Category : airflow

I am trying to build an Airflow pipeline that creates a table (if it does not already exist) and fills it with data from bigquery-public-data.samples.github_timeline. Here is my code:

    import airflow
    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    from datetime import datetime
    from airflow.models import Variable

    tmpl_search_path = Variable.get("my_sql_path")

    default_args = {
        'owner': 'Testing',
        'depends_on_past': False,
        ..


I'm using the Airflow PostgresOperator with the parameters argument to replace the table name in my SQL query with the one contained in my dictionary. For example: create_table = PostgresOperator(sql='DROP TABLE if exists %(my_table)s;', parameters={'my_table': 'my_name'}, …) The problem is that when the operator is executed, the rendered SQL is DROP TABLE if exists 'my_name' and not DROP TABLE ..
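The quoting described above is what value binding is meant to do: a bound parameter is sent as a data value (here a string literal), never as an identifier such as a table name. Below is a minimal sketch of the usual workaround, which places the validated name in the SQL text itself. It uses the standard-library sqlite3 module as a stand-in for Postgres, and `drop_table_sql` is a hypothetical helper, not part of Airflow. It assumes the table name comes from trusted configuration and is a plain identifier.

```python
import sqlite3


def drop_table_sql(table_name: str) -> str:
    """Build a DROP TABLE statement with the table name as an identifier.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Reject anything that is not a plain identifier, since the name is
    # interpolated into the SQL text rather than bound as a value.
    if not table_name.isidentifier():
        raise ValueError(f"unsafe table name: {table_name!r}")
    return f'DROP TABLE IF EXISTS "{table_name}";'


conn = sqlite3.connect(":memory:")

# A bound parameter is treated as a value, so it comes back as a string.
bound_value = conn.execute(
    "SELECT :my_table", {"my_table": "my_name"}
).fetchone()[0]

# The identifier has to be part of the SQL text itself.
conn.execute('CREATE TABLE "my_name" (id INTEGER)')
conn.execute(drop_table_sql("my_name"))
remaining = conn.execute(
    "SELECT count(*) FROM sqlite_master WHERE name = 'my_name'"
).fetchone()[0]
```

In Airflow itself, the operator's sql field is Jinja-templated, so the same idea should be expressible as sql='DROP TABLE if exists {{ params.my_table }};' with params={'my_table': 'my_name'} in place of parameters.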


The BaseOperator class in Airflow has the pre_execute method. I saw code like this: subdag_operator_x.pre_execute = lambda a: api_x_call(a) I can't find the name for this use of the "=" sign on a class method. I mean, I want to understand what effect this operation has on the class method. (I tried searching for "override a method" or "overload ..
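The assignment in the snippet above is ordinary attribute assignment on an instance, commonly called monkey patching. It stores the lambda on that one object, where it shadows the method defined on the class. Here is a minimal sketch with a hypothetical stand-in class: `FakeOperator` is not Airflow's BaseOperator, and `api_x_call` is only a placeholder for the asker's function.

```python
class FakeOperator:
    """Hypothetical stand-in for an operator class; not Airflow's BaseOperator."""

    def pre_execute(self, context):
        return f"class method got {context}"


def api_x_call(context):
    # Placeholder for the asker's api_x_call.
    return f"api_x_call got {context}"


patched = FakeOperator()
untouched = FakeOperator()

# Instance attribute assignment: only `patched` is affected. The lambda is a
# plain function stored on the instance, so it is not bound and receives no
# `self`; its single argument is whatever the caller passes.
patched.pre_execute = lambda a: api_x_call(a)

patched_result = patched.pre_execute("ctx")
untouched_result = untouched.pre_execute("ctx")
```

This is neither overriding (which happens in a subclass) nor overloading: the class and every other instance keep the original pre_execute.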


Currently we are having an issue applying our custom logging format to a Python file that is called by a BashOperator. Here is an example of a properly formatted log: {"asctime": "2021-11-30 00:00:02,562", "name": "bts", "levelname": "INFO", "message": "created the logger"} These are the config changes we made to airflow_local_settings.py for logging: DEFAULT_LOGGING_CONFIG: Dict[str, Any] ..
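For reference, a log line with the four keys shown above can be produced with the standard library alone. The asker's actual formatter is not shown in the excerpt, so `JsonFormatter` below is a hypothetical stand-in, not their configuration.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter emitting the four keys from the example log line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "asctime": self.formatTime(record),
            "name": record.name,
            "levelname": record.levelname,
            "message": record.getMessage(),
        }
        return json.dumps(payload)


# Write to an in-memory stream so the result can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("bts")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the record away from any root handlers

logger.info("created the logger")
parsed = json.loads(stream.getvalue())
```

A script launched by BashOperator runs in its own process, so it would presumably need to set up a formatter like this itself rather than rely on the logging configuration loaded by Airflow.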
