Monday, May 18, 2015

Operator concurrency primitives: producers (part 6 - final)

Introduction

You might have already guessed that Producers are the real 'heroes' of the operators. One way or another, once the request-response ratio is no longer 1:1, you need to introduce an intermediate producer that needs to coordinate messages going in either direction.

In this final part about producers, I'm going to detail the final, general-purpose Producer implementation, which coordinates not just requesting from and switching the upstream producer but also makes sure event delivery doesn't run concurrently with such changes. Such a producer is required inside switchOnNext(): if there is an active emission from the previous source, one can't switch to the next source and trigger its execution. One has to wait for the emission to complete and then make the switch, so that no concurrent emission and no over-requesting happens.

The producer-observer-arbiter

The solution, as usual, is to use one of the serialized access primitives and route every relevant method call through the emitter-loop or drain. For a start, here is the basic class structure for the Producer:


public final class ProducerObserverArbiter<T> 
implements Producer, Observer<T> {      // (1)
    final Subscriber<? super T> child;
    
    boolean emitting;
    
    List<Object> queue;                                    // (2)
    Producer currentProducer;
    long requested;
    
    public ProducerObserverArbiter(
            Subscriber<? super T> child) {
        this.child = child;
    }
    
    @Override
    public void onNext(T t) {
        // implement
    }
    @Override
    public void onError(Throwable e) {
        // implement
    }
    @Override
    public void onCompleted() {
        // implement
    }
    public void request(long n) {
        // implement
    }
    public void set(Producer p) {
        // implement
    }
    void emitLoop() {
        // implement
    }
}

The basic structure is straightforward: for convenience, we implement Producer and Observer (1), capture the child, and use an emitter-loop in which the tasks (such as value emission, requests and producer changes) are queued (2).

Instead of using missed fields directly (more on that later), I'll wrap the various actions into private holder classes so a single common queue can hold not just the events, but the request and producer changes as well:

    private static final class ErrorSentinel {                     // (1)
        final Throwable error;
        public ErrorSentinel(Throwable error) {
            this.error = error;
        }
    }

    private static final Object COMPLETED_SENTINEL = new Object(); // (2)
    
    private static final class RequestSentinel {                   // (3)
        final long n;
        public RequestSentinel(long n) {
            this.n = n;
        }
    }
    
    private static final class ProducerSentinel {                  // (4)
        final Producer p;
        public ProducerSentinel(Producer p) {
            this.p = p;
        }
    }

We define 4 types of sentinels; these private classes and the object instance make sure their underlying value can't be confused with emitted values of type T (i.e., you can observe a stream of Throwables, Longs or Producers without interference). Depending on whether or not the queue supports null values, you can introduce a NULL_SENTINEL as needed.

  1. We store errors in an ErrorSentinel instance.
  2. We use a constant for indicating a stateless completed event happened.
  3. We store the (always positive) request amounts in a RequestSentinel instance; produced amounts are not enqueued but counted while emitting.
  4. We store the setting or clearing of a producer in a ProducerSentinel instance.
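
To see why the wrapping matters, here is a small standalone sketch of the sentinel idea outside RxJava: a queue holds a Throwable and a Long as ordinary values next to wrapped request, error and completion signals, and an identity/getClass() dispatch similar to the one in emitLoop() tells them apart. The SentinelDemo class and its classify() helper are hypothetical, made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: wrapped signals never collide with plain values,
// even when the values themselves are Throwables or Longs.
final class SentinelDemo {
    private static final class ErrorSentinel {
        final Throwable error;
        ErrorSentinel(Throwable error) { this.error = error; }
    }
    private static final class RequestSentinel {
        final long n;
        RequestSentinel(long n) { this.n = n; }
    }
    private static final Object COMPLETED_SENTINEL = new Object();

    /** Describes how a drain loop would interpret each queued entry. */
    static List<String> classify(List<Object> queue) {
        List<String> out = new ArrayList<>();
        for (Object o : queue) {
            if (o == null) {
                out.add("onNext(null)");
            } else if (o == COMPLETED_SENTINEL) {            // identity check
                out.add("onCompleted");
            } else if (o.getClass() == ErrorSentinel.class) {
                out.add("onError(" + ((ErrorSentinel) o).error.getMessage() + ")");
            } else if (o.getClass() == RequestSentinel.class) {
                out.add("request(" + ((RequestSentinel) o).n + ")");
            } else {
                out.add("onNext(" + o + ")");                 // anything else is a value
            }
        }
        return out;
    }

    static List<String> demo() {
        List<Object> q = new ArrayList<>();
        q.add(new IllegalStateException("as value"));        // a Throwable used as a value
        q.add(5L);                                             // a Long used as a value
        q.add(new RequestSentinel(5));                         // a request for 5 items
        q.add(new ErrorSentinel(new RuntimeException("as error")));
        q.add(COMPLETED_SENTINEL);
        return classify(q);
    }
}
```

demo() reports onNext for the first two entries even though one is a Throwable and the other a Long, followed by request(5), onError(as error) and onCompleted.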

Let's start with the Observer method implementations:


    // ...
    @Override
    public void onNext(T t) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(t);
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            child.onNext(t);

            long r = requested;
            if (r != Long.MAX_VALUE) {            // (1)
                requested = r - 1;
            }
            
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }
    // ...

The implementation of onNext() is straightforward and resembles earlier serialization examples, except that we decrement the current requested amount (unless running in unlimited mode) (1).
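
The accounting rule can be isolated into a tiny helper. This is a sketch, not part of the arbiter: the class and method names are hypothetical, and throwing on a negative result mirrors what emitLoop() does for a whole batch rather than what onNext() does for a single item.

```java
// Hypothetical helper: subtract the emitted count unless the request is unbounded.
final class RequestAccounting {
    static long produced(long requested, long emitted) {
        if (requested == Long.MAX_VALUE) {
            return requested;                 // unbounded mode: no bookkeeping needed
        }
        long v = requested - emitted;
        if (v < 0) {
            // the upstream emitted more than was requested: a backpressure violation
            throw new IllegalStateException("More produced than requested: " + v);
        }
        return v;
    }
}
```

For example, 10 outstanding minus 3 emitted leaves 7, while Long.MAX_VALUE stays Long.MAX_VALUE no matter how many items pass through.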

    // ...
    @Override
    public void onError(Throwable e) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = new ArrayList<>();
                q.add(new ErrorSentinel(e));        // (1)
                queue = q;
                return;
            }
            emitting = true;
        }
        child.onError(e);                           // (2)
    }
    // ...

In onError(), we don't enqueue the Throwable after earlier actions but replace (1) the queue with a new one, so the very first action executed by emitLoop() will be the delivery of this error, skipping any pending events. If the emission race is won, there is no need to loop for other events or even to 'unlock' the emission flag (2), since the sequence has terminated.

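The implementation of onCompleted() looks quite similar, except that the completion signal is appended after any pending actions instead of replacing them; otherwise, values queued before the completion would be lost:
    // ...
    @Override
    public void onCompleted() {
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(COMPLETED_SENTINEL);              // after pending values
                return;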
            }
            emitting = true;
        }
        child.onCompleted();
    }
    // ...
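
The difference between replacing the backlog and appending to it can be shown with a small single-threaded model. It assumes an emission is already in progress, so every signal goes to the queue; locking, the child Subscriber and requests are left out, and the TerminalOrdering class is hypothetical. In this model an error replaces the backlog and cuts ahead, while completion is appended so values queued before it are still delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how terminal signals are queued while an emission is in progress.
final class TerminalOrdering {
    static final Object COMPLETED = new Object();
    static final class Err {
        final String msg;
        Err(String msg) { this.msg = msg; }
    }

    List<Object> queue = new ArrayList<>();

    void onNext(Object v) { queue.add(v); }

    void onCompleted() { queue.add(COMPLETED); }   // appended: earlier values survive

    void onError(String msg) {                     // replaces the backlog: error cuts ahead
        List<Object> q = new ArrayList<>();
        q.add(new Err(msg));
        queue = q;
    }

    /** Replays the backlog in order, stopping at the first terminal signal. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        for (Object o : queue) {
            if (o == COMPLETED) { out.add("completed"); break; }
            if (o instanceof Err) { out.add("error:" + ((Err) o).msg); break; }
            out.add("next:" + o);
        }
        queue = new ArrayList<>();
        return out;
    }
}
```

Queuing 1, 2 and a completion drains as next:1, next:2, completed, whereas queuing 1 followed by an error drains as just error:boom.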

Next comes the request() method's familiar implementation:
    // ...
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(new RequestSentinel(n));          // (1)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            requested = u;                             // (2)

            Producer p = currentProducer;
            if (p != null) {                           // (3)
                p.request(n);
            }
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }    
    }
    // ...

We enqueue (1) the requested amount, guaranteed to be positive, in the same way as actions are enqueued in the other methods. The requested amount is incremented (and capped) (2), then we take the current producer and, if it is non-null, request the original amount of n from it (3).
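
The capped addition at (2) is worth a closer look: both operands are non-negative, so a negative sum can only mean the long overflowed, and the result then saturates at Long.MAX_VALUE, which the arbiter treats as unbounded. A minimal sketch with a hypothetical helper name:

```java
// Hypothetical helper: add a new request to the outstanding amount, saturating at Long.MAX_VALUE.
final class RequestMath {
    static long addCap(long requested, long n) {
        long u = requested + n;
        if (u < 0) {                 // both inputs are >= 0, so a negative sum means overflow
            u = Long.MAX_VALUE;      // saturate: treat as unbounded from now on
        }
        return u;
    }
}
```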

Similarly, the set() method uses the now-familiar pattern:

    public void set(Producer p) {
        synchronized (this) {
            if (emitting) {
                List<Object> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(new ProducerSentinel(p));
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            currentProducer = p;
            long r = requested;
            if (p != null && r != 0) {                  // (1)
                p.request(r);
            }
            emitLoop();
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }


If the producer is switched to a non-null producer and the current requested amount is non-zero, we request the entire amount from the new producer (1).
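
The hand-over rule can be traced in a stripped-down, single-threaded model. Producer below is a stand-in functional interface, an assumption that mimics only the request(long) method of rx.Producer, and the ProducerSwitchDemo class is hypothetical; serialization and the queue are omitted so only the request bookkeeping remains.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of request bookkeeping across producer switches (no locking, no queue).
final class ProducerSwitchDemo {
    interface Producer { void request(long n); }     // stand-in for rx.Producer

    long requested;
    Producer current;

    void request(long n) {
        long u = requested + n;
        requested = u < 0 ? Long.MAX_VALUE : u;      // capped addition
        if (current != null) {
            current.request(n);                      // forward only the new amount
        }
    }

    void set(Producer p) {
        current = p;
        if (p != null && requested != 0) {
            p.request(requested);                    // hand over the whole outstanding amount
        }
    }

    void produced(long e) {
        if (requested != Long.MAX_VALUE) {
            requested -= e;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ProducerSwitchDemo a = new ProducerSwitchDemo();
        a.request(5);                                // no producer yet: only remembered
        a.set(n -> log.add("p1:" + n));              // p1 receives the outstanding 5
        a.request(3);                                // p1 receives the extra 3
        a.produced(2);                               // p1 delivered two items
        a.set(n -> log.add("p2:" + n));              // p2 receives 5 + 3 - 2 = 6
        return log;
    }
}
```

The second producer is asked for 6 because 8 items were requested in total and 2 of them were already delivered by the first one.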

Finally, let's see the implementation of the emitLoop():

    // ...
    @SuppressWarnings("unchecked")
    void emitLoop() {
        for (;;) {
            List<Object> q;
            synchronized (this) {                             // (1)
                q = queue;
                if (q == null) {
                    emitting = false;
                    return;
                }
                queue = null;
            }
            long e = 0;
            
            for (Object o : q) {
                if (o == null) {                              // (2)
                    child.onNext(null);
                    e++;
                } else
                if (o == COMPLETED_SENTINEL) {                // (3)
                    child.onCompleted();
                    return;
                } else
                if (o.getClass() == ErrorSentinel.class) {    // (4)
                    child.onError(((ErrorSentinel)o).error);
                    return;
                } else
                if (o.getClass() == ProducerSentinel.class) { // (5)
                    Producer p = ((ProducerSentinel)o).p;
                    currentProducer = p;
                    long r = requested;
                    if (p != null && r != 0) {
                        p.request(r);
                    }
                } else
                if (o.getClass() == RequestSentinel.class) {  // (6)
                    long n = ((RequestSentinel)o).n;
                    long u = requested + n;
                    if (u < 0) {
                        u = Long.MAX_VALUE;
                    }
                    requested = u;
                    Producer p = currentProducer;
                    if (p != null) {
                        p.request(n);
                    }
                } else {                                      // (7)
                    child.onNext((T)o);
                    e++;
                }
            }
            long r = requested;
            if (r != Long.MAX_VALUE) {                        // (8)
                long v = r - e;
                if (v < 0) {
                    throw new IllegalStateException();
                }
                requested = v;
            }
        }
    }
}

This is by far the most action-packed emitter-loop in this series:

  1. We 'dequeue' the batch of actions.
  2. For each action in the queue, if said item is null, we emit null to the child Subscriber and count the number of emissions.
  3. If the item is the COMPLETED_SENTINEL, we signal onCompleted() to the child and quit the loop; no further action should be performed.
  4. Otherwise, we have a non-null value so type-checks can be performed to determine the exact thing to do. If we encounter an ErrorSentinel, the error is unwrapped, emitted, and the loop quits.
  5. If a ProducerSentinel is encountered, the inner producer is unwrapped and stored in currentProducer; if it is non-null and the current requested amount is non-zero, that entire amount is requested through it.
  6. If we encounter a RequestSentinel, the amount is unwrapped, added to the current requested amount (capped) and the unwrapped amount is requested from the current producer if available.
  7. Otherwise, since we shouldn't have any more sentinel types, what remains is a plain non-null element, which we emit to the child subscriber and increment the emission count.
  8. Finally, if the requested amount is not unbounded, we decrement the requested amount by the accumulated emission count and update the instance field.
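
A condensed, single-threaded model of one pass over a dequeued batch may help to follow steps 2 and 5 to 8. Locking and the terminal sentinels are omitted, the stand-in Producer interface and the BatchDrainDemo class are hypothetical, and instanceof replaces the getClass() checks (equivalent here because the holder classes are final). One deliberate deviation from the code above: the emissions counted so far in the batch are subtracted before a new producer is handed the outstanding amount, so it isn't asked for items that were already delivered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, single-threaded model of one emitLoop() batch (no locking, no terminal events).
final class BatchDrainDemo {
    interface Producer { void request(long n); }          // stand-in for rx.Producer
    static final class RequestSentinel {
        final long n;
        RequestSentinel(long n) { this.n = n; }
    }
    static final class ProducerSentinel {
        final Producer p;
        ProducerSentinel(Producer p) { this.p = p; }
    }

    final List<Object> delivered = new ArrayList<>();    // what the child would receive
    long requested;
    Producer current;

    static long produced(long r, long e) {
        if (r == Long.MAX_VALUE) return r;                // unbounded: no bookkeeping
        long v = r - e;
        if (v < 0) throw new IllegalStateException("More produced than requested: " + v);
        return v;
    }

    void drainBatch(List<Object> batch) {
        long e = 0;                                       // emissions not yet accounted for
        for (Object o : batch) {
            if (o instanceof ProducerSentinel) {
                requested = produced(requested, e);       // settle before handing over demand
                e = 0;
                Producer p = ((ProducerSentinel) o).p;
                current = p;
                if (p != null && requested != 0) p.request(requested);
            } else if (o instanceof RequestSentinel) {
                long n = ((RequestSentinel) o).n;
                long u = requested + n;
                requested = u < 0 ? Long.MAX_VALUE : u;   // capped addition
                if (current != null) current.request(n);
            } else {
                delivered.add(o);                         // plain value (null included)
                e++;
            }
        }
        requested = produced(requested, e);               // account for the rest of the batch
    }

    static String demo() {
        List<String> log = new ArrayList<>();
        BatchDrainDemo d = new BatchDrainDemo();
        d.requested = 2;
        d.drainBatch(Arrays.<Object>asList(
                "a",
                new ProducerSentinel(n -> log.add("p:" + n)),
                new RequestSentinel(3),
                "b"));
        return log + " " + d.delivered + " " + d.requested;
    }
}
```

With 2 items outstanding, "a" is delivered first, so the new producer is asked for 1, then for the extra 3; after "b" is delivered, 3 remain outstanding.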


Perhaps it is not meaningful or even desirable to let a producer produce if it is going to be replaced immediately. For example, in switchOnNext(), if two new sources arrive in quick succession, one doesn't really want to start the first one but rather skip it and start with the second. You can use the missedProducer approach from the previous part instead of enqueueing a switch action, and you may also consider whether such a switch should clear the queue of pending values or not. In addition, one can use the instance fields from ProducerArbiter to avoid allocating a holder object for every request and production action.
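
A sketch of the coalescing alternative, under the assumption that a later switch simply overwrites an earlier, not-yet-applied one. Only the producer part is modeled, single-threaded and without locking; the NULL_PRODUCER marker (needed so that "switched to null" differs from "no switch happened") and the other names are hypothetical. In a real arbiter, finishEmission() would be part of the emitter-loop, would apply the switch while still holding the emission right, and would re-check the missed field in a loop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: producer switches that arrive during an emission are coalesced,
// so only the most recent one is applied (single-threaded, locking omitted).
final class CoalescingSwitch {
    interface Producer { void request(long n); }            // stand-in for rx.Producer

    // Marks "explicitly switched to null", so that a null field can mean "no switch".
    static final Producer NULL_PRODUCER = n -> { };

    long requested;
    Producer current;
    Producer missedProducer;                                 // latest pending switch, if any
    boolean emitting;

    void set(Producer p) {
        if (emitting) {
            missedProducer = p == null ? NULL_PRODUCER : p;  // overwrite, don't enqueue
            return;
        }
        switchTo(p);
    }

    private void switchTo(Producer p) {
        current = p;
        if (p != null && requested != 0) {
            p.request(requested);                            // hand over outstanding demand
        }
    }

    /** Called when the running emission ends; applies only the latest missed switch. */
    void finishEmission() {
        Producer mp = missedProducer;
        missedProducer = null;
        emitting = false;
        if (mp != null) {
            switchTo(mp == NULL_PRODUCER ? null : mp);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CoalescingSwitch s = new CoalescingSwitch();
        s.requested = 10;
        s.emitting = true;                     // pretend an emission is in progress
        s.set(n -> log.add("p1:" + n));        // remembered...
        s.set(n -> log.add("p2:" + n));        // ...but overwritten before it is applied
        s.finishEmission();                    // only p2 gets started
        return log;
    }
}
```

With this variant the first producer never receives a request; whether pending values from the replaced source should be dropped as well remains a separate decision.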

The only thing remaining is a usage example, for instance an operator that switches among a fixed set of Observables based on a timer:


public static final class SwitchTimer<T> 
implements OnSubscribe<T> {
    final List<Observable<? extends T>> sources;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    public SwitchTimer(
            Iterable<? extends Observable<? extends T>> sources, 
            long time, TimeUnit unit, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.sources = new ArrayList<>();
        this.time = time;
        this.unit = unit;
        sources.forEach(this.sources::add);
    }
    @Override
    public void call(Subscriber<? super T> child) {
        ProducerObserverArbiter<T> poa = 
            new ProducerObserverArbiter<>(child);             // (1)
        
        Scheduler.Worker w = scheduler.createWorker();        // (2)
        child.add(w);
        
        child.setProducer(poa);                                  
        
        SerialSubscription ssub = new SerialSubscription();   // (3)
        child.add(ssub);
        
        int[] index = new int[1];
        
        w.schedulePeriodically(() -> {
            int idx = index[0]++;
            if (idx >= sources.size()) {                      // (4)
                poa.onCompleted();
                return;
            }
            Subscriber<T> s = new Subscriber<T>() {           // (5)
                @Override
                public void onNext(T t) {
                    poa.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    poa.onError(e);
                }
                @Override
                public void onCompleted() {
                    if (idx + 1 == sources.size()) {          // (6)
                        poa.onCompleted();
                    }
                }
                @Override
                public void setProducer(Producer producer) {
                    poa.set(producer);
                }
            };

            ssub.set(s);                                      // (7)
            sources.get(idx).unsafeSubscribe(s);
            
        }, time, time, unit);
    }
}

List<Observable<Long>> timers = Arrays.asList(
    Observable.timer(100, 100, TimeUnit.MILLISECONDS),
    Observable.timer(100, 100, TimeUnit.MILLISECONDS)
        .map(v -> v + 20),
    Observable.timer(100, 100, TimeUnit.MILLISECONDS)
        .map(v -> v + 40)
);

Observable<Long> source = Observable.create(
    new SwitchTimer<>(timers, 500, 
    TimeUnit.MILLISECONDS, Schedulers.computation()));
        
source.toBlocking().forEach(System.out::println);

It is constructed as follows:

  1. We create our ProducerObserverArbiter instance.
  2. We create a scheduler-worker instance and add it to the child subscriber to allow cancelling the whole schedule upon child unsubscription.
  3. We hold onto the Subscriber of the currently active Observable sequence in a SerialSubscription and add that to the child subscriber in order to propagate cancellation.
  4. In case the final Observable sequence didn't complete in time, the periodic task will complete the whole sequence.
  5. Otherwise we create a Subscriber for the next Observable (without sharing anything with child).
  6. In the onCompleted() we check if the captured index is the last and if so, we complete by sending an onCompleted() through the arbiter.
  7. The SerialSubscription.set will make sure the previous subscription will be unsubscribed when a new subscription happens to the next source.

Conclusion

In the producers series, I've shown implementation examples ranging from a single-value producer up to a complete producer-observer-arbiter. Each producer variant added more complexity along with its explanation, which should help operator writers develop their custom solutions.

The next primitives I'm going to talk about are the various Subscription containers; I'll also show how to implement custom ones in case the standard set isn't adequate for some reason.



3 comments:

  1. Dear David, I have two questions:
    1. `long n = (Long)o;` should it be `long n = ((RequestSentinel)o).n;` ?
    2. In the `onNext` implementation, why do we introduce an extra local variable `r` instead of using `requested` directly?

  2. 1) Fixed, thanks
    2) To avoid re-reading those fields which could cause a cache-miss (due to direct change or false sharing) of several dozen cycles
