Databricks - Structured Streaming: Console Format Displaying Nothing
I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.
My program:
- Simulates the streaming arrival of files to the folder "monitoring_dir" (one new file is transferred from "source_dir" every 10 seconds).
- Uses a DataStreamReader to populate the Unbounded DataFrame "inputUDF" with the content of each new file.
- Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.
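The fixed cadence in the first bullet comes down to sleeping until the next time slot rather than sleeping a fixed amount, so that the time spent copying a file does not accumulate as drift. A minimal sketch of that computation follows; the names `seconds_until_slot` and `run_on_schedule` are hypothetical, and the clamp at zero is an addition here, since `time.sleep` raises `ValueError` for a negative argument if a step overruns its slot.

```python
import time


def seconds_until_slot(start, slot_index, interval, now):
    """Return how long to wait until slot `slot_index`, never negative."""
    target = start + slot_index * interval
    return max(0.0, target - now)


def run_on_schedule(actions, interval, clock=time.time, sleep=time.sleep):
    """Run each action, then wait until the next fixed slot boundary."""
    start = clock()
    for count, action in enumerate(actions, start=1):
        action()
        # Waiting is measured from `start`, so slow actions do not shift later slots
        sleep(seconds_until_slot(start, count, interval, clock()))
```

The `clock` and `sleep` parameters default to the real functions and exist only so the schedule can be exercised without actually waiting.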
Whereas the program works when I use the File sink (the batches are appended to text-format files in "result_dir"), nothing is displayed when I choose the Console sink.
Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.
My question is:
How can I make this program output to the Console sink and display the results when using Databricks?
Thank you very much in advance!
Best regards,
Nacho
My Program: myTest.py
import pyspark
import pyspark.sql.functions
import time
#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
    # 1. We create the output variable
    res = []
    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)
    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)
        # 3.2. We look for the pattern name=' to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]
        # 3.3. We look for the pattern ', to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]
        # 3.4. We append the name to the list
        res.append(file_name)
    # 4. We sort the list in alphabetic order
    res.sort()
    # 5. We return res
    return res
#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)
    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)
    start = time.time()
    # 3. We set a counter for the number of files transferred
    count = 0
    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    # (i.e., the files are copied one by one for each time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)
        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1
        # 4.3. We wait until the next time slot (never a negative time, which time.sleep rejects)
        time.sleep(max(0.0, (start + (count * time_step_interval)) - time.time()))
    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)
#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
    # 0. We set the mode
    console_sink = True
    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"
    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)
    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)
    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text") \
        .load(monitoring_dir)
    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...
    # 4.1. To either save to result_dir in append mode
    if not console_sink:
        myDSW = inputUDF.writeStream.format("text") \
            .option("path", result_dir) \
            .option("checkpointLocation", checkpoint_dir) \
            .trigger(processingTime="10 seconds") \
            .outputMode("append")
    # 4.2. Or to display by console in append mode
    else:
        myDSW = inputUDF.writeStream.format("console") \
            .trigger(processingTime="10 seconds") \
            .outputMode("append")
    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()
    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)
    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()
#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()
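The string parsing in get_source_dir_file_names can probably be avoided, because the FileInfo objects returned by dbutils.fs.ls expose the file name as a `name` attribute. A minimal sketch under that assumption; the `FileInfo` namedtuple below is only a stand-in so the snippet runs outside Databricks, and `sorted_file_names` is a hypothetical helper name.

```python
from collections import namedtuple

# Stand-in for the objects returned by dbutils.fs.ls (assumption: they
# expose at least `path`, `name` and `size` attributes)
FileInfo = namedtuple("FileInfo", ["path", "name", "size"])


def sorted_file_names(file_infos):
    """Return the file names in alphabetical order, read from the `name` attribute."""
    return sorted(info.name for info in file_infos)


listing = [
    FileInfo("dbfs:/FileStore/tables/my_dataset/f2.txt", "f2.txt", 33),
    FileInfo("dbfs:/FileStore/tables/my_dataset/f1.txt", "f1.txt", 33),
]
names = sorted_file_names(listing)
```

On Databricks the call would be `sorted_file_names(dbutils.fs.ls(source_dir))`, which does not depend on how a FileInfo happens to be printed.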
My Dataset: f1.txt
First sentence.
Second sentence.
My Dataset: f2.txt
Third sentence.
Fourth sentence.
My Dataset: f3.txt
Fifth sentence.
Sixth sentence.
python spark-streaming databricks
asked Nov 9 at 19:15
Nacho Castiñeiras
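For context on the question above: the console sink prints on the driver's standard output, and on Databricks that output is commonly reported to end up in the cluster's driver logs rather than in the notebook cell. That is an assumption about the environment, not something confirmed in the post. One alternative that may help when debugging is Spark's memory sink, which appends each batch to an in-memory table that can be queried by name. A minimal sketch follows; the helper names and the table name `console_preview` are hypothetical.

```python
def start_memory_preview(stream_df, table_name="console_preview"):
    """Start a streaming query that appends rows to an in-memory table."""
    return (
        stream_df.writeStream
        .format("memory")                       # built-in sink intended for debugging
        .queryName(table_name)                  # the query name becomes the table name
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start()
    )


def show_preview(spark, table_name="console_preview"):
    """Print the rows collected so far in the in-memory table."""
    spark.sql("SELECT * FROM {}".format(table_name)).show(truncate=False)
```

In my_main this would take the place of the console branch: call `start_memory_preview(inputUDF)` before the simulation starts and `show_preview(spark)` while it runs or just before stopping the query. The memory sink keeps every row on the driver, so it suits small test datasets such as the three files here.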