Detected cartesian product for INNER join on literal column in PySpark





The following code raises a "Detected cartesian product for INNER join" exception:



# Assumes an existing SparkSession named `spark`.
from pyspark.sql import functions as F

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}])
second_df = spark.createDataFrame([{"some_value": "????"}])

# second_id is a literal, so every row carries the same constant value.
second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, the JOIN works fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()


The error message shows the following logical plans:



Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;


It looks like, for some reason, no column is left in the JOIN condition for those logical plans by the time RuleExecutor applies the optimizer rule called CheckCartesianProducts (see https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).



But if I call the "persist" method before the JOIN, it works, and the physical plan is:



*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
         +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
            +- *(1) Project [some_value#2, 1 AS second_id#4]
               +- Scan ExistingRDD[some_value#2]


Can someone explain the internals that lead to these results? Persisting the DataFrame does not look like a real solution.










      apache-spark pyspark apache-spark-sql






edited Nov 23 '18 at 15:24 by user6910411
asked Nov 23 '18 at 15:12 by Andrew Talalay
























          1 Answer














The problem is that once you persist your data, second_id is incorporated into the cached table and is no longer considered constant. As a result, the planner can no longer infer that the query should be expressed as a Cartesian product, and it uses a standard SortMergeJoin on hash-partitioned second_id.
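To make the difference concrete, here is a toy model in plain Python. It is not Catalyst code, and the names (Col, Lit, Equals, plan_join) are made up for illustration. It assumes the optimizer does two things, which is consistent with the logical plans in the question: it substitutes a column that is an alias of a literal with the literal itself, and it pushes a predicate that touches only the left side down as a filter. In Spark these steps appear to correspond to rules such as FoldablePropagation and PushPredicateThroughJoin, but treat that mapping as my assumption.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Lit:
    value: str


@dataclass(frozen=True)
class Equals:
    left: Union[Col, Lit]
    right: Union[Col, Lit]


def propagate_literals(pred, aliases):
    """Swap any column known to be an alias of a literal for that literal."""
    def swap(op):
        return aliases.get(op.name, op) if isinstance(op, Col) else op
    return Equals(swap(pred.left), swap(pred.right))


def plan_join(pred, left_cols, aliases):
    """Return (left_filter, join_condition) after the two toy rewrites."""
    pred = propagate_literals(pred, aliases)
    refs = {op.name for op in (pred.left, pred.right) if isinstance(op, Col)}
    if refs and refs <= left_cols:
        # The predicate only touches the left side: it becomes a filter,
        # and the join itself is left with no condition at all.
        return pred, None
    return None, pred


condition = Equals(Col("first_id"), Col("second_id"))

# Without persist: the optimizer can see that second_id is lit("1").
left_filter, join_cond = plan_join(condition, {"first_id"}, {"second_id": Lit("1")})

# With persist: the cached relation hides the literal, so nothing is substituted.
cached_filter, cached_cond = plan_join(condition, {"first_id"}, {})

print(left_filter, join_cond)
print(cached_filter, cached_cond)
```

In the first case the join ends up with an empty condition, which is what CheckCartesianProducts complains about. In the second case the equality survives as a join key, which matches the SortMergeJoin plan from the question.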



It would be trivial to achieve the same outcome without persistence, using a udf:





from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

# An identity UDF hides the literal from the optimizer.
@pandas_udf('integer', PandasUDFType.SCALAR)
def identity(x):
    return x

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')

result_df.explain()




== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]


However, SortMergeJoin is not what you should try to achieve here. With a constant key, it would result in extreme data skew and would likely fail on anything but toy data.
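The skew is easy to see with a rough simulation. The sketch below is plain Python, not Spark: it assumes a partitioner that sends each row to hash(key) mod N, and it uses Python's built-in hash where Spark uses its own hash function. The helper name partition_counts and the row counts are made up for illustration; 200 matches the partition count shown in the plan above.

```python
from collections import Counter


def partition_counts(keys, num_partitions):
    """Count rows per partition under a hash(key) mod N partitioner."""
    return Counter(hash(key) % num_partitions for key in keys)


# 1000 rows that all share one join key, as with a literal column.
constant = partition_counts([1] * 1000, 200)

# 1000 rows with distinct keys, for comparison.
varied = partition_counts(range(1000), 200)

# (number of non-empty partitions, size of the largest partition)
print(len(constant), max(constant.values()))
print(len(varied), max(varied.values()))
```

With a constant key, every row lands in a single partition, so one task has to sort and join the whole dataset while the others sit idle.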



A Cartesian product, however, as expensive as it is, won't suffer from this issue and should be preferred here. So I would recommend enabling cross joins (spark.sql.crossJoin.enabled in Spark 2.x) or using explicit cross join syntax, and moving on.
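For the explicit syntax, a minimal sketch could look like the following. The helper name join_on_constant_key is hypothetical; crossJoin and where are regular DataFrame methods, and the column names are the ones from the question. I have not checked this against every Spark version, so treat it as a starting point rather than a verified recipe.

```python
def join_on_constant_key(first_df, second_df):
    """Cross join explicitly, then filter with the original join condition.

    Both arguments are expected to be pyspark.sql.DataFrame objects shaped
    like first_df and second_df in the question.
    """
    return first_df.crossJoin(second_df).where(
        first_df["first_id"] == second_df["second_id"]
    )


# Alternative for Spark 2.x: keep the original inner join and allow
# implicit Cartesian products for the whole session instead:
# spark.conf.set("spark.sql.crossJoin.enabled", "true")
```

Usage would be result_df = join_on_constant_key(first_df, second_df). The explicit form keeps the intent visible at the call site, while the session setting also silences the check for joins where a Cartesian product is accidental.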



An open question remains: how to prevent the undesired behavior when the data is cached. Unfortunately, I don't have an answer ready for that. I'm fairly sure it is possible with custom optimizer rules, but this is not something that can be done with Python alone.






answered Nov 23 '18 at 15:44 by user6910411































