Saturday, September 2, 2017

Java 9 Flow API: mapping and filtering in one stage

Introduction

In most reactive libraries, mapping and filtering are done on a flow via the separate operators map() and filter(), respectively. On rare occasions, the functions passed to these operators need to communicate with each other without sharing information in a flow-external manner and without using defer(). A standard, combined mapFilter() operator for this doesn't exist, so one has to write one's own.

Given that the Java 9 Flow API is brand new, one definitely has to write a custom operator for it, as Java 9 itself doesn't provide a rich set of predefined operations on Flow.Publishers, unlike its dual, the Stream API.

Shameless advertising

By the way, if you are looking for a Java 9 Flow-based, native and modern reactive library with a rich set of operators, similar to RxJava 2 (even including some operators from its extension project), I happen to have one for you: Reactive4JavaFlow. It is free and open-source, with the promising outlook that one day it may form the basis for the next major RxJava version...


MapFilter API design

When the Reactive4Java library was first conceived in 2011, the first significant stumbling block was not the lack of lambdas in Java 6/7 but the lack of extension methods. C# had them, which made Rx.NET conveniently extendable (assuming you managed to understand how to write operators for it, as it wasn't open source at the time). Java still shows no sign of ever getting extension methods; therefore, we need either a rich abstract base class, such as Flowable or Flux, or a utility class whose methods almost look like extension method definitions, with the exception that the developer has to stack them on top of one another:


import java.util.concurrent.*;
import static FlowUtils.*;

Flow.Publisher<String> f =
    timeout(
        mapFilter(
            new FlowRange(1, 10, Runnable::run),
            (v, e) -> {
                if (v % 2 == 0) {
                    e.next(v.toString());
                }
                if (v == 7) {
                    e.complete();
                }
            }
        ),
        5, TimeUnit.MILLISECONDS
    );

When thinking about a combined map and filter operator, the problem arises that the function has to be able to emit a mapped item, indicate that the input item is dropped, fail and, while we are at it, stop the sequence entirely. A functional interface can only return one thing at a time or throw; trying to encode the stop indicator into the return value introduces manual casting and null use that are otherwise absent from modern reactive flows.

At first thought, exposing the Flow.Subscriber to a user-provided function may look attractive; however, such direct access has implications and is susceptible to incorrect use:


  • The operator has to honor backpressure, hopefully without buffering at all. Calling any of the methods multiple times deliberately or by accident would violate the Reactive-Streams protocol.
  • The upstream's Flow.Subscription has to be cancelled if the user function called onError() or onComplete().
  • Call to onSubscribe() should be prevented.
  • At best, the three previous points would need an extra wrapper around the downstream Flow.Subscriber, which adds one layer of indirection and one extra allocation.

Instead, we'll define a MapFilterEmitter interface that limits the API surface to the four possible responses to an upstream value in the operator:

interface MapFilterEmitter<R> {

    void next(R result);

    void error(Throwable throwable);

    void complete();
}

The choice of dropping an upstream value via this interface design is indicated by simply not calling any of the methods. The reason the method names don't use the onX patterns is that by having different names, the same class can implement both the Flow.Subscriber and MapFilterEmitter interfaces without name and functionality clashing.
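As a rough illustration of the four outcomes, the sketch below drives a handler by hand, outside of any flow. Only MapFilterEmitter comes from the text; RecordingEmitter, EmitterDemo and the handler's rules (negative fails, zero completes, even maps, odd drops) are hypothetical names and choices made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// The emitter interface from the post, repeated so the sketch compiles on its own.
interface MapFilterEmitter<R> {
    void next(R result);
    void error(Throwable throwable);
    void complete();
}

// Hypothetical helper: records which emitter method the handler called.
final class RecordingEmitter<R> implements MapFilterEmitter<R> {
    final List<String> calls = new ArrayList<>();

    @Override public void next(R result) { calls.add("next:" + result); }
    @Override public void error(Throwable throwable) { calls.add("error:" + throwable.getMessage()); }
    @Override public void complete() { calls.add("complete"); }
}

final class EmitterDemo {
    // Hypothetical handler exercising all four responses to an upstream value.
    static final BiConsumer<Integer, MapFilterEmitter<String>> HANDLER = (v, e) -> {
        if (v < 0) {
            e.error(new IllegalArgumentException("negative"));  // fail
        } else if (v == 0) {
            e.complete();                                        // stop the sequence
        } else if (v % 2 == 0) {
            e.next("even " + v);                                 // map and emit
        }
        // odd positive values: no method is called, so the item is dropped
    };

    // Runs the handler once against a fresh recorder and returns what was recorded.
    static List<String> respond(int value) {
        RecordingEmitter<String> emitter = new RecordingEmitter<>();
        HANDLER.accept(value, emitter);
        return emitter.calls;
    }
}
```

An empty list from respond(3) is the "drop" outcome: the handler returned without touching the emitter.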

Now the operator's method signature in the FlowUtils class can be defined as follows:


public static <T, R> Flow.Publisher<R> mapFilter(Flow.Publisher<T> source,
        java.util.function.BiConsumer<? super T, MapFilterEmitter<R>> handler) {

    return new MapFilterPublisher<>(source, handler);
}


We employ the Java 8+ standard functional interface BiConsumer instead of a Function because we indicate the mapping outcome by requiring the implementor of this handler to call methods on the provided MapFilterEmitter instance. Naturally, the upstream's value we'd like the handler to respond to must be also provided.

Sometimes, it is more user friendly to allow the user-provided lambda to throw checked exceptions. Unfortunately, the Java 8+ standard functional interfaces don't let us do that, and one has to define one's own functional interface whose single abstract method declares throws Exception or even throws Throwable. Adapting the operator API to this case is left as an exercise to the reader.
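One possible shape for such an interface is sketched below. The name CheckedBiConsumer and the CheckedHandlerDemo class are assumptions made for illustration, not part of any library; the try/catch in invoke() merely imitates the catch-all block an operator would put around the handler call.

```java
import java.io.IOException;

// The emitter interface from above, repeated so the sketch compiles on its own.
interface MapFilterEmitter<R> {
    void next(R result);
    void error(Throwable throwable);
    void complete();
}

// Hypothetical counterpart of BiConsumer whose single method may throw anything.
@FunctionalInterface
interface CheckedBiConsumer<T, U> {
    void accept(T t, U u) throws Throwable;
}

final class CheckedHandlerDemo {
    // A handler that throws a checked exception without wrapping it.
    static final CheckedBiConsumer<String, MapFilterEmitter<Integer>> PARSE = (s, e) -> {
        if (s.isEmpty()) {
            throw new IOException("empty input");
        }
        e.next(Integer.parseInt(s.trim()));
    };

    // Calls the handler inside a catch-all try block and reports what happened.
    static String invoke(String input) {
        StringBuilder outcome = new StringBuilder();
        MapFilterEmitter<Integer> emitter = new MapFilterEmitter<Integer>() {
            @Override public void next(Integer result) { outcome.append("next ").append(result); }
            @Override public void error(Throwable throwable) { outcome.append("error ").append(throwable); }
            @Override public void complete() { outcome.append("complete"); }
        };
        try {
            PARSE.accept(input, emitter);
        } catch (Throwable ex) {
            outcome.append("crashed: ").append(ex.getMessage());
        }
        return outcome.toString();
    }
}
```

With this shape, the operator would only need to swap the BiConsumer type in its signature and fields, since a catch (Throwable) block already covers checked exceptions.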

The operator implementation

Working with a reactive stream of data practically means creating a Flow.Subscriber to receive data from an upstream and then calling the methods of a downstream Flow.Subscriber at the right time. When Flow.Publisher.subscribe() is called, the downstream's Flow.Subscriber becomes available; it gets wrapped into the operator's own Flow.Subscriber instance, and this instance gets subscribed to the upstream Flow.Publisher.

This is usually surrounded by a class implementing Flow.Publisher itself and can be considered a boilerplate to set up: have a constructor taking the parameters and callbacks to be used by the operator and perform the subscription in its subscribe() method:


public final class MapFilterPublisher<T, R> implements Flow.Publisher<R> {

    final Flow.Publisher<T> upstream;

    final BiConsumer<? super T, MapFilterEmitter<R>> handler;

    public MapFilterPublisher(
            Flow.Publisher<T> upstream,
            BiConsumer<? super T, MapFilterEmitter<R>> handler
    ) {
        this.upstream = upstream;
        this.handler = handler;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super R> downstream) {
        
        upstream.subscribe(new MapFilterSubscriber<>(downstream, handler));

    }
}


The main benefit of this structure is that the same, usually complicated, stream can be run multiple times, each run independent of the others, or run again if parts of the flow must be retried due to failure.

The MapFilterSubscriber is practically shimmed between the upstream and downstream for the purpose of altering the stream's emission pattern. For this, it has to implement the Flow.Subscriber interface and, to save on complications, we will implement the MapFilterEmitter interface on it at the same time.

It is recommended to implement Flow.Subscription as well and delegate the downstream's request() and cancel() up the chain, even though this means a level of indirection. The reason for this is to be operator-fusion friendly, even though the current operator won't support operator fusion.


static final class MapFilterSubscriber<T, R> implements
        Flow.Subscriber<T>, MapFilterEmitter<R>, Flow.Subscription {

    // the consumer of the mapped/filtered items
    final Flow.Subscriber<? super R> downstream;

    final BiConsumer<? super T, MapFilterEmitter<R>> handler;

    Flow.Subscription upstream;

    R result;

    Throwable error;

    boolean done;

    MapFilterSubscriber(
             Flow.Subscriber<? super R> downstream,
             BiConsumer<? super T, MapFilterEmitter<R>> handler
    ) {
        this.downstream = downstream;
        this.handler = handler;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }

    // +++++++++++++++++++++++++++++++++++++++++++++++++++++++

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void next(R result) {
        // TODO implement
    }

    @Override
    public void error(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void complete() {
        // TODO implement
    }

}

The fields result, error and done store the operator's state in response to the call to the given handler. This indirection is required to detect invalid multiple calls to the next, error and complete methods of the MapFilterEmitter. The onSubscribe(), request() and cancel() methods are straightforward: they forward the calls to the downstream and upstream, respectively.

First, the methods onNext(), onError() and onComplete(), which respond to upstream signals will be implemented:


@Override
public void onNext(T item) {
    if (done) {                                    // (1)
        return;
    }

    try {
        handler.accept(item, this);                // (2)
    } catch (Throwable ex) {

        upstream.cancel();                         // (3)

        Throwable error = this.error;
        if (error != null) {                       // (4)
            error.addSuppressed(ex);
        } else {
            this.error = ex;
        }
        done = true;   
    }

    R v = result;
    if (v != null) {                               // (5)
        result = null;

        downstream.onNext(v);
    }

    if (done) {

        Throwable error = this.error;              // (6)
        this.error = null;

        if (error == null) {
            downstream.onComplete();
        } else {
            downstream.onError(error);
        }
        return;
    }

    if (v == null) {
        upstream.request(1);                       // (7)
    }
}


  1. In case the upstream can't react to cancellation immediately when the operator reaches its terminal state due to crashing/completing, checking for the done flag and returning immediately will prevent the unnecessary call to the handler later on.
  2. The handler is called with the current item from upstream and with the this instance, which implements the MapFilterEmitter to receive the decision inside the handler regarding the item.
  3. User provided functions are prone to crashing which we capture via catching Throwable. In this case, the upstream Flow.Subscription gets cancelled.
  4. The error gets saved, or added as a suppressed exception to an existing error in case the handler called error() first and then crashed for some reason.
  5. Since the handler could have signalled a result value, we will first emit that value to downstream.
  6. If the handler terminated or crashed, the done flag will be true, indicating the terminal event should be emitted to downstream as well.
  7. Finally, if there was no result value from the handler, indicating the item should be dropped, we have to request a "replacement" value from upstream because the downstream won't know it should request more - there is no signal for "no value produced" but only the lack of signals.
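Step (7) can be seen in isolation with a plain filter. The sketch below is a stripped-down, single-threaded illustration and not the operator of this post: CountingRange, EvenFilter and ReplenishDemo are hypothetical names, the source omits completion, and the downstream's demand of two is issued by hand. It counts how much the upstream is asked for when only two results are wanted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical synchronous source of 1..10 that counts everything requested from it.
final class CountingRange implements Flow.Subscription {
    final Flow.Subscriber<? super Integer> subscriber;
    long totalRequested;   // sum of all request(n) amounts seen so far
    long pending;          // requested but not yet emitted
    int next = 1;
    boolean emitting;

    CountingRange(Flow.Subscriber<? super Integer> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        totalRequested += n;
        pending += n;
        if (emitting) {
            return;   // reentrant call from onNext(); the loop below picks up the new demand
        }
        emitting = true;
        while (pending > 0 && next <= 10) {
            pending--;
            subscriber.onNext(next++);
        }
        emitting = false;
    }

    @Override
    public void cancel() {
        next = 11;   // stops the emission loop
    }
}

// Hypothetical filter: keeps even values and asks for a replacement for each odd one.
final class EvenFilter implements Flow.Subscriber<Integer> {
    final List<Integer> delivered = new ArrayList<>();
    Flow.Subscription upstream;

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; }

    @Override public void onNext(Integer item) {
        if (item % 2 == 0) {
            delivered.add(item);
        } else {
            upstream.request(1);   // the dropped item did not satisfy any downstream demand
        }
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}

final class ReplenishDemo {
    static String run() {
        EvenFilter filter = new EvenFilter();
        CountingRange source = new CountingRange(filter);
        filter.onSubscribe(source);
        source.request(2);   // the downstream wants two results
        return "requested=" + source.totalRequested + ", delivered=" + filter.delivered;
    }
}
```

ReplenishDemo.run() returns "requested=4, delivered=[2, 4]": two of the four upstream items were dropped, and each drop cost one extra request(1).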

Implementing onError() and onComplete() only requires preventing the upstream from calling onError() or onComplete() on the downstream in case the handler in onNext() has indicated termination or crashed:


@Override
public void onError(Throwable throwable) {
    if (!done) {
        downstream.onError(throwable);
    }
}

@Override
public void onComplete() {
    if (!done) {
        downstream.onComplete();
    }
}


Finally, the role of the remaining next(), error() and complete() methods is to ensure they are called at most once per handler.accept() invocation, and in case of a violation, cancel the upstream and prepare the error to be emitted by (6) in onNext() listed above.


@Override
public void next(R item) {
    if (done) {
        return;
    }

    if (item == null) {
        error(new NullPointerException("item == null"));
        return; // nothing to store for a null item
    }

    if (this.result != null) {
        error(new IllegalStateException("Multiple next() calls not allowed"));
    } else {
        this.result = item;
    }
}


In next(), first we check if the sequence has already been terminated by a prior error() or complete() call from within the handler. Then we check for null, as null values are not allowed in Reactive-Streams. One can observe that when writing operators, using null internally is beneficial: unless it would lead to an onNext(null) call, we can use it for our own purposes, such as indicating that next() has not yet been called during the same invocation of the handler.accept() method.

@Override
public void error(Throwable throwable) {
    if (done) {
        return;
    }
    if (throwable == null) {
        throwable = new NullPointerException("throwable == null");
    }

    upstream.cancel();

    this.error = throwable;
    done = true;
}

Similarly, error() checks for an already terminated state, turns a null Throwable into a NullPointerException and stores it in the error field to be emitted from (6) in onNext(). Since such a call is a terminal signal, the upstream has to be cancelled, as no further items should or would be handled by the operator from that point on.

@Override
public void complete() {
    upstream.cancel();
    done = true;
}

Finally, complete() will just cancel the upstream for the same reason mentioned before and set the done flag. Multiple calls to complete() have no (observable) effect and the method is considered idempotent; therefore, no special checks for multiple calls are necessary. One can, of course, do the same if (done) return; as in the other methods in the listing.
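To try the pieces end to end, the listings can be assembled as sketched below, with an early return after a null item in next(). SyncRange is a hypothetical single-threaded stand-in for FlowRange, and MapFilterDemo with its collecting subscriber is likewise invented for this example; neither is production-grade.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.BiConsumer;

interface MapFilterEmitter<R> {
    void next(R result);
    void error(Throwable throwable);
    void complete();
}

// The operator's subscriber, assembled from the listings in this post.
final class MapFilterSubscriber<T, R>
        implements Flow.Subscriber<T>, MapFilterEmitter<R>, Flow.Subscription {
    final Flow.Subscriber<? super R> downstream;
    final BiConsumer<? super T, MapFilterEmitter<R>> handler;
    Flow.Subscription upstream;
    R result; Throwable error; boolean done;   // state set through the emitter methods

    MapFilterSubscriber(Flow.Subscriber<? super R> downstream,
            BiConsumer<? super T, MapFilterEmitter<R>> handler) {
        this.downstream = downstream;
        this.handler = handler;
    }
    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(this); }
    @Override public void request(long n) { upstream.request(n); }
    @Override public void cancel() { upstream.cancel(); }

    @Override public void onNext(T item) {
        if (done) { return; }
        try {
            handler.accept(item, this);
        } catch (Throwable ex) {
            upstream.cancel();
            if (error != null) { error.addSuppressed(ex); } else { error = ex; }
            done = true;
        }
        R v = result;
        if (v != null) { result = null; downstream.onNext(v); }
        if (done) {
            Throwable e = error; error = null;
            if (e == null) { downstream.onComplete(); } else { downstream.onError(e); }
        } else if (v == null) {
            upstream.request(1);   // replacement for the dropped item
        }
    }
    @Override public void onError(Throwable t) { if (!done) { downstream.onError(t); } }
    @Override public void onComplete() { if (!done) { downstream.onComplete(); } }

    @Override public void next(R item) {
        if (done) { return; }
        if (item == null) { error(new NullPointerException("item == null")); return; }
        if (result == null) { result = item; return; }
        error(new IllegalStateException("Multiple next() calls not allowed"));
    }
    @Override public void error(Throwable t) {
        if (done) { return; }
        upstream.cancel();
        error = t != null ? t : new NullPointerException("throwable == null");
        done = true;
    }
    @Override public void complete() { upstream.cancel(); done = true; }
}

// Hypothetical single-threaded source of 1..10, standing in for FlowRange.
final class SyncRange implements Flow.Publisher<Integer> {
    @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
        s.onSubscribe(new Flow.Subscription() {
            long pending; int next = 1; boolean emitting; boolean cancelled;
            @Override public void request(long n) {
                pending = pending + n < 0 ? Long.MAX_VALUE : pending + n;
                if (emitting) { return; }   // reentrant request: the loop below sees it
                emitting = true;
                while (pending > 0 && next <= 10 && !cancelled) {
                    pending--;
                    s.onNext(next++);
                }
                if (next > 10 && !cancelled) { cancelled = true; s.onComplete(); }
                emitting = false;
            }
            @Override public void cancel() { cancelled = true; }
        });
    }
}

final class MapFilterDemo {
    // Same handler as in the introduction: stringify evens, stop at 7.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new SyncRange().subscribe(new MapFilterSubscriber<Integer, String>(
            new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { events.add(item); }
                @Override public void onError(Throwable t) { events.add("error: " + t); }
                @Override public void onComplete() { events.add("complete"); }
            },
            (v, e) -> {
                if (v % 2 == 0) { e.next(v.toString()); }
                if (v == 7) { e.complete(); }
            }));
        return events;
    }
}
```

MapFilterDemo.run() yields [2, 4, 6, complete]: the odd values are dropped and replaced via request(1), and the complete() call at 7 cancels the source before 8 is ever emitted.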

Conclusion

Writing in-sequence operators that don't amplify item amounts usually doesn't require complicated request management and/or the typical queue-drain approach mentioned a while ago in this blog. Combining operator features, however, may require an operator-specific API definition that helps the user interact with the flow and at the same time limits that interaction to ensure the flow remains Reactive-Streams compliant.

You might say: "I'm using RxJava 2, how do I implement such an operator for it?". Easily; since conceptually both the Java 9 Flow and RxJava 2 are derived from and built upon the Reactive-Streams principles, you only have to

  1. replace the imports of java.util.concurrent.Flow.* with org.reactivestreams.*,
  2. remove the Flow. prefix from the code above, and
  3. replace implements Publisher<R> with extends Flowable<R>, renaming subscribe() to subscribeActual(), the method Flowable leaves abstract:


import org.reactivestreams.*;
import io.reactivex.*;

public final class MapFilterPublisher<T, R> extends Flowable<R> {
     // ...
}

(Before you rush and propose a PR for RxJava 2, it should be noted that this type of operator is already available in the RxJava 2 Extensions project via the FlowableTransformers.mapFilter() operator.)

Interestingly, there was no concurrency-related component to this operator. This is no accident: the handling of an upstream item happens in-sequence because onNext() is called in a sequential manner, and as long as the onNext() method doesn't return control to its caller, we are safe from reentrant calls. From the handler's perspective, this means it must not go asynchronous with the emitter instance provided to it as the second argument of the BiConsumer.accept() it implements. It has to produce a response synchronously and may block while doing so.

However, in case the mapping and/or filtering response is the result of an asynchronous computation "forked out" from the current element, concurrency has to be handled quite differently. We will explore this case in the next blog post.
