Java编程方法论:响应式RxJava与代码设计实战
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

1.5 Java 9中的响应式编程

到了Java 9,JDK开始支持响应式编程。从Java 9的JDK中可以找到java.util. concurrent.Flow类,其中所包含的接口和定义的静态方法就是用来支持Flow控制编程的,并且主要基于里面的Publisher、Subscriber、Subscription等接口来支持响应式编程。

本节会分4部分介绍。第1部分是响应式编程接口的介绍。第2部分是一个Java 9响应式编程入门的简单Demo。第3部分是对JDK的SubmissionPublisher<T>类的源码解读。最后一部分是使用Java 9的响应式编程整合Spring的实战案例,以便让大家深入理解和运用Java 9提供的API,并能够快速运用到自己开发的项目中。

1.5.1 响应式编程接口

如表1-1所示就是对java.util.concurrent.Flow中各个接口组件的功能描述。

表1-1

我们可以通过图1-3来了解整个工作过程。

图1-3

由图1-3可知Publisher用于发布元素,并将元素推送给Processor。Processor再将元素推送给Subscriber,Subscriber通过使用Subscriber::onNext方法来接收元素。

Processor通过调用Subscription::request方法来从Publisher请求元素。而这个动作是在Processor中进行的(此时Processor是作为Subscriber角色存在的),所以箭头指向左;Subscriber::onNext接收并消费元素的动作是在Subscription中进行的,所以箭头指向右。

如果大家还是不太明白的话,则请接着往下看,带着问题思考。若概念理解得还不够透彻,则不利于理解接下来的例子,那么就再深入地看看API。

我们可以看到,Flow.Publisher<T>接口是一个函数式接口(其上有注解@Functional Interface),它只有一个抽象方法public void subscribe(Subscriber<? super T> subscriber);。

通过查看Javadoc可知,如果可能的话,这个方法需要添加一个给定的Flow.Subscriber,如果尝试订阅失败,那么会调用Flow.Subscriber的onError方法来发出一个IllegalStateException类型异常。否则,Flow.Subscriber<T>会调用onSubscribe方法,同时传入一个Flow.Subscription,Subscriber通过调用其所属的Flow.Subscription的request方法来获取元素,也可以调用它的cancel方法来解除订阅。

Flow.Subscriber<T>接口有4个方法,下面对它们进行简单的描述。

● void onSubscribe(Subscription subscription) :在给定的Subscription想要使用Subscriber其他方法的前提下,必须先调用这个方法。

● void onError(Throwable throwable):当Publisher或者Subscription遇到了不可恢复的错误时,调用此方法,然后Subscription就不能再调用Subscriber的其他方法了。

● void onNext(T item):获取Subscription的下一个元素。

● void onComplete:在调用这个方法后,Subscription就不能再调用Subscriber的其他方法了。

Flow.Subscription接口有两个方法,下面对它们进行简单的描述。

● void cancel:调用这个方法造成的直接后果是Subscription会停止接收信息。

● void request(long n):Subscription调用这个方法添加 n个元素。如果 n小于0,Subscriber将收到一个onError信号。如果n等于0,那么调用complete方法,否则调用onNext(T item)方法。

Flow.Processor<T,R>是Subscriber<T>、Publisher<R>的集合体,限于篇幅就不多说了,后面会有专门的章节来介绍。

1.5.2 Java 9响应式编程入门Demo

我们采用JDK包java.util.concurrent下的SubmissionPublisher<T>类的设计理念,模仿创建Flow.Publisher<T>接口的实现。

首先创建自己的Flow.Subscriber<T>接口的实现类:

    public class DockerXDemoSubscriber<T> implements Flow.Subscriber<T>{
        private String name;
        private Flow.Subscription subscription;
        final long bufferSize;
        long count;
        public String getName() {
          return name;
        }
        public Flow.Subscription getSubscription() {
          return subscription;
        }
        public DockerXDemoSubscriber(long bufferSize,String name) {
          this.bufferSize = bufferSize;
          this.name = name;
      }
      public void onSubscribe(Flow.Subscription subscription) {
          //count = bufferSize - bufferSize / 2;
          //在消费一半的时候重新请求
          (this.subscription = subscription).request(bufferSize);
          System.out.println("开始onSubscribe订阅");
          try {
              Thread.sleep(100);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      public void onNext(T item) {
          //if (--count <= 0) subscription.request(count = bufferSize -
          //bufferSize / 2);
      System.out.println(" ###### " +
          Thread.currentThread().getName()+
          " name: " + name + " item: " + item + " ######");
      System.out.println(name + " received: " + item);
      try {
          Thread.sleep(10);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
    }
      public void onError(Throwable throwable) {
          throwable.printStackTrace();
      }
      public void onComplete() {
          System.out.println("Completed");
      }
    }

接下来对Flow.Publisher<T>接口以及其内的Flow.Subscription接口进行实现:

    public  class  DockerXDemoPublisher<T>  implements  Flow.Publisher<T>,
AutoCloseable {
        private final ExecutorService executor; // daemon-based
        private CopyOnWriteArrayList<DockerXDemoSubscription> list = new
CopyOnWriteArrayList();
      public void submit(T item) {
          System.out.println("********* 开始发布元素item: " + item + "
***********");
          list.forEach(e -> {
              e.future=executor.submit(() -> {
                  e.subscriber.onNext(item);
              });
          });
      }
      public DockerXDemoPublisher(ExecutorService executor) {
          this.executor = executor;
      }
      public void close() {
          list.forEach(e -> {
              e.future=
              executor.submit(() -> { e.subscriber.onComplete();});
          });
      }
      @Override
      public void subscribe(Flow.Subscriber<? super T> subscriber) {
          subscriber.onSubscribe(new ockerXDemoSubscription(subscriber,
executor));
          list.add(new DockerXDemoSubscription(subscriber,executor));
      }
      static class DockerXDemoSubscription<T> implements Flow.Subscription
    {
          private final Flow.Subscriber<? super T> subscriber;
          private final ExecutorService executor;
          private Future<?> future;
          private T item;
          private boolean completed;
          public DockerXDemoSubscription(Flow.Subscriber<? super T>
subscriber,ExecutorService executor) {
              this.subscriber = subscriber;
              this.executor = executor;
          }
          @Override
          public void request(long n) {
              if (n != 0 && !completed) {
                  if (n < 0) {
                    IllegalArgumentException ex = new
IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                  } else {
                    future = executor.submit(() -> {
                        subscriber.onNext(item);
                    });
                  }
              } else {
                  subscriber.onComplete();
              }
          }
          @Override
          public void cancel() {
              completed = true;
              if (future != null && !future.isCancelled()) {
                  this.future.cancel(true);
              }
          }
        }
    }

如上述代码所示,我们根据Javadoc中所提到和希望的,只有在将subscriber添加到publisher的时候,它的onSubscribe方法才会被自动调用。

一个需要注意的细节是,SubmissionPublisher<T>类有一个submit(T item)方法。通过查阅Javadoc可知,该方法就是通过异步调用每个订阅它的subscriber的onNext方法将发布的给定元素传送过去的,而当针对subscriber的资源不可用时,阻塞不会中断。这样SubmissionPublisher<T>会提交元素给当前的订阅者(subscriber),直到它关闭为止。本例对其进行了简单的实现,后面会具体讲解。

接着,我们使用demoSubscribe方法创建几个subscriber进行演示:

    private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher,
String subscriberName){
        DockerXDemoSubscriber<Integer> subscriber = new
DockerXDemoSubscriber<>(4L,subscriberName);
       publisher.subscribe(subscriber);
    }

接着通过以下代码片段来使用:

    ExecutorService execService =  ForkJoinPool.commonPool();
    //Executors.newFixedThreadPool(3);
    try (DockerXDemoPublisher<Integer> publisher = new
DockerXDemoPublisher<>(execService)) {
        demoSubscribe(publisher,"One");
        demoSubscribe(publisher,"Two");
        demoSubscribe(publisher,"Three");
        IntStream.range(1,5).forEach(publisher::submit);
    } finally {
        ...
    }

上述代码创建了3个subscriber,通过为每一个subscriber分别指定subscription来连接同一个publisher。倒数第4行表示通过生成一个数字流并使用publisher提交出去,然后每一个subscriber将会通过onNext方法得到并消费元素。

在finally代码块中,没什么我们需要注意的内容,直接来看代码:

    finally {
        try {
          execService.shutdown();
          int shutdownDelaySec = 1;
          System.out.println("………………等待" + shutdownDelaySec + " 秒后结束服
………");
          execService.awaitTermination(shutdownDelaySec,TimeUnit.SECONDS);
        } catch (Exception ex) {
          System.out.println("捕获到execService.awaitTermination()方法的异常:"
+ ex.getClass().getName());
        } finally {
          System.out.println("调用execService.shutdownNow()结束服务...");
          List<Runnable> l = execService.shutdownNow();
          System.out.println("还剩"+l.size() + " 个任务等待执行服务已关闭");
        }
    }

运行代码后会看到如下输出:

    开始onSubscribe订阅
    ###### ForkJoinPool.commonPool-worker-9  name: One  item: null ######
    One received: null
    开始onSubscribe订阅
    ###### ForkJoinPool.commonPool-worker-9  name: Two  item: null ######
    Two received: null
    开始onSubscribe订阅
    ###### ForkJoinPool.commonPool-worker-9  name: Three  item: null ######
    Three received: null
    ***************** 开始发布元素item: 1 *****************
    ***************** 开始发布元素item: 2 *****************
    ***************** 开始发布元素item: 3 *****************
    ***************** 开始发布元素item: 4 *****************
    ###### ForkJoinPool.commonPool-worker-9  name: One  item: 1 ######
    One received: 1
    ###### ForkJoinPool.commonPool-worker-2  name: Two  item: 1 ######
    Two received: 1
    ###### ForkJoinPool.commonPool-worker-4  name: One  item: 2 ######
    One received: 2
    ###### ForkJoinPool.commonPool-worker-11  name: Three  item: 1 ######
    Three received: 1
    ###### ForkJoinPool.commonPool-worker-13  name: Two  item: 2 ######
    Two received: 2
    ###### ForkJoinPool.commonPool-worker-15  name: Three  item: 2 ######
    Three received: 2
    ###### ForkJoinPool.commonPool-worker-6  name: One  item: 3 ######
    One received: 3
    ………………等待1 秒后结束服务………………
     ###### main  name: Two  item: 3 ######
    Two received: 3
     ###### ForkJoinPool.commonPool-worker-9  name: Three  item: 3 ######
    Three received: 3
     ###### ForkJoinPool.commonPool-worker-13  name: One  item: 4 ######
    One received: 4
     ###### ForkJoinPool.commonPool-worker-4  name: Two  item: 4 ######
    Two received: 4
     ###### ForkJoinPool.commonPool-worker-15  name: Three  item: 4 ######
    Three received: 4
    Completed
    Completed
    Completed
    调用execService.shutdownNow()结束服务...
    还剩0 个任务等待执行服务已关闭

如上述代码所示,由于其是异步处理的,整个控制流程会很快到达finally代码块,然后在停止服务前等1秒,这段时间足够生成元素并发送给subscriber。由此,我们可以看到每一个生成的元素都会被发送给各个subscriber。每次在每一个subscriber调用onSubscribe方法时会请求4个元素,此时由于publisher并未发布元素,因此会返回一个null。一旦有元素发布,就会调用subscription内挂载的subscriber的onNext方法。因为DockerXDemoPublisher实现了AutoCloseable,所以我们使用try-with-resources语句来自动关闭DockerXDemoPublisher的资源,在池关闭的时候自动调用close方法对订阅者进行解绑。我们在finally代码块中使用的方法,读者在需要的时候也可以借鉴。

1.5.3 SubmissionPublisher类的源码解读

在前面的例子中,已经提到了SubmissionPublisher类,它是Publisher接口的实现。其内部提供了一个Executor,可以并发地将元素传递给Subscriber(后面会具体介绍)。

我们经常会通过Executors.newFixedThreadPool(int nThreads)和ForkJoinPool.commonPool来获得一个线程池。其中Executors.newFixedThreadPool(int nThreads)用于创建一个指定最大线程数量的线程池,池中的每一个线程除非明确指定要关闭,否则会一直存在。

ForkJoinPool.commonPool是SubmissionPublisher内置的默认Executor,ForkJoinPool. commonPool内部调用了new ForkJoinPool((byte)0);,传入的参数0没什么用,其会通过System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");获取并发线程数。如果并未设置java.util.concurrent.ForkJoinPool.common.parallelism属性,将使用Runtime.getRuntime().availableProcessors() -1,即本机CPU核数-1。如果CPU核支持超线程技术,则核数为CPU的线程数量。现在,大家应该可以理解Demo中的这段代码了。

针对每一个Subscriber,SubmissionPublisher类都使用一个独立的缓冲,其最大值可在创建时进行指定,缓冲大小在使用时根据需要扩展,直到所设定的最大值。如果不设定最大值,则会用defaultbufferSize方法获取该值。SubmissionPublisher类对AutoCloseable接口进行了实现,调用其close方法其实就是调用当前Subscriber的onComplete方法。

我们来看一下其内部BufferedSubscription的定义:

    @SuppressWarnings("serial")
    @jdk.internal.vm.annotation.Contended
    private static final class BufferedSubscription<T> implements
Flow.Subscription,ForkJoinPool.ManagedBlocker {
        // Order-sensitive field declarations
        long timeout;                        // > 0 if timed wait
        volatile long demand;              // # unfilled requests
        int maxCapacity;                    // reduced on OOME
        int putStat;                         // offer result for ManagedBlocker
        volatile int ctl;                   // atomic run state flags
        volatile int head;                 // next position to take
        int tail;                        // next position to put
        Object[] array;                // buffer: null if disabled
    //这里包含了我们要传入的订阅者的信息
   Flow.Subscriber<? super T> subscriber;  // null if disabled
   Executor executor;             // null if disabled
   BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable>
   onNextHandler;
   volatile Throwable pendingError;     // holds until onError issued
   volatile Thread waiter;                // blocked producer thread
   T putItem;                      // for offer within ManagedBlocker
   //这里通过next来构造Publisher的执行链也就是一堆订阅者在此做一个编排
   BufferedSubscription<T> next;         // used only by publisher
   //这里将发送失败需要重试的信息放到一起
   BufferedSubscription<T> nextRetry;   // used only by publisher
   // ctl values
   static final int ACTIVE    = 0x01; // consumer task active
   static final int CONSUME   = 0x02; // keep-alive for consumer task
   static final int DISABLED  = 0x04; // final state
   static final int ERROR     = 0x08; // signal onError then disable
   static final int SUBSCRIBE = 0x10; // signal onSubscribe
   static final int COMPLETE  = 0x20; // signal onComplete when done
   static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
   /**
     * maxBufferCapacity大于这个值时使用默认的初始大小,
     * maxBufferCapacity的大小必须为2n次方
     **/
    static final int DEFAULT_INITIAL_CAP = 32;

大家只需要查看上面代码中的中文注释内容即可,接下来就可以很轻松地看懂下面的close方法了:

    public void close() {
        if (!closed) {
          BufferedSubscription<T> b;
          synchronized (this) {
              //no need to re-check closed here
              b = clients;
              clients = null;
              closed = true;
          }
          while (b != null) {
              BufferedSubscription<T> next = b.next;
              b.next = null;
              b.onComplete();
              b = next;
          }
        }
    }

其实这是移除队列中一个节点的操作。这里可得到这个节点的下一个元素,然后将这个节点置空,将下一个节点指定到这个节点的位置,同时将要移除的节点(也就是Subscriber)结束。

我们来观察里面的subscribe(Flow.Subscriber<? super T> subscriber)方法。在调用这个方法后,会生成一个BufferedSubscription实例,其中包装了subscriber。然后会调用subscription.onSubscribe方法,在这个方法内会调用startOrDisable方法。

在这里,我们可以看到e.execute(new ConsumerTask<T>(this))。其中的ConsumerTask继承自抽象类ForkJoinTask<Void>,并实现了Runnable接口和CompletableFuture.Asynchronous-CompletionTask接口。其构造函数传入的参数是一个BufferedSubscription实例,这样ConsumerTask的run方法其实是调用BufferedSubscription实例的consume方法。而在consume方法里,可以看到我们传入的subscriber实例在此出现,同时里面还调用了checkControl(s,c)方法。这个方法很关键,在此通过s.onSubscribe(this)将BufferedSubscription实例作为参数传入subscriber的onSubscribe中。

在consume方法中,当ctl为SUBSCRIBE状态时,执行checkControl(s,c);当ctl为CONSUME状态时,会在QA中取得所要消费的元素,通过subscriber的onNext方法使用。这个过程是无限循环的,至于QA是如何存值的,在此就不做讨论了,大家可自行查阅源码进行分析:

    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
          BufferedSubscription<T> subscription =
              new BufferedSubscription<T>(subscriber,executor,
onNextHandler,maxBufferCapacity);
          synchronized (this) {
          for (BufferedSubscription<T> b = clients,pred = null;;) {
              if (b == null) {
                  Throwable ex;
                  subscription.onSubscribe();
                  if ((ex = closedException) != null)
                      subscription.onError(ex);
                  else if (closed)
                      subscription.onComplete();
                  else if (pred == null)
                      clients = subscription;
                  else
                      pred.next = subscription;
                  break;
              }
              BufferedSubscription<T> next = b.next;
              if (b.isDisabled()) { // remove
                  b.next = null;    // detach
                  if (pred == null)
                      clients = next;
                  else
                      pred.next = next;
              }
              else if (subscriber.equals(b.subscriber)) {
                  b.onError(new IllegalStateException("Duplicate subscribe"));
                  break;
              }
              else
                  pred = b;
              b = next;
            }
        }
    }
    /**
    * Responds to control events in consume().
    */
    private boolean checkControl(Flow.Subscriber<? super T> s,int c) {
        boolean stat = true;
        if ((c & SUBSCRIBE) != 0) {
            if (CTL.compareAndSet(this,c,c & ~SUBSCRIBE)) {
              try {
                  if (s != null)
                      s.onSubscribe(this);
              } catch (Throwable ex) {
                  onError(ex);
              }
            }
        }
        else if ((c & ERROR) != 0) {
            Throwable ex = pendingError;
            ctl = DISABLED;           //no need for CAS
            if (ex != null) {         //null if errorless cancel
              try {
                  if (s != null)
                      s.onError(ex);
              } catch (Throwable ignore) {
              }
          }
        }
        else {
          detach();
          stat = false;
        }
        return stat;
    }

关于SubmissionPublisher,我们还需要了解它的以下3个方法,以便得心应手地使用它为我们服务。

● offer:该方法用于将元素发布给subscriber,subscriber可以异步无阻塞地调用它的onNext方法。同时,这个方法可以在超时的时候放弃一些元素,我们可以指定超时时间。在这里,我们还可以指定放弃处理的规则(其实就是一个BiPredicate条件表达式)。

● submit:该方法可以帮助我们以一个简单的方式来将元素发布给subscriber。从synchronized(this)代码块中的while语句可得知,该方法会阻塞调用,直到资源分配给了当前所有的subscriber。若资源进行了分配但subscriber没拿到,则会重新给,直至所有subscriber都拿到资源。该方法与offer方法的区别是后者有超时机制。

● consume:该方法可以定义请求到的元素要消费的动作(在SubmissionPublisher类定义中有Subscriber接口的内部类实现),接下来我们通过下面的这个例子来清晰明了地进行解释。

    public class ConsumeSubmissionPublisher {
        public static void main(String[] args) throws InterruptedException,
ExecutionException {
          publish();
        }
        public static void publish() throws InterruptedException,Execution
Exception {
          CompletableFuture future = null;
          try (SubmissionPublisher publisher = new SubmissionPublisher
<Long>()) {
              System.out.println("Subscriber  Buffer  Size:  "  +  publisher.
getMaxBufferCapacity());
              future=publisher.consume(System.out::println);
              LongStream.range(1,10).forEach(publisher::submit);
          } finally {
              future.get();
          }
      }
    }

下面的代码片段是SubmissionPublisher::consume方法:

    public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
        if (consumer == null)
          throw new NullPointerException();
        CompletableFuture<Void> status = new CompletableFuture<>();
        subscribe(new ConsumerSubscriber<T>(status,consumer));
        return status;
    }

当调用publisher.consume时,其实就是内部生成一个订阅者对象,并通过subscribe(new ConsumerSubscriber<T>(status,consumer));进行订阅。ConsumerSubscriber是一个通过装饰模式得到的增强类,通过consume方法,我们可以得到一个CompletableFuture实例。这样,就可以通过CompletableFuture实例提供的get方法来做到让应用程序一直运行,直到所有的元素都处理完毕。请看前面例子的运行结果:

    Subscriber Buffer Size: 256
    1
    2
    3
    4
    5
    6
    7
    8
    9

1.5.4 响应式编程整合Spring实战案例

有了上面的Demo做支撑,下面我们就来实现一个经常接触的小场景“订单与库存”。在这里,我们通过Map<Product,StockItem>管理每一种产品的库存数量。因为涉及并发场景,所以使用ConcurrentHashMap保存数据,使用StockItem比单纯地用一个长整型数来表达库存数量可以做更多的事情。设计这个响应式编程的小Demo,是为了让大家看到我们无须处理繁多的并发过程导致的锁,而只需要更多地关心我们的业务。我们通过这种编程模式可以达到一定的解耦效果。

下面开始编码。关于库存的编码是比较简单的,操作一个Map即可:

    @Component
    public class Stock {
        private final Map<Product,StockItem> stockItemMap = new
ConcurrentHashMap<>();
      private StockItem getItem(Product product){
          //如果没有此商品添加一个key,返回null值即可
          stockItemMap.putIfAbsent(product,new StockItem());
          return stockItemMap.get(product);
      }
      public void store(Product product,long amount){
          getItem(product).store(amount);
      }
      public void remove(Product product,long amount)  throws ProductIs
OutOfStock {
          if (getItem(product).remove(amount) != amount)
              throw new ProductIsOutOfStock(product);
      }
    }

我们使用StockItem类来对库存商品数量进行操作,因为数量的变动同时发生在多个线程中,也就是涉及并发操作,所以这就有一点复杂。我们在这里使用了一个原子类来保证线程安全(下单减库存逻辑的代码处有中文注释):

    public class StockItem {
        private final AtomicLong amountItemStock =
          new AtomicLong(0);
        public void store(long n) {
          amountItemStock.accumulateAndGet(n,(pre,mount) -> pre + mount);
        }
        //下单时所需商品数量没超过库存数量的话就用库存数量减去所需商品数量返回此次
        //从库存移除商品的具体数量;超过的话不对库存做任何操作返回此次所移除库存商品
        //的数量即为0
        public long remove(long n) {
          class RemoveData {
              long remove;
          }
          RemoveData removeData = new RemoveData();
          amountItemStock.accumulateAndGet(n,
              (pre,mount) -> pre >= n ?
              pre - (removeData.remove = mount) : pre - (removeData.remove = 0L));
          return removeData.remove;
        }
    }

还要多说两句,我们通过使用原子类AtomicLong维护商品数量,AtomicLong原子类拥有accumulateAndGet方法,这个方法接收一个长整型参数和一个以接收两个长整型参数进行操作的动作,也就是一个Lambda表达式。我们通过accumulateAndGet方法计算出新的库存数量,在通过订单对库存进行移除操作的时候,如果库存数量充足,则正常操作;如果库存数量无法满足订单数量,则不做任何操作。返回的数量如上面的代码注释所示,计算过程是在一个Lambda表达式内进行的,因为作用域的问题,基本类型的值没办法逃逸出去(这样做也保证了计算的无状态性),所以定义了一个内部类来达到想要的效果。

接下来介绍和本章内容相关的StockMaintain类。它实现了Subscriber接口,以便有能力针对订单来维护库存数量:

    public class StockMaintain implements Flow.Subscriber<Order> {
        private static final Logger log = LoggerFactory.
getLogger(StockMaintain.class);
        private Stock stock;
        private Flow.Subscription subscription = null;
        private ExecutorService execService =  ForkJoinPool.commonPool();
        public StockMaintain(@Autowired Stock stock) {
          this.stock = stock;
        }
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          log.info("******调用onSubscribe******");
          subscription.request(3);
          this.subscription = subscription;
        }
        ...
    }

在订阅生产者之后,StockMaintain产生的对象会立即调用onSubscribe方法(后面可以通过日志清楚地观察到),并传入一个subscription对象,将这个对象存储到我们定义的字段上。在一个元素传递到onNext方法中并处理完毕后,就可以调用这个subscription来请求新的元素。这里只是简单的展示,正常配置应该按照第一个Demo的方式做。

最重要的部分便是onNext方法了,为什么要单独拿出来讲?这主要是为了与第一个Demo有所区分。execService线程池操作既可以用于subscription,也可以用在subscriber的onNext方法中。在这个方法中,我们接收一个订单,并从库存数量中减去订单包含的各种商品的数量。如果订单中有所需商品数量超过此商品库存数量的情况,那么就会产生错误日志。为了保证Demo简单,这里不会涉及更多的逻辑,只是想告诉大家应该怎么维护自己的代码。为了达到异步效果,这里通过ExecutorService进行操作,使用了形如()->{}的Lambda表达式。这是为了达到延迟执行的效果,将其当作一个动作进行传递,让它在一个子线程上执行。

通过查阅submit的源码可知,这个动作会被封装成一个RunnableFuture<V> extends Runnable,Future<V>并返回。这样方便我们获取这个动作在子线程上执行的信息,同时方便操作其行为。而execute(ftask)最后其实就是通过new Thread(r).start来执行的。用一个现实中的场景来讲,就是快递员将快递物品送到你手里,你不会立马使用快递包裹里的东西而让快递员一直等你签收。关于任务的处理过程已经清晰明了地展现在我们面前了,交给系统自己处理吧,而我们要做的就是通过onNext方法获取下一个元素:

    @Override
    public void onNext(Order order) {
    execService.submit(() -> {
        log.info("Thread {}",Thread.currentThread().getName());
        order.getItems().forEach(item -> {
          try {
              stock.remove(item.getProduct(),item.getAmount());
              log.info("{} 件商品从库存消耗",item.getAmount());
          } catch (ProductIsOutOfStock productIsOutOfStock) {
              log.error("商品库存不足");
          }
        });
        subscription.request(1);
    });
    }

AbstractExecutorService中的submit的源码实现如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task,null);
        execute(ftask);
        return ftask;
    }

SubmissionPublisher源码中的部分实现如下:

    //default Executor setup; nearly the same as CompletableFuture
    /**
    * Default executor -- ForkJoinPool.commonPool() unless it cannot
    * support parallelism.
    */
    private static final Executor ASYNC_POOL =
        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
    private static final class ThreadPerTaskExecutor implements Executor {
        ThreadPerTaskExecutor(){}      //prevent access constructor creation
        public void execute(Runnable r) { new Thread(r).start(); }
    }

最后需要我们关注的就是测试运行部分的代码了:

    @Test
    public void teststockRemoval() throws InterruptedException {
        Stock stock = new Stock();
        SubmissionPublisher<Order> p = new SubmissionPublisher<>();
        ...
    }

为了避免麻烦,这里我们同样使用了JDK提供的SubmissionPublisher类来做publisher。我们创建了一个Stock类的实例,并在publisher上订阅。此时不会传递任何数据过去,因为还没有发布数据,但是它创建了一个subscriber和publisher之间的桥梁,一旦有元素提交,subscriber就可以接收到元素。

接下来,我们开始向库存中添加商品并创建订单,然后在后续的代码中进行提交。这里是重复提交的动作,展示Demo如下:

    Product product = new Product();
    stock.store(product,40);
    OrderItem item = new OrderItem();
    item.setProduct(product);
    item.setAmount(10);
    Order order = new Order();
    List<OrderItem> items = new LinkedList<>();
    items.add(item);
    order.setItems(items);

我们将订单提交给publisher 10次,也就是下了10个相同的订单,这样也能测试代码的所有功能,包括超过库存数量的拒绝修改的反馈:

    for (int i = 0; i < 10; i++)
        p.submit(order);
    log.info("所有订单已经提交完毕");

在订单都发送完毕之后,我们在这里设定主线程等待,以便子线程完成任务。等待时间的设置是为了让读者更好地观察子线程执行和请求元素的执行情况:

    for (int j = 0; j < 10; j++) {
        log.info("Sleeping a bit...");
        Thread.sleep(10);
    }
    p.close();
    log.info("Publisher已关闭");

测试结果如下:

    17-12-24 01:22:43,161  INFO StockMaintain:33- ******调用onSubscr ibe******
    17-12-24 01:22:43,169  INFO TestStockMaintain:39- 所有订单已经提交完毕
    17-12-24 01:22:43,179  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-9
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoin Pool.
    commonPool-worker-11
    17-12-24 01:22:43,187  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,190  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品从库存消耗
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品从库存消耗
    17-12-24 01:22:43,206  INFO StockMaintain:41- Thread ForkJoinPool.comm
    onPool-worker-2
    17-12-24 01:22:43,207  INFO StockMaintain:45- 10 件商品从库存消耗
    17-12-24 01:22:43,202  INFO StockMaintain:45- 10 件商品从库存消耗
    17-12-24 01:22:43,209  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,207  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,207  INFO StockMaintain:41- Thread ForkJoinPool.comm
    onPool-worker-4
    17-12-24 01:22:43,212 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,222  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,224 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,225  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-2
    17-12-24 01:22:43,226 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,228  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-13
    17-12-24 01:22:43,210  INFO StockMaintain:41- Thread ForkJoinPool.com
    monPool-worker-9
    17-12-24 01:22:43,229 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,227  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,214 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,231 ERROR StockMaintain:47- 商品库存不足
    17-12-24 01:22:43,244  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,256  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,268  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,279  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,290  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,301  INFO TestStockMaintain:41- Sleeping a bit...
    17-12-24 01:22:43,312  INFO TestStockMaintain:45- Publisher已关闭
    17-12-24 01:22:43,312  INFO StockMaintain:61-  调用onComplete

我们用Subscriber做对外交流的接口,存储操作业务逻辑并交给一个专门的类去处理。这里的Stock是真正的compute计算类业务(比较复杂),会单独拿出来做封装(如果读者做过前端的Vue开发工作,应该会有更深入的理解)。最后我们通过Publisher将生产和消费连接起来,而且真正地做到了订单多线程并发处理。理解了这些内容,就能在后面的章节中驾轻就熟地掌握一些新的东西。另外,代码中一些注解的使用,包括为什么这样用,为什么要在Stock上加上一个@Component注解,为什么在构造函数中用@Autowired注解,以及其他更多细节,就留给读者自己思考学习吧。这也是本书要与Spring进行整合的目的。