Rx.NET Take an element and subscribe again after some time












0














I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:



Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
Output: (1) --------------------------------- (4) --------- (5) ----------- ...


How can achieve this simple thing with Rx.NET?










share|improve this question



























    0














    I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:



    Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
    Output: (1) --------------------------------- (4) --------- (5) ----------- ...


    How can achieve this simple thing with Rx.NET?










    share|improve this question

























      0












      0








      0







      I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:



      Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
      Output: (1) --------------------------------- (4) --------- (5) ----------- ...


      How can achieve this simple thing with Rx.NET?










      share|improve this question













      I need a kind of throttling, which works a bit differently. I need to get an element from a sequence, unsubscribe and subscribe again in 1 sec. In other words, I want to ignore all elements during 1 sec after a first element is taken:



      Input:  (1) -100ms- (2) -200ms- (3) -1_500ms- (4) -1_000ms- (5) -500ms- (6) ...
      Output: (1) --------------------------------- (4) --------- (5) ----------- ...


      How can achieve this simple thing with Rx.NET?







      reactive-programming system.reactive rx.net






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 12 '18 at 16:49









      Evgeny

      306213




      306213
























          2 Answers
          2






          active

          oldest

          votes


















          2














          Try this:



          Input
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          Here's a test:



          var query =
          Observable
          .Interval(TimeSpan.FromSeconds(0.2))
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          This produced:




          0
          5
          10
          14
          19
          24
          29
          34
          39


          The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.






          share|improve this answer























          • Tried, doesn't work. I still get all the elements.
            – Evgeny
            Nov 13 '18 at 5:32












          • @Eugene - I tested it and it works fine. What was your source?
            – Enigmativity
            Nov 13 '18 at 7:28






          • 1




            @Eugene I've also tried this code and I get events only every 1 second.
            – Panagiotis Kanavos
            Nov 13 '18 at 8:17










          • I checked in a console app, your example works.
            – Evgeny
            Nov 13 '18 at 10:15










          • My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
            – Evgeny
            Nov 13 '18 at 10:16



















          1














          @Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.



          His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:



          t     : ---------1---------2---------3
          source: ------1---2------3---4----5--|
          window: ---------|---------|---------|
          spec : ------1----------3-----------|
          enigma: ------1---2----------4-------|


          The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:



          var scheduler = new TestScheduler();
          var source = scheduler.CreateColdObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1100.MsTicks(), 2),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnNext(2200.MsTicks(), 4),
          RxTest.OnNext(2600.MsTicks(), 5),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var expectedResults = scheduler.CreateHotObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var target = source
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
          .SelectMany(xs => xs.Take(1));

          var observer = scheduler.CreateObserver<int>();
          target.Subscribe(observer);
          scheduler.Start();
          ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);


          I think the best way around this is a Scan-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:



          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
          {
          return TrueThrottle<T>(source, span, Scheduler.Default);
          }

          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
          {
          return source
          .Timestamp(scheduler)
          .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
          ? item
          : state
          )
          .DistinctUntilChanged()
          .Select(t => t.Value);
          }


          Note: Test code uses Nuget Microsoft.Reactive.Testing and the following helper class:



          public static class RxTest
          {
          public static long MsTicks(this int i)
          {
          return TimeSpan.FromMilliseconds(i).Ticks;
          }

          public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
          }

          public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
          }

          public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
          }
          }





          share|improve this answer























          • I will definitely take a look. Thank you for a great answer.
            – Evgeny
            Nov 13 '18 at 16:36











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266659%2frx-net-take-an-element-and-subscribe-again-after-some-time%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          2 Answers
          2






          active

          oldest

          votes








          2 Answers
          2






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          2














          Try this:



          Input
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          Here's a test:



          var query =
          Observable
          .Interval(TimeSpan.FromSeconds(0.2))
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          This produced:




          0
          5
          10
          14
          19
          24
          29
          34
          39


          The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.






          share|improve this answer























          • Tried, doesn't work. I still get all the elements.
            – Evgeny
            Nov 13 '18 at 5:32












          • @Eugene - I tested it and it works fine. What was your source?
            – Enigmativity
            Nov 13 '18 at 7:28






          • 1




            @Eugene I've also tried this code and I get events only every 1 second.
            – Panagiotis Kanavos
            Nov 13 '18 at 8:17










          • I checked in a console app, your example works.
            – Evgeny
            Nov 13 '18 at 10:15










          • My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
            – Evgeny
            Nov 13 '18 at 10:16
















          2














          Try this:



          Input
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          Here's a test:



          var query =
          Observable
          .Interval(TimeSpan.FromSeconds(0.2))
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          This produced:




          0
          5
          10
          14
          19
          24
          29
          34
          39


          The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.






          share|improve this answer























          • Tried, doesn't work. I still get all the elements.
            – Evgeny
            Nov 13 '18 at 5:32












          • @Eugene - I tested it and it works fine. What was your source?
            – Enigmativity
            Nov 13 '18 at 7:28






          • 1




            @Eugene I've also tried this code and I get events only every 1 second.
            – Panagiotis Kanavos
            Nov 13 '18 at 8:17










          • I checked in a console app, your example works.
            – Evgeny
            Nov 13 '18 at 10:15










          • My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
            – Evgeny
            Nov 13 '18 at 10:16














          2












          2








          2






          Try this:



          Input
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          Here's a test:



          var query =
          Observable
          .Interval(TimeSpan.FromSeconds(0.2))
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          This produced:




          0
          5
          10
          14
          19
          24
          29
          34
          39


          The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.






          share|improve this answer














          Try this:



          Input
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          Here's a test:



          var query =
          Observable
          .Interval(TimeSpan.FromSeconds(0.2))
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0)))
          .SelectMany(xs => xs.Take(1));


          This produced:




          0
          5
          10
          14
          19
          24
          29
          34
          39


          The jump from 10 to 14 is just a result of using multiple threads and not an error in the query.







          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 13 '18 at 7:28

























          answered Nov 12 '18 at 21:15









          Enigmativity

          75.2k864129




          75.2k864129












          • Tried, doesn't work. I still get all the elements.
            – Evgeny
            Nov 13 '18 at 5:32












          • @Eugene - I tested it and it works fine. What was your source?
            – Enigmativity
            Nov 13 '18 at 7:28






          • 1




            @Eugene I've also tried this code and I get events only every 1 second.
            – Panagiotis Kanavos
            Nov 13 '18 at 8:17










          • I checked in a console app, your example works.
            – Evgeny
            Nov 13 '18 at 10:15










          • My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
            – Evgeny
            Nov 13 '18 at 10:16


















          • Tried, doesn't work. I still get all the elements.
            – Evgeny
            Nov 13 '18 at 5:32












          • @Eugene - I tested it and it works fine. What was your source?
            – Enigmativity
            Nov 13 '18 at 7:28






          • 1




            @Eugene I've also tried this code and I get events only every 1 second.
            – Panagiotis Kanavos
            Nov 13 '18 at 8:17










          • I checked in a console app, your example works.
            – Evgeny
            Nov 13 '18 at 10:15










          • My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
            – Evgeny
            Nov 13 '18 at 10:16
















          Tried, doesn't work. I still get all the elements.
          – Evgeny
          Nov 13 '18 at 5:32






          Tried, doesn't work. I still get all the elements.
          – Evgeny
          Nov 13 '18 at 5:32














          @Eugene - I tested it and it works fine. What was your source?
          – Enigmativity
          Nov 13 '18 at 7:28




          @Eugene - I tested it and it works fine. What was your source?
          – Enigmativity
          Nov 13 '18 at 7:28




          1




          1




          @Eugene I've also tried this code and I get events only every 1 second.
          – Panagiotis Kanavos
          Nov 13 '18 at 8:17




          @Eugene I've also tried this code and I get events only every 1 second.
          – Panagiotis Kanavos
          Nov 13 '18 at 8:17












          I checked in a console app, your example works.
          – Evgeny
          Nov 13 '18 at 10:15




          I checked in a console app, your example works.
          – Evgeny
          Nov 13 '18 at 10:15












          My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
          – Evgeny
          Nov 13 '18 at 10:16




          My case is: the first element can come after 0.8 sec from start, not right after start (like in your example). And I need to get the first element immeditely (+ restart the 1 sec locking period), not at the end of the first second.
          – Evgeny
          Nov 13 '18 at 10:16













          1














          @Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.



          His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:



          t     : ---------1---------2---------3
          source: ------1---2------3---4----5--|
          window: ---------|---------|---------|
          spec : ------1----------3-----------|
          enigma: ------1---2----------4-------|


          The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:



          var scheduler = new TestScheduler();
          var source = scheduler.CreateColdObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1100.MsTicks(), 2),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnNext(2200.MsTicks(), 4),
          RxTest.OnNext(2600.MsTicks(), 5),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var expectedResults = scheduler.CreateHotObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var target = source
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
          .SelectMany(xs => xs.Take(1));

          var observer = scheduler.CreateObserver<int>();
          target.Subscribe(observer);
          scheduler.Start();
          ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);


          I think the best way around this is a Scan-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:



          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
          {
          return TrueThrottle<T>(source, span, Scheduler.Default);
          }

          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
          {
          return source
          .Timestamp(scheduler)
          .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
          ? item
          : state
          )
          .DistinctUntilChanged()
          .Select(t => t.Value);
          }


          Note: Test code uses Nuget Microsoft.Reactive.Testing and the following helper class:



          public static class RxTest
          {
          public static long MsTicks(this int i)
          {
          return TimeSpan.FromMilliseconds(i).Ticks;
          }

          public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
          }

          public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
          }

          public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
          }
          }





          share|improve this answer























          • I will definitely take a look. Thank you for a great answer.
            – Evgeny
            Nov 13 '18 at 16:36
















          1














          @Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.



          His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:



          t     : ---------1---------2---------3
          source: ------1---2------3---4----5--|
          window: ---------|---------|---------|
          spec : ------1----------3-----------|
          enigma: ------1---2----------4-------|


          The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:



          var scheduler = new TestScheduler();
          var source = scheduler.CreateColdObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1100.MsTicks(), 2),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnNext(2200.MsTicks(), 4),
          RxTest.OnNext(2600.MsTicks(), 5),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var expectedResults = scheduler.CreateHotObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var target = source
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
          .SelectMany(xs => xs.Take(1));

          var observer = scheduler.CreateObserver<int>();
          target.Subscribe(observer);
          scheduler.Start();
          ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);


          I think the best way around this is a Scan-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:



          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
          {
          return TrueThrottle<T>(source, span, Scheduler.Default);
          }

          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
          {
          return source
          .Timestamp(scheduler)
          .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
          ? item
          : state
          )
          .DistinctUntilChanged()
          .Select(t => t.Value);
          }


          Note: Test code uses Nuget Microsoft.Reactive.Testing and the following helper class:



          public static class RxTest
          {
          public static long MsTicks(this int i)
          {
          return TimeSpan.FromMilliseconds(i).Ticks;
          }

          public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
          }

          public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
          }

          public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
          }
          }





          share|improve this answer























          • I will definitely take a look. Thank you for a great answer.
            – Evgeny
            Nov 13 '18 at 16:36














          1












          1








          1






          @Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.



          His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:



          t     : ---------1---------2---------3
          source: ------1---2------3---4----5--|
          window: ---------|---------|---------|
          spec : ------1----------3-----------|
          enigma: ------1---2----------4-------|


          The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:



          var scheduler = new TestScheduler();
          var source = scheduler.CreateColdObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1100.MsTicks(), 2),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnNext(2200.MsTicks(), 4),
          RxTest.OnNext(2600.MsTicks(), 5),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var expectedResults = scheduler.CreateHotObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var target = source
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
          .SelectMany(xs => xs.Take(1));

          var observer = scheduler.CreateObserver<int>();
          target.Subscribe(observer);
          scheduler.Start();
          ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);


          I think the best way around this is a Scan-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:



          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
          {
          return TrueThrottle<T>(source, span, Scheduler.Default);
          }

          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
          {
          return source
          .Timestamp(scheduler)
          .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
          ? item
          : state
          )
          .DistinctUntilChanged()
          .Select(t => t.Value);
          }


          Note: Test code uses Nuget Microsoft.Reactive.Testing and the following helper class:



          public static class RxTest
          {
          public static long MsTicks(this int i)
          {
          return TimeSpan.FromMilliseconds(i).Ticks;
          }

          public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
          }

          public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
          }

          public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
          }
          }





          share|improve this answer














          @Enigmativity's answer doesn't fit the specs exactly. It may work for what you want though.



          His answer defines 1-second windows, and takes the first from each of those windows. This doesn't guarantee you one second of silence between items though. Consider this case:



          t     : ---------1---------2---------3
          source: ------1---2------3---4----5--|
          window: ---------|---------|---------|
          spec : ------1----------3-----------|
          enigma: ------1---2----------4-------|


          The answer implies you want one second of nothing after item 1. The next item after that is 3, with silence then through the end. Here's that test coded up:



          var scheduler = new TestScheduler();
          var source = scheduler.CreateColdObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1100.MsTicks(), 2),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnNext(2200.MsTicks(), 4),
          RxTest.OnNext(2600.MsTicks(), 5),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var expectedResults = scheduler.CreateHotObservable<int>(
          RxTest.OnNext(700.MsTicks(), 1),
          RxTest.OnNext(1800.MsTicks(), 3),
          RxTest.OnCompleted<int>(3000.MsTicks())
          );

          var target = source
          .Window(() => Observable.Timer(TimeSpan.FromSeconds(1.0), scheduler))
          .SelectMany(xs => xs.Take(1));

          var observer = scheduler.CreateObserver<int>();
          target.Subscribe(observer);
          scheduler.Start();
          ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);


          I think the best way around this is a Scan-based solution with timestamps. You basically hold the last legit message in memory with a timestamp, and if the new message is one second older, emit. Otherwise, don't:



          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span)
          {
          return TrueThrottle<T>(source, span, Scheduler.Default);
          }

          public static IObservable<T> TrueThrottle<T>(this IObservable<T> source, TimeSpan span, IScheduler scheduler)
          {
          return source
          .Timestamp(scheduler)
          .Scan(default(Timestamped<T>), (state, item) => state == default(Timestamped<T>) || item.Timestamp - state.Timestamp > span
          ? item
          : state
          )
          .DistinctUntilChanged()
          .Select(t => t.Value);
          }


          Note: Test code uses Nuget Microsoft.Reactive.Testing and the following helper class:



          public static class RxTest
          {
          public static long MsTicks(this int i)
          {
          return TimeSpan.FromMilliseconds(i).Ticks;
          }

          public static Recorded<Notification<T>> OnNext<T>(long msTicks, T t)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnNext(t));
          }

          public static Recorded<Notification<T>> OnCompleted<T>(long msTicks)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnCompleted<T>());
          }

          public static Recorded<Notification<T>> OnError<T>(long msTicks, Exception e)
          {
          return new Recorded<Notification<T>>(msTicks, Notification.CreateOnError<T>(e));
          }
          }






          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 13 '18 at 16:40

























          answered Nov 13 '18 at 15:48









          Shlomo

          9,25522133




          9,25522133












          • I will definitely take a look. Thank you for a great answer.
            – Evgeny
            Nov 13 '18 at 16:36


















          • I will definitely take a look. Thank you for a great answer.
            – Evgeny
            Nov 13 '18 at 16:36
















          I will definitely take a look. Thank you for a great answer.
          – Evgeny
          Nov 13 '18 at 16:36




          I will definitely take a look. Thank you for a great answer.
          – Evgeny
          Nov 13 '18 at 16:36


















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.





          Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


          Please pay close attention to the following guidance:


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266659%2frx-net-take-an-element-and-subscribe-again-after-some-time%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          這個網誌中的熱門文章

          Hercules Kyvelos

          Tangent Lines Diagram Along Smooth Curve

          Yusuf al-Mu'taman ibn Hud