Is there a better way to load a huge tar file in Spark while avoiding OutOfMemoryError?
I have a single tar file, mytar.tar, that is 40 GB in size. Inside this tar file are 500 tar.gz files, and inside each of these tar.gz files are a bunch of JSON files. I have written code to process this single tar file and get the list of JSON string contents. My code looks like the following.

    import java.io.ByteArrayInputStream
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.commons.compress.utils.IOUtils

    val isRdd = sc.binaryFiles("/mnt/mytar.tar")
      .flatMap(t => {
        val buf = scala.collection.mutable.ListBuffer.empty[TarArchiveInputStream]
        val stream = t._2
        val is = new TarArchiveInputStream(stream.open())
        var entry = is.getNextTarEntry()
        while (entry != null) {
          val name = entry.getName()
          val size = entry.getSize.toInt
          if (entry.isFile() && size > -1) {
            // The whole inner .tar.gz entry is copied into one byte array
            val content = new Array[Byte](size)
            // read() may return fewer bytes than requested; readFully fills the array
            IOUtils.readFully(is, content)
            val tgIs = new TarArchiveInputStream(new GzipCompressorInputStream(new ByteArrayInputStream(content)))
            buf += tgIs
          }
          entry = is.getNextTarEntry()
        }
        buf.toList
      })
      .cache
    val byteRdd = isRdd.flatMap(is => {
        val buf = scala.collection.mutable.ListBuffer.empty[Array[Byte]]
        var entry = is.getNextTarEntry()
        while (entry != null) {
          val name = entry.getName()
          val size = entry.getSize.toInt
          if (entry.isFile() && name.endsWith(".json") && size > -1) {
            val data = new Array[Byte](size)
            IOUtils.readFully(is, data)
            buf += data
          }
          entry = is.getNextTarEntry()
        }
        buf.toList
      })
      .cache
    val jsonRdd = byteRdd
      .map(arr => getJson(arr)) // getJson: my own helper (not shown)
      .filter(_.length > 0)
      .cache
    jsonRdd.count // action just to execute the code
When I execute this code, I get an OutOfMemoryError (OOME):

    org.apache.spark.SparkException: Job aborted due to stage failure:
    Task 0 in stage 24.0 failed 4 times, most recent failure:
    Lost task 0.3 in stage 24.0 (TID 137, 10.162.224.171, executor 13):
    java.lang.OutOfMemoryError: Java heap space
My EC2 cluster has 1 driver and 2 worker nodes of type i3.xlarge (30.5 GB memory, 4 cores). From looking at the logs and thinking it through, I believe the OOME happens during the creation of isRdd (the input stream RDD).
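A minimal sketch of a streaming alternative, assuming the OOME comes from materializing each inner .tar.gz as one byte array (and caching those): read the outer tar sequentially, decompress each inner .tar.gz on the fly, and hold at most one JSON document at a time. To keep the sketch dependency-free, MiniTarReader and NestedTarJson are hypothetical JDK-only helpers standing in for commons-compress TarArchiveInputStream; they handle plain ustar entries with octal sizes but not pax/GNU long names or base-256 sizes, and readAllBytes assumes Java 9+.

```scala
import java.io.{EOFException, InputStream}
import java.nio.charset.StandardCharsets.{US_ASCII, UTF_8}
import java.util.zip.GZIPInputStream

// Hypothetical JDK-only stand-in for commons-compress TarArchiveInputStream.
// Supports ustar entries with octal sizes; NOT pax/GNU long names or base-256 sizes.
final class MiniTarReader(in: InputStream) {
  private val header = new Array[Byte](512)
  private var remaining = 0L // unread data bytes of the current entry
  private var padding = 0L   // zero padding that follows the current entry's data

  // Bounded view of the current entry: returns -1 at the entry's end, never reads the next header.
  val entryStream: InputStream = new InputStream {
    override def read(b: Array[Byte], off: Int, len: Int): Int =
      if (remaining <= 0) -1
      else if (len == 0) 0
      else {
        val n = in.read(b, off, math.min(len.toLong, remaining).toInt)
        if (n < 0) throw new EOFException("truncated tar entry")
        remaining -= n
        n
      }
    override def read(): Int = {
      val one = new Array[Byte](1)
      if (read(one, 0, 1) < 0) -1 else one(0) & 0xff
    }
  }

  private def skip(n: Long): Unit = {
    val scratch = new Array[Byte](8192)
    var left = n
    while (left > 0) {
      val r = in.read(scratch, 0, math.min(left, scratch.length.toLong).toInt)
      if (r < 0) throw new EOFException("truncated tar archive")
      left -= r
    }
  }

  // NUL-terminated ASCII header field.
  private def field(off: Int, len: Int): String = {
    var end = off
    while (end < off + len && header(end) != 0) end += 1
    new String(header, off, end - off, US_ASCII)
  }

  // Moves to the next entry, skipping whatever was not read of the current one.
  // Returns (name, size, isRegularFile), or None at the end of the archive.
  def next(): Option[(String, Long, Boolean)] = {
    skip(remaining + padding)
    remaining = 0L; padding = 0L
    var got = 0
    while (got < 512) {
      val r = in.read(header, got, 512 - got)
      if (r < 0) { if (got == 0) return None else throw new EOFException("truncated tar header") }
      got += r
    }
    if (header.forall(_ == 0)) return None // a zero block marks the end of the archive
    val prefix = if (field(257, 6) == "ustar") field(345, 155) else "" // POSIX ustar only
    val name = if (prefix.isEmpty) field(0, 100) else prefix + "/" + field(0, 100)
    val size = java.lang.Long.parseLong(field(124, 12).trim, 8)
    remaining = size
    padding = (512 - size % 512) % 512
    Some((name, size, header(156) == '0'.toByte || header(156) == 0))
  }
}

object NestedTarJson {
  // Streams outer tar -> each inner .tar.gz -> each .json entry, calling f(name, json).
  // Holds roughly one JSON document plus fixed-size buffers, never a whole inner archive.
  def foreachJson(outer: InputStream)(f: (String, String) => Unit): Unit = {
    val outerTar = new MiniTarReader(outer)
    var e = outerTar.next()
    while (e.isDefined) {
      val (name, _, isFile) = e.get
      if (isFile && name.endsWith(".tar.gz")) {
        val gz = new GZIPInputStream(outerTar.entryStream, 64 * 1024) // decompress on the fly
        try {
          val innerTar = new MiniTarReader(gz)
          var j = innerTar.next()
          while (j.isDefined) {
            val (jName, _, jIsFile) = j.get
            if (jIsFile && jName.endsWith(".json"))
              f(jName, new String(innerTar.entryStream.readAllBytes(), UTF_8)) // Java 9+
            j = innerTar.next()
          }
        } finally gz.close() // frees the Inflater; closing entryStream is a no-op
      }
      e = outerTar.next()
    }
  }
}
```

The design choice here is to nest streams instead of byte arrays, so memory use no longer scales with the size of an inner archive. In Spark, the same loop could presumably run inside one task per extracted .tar.gz file (for example over the stream returned by the PortableDataStream that sc.binaryFiles yields), emitting records instead of caching streams; that wiring is not shown. Run on the single 40 GB tar, it would still be one task.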
Is there anything else in the code or in the setup of my Spark cluster that I can do to mitigate this problem? Should I select an EC2 instance with more memory (e.g. a memory-optimized instance like r5.2xlarge)? FWIW, I upgraded to an r5.2xlarge cluster and still saw the OOME.
One thing I have thought about doing is to untar mytar.tar and start with the .tar.gz files inside it instead. I am thinking that each .tar.gz inside the tar file will have to be smaller than 30 GB to avoid the OOME (on the i3.xlarge).
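One constraint worth checking against the 30 GB estimate: the posted code allocates each entry as a single Array[Byte] sized with entry.getSize.toInt. JVM arrays are indexed by Int, so no entry larger than roughly 2 GiB (Int.MaxValue bytes) can be buffered that way regardless of heap size, and .toInt silently wraps larger sizes. EntrySize below is a hypothetical helper that only illustrates this arithmetic.

```scala
// Hypothetical helper illustrating the size handling in the posted code.
object EntrySize {
  val GiB: Long = 1024L * 1024 * 1024

  // Mirrors `entry.getSize.toInt`: sizes above Int.MaxValue wrap modulo 2^32.
  def asPostedCode(size: Long): Int = size.toInt

  // Safer check: Some(n) only if the size fits an Int array length
  // (JVMs typically cap arrays slightly below Int.MaxValue).
  def bufferable(size: Long): Option[Int] =
    if (size >= 0 && size <= Int.MaxValue) Some(size.toInt) else None
}

// 3 GiB wraps to a negative Int, so `size > -1` is false and the entry is silently skipped:
//   EntrySize.asPostedCode(3 * EntrySize.GiB) == -1073741824
// 5 GiB wraps to exactly 1 GiB, so only the first 1 GiB of the entry would be read:
//   EntrySize.asPostedCode(5 * EntrySize.GiB) == 1073741824
```

In other words, the per-entry ceiling for this buffering approach is about 2 GiB, not the machine's RAM; streaming the entries avoids the ceiling altogether.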
Any tips or advice is appreciated.
apache-spark tar
I would suggest first separating out the 500 tar.gz files inside the 40 GB file and writing them to disk, then processing the 500 files as an RDD. It seems all the major work is happening in a single task rather than being spread across the cluster, since sc.binaryFiles produces one record per file and you have only one file. You could also try to find out the uncompressed size of the whole file; I think the compression ratio might be very high.
– user238607
Nov 10 at 12:25
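To follow the suggestion of measuring the uncompressed size, a minimal sketch: stream each .tar.gz through GZIPInputStream with a fixed buffer and count the bytes, so memory stays constant however large the data is. GzipStats and uncompressedSize are hypothetical names.

```scala
import java.io.InputStream
import java.util.zip.GZIPInputStream

object GzipStats {
  // Counts decompressed bytes of a gzip stream using a fixed 64 KiB buffer (constant memory).
  def uncompressedSize(compressed: InputStream): Long = {
    val gz = new GZIPInputStream(compressed, 64 * 1024)
    try {
      val buf = new Array[Byte](64 * 1024)
      var total = 0L
      var n = gz.read(buf)
      while (n >= 0) {
        total += n
        n = gz.read(buf)
      }
      total
    } finally gz.close()
  }
}
```

Applied to each extracted .tar.gz (for example via java.nio.file.Files.newInputStream(path)), dividing the result by Files.size(path) gives that file's compression ratio. Reading the gzip trailer's ISIZE field would be faster, but it only stores the size modulo 2^32, so it is unreliable for multi-GiB data.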
edited Nov 9 at 23:53
asked Nov 9 at 22:56
Jane Wayne