Studiewijzer module 07
Concurrency
-
ThreadimplementsRunnable.
CyclicBarrier
Een CyclicBarrier laat threads wachten op andere threads. Dit is een handige manier om bijvoorbeeld threads tegelijkertijd te laten beginnen.
Een CyclicBarrier maak je aan via:
CyclicBarrier barrier = new CyclicBarrier(2); // (1)
... doe werk ...
barrier.await(); // (2)
-
Het constructor argument
2is het aantal threads die de Barrier moeten passeren om verder te gaan. -
laat de thread wachten totdat het aantal wachtende threads voor deze barrier de ingestelde limiet bereikt heeft. Alle wachtende threads worden daarna vrijgegeven.
Om live- en deadlocks te voorkomen kun je een timeout meegeven aan de
barrier: barrier.await(10, TimeUnit.SECONDS);
De CyclicBarrier#await(timeout, unit) kan de volgende exceptions
gooien (overgenomen vanuit de JavaDoc):
InterruptedException-
if the current thread was interrupted while waiting
TimeoutException-
if the specified timeout elapses. In this case the barrier will be broken.
BrokenBarrierException-
if another thread was interrupted or timed out while the current thread was waiting, or the barrier was reset, or the barrier was broken when
awaitwas called, or the barrier action (if present) failed due to an exception
ExecutorService
De class Executors is een factory voor executor services. Hiermee
kunnen eenvoudig Runnables en Callables uitgevoerd worden. Een
executor service is eigenlijk een thread pool of een object die het
uitvoeren van meerdere taken tegelijkertijd beheert, zoals hoeveel er
tegelijkertijd plaats kunnen vinden.
Voorbeelden hoe je een ExecutorService aanmaakt:
ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // 1
ExecutorService executorService2 = Executors.newFixedThreadPool(10); // 2
ExecutorService executorService3 = Executors.newScheduledThreadPool(10); // 3
De FixedScheduledThreadPool lijkt op de ScheduledThreadPool. De laatste heeft de mogelijkheid om taken pas uit te voeren na een gespecificeerde tijd.
De methode execute() accepteert alleen een Runnable.
De methode submit() accepteert zowel een Runnable als een
Callable.
Callable heeft een generic, welke de methode call() throws Exception, returned. Runnable is een void
De Execute() is een Void. De submit() levert een Future op. Via future.get() kun je de resultwaarde opvragen. Voor Runnable is dit null, zodra de taak klaar is, voor Callable is dit het geretourneerde object.
Als je klaar bent met de ExecutorService moet je de ExecutorService.shutdown() aanroepen. Deze zorgt ervoor dat er geen nieuwe executes/submits meer geaccepteerd worden. Alles wat al bezig is, gaat gewoon door.
|
Note
|
De combinatie tussen een FixedThreadPool en een CyclicBarrier waarbij de ThreadPool kleiner is dan de Barrier, levert een Deadlock op omdat de Barrier te threads vasthoudt! |
ForkJoinTask
Een ForkJoinTask maak je aan via een JoinForkPool welke je aanmaakt als: ForkJoinPool pool = new ForkJoinPool(numberOfProcessors); of Executors.newWorkStealingPool(numberOfProcessors);.
ForkJoinPool is een implementatie van ExecutorService. Er is een extra submit(ForkJoinTask) en execute(ForkJoinTask).
In OCP komen de subclasses RecursiveTask<T> en RecursiveAction aan bod. RecursiveAction is in principe een RecursiveTask, maar dan zonder return type.
De methode die verplicht geimplementeerd moet worden is public T compute(). De inhoud is vergelijkbaar met een Runnable of een Callable behalve een recursieve toevoeging, wat meestal hier op neerkomt:
protected List< Integer > compute()
{
List< VoorbeeldTask > subTasks = new ArrayList<>();
List< Integer > result = new ArrayList<>();
if(zwareTaak.size()==1)
{
result.add(doeZelf(zwareTaak.remove(0)));
}
else
{
VoorbeeldTask subTask1 = new VoorbeeldTask(eersteHelft(zwareTaak));
VoorbeeldTask subTask2 = new VoorbeeldTask(tweedeHelft(zwareTaak));
subTask1.fork(); // (1)
subTask2.fork(); // (1)
subTasks.add(subTask1);
subTasks.add(subTask2);
}
for(VoorbeeldTask task : subTasks)
{
result.addAll(task.join()); // (2)
}
return result;
}
-
De
forkmethode laat dus een subtaak draaien met daarin een deelprobleem. De later aangeroepenjoin()geeft het resultaat van de subtaken terug en voegt dat bij het eigen resultaat. -
De
submitmethode op de Pool geeft in dit geval ook zelf een taak terug (wat in principe gewoon eenfork()is). Om het eindresultaat te bekijken, kun je hierop eenjoin()aanroepen.
ForkJoinTask< List< Integer > > task = pool.submit(new VoorbeeldTask(beginProbleem));
List< Integer > result = task.join();
Concurrent Collections
-
ConcurrentHashMapis een map met synchronized mutatiemethoden. DeConcurrentHashMapgeeft nog steeds eenConcurrentModificationExceptionbij aanpassen tijdens het itereren. -
CopyOnWriteArrayListgeeft iedere lezer een eigen lijst van de huidige status. EenConcurrentModificationExceptionkan hierbij dus niet optreden. Het schrijven naar de lijst heeft wel een lock. Hetzelfde geldt voor deCopyOnWriteArraySet. -
ConcurrentSkipListSetgarandeert sortering en is de threadsafe equivalent van deTreeSet. Er bestaat ook eenConcurrentSkipListMap.
« Vorige module Volgende module »