Monday, August 13, 2012

Java Concurrency

Introduction

A thread is similar to a lightweight OS process, except that threads within the same process can access shared memory.
Each thread may also keep its own cached copies of shared values, which is why one thread can end up reading stale data. Java runs in its own process, and threads in Java run within this process. Java has native support for multithreaded programming. This is one of many reasons why Java currently holds its status as the best overall programming language.

Multithreaded programming is about speed, but it also means handling deadlocks (threads waiting on each other forever) and safety issues (incorrect results when threads read old or inconsistent data).

Java provides several locking mechanisms that enforce exclusivity (only one thread at a time can execute a certain part of the code).
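As a minimal sketch of two such mechanisms, the example below guards a shared counter once with the synchronized keyword and once with java.util.concurrent.locks.ReentrantLock. All names in it (LockingSketch, SyncCounter, LockCounter, runInThreads) are made up for illustration and are not part of the code accompanying this post.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockingSketch {

    // Counter guarded by the intrinsic lock of the instance.
    static class SyncCounter {
        private int value;
        synchronized void increment() { value++; }
        synchronized int get() { return value; }
    }

    // Counter guarded by an explicit lock object.
    static class LockCounter {
        private final ReentrantLock lock = new ReentrantLock();
        private int value;
        void increment() {
            lock.lock();
            try { value++; } finally { lock.unlock(); }
        }
        int get() {
            lock.lock();
            try { return value; } finally { lock.unlock(); }
        }
    }

    // Runs the action `iterations` times in each of `threadCount` threads and waits for all of them.
    static void runInThreads(int threadCount, final int iterations, final Runnable action)
            throws InterruptedException {
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < iterations; i++) {
                        action.run();
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final SyncCounter sync = new SyncCounter();
        final LockCounter locked = new LockCounter();
        runInThreads(4, 10000, new Runnable() { public void run() { sync.increment(); } });
        runInThreads(4, 10000, new Runnable() { public void run() { locked.increment(); } });
        System.out.println(sync.get() + " " + locked.get()); // prints "40000 40000"
    }
}
```

Without either guard, value++ is a read-modify-write sequence and concurrent increments can be lost. The explicit lock is more verbose, but it must always be released in a finally block, which the synchronized keyword does for you.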

Let’s look at some strategies for concurrent programming in Java.

But first we need a fairly long task, so that we can see the differences in performance between the strategies.
For this I created a class called FibonaciCalculator, which is used to fill an array with Fibonacci numbers. Generating a Fibonacci array definitely cannot be characterized as a CPU-intensive task, so I added a Thread.sleep call to each iteration to slow it down.


public class FibonaciCalculator {

    public static final int LONG_TASK_SIMULATED_DELAY = 150;

    // Returns element n of the sequence 1, 2, 3, 5, 8, ... using n + 1 slowed-down iterations.
    public static int calculateFibonaciNumber(int n) {
        int previous = 0;
        int result = 1;
        for (int i = 0; i <= n; i++) {
            simulateCostlyTask();
            int sum = result + previous;
            previous = result;
            result = sum;
        }
        return result;
    }

    private static void simulateCostlyTask() {
        try {
            Thread.sleep(LONG_TASK_SIMULATED_DELAY);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }

}

Single threaded strategy

First we will look at a simple single-threaded model. It fills the array sequentially, using the normal non-recursive implementation of the Fibonacci number algorithm in Java.
public class SingleThreadedFibonaciArrayConstructor extends FibonaciArrayConstructor{

    protected int[] constructFibonaciNumbersArray(int size) {
        int[] fibonaciNumbers = new int[size];
        for (int i = 0; i < fibonaciNumbers.length; i++) {
            fibonaciNumbers[i] = FibonaciCalculator.calculateFibonaciNumber(i);
        }
        return fibonaciNumbers;
    }       

}
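The FibonaciArrayConstructor base class is not listed in this post. The following is only a guess at its general shape, assuming it declares the method the strategies override and reports timings in the format shown in the test summary. The constructAndReport helper is a hypothetical name.

```java
// Hypothetical sketch of the base class; the real one is not shown in this post.
abstract class FibonaciArrayConstructor {

    // Implemented by each strategy (signature taken from the subclasses in this post).
    protected abstract int[] constructFibonaciNumbersArray(int size);

    // Assumed helper: runs the strategy and prints how long it took.
    public int[] constructAndReport(int size) {
        long start = System.nanoTime();
        int[] result = constructFibonaciNumbersArray(size);
        long seconds = (System.nanoTime() - start) / 1000000000L;
        System.out.println("Calculation with " + getClass().getSimpleName()
                + " took " + seconds + " seconds.");
        return result;
    }
}
```

With a base class of this shape, every strategy is measured by the same code path, so the timings differ only because of the strategy itself.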


Simple multi threaded strategy

The second strategy is a simple brute-force multithreaded implementation. In this example we use the basic means of concurrency in Java: java.lang.Thread. We implement the Runnable interface and create a new thread for each Runnable instance (in our case, one for each Fibonacci number in the array). Because every thread has access to the currentIndex field, we need to synchronize its retrieval and modification (increment). To achieve this we use a synchronized method. The synchronized keyword is the simplest way to lock a certain part of the code: it ensures that only one thread at a time can execute the marked section.
import java.util.ArrayList;
import java.util.List;

public class SimpleMultiThreadedFibonaciArrayConstructor extends FibonaciArrayConstructor{

    private int[] fibonaciNumbers;
    private int currentIndex = 0;

    protected int[] constructFibonaciNumbersArray(int size) {
        fibonaciNumbers = new int[size];

        List<Thread> threads = new ArrayList<Thread>();

        // Start one thread per array element.
        for (int i = 0; i < size; i++) {
            Runnable task = new MyRunnable();
            Thread worker = new Thread(task);
            worker.setName(String.valueOf(i));
            worker.start();
            threads.add(worker);
        }

        // Wait for every worker to finish; join() blocks instead of spinning on isAlive().
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        return fibonaciNumbers;
    }

    private synchronized int getNextIndex() {
        return currentIndex++;
    }

    private class MyRunnable implements Runnable {
                
        public void run(){
            int nextIndex = getNextIndex();
            int calculatedFibonaci = FibonaciCalculator.calculateFibonaciNumber(nextIndex);
            fibonaciNumbers[nextIndex] = calculatedFibonaci;
        }

    }
}

Thread pool strategy

In the previous example we used the maximum number of threads (one per array element) to calculate the Fibonacci array. This can be a disadvantage, because creating new threads and switching between them adds overhead. The java.util.concurrent package offers improved support for concurrent programming, including pools of worker threads. A thread pool can be described as a collection of Runnables (the work queue) and a collection of running threads. The work queue holds tasks waiting to be executed. The worker threads run constantly and check the work queue for new work; if there is new work to be done, they execute that Runnable. The pool itself, through the java.util.concurrent.Executor interface, provides the method execute(Runnable r) for adding Runnables to the work queue. The Executors class provides factory methods for ready-made implementations, e.g. Executors.newFixedThreadPool(int n), which creates a pool of n worker threads.
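import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPooledMultiThreadedFibonaciArrayConstructor extends FibonaciArrayConstructor {

    private int[] fibonaciNumbers;
    private AtomicInteger currentIndex = new AtomicInteger(0);

    protected int[] constructFibonaciNumbersArray(int size) {
        fibonaciNumbers = new int[size];

        ExecutorService executor = Executors.newFixedThreadPool(5);

        // Queue one task per array element; the 5 workers pick them up in turn.
        for (int i = 0; i < size; i++) {
            Runnable worker = new MyRunnable();
            executor.execute(worker);
        }

        // Accept no new tasks, then block until the queued ones have finished.
        executor.shutdown();
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return fibonaciNumbers;
    }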

    

    private class MyRunnable implements Runnable {

        public void run(){
            int nextIndex = currentIndex.getAndAdd(1);
            int calculatedFibonaci = FibonaciCalculator.calculateFibonaciNumber(nextIndex);
            fibonaciNumbers[nextIndex] = calculatedFibonaci;
        }

    }
}

Please note that we no longer use a synchronized method. Instead we use the AtomicInteger type, which is part of Java's support for non-blocking algorithms and is based on low-level atomic hardware primitives. getAndAdd(1) retrieves the value and modifies it (in this case adds one to it) as a single atomic operation, so we do not need to synchronize it: there is no danger of a dirty read, and because no lock is taken there is no danger of deadlock.
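The behaviour of getAndAdd can be checked in isolation. The sketch below is not part of the post's code. The class name AtomicIndexSketch and the helper claimIndices are invented for the example, which lets several threads claim indices from one AtomicInteger and collects them in a concurrent set.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicIndexSketch {

    // Claims indices from several threads and returns the distinct indices handed out.
    static Set<Integer> claimIndices(int threadCount, final int claimsPerThread)
            throws InterruptedException {
        final AtomicInteger currentIndex = new AtomicInteger(0);
        final Set<Integer> claimed =
                Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < claimsPerThread; i++) {
                        // Returns the old value and adds 1 in one atomic step.
                        claimed.add(currentIndex.getAndAdd(1));
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return claimed;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(5);
        System.out.println(counter.getAndAdd(1)); // prints 5, the value before the add
        System.out.println(counter.get());        // prints 6
        System.out.println(claimIndices(4, 1000).size()); // prints 4000: no index was handed out twice
    }
}
```

If two threads could ever receive the same index, the set would end up with fewer than 4000 entries.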

Futures and callables strategy

If you expect your threads to return a computed result, you can use java.util.concurrent.Callable. Callables can return a value after completion. If you submit a Callable to an executor, the framework returns a java.util.concurrent.Future. This Future can be used to check the status of the Callable and to retrieve its result.

In both previous examples we were forced to use things like non-blocking algorithm support (AtomicInteger) or method synchronization, because the tasks share a variable (currentIndex). The threads also share the Fibonacci array, but there are no synchronization issues there, since each thread writes to a different element. Wouldn’t it be better to remove the dependency on the currentIndex variable? Yes, of course it would. We can do this by giving each task its own copy of the index when the task is created, so that the tasks share no mutable state.

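import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FeaturesAndCallablesMultiThreadedFibonaciArrayConstructor extends FibonaciArrayConstructor{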
        
    protected int[] constructFibonaciNumbersArray(int size) {

        int[] fibonaciNumbers = new int[size];

        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<Integer>> list = new ArrayList<Future<Integer>>();

        for (int i = 0; i < size; i++) {
            Callable<Integer> worker = new MyCallable(i);            
            Future<Integer> submit = executor.submit(worker);
            list.add(submit);
        }

        for (int i = 0; i < list.size(); i++) {
            try {
                Future<Integer> future = list.get(i);
                int futureResult = future.get();                
                fibonaciNumbers[i] = futureResult;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        
        return fibonaciNumbers;
    }

    private class MyCallable implements Callable<Integer> {

        int currentIndex;

        public MyCallable(int currentIndex) {
            this.currentIndex = currentIndex;
        }

        public Integer call() throws Exception {            
            return FibonaciCalculator.calculateFibonaciNumber(currentIndex);
        }
    }
}
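As a possible variation on the listing above, ExecutorService.invokeAll submits a whole collection of Callables at once and returns the Futures in the same order as the tasks. The sketch below is an illustration only: the class InvokeAllSketch is invented, and squaring the index stands in for the call to FibonaciCalculator.calculateFibonaciNumber so that the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Computes index * index for every index on a pool of 5 workers.
    static int[] squares(int size) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (int i = 0; i < size; i++) {
            final int index = i; // each task captures its own copy of the index
            tasks.add(new Callable<Integer>() {
                public Integer call() {
                    return index * index; // placeholder for the real computation
                }
            });
        }
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            // Blocks until every task has completed; futures come back in task order.
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            int[] results = new int[size];
            for (int i = 0; i < size; i++) {
                results[i] = futures.get(i).get();
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] results = squares(6);
        System.out.println(results[5]); // prints 25
    }
}
```

Because the result of task i is read from future i, no shared index and no shared result array are needed while the tasks are running.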

Testing and conclusion 

Test result summary:


Calculation with SingleThreadedFibonaciArrayConstructor took 48 seconds. (1 thread)
Calculation with SimpleMultiThreadedFibonaciArrayConstructor took 3 seconds. (25 threads)
Calculation with ThreadPooledMultiThreadedFibonaciArrayConstructor took 11 seconds. (5 threads)
Calculation with FeaturesAndCallablesMultiThreadedFibonaciArrayConstructor took 11 seconds. (5 threads)


Running all these implementations through the test yields the results above. You can see that, of course, the more threads we have, the faster the execution. This is expected here, because each task spends nearly all of its time sleeping rather than computing. In the third and fourth implementations we use 5 threads, as opposed to the second, where we use 25 threads (one per array element).
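The measured times can be cross-checked with a little arithmetic. The sketch below is not part of the test suite. It assumes an array of 25 elements (inferred from the 25 threads of the second strategy), that element i costs (i + 1) sleeps of 150 ms and nothing else, and that pooled tasks are handed to whichever worker becomes free first, in index order. The class and method names are invented.

```java
import java.util.PriorityQueue;

class TimingEstimate {

    static final long DELAY_MS = 150; // value of LONG_TASK_SIMULATED_DELAY in the post

    // Element i runs the loop i + 1 times, so it sleeps for (i + 1) * DELAY_MS.
    static long taskMillis(int index) {
        return (index + 1) * DELAY_MS;
    }

    // Estimated wall-clock time when `workers` threads take the tasks in index order.
    static long estimateMillis(int size, int workers) {
        // Holds, for every worker, the time at which it becomes free again.
        PriorityQueue<Long> freeAt = new PriorityQueue<Long>();
        for (int w = 0; w < workers; w++) {
            freeAt.add(0L);
        }
        long finish = 0;
        for (int i = 0; i < size; i++) {
            long end = freeAt.poll() + taskMillis(i); // earliest free worker takes the next task
            freeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        System.out.println(estimateMillis(25, 1));  // prints 48750
        System.out.println(estimateMillis(25, 25)); // prints 3750
        System.out.println(estimateMillis(25, 5));  // prints 11250
    }
}
```

Under these assumptions the estimates of roughly 48.8, 3.8 and 11.3 seconds line up with the reported 48, 3 and 11 seconds, which suggests that the measurements are dominated by the simulated delay rather than by threading overhead.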

Java 7 Fork-Join mechanism 

Java 7 introduces a new parallel mechanism for compute-intensive tasks: the fork-join framework. The fork-join framework allows you to distribute a task across several workers and then wait for the result. But I have not installed Java 7 yet (who has?), so it will not be described in this post.

Complete implementation (with tests) can be found here.
