Помощна програма за паралелна Java с JCTools

1. Общ преглед

В този урок ще представим библиотеката JCTools (Java Concurrency Tools).

Просто казано, това осигурява редица полезни структури от данни, подходящи за работа в многонишкова среда.

2. Неблокиращи алгоритми

Традиционно многонишковият код, който работи върху изменяемо споделено състояние, използва ключалки, за да осигури последователност на данните и публикации (промени, направени от една нишка, които са видими за друга).

Този подход има редица недостатъци:

  • нишките могат да бъдат блокирани в опит да получат заключване, без да напредват, докато операцията на друга нишка не приключи - това ефективно предотвратява паралелизма
  • колкото по-тежък е спорът за заключване, толкова повече време JVM прекарва в работа с нишки за планиране, управление на спорове и опашки от чакащи нишки и по-малко реална работа, която прави
  • блокировки са възможни, ако са включени повече от една ключалка и те бъдат придобити / освободени в грешен ред
  • възможна е опасност от инверсия на приоритет - нишка с висок приоритет се заключва в опит да се получи заключване, държана от нишка с нисък приоритет
  • по-голямата част от времето се използват грубозърнести брави, което вреди много на паралелизма - фино заключването изисква по-внимателен дизайн, увеличава заключването над главите и е по-податливо на грешки

Алтернатива е да се използва неблокиращ алгоритъм, т.е. алгоритъм, при който повреда или спиране на която и да е нишка не може да причини повреда или спиране на друга нишка .

Неблокиращият алгоритъм е без заключване, ако поне една от участващите нишки е гарантирана, че ще постигне напредък за произволен период от време, т.е. блокировки не могат да възникнат по време на обработката.

Освен това тези алгоритми са без изчакване, ако има и гарантиран напредък по нишка.

Ето пример за неблокиращ стек от отличната книга на Java Concurrency in Practice; той определя основното състояние:

public class ConcurrentStack { AtomicReference
    
      top = new AtomicReference
     
      (); private static class Node { public E item; public Node next; // standard constructor } }
     
    

И също така няколко API метода:

public void push(E item){ Node newHead = new Node(item); Node oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }

Можем да видим, че алгоритъмът използва за сравнение, и суап (CAS) инструкции-прецизен и е за заключване без (дори и множество нишки наричат top.compareAndSet () едновременно, един от тях със сигурност ще бъде успешна), но не Чакай безплатно, тъй като няма гаранция, че CAS в крайна сметка ще успее за конкретна нишка.

3. Зависимост

Първо, нека добавим зависимостта JCTools към нашия pom.xml :

 org.jctools jctools-core 2.1.2 

Моля, обърнете внимание, че най-новата налична версия е налична в Maven Central.

4. Опашки за JCTools

Библиотеката предлага редица опашки, които да се използват в среда с много нишки, т.е. една или повече нишки записват в опашка и една или повече нишки се четат от нея по начин, безопасен за заключване.

Общият интерфейс за всички реализации на Queue е org.jctools.queues.MessagePassingQueue .

4.1. Видове опашки

Всички опашки могат да бъдат категоризирани според техните политики за производители / потребители:

  • единичен производител, единичен потребител - такива класове се именуват с помощта на префикса Spsc , например SpscArrayQueue
  • единичен производител, множество потребители - използвайте префикса Spmc , например SpmcArrayQueue
  • множество производители, един потребител - използвайте Mpsc префикс, например MpscArrayQueue
  • множество производители, множество потребители - използвайте префикса Mpmc , например MpmcArrayQueue

Важно е да се отбележи, че вътрешно няма проверки на правилата, т.е. опашката може да се повреди безшумно в случай на неправилно използване .

Например тестът по-долу попълва опашка с един производител от две нишки и преминава, въпреки че потребителят не е гарантирано да вижда данни от различни производители:

SpscArrayQueue queue = new SpscArrayQueue(2); Thread producer1 = new Thread(() -> queue.offer(1)); producer1.start(); producer1.join(); Thread producer2 = new Thread(() -> queue.offer(2)); producer2.start(); producer2.join(); Set fromQueue = new HashSet(); Thread consumer = new Thread(() -> queue.drain(fromQueue::add)); consumer.start(); consumer.join(); assertThat(fromQueue).containsOnly(1, 2);

4.2. Реализации на опашката

Обобщавайки класификациите по-горе, ето списъкът на опашките JCTools:

  • SpscArrayQueue - единичен производител, единичен потребител, използва масив вътрешно, обвързан капацитет
  • SpscLinkedQueue - единичен производител, единичен потребител, използва вътрешно свързан списък, несвързан капацитет
  • SpscChunkedArrayQueue - единичен производител, единичен потребител, започва с първоначален капацитет и нараства до максимален капацитет
  • SpscGrowableArrayQueue - единичен производител, единичен потребител, започва с първоначален капацитет и нараства до максимален капацитет. Това е същият договор като SpscChunkedArrayQueue , единствената разлика е вътрешното управление на парчета. Препоръчително е да използвате SpscChunkedArrayQueue, защото има опростено изпълнение
  • SpscUnboundedArrayQueue - единичен производител, единичен потребител, използва масив вътрешно, несвързан капацитет
  • SpmcArrayQueue - единичен производител, множество потребители, използва масив вътрешно, обвързан капацитет
  • MpscArrayQueue - множество производители, един потребител, използва масив вътрешно, обвързан капацитет
  • MpscLinkedQueue - множество производители, един потребител, използва вътрешно свързан списък, несвързан капацитет
  • MpmcArrayQueue - множество производители, множество потребители, използва масив вътрешно, обвързан капацитет

4.3. Атомни опашки

Всички опашки, споменати в предишния раздел, използват sun.misc.Unsafe . С появата на Java 9 и JEP-260 обаче този API става недостъпен по подразбиране.

И така, има алтернативни опашки, които използват java.util.concurrent.atomic.AtomicLongFieldUpdater (публичен API, по-малко ефективни) вместо sun.misc.Unsafe .

They are generated from the queues above and their names have the word Atomic inserted in between, e.g. SpscChunkedAtomicArrayQueue or MpmcAtomicArrayQueue.

It's recommended to use ‘regular' queues if possible and resort to AtomicQueues only in environments where sun.misc.Unsafe is prohibited/ineffective like HotSpot Java9+ and JRockit.

4.4. Capacity

All JCTools queues might also have a maximum capacity or be unbound. When a queue is full and it's bound by capacity, it stops accepting new elements.

In the following example, we:

  • fill the queue
  • ensure that it stops accepting new elements after that
  • drain from it and ensure that it's possible to add more elements afterward

Please note that a couple of code statements are dropped for readability. The complete implementation can be found over on GitHub:

SpscChunkedArrayQueue queue = new SpscChunkedArrayQueue(8, 16); CountDownLatch startConsuming = new CountDownLatch(1); CountDownLatch awakeProducer = new CountDownLatch(1); Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue(); }); producer.start(); startConsuming.await(); Set fromQueue = new HashSet(); queue.drain(fromQueue::add); awakeProducer.countDown(); producer.join(); queue.drain(fromQueue::add); assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));

5. Other JCTools Data Structures

JCTools offers a couple of non-Queue data structures as well.

All of them are listed below:

  • NonBlockingHashMap a lock-free ConcurrentHashMap alternative with better-scaling properties and generally lower mutation costs. It's implemented via sun.misc.Unsafe, so, it's not recommended to use this class in a HotSpot Java9+ or JRockit environment
  • NonBlockingHashMapLong like NonBlockingHashMap but uses primitive long keys
  • NonBlockingHashSet a simple wrapper around NonBlockingHashMaplike JDK's java.util.Collections.newSetFromMap()
  • NonBlockingIdentityHashMap like NonBlockingHashMap but compares keys by identity.
  • NonBlockingSetInta multi-threaded bit-vector set implemented as an array of primitive longs. Works ineffectively in case of silent autoboxing

6. Performance Testing

Let's use JMH for comparing the JDK's ArrayBlockingQueue vs. JCTools queue's performance. JMH is an open-source micro-benchmark framework from Sun/Oracle JVM gurus which protects us from indeterminism of compiler/jvm optimization algorithms). Please feel free to get more details on it in this article.

Note that the code snippet below misses a couple of statements in order to improve readability. Please find the complete source code on GitHub:

public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } } }

Results (excerpt for the 95th percentile, nanoseconds per-operation):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

We can see thatMpmcArrayQueue performs just slightly better than MpmcAtomicArrayQueue and ArrayBlockingQueue is slower by a factor of two.

7. Drawbacks of Using JCTools

Using JCTools has an important drawback – it's not possible to enforce that the library classes are used correctly. For example, consider a situation when we start using MpscArrayQueue in our large and mature project (note that there must be a single consumer).

Unfortunately, as the project is big, there is a possibility that someone makes a programming or configuration error and the queue is now read from more than one thread. The system seems to work as before but now there is a chance that consumers miss some messages. That is a real problem which might have a big impact and is very hard to debug.

Ideally, it should be possible to run a system with a particular system property which forces JCTools to ensure thread access policy. E.g. local/test/staging environments (but not production) might have it turned on. Sadly, JCTools does not provide such a property.

Друго съображение е, че въпреки че се уверихме, че JCTools е значително по-бърз от аналога на JDK, това не означава, че нашето приложение печели същата скорост, колкото започваме да използваме реализациите на персонализирана опашка. Повечето приложения не обменят много обекти между нишките и са обвързани предимно с I / O.

8. Заключение

Сега имаме основно разбиране за класовете на помощните програми, предлагани от JCTools, и видяхме колко добре се представят те в сравнение с аналозите на JDK при голямо натоварване.

В заключение, струва си да използваме библиотеката само ако обменяме много обекти между нишки и дори тогава е необходимо да бъдем много внимателни, за да запазим политиката за достъп до нишки.

Както винаги, пълният изходен код за горните проби може да бъде намерен в GitHub.