Apache NiFi - When using SplitText on large files, how can I make PutFile write out immediately?
I am reading in text files with 50k+ rows of data, where each row represents a complete record.
Our NiFi flow uses SplitText to handle the file in batches of 1,000 rows. (This was set up before my time, for memory issues I'm told.)
Is it possible to have PutFile execute immediately? I want each split to be written out by PutFile as soon as it is done, not sit in a queue until all 50k+ rows of data have been processed. It seems wasteful to wait for everything when the file is being split up anyway.
I have been reading the documentation, but I cannot tell whether this is by design and not configurable.
I would appreciate any documentation or guidance that can help me answer this and configure my flow.
configuration apache-nifi
asked Nov 5 at 17:54
Elijah
1 Answer
TL;DR: A workaround is to use multiple SplitText processors: the first splits into 10k-row chunks, for example, and the second splits those into 1,000-row chunks. That way the first 10k rows are split into ten flow files and sent downstream while the second SplitText is still working through the remaining 10k-row chunks.
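For example, the cascade might look like this (a sketch; Line Split Count is the standard SplitText property, and the 10k/1k sizes are just illustrative):

    GetFile (or other source)
        |
        v
    SplitText #1 -- Line Split Count = 10000   (coarse split)
        | splits
        v
    SplitText #2 -- Line Split Count = 1000    (fine split)
        | splits
        v
    PutFile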
EDIT: Adding another workaround, a Groovy script to be used in InvokeScriptedProcessor:
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.OutputStreamCallback

class GroovyProcessor implements Processor {

    def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
    def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles that were not successfully processed are routed here').build()
    def REL_ORIGINAL = new Relationship.Builder().name("original").description('After processing, the original incoming FlowFiles are routed here').build()

    ComponentLog log

    void initialize(ProcessorInitializationContext context) { log = context.logger }

    Set<Relationship> getRelationships() { return [REL_FAILURE, REL_SUCCESS, REL_ORIGINAL] as Set }

    // Minimal no-op implementations of the rest of the Processor interface
    Collection<ValidationResult> validate(ValidationContext context) { null }
    PropertyDescriptor getPropertyDescriptor(String name) { null }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List<PropertyDescriptor> getPropertyDescriptors() { null }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Two sessions: session1 owns the incoming flow file, session2 owns the
        // outgoing splits so each one can be committed (released downstream) individually
        def session1 = sessionFactory.createSession()
        def session2 = sessionFactory.createSession()
        try {
            def inFlowFile = session1.get()
            if (!inFlowFile) return
            def inputStream = session1.read(inFlowFile)
            inputStream.eachLine { line ->
                // Splits created in session2 have no parent flow file, which is
                // exactly the provenance/lineage break described below
                def outFlowFile = session2.create()
                outFlowFile = session2.write(outFlowFile, { outputStream ->
                    outputStream.write(line.bytes)
                } as OutputStreamCallback)
                session2.transfer(outFlowFile, REL_SUCCESS)
                // Committing per line is what makes each split visible downstream immediately
                session2.commit()
            }
            inputStream.close()
            session1.transfer(inFlowFile, REL_ORIGINAL)
            session1.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}; rolling back session', [this, t] as Object[])
            session2.rollback(true)
            session1.rollback(true)
            throw t
        }
    }
}

processor = new GroovyProcessor()
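To try this, I believe you would add an InvokeScriptedProcessor, set its Script Engine property to Groovy, paste the script into the Script Body property (or point Script File at it), and route the success relationship toward PutFile; the last line assigns the instance to the processor variable that InvokeScriptedProcessor expects. The per-line session2.commit() is what releases each split downstream immediately, at the cost of the provenance caveats described below.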
For completeness:
The Split processors were designed to support the Split/Merge pattern, and in order to merge the splits back together later, each one needs the same "parent ID" as well as the total count.
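Concretely, the split processors write the standard fragment attributes on each split (attribute names as I understand them from the SplitText documentation):

    fragment.identifier          the same UUID on every split produced from one parent flow file
    fragment.index               the position of this split within the parent
    fragment.count               the total number of splits
    segment.original.filename    the filename of the parent flow file

MergeContent's "Defragment" merge strategy relies on fragment.identifier, fragment.index, and fragment.count to reassemble the original, which is why the total count has to be known before any split is sent out.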
If you send flow files out before you've split everything up, you won't know the total count and won't be able to merge them back together later. Also, if something goes wrong mid-split, you may want to roll back the operation rather than have some flow files already downstream and the rest of them sent to failure.
In order to send out some flow files before all processing is finished, you have to "commit the process session". This prevents you from doing the things above, and it creates a break in the provenance for the incoming flow file, because you have to commit/transfer that file in the session that originally pulled it in. All subsequent commits need newly created flow files, which breaks the provenance/lineage chain.
Although there is an open Jira for this (NIFI-2878), there has been some dissent on the mailing lists and pull requests about adding this feature to processors that accept input (i.e. non-source processors). NiFi's framework is fairly transactional, and this kind of feature flies in the face of that.
edited Nov 5 at 20:50
answered Nov 5 at 18:36
mattyb
How do I commit the process session? I have no need to merge the items back together. @mattyb
– Elijah
Nov 5 at 19:30
That's a NiFi development concept, things that happen "under the hood".
– mattyb
Nov 5 at 20:29
Can you tell me if I am wrong in how I am understanding this? If I were to use multiple levels of SplitText, it would lighten the load on the service and write out to PutFile faster, because the flow would not require everything from the original file to be processed; it would instead depend on the size of the last SplitText. So if the last SplitText were set to 100, then after every 100 rows finished it would send them out to PutFile.
– Elijah
Nov 6 at 15:02