Databricks - Structured Streaming: Console Format Displaying Nothing











I am learning Structured Streaming with Databricks, and I'm struggling with the DataStreamWriter console format.



My program:




  • Simulates the streaming arrival of files to the folder "monitoring_dir" (one new file is transferred from "source_dir" every 10 seconds).

  • Uses a DataStreamReader to populate the unbounded DataFrame "inputUDF" with the content of each new file.

  • Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.


The program works when I choose the File sink (the batches are appended to text-format files in "result_dir"), but nothing is displayed when I choose the Console sink.



Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.
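
For reference, a minimal sketch of the kind of local console-sink query I mean is shown below (just an illustration with a placeholder path, not my exact local script):

# Minimal local sketch (illustrative only; the monitored path is a placeholder):
# a console-sink streaming query over a text folder, run with plain Spark
# outside Databricks.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("console_sink_sketch").getOrCreate()

# Unbounded DataFrame over new text files appearing in the monitored folder
lines = spark.readStream.format("text").load("/tmp/my_monitoring/")

# Console sink: each micro-batch is printed to the driver's standard output
query = lines.writeStream.format("console") \
             .trigger(processingTime="10 seconds") \
             .outputMode("append") \
             .start()

query.awaitTermination(60)   # let it run for a while...
query.stop()                 # ...then stop the streaming query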



My question is:





  • How can I make this program output to the Console sink and display the results when using Databricks?


Thank you very much in advance!



Best regards,
Nacho





My Program: myTest.py



import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    #    (dbutils is available implicitly in Databricks notebooks)
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name=' to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetical order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter for the number of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    #    (i.e., the files are copied one by one, one per time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired transfer interval until the next time slot.
        time.sleep((start + (count * time_step_interval)) - time.time())

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    # 3. Operation C1: We create an unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text") \
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...

    # 4.1. ...either to save to result_dir in append mode...
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text") \
                                    .option("path", result_dir) \
                                    .option("checkpointLocation", checkpoint_dir) \
                                    .trigger(processingTime="10 seconds") \
                                    .outputMode("append")
    # 4.2. ...or to display on the console in append mode
    else:
        myDSW = inputUDF.writeStream.format("console") \
                                    .trigger(processingTime="10 seconds") \
                                    .outputMode("append")

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()




My Dataset: f1.txt



First sentence.



Second sentence.





My Dataset: f2.txt



Third sentence.



Fourth sentence.





My Dataset: f3.txt



Fifth sentence.



Sixth sentence.
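
For reference, when the Console sink does work (e.g., on my local machine), each micro-batch is printed to standard output roughly like this (shown here for a batch containing f1.txt; the exact batch number, row order and column widths may differ):

-------------------------------------------
Batch: 0
-------------------------------------------
+----------------+
|           value|
+----------------+
| First sentence.|
|Second sentence.|
+----------------+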










Tags: python spark-streaming databricks






Asked Nov 9 at 19:15 by Nacho Castiñeiras