Studiewijzer module 07
Concurrency


  • Thread implements Runnable.

CyclicBarrier

Een CyclicBarrier laat threads wachten op andere threads. Dit is een handige manier om bijvoorbeeld threads tegelijkertijd te laten beginnen.

Een CyclicBarrier maak je aan via:

CyclicBarrier barrier = new CyclicBarrier(2); // (1)
... doe werk ...
barrier.await(); // (2)
  1. Het constructor argument 2 is het aantal threads die de Barrier moeten passeren om verder te gaan.

  2. laat de thread wachten totdat het aantal wachtende threads voor deze barrier de ingestelde limiet bereikt heeft. Alle wachtende threads worden daarna vrijgegeven.

Om live- en deadlocks te voorkomen kun je een timeout meegeven aan de barrier: barrier.await(10, TimeUnit.SECONDS);

De CyclicBarrier#await(timeout, unit) kan de volgende exceptions gooien (overgenomen vanuit de JavaDoc):

InterruptedException

if the current thread was interrupted while waiting

TimeoutException

if the specified timeout elapses. In this case the barrier will be broken.

BrokenBarrierException

if another thread was interrupted or timed out while the current thread was waiting, or the barrier was reset, or the barrier was broken when await was called, or the barrier action (if present) failed due to an exception

ExecutorService

De class Executors is een factory voor executor services. Hiermee kunnen eenvoudig Runnables en Callables uitgevoerd worden. Een executor service is eigenlijk een thread pool of een object die het uitvoeren van meerdere taken tegelijkertijd beheert, zoals hoeveel er tegelijkertijd plaats kunnen vinden.

Voorbeelden hoe je een ExecutorService aanmaakt:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();   // 1
ExecutorService executorService2 = Executors.newFixedThreadPool(10);      // 2
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);  // 3

De FixedScheduledThreadPool lijkt op de ScheduledThreadPool. De laatste heeft de mogelijkheid om taken pas uit te voeren na een gespecificeerde tijd.

De methode execute() accepteert alleen een Runnable.

De methode submit() accepteert zowel een Runnable als een Callable.

Callable heeft een generic, welke de methode call() throws Exception, returned. Runnable is een void

De Execute() is een Void. De submit() levert een Future op. Via future.get() kun je de resultwaarde opvragen. Voor Runnable is dit null, zodra de taak klaar is, voor Callable is dit het geretourneerde object.

Als je klaar bent met de ExecutorService moet je de ExecutorService.shutdown() aanroepen. Deze zorgt ervoor dat er geen nieuwe executes/submits meer geaccepteerd worden. Alles wat al bezig is, gaat gewoon door.

Note
De combinatie tussen een FixedThreadPool en een CyclicBarrier waarbij de ThreadPool kleiner is dan de Barrier, levert een Deadlock op omdat de Barrier te threads vasthoudt!

ForkJoinTask

Een ForkJoinTask maak je aan via een JoinForkPool welke je aanmaakt als: ForkJoinPool pool = new ForkJoinPool(numberOfProcessors); of Executors.newWorkStealingPool(numberOfProcessors);.

ForkJoinPool is een implementatie van ExecutorService. Er is een extra submit(ForkJoinTask) en execute(ForkJoinTask).

In OCP komen de subclasses RecursiveTask<T> en RecursiveAction aan bod. RecursiveAction is in principe een RecursiveTask, maar dan zonder return type.

De methode die verplicht geimplementeerd moet worden is public T compute(). De inhoud is vergelijkbaar met een Runnable of een Callable behalve een recursieve toevoeging, wat meestal hier op neerkomt:

protected List< Integer > compute()
{
  List< VoorbeeldTask > subTasks = new ArrayList<>();
  List< Integer > result = new ArrayList<>();
  if(zwareTaak.size()==1)
  {
    result.add(doeZelf(zwareTaak.remove(0)));
  }
  else
  {
    VoorbeeldTask subTask1 = new VoorbeeldTask(eersteHelft(zwareTaak));
    VoorbeeldTask subTask2 = new VoorbeeldTask(tweedeHelft(zwareTaak));
    subTask1.fork(); // (1)
    subTask2.fork(); // (1)
    subTasks.add(subTask1);
    subTasks.add(subTask2);
  }
  for(VoorbeeldTask task : subTasks)
  {
    result.addAll(task.join()); // (2)
  }
  return result;
}
  1. De fork methode laat dus een subtaak draaien met daarin een deelprobleem. De later aangeroepen join() geeft het resultaat van de subtaken terug en voegt dat bij het eigen resultaat.

  2. De submit methode op de Pool geeft in dit geval ook zelf een taak terug (wat in principe gewoon een fork() is). Om het eindresultaat te bekijken, kun je hierop een join() aanroepen.

ForkJoinTask< List< Integer > > task = pool.submit(new VoorbeeldTask(beginProbleem));
List< Integer > result = task.join();

Concurrent Collections

  • ConcurrentHashMap is een map met synchronized mutatiemethoden. De ConcurrentHashMap geeft nog steeds een ConcurrentModificationException bij aanpassen tijdens het itereren.

  • CopyOnWriteArrayList geeft iedere lezer een eigen lijst van de huidige status. Een ConcurrentModificationException kan hierbij dus niet optreden. Het schrijven naar de lijst heeft wel een lock. Hetzelfde geldt voor de CopyOnWriteArraySet.

  • ConcurrentSkipListSet garandeert sortering en is de threadsafe equivalent van de TreeSet. Er bestaat ook een ConcurrentSkipListMap.


« Vorige module Volgende module »