RxJava2 have remote data override local data in Observable












1















Currently I have a method in a repository class which fetches data from both a local cache and a remote API.



public Observable<List<Items>> getItemsForUser(String userId {
return Observable.concatArrayEager(
getUserItemsLocal(userId), // returns Observable<List<Items>>
getUserItemsRemote(userId) // returns Observable<List<Items>>
);
}


Currently, the method fetches the local data first (which may be outdated) and returns it, then updates it with the fresh data from the remote API.



I want to change the implementation to use Observable.merge so that if the remote API request completes first, that data gets shown first. However, if I just use Observable.merge I'm concerned that the local database request may return stale data, which will then overwrite the fresh data from the remote.



Basically, I want something like:



public Observable<List<ShoutContent>> getItemsForUser(String userId, ErrorCallback errorCallback) {
return Observable.merge(
getUserItemsRemote(userId),
getUserItemsLocal(userId)
.useOnlyIfFirstResponse()
}


So if the remote API request completes first, then that response is the only one that gets returned. But if the local request completes first, I want to return that, and then return the remote request once it is completed. Does RxJava have anything like this built in?



Edit: I would like to add that getUserItemsRemote does update the local database when the Observable emits, but I don't think that I can ensure that the database will be updated before the local request completes, which leaves the possibility that the local request will respond with stale data.










share|improve this question





























    1















    Currently I have a method in a repository class which fetches data from both a local cache and a remote API.



    public Observable<List<Items>> getItemsForUser(String userId {
    return Observable.concatArrayEager(
    getUserItemsLocal(userId), // returns Observable<List<Items>>
    getUserItemsRemote(userId) // returns Observable<List<Items>>
    );
    }


    Currently, the method fetches the local data first (which may be outdated) and returns it, then updates it with the fresh data from the remote API.



    I want to change the implementation to use Observable.merge so that if the remote API request completes first, that data gets shown first. However, if I just use Observable.merge I'm concerned that the local database request may return stale data, which will then overwrite the fresh data from the remote.



    Basically, I want something like:



    public Observable<List<ShoutContent>> getItemsForUser(String userId, ErrorCallback errorCallback) {
    return Observable.merge(
    getUserItemsRemote(userId),
    getUserItemsLocal(userId)
    .useOnlyIfFirstResponse()
    }


    So if the remote API request completes first, then that response is the only one that gets returned. But if the local request completes first, I want to return that, and then return the remote request once it is completed. Does RxJava have anything like this built in?



    Edit: I would like to add that getUserItemsRemote does update the local database when the Observable emits, but I don't think that I can ensure that the database will be updated before the local request completes, which leaves the possibility that the local request will respond with stale data.










    share|improve this question



























      1












      1








      1








      Currently I have a method in a repository class which fetches data from both a local cache and a remote API.



      public Observable<List<Items>> getItemsForUser(String userId {
      return Observable.concatArrayEager(
      getUserItemsLocal(userId), // returns Observable<List<Items>>
      getUserItemsRemote(userId) // returns Observable<List<Items>>
      );
      }


      Currently, the method fetches the local data first (which may be outdated) and returns it, then updates it with the fresh data from the remote API.



      I want to change the implementation to use Observable.merge so that if the remote API request completes first, that data gets shown first. However, if I just use Observable.merge I'm concerned that the local database request may return stale data, which will then overwrite the fresh data from the remote.



      Basically, I want something like:



      public Observable<List<ShoutContent>> getItemsForUser(String userId, ErrorCallback errorCallback) {
      return Observable.merge(
      getUserItemsRemote(userId),
      getUserItemsLocal(userId)
      .useOnlyIfFirstResponse()
      }


      So if the remote API request completes first, then that response is the only one that gets returned. But if the local request completes first, I want to return that, and then return the remote request once it is completed. Does RxJava have anything like this built in?



      Edit: I would like to add that getUserItemsRemote does update the local database when the Observable emits, but I don't think that I can ensure that the database will be updated before the local request completes, which leaves the possibility that the local request will respond with stale data.










      share|improve this question
















      Currently I have a method in a repository class which fetches data from both a local cache and a remote API.



      public Observable<List<Items>> getItemsForUser(String userId {
      return Observable.concatArrayEager(
      getUserItemsLocal(userId), // returns Observable<List<Items>>
      getUserItemsRemote(userId) // returns Observable<List<Items>>
      );
      }


      Currently, the method fetches the local data first (which may be outdated) and returns it, then updates it with the fresh data from the remote API.



      I want to change the implementation to use Observable.merge so that if the remote API request completes first, that data gets shown first. However, if I just use Observable.merge I'm concerned that the local database request may return stale data, which will then overwrite the fresh data from the remote.



      Basically, I want something like:



      public Observable<List<ShoutContent>> getItemsForUser(String userId, ErrorCallback errorCallback) {
      return Observable.merge(
      getUserItemsRemote(userId),
      getUserItemsLocal(userId)
      .useOnlyIfFirstResponse()
      }


      So if the remote API request completes first, then that response is the only one that gets returned. But if the local request completes first, I want to return that, and then return the remote request once it is completed. Does RxJava have anything like this built in?



      Edit: I would like to add that getUserItemsRemote does update the local database when the Observable emits, but I don't think that I can ensure that the database will be updated before the local request completes, which leaves the possibility that the local request will respond with stale data.







      caching observable rx-java2






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 20 '18 at 2:18







      theasianpianist

















      asked Nov 20 '18 at 1:01









      theasianpianisttheasianpianist

      8819




      8819
























          1 Answer
          1






          active

          oldest

          votes


















          0














          You can make use of the takeUntil operator.



          takeUntil returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.



          In your case, you need to stop observing the local observable, once the remote Observable is emitted. The code is demonstrated below.



          public Observable<String> getUserItemsLocal() {
          return Observable.just("Local db response")
          .delay(5, TimeUnit.SECONDS); // assume local db takes 5 seconds to emit
          }


          public Observable<String> getUserItemsRemote() {
          return Observable.just("Remote Data")
          .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
          }


          Your repository code goes like



          Observable<String> remoteResponse = getUserItemsRemote();
          getUserItemsLocal().takeUntil(remoteResponse)
          .mergeWith(remoteResponse)
          .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
          Log.d(TAG, "result: " + s);
          }
          });





          share|improve this answer


























          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

            – theasianpianist
            Nov 21 '18 at 18:41











          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

            – Sarath Kn
            Nov 21 '18 at 18:44













          • the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

            – Sarath Kn
            Nov 21 '18 at 18:50













          • You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

            – Sarath Kn
            Nov 21 '18 at 18:52











          • Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

            – theasianpianist
            Nov 21 '18 at 20:37











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53384773%2frxjava2-have-remote-data-override-local-data-in-observable%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          You can make use of the takeUntil operator.



          takeUntil returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.



          In your case, you need to stop observing the local observable, once the remote Observable is emitted. The code is demonstrated below.



          public Observable<String> getUserItemsLocal() {
          return Observable.just("Local db response")
          .delay(5, TimeUnit.SECONDS); // assume local db takes 5 seconds to emit
          }


          public Observable<String> getUserItemsRemote() {
          return Observable.just("Remote Data")
          .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
          }


          Your repository code goes like



          Observable<String> remoteResponse = getUserItemsRemote();
          getUserItemsLocal().takeUntil(remoteResponse)
          .mergeWith(remoteResponse)
          .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
          Log.d(TAG, "result: " + s);
          }
          });





          share|improve this answer


























          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

            – theasianpianist
            Nov 21 '18 at 18:41











          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

            – Sarath Kn
            Nov 21 '18 at 18:44













          • the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

            – Sarath Kn
            Nov 21 '18 at 18:50













          • You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

            – Sarath Kn
            Nov 21 '18 at 18:52











          • Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

            – theasianpianist
            Nov 21 '18 at 20:37
















          0














          You can make use of the takeUntil operator.



          takeUntil returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.



          In your case, you need to stop observing the local observable, once the remote Observable is emitted. The code is demonstrated below.



          public Observable<String> getUserItemsLocal() {
          return Observable.just("Local db response")
          .delay(5, TimeUnit.SECONDS); // assume local db takes 5 seconds to emit
          }


          public Observable<String> getUserItemsRemote() {
          return Observable.just("Remote Data")
          .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
          }


          Your repository code goes like



          Observable<String> remoteResponse = getUserItemsRemote();
          getUserItemsLocal().takeUntil(remoteResponse)
          .mergeWith(remoteResponse)
          .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
          Log.d(TAG, "result: " + s);
          }
          });





          share|improve this answer


























          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

            – theasianpianist
            Nov 21 '18 at 18:41











          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

            – Sarath Kn
            Nov 21 '18 at 18:44













          • the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

            – Sarath Kn
            Nov 21 '18 at 18:50













          • You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

            – Sarath Kn
            Nov 21 '18 at 18:52











          • Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

            – theasianpianist
            Nov 21 '18 at 20:37














          0












          0








          0







          You can make use of the takeUntil operator.



          takeUntil returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.



          In your case, you need to stop observing the local observable, once the remote Observable is emitted. The code is demonstrated below.



          public Observable<String> getUserItemsLocal() {
          return Observable.just("Local db response")
          .delay(5, TimeUnit.SECONDS); // assume local db takes 5 seconds to emit
          }


          public Observable<String> getUserItemsRemote() {
          return Observable.just("Remote Data")
          .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
          }


          Your repository code goes like



          Observable<String> remoteResponse = getUserItemsRemote();
          getUserItemsLocal().takeUntil(remoteResponse)
          .mergeWith(remoteResponse)
          .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
          Log.d(TAG, "result: " + s);
          }
          });





          share|improve this answer















          You can make use of the takeUntil operator.



          takeUntil returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.



          In your case, you need to stop observing the local observable, once the remote Observable is emitted. The code is demonstrated below.



          public Observable<String> getUserItemsLocal() {
          return Observable.just("Local db response")
          .delay(5, TimeUnit.SECONDS); // assume local db takes 5 seconds to emit
          }


          public Observable<String> getUserItemsRemote() {
          return Observable.just("Remote Data")
          .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
          }


          Your repository code goes like



          Observable<String> remoteResponse = getUserItemsRemote();
          getUserItemsLocal().takeUntil(remoteResponse)
          .mergeWith(remoteResponse)
          .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
          Log.d(TAG, "result: " + s);
          }
          });






          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 20 '18 at 5:42

























          answered Nov 20 '18 at 5:35









          Sarath KnSarath Kn

          1,623919




          1,623919













          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

            – theasianpianist
            Nov 21 '18 at 18:41











          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

            – Sarath Kn
            Nov 21 '18 at 18:44













          • the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

            – Sarath Kn
            Nov 21 '18 at 18:50













          • You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

            – Sarath Kn
            Nov 21 '18 at 18:52











          • Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

            – theasianpianist
            Nov 21 '18 at 20:37



















          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

            – theasianpianist
            Nov 21 '18 at 18:41











          • So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

            – Sarath Kn
            Nov 21 '18 at 18:44













          • the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

            – Sarath Kn
            Nov 21 '18 at 18:50













          • You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

            – Sarath Kn
            Nov 21 '18 at 18:52











          • Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

            – theasianpianist
            Nov 21 '18 at 20:37

















          So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

          – theasianpianist
          Nov 21 '18 at 18:41





          So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? reactivex.io/documentation/operators/takeuntil.html according to the ReacticeX docs it looks like the result will emit up until the second observable emits, at which point it stops. Does this mean I need to manually subscribe to the remote obersvable as well with something like concatArrayEager?

          – theasianpianist
          Nov 21 '18 at 18:41













          So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

          – Sarath Kn
          Nov 21 '18 at 18:44







          So if the local finishes first, it will emit both, but if remote finishes first, it only emits the remote? Yes

          – Sarath Kn
          Nov 21 '18 at 18:44















          the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

          – Sarath Kn
          Nov 21 '18 at 18:50







          the result will emit up until the second observable emits That's exactly your requirement is right? You can take the local results (first observable) till your remote data arrives. once remote data starts emitting (second observable) you don't need the first observable emissions anymore

          – Sarath Kn
          Nov 21 '18 at 18:50















          You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

          – Sarath Kn
          Nov 21 '18 at 18:52





          You can interchange the delay given for getUserItemsLocal and getUserItemsRemote in the above samples and see the output

          – Sarath Kn
          Nov 21 '18 at 18:52













          Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

          – theasianpianist
          Nov 21 '18 at 20:37





          Yes, I just wanted to make sure that the second observable would still emit without me having to subscribe separately, which is what the ReactiveX docs seem to indicate. Thank you!

          – theasianpianist
          Nov 21 '18 at 20:37




















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53384773%2frxjava2-have-remote-data-override-local-data-in-observable%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown