Java Concurrency - use CyclicBarrier to synchronize tasks across threads

dimitrilc

Introduction

This tutorial introduces the CyclicBarrier class and teaches you how to use it.

Goals

At the end of this tutorial, you will have learned:

  1. How to use CyclicBarrier to synchronize tasks across multiple threads.

Prerequisite Knowledge

  1. Intermediate Java.
  2. Basic knowledge of threads.
  3. Executors/ExecutorService.
  4. Lambdas.

Tools Required

  1. A Java IDE with JDK 10 or later (needed only for local variable type inference, i.e. var), such as IntelliJ Community Edition.

Concept Overview

CyclicBarrier is a synchronization aid that lets a fixed number of threads wait for one another at a common point. It carries a performance cost, because each thread is blocked at the barrier until the other threads have also reached it, so weigh the pros and cons before using it.
A CyclicBarrier object keeps track of how many threads are currently waiting on it. Whenever a thread calls await() on a CyclicBarrier object, that thread is blocked until the barrier trips.
The barrier trips when the number of threads that have called await() equals the number passed to the CyclicBarrier constructor. When the barrier trips, all waiting threads are allowed to resume execution.
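
Before moving on to the apartment scenario, the trip mechanism described above can be sketched in isolation. This is only a minimal illustration, not part of the tutorial project: the class name BarrierBasics and the method countTrips() are made up for this sketch. It starts a few plain threads, lets each call await() once, and counts how often the barrier action runs.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the tutorial project.
class BarrierBasics {

    // Starts `parties` threads that each call await() once on the same barrier
    // and returns how many times the barrier action ran.
    static int countTrips(int parties) {
        AtomicInteger trips = new AtomicInteger();
        // The second argument is the action executed each time the barrier trips.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(); // blocks until `parties` threads have arrived
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }

        try {
            for (Thread t : threads) {
                t.join(); // wait for every thread to get past the barrier
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + countTrips(3) + " time(s)");
    }
}
```

With three threads and a barrier created for three parties, the barrier trips exactly once, so the program prints "Barrier tripped 1 time(s)".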

Scenario

To understand this concept, we are going to create a program to build an apartment building with 3 floors and 4 apartments on each floor.
apartments.png

To simplify the concept, we are going to assume that all the apartments on a floor must be built first before any apartment on the upper floor can be built. It is impossible to build an apartment on the 3rd floor before building apartments on the 1st and 2nd floors because the apartment on the 3rd floor would have no structure to support it.

The picture below depicts the desired logic flow.
barrier.png

The Entry Class

First let us create an Entry class to hold our main() method and two other methods syncBuild() and asyncBuild(). You do not need to understand the code for now as I will explain it later.

package com.example.app;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Entry {
   public static void main(String[] args){
       asyncBuild();
       //syncBuild();
   }

   private static void asyncBuild(){
       final int workerCount = 4; //1

       ExecutorService service = Executors.newFixedThreadPool(workerCount); //2

       try{
           var builder = new ApartmentBuilder(); //3

           for (int i = 0; i < workerCount; i++){ //4
               service.submit(builder::asyncBuildApartments); //5
           }
       } finally {
           service.shutdown(); //6
       }
   }

   private static void syncBuild(){ //7
       final int workerCount = 4;   //8

       ExecutorService service = Executors.newFixedThreadPool(workerCount); //9

       try {
           var builder = new ApartmentBuilder(); //10
           var barrier1 = new CyclicBarrier(workerCount,
                   () -> System.out.println("Done building 1st floor apartments!")); //11
           var barrier2 = new CyclicBarrier(workerCount,
                   () -> System.out.println("Done building 2nd floor apartments!")); //12

           for (int i = 0; i < workerCount; i++){ //13
               service.submit(() -> builder.syncBuildApartments(barrier1, barrier2)); //14
           }
       } finally {
           service.shutdown(); //15
       }
   }
}

The ApartmentBuilder Class

Next we will create the ApartmentBuilder class in the same Entry.java file.

class ApartmentBuilder {
   private void buildFirstFloorApt(){ //16
       System.out.println("Built an Apartment on 1st Floor.");
   }

   private void buildSecondFloorApt(){ //17
       System.out.println("Built an Apartment on 2nd Floor.");
   }

   private void buildThirdFloorApt(){ //18
       System.out.println("Built an Apartment on 3rd Floor.");
   }

   public void syncBuildApartments(CyclicBarrier barrier1, CyclicBarrier barrier2){ //19
       try {
           buildFirstFloorApt(); //20
           barrier1.await();     //21
           buildSecondFloorApt();//22
           barrier2.await();     //23
           buildThirdFloorApt(); //24
       } catch (BrokenBarrierException | InterruptedException e) {
           e.printStackTrace();
       }
   }

   public void asyncBuildApartments(){ //25
       buildFirstFloorApt(); //26
       buildSecondFloorApt();//27
       buildThirdFloorApt(); //28
   }
}

That was quite a lot of code, so I have added numeric comments at the end of some lines to make describing the class content easier. When I refer to line 1, that means the line with the comment //1 and not the line number shown in your IDE or text editor.

  1. The methods on lines 16, 17, and 18 are simple methods that simulate building one apartment on a specific floor at a time. For example, if we want to build enough apartments to fill up the 1st floor, we would have to call buildFirstFloorApt() 4 times.

  2. The asyncBuildApartments() method on line 25 does not use any synchronization technique. All it does is build one apartment on each floor and exit. It is not aware of how many workers (threads) are available. This is fine if we only have one worker, but with multiple workers working concurrently it becomes a mess. An experienced worker might complete their tasks faster than a new worker and start on an upper floor before the floor below is finished, which violates the behavior specified in the Scenario section.

  3. To see why there is a problem with the asyncBuildApartments() method, we need to look at the asyncBuild() method (in the Entry class).
    a. Line 2 is where we create a thread pool with a fixed size that is equal to the number of workers we have.
    b. Line 5 is where we submit new Runnable tasks to the thread pool. Because we have 4 workers, we need to submit 4 Runnable tasks (here, method references), each calling asyncBuildApartments() once.
    c. When we execute asyncBuild() in main(), we receive output where the workers complete tasks without waiting on one another at all (the exact order may differ from run to run).

     Built an Apartment on 1st Floor.
     Built an Apartment on 1st Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 1st Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 3rd Floor.
     Built an Apartment on 3rd Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 3rd Floor.
     Built an Apartment on 1st Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 3rd Floor.

  4. To achieve our desired behavior, where all workers must wait until all apartments on a floor are completed before moving on to the next floor, we need to use the syncBuildApartments() method declared at line 19.
    a. Take a good look and observe the pattern from line 20 to line 23. After building an apartment on a floor, the method calls await() on that floor's barrier before moving on to the next floor. Each call to await() adds one to the number of threads waiting at that barrier.
    b. The method itself is not aware of how many workers are available or of the count each CyclicBarrier object was created with. All it cares about is that it needs to call await() after finishing an apartment on each floor (except for the last, 3rd floor).

  5. The method syncBuild() declared on line 7 is where we call syncBuildApartments(). syncBuild() and asyncBuild() are similar, but with a few differences on lines 11, 12, and 14.
    a. Because syncBuildApartments() requires two CyclicBarrier objects to work properly, we have to create them on lines 11 and 12. In this case we are using the two-argument CyclicBarrier constructor, which also takes a Runnable (here a lambda) to execute each time the barrier trips. The first, integer argument is the number of threads that must call await() before the barrier trips.
    b. Line 14 is where we pass the CyclicBarrier objects when calling syncBuildApartments().
    c. When calling syncBuild() in main(), we get a result where all workers wait for one another to complete each floor before starting to build on the next floor.

     Built an Apartment on 1st Floor.
     Built an Apartment on 1st Floor.
     Built an Apartment on 1st Floor.
     Built an Apartment on 1st Floor.
     Done building 1st floor apartments!
     Built an Apartment on 2nd Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 2nd Floor.
     Built an Apartment on 2nd Floor.
     Done building 2nd floor apartments!
     Built an Apartment on 3rd Floor.
     Built an Apartment on 3rd Floor.
     Built an Apartment on 3rd Floor.
     Built an Apartment on 3rd Floor.

As we can see, all workers wait for each other before proceeding to the next floor. The Runnable lambdas passed to the barriers are also executed each time a barrier trips.
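
Reading the console output is one way to confirm the ordering. As a rough alternative, the same barrier pattern can record floor numbers in a thread-safe queue so the order can be checked in code. The sketch below is an assumption-laden variation and not the tutorial's code: the class FloorOrderCheck and its methods build() and isNonDecreasing() are hypothetical names, and the barriers are created without a barrier action to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the tutorial project.
class FloorOrderCheck {

    // Runs the two-barrier pattern with `workers` threads and returns the
    // floor numbers in the order in which the apartments were recorded.
    static List<Integer> build(int workers) {
        Queue<Integer> floors = new ConcurrentLinkedQueue<>();
        CyclicBarrier barrier1 = new CyclicBarrier(workers);
        CyclicBarrier barrier2 = new CyclicBarrier(workers);
        ExecutorService service = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                service.submit(() -> {
                    try {
                        floors.add(1);    // "build" a 1st floor apartment
                        barrier1.await(); // wait for the whole 1st floor
                        floors.add(2);
                        barrier2.await(); // wait for the whole 2nd floor
                        floors.add(3);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            service.shutdown();
        }
        try {
            // Give the workers time to finish before reading the result.
            service.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(floors);
    }

    // True if no lower floor appears after a higher floor.
    static boolean isNonDecreasing(List<Integer> floors) {
        for (int i = 1; i < floors.size(); i++) {
            if (floors.get(i - 1) > floors.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> floors = build(4);
        System.out.println(floors);
        System.out.println("Floors built in order: " + isNonDecreasing(floors));
    }
}
```

With 4 workers the recorded list is [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3], because no worker can add a 2 until every worker has added its 1 and reached barrier1, and likewise for the 3rd floor.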

Solution Code

package com.example.app;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Entry {
   public static void main(String[] args){
       //asyncBuild();
       syncBuild();
   }

   private static void asyncBuild(){
       final int workerCount = 4; //1

       ExecutorService service = Executors.newFixedThreadPool(workerCount); //2

       try{
           var builder = new ApartmentBuilder(); //3

           for (int i = 0; i < workerCount; i++){ //4
               service.submit(builder::asyncBuildApartments); //5
           }
       } finally {
           service.shutdown(); //6
       }
   }

   private static void syncBuild(){ //7
       final int workerCount = 4;   //8

       ExecutorService service = Executors.newFixedThreadPool(workerCount); //9

       try {
           var builder = new ApartmentBuilder(); //10
           var barrier1 = new CyclicBarrier(workerCount,
                   () -> System.out.println("Done building 1st floor apartments!")); //11
           var barrier2 = new CyclicBarrier(workerCount,
                   () -> System.out.println("Done building 2nd floor apartments!")); //12

           for (int i = 0; i < workerCount; i++){ //13
               service.submit(() -> builder.syncBuildApartments(barrier1, barrier2)); //14
           }
       } finally {
           service.shutdown(); //15
       }
   }
}

class ApartmentBuilder {
   private void buildFirstFloorApt(){ //16
       System.out.println("Built an Apartment on 1st Floor.");
   }

   private void buildSecondFloorApt(){ //17
       System.out.println("Built an Apartment on 2nd Floor.");
   }

   private void buildThirdFloorApt(){ //18
       System.out.println("Built an Apartment on 3rd Floor.");
   }

   public void syncBuildApartments(CyclicBarrier barrier1, CyclicBarrier barrier2){ //19
       try {
           buildFirstFloorApt(); //20
           barrier1.await();     //21
           buildSecondFloorApt();//22
           barrier2.await();     //23
           buildThirdFloorApt(); //24
       } catch (BrokenBarrierException | InterruptedException e) {
           e.printStackTrace();
       }
   }

   public void asyncBuildApartments(){ //25
       buildFirstFloorApt(); //26
       buildSecondFloorApt();//27
       buildThirdFloorApt(); //28
   }
}

Summary

We have learned how to use CyclicBarrier to synchronize tasks across threads, but there are a few things that we should keep in mind:

  • The barrier limit and the thread pool size should match. If the thread pool has fewer threads than the barrier limit, the barrier can never trip and the waiting threads will wait indefinitely, resulting in a deadlock.
  • Pay attention to the reusable property of the CyclicBarrier (this is why it is called cyclic). The barrier count is automatically reset once the barrier is tripped, so the same barrier can be awaited again. Your code might behave unexpectedly if the thread pool size is larger than the barrier limit.
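
Both points can be tried out in a small experiment. The sketch below is illustrative only and makes several assumptions: the class BarrierPitfalls and its two methods are hypothetical, and the 200 ms timeout is an arbitrary value chosen so the demo finishes quickly. The first method reuses a single barrier for several rounds. The second uses the timed await(long, TimeUnit) overload so that a pool that is too small gives up instead of waiting forever.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the tutorial project.
class BarrierPitfalls {

    // Reuses ONE barrier for several rounds and returns how often it tripped.
    static int tripsWithReuse(int workers, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, trips::incrementAndGet);
        ExecutorService service = Executors.newFixedThreadPool(workers);
        Callable<Void> worker = () -> {
            for (int r = 0; r < rounds; r++) {
                barrier.await(); // the count resets after every trip
            }
            return null;
        };
        try {
            for (int i = 0; i < workers; i++) {
                service.submit(worker);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    // Barrier limit of 2, but a pool with a single thread: the second task
    // cannot start while the first one is waiting, so the barrier never trips.
    static String[] poolTooSmall() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        ExecutorService service = Executors.newFixedThreadPool(1);
        Callable<String> worker = () -> {
            try {
                barrier.await(200, TimeUnit.MILLISECONDS); // timed wait, arbitrary timeout
                return "tripped";
            } catch (TimeoutException e) {
                return "timed out";
            } catch (BrokenBarrierException e) {
                return "barrier broken";
            }
        };
        try {
            Future<String> first = service.submit(worker);
            Future<String> second = service.submit(worker);
            return new String[] { first.get(), second.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Trips with one reused barrier: " + tripsWithReuse(4, 3));
        String[] results = poolTooSmall();
        System.out.println("First task: " + results[0] + ", second task: " + results[1]);
    }
}
```

In the second method the first task times out, which leaves the barrier in the broken state, so the second task fails immediately with BrokenBarrierException once it finally gets the thread. With the untimed await() used in the tutorial code, the first task would simply wait forever.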

The full project code can be found here: https://github.com/dmitrilc/DaniWebCyclicBarrier

JamesCherrill commented: The hits keep on coming. Excellent stuff. +0