PythonOperator with python_callable set gets executed constantly























import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['jimin.park1@aig.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'start_date': airflow.utils.dates.days_ago(0)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)

t1 = PythonOperator(
    task_id='Task1',
    provide_context=True,
    python_callable=some_task,
    dag=dag
)


The actual some_task itself simply appends a timestamp to a file. As the DAG definition above shows, the task is scheduled to run every minute.



import datetime

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')


I ran tail -f on the output file and started the webserver without the scheduler running. As soon as the webserver started, the function was called and lines were appended to the file. When I then started the scheduler, the file was appended to on every execution loop.



What I want is for the function to be executed every minute as scheduled, not on every execution loop.




































airflow airflow-scheduler






asked Nov 5 at 17:44 by jiminssy

2 Answers

accepted










The scheduler re-parses each DAG file on every scheduler loop, executing the file top to bottom, including all import statements.

Is there any code in the module you import some_task from that runs at import time?
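A minimal, self-contained sketch of this failure mode. The module source below is a hypothetical stand-in for workflow/task.py, not the asker's actual file: importing a module executes its entire body, so a stray top-level call fires on every scheduler parse, while guarding it with `if __name__ == "__main__":` prevents that.

```python
import importlib.util
import os
import tempfile

# Hypothetical contents of the imported task module.
module_src = """
calls = []

def some_task(ds=None, **kwargs):
    calls.append("ran")

# BUG: this top-level call executes every time the module is imported,
# i.e. every time the scheduler parses the DAG file.
some_task()

# Fix: guard any ad-hoc test invocation so it only runs when the file
# is executed directly, never on import:
if __name__ == "__main__":
    some_task()
"""

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "task.py")
    with open(path, "w") as f:
        f.write(module_src)
    spec = importlib.util.spec_from_file_location("task", path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)  # importing runs the whole module body

print(mod.calls)  # the unguarded call already ran once: ['ran']
```

Importing the module, which is effectively what the scheduler does on each loop, triggers only the unguarded call; running the file directly would trigger the guarded one as well.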




















































answered Nov 5 at 18:24 by Viraj Parekh



Amazing. That was the problem. There was a method call somewhere in the imported py file... I must have left it there for testing purposes. Thank you for pointing this out. I didn't know imports were executed on every scheduler loop.
– jiminssy
Nov 6 at 10:01




Check the scheduler_heartbeat_sec config parameter in your config file. For your case it should be smaller than 60 seconds.

If you want the scheduler not to catch up on previous runs, set catchup_by_default to False (I am not sure whether this is relevant to your question, though).

Please also indicate which Apache Airflow version you are using.
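For reference, both settings live in the [scheduler] section of airflow.cfg; the values below are illustrative, not recommendations:

```ini
[scheduler]
# Heartbeat interval in seconds; keep it well under the 60-second DAG interval.
scheduler_heartbeat_sec = 5
# Do not backfill runs between start_date and now unless a DAG opts in.
catchup_by_default = False
```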







answered Nov 6 at 9:09 by sdikby
