Conditional execution of mapPartition in Spark?

Given a dataframe that holds the parameters for HTTP calls to a web-accessible API, I want to distribute those API calls to all nodes of a cluster, make the calls, and return a new dataframe with the results. This is relatively straightforward, and I have a solution that works.



What is not straightforward is limiting the rate at which the API calls are made. If, for example, you can only make 100 API calls per second, I want some way to ensure that we don't exceed that limit. Also, if run on a larger cluster, I want to ensure that this distributed API caller doesn't turn into a low-grade DDoS attack.



One workable solution is to have each thread sleep when it gets a rejection from the server (HTTP 429 Too Many Requests). However, by that point you have already performed the DDoS attack; I'd like to slow down before we get there.
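
For concreteness, here is a minimal sketch of that reactive approach. ApiResponse and the callApi parameter are hypothetical stand-ins for whatever HTTP client is actually used; only the sleep-and-retry logic matters here.

// Hypothetical response type; substitute your HTTP client's response class.
case class ApiResponse(status: Int, body: String)

// Retry with exponential backoff whenever the server answers 429.
def callWithBackoff(callApi: () => ApiResponse,
                    maxRetries: Int = 5,
                    baseSleepMs: Long = 1000L): ApiResponse = {
  var attempt = 0
  var response = callApi()
  while (response.status == 429 && attempt < maxRetries) {
    // Each rejection doubles the wait, so repeated 429s slow this worker down further.
    Thread.sleep(baseSleepMs * (1L << attempt))
    attempt += 1
    response = callApi()
  }
  response
}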



I attempted to implement this with an accumulator and a broadcast variable. Each call incremented the accumulator, and the broadcast variable held the start time. Each worker could then divide the accumulator by the time elapsed to see whether the request rate was too high. Unfortunately, you can't read an accumulator's value from a worker, so this doesn't work, and I don't see any way to make it work.
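
To make the attempt concrete, the setup looked roughly like this (a sketch: sc is the SparkContext, and the accumulator's display name is an arbitrary placeholder; the variable names match the snippet further down):

import java.util.Calendar

// Accumulator counting API calls made so far; workers can add to it,
// but only the driver can read its value back.
val accumulator = sc.accumulator(0L, "apiCallsMade")

// Broadcast the job start time so every worker shares the same reference point.
val broadcastStartTime = sc.broadcast(Calendar.getInstance.getTimeInMillis)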



I could use the same idea, but control the rate by reading from the driver. I could divide the dataset into many small partitions, perhaps 10 or 100 rows each. The driver could then check the rate before dispatching each partition. However, I don't know of any way to introduce a conditional sleep statement, executed on the driver side, into a .mapPartitions() call.
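
One possible shape for that driver-side check is sketched below. It assumes the call parameters have been pulled into an RDD (paramRdd, e.g. via .rdd on the typed Dataset from the workflow shown further down) and reuses the distributedApiCalls and rateLimitPerSec names from this question; it is an illustration of the idea, not a drop-in implementation.

// Split the parameters into many small chunks so the driver can re-check the
// rate before submitting each one. randomSplit weights are relative, so equal
// weights give roughly equal chunks.
val numChunks = 100
val chunks = paramRdd.randomSplit(Array.fill(numChunks)(1.0))

val startMs = System.currentTimeMillis()
var callsSoFar = 0L   // calls completed so far, tracked on the driver

val resultRdds = chunks.map { chunk =>
  val chunkSize = chunk.count()   // one API call per row in this chunk; no HTTP yet
  // Driver-side conditional sleep: wait until submitting this chunk keeps the
  // overall request rate under rateLimitPerSec.
  while (callsSoFar + chunkSize >
         ((System.currentTimeMillis() - startMs) / 1000.0) * rateLimitPerSec) {
    Thread.sleep(1000)
  }
  val out = chunk.mapPartitions(distributedApiCalls).cache()
  out.count()                     // force the HTTP calls to run now, inside the throttle window
  callsSoFar += chunkSize
  out
}

val results = resultRdds.reduce(_ union _)   // then convert with .toDF(...) as in the original workflow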



The workflow looks something like this (in Spark 1.6.3):



input.repartition(repartitionSetter(n))
.select(...fields of interest...)
.as[(... Tuple DataSet specification ...)]
.mapPartitions(distributedApiCalls)
.toDF().toDF( ... column name specification ...)


The conditional statement would work like this:



while (tooManyCalls()) {
  logger.log("Throttling API rate in APPNAME")
  Thread.sleep(1000)
}

def tooManyCalls(): Boolean = {
  val now = Calendar.getInstance.getTimeInMillis             // current time, read on the driver
  val timeElapsed = (now - broadcastStartTime.value) / 1000  // seconds since the job started
  // true if making the next partition's worth of calls (n) would push us over the limit
  (accumulator.value + n) > (timeElapsed * rateLimitPerSec)
}


Here, repartitionSetter would divide the dataset into partitions of size n, and distributedApiCalls is the function passed to each partition to access the API.
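
As a rough illustration (not the actual implementation), distributedApiCalls could enforce the limit per partition so that each task only needs local state. The (id, url) row type and the httpGet helper below are placeholders for the real schema and HTTP client.

def distributedApiCalls(rows: Iterator[(String, String)]): Iterator[(String, Int, String)] = {
  // Each partition gets its own slice of the global budget, e.g. 100 req/s
  // divided by the number of partitions expected to run concurrently.
  val perPartitionLimitPerSec = 10.0
  val minIntervalMs = (1000.0 / perPartitionLimitPerSec).toLong
  var lastCallAt = 0L

  // Placeholder for the real HTTP call; returns (status, body).
  def httpGet(url: String): (Int, String) = (200, s"stub response for $url")

  rows.map { case (id, url) =>
    // Space calls out so this partition never exceeds its share of the limit.
    val wait = lastCallAt + minIntervalMs - System.currentTimeMillis()
    if (wait > 0) Thread.sleep(wait)
    lastCallAt = System.currentTimeMillis()

    val (status, body) = httpGet(url)
    (id, status, body)
  }
}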



Is there any way to incorporate a conditional statement into the distributed API call workflow before mapPartitions?



We are in the process of upgrading to Spark 2.x, but this feature is due before that upgrade, so an ideal solution will work for both Spark 1.6.3 and Spark 2.x.

scala apache-spark client

edited Nov 8 at 19:30

asked Nov 8 at 18:38

kingledion

  • Spark is probably not the best tool for that (even with the restrictions, 100 requests per second is usually something you can handle in a single thread). The only straightforward solution is to limit the rate per partition (optionally adding circuit-breaker code), adjusted for min(number of assigned executors, number of partitions).
    – user6910411
    Nov 8 at 22:24