How to set the right data type in Parquet with Spark from a CSV with PySpark












I have a CSV file that looks something like this:



39813458,13451345,14513,SomeText,344564,Some other text,328984,"[{""field_int_one"":""16784832510"",""second_int_field"":""84017"",""third_int_field"":""245"",""some_timestamp_one"":""2018-04-17T23:54:34.000Z"",""some_other_timestamp"":""2018-03-03T15:34:04.000Z"",""one_more_int_field"":0,},{""field_int_one"":""18447548326"",""second_int_field"":""04965"",""third_int_field"":""679"",""some_timestamp_one"":""2018-02-06T03:39:12.000Z"",""some_other_timestamp"":""2018-03-01T09:19:12.000Z"",""one_more_int_field"":0}]"


I'm converting it to Parquet with:



from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

if __name__ == "__main__":
    sqlContext = SQLContext(sc)  # kept from the original; unused below

    # Explicit schema for the eight CSV columns; the JSON blob in the
    # last column is read as a plain string.
    schema = StructType([
        StructField("first_int", IntegerType(), True),
        StructField("second_int", IntegerType(), True),
        StructField("third_int", IntegerType(), True),
        StructField("first_string_field", StringType(), True),
        StructField("fourth_int", IntegerType(), True),
        StructField("second_string_field", StringType(), True),
        StructField("last_int_field", StringType(), True),
        StructField("json_field", StringType(), True)])

    df = spark.read.schema(schema).csv("source_file.csv")
    df.write.parquet('parquet_output')


It works and converts the file, but if you run .printSchema when you later query the output, it (unsurprisingly) reports the last field as a string. How can I declare that last field as JSON, correctly?
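For reference, the mismatch is easy to see by reading the output back; a minimal check (illustrative output, assuming the job above already ran):

# Inspect the schema Spark recorded in the Parquet output.
df = spark.read.parquet('parquet_output')
df.printSchema()
# root
#  |-- first_int: integer (nullable = true)
#  ...
#  |-- json_field: string (nullable = true)   <- the JSON blob is still a plain string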










      python csv apache-spark pyspark parquet






      asked Nov 14 '18 at 17:49









Gus
























1 Answer




















I think a nested ArrayType would work for this type of schema:



schema = StructType([
    StructField("first_int", IntegerType(), True),
    StructField("second_int", IntegerType(), True),
    StructField("third_int", IntegerType(), True),
    StructField("first_string_field", StringType(), True),
    StructField("fourth_int", IntegerType(), True),
    StructField("second_string_field", StringType(), True),
    StructField("last_int_field", StringType(), True),
    StructField("json_field", ArrayType(
        StructType()
            .add("field_int_one", IntegerType())
            .add("field_string_one", StringType())
            # ...add the remaining fields of each JSON object here
        ),
        True)])





answered Nov 14 '18 at 18:28 by Tim
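For what it's worth, a filled-in version of that placeholder, using the field names visible in the question's sample row, might look like the sketch below. The types are guesses: the numeric values are quoted strings in the sample (and 16784832510 overflows a 32-bit IntegerType), so strings are the safer leaf type, with casts applied later if needed.

from pyspark.sql.types import ArrayType, StructType, StringType, LongType, TimestampType

# One element of the JSON array; quoted numerics are kept as strings.
json_element = (StructType()
    .add("field_int_one", StringType())           # "16784832510" would overflow IntegerType
    .add("second_int_field", StringType())        # "04965" has a leading zero worth keeping
    .add("third_int_field", StringType())
    .add("some_timestamp_one", TimestampType())
    .add("some_other_timestamp", TimestampType())
    .add("one_more_int_field", LongType()))       # unquoted 0 in the sample

json_field_type = ArrayType(json_element)

As the comments below show, though, the CSV reader itself rejects nested types, which is why the conversion has to happen after the read.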
























• I'm getting this error from that: pyspark.sql.utils.AnalysisException: u'CSV data source does not support array<struct<field_int_one:int, second_int_field:int, third_int_field:int, some_timestamp_one:timestamp, some_other_timestamp:timestamp, one_more_int_field:int>> data type.;'

            – Gus
            Nov 14 '18 at 19:32













• That ^ is raised from: File "/Library/Python/2.7/site-packages/pyspark/sql/utils.py", line 69, in deco: raise AnalysisException(s.split(': ', 1)[1], stackTrace)

            – Gus
            Nov 14 '18 at 21:12













• Apply the schema to the write part, not the read part

            – sramalingam24
            Nov 15 '18 at 0:30











• How would I do that?

            – Gus
            Nov 15 '18 at 16:04











• I tried that, but it doesn't seem to keep the fields' names and properties once saved

            – Gus
            Nov 15 '18 at 17:20
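One way to read that suggestion ("apply the schema to the write part") is: keep json_field as a plain string on read, parse it with pyspark.sql.functions.from_json, and only then write the Parquet. A minimal sketch, assuming Spark 2.3+ (where from_json accepts a DDL type string); note that malformed elements, such as the trailing comma inside the sample row's first object, would come back as null:

from pyspark.sql.functions import from_json, col

# DDL form of the array-of-struct type (quoted numerics kept as strings);
# ISO-8601 timestamps like the sample's should parse with the defaults.
json_type = ("array<struct<field_int_one:string, second_int_field:string, "
             "third_int_field:string, some_timestamp_one:timestamp, "
             "some_other_timestamp:timestamp, one_more_int_field:bigint>>")

# Read the CSV with the question's schema (json_field as StringType),
# convert the column, then write, so Parquet stores a real nested type.
df = spark.read.schema(schema).csv("source_file.csv")
parsed = df.withColumn("json_field", from_json(col("json_field"), json_type))
parsed.printSchema()    # json_field: array<struct<...>> rather than string
parsed.write.parquet('parquet_output')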










