What is the Apache Beam way to handle 'routing'











up vote
0
down vote

favorite












Using Apache Beam I am doing computations - and if they succeed I'd like to write the output to one sink, and if there is a failure I'd like to write that to another sink.



Is there any way to handle metadata or content based routing in Apache Beam?



I've used Apache Camel extensively, and so in my mind based on the outcome of a previous transform, I should route a message to a different sink using a router (perhaps determined by a metadata flag I set on the message header). Is there an analogous capability with Apache Beam, or would I instead just have a sequential transform that inspects the PCollection and handles writing to sinks within the transform?



Ideally I'd like this logic (written verbosely for attempted clarity)



result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
result | ([success_failure_router]
| 'sucess_sink' >> beam.io.WriteToText('/path/to/file')
| 'failure_sink' >> beam.io.WriteStringsToPubSub('mytopic'))


However.. I suspect the 'Beam' way of handling this is



result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
result | 'write_results_appropriately' >> write_results_appropriately(result))
...
def write_results_appropriately(result):
if result == ..:
# success, write to file
else:
# failure, write to topic


Thanks,
Kevin










share|improve this question


























    up vote
    0
    down vote

    favorite












    Using Apache Beam I am doing computations - and if they succeed I'd like to write the output to one sink, and if there is a failure I'd like to write that to another sink.



    Is there any way to handle metadata or content based routing in Apache Beam?



    I've used Apache Camel extensively, and so in my mind based on the outcome of a previous transform, I should route a message to a different sink using a router (perhaps determined by a metadata flag I set on the message header). Is there an analogous capability with Apache Beam, or would I instead just have a sequential transform that inspects the PCollection and handles writing to sinks within the transform?



    Ideally I'd like this logic (written verbosely for attempted clarity)



    result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
    result | ([success_failure_router]
    | 'sucess_sink' >> beam.io.WriteToText('/path/to/file')
    | 'failure_sink' >> beam.io.WriteStringsToPubSub('mytopic'))


    However.. I suspect the 'Beam' way of handling this is



    result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
    result | 'write_results_appropriately' >> write_results_appropriately(result))
    ...
    def write_results_appropriately(result):
    if result == ..:
    # success, write to file
    else:
    # failure, write to topic


    Thanks,
    Kevin










    share|improve this question
























      up vote
      0
      down vote

      favorite









      up vote
      0
      down vote

      favorite











      Using Apache Beam I am doing computations - and if they succeed I'd like to write the output to one sink, and if there is a failure I'd like to write that to another sink.



      Is there any way to handle metadata or content based routing in Apache Beam?



      I've used Apache Camel extensively, and so in my mind based on the outcome of a previous transform, I should route a message to a different sink using a router (perhaps determined by a metadata flag I set on the message header). Is there an analogous capability with Apache Beam, or would I instead just have a sequential transform that inspects the PCollection and handles writing to sinks within the transform?



      Ideally I'd like this logic (written verbosely for attempted clarity)



      result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
      result | ([success_failure_router]
      | 'sucess_sink' >> beam.io.WriteToText('/path/to/file')
      | 'failure_sink' >> beam.io.WriteStringsToPubSub('mytopic'))


      However.. I suspect the 'Beam' way of handling this is



      result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
      result | 'write_results_appropriately' >> write_results_appropriately(result))
      ...
      def write_results_appropriately(result):
      if result == ..:
      # success, write to file
      else:
      # failure, write to topic


      Thanks,
      Kevin










      share|improve this question













      Using Apache Beam I am doing computations - and if they succeed I'd like to write the output to one sink, and if there is a failure I'd like to write that to another sink.



      Is there any way to handle metadata or content based routing in Apache Beam?



      I've used Apache Camel extensively, and so in my mind based on the outcome of a previous transform, I should route a message to a different sink using a router (perhaps determined by a metadata flag I set on the message header). Is there an analogous capability with Apache Beam, or would I instead just have a sequential transform that inspects the PCollection and handles writing to sinks within the transform?



      Ideally I'd like this logic (written verbosely for attempted clarity)



      result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
      result | ([success_failure_router]
      | 'sucess_sink' >> beam.io.WriteToText('/path/to/file')
      | 'failure_sink' >> beam.io.WriteStringsToPubSub('mytopic'))


      However.. I suspect the 'Beam' way of handling this is



      result = my_pcollections | 'compute_stuff' >> beam.Map(lambda (pcollection): my_compute_func(pcollection))
      result | 'write_results_appropriately' >> write_results_appropriately(result))
      ...
      def write_results_appropriately(result):
      if result == ..:
      # success, write to file
      else:
      # failure, write to topic


      Thanks,
      Kevin







      google-cloud-dataflow apache-beam






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 8 at 18:39









      Kevin

      6952710




      6952710
























          1 Answer
          1






          active

          oldest

          votes

















          up vote
          0
          down vote













          High-level:



          I am not sure of specifics of the Python API in this case, but from high level it looks like this:




          • par-dos support multiple outputs;

          • outputs are identified by the tag you give them (e.g. "correct-elements", "invalid-elements");

          • in your main par-do you write to multiple outputs choosing the output using your criteria;

          • each output is represented by a separate PCollection;

          • then you get the separate PCollections representing the tagged outputs from your par-do;

          • then apply different sinks to each of the tagged PCollections;


          In detail see the section
          https://beam.apache.org/documentation/programming-guide/#additional-outputs






          share|improve this answer





















            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53214163%2fwhat-is-the-apache-beam-way-to-handle-routing%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes








            up vote
            0
            down vote













            High-level:



            I am not sure of specifics of the Python API in this case, but from high level it looks like this:




            • par-dos support multiple outputs;

            • outputs are identified by the tag you give them (e.g. "correct-elements", "invalid-elements");

            • in your main par-do you write to multiple outputs choosing the output using your criteria;

            • each output is represented by a separate PCollection;

            • then you get the separate PCollections representing the tagged outputs from your par-do;

            • then apply different sinks to each of the tagged PCollections;


            In detail see the section
            https://beam.apache.org/documentation/programming-guide/#additional-outputs






            share|improve this answer

























              up vote
              0
              down vote













              High-level:



              I am not sure of specifics of the Python API in this case, but from high level it looks like this:




              • par-dos support multiple outputs;

              • outputs are identified by the tag you give them (e.g. "correct-elements", "invalid-elements");

              • in your main par-do you write to multiple outputs choosing the output using your criteria;

              • each output is represented by a separate PCollection;

              • then you get the separate PCollections representing the tagged outputs from your par-do;

              • then apply different sinks to each of the tagged PCollections;


              In detail see the section
              https://beam.apache.org/documentation/programming-guide/#additional-outputs






              share|improve this answer























                up vote
                0
                down vote










                up vote
                0
                down vote









                High-level:



                I am not sure of specifics of the Python API in this case, but from high level it looks like this:




                • par-dos support multiple outputs;

                • outputs are identified by the tag you give them (e.g. "correct-elements", "invalid-elements");

                • in your main par-do you write to multiple outputs choosing the output using your criteria;

                • each output is represented by a separate PCollection;

                • then you get the separate PCollections representing the tagged outputs from your par-do;

                • then apply different sinks to each of the tagged PCollections;


                In detail see the section
                https://beam.apache.org/documentation/programming-guide/#additional-outputs






                share|improve this answer












                High-level:



                I am not sure of specifics of the Python API in this case, but from high level it looks like this:




                • par-dos support multiple outputs;

                • outputs are identified by the tag you give them (e.g. "correct-elements", "invalid-elements");

                • in your main par-do you write to multiple outputs choosing the output using your criteria;

                • each output is represented by a separate PCollection;

                • then you get the separate PCollections representing the tagged outputs from your par-do;

                • then apply different sinks to each of the tagged PCollections;


                In detail see the section
                https://beam.apache.org/documentation/programming-guide/#additional-outputs







                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 8 at 19:45









                Anton

                789115




                789115






























                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.





                    Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


                    Please pay close attention to the following guidance:


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53214163%2fwhat-is-the-apache-beam-way-to-handle-routing%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    這個網誌中的熱門文章

                    Xamarin.form Move up view when keyboard appear

                    Post-Redirect-Get with Spring WebFlux and Thymeleaf

                    Anylogic : not able to use stopDelay()