Thursday, May 14, 2015

Pitfalls of operator implementations (part 2)

Introduction

I'm taking a break from writing about Producers and returning to the pitfalls of operator implementations. I'd also like to mention some pitfalls in using certain RxJava operators (or sequences of them).

#6: Emitting without request

Let's assume you want to write an operator which ignores all upstream values and emits a single replacement value when the upstream completes:


Operator<Integer, Integer> ignoreAllAndJust = child -> {
    Subscriber<Integer> parent = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer value) {
            // ignored
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            // BUG: emits without checking whether downstream requested anything
            child.onNext(1);
            child.onCompleted();
        }
    };
    child.add(parent);
    return parent;
};

The operator relies on the default Subscriber behavior of requesting everything from upstream, and it gambles on the downstream requesting at least one element before the upstream calls onCompleted(). Even though your tests may pass, the operator still breaks the backpressure requirements: onCompleted() emits a value unconditionally, without checking whether the downstream has requested anything at all. The problem materializes as soon as you have some kind of firehose (a hot Observable or a backpressure-unaware one) and need to interact with a reactive-streams compliant downstream: the unrequested value violates §1.1 of the reactive-streams rules, and your downstream Subscriber will likely end up receiving some form of onError.
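To make the rule concrete without pulling in RxJava, here is a minimal, library-free sketch. StrictDownstream is a hypothetical stand-in for a reactive-streams compliant Subscriber: it counts its outstanding demand and records an error when a value arrives that it never requested. completeUnconditionally mimics the onCompleted() above. With a downstream that requested everything the bug stays hidden; with one that has not requested yet, it surfaces.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitWithoutRequestDemo {

    // Hypothetical model of a compliant downstream: it tracks outstanding
    // demand and flags any onNext that exceeds it (the §1.1 rule).
    static final class StrictDownstream {
        long requested;
        final List<Integer> received = new ArrayList<>();
        String error;

        void request(long n) {
            requested += n;
        }

        void onNext(int value) {
            if (requested == 0) {
                // a real subscriber would typically signal onError here
                error = "onNext(" + value + ") without request";
                return;
            }
            requested--;
            received.add(value);
        }
    }

    // Mirrors the operator's onCompleted(): emits without looking at demand.
    static void completeUnconditionally(StrictDownstream child) {
        child.onNext(1);
    }

    public static void main(String[] args) {
        StrictDownstream eager = new StrictDownstream();
        eager.request(Long.MAX_VALUE);       // the common case: tests pass
        completeUnconditionally(eager);
        System.out.println(eager.received + " " + eager.error); // [1] null

        StrictDownstream slow = new StrictDownstream(); // nothing requested yet
        completeUnconditionally(slow);
        System.out.println(slow.received + " " + slow.error);
        // [] onNext(1) without request
    }
}
```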

The fix is relatively simple now that we know about several Producer types:


        // ... same as before
        @Override
        public void onCompleted() {
            child.setProducer(new SingleProducer<Integer>(child, 1));
        }
        // ... same as before

We covered SingleProducer in Producers (part 2), and it is a prime candidate here.

However, I'd like to take this opportunity to show an alternative fix, which will become relevant in RxJava 2.0 and for reactive-streams compliant operators:


Operator<Integer, Integer> ignoreAllAndJust = child -> {
    SingleDelayedProducer<Integer> sdp = 
        new SingleDelayedProducer<>(child);
    Subscriber<Integer> parent = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer value) {
            // ignored
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            sdp.set(1);
        }
    };
    child.add(parent);
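    // the child gets its producer right away; it emits only after
    // both a request and sdp.set() have happened
    child.setProducer(sdp);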
    return parent;
};


This approach, although a bit verbose for RxJava 1.x, works in a functionally equivalent way. It is the correct approach for reactive-streams because a Subscriber there can't do without a 'Producer': the 'Producer' is quite literally the 'Subscription', and it is the Subscriber's only means of unsubscribing from the upstream. Any delay in receiving the producer therefore also delays a potential unsubscription.
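The delayed variant works because emission waits for two independent events, the value arriving via onCompleted() and a request arriving from downstream, in whichever order they happen. The following single-threaded sketch models that state machine. DelayedSingle and its callbacks are hypothetical names, not the source of RxJava's SingleDelayedProducer; a production version would also have to be thread-safe, since request() and set() may be called from different threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DelayedSingleDemo {

    // Hypothetical, single-threaded model of a "delayed single" producer:
    // it emits exactly once, after BOTH a value and a request have arrived.
    static final class DelayedSingle<T> {
        private final Consumer<T> onNext;
        private final Runnable onCompleted;
        private T value;
        private boolean hasValue;
        private boolean hasRequest;
        private boolean emitted;

        DelayedSingle(Consumer<T> onNext, Runnable onCompleted) {
            this.onNext = onNext;
            this.onCompleted = onCompleted;
        }

        // Called by the downstream, possibly before the value exists.
        void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n > 0 required");
            }
            hasRequest = true;
            tryEmit();
        }

        // Called from the upstream's onCompleted().
        void set(T v) {
            value = v;
            hasValue = true;
            tryEmit();
        }

        private void tryEmit() {
            if (hasValue && hasRequest && !emitted) {
                emitted = true;
                onNext.accept(value);
                onCompleted.run();
            }
        }
    }

    // Drives one DelayedSingle with the two events in the given order.
    static List<String> run(boolean requestFirst) {
        List<String> log = new ArrayList<>();
        DelayedSingle<Integer> ds = new DelayedSingle<>(
                v -> log.add("onNext " + v), () -> log.add("onCompleted"));
        if (requestFirst) {
            ds.request(1);
            ds.set(1);
        } else {
            ds.set(1);
            log.add("waiting");   // nothing may be emitted before a request
            ds.request(1);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [onNext 1, onCompleted]
        System.out.println(run(false)); // [waiting, onNext 1, onCompleted]
    }
}
```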

#7: Shared state in the operator

You might think ignoreAllAndJust is a somewhat silly operator, but we can make it more useful by turning it into an operator that counts the number of received values and emits the total count on completion. In addition, let's assume we are stuck with Java 6 and can't use lambdas:


public final class CounterOp<T>
implements Operator<Integer, T> {
    int count;                                              // (1)
    @Override
    public Subscriber<? super T> call(final Subscriber<? super Integer> child) {
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                count++;
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                child.setProducer(
                    new SingleProducer<Integer>(child, count));
            }
        };
        child.add(parent);
        return parent;
    }
}

Observable<Integer> count = Observable.just(1)
    .lift(new CounterOp<Integer>());                        // (2)
        
count.subscribe(System.out::println);
count.subscribe(System.out::println);
count.subscribe(System.out::println);

We were careful with backpressure and implemented onCompleted() correctly; however, running the example prints 1, 2 and 3! Surely the number of elements in just(1) should be 1 no matter how many times one counts it.

The bug is on line (1): the counter is shared by all subscribers that pass through the call() method. The first subscriber increments it to 1, the second increments it further to 2 and so on, because (2) creates a single CounterOp instance, so there is only ever one count field for all subscriptions.
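The effect is easy to reproduce without RxJava. In the sketch below, a hypothetical Counter interface stands in for the per-subscription Subscriber, and call() plays the role of Operator.call(): SharedCountOp keeps the count on the operator instance, as line (1) does, while PerSubscriberCountOp keeps it inside the object created on each call. The helper subscribeRepeatedly (also hypothetical) simulates subscribing to just(1) several times through one operator instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SharedCounterDemo {

    // Hypothetical stand-in for the Subscriber created per subscription.
    interface Counter {
        void onNext(int value);
        int onCompleted();
    }

    // Buggy: the count lives on the operator and is shared by every call().
    static final class SharedCountOp {
        int count;
        Counter call() {
            return new Counter() {
                public void onNext(int value) { count++; }
                public int onCompleted() { return count; }
            };
        }
    }

    // Fixed: each call() creates a fresh counter with its own state.
    static final class PerSubscriberCountOp {
        Counter call() {
            return new Counter() {
                int count;
                public void onNext(int value) { count++; }
                public int onCompleted() { return count; }
            };
        }
    }

    // Simulates subscribing 'times' times to just(1) through one operator.
    static List<Integer> subscribeRepeatedly(Supplier<Counter> op, int times) {
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Counter c = op.get();
            c.onNext(1);                 // just(1) emits a single value
            results.add(c.onCompleted());
        }
        return results;
    }

    public static void main(String[] args) {
        SharedCountOp shared = new SharedCountOp();
        PerSubscriberCountOp fixed = new PerSubscriberCountOp();
        System.out.println(subscribeRepeatedly(shared::call, 3)); // [1, 2, 3]
        System.out.println(subscribeRepeatedly(fixed::call, 3));  // [1, 1, 1]
    }
}
```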

The solution is to move the counter into the parent subscriber:


public final class CounterOp<T>
implements Operator<Integer, T> {
    @Override
    public Subscriber<? super T> call(final Subscriber<? super Integer> child) {
        Subscriber<T> parent = new Subscriber<T>() {
            int count;
            // ... the rest is the same


There are legitimate cases for sharing state between subscribers, but they are few and far between; therefore, the general rule of thumb is:

All instance variables on an Operator should be final.

If you put the final keyword before them, you'll quickly discover where your code tries to modify them.

#8: Shared state in an Observable chain

Let's assume you are dissatisfied with the performance or the type of the List returned by the toList() operator, and you want to roll your own aggregator instead. For a change, you want to build it from existing operators, and you find reduce():


Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });

list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);

When you run the three subscribe calls, the first prints what you'd expect, but the second prints a vector in which the range 1-3 appears twice, and the third prints 9 elements!

The problem is not with the reduce() operator itself but with the expectations surrounding it. The Vector passed in is created once, when the chain is assembled, so it acts as a 'global' instance shared by every evaluation of the chain.
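Here is a library-free model of what happens. reduceRange is a hypothetical helper that folds the integers start..start+count-1 into a seed, and the Supplier named chain stands in for the assembled Observable: every get() is one subscription, but the seed Vector is created only once, at assembly time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class SharedSeedDemo {

    // Hypothetical stand-in for range(start, count).reduce(seed, accumulator).
    static <R> R reduceRange(int start, int count, R seed,
                             BiFunction<R, Integer, R> accumulator) {
        R state = seed;
        for (int i = start; i < start + count; i++) {
            state = accumulator.apply(state, i);
        }
        return state;
    }

    // Returns the size of the result seen by each evaluation of the chain.
    static List<Integer> sizesPerSubscriber(int subscribers) {
        Vector<Integer> seed = new Vector<>();   // created once, at "assembly"
        Supplier<Vector<Integer>> chain = () -> reduceRange(1, 3, seed,
                (vector, value) -> {
                    vector.add(value);           // mutates the shared seed
                    return vector;
                });
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            sizes.add(chain.get().size());       // each get() is a "subscription"
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(sizesPerSubscriber(3)); // [3, 6, 9]
    }
}
```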

Naturally, there is a way to fix this without writing a dedicated operator (which should be quite simple if you see the potential in the previous CounterOp):


Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>)null, (vector, value) -> {
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });

list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);


You start with null and create the Vector inside the accumulator function when the first value arrives, so each subscriber gets its own instance instead of a shared one.
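The null-seed trick can be checked in the same kind of library-free model. reduceRange is again a hypothetical stand-in for range(...).reduce(seed, accumulator); the only thing shared now is the null seed, and each evaluation allocates its own Vector when the first value arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.function.BiFunction;
import java.util.function.Supplier;

public class NullSeedDemo {

    // Hypothetical stand-in for range(start, count).reduce(seed, accumulator).
    static <R> R reduceRange(int start, int count, R seed,
                             BiFunction<R, Integer, R> accumulator) {
        R state = seed;
        for (int i = start; i < start + count; i++) {
            state = accumulator.apply(state, i);
        }
        return state;
    }

    // Collects the result of each evaluation of the chain.
    static List<Vector<Integer>> resultsPerSubscriber(int subscribers) {
        Supplier<Vector<Integer>> chain = () -> reduceRange(1, 3, (Vector<Integer>) null,
                (vector, value) -> {
                    if (vector == null) {
                        vector = new Vector<>();   // fresh instance per evaluation
                    }
                    vector.add(value);
                    return vector;
                });
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            results.add(chain.get());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Vector<Integer>> results = resultsPerSubscriber(3);
        System.out.println(results);  // [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
        System.out.println(results.get(0) != results.get(1)); // true
    }
}
```

One consequence visible in this model: if the source were empty, the accumulator would never run and the result would be the null seed itself.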

The rule of thumb here is: whenever you see an aggregator-like operator that takes a plain initial value, be cautious. That 'initial value' will most likely be shared across all subscribers, and if you plan to consume the resulting stream with multiple subscribers, they will clash, which may give you unexpected results or even crash.

Conclusion

In this lighter-toned blog post, I've covered three more common pitfalls regarding operators, shown examples that expose the underlying bugs, and shown ways to fix them.

One can shoot oneself in the foot in many (sometimes hilarious, at least to me) ways with RxJava and its operators, so my quest isn't over yet. However, the pitfalls are getting more subtle, and in order to fix the underlying bugs, we need to learn more kinds of operator primitives.
