Rx.NET Take an element and subscribe again after some time
I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
How can achieve this simple thing with Rx.NET?
reactive-programming system.reactive rx.net
add a comment |
I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
How can achieve this simple thing with Rx.NET?
reactive-programming system.reactive rx.net
add a comment |
I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
How can achieve this simple thing with Rx.NET?
reactive-programming system.reactive rx.net
I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:
Input: (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...
How can achieve this simple thing with Rx.NET?
reactive-programming system.reactive rx.net
reactive-programming system.reactive rx.net
asked Nov 12 '18 at 16:49
Evgeny
306213
306213
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
Try this:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
Here's a test:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
This produced:
0
5
10
14
19
24
29
34
39
The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
|
show 3 more comments
@Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.
His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
I think the best way around this is a Scan
-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Note: Test code uses Nuget Microsoft.Reactive.Testing
and the following helper class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266659%2frx-net-take-an-element-and-subscribe-again-after-some-time%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
Try this:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
Here's a test:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
This produced:
0
5
10
14
19
24
29
34
39
The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
|
show 3 more comments
Try this:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
Here's a test:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
This produced:
0
5
10
14
19
24
29
34
39
The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
|
show 3 more comments
Try this:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
Here's a test:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
This produced:
0
5
10
14
19
24
29
34
39
The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.
Try this:
Input
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
Here's a test:
var query =
Observable
.Interval(TimeSpan.FromSeconds(0.2))
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
.SelectMany(xs => xs.Take(1));
This produced:
0
5
10
14
19
24
29
34
39
The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.
edited Nov 13 '18 at 7:28
answered Nov 12 '18 at 21:15
Enigmativity
75.2k864129
75.2k864129
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
|
show 3 more comments
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
Tried, doesn't work. I still get all the elements.
– Evgeny
Nov 13 '18 at 5:32
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
@Eugene - I tested it and it works fine. What was your source?
– Enigmativity
Nov 13 '18 at 7:28
1
1
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
@Eugene I've also tried this code and I get events only every 1 second.
– Panagiotis Kanavos
Nov 13 '18 at 8:17
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
I checked in a console app, your example works.
– Evgeny
Nov 13 '18 at 10:15
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
– Evgeny
Nov 13 '18 at 10:16
|
show 3 more comments
@Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.
His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
I think the best way around this is a Scan
-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Note: Test code uses Nuget Microsoft.Reactive.Testing
and the following helper class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
add a comment |
@Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.
His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
I think the best way around this is a Scan
-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Note: Test code uses Nuget Microsoft.Reactive.Testing
and the following helper class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
add a comment |
@Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.
His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
I think the best way around this is a Scan
-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Note: Test code uses Nuget Microsoft.Reactive.Testing
and the following helper class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
@Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.
His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:
t : ---------1---------2---------3
source: ------1---2------3---4----5--|
window: ---------|---------|---------|
spec : ------1----------3-----------|
enigma: ------1---2----------4-------|
The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1100.MsTicks(), 2),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnNext(2200.MsTicks(), 4),
RxTest.OnNext(2600.MsTicks(), 5),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var expectedResults = scheduler.CreateHotObservable<int>(
RxTest.OnNext(700.MsTicks(), 1),
RxTest.OnNext(1800.MsTicks(), 3),
RxTest.OnCompleted<int>(3000.MsTicks())
);
var target = source
.Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
.SelectMany(xs => xs.Take(1));
var observer = scheduler.CreateObserver<int>();
target.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
I think the best way around this is a Scan
-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
{
return TrueThrottle<T>(source, span, Scheduler.Default);
}
public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
}
Note: Test code uses Nuget Microsoft.Reactive.Testing
and the following helper class:
public static class RxTest
{
public static long MsTicks(this int i)
{
return TimeSpan.FromMilliseconds(i).Ticks;
}
public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
}
public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
}
public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
{
return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
}
}
edited Nov 13 '18 at 16:40
answered Nov 13 '18 at 15:48
Shlomo
9,25522133
9,25522133
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
add a comment |
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
I will definitely take a look. Thank you for a great answer.
– Evgeny
Nov 13 '18 at 16:36
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266659%2frx-net-take-an-element-and-subscribe-again-after-some-time%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown