Airflow: apply custom logging from airflow_local_settings.py to a Python file run by BashOperator

  airflow, logging, python

Currently we are having an issue trying to apply our custom logging format to a Python file called by BashOperator. Here is an example of a properly formatted log:

{"asctime": "2021-11-30 00:00:02,562", "name": "bts", "levelname": "INFO", "message": "created the logger"}

These are the config changes we made to airflow_local_settings.py for logging:

DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {'format': LOG_FORMAT},
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
        },
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s',
        },
    },
    'filters': {
        'mask_secrets': {
            '()': 'airflow.utils.log.secrets_masker.SecretsMasker',
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'json',
            'stream': 'sys.stdout',
            'filters': ['mask_secrets'],
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'json',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'filters': ['mask_secrets'],
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'json',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'filters': ['mask_secrets'],
        },
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': False,
            'filters': ['mask_secrets'],
        },
        'flask_appbuilder': {
            'handlers': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        },
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
        'filters': ['mask_secrets'],
    },
}

DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'json',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5,
        }
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': LOG_LEVEL,
            'propagate': False,
        }
    },
}
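
For context, the documented way to load a custom dict like this is to point the logging_config_class setting at a dict in a module on the Python path; a minimal sketch of that wiring (the log_config module name and config/ folder are just placeholders, not something we have working) would be:

# airflow.cfg
[logging]
logging_config_class = log_config.LOGGING_CONFIG

# $AIRFLOW_HOME/config/log_config.py  (the config/ folder must be importable)
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# reuse the same JSON formatter and apply it to every handler
LOGGING_CONFIG['formatters']['json'] = {
    '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
    'format': '%(asctime)s %(name)s %(levelname)s %(message)s',
}
for handler in LOGGING_CONFIG['handlers'].values():
    handler['formatter'] = 'json'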

We were not able to get the logging_config_class setting in airflow.cfg working, so instead we are overriding the airflow_local_settings.py file with a copy of the default file containing these changes. Logs that we write in the DAG file use the custom format as expected. However, we also write logs in a separate Python file that we run from the DAG using BashOperator. This is the code for that:

import os
from datetime import timedelta

# import paths below assume Airflow 2.x
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

file_path = '{}common/log_package/log_module/test_logging_file.py'.format(os.environ['AIRFLOW__CORE__DAGS_FOLDER'])
cmd = f'python {file_path}'

with DAG(
    dag_id='test_bash',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
    params={"example_key": "example_value"},
) as dag:

    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command=cmd,
    )

    run_this

This is the code we have in test_logging_file.py:

from custom_logger import custom_logger

def test_func():
    z = custom_logger('bts')
    z.info('created the logger')

if __name__ == "__main__":
    test_func()
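
As far as we can tell, BashOperator launches this file as a plain python subprocess, so the dictConfig from airflow_local_settings.py is never applied inside that interpreter; for illustration, a check like this inside the script shows empty handler lists:

# dropped into test_logging_file.py only to inspect the child interpreter's logging setup
import logging

print(logging.getLogger().handlers)                # [] - root logger has no JSON handler here
print(logging.getLogger('airflow.task').handlers)  # [] - the Airflow config only exists in processes Airflow configures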

This is some of the code for the custom_logger:

import logging
import os
from pythonjsonlogger import jsonlogger

class custom_logger():

    def __init__(self, name):
        try:
            #set logger format
            self.formatter = jsonlogger.JsonFormatter('%(name)s %(asctime)s %(levelname)s %(message)s')
            self.logHandler = logging.StreamHandler()
            self.logHandler.setFormatter(self.formatter)
            self.logger = logging.getLogger(name)
            if not self.logger.handlers:
                self.logger.addHandler(self.logHandler)
            try:
                level = logging.getLevelName(os.environ["LOG_LEVEL"].upper())
                self.logger.setLevel(level)
            except (KeyError, ValueError):  # LOG_LEVEL unset or not a valid level name
                self.logger.setLevel(logging.DEBUG)
        except Exception as e:
            raise Exception(e.args[0])

    def info(self, message):
        self.logger.info(message)

In test_logging_file.py, we are trying to get the logs to use the format set in airflow_local_settings.py, but the script is not able to find that config. When we instead set the log format inside the Python file using the custom_logger package we created, the logs do get formatted, but the formatted line is then turned into a string and placed in the message field of the JSON log that the DAG's task handler produces. This is what that output looks like:

{"asctime": "2021-11-30 00:00:02,563", "name": "airflow.hooks.subprocess.SubprocessHook", "levelname": "INFO", "message": "{"asctime": "2021-11-30 00:00:02,562", "name": "bts", "levelname": "INFO", "message": "created the logger"}"}

Does anyone have any ideas on how to get these subprocess logs in the same format as the logs in the dag file? Thanks in advance.
