PythonOperator with python_callable set gets executed constantly
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['jimin.park1@aig.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'start_date': airflow.utils.dates.days_ago(0)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)

t1 = PythonOperator(
    task_id='Task1',
    provide_context=True,
    python_callable=some_task,
    dag=dag
)
The actual some_task simply appends a timestamp to a file. As the DAG definition above shows, the task is scheduled to run every minute.
import datetime

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')
I tail -f the output file. I started the webserver without the scheduler running, and the function was already being called: lines were appended to the file as soon as the webserver came up. When I start the scheduler, the file gets appended on every scheduler loop.
What I want is for the function to be executed every minute as intended, not on every scheduler loop.
airflow airflow-scheduler
asked Nov 5 at 17:44 by jiminssy
2 Answers
The scheduler parses each DAG file on every scheduler loop, which executes all top-level code in it, including the import statements. Is there any code running at module level in the file you are importing the function from?
answered Nov 5 at 18:24 by Viraj Parekh (accepted)
Amazing, that was the problem. There was a call to the method somewhere in the imported .py file; I must have left it there for testing. Thank you for pointing this out. I didn't know the import was being executed on every scheduler loop. – jiminssy, Nov 6 at 10:01
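To illustrate what the comment describes, here is a hypothetical reconstruction of workflow/task.py (the stray top-level call is an assumption based on the comment above, not code from the question):

import datetime

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')

# A stray top-level call like this runs every time the scheduler or
# webserver imports the module, i.e. on every scheduler loop.
# Delete it, or guard it so it only runs when the file is executed directly:
if __name__ == "__main__":
    some_task(None)

With the guard in place, the call only fires when you run the module yourself (e.g. python workflow/task.py), not when the scheduler parses the DAG file.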
Try checking the scheduler_heartbeat_sec config parameter in your config file. For your case it should be smaller than 60 seconds.
If you want the scheduler not to catch up on previous runs, set catchup_by_default to False (I am not sure whether this is relevant to your question, though).
Please indicate which Apache Airflow version you are using.
answered Nov 6 at 9:09 by sdikby
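For reference, a minimal sketch of the airflow.cfg entries this answer refers to. The section placement and values are illustrative assumptions; check the config reference for your Airflow version:

[scheduler]
# Seconds between scheduler heartbeats; keep this well below the DAG's
# 60-second schedule_interval so minute-level runs are picked up promptly.
scheduler_heartbeat_sec = 5

# If False, newly enabled DAGs do not backfill runs between start_date
# and now unless catchup is set explicitly on the DAG object.
catchup_by_default = False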