Airflow duplicates log content when writing to GCS
I configured Airflow 1.9 to store DAG logs in Google Cloud Storage, following this description exactly. It works, but parts of the content of every DAG log seem to be duplicated (see below). It looks as if the log had been appended to itself, along with additional information about the upload. The log file on the local drive doesn't contain these duplicates.
It seems that gcs_write uses append mode by default, so the only workaround I have found is to change that to False. Is there a configuration option for this? And what is the reason for this behavior in the first place?
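GCS objects cannot be appended to in place, and the log below shows a GET of the log object followed by a POST upload to the same object name. That is consistent with an "append" that reads the existing object, concatenates the new text and re-uploads the whole thing. A minimal sketch of that read-modify-write, assuming a plain dict as the bucket and a newline join; `FakeBucket` and `write_log` are hypothetical names, not Airflow or Google Cloud APIs:

```python
class FakeBucket:
    """Hypothetical in-memory stand-in for a GCS bucket: object name -> text."""

    def __init__(self):
        self.objects = {}

    def read(self, name):
        # A missing object reads as empty, like before the first upload.
        return self.objects.get(name, "")

    def upload(self, name, text):
        # An upload always replaces the whole object.
        self.objects[name] = text


def write_log(bucket, name, log, append=True):
    """Upload `log` to `name`; with append=True, keep what is already there."""
    if append:
        old = bucket.read(name)
        # Assumed concatenation: old content, a newline, then the new content.
        log = old + "\n" + log if old else log
    bucket.upload(name, log)


if __name__ == "__main__":
    name = "mwt1/mwt1_task1/2018-10-02T15:30:00/1.log"
    for append in (True, False):
        bucket = FakeBucket()
        # Two writers upload the same full local file to the same object.
        write_log(bucket, name, "line 1\nline 2\n", append)
        write_log(bucket, name, "line 1\nline 2\n", append)
        print(append, bucket.read(name).count("line 1"))
```

With append=True the second upload re-adds everything the first one already stored; with append=False the last writer simply wins.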
I have changed the following variables in the cfg file:
task_log_reader=gcs.task
logging_config_class=log_config.LOGGING_CONFIG
remote_log_conn_id=gcs
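`logging_config_class=log_config.LOGGING_CONFIG` is a dotted path: a module named `log_config`, which has to be importable (on `sys.path`/`PYTHONPATH`), and an attribute `LOGGING_CONFIG` inside it. A minimal sketch of how such a path can be resolved with `importlib`, assuming a split on the last dot; it illustrates the idea rather than reproducing Airflow's own loader, and `resolve_dotted` is a hypothetical helper:

```python
import importlib


def resolve_dotted(path):
    """Split 'pkg.module.ATTR' at the last dot, import the module, return ATTR."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError("expected 'module.attribute', got %r" % path)
    module = importlib.import_module(module_path)  # ImportError if not on sys.path
    return getattr(module, attr)  # AttributeError if the name is missing


if __name__ == "__main__":
    import json
    # log_config.LOGGING_CONFIG resolves the same way, provided that
    # log_config.py sits in a directory on sys.path.
    print(resolve_dotted("json.dumps") is json.dumps)
```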
log_config.py:
import os

from airflow import configuration as conf

GCS_LOG_FOLDER = 'gs://XXXX/'
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        # Both the task logger and the task-runner logger write through gcs.task.
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
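In this config both `airflow.task` and `airflow.task_runner` route to `gcs.task`. One plausible reading of the log below, offered as an assumption rather than a confirmed diagnosis: the `airflow run ... --raw` subprocess and its supervising `airflow run` process each attach `gcs.task` to the same local file, and each uploads the whole file when it closes. With append enabled, the remote object then holds the first upload followed by the full file again, which would match the repeated block plus the trailing `Subtask:` upload lines. The stdlib-only simulation below models that; `UploadOnCloseHandler`, `BUCKET` and `simulate` are hypothetical and are not Airflow's `GCSTaskHandler`.

```python
import logging
import os
import tempfile

BUCKET = {}  # hypothetical stand-in for GCS: object name -> text


class UploadOnCloseHandler(logging.FileHandler):
    """Logs to a local file; on close, uploads the entire file to BUCKET."""

    def __init__(self, filename, remote_name, append=True):
        super().__init__(filename, mode="a")
        self.remote_name = remote_name
        self.append = append
        self._uploaded = False

    def close(self):
        super().close()  # flush and close the local file first
        if self._uploaded:  # logging.shutdown() may call close() again
            return
        self._uploaded = True
        with open(self.baseFilename) as f:
            text = f.read()
        old = BUCKET.get(self.remote_name, "")
        if self.append and old:
            text = old + "\n" + text  # read-modify-write "append"
        BUCKET[self.remote_name] = text


def simulate(append):
    """Two 'processes' share one local log file and both upload on exit."""
    BUCKET.clear()
    remote = "mwt1/mwt1_task1/2018-10-02T15:30:00/1.log"
    with tempfile.TemporaryDirectory() as tmp:
        local = os.path.join(tmp, "1.log")
        raw = UploadOnCloseHandler(local, remote, append)
        supervisor = UploadOnCloseHandler(local, remote, append)
        raw_log = logging.getLogger("sim.task")
        sup_log = logging.getLogger("sim.task_runner")
        for logger, handler in ((raw_log, raw), (sup_log, supervisor)):
            logger.handlers = [handler]
            logger.propagate = False
            logger.setLevel(logging.INFO)

        raw_log.info("Running on host")
        sup_log.info("Subtask: Using executor LocalExecutor")
        raw_log.error("Bash command failed")
        raw.close()  # the --raw subprocess exits and uploads first
        sup_log.info("Subtask: URL being requested: POST ...")
        supervisor.close()  # the supervisor exits and uploads the whole file again

        for logger in (raw_log, sup_log):
            logger.handlers = []
    return BUCKET[remote]


if __name__ == "__main__":
    print(simulate(append=True).count("Running on host"))   # 2
    print(simulate(append=False).count("Running on host"))  # 1
```

Under this assumption, forcing append=False (the workaround from the question) keeps only the supervisor's final upload, which is complete because every upload contains the whole local file; the trade-off is that anything already stored under that object name is replaced rather than kept.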
Log:
*** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token
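The log has a telling shape. Everything up to `Marking task as UP_FOR_RETRY` and the `ERROR - Bash command failed` line after it appears twice. The `Subtask:` tail (the psycopg2 warning, the traceback, and the GCS `GET`/`POST` requests) appears only once. The `GET` before the `POST` suggests the handler reads the existing object before writing it back, i.e. append mode.

The minimal sketch below reproduces that shape under two assumptions:

- The `airflow run --raw` child and its supervising `airflow run` parent each upload the whole local log file when they exit.
- Each upload appends to whatever is already stored.

The `remote` dict stands in for the bucket. `upload()` and `simulate()` are hypothetical helpers, not Airflow's `gcs_write`.

```python
# Toy model of append-mode remote log uploads (hypothetical helpers, not Airflow code).

KEY = "mwt1/mwt1_task1/2018-10-02T15:30:00/1.log"

# Block A: what the local file holds when the --raw child exits.
BLOCK_A = "Running on host fdfdf2f790e1\n...\nERROR - Bash command failed"
# Block B: captured child output that the parent writes afterwards.
BLOCK_B = "Subtask: Traceback (most recent call last):\n...\nSubtask: URL being requested: POST ..."


def upload(remote, key, local_log, append=True):
    """Write local_log to remote[key], appending to existing content if append is True."""
    old = remote.get(key)
    if append and old:
        # Read what is already stored and append the *entire* local file to it.
        remote[key] = "\n".join([old, local_log])
    else:
        # Create or overwrite the object.
        remote[key] = local_log


def simulate(append=True):
    remote = {}
    local = BLOCK_A
    upload(remote, KEY, local, append)   # the --raw child closes its handler first
    local = local + "\n" + BLOCK_B       # the parent keeps logging to the same file
    upload(remote, KEY, local, append)   # then the parent closes its handler
    return remote[KEY]


if __name__ == "__main__":
    print(simulate().count("Bash command failed"))              # 2: block A is duplicated
    print(simulate(append=False).count("Bash command failed"))  # 1: second upload overwrites
```

In this model, overwriting (the `append=False` hack from the question) removes the duplicate. It only works because the parent's upload is both the last one and the most complete one.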
google-cloud-storage airflow
asked Nov 20 '18 at 8:46
Marcin
83
1 Answer
This is a known issue that affects both GCS and S3 remote logging; see AIRFLOW-1916. It is fixed in Airflow 1.10, so you can either upgrade or run a fork with the fix.
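If you cannot upgrade right away, the shape of the fix matters more than the `append` flag. The 1.10 GCS and S3 task handlers appear to keep append mode but change two other things; verify this against the version you actually run:

- In `set_context()`, they set an `upload_on_close` flag to `not ti.raw`, so only the supervising process uploads.
- In `close()`, they return early if the handler is already closed.

The toy model below mirrors that idea. `ToyTaskHandler`, `run_demo()` and the in-memory `remote` dict are hypothetical stand-ins, not Airflow classes. A real 1.9 workaround would presumably be a `GCSTaskHandler` subclass named in the `'class'` key of the `gcs.task` handler in `log_config.py`; that part is not shown here.

```python
# Toy model of "only the non-raw process uploads" (hypothetical class, not Airflow's).


class ToyTaskHandler:
    def __init__(self, remote, key):
        self.remote = remote          # dict standing in for the GCS bucket
        self.key = key                # object name in the bucket
        self.upload_on_close = True
        self.closed = False

    def set_context(self, raw):
        # The `airflow run --raw` child skips the upload; its parent does it.
        self.upload_on_close = not raw

    def close(self, local_log):
        if self.closed:               # a second close() (e.g. at interpreter shutdown) is a no-op
            return
        self.closed = True
        if not self.upload_on_close:
            return
        old = self.remote.get(self.key)
        # Append mode is kept: with a single uploader there is nothing to duplicate.
        self.remote[self.key] = "\n".join([old, local_log]) if old else local_log


def run_demo():
    remote = {}
    key = "mwt1/mwt1_task1/2018-10-02T15:30:00/1.log"
    child = ToyTaskHandler(remote, key)
    child.set_context(raw=True)
    parent = ToyTaskHandler(remote, key)
    parent.set_context(raw=False)

    local = "Running on host ...\nERROR - Bash command failed"
    child.close(local)                # child exits first: nothing is uploaded
    local += "\nSubtask: URL being requested: POST ..."
    parent.close(local)               # parent uploads the complete file once
    parent.close(local)               # repeated close() does not upload again
    return remote[key]


if __name__ == "__main__":
    print(run_demo())
```

The stored log ends up with each block exactly once, even though appending is still enabled.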
answered Nov 20 '18 at 21:23
Daniel Huang
1,982
Thanks Daniel! Somehow I overlooked that. It seems it is yet another reason to move to 1.10.
– Marcin
Nov 21 '18 at 15:24