Observable的后来者:响应式接口

发布时间:2021-04-10 20:26
最后更新:2021-04-10 20:26
所属分类:
JVM Java

在包java.util中的接口Observer和类Observable提供了非常常用的观察者模式实现,而且这一实现在许多项目中都有着广泛的应用。但是随着Java 9的发布,这一著名实现却被@Deprecated标记了,难道观察者模式已经过时了吗?然而并不是。

Observable类被废弃,其主要原因是这个类并没有实现线程安全,而观察者模式又常常用在多线程项目里,这样的一对矛盾就给项目埋下了不少隐患。

当一个常用的接口被废弃,那么一定会有一个更优化的继任者出现。自Java 9开始,Observable的继任者就是在包java.util.concurrent中定义的Flow系列接口和类,Flow中的四个接口定义了一套响应流式处理的过程。一提到响应流式处理,我们就会自然的想到Rx系列类库,而Flow中定义的系列接口,也的确类似于Rx类库,但是形态比Rx类库要简单,使用却要复杂一些。

Flow中的接口主要有以下这四个:

  • Flow.Publisher<T>:生产者。
  • Flow.Subscriber<T>:消费者。
  • Flow.Processor<T, R>:即作为生产者,又作为消费者的中间处理组件。
  • Flow.Subscription:生产者对于消费者的订阅,是两者之间沟通的桥梁。

消费者

消费者的概念是比较简单的,Flow.Subscriber接口只需要实现以下四个方法。

  • void onSubscribe(Subscription),当消费者被生产者订阅时,即调用这个方法通知消费者订阅已经开始,消费者可以在这里通过Subscription来对订阅做一些操作。
  • void onComplete(),当生产者所产生的内容结束时,可以调用这个方法以示内容生产完结,订阅正常结束。
  • void onError(Throwable),当生产者出现异常时,异常可以通过这个方法抛出给消费者。当这个方法被调用的时候,订阅将会异常结束。
  • void onNext(T),这是消费者处理生产者发送来的数据的主要方法,这个方法每次只处理一个来自于生产者的数据内容。

以下是一个专门接受整型数据并将其打印出来的简单示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class NumberPrinter implements Flow.Subscriber<Long> {
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(50);
    }

    @Override
    public void onComplete() {
        System.out.println("Complete");
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStacktrace();
    }

    @Override
    public void onNext(Long item) {
        System.out.println(item);
    }
}

这个示例已经十分的简单了,里面唯一需要进行额外解释的部分是subscription.request(n)。与Rx库不同,Flow中的接口天生带有回压控制,也就是说作为消费者,是可以通过Subecription来控制生产者发送数据的频率的。消费者每次调用.request(n)方法,其意义都是向生产者发出一个请求,请求生产者发送至多n个数据,或者仅仅是传递一个控制信号等。Subscription中的.request(n)方法是整个Flow的响应式接口中比较核心的部分,在下文中将会详细的说明。

生产者

Flow中的生产者接口Flow.Publisher只需要实现一个方法:void subscribe(Subscriber<T>)。这个方法允许生产者订阅消费者,这与我们平时的理解是不一样的,但是与Rx库中的习惯是相同的。虽然生产者中的接口直接提供了消费者的实例,但是,我们却不能在在生产者中直接调用消费者中的方法,这样调用不会产生任何效果。

所以从这个角度来看,任何实现了生产者接口的类实际上都只是一个订阅调度器,其主要作用是为它订阅的消费者分配订阅(Subscription)。而依据JDK官方文档的说明,每个实现了生产者接口的类,一般都会包含一个实现了Flow.Subscription的静态内部类或者能够为消费者构建不同Subscription接口实例的工厂。

至此,我们又回到了Subscription接口,所以说Subscription接口是整个Flow系列接口的核心。前面也提到了Subscription接口是沟通生产者和消费者的桥梁,所以对于生产者来说,它并不直接持有所有消费者实例,而是持有消费者对应的Subscription实例;对于消费者来说,它也并不直接与生产者打交道,而是通过它所持有的Subscription实例来实施间接的控制。

Subscription接口需要实现两个方法,void request(Long)void cancel()。其中void cancel()十分容易理解,当调用这个方法的时候,代表消费者或者生产者将要取消订阅了,在这个方法里可以放置一些资源清理或者需要其他额外通知的任务。

方法void request(Long)在前面已经提到过了,在消费者中调用可以向生产者发出请求。这个方法实际上本质设计意图是让消费者向生产者声明自己的缓冲区大小,让生产者一次性推送的内容不至于溢出缓冲区。在实际的操作中,void request(Long)里除了包含对于缓冲区的控制以外,还担负着向消费者发送数据的功能,或者控制向消费者发送数据的过程。

这里用一个可以产生斐波那契数列的生产者来做示例,这个生产者使用一个静态内部类来定义Subscription,每次只发送请求数量的数字。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FibonacciProducer implements Flow.Publisher<Long> {
    @Override
    public void subscribe(Flow.Subscribe<? super Long> subscriber) {
        subscriber.onSubscribe(new FibonacciSubscription(subscriber));
    }

    public static class FibonacciSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Long> subscriber;
        private long a = 0;
        private long b = 1;

        public FibonacciSubscription(Flow.Subscriber<? super Long> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (n > 0) {
                long counter = 0;
                while (counter < n) {
                    this.subscriber.onNext(b);
                    long temp = a;
                    a = b;
                    b += temp;
                    counter++;
                }
            } else {
                subscriber.onError(new IllegalArgumentException());
            }
        }

        @Override
        public void cancel() {
            this.subscriber.onComplete();
        }
    }
}

要试验这个示例,可以在main()函数中完成订阅以后,多次调用消费者的request()方法,就可以观察到生产者和消费者之间的联动关系。这个示例中没有涉及任何多线程的内容,也没有展示Flow系列接口的线程安全能力,读者可自行在示例中加入多线程设计来体验一下。

至于Flow.Processor<T, R>是更加简单的,可以看一下Java的文档,这个接口直接继承了Flow.PublisherFlow.Subscriber,所以这个中间处理接口就是两者的简单结合体,在使用时也只需要结合两者的实现即可。

生产者与消费者的交互示意图

@startuml
autonumber
control Main as m
control Publisher as pub
control Subscriber as sub

m -> pub : 使用subscribe方法订阅
pub -> sub : 调用onSubscribe方法

create control Subscription as ss
pub -> ss : 创建Subscription
ss -> sub : 交予Subscription
activate ss
sub -> ss : 调用request
alt 请求合法
ss -> sub : 控制调用onNext
ss -> sub : 调用onComplete结束
else 请求不合法
ss -> sub : 调用onError传递错误
else 主动结束订阅
sub -> ss : 调用cancel
end
deactivate ss

@enduml

索引标签
JVM
Java
Observable
Concurrent
Flow
Publisher
Subscriber