Concurrent

Thread

Why need thread?

  • Sometimes, we need to run the task on background, such as: generating report, getting data from database, we don't want to do this in the main thread instead of doing it by creating a new thread

Cycle

  • We need to create a new thread , runnable class which is responsible for executing the task

	// main
	public static void main(String[] args){

		Countdown countdown = new Countdown();
		CustomRunnable customRunnable = new CustomRunnable(countdown);

		Thread thread1 = new Thread(customRunnable);
		Thread thread2 = new Thread(customRunnable);
		thread1.setName("thread1");
		thread2.setName("thread2");
		thread1.start();
		thread2.start();

	}
public class CustomRunnable implements Runnable{
    private Countdown countdown;

    public CustomRunnable(Countdown countdown) {
        this.countdown = countdown;
    }

    @Override
    public void run() {
        countdown.startCount();
    }
}

Method

Join

  • Wait for other thread to finish for several time

	Thread thread3 = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("Thread 3 start");
			}
		});

	Thread thread4 = new Thread(new Runnable() {
		@Override
		public void run() {
			try {
				thread3.join();
				System.out.println("Thread 4 start");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	});

	thread4.start();
	thread3.start();
	
	// Output:
	// Thread 3 start
	// Thread 4 start

Interrupt

  • To wake the sleeping thread up

Thread thread3 = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					System.out.println("Thread 3 wake up");
				}
				System.out.println("Thread 3 start");
			}
		});

		Thread thread4 = new Thread(new Runnable() {
			@Override
			public void run() {
				thread3.interrupt();
				System.out.println("Thread 4 start");
			}
		});

		thread4.start();
		thread3.start();
		
		// Output:
		// Thread 4 start 
		// Thread 3 wake up
		// Thread 3 start

Racing Condition

  • As threads shares the same heap, if threads access the same objects at the same time, racing condition/ thread interference will be occur. We cannot ensure the order of event

public class Countdown {
    private Integer count = 0;
    public void startCount(){
        while (count < 10) {
            System.out.println("Thread" + Thread.currentThread().getName() + " Count : " + count);
            count++;
        }
    }
}
public static void main(String[] args){

		Countdown countdown = new Countdown();
		CustomRunnable customRunnable = new CustomRunnable(countdown);

		Thread thread1 = new Thread(customRunnable);
		Thread thread2 = new Thread(customRunnable);

		thread1.setName("thread1");
		thread2.setName("thread2");
		thread1.start();
		thread2.start();
	}
public class CustomRunnable implements Runnable{
    private Countdown countdown;

    public CustomRunnable(Countdown countdown) {
        this.countdown = countdown;
    }

    @Override
    public void run() {
        countdown.startCount();
    }
}
Output
  • Countdown Object is accessed by threads at the same time, racing condition is happened

Synchronization

  • To make a scope of the code to ensure that one thread can run, other thread keep waiting until releasing the scope by thread

public class Countdown {

    private Integer count = 0;
    public void startCount(){
        synchronized (count) {
            while (count < 10) {
                System.out.println("Thread" + Thread.currentThread().getName() + " Count : " + count);
                count++;
            }
        }
    }
}
Output

DeadLock

  • The task of 2 threads rely on the release of the lock to finish task at the same time, therefore, the task of 2 threads cannot be finished, the object lock will be kept forever

public class Deadlock {
    Object lock1 = new Object();
    Object lock2 = new Object();

    public void test1(){
        synchronized (lock1){
            System.out.println("Test1 Holding lock 1");
            synchronized (lock2){
                System.out.println("Test1 Holding lock 2");
            }
        }
    }
    public void test2(){
        synchronized (lock2){
            System.out.println("Test2 Holding lock 2");
            synchronized (lock1){
                System.out.println("Test2 Holding lock 1");
            }
        }
    }
}
public class DeadLockRunnable implements Runnable{
    private Deadlock deadlock;

    public DeadLockRunnable(Deadlock deadlock) {
        this.deadlock = deadlock;
    }

    @Override
    public void run() {
        deadlock.test1();
    }
}
		// main
		Deadlock deadlock = new Deadlock();
		DeadLockRunnable deadLockRunnable  = new DeadLockRunnable(deadlock);
		DeadLockRunnable2 deadLockRunnable2 = new DeadLockRunnable2(deadlock);
		Thread thread1 = new Thread(deadLockRunnable);
		Thread thread2 = new Thread(deadLockRunnable2);

		thread1.setName("thread1");
		thread2.setName("thread2");
		thread1.start();
		thread2.start();
		
		//output:
		// test2 Holding lock2
		// test1 Holding lock1

LiveLock

  • The task of 2 threads will keep looping and failed to release the lock

public class LivelockExample {

    private Lock lock1 = new ReentrantLock(true);
    private Lock lock2 = new ReentrantLock(true);

    public static void main(String[] args) {
        LivelockExample livelock = new LivelockExample();
        new Thread(livelock::operation1, "T1").start();
        new Thread(livelock::operation2, "T2").start();
    }

    public void operation1() {
        while (true) {
            tryLock(lock1, 50);
            print("lock1 acquired, trying to acquire lock2.");
            sleep(50);

            if (tryLock(lock2)) {
                print("lock2 acquired.");
            } else {
                print("cannot acquire lock2, releasing lock1.");
                lock1.unlock();
                continue;
            }

            print("executing first operation.");
            break;
        }
        lock2.unlock();
        lock1.unlock();
    }

    public void operation2() {
        while (true) {
            tryLock(lock2, 50);
            print("lock2 acquired, trying to acquire lock1.");
            sleep(50);

            if (tryLock(lock1)) {
                print("lock1 acquired.");
            } else {
                print("cannot acquire lock1, releasing lock2.");
                lock2.unlock();
                continue;
            }

            print("executing second operation.");
            break;
        }
        lock1.unlock();
        lock2.unlock();
    }

    // helper methods

}
Output

Producer & Consumer

  • As we need to make a order that receiving function should wait for the sending function,

  • we can make good use of wait and notify, when notify is not received, the task will stop in while loop

  • Until notify is made, the loop will be break and keep running the process

public class Packet {
    private String packet 
    private boolean transfer = true;
    
    // Producer

    public synchronized send(String packet) {
        while (!transfer) {
            try {
                wait();
                System.out.println("send waiting....");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.packet = packet;
        transfer = false;
        notifyAll();
    }
    
    // Consumer

    public synchronized String receive()  {
        while (transfer) {
            try {
                wait();
                System.out.println("receive waiting....");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        transfer = true;
        notifyAll();
        return packet;
    }
}
public class Producer implements Runnable{
    private Packet packet;

    public Producer(Packet packet) {
        this.packet = packet;
    }

    @Override
    public void run() {
        List<String> stringList = new ArrayList<>();
        stringList.add("1");
        stringList.add("2");
        stringList.add("3");
        stringList.add("End");
        stringList.forEach(s -> {
           packet.send(s);
        });
    }
}
public class Consumer implements Runnable{
    private Packet packet;

    public Consumer(Packet packet) {
        this.packet = packet;
    }

    @Override
    public void run() {

        for (String str = packet.receive(); !str.equals("End"); str = packet.receive()) {
            System.out.println(str);
        }
    }
}
  • Wait and notify can only be used within synchronized scope

  • Wait should be used in while loop

Thread Starvation

  • Assume that there are many threads, by using synchronized scope, the task of random thread will be chosen to run when a thread is finished

  • However, there will be possibility that a thread will be wait for longer and longer and never be chosen to run unluckily

  • In such a case, we can make good use of fair lock to replace the synchronized scope

  • The thread having longest suspending time will be chosen to run automatically

public class Countdown {

    private Integer count = 0;
    private static ReentrantLock lock = new ReentrantLock(true);
    public void startCount() {

        while (count < 10) {
            lock.lock();
            try {
                System.out.println("Thread" + Thread.currentThread().getName() + " Count : " + count);
                count++;
            }
            finally {
                lock.unlock();
            }
        }
    }
}
  • From the result, we can see that the order is based on waiting time

  • Unlike synchronization, we must need to add back unlock so as to release to another thread

Executor

Executor

  • Executor provides a flexibility to run the task in main thread directly or create a new thread to run the task

public class CustomExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        Thread thread = new Thread(command);
        thread.start();
    }
}

Executors

  • It is a factory class which provide a template for creating executor based on the thread pool

	public static void main(String[] args){
		Countdown countdown = new Countdown();
		CustomRunnable customRunnable = new CustomRunnable(countdown);
		CustomExecutor executor1 = new CustomExecutor();
		Executor executor2 = Executors.newSingleThreadExecutor();
		executor1.execute(customRunnable);
		executor2.execute(customRunnable);
	}

ExecutorService

  • It is class extended from executor class, which provide additional method - submit which can return a future as a result

  • However, the executor service still running even the task is finished and wait for accepting new task, so we need to shutdown the thread manually

public static void main(String[] args){
		Countdown countdown = new Countdown();
		CustomRunnable customRunnable = new CustomRunnable(countdown);
		CustomExecutor executor1 = new CustomExecutor();
		Executor executor2 = Executors.newSingleThreadExecutor();
		ExecutorService executor3 = Executors.newSingleThreadExecutor();
		executor1.execute(customRunnable);
		executor2.execute(customRunnable);
		Future<Void> future = (Future<Void>) executor3.submit(customRunnable);
		executor3.shutdown();
	}

ThreadPoolTaskExecutor

  • It is a class extended from executor class, we can easily customize the thread setting on that

ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(1);
threadPoolTaskExecutor.setQueueCapacity(50);
threadPoolTaskExecutor.setMaxPoolSize(10);

Thread Pool

  • Used to control then number of task can be run at the same time

  • Core Pool Size: The number of thread created in the beginning

  • Queue Capacity: The number of thread that can be suspended

  • Max Pool Size: The max number of thread that can be run at the same time

  • When the number of thread is exceed the core pool size , the task of thread will be suspended , put in the query

  • The query is full, the new thread pool will be created so as to run the task of the thread, and the position of query will also be released

  • If the total number of thread created is larger than the queue capacity and the max pool size, the task of the thread will be rejected and throw the exception

  • The setting of core pool size can be 0 and cannot larger than the max pool size

Future

  • It is the return value of a thread

  • It can be used the stop the task even it is running and check whether the task is cancelled or finished

Future<Void> future = (Future<Void>) executor3.submit(customRunnable);
	future.cancel(true);
	if(future.isDone()|| future.isCancelled()){
		System.out.println("The thread is finished");
	}

AtomicVariable & ConcurrentHashMap

  • Use them when the variable have chance to be changed by multiple threads at the same time

  • It is thread-safe even without using synchronization scope

Last updated

Was this helpful?