RxJava + delay == crash?

Łukasz Kiełczykoski

If you are an experienced Android developer, you probably already know the problem this post is about. Otherwise, keep reading.

I’m developing an Android app at the company I work for. I wasn’t an Android developer before, but when a new project arrived I had to switch to it. I started using Retrofit together with RxJava, as many blog posts I read recommended, and the whole thing seemed understandable and clear to me.

At some point, I wanted to create a search input that notifies all observers when its text changes, but with a short delay so that observers are not notified repeatedly while the user is still typing. One of the observers was supposed to make a request to a server, and that is where I stumbled across a weird exception. The request failed, and at that point the application was supposed to show a simple alert about the failure. Instead, it threw a RuntimeException complaining about view changes being made on the wrong thread.

My code was fairly simple:

searchInput.addTextChangedListener(new TextWatcher() {
    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) { }

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) { }

    @Override
    public void afterTextChanged(Editable s) {
        if (searchTextChangeSubscriber != null) {
            searchTextChangeSubscriber.dispose();
        }
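        searchTextChangeSubscriber = Completable.complete()
                .observeOn(SchedulerProvider.getInstance().ui())
                .subscribeOn(SchedulerProvider.getInstance().io())
                // delay runs on the computation scheduler by default, so everything
                // below it, including the subscribe action, leaves the UI thread
                .delay(delay, TimeUnit.MILLISECONDS)
                .subscribe(() -> notifyObservers());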
    }
});

At first, I was puzzled because I was setting schedulers in both observeOn and subscribeOn. I set some breakpoints to see which thread executed particular pieces of code. It turned out that the code in subscribe was executed on RxComputationThreadPool-1, while I was expecting main.

observeOn changes the thread for the operators after it; however, delay uses the computation scheduler by default, so everything after delay runs on a computation thread again. That is good because a delay won’t block our UI, but I wasn’t aware of it and somehow assumed that delay was performed in some magical world :) The solution is very simple: you either pass a Scheduler to delay as an extra argument or move delay before observeOn.

searchInput.addTextChangedListener(new TextWatcher() {
    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) { }

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) { }

    @Override
    public void afterTextChanged(Editable s) {
        if (searchTextChangeSubscriber != null) {
            searchTextChangeSubscriber.dispose();
        }
        searchTextChangeSubscriber = Completable.complete()
                .delay(delay, TimeUnit.MILLISECONDS) // <-- moved above observeOn
                .observeOn(SchedulerProvider.getInstance().ui())
                .subscribeOn(SchedulerProvider.getInstance().io())
                .subscribe(() -> notifyObservers());
    }
});
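To see why the order matters without pulling in RxJava, here is a rough plain-JDK sketch of the same idea. It is only an analogy and not how RxJava is implemented: the class DelayThreadDemo, the method callbackThread and the thread names "fake-ui" and "fake-computation" are all made up for illustration. A single-thread executor stands in for the UI scheduler, and a scheduled executor stands in for the scheduler that delay uses by default.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayThreadDemo {

    /**
     * Runs a two-step pipeline and returns the name of the thread that ends up
     * executing the final callback (the stand-in for the subscribe action).
     */
    public static String callbackThread(boolean delayBeforeUiHop) {
        // Hypothetical stand-ins for the UI scheduler and delay's default scheduler.
        ExecutorService ui =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui"));
        ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "fake-computation"));

        CompletableFuture<String> seenOn = new CompletableFuture<>();
        Runnable callback = () -> seenOn.complete(Thread.currentThread().getName());

        try {
            if (delayBeforeUiHop) {
                // delay -> observeOn(ui): the timer fires first,
                // then hands the callback over to the UI thread.
                timer.schedule(() -> ui.execute(callback), 50, TimeUnit.MILLISECONDS);
            } else {
                // observeOn(ui) -> delay: the UI thread only starts the timer,
                // and the timer thread is the one that runs the callback.
                ui.execute(() -> timer.schedule(callback, 50, TimeUnit.MILLISECONDS));
            }
            return seenOn.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("pipeline did not finish", e);
        } finally {
            ui.shutdown();
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "observeOn then delay: fake-computation"
        System.out.println("observeOn then delay: " + callbackThread(false));
        // prints "delay then observeOn: fake-ui"
        System.out.println("delay then observeOn: " + callbackThread(true));
    }
}
```

Whichever step comes last decides where the callback lands, which is the same reason the subscribe action in my first version ended up on a computation thread.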

See you in the next one!