java.lang.IllegalArgumentException: calling sideInput() with unknown view



























I am trying to move data from one BigQuery table to another. While transforming the data, I use a side input to filter the records. The side input is a KV collection that is loaded from another table.



When I run the pipeline, I get the error "java.lang.IllegalArgumentException: calling sideInput() with unknown view".



Here is the entire code that I tried:



{
    PipelineOptionsFactory.register(OptionPipeline.class);

    OptionPipeline options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OptionPipeline.class);

    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> sideInputData = p.apply("ReadSideInput", BigQueryIO.readTableRows().from(options.getOrgRegionMapping()));
    PCollection<KV<String, String>> sideInputMap = sideInputData.apply(ParDo.of(new getSideInputDataFn()));
    final PCollectionView<Map<String, String>> sideInputView = sideInputMap.apply(View.<String, String>asMap());

    PCollection<TableRow> orgMaster = p.apply("ReadOrganization", BigQueryIO.readTableRows().from(options.getOrgCodeMaster()));
    PCollection<TableRow> orgCode = orgMaster.apply(ParDo.of(new gnGetOrgMaster()));

    @SuppressWarnings("serial")
    PCollection<TableRow> finalResultCollection = orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
    {
        @ProcessElement
        public void processElement(ProcessContext c) {

            TableRow outputRow = new TableRow();

            TableRow orgCodeRow = c.element();
            String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

            String region = c.sideInput(sideInputView).get(orgCodefromMaster);

            outputRow.set("orgCode", orgCodefromMaster);
            outputRow.set("orgName", orgCodeRow.get("orgName"));
            outputRow.set("orgName", region);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
            Date dateobj = new Date();
            outputRow.set("updatedDate", df.format(dateobj));

            c.output(outputRow);
        }
    }));

    finalResultCollection.apply(BigQueryIO.writeTableRows()
        .withSchema(schema)
        .to(options.getOrgCodeTable())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
}

@SuppressWarnings("serial")
static class getSideInputDataFn extends DoFn<TableRow, KV<String, String>>
{
    @ProcessElement
    public void processElement(ProcessContext c)
    {
        TableRow row = c.element();
        c.output(KV.of((String) row.get("orgcode"), (String) row.get("region")));
    }
}









      google-cloud-dataflow apache-beam
















      asked Nov 14 '18 at 14:39









      lourdu rajan
























          1 Answer














          It looks like the runner is complaining because you never told it about the side input when defining the graph. To fix this, call .withSideInputs(...) on the result of ParDo.of(...), passing in the PCollectionView<T> you defined earlier:



          @SuppressWarnings("serial")
          PCollection<TableRow> finalResultCollection = orgCode.apply("Process", ParDo.of(new DoFn<TableRow, TableRow>()
          {
              @ProcessElement
              public void processElement(ProcessContext c) {

                  TableRow outputRow = new TableRow();

                  TableRow orgCodeRow = c.element();
                  String orgCodefromMaster = (String) orgCodeRow.get("orgCode");

                  // Reading the view here only works because it is declared via withSideInputs below.
                  String region = c.sideInput(sideInputView).get(orgCodefromMaster);

                  outputRow.set("orgCode", orgCodefromMaster);
                  outputRow.set("orgName", orgCodeRow.get("orgName"));
                  // Note: this reuses the "orgName" key, so it overwrites the value set on the previous line.
                  outputRow.set("orgName", region);
                  DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
                  Date dateobj = new Date();
                  outputRow.set("updatedDate", df.format(dateobj));

                  c.output(outputRow);
              }
          }).withSideInputs(sideInputView)); // declare the side input on the ParDo


          I didn't test this code but that's what stands out when I look at it.
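
To make the reason for the error more concrete, here is a small toy model in plain Java of the kind of check involved. This is not Beam code: ToyView, ToyParDo and SideInputToy are hypothetical names invented for this sketch, and the real runner's internals will differ. It only illustrates the idea that a transform keeps track of the views declared on it and rejects any view it was never told about.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: these classes are hypothetical and are NOT part of Beam's API.
final class ToyView {
    final String name;
    final Map<String, String> contents;

    ToyView(String name, Map<String, String> contents) {
        this.name = name;
        this.contents = contents;
    }
}

final class ToyParDo {
    // Views that were declared when the transform was built.
    private final Set<ToyView> declaredViews = new HashSet<>();

    // Plays the role of withSideInputs: register the view up front.
    ToyParDo withSideInputs(ToyView view) {
        declaredViews.add(view);
        return this;
    }

    // Plays the role of c.sideInput(view): only declared views may be read.
    Map<String, String> sideInput(ToyView view) {
        if (!declaredViews.contains(view)) {
            throw new IllegalArgumentException("calling sideInput() with unknown view");
        }
        return view.contents;
    }
}

public class SideInputToy {
    public static void main(String[] args) {
        Map<String, String> regionsByOrg = new HashMap<>();
        regionsByOrg.put("ORG1", "EMEA"); // made-up sample data
        ToyView regions = new ToyView("regions", regionsByOrg);

        // Without declaring the view, the lookup is rejected.
        try {
            new ToyParDo().sideInput(regions);
        } catch (IllegalArgumentException e) {
            System.out.println("undeclared view: " + e.getMessage());
        }

        // After declaring it, the same lookup succeeds.
        ToyParDo declared = new ToyParDo().withSideInputs(regions);
        System.out.println("declared view: " + declared.sideInput(regions).get("ORG1"));
    }
}
```

In the real pipeline the equivalent of the second case is the .withSideInputs(sideInputView) call chained onto ParDo.of(...).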
















          answered Dec 6 '18 at 6:58









          Tony M













          • After adding .withSideInputs(sideInputView), I got the result.

            – lourdu rajan
            Dec 12 '18 at 10:37




















