Monday, May 5, 2014

Java Consumer Producer example (without using Java BlockingQueue)

I found a very interesting and nicely written blocking queue implementation.
Of course, you can use the Java BlockingQueue (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html) implementation,
but if you want to learn something new along the way, why not implement it yourself?

I left some of the original comments and added some of my own.
Enjoy!

import java.util.LinkedList;

public class BlockingQueueTest {
    private final BlockingQ bq = new BlockingQ();
    /**
     The Worker thread is not very robust.  If a RuntimeException
     occurs in the run method, the thread will stop.
     */
    private class Worker extends Thread {
        public Worker(String name) { super(name); start(); }
        public void run() {
            try {
                //This loop keeps taking the next runnable task from the queue,
                //or blocks in pop() while the queue is empty.
                while(!isInterrupted()) {
                    ((Runnable)bq.pop()).run();
                }
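                //A "poison pill" task sets the interrupt flag, so the loop above
                //exits normally. The catch below only fires if the thread is
                //interrupted while it is blocked in pop().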
            } catch(InterruptedException ex) {}
            System.out.println(getName() + " finished");
        }
    }
    public BlockingQueueTest() {
        // We create 10 threads as workers
        Thread[] workers = new Thread[10];
        for (int i=0; i < workers.length; i++)
            workers[i] = new Worker("Worker Thread " + i);

        // We then push 100 commands onto the queue
        for (int i=0; i<100; i++) {
            final String msg = "Task " + i + " completed";
            bq.push(new Runnable() {
                public void run() {
                    System.out.println(msg);
                    // Sleep a random amount of time, up to 1 second
                    try { Thread.sleep((long)(Math.random()*1000)); }
                    catch(InterruptedException ex) { }
                }
            });
        }

        // We then push one "poison pill" onto the queue for each
        // worker thread, which will only be processed once the other
        // tasks are completed.
        for (int i=0; i < workers.length; i++) {
            bq.push(new Runnable() {
                public void run() {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Lastly we join each of the Worker threads, so that we
        // only continue once all the worker threads are
        // finished.
        for (int i=0; i < workers.length; i++) {
            try {
                workers[i].join();
            } catch(InterruptedException ex) {}
        }
        System.out.println("BlockingQueueTest finished");
    }
    public static void main(String[] args) throws Exception{
        new BlockingQueueTest();
    }
}

class BlockingQ {
    /**
     It makes logical sense to use a linked list for a FIFO queue,
     although an ArrayList is usually more efficient for a short
     queue (on most VMs).
     */
    private final LinkedList<Object> queue = new LinkedList<Object>();
    /**
     This method pushes an object onto the end of the queue, and
     then notifies one of the waiting threads.
     */
    public void push(Object o) {
        synchronized(queue) {
            queue.add(o);
            //This wakes up one thread blocked in queue.wait() (see pop() below) so it can continue.
            queue.notify();
        }
    }
    /**
     The pop operation blocks until either an object is returned
     or the thread is interrupted, in which case it throws an
     InterruptedException.
     */
    public Object pop() throws InterruptedException {
        //Without this lock there would be a race condition here: the emptiness
        //check and the removal must happen atomically.
        synchronized(queue) {
            //The thread waits while the queue is empty. The loop re-checks the
            //condition after every wake-up.
            while (queue.isEmpty()) {
                queue.wait();
            }
            return queue.removeFirst();
        }
    }
}
This implementation was taken from: http://www.javaspecialists.eu/archive/Issue016.html
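
For comparison, here is a minimal sketch of the same worker and poison pill idea built on the standard java.util.concurrent.BlockingQueue mentioned at the top. It is not part of the original article. The class name StdBlockingQueueSketch, the runTasks method, and the POISON sentinel are my own hypothetical names, and instead of interrupting the thread the pill here is simply a marker object that the worker compares against.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class StdBlockingQueueSketch {
    // Hypothetical sentinel object used as the "poison pill".
    private static final Runnable POISON = new Runnable() {
        public void run() { }
    };

    /** Runs taskCount tasks on workerCount threads and returns how many tasks ran. */
    public static int runTasks(int workerCount, int taskCount) throws InterruptedException {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final AtomicInteger completed = new AtomicInteger();

        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // take() blocks while the queue is empty
                            Runnable task = queue.take();
                            if (task == POISON) {
                                return; // this worker is done
                            }
                            task.run();
                        }
                    } catch (InterruptedException ex) {
                        // Restore the flag so callers can still see the interrupt
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Worker Thread " + i);
            workers[i].start();
        }

        // Queue the real tasks first; each one just counts itself as done
        for (int i = 0; i < taskCount; i++) {
            queue.put(new Runnable() {
                public void run() { completed.incrementAndGet(); }
            });
        }
        // One pill per worker, queued behind all real tasks (FIFO order)
        for (int i = 0; i < workers.length; i++) {
            queue.put(POISON);
        }
        // Wait until every worker has swallowed its pill
        for (Thread worker : workers) {
            worker.join();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(10, 100) + " tasks completed");
    }
}
```

The hand-written BlockingQ above does the same job as take() and put() with synchronized, wait() and notify(), which is exactly why it is a nice exercise before reaching for the library class.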
