Conditional execution of mapPartition in Spark?
Given a dataframe that holds the parameters for HTTP calls to a web-accessible API, I want to distribute those API calls to all nodes of a cluster, make the calls, and return a new dataframe with the results of the calls. This is relatively straightforward, and I have a solution that works.
What is not straightforward is limiting the rate at which the API calls are made. If, for example, you can only make 100 API calls per second, I want some way to ensure that we don't exceed the limit. Also, if this is run on a larger cluster, I want some way to ensure that the distributed API caller doesn't turn into a low-grade DDoS attack.
One workable solution is to have each thread sleep when it gets a rejection from the server (HTTP 429 Too Many Requests). However, at that point you have already performed the DDoS attack; I'd like to slow down before we get there.
I attempted to implement this with an accumulator and a broadcast variable. Each call incremented the accumulator, and the broadcast variable held the start time. Each worker could then divide the accumulator by the time elapsed to see whether the request rate was too high. Unfortunately, you can't read an accumulator from a worker, so this doesn't work, and I don't see any way to make it work.
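Roughly, the attempt looked like the sketch below (names such as apiCallCounter, the tuple type, and the callApi helper are placeholders, using the Spark 1.6 accumulator API):
// Driver side: a shared counter plus a broadcast start time (illustrative names)
val apiCallCounter = sc.accumulator(0L, "apiCalls")
val broadcastStartTime = sc.broadcast(System.currentTimeMillis())

// Worker side: the function handed to mapPartitions
def distributedApiCalls(rows: Iterator[(String, String)]): Iterator[String] =
  rows.map { params =>
    apiCallCounter += 1L      // adding to an accumulator from a task is fine...
    // apiCallCounter.value   // ...but reading it here fails: accumulators are
                              //    write-only from inside a task
    callApi(params)           // hypothetical HTTP helper
  }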
I could use the same approach, except control the rate by reading from the driver. I could divide the dataset into a bunch of small partitions, perhaps 10 or 100 rows each. Then the driver could check the rate before mapping out each partition (a rough sketch of this idea is shown after the workflow below). However, I don't know of any way to introduce a conditional sleep statement, executed on the driver side, into a .mapPartitions() call.
The workflow looks something like this (in Spark 1.6.3):
input.repartition(repartitionSetter(n))
  .select(...fields of interest...)
  .as[(... Tuple DataSet specification ...)]
  .mapPartitions(distributedApiCalls)
  .toDF().toDF( ... column name specification ...)
The conditional statement would work like this:
import java.util.Calendar

while (tooManyCalls()) {   // intended to run on the driver, before dispatching work
  logger.log("Throttling API rate in APPNAME")
  Thread.sleep(1000)
}

def tooManyCalls(): Boolean = {
  val now = Calendar.getInstance.getTimeInMillis
  val timeElapsed = (now - broadcastStartTime.value) / 1000    // seconds since start
  (accumulator.value + n) > (timeElapsed * rateLimitPerSec)    // true if going too fast
}
Here, repartitionSetter would divide the dataset into partitions of size n, and distributedApiCalls is the function passed to each partition to access the API.
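The closest I can picture is to give up the single mapPartitions pass and drive the work in small chunks, roughly as sketched below (the chunk count, output path, and per-chunk write are only illustrative; each chunk needs its own action, otherwise the lazy mapPartitions would defer all the work past the sleep checks):
val chunks = input.randomSplit(Array.fill(100)(1.0))    // ~100 roughly equal pieces

chunks.foreach { chunk =>
  while (tooManyCalls()) {                              // driver-side check from above
    logger.log("Throttling API rate in APPNAME")
    Thread.sleep(1000)
  }
  chunk.select(...fields of interest...)                // same pipeline as the workflow
    .as[(... Tuple DataSet specification ...)]
    .mapPartitions(distributedApiCalls)
    .toDF().toDF( ... column name specification ...)
    .write.mode("append").parquet("/tmp/api_results")   // illustrative per-chunk action
}
But that trades one job for a hundred, so I'd prefer something that stays inside the existing workflow.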
Is there any way to incorporate a conditional statement like this into the distributed API call workflow, before mapPartitions?
We are in the process of upgrading to Spark 2.X, but this feature is due before that upgrade, so an ideal solution will work for both Spark 1.6.3 and Spark 2.X.
scala apache-spark client
edited Nov 8 at 19:30
asked Nov 8 at 18:38
kingledion
Spark is probably not the best tool for that (even with the restrictions, 100 requests per second is usually something you can do in a single thread). The only straightforward solution is to limit the rate per partition (optionally adding circuit-breaker code), adjusted for min(number of assigned executors, number of partitions).
– user6910411
Nov 8 at 22:24
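A minimal sketch of the per-partition throttling the comment suggests, assuming the global limit is split evenly across the partitions that run concurrently (throttledApiCalls, callApi, and the parallelism figure are illustrative, not from the question):
def throttledApiCalls(globalRatePerSec: Int, parallelism: Int)(
    rows: Iterator[(String, String)]): Iterator[String] = {
  // Space out calls so that parallelism * per-partition rate stays under the global limit
  val perPartitionRate = math.max(1, globalRatePerSec / parallelism)
  val minIntervalMs = 1000L / perPartitionRate
  var lastCallAt = 0L
  rows.map { params =>
    val wait = minIntervalMs - (System.currentTimeMillis() - lastCallAt)
    if (wait > 0) Thread.sleep(wait)
    lastCallAt = System.currentTimeMillis()
    callApi(params)                  // hypothetical HTTP helper
  }
}

// usage: .mapPartitions(throttledApiCalls(rateLimitPerSec, numConcurrentPartitions))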