Airflow duplicates content of the logs while writting to GCS












1















I configured Airflow 1.9 to store dag logs in Google Cloud Storage following (exactly) this description. It is working, however parts of the content of all DAGs logs seems to be duplicated (see below). It appears as if the log were appended to itself with additional information about the upload. The log file on a local drive doesn't have those duplicates.



It seems that gcs_write is by default using an append mode - so the only hack I found is to change it to False. Is there a configuration for that? What is the reason for this anyway?



I have changed following variables in the cfg file:



task_log_reader=gcs.task
logging_config_class=log_config.LOGGING_CONFIG
remote_log_conn_id=gcs


log_config.py:



GCS_LOG_FOLDER = 'gs://XXXX/'

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
}
, 'gcs.task': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder': GCS_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['gcs.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['gcs.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}


Log:



*** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed

[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token









share|improve this question



























    1















    I configured Airflow 1.9 to store dag logs in Google Cloud Storage following (exactly) this description. It is working, however parts of the content of all DAGs logs seems to be duplicated (see below). It appears as if the log were appended to itself with additional information about the upload. The log file on a local drive doesn't have those duplicates.



    It seems that gcs_write is by default using an append mode - so the only hack I found is to change it to False. Is there a configuration for that? What is the reason for this anyway?



    I have changed following variables in the cfg file:



    task_log_reader=gcs.task
    logging_config_class=log_config.LOGGING_CONFIG
    remote_log_conn_id=gcs


    log_config.py:



    GCS_LOG_FOLDER = 'gs://XXXX/'

    LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
    LOG_FORMAT = conf.get('core', 'log_format')

    BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
    PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

    FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
    PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

    LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
    'airflow.task': {
    'format': LOG_FORMAT,
    },
    'airflow.processor': {
    'format': LOG_FORMAT,
    },
    },
    'handlers': {
    'console': {
    'class': 'logging.StreamHandler',
    'formatter': 'airflow.task',
    'stream': 'ext://sys.stdout'
    },
    'file.task': {
    'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    'filename_template': FILENAME_TEMPLATE,
    },
    'file.processor': {
    'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
    'formatter': 'airflow.processor',
    'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
    'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
    , 'gcs.task': {
    'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    'gcs_log_folder': GCS_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE,
    },
    },
    'loggers': {
    '': {
    'handlers': ['console'],
    'level': LOG_LEVEL
    },
    'airflow': {
    'handlers': ['console'],
    'level': LOG_LEVEL,
    'propagate': False,
    },
    'airflow.processor': {
    'handlers': ['file.processor'],
    'level': LOG_LEVEL,
    'propagate': True,
    },
    'airflow.task': {
    'handlers': ['gcs.task'],
    'level': LOG_LEVEL,
    'propagate': False,
    },
    'airflow.task_runner': {
    'handlers': ['gcs.task'],
    'level': LOG_LEVEL,
    'propagate': True,
    },
    }
    }


    Log:



    *** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
    [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
    [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
    [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
    [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 4
    --------------------------------------------------------------------------------

    [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
    [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
    [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
    [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
    [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
    [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
    [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
    [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
    [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
    [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
    [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
    [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
    [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
    File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    raise AirflowException("Bash command failed")
    airflow.exceptions.AirflowException: Bash command failed
    [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
    [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed

    [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
    [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
    [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
    [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
    --------------------------------------------------------------------------------
    Starting attempt 1 of 4
    --------------------------------------------------------------------------------

    [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
    [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
    [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
    [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
    [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
    [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
    [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
    [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
    [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
    [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
    [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
    [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
    [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
    File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    raise AirflowException("Bash command failed")
    airflow.exceptions.AirflowException: Bash command failed
    [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
    [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
    [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
    [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
    [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
    [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
    [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
    [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
    [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
    [2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
    [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
    [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
    [2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
    [2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
    [2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
    [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
    [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
    [2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token









    share|improve this question

























      1












      1








      1








      I configured Airflow 1.9 to store dag logs in Google Cloud Storage following (exactly) this description. It is working, however parts of the content of all DAGs logs seems to be duplicated (see below). It appears as if the log were appended to itself with additional information about the upload. The log file on a local drive doesn't have those duplicates.



      It seems that gcs_write is by default using an append mode - so the only hack I found is to change it to False. Is there a configuration for that? What is the reason for this anyway?



      I have changed following variables in the cfg file:



      task_log_reader=gcs.task
      logging_config_class=log_config.LOGGING_CONFIG
      remote_log_conn_id=gcs


      log_config.py:



      GCS_LOG_FOLDER = 'gs://XXXX/'

      LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
      LOG_FORMAT = conf.get('core', 'log_format')

      BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
      PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

      FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
      PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

      LOGGING_CONFIG = {
      'version': 1,
      'disable_existing_loggers': False,
      'formatters': {
      'airflow.task': {
      'format': LOG_FORMAT,
      },
      'airflow.processor': {
      'format': LOG_FORMAT,
      },
      },
      'handlers': {
      'console': {
      'class': 'logging.StreamHandler',
      'formatter': 'airflow.task',
      'stream': 'ext://sys.stdout'
      },
      'file.task': {
      'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
      'formatter': 'airflow.task',
      'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
      'filename_template': FILENAME_TEMPLATE,
      },
      'file.processor': {
      'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
      'formatter': 'airflow.processor',
      'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
      'filename_template': PROCESSOR_FILENAME_TEMPLATE,
      }
      , 'gcs.task': {
      'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
      'formatter': 'airflow.task',
      'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
      'gcs_log_folder': GCS_LOG_FOLDER,
      'filename_template': FILENAME_TEMPLATE,
      },
      },
      'loggers': {
      '': {
      'handlers': ['console'],
      'level': LOG_LEVEL
      },
      'airflow': {
      'handlers': ['console'],
      'level': LOG_LEVEL,
      'propagate': False,
      },
      'airflow.processor': {
      'handlers': ['file.processor'],
      'level': LOG_LEVEL,
      'propagate': True,
      },
      'airflow.task': {
      'handlers': ['gcs.task'],
      'level': LOG_LEVEL,
      'propagate': False,
      },
      'airflow.task_runner': {
      'handlers': ['gcs.task'],
      'level': LOG_LEVEL,
      'propagate': True,
      },
      }
      }


      Log:



      *** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
      [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
      --------------------------------------------------------------------------------
      Starting attempt 1 of 4
      --------------------------------------------------------------------------------

      [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
      [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
      [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
      [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
      [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
      [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
      [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
      Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      raise AirflowException("Bash command failed")
      airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
      [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed

      [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
      --------------------------------------------------------------------------------
      Starting attempt 1 of 4
      --------------------------------------------------------------------------------

      [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
      [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
      [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
      [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
      [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
      [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
      [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
      Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      raise AirflowException("Bash command failed")
      airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
      [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
      [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
      [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
      [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
      [2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
      [2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
      [2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
      [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
      [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
      [2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token









      share|improve this question














      I configured Airflow 1.9 to store dag logs in Google Cloud Storage following (exactly) this description. It is working, however parts of the content of all DAGs logs seems to be duplicated (see below). It appears as if the log were appended to itself with additional information about the upload. The log file on a local drive doesn't have those duplicates.



      It seems that gcs_write is by default using an append mode - so the only hack I found is to change it to False. Is there a configuration for that? What is the reason for this anyway?



      I have changed following variables in the cfg file:



      task_log_reader=gcs.task
      logging_config_class=log_config.LOGGING_CONFIG
      remote_log_conn_id=gcs


      log_config.py:



      GCS_LOG_FOLDER = 'gs://XXXX/'

      LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
      LOG_FORMAT = conf.get('core', 'log_format')

      BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
      PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

      FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
      PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

      LOGGING_CONFIG = {
      'version': 1,
      'disable_existing_loggers': False,
      'formatters': {
      'airflow.task': {
      'format': LOG_FORMAT,
      },
      'airflow.processor': {
      'format': LOG_FORMAT,
      },
      },
      'handlers': {
      'console': {
      'class': 'logging.StreamHandler',
      'formatter': 'airflow.task',
      'stream': 'ext://sys.stdout'
      },
      'file.task': {
      'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
      'formatter': 'airflow.task',
      'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
      'filename_template': FILENAME_TEMPLATE,
      },
      'file.processor': {
      'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
      'formatter': 'airflow.processor',
      'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
      'filename_template': PROCESSOR_FILENAME_TEMPLATE,
      }
      , 'gcs.task': {
      'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
      'formatter': 'airflow.task',
      'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
      'gcs_log_folder': GCS_LOG_FOLDER,
      'filename_template': FILENAME_TEMPLATE,
      },
      },
      'loggers': {
      '': {
      'handlers': ['console'],
      'level': LOG_LEVEL
      },
      'airflow': {
      'handlers': ['console'],
      'level': LOG_LEVEL,
      'propagate': False,
      },
      'airflow.processor': {
      'handlers': ['file.processor'],
      'level': LOG_LEVEL,
      'propagate': True,
      },
      'airflow.task': {
      'handlers': ['gcs.task'],
      'level': LOG_LEVEL,
      'propagate': False,
      },
      'airflow.task_runner': {
      'handlers': ['gcs.task'],
      'level': LOG_LEVEL,
      'propagate': True,
      },
      }
      }


      Log:



      *** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
      [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
      --------------------------------------------------------------------------------
      Starting attempt 1 of 4
      --------------------------------------------------------------------------------

      [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
      [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
      [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
      [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
      [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
      [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
      [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
      Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      raise AirflowException("Bash command failed")
      airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
      [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed

      [2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
      [2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
      --------------------------------------------------------------------------------
      Starting attempt 1 of 4
      --------------------------------------------------------------------------------

      [2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
      [2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
      [2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
      [2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
      [2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
      [2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
      [2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
      [2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
      [2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
      [2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
      Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      raise AirflowException("Bash command failed")
      airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
      [2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
      [2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
      [2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
      [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
      [2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
      [2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
      [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
      [2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
      [2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
      [2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
      [2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
      [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
      [2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
      [2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token






      google-cloud-storage airflow






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 20 '18 at 8:46









      MarcinMarcin

      83




      83
























          1 Answer
          1






          active

          oldest

          votes


















          0














          This is a known issue that affects both GCS and S3 remote logging, see AIRFLOW-1916. It is fixed in Airflow 1.10 so you can either upgrade or pull run a fork with the fix.






          share|improve this answer
























          • Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

            – Marcin
            Nov 21 '18 at 15:24













          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53389193%2fairflow-duplicates-content-of-the-logs-while-writting-to-gcs%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          This is a known issue that affects both GCS and S3 remote logging, see AIRFLOW-1916. It is fixed in Airflow 1.10 so you can either upgrade or pull run a fork with the fix.






          share|improve this answer
























          • Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

            – Marcin
            Nov 21 '18 at 15:24


















          0














          This is a known issue that affects both GCS and S3 remote logging, see AIRFLOW-1916. It is fixed in Airflow 1.10 so you can either upgrade or pull run a fork with the fix.






          share|improve this answer
























          • Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

            – Marcin
            Nov 21 '18 at 15:24
















          0












          0








          0







          This is a known issue that affects both GCS and S3 remote logging, see AIRFLOW-1916. It is fixed in Airflow 1.10 so you can either upgrade or pull run a fork with the fix.






          share|improve this answer













          This is a known issue that affects both GCS and S3 remote logging, see AIRFLOW-1916. It is fixed in Airflow 1.10 so you can either upgrade or pull run a fork with the fix.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 20 '18 at 21:23









          Daniel HuangDaniel Huang

          1,9821015




          1,9821015













          • Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

            – Marcin
            Nov 21 '18 at 15:24





















          • Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

            – Marcin
            Nov 21 '18 at 15:24



















          Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

          – Marcin
          Nov 21 '18 at 15:24







          Thanks Daniel! Some how I overlooked that. It seems it is yet another reason to move to 1.10.

          – Marcin
          Nov 21 '18 at 15:24






















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53389193%2fairflow-duplicates-content-of-the-logs-while-writting-to-gcs%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          這個網誌中的熱門文章

          Hercules Kyvelos

          Tangent Lines Diagram Along Smooth Curve

          Yusuf al-Mu'taman ibn Hud