posts

Mar 10, 2019

实践java.util.concurrent.TransferQueue

Estimated Reading Time: 3 minutes (528 words)

综述

java.util.concurrent.TransferQueueJava Collections Framework的成员之一,允许我们根据生产者-消费者模式创建程序,并协调从生产者传递给消费者的消息,其JDK内的实现是java.util.concurrent.LinkedTransferQueue

实现实际上类似于BlockingQueue - 但是为我们提供了实现backpressure形式的新功能。这意味着,当生产者使用transfer()方法向使用者发送消息时,生产者将保持阻塞状态,直到消息被消耗为止(tryTransfer()是非阻塞的)。

可以通过hasWaitingConsumer()查询是否有任何消费者在等待,与peek()操作相反。

一个生产者——无消费者

首先,设计一个生产者来测试transfer()方法 - 预期是生产者将被阻塞,直到消费者使用take()方法从队列接收消息。

class Producer implements Runnable {
    private TransferQueue<String> transferQueue;
  
    private String name;
  
    private Integer numberOfMessagesToProduce;
  
    public AtomicInteger numberOfProducedMessages
      = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToProduce; i++) {
            try {
                boolean added
                  = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
                if(added){
                    numberOfProducedMessages.incrementAndGet();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // standard constructors
}

我们将一个TransferQueue实例与我们想要为生产者提供的名称以及应该传输到队列的元素数一起传递给构造函数。

请注意,我们使用的是tryTransfer()方法,并且具有给定的超时。我们正在等待四秒钟,如果生产者无法在给定的超时内传输消息,则返回false并继续执行下一条消息。生产者有一个numberOfProducedMessages变量来跟踪生成的消息数量。

然后,消费者设计如下:

class Consumer implements Runnable {
  
    private TransferQueue<String> transferQueue;
  
    private String name;
  
    private int numberOfMessagesToConsume;
  
    public AtomicInteger numberOfConsumedMessages
     = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToConsume; i++) {
            try {
                String element = transferQueue.take();
                longProcessing(element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void longProcessing(String element)
      throws InterruptedException {
        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(500);
    }

    // standard constructors
}

它与生产者类似,但我们使用take()方法从队列中接收元素。我们还通过使用longProcessing()方法模拟一些长时间运行的操作,在该方法中我们递增numberOfConsumedMessages变量,该变量是接收消息的计数器。

现在,让我们只用一个生产者开始我们的程序:

@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout()
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);

    // when
    exService.execute(producer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}

我们希望向队列发送三个元素,但生成器在第一个元素上被阻塞,并且没有消费者从队列中获取该元素。我们正在使用tryTransfer()方法,该方法将阻塞,直到消息被消耗或达到超时。超时后,它将返回false以指示传输失败,它将尝试传输下一个。这是上一个示例的输出:

Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...

一个生产者——一个消费者

让我们测试一个生产者和一个消费者的情况:

@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages()
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);
    Consumer consumer = new Consumer(transferQueue, "1", 3);

    // when
    exService.execute(producer);
    exService.execute(consumer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 3);
    assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}

TransferQueue用作交换点,并且在消费者使用队列中的元素之前,生产者不能继续向其添加另一个元素。我们来看看程序输出:

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2

我们看到,由于TransferQueue的规范,生成和消耗队列中的元素是顺序的。

多个生产者——多个消费者

在最后一个例子中,我们将考虑拥有多个消费者和多个生产者:

@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages()
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(3);
    Producer producer1 = new Producer(transferQueue, "1", 3);
    Producer producer2 = new Producer(transferQueue, "2", 3);
    Consumer consumer1 = new Consumer(transferQueue, "1", 3);
    Consumer consumer2 = new Consumer(transferQueue, "2", 3);

    // when
    exService.execute(producer1);
    exService.execute(producer2);
    exService.execute(consumer1);
    exService.execute(consumer2);

    // then
    exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
    assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}

在这个例子中,我们有两个消费者和两个生产者。当程序启动时,我们看到两个生产者都可以生成一个元素,然后,它们将阻塞,直到其中一个消费者从队列中获取该元素:

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2

总结

我们看到了如何使用该构造实现生产者 - 消费者计划。我们使用transfer()方法创建一种backpressure形式,其中生产者在消费者从队列中检索元素之前不能发布另一个元素。

当我们不希望过度生成的生产者使用消息填充队列,从而导致OutOfMemory错误时,TransferQueue会非常有用。在这种设计中,消费者将决定生产者产生消息的速度