Kafka producer failover mechanism and validation of data being pushed to topic
I have written code that pushes data to a Kafka topic on a daily basis, but there are a few issues I am not sure this code can handle. My responsibility is to push the complete data from a live table that holds one day of data (refreshed every morning).
My code queries "select * from mytable" and pushes the rows to the Kafka topic one by one, because before pushing I need to validate/alter each row.
Below is my producer send code.
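```java
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// sBOOTSTRAP_SERVERS_CONFIG, eventName (the topic) and jsonRec (the validated row) are defined elsewhere
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sBOOTSTRAP_SERVERS_CONFIG);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProperties.put(ProducerConfig.ACKS_CONFIG, "all");
configProperties.put(ProducerConfig.RETRIES_CONFIG, 0); // a failed send is reported to the callback, never retried
configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 15000);
configProperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30000000);

@SuppressWarnings("resource")
KafkaProducer<String, String> producer = new KafkaProducer<>(configProperties);
System.out.println("Starting Kafka producer job " + new Date());

// The callback runs once per record; e != null means this record was not written
producer.send(new ProducerRecord<>(eventName, jsonRec.toString()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
});
```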
Now, I am not sure how to push data into the topic again in case of failure. I select all the records from the table, a few of them fail, and I do not know which ones.
Below is what I want to address:
1. How can I process only the records that were not pushed, so that no duplicate records are pushed?
2. How can I validate that the records pushed are exactly the same as in the table, i.e. data integrity, such as the size of the data and the count of records pushed?
java apache-kafka
edited Nov 23 '18 at 16:14 by cricket_007
asked Nov 23 '18 at 11:01 by user1708054
1 Answer
You can use configProperties.put("enable.idempotence", true);. The producer will then retry failed sends while making sure its own retries do not write the same record to Kafka twice. Note that enabling idempotence requires retries > 0, acks=all and max.in.flight.requests.per.connection <= 5. For details check https://kafka.apache.org/documentation/.
For the 2nd question: if you mean that you need to save either all records or none, then you have to use Kafka transactions, which bring a lot more questions. I would recommend reading https://www.confluent.io/blog/transactions-apache-kafka/
answered Nov 23 '18 at 12:28 by freakman
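A minimal sketch of the answer's first suggestion, assuming the rest of the question's setup stays the same. It only builds the Properties, using the documented producer config names as plain strings so it compiles with the JDK alone. The class and method names (IdempotentProducerConfig, build) are hypothetical, and retries = Integer.MAX_VALUE is an illustrative choice rather than something the answer specifies.

```java
import java.util.Properties;

// Hypothetical helper: producer settings with idempotence enabled.
public class IdempotentProducerConfig {

    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The broker discards duplicates caused by this producer's own internal retries.
        props.put("enable.idempotence", "true");
        // Settings idempotence requires: acks=all, retries > 0, at most 5 in-flight requests.
        props.put("acks", "all");
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        // In the real job, pass the result to new KafkaProducer<String, String>(props).
        Properties props = build("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Idempotence only covers retries inside one producer session: if the job later calls send() again for a row whose callback reported an error, Kafka treats that as a new record.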
So for the 1st point, will it query the table again and push only the data that was not already pushed? I will go through the documentation and get back. Thanks for your reply :)
– user1708054
Nov 23 '18 at 14:07
No, it won't query the DB again. The Kafka producer does not know anything about your DB, so if you call producer.send a second time anywhere for the same record, you will get duplicates. retries works internally in the Kafka client code: the producer knows it should keep re-sending the same record, for example because it did not get an ack from the Kafka broker.
– freakman
Nov 23 '18 at 14:29
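Since the producer will not tell the job later which rows failed, one option is to record them as the callbacks arrive and re-send only those after producer.flush() returns. This is a JDK-only sketch; FailedRowTracker and its methods are hypothetical names. One assumption to verify for your setup: a callback error such as a timeout does not always prove the broker never stored the record, so a manual re-send can still produce an occasional duplicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: remembers the rows whose send callback reported an error.
public class FailedRowTracker<R> {
    // Producer callbacks run on the producer's I/O thread, so use a thread-safe queue.
    private final ConcurrentLinkedQueue<R> failed = new ConcurrentLinkedQueue<>();

    // Call from the send callback; error is the Exception passed to onCompletion.
    public void onCompletion(R row, Exception error) {
        if (error != null) {
            failed.add(row);
        }
    }

    // Call after producer.flush() returns, when every callback has fired.
    // Returns the rows to re-send and empties the tracker for the next pass.
    public List<R> drainFailed() {
        List<R> rows = new ArrayList<>();
        R row;
        while ((row = failed.poll()) != null) {
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        FailedRowTracker<String> tracker = new FailedRowTracker<>();
        // Simulated callbacks: the second row failed.
        tracker.onCompletion("{\"id\":1}", null);
        tracker.onCompletion("{\"id\":2}", new RuntimeException("simulated timeout"));
        System.out.println("to re-send: " + tracker.drainFailed());
    }
}
```

Because Kafka's Callback has a single method, the producer loop could pass a lambda, e.g. producer.send(new ProducerRecord<>(eventName, json), (metadata, e) -> tracker.onCompletion(json, e)); then call producer.flush() and loop over tracker.drainFailed().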
Thanks for the info. I will explore more and update here.
– user1708054
Nov 26 '18 at 19:49
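For the data-integrity part of the question (record count and size), a check that is different from the transactions suggestion, and can be used alongside it, is to reconcile what was read from the table with what the broker acknowledged. This JDK-only sketch assumes the job calls onRead for every row it sends and onAck from the callback with metadata.serializedValueSize() when the exception is null; DeliveryReconciler and its methods are hypothetical names. It compares totals only, not row contents.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: compares what was read from the table with what Kafka acknowledged.
public class DeliveryReconciler {
    private final AtomicLong rowsRead = new AtomicLong();
    private final AtomicLong bytesRead = new AtomicLong();
    private final AtomicLong rowsAcked = new AtomicLong();
    private final AtomicLong bytesAcked = new AtomicLong();

    // Call once per row, with the exact string passed to producer.send().
    public void onRead(String value) {
        rowsRead.incrementAndGet();
        // StringSerializer encodes as UTF-8 by default, so this matches the serialized size.
        bytesRead.addAndGet(value.getBytes(StandardCharsets.UTF_8).length);
    }

    // Call from the send callback when the exception is null,
    // passing metadata.serializedValueSize().
    public void onAck(int serializedValueSize) {
        rowsAcked.incrementAndGet();
        bytesAcked.addAndGet(serializedValueSize);
    }

    // Only meaningful after producer.flush() (or close()), when every callback has fired.
    public boolean matches() {
        return rowsRead.get() == rowsAcked.get() && bytesRead.get() == bytesAcked.get();
    }

    public String summary() {
        return "read " + rowsRead.get() + " rows / " + bytesRead.get() + " bytes, acked "
                + rowsAcked.get() + " rows / " + bytesAcked.get() + " bytes";
    }

    public static void main(String[] args) {
        DeliveryReconciler r = new DeliveryReconciler();
        String[] rows = {"{\"id\":1}", "{\"id\":2}"};
        for (String row : rows) {
            r.onRead(row);
        }
        // Simulate: only the first row was acknowledged.
        r.onAck(rows[0].getBytes(StandardCharsets.UTF_8).length);
        System.out.println(r.matches() + " " + r.summary());
    }
}
```

The row count could additionally be checked against a select count(*) on the table taken at read time, if the table does not change while the job runs.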