Thread

Multi-Task: Parallelism v.s. Concurrency

让计算机在同一时间进行多项任务,一般有两种方式:并行(Parallelism)和并发(Concurrency)。

并行(Parallelism):把一项任务拆分成可以同时执行的多个子任务,交给不同的运算单元去执行。 举例:大数据分析,TB级别的数据量,交给一个单核CPU来算,肯定很慢,那就换电脑,改成8核CPU,8个同时算就快了很多。如果还嫌慢,那就加电脑,10台8核CPU电脑,也就是80个CPU,速度又提高了不少。(当然算完了肯定要合并结果,这是Map Reduce的内容以后另说)

并行就是,客服中心有8个电话,公司雇了8个员工,每人分1个电话。

concurrency-vs-parallelism-2.png

并发(Concurrency):通过CPU调度算法,同一时间内运行多个进程/线程,在各个程序之间来回切换执行。 为什么会有并发?为什么要来回倒腾?一个活干完了再干下一个活不行吗?真不行,比如我们一边写博客,一边听歌,一边下载电影,同时运行了三个程序。 如果没有并发,只能开一个程序,用户体验就差很多。而且关键是,这三个任务其实占用的CPU,内存都不高,只有最后一个下电影,对网速和硬盘读写有较大的要求,计算机完全可忙得过来。

并发就是,客服中心有8个电话,公司雇了1个超级员工,让他1个人接8个电话,不断换着讲。

concurrency-vs-parallelism-1.png

Concurrency - Benefits and Costs

并发是为了程序运行得更快。启用多个线程,并互相切换,可以减少因某个线程阻塞而等待的时间。 但是,由于上下文切换也有开销,所以并不是一定并发比串行要快。

用Lmbench3测量上下文切换时长,用vmstat测量上下文切换次数。

Concurrency Implementation: Process v.s. Thread

说说Concurrency,一般实现有两种手段:多线程(Thread)和多进程(Process)。

Thread v.s. Process

多进程(Process)很好理解,Windows打开任务管理器可以看到多个正在执行的进程。 这里主要谈多线程(Thread),因为它更容易编程实现,效率更高。

Thread: Benefits and Costs

Benefit

Cost

Java Thread State

线程的状态:

注:有的地方把 waiting 和 blocked 都看作是阻塞,需要注意。

concurrency-thread-state

总结:

经常问到的几个问题:

Java Thread Interrupt

线程中断

当一个线程调用interrupt()后,表示告诉当前线程:你把手中的活停一下,可能要干些别的事了。如下:

public static void main(String[] args) throws InterruptedException {  
    ThreadA t = new ThreadA();  
    t.start();  
    Thread.sleep(3000);  
    t.interrupt();  
}

这个时候,当前线程干了一件事:把Thread.currentThread().isInterrupted()的值置为true。然后,就看当前线程怎么处理这个flag了。

  1. 一种可能,编程者检查了这个flag,然后使用一段其它的处理逻辑。
  2. 另一种可能,编程者检查了这个flag,然后抛出一个InterruptedException。(比如Thread.sleep(), Object.wait(), Thread.join()都会这么干,并且将flag复位)
  3. 还有一种,编程者直接忽略掉这个flag,那一切无事发生。

下例,是第1种情况:

public class ThreadA extends Thread{
    public void run(){
        while(true){
            if(Thread.currentThread().isInterrupted()){  
                System.out.println("Someone interrupted me.");
                return;
            }
            else{
                System.out.println("Thread is Going...");  
            }
        }
    }
}

下例,是第2种情况:需要注意的是,InterruptedException是由sleep(100)抛出的。

    public void run() {
        while (true) {
            try {
                sleep(100);
                System.out.println("Thread is Going...");
            } catch (InterruptedException e) {
                //e.printStackTrace();
                System.out.println("Someone interrupted me. I can't sleep.");
                return;
            }
        }
    }

中断使用场景:

  1. 在某线程中,调用了Thread.sleep(10000)等10s,实际发现不需要10s,于是使用中断提前唤醒。
  2. 线程A调用join()方法等待线程B执行结束,但是线程B发现自己短时间无法结束,于是使用中断,告诉线程A别等我了。

参考 -> 理解java线程的中断 https://blog.csdn.net/canot/article/details/51087772https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html

Java Thread Example

创建线程对象的方式

创建线程的方式

Thread Programming Steps:

e.g.

 // Part 1: extends Thread
 class PrimeThread extends Thread {
     long minPrime;
     PrimeThread(long minPrime) {
         this.minPrime = minPrime;
     }
     public void run() {
         // compute primes larger than minPrime
         ...
     }
 }
 ...
 PrimeThread p = new PrimeThread(143);
 p.start();

 // Part 2: implements Runnable
 class PrimeRun implements Runnable {
     long minPrime;
     PrimeRun(long minPrime) {
         this.minPrime = minPrime;
     }
     public void run() {
         // compute primes larger than minPrime
         ...
     }
 }
 ...
 PrimeRun p = new PrimeRun(143);
 new Thread(p).start();
 
 // Part 3: using Executors
 ExecutorService executor = Executors.newSingleThreadExecutor();
 executor.submit(() -> {
     String threadName = Thread.currentThread().getName();
     System.out.println("Hello " + threadName);
 });

Thread Issues

多线程共享内存空间,即Thread之间资源共享,一个Thread可以access到另一个Thread的资源,这里就可能会出问题。

e.g. 两个线程同时对String和int进行大量(1000次)操作,预期的结果是:count=1000,string存放了0-999。 但是由于非线程安全,线程1,2同时操作变量,有一个的结果就被“丢掉”了。

public class ConcurrencyIssueTest {
    String string = "";
    int count = 0;

    void inc() {
        string += (","+count++);
    }

    public static void main(String args[]) throws Exception {
        ConcurrencyIssueTest test = new ConcurrencyIssueTest();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        IntStream.range(0, 1000).forEach(i -> executor.submit(test::inc));

        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);

        System.out.println(test.count);
        System.out.println(test.string);
    }
}

输出如下:

999
...277,279,281,283,285,287...

为了防止出现这些问题,需要在设计开发的时候特别注意,保证Thread Safe。

Thread Safe

Thread safety: The program state (fields/objects/variables) behaves correctly when multiple simultaneous threads are using a resource. 保证一个资源同时被多个进程读写时表现正常。

那么如何实现(资源的)Thread Safe呢?

Deadlock Prevention

Lock Ordering: make sure that all locks are always taken in the same order by any thread

典型反面例子

Thread 1:
    lock A
    wait B

Thread 2:
    lock B
    wait A

Lock Timeout: if a thread does not succeed in taking all necessary locks within the given timeout, it will backup, free all locks taken, wait for a random amount of time and then retry.

Deadlock Detection: every time a thread takes a lock it is noted in a data structure (map, graph etc.) of threads and locks. Additionally, whenever a thread requests a lock this is also noted in this data structure.

When a thread requests a lock but the request is denied, the thread can traverse the lock graph to check for deadlocks.

concurrency_deadlock_graph

Tools:

Other Topic

Fork me on GitHub