综述
java.util.concurrent.TransferQueue是Java Collections Framework的成员之一,允许我们根据生产者-消费者模式创建程序,并协调从生产者传递给消费者的消息,其JDK内的实现是java.util.concurrent.LinkedTransferQueue
。
实现实际上类似于BlockingQueue
- 但是为我们提供了实现backpressure
形式的新功能。这意味着,当生产者使用transfer()
方法向使用者发送消息时,生产者将保持阻塞状态,直到消息被消耗为止(tryTransfer()
是非阻塞的)。
可以通过hasWaitingConsumer()
查询是否有任何消费者在等待,与peek()
操作相反。
一个生产者——无消费者
首先,设计一个生产者来测试transfer()
方法 - 预期是生产者将被阻塞,直到消费者使用take()
方法从队列接收消息。
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer numberOfMessagesToProduce;
public AtomicInteger numberOfProducedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
boolean added
= transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if(added){
numberOfProducedMessages.incrementAndGet();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// standard constructors
}
我们将一个TransferQueue实例
与我们想要为生产者提供的名称
以及应该传输到队列的元素数
一起传递给构造函数。
请注意,我们使用的是tryTransfer()
方法,并且具有给定的超时。我们正在等待四秒钟
,如果生产者无法在给定的超时内传输消息,则返回false
并继续执行下一条消息。生产者有一个numberOfProducedMessages
变量来跟踪生成的消息数量。
然后,消费者设计如下:
class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int numberOfMessagesToConsume;
public AtomicInteger numberOfConsumedMessages
= new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
String element = transferQueue.take();
longProcessing(element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element)
throws InterruptedException {
numberOfConsumedMessages.incrementAndGet();
Thread.sleep(500);
}
// standard constructors
}
它与生产者类似,但我们使用take()
方法从队列中接收元素。我们还通过使用longProcessing()
方法模拟一些长时间运行的操作,在该方法中我们递增numberOfConsumedMessages
变量,该变量是接收消息的计数器。
现在,让我们只用一个生产者开始我们的程序:
@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
// when
exService.execute(producer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}
我们希望向队列发送三个元素,但生成器在第一个元素上被阻塞,并且没有消费者从队列中获取该元素。我们正在使用tryTransfer()
方法,该方法将阻塞,直到消息被消耗或达到超时。超时后,它将返回false
以指示传输失败,它将尝试传输下一个。这是上一个示例的输出:
Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...
一个生产者——一个消费者
让我们测试一个生产者和一个消费者的情况:
@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
Consumer consumer = new Consumer(transferQueue, "1", 3);
// when
exService.execute(producer);
exService.execute(consumer);
// then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}
TransferQueue用作交换点
,并且在消费者使用队列中的元素
之前,生产者不能继续向其添加另一个元素。我们来看看程序输出:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2
我们看到,由于TransferQueue的规范,生成和消耗队列中的元素是顺序的。
多个生产者——多个消费者
在最后一个例子中,我们将考虑拥有多个消费者和多个生产者:
@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages()
throws InterruptedException {
// given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(3);
Producer producer1 = new Producer(transferQueue, "1", 3);
Producer producer2 = new Producer(transferQueue, "2", 3);
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
// when
exService.execute(producer1);
exService.execute(producer2);
exService.execute(consumer1);
exService.execute(consumer2);
// then
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
exService.shutdown();
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}
在这个例子中,我们有两个消费者和两个生产者。当程序启动时,我们看到两个生产者都可以生成一个元素,然后,它们将阻塞,直到其中一个消费者从队列中获取该元素:
Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2
总结
我们看到了如何使用该构造实现生产者 - 消费者计划。我们使用transfer()
方法创建一种backpressure形式,其中生产者在消费者从队列中检索元素之前不能发布另一个元素。
当我们不希望过度生成的生产者使用消息填充队列,从而导致OutOfMemory错误时,TransferQueue会非常有用。在这种设计中,消费者将决定生产者产生消息的速度。