Java 多线程

概念

  • 线程就是独立执行的路径;
  • 线程是程序中执行的线程。 Java虚拟机允许应用程序同时执行多个执行线程。
  • 线程分为守护线程和非守护线程,当一个进程中的非守护线程全部停止运行后守护线程也会停止运行。
  • 线程可以通过Thread 或实现Runnable

创建线程

创建方式 继承Thread class

Thread 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
class PrimeThread extends Thread {
long minPrime;
PrimeThread(long minPrime) {
this.minPrime = minPrime;
}

public void run() {
// compute primes larger than minPrime
. . .
}
}

1
2
PrimeThread p = new PrimeThread(143);
p.start();

实现 Runnable接口

Runnable实现代码

1
2
3
4
5
6
7
8
9
10
11
class PrimeRun implements Runnable {
long minPrime;
PrimeRun(long minPrime) {
this.minPrime = minPrime;
}

public void run() {
// compute primes larger than minPrime
. . .
}
}
1
2
PrimeRun p = new PrimeRun(143);
new Thread(p).start();

Runnable 的使用方式与Thread不同的地方在于,同一个对象可以启动多个线程。在多线程的情况下使用Runnable有优势。

实现 Callable接口

  • 使用Executors.newFixedThreadPool(1)来创建一个服务ExecutorService
  • 提交执行 Future<Bolean> = ser.submit( es)
  • 获取结果 Boolean r1 = result1.get()
  • 关闭服务 ser.shutdownNow();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package org.akachi;

import com.sun.org.apache.xpath.internal.operations.Bool;

import java.util.concurrent.*;

import static java.util.concurrent.Executors.*;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/18 19:10
*/
public class CallableTest implements Callable<String> {
int i = 100;

public static void main(String[] args) {
CallableTest callableTest = new CallableTest();
ExecutorService executorService = newFixedThreadPool(4);
Future<String> stringFuture= executorService.submit(callableTest);
try {
System.out.println(stringFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public String call() throws Exception {
while (i>0){
System.out.println(Thread.currentThread().getName()+"-->"+i--);
Thread.sleep(10);
}
return "threadName:"+Thread.currentThread().getName();
}
}

Callable 与Runnable都是为了执行线程而设计的,不同的地方在于Callable允许返回值并且可以将异常传递回去而Runnable只能执行。

  1. 可以返回值
  2. 可以抛出异常
  3. 使用call替换run方法。
  • 使用FutureTask來調用Runnable

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    package akache.org.study.thread;

    import java.util.concurrent.*;

    import static java.util.concurrent.Executors.*;

    /**
    * @Author akachi
    * @Email zsts@hotmail.com
    * @Date 2022/5/22 17:11
    */
    public class CallableTest {

    public static void main(String[] args) {
    FutureTask<String> ft = new FutureTask<String>(() -> {
    Thread.sleep(1100);
    if (Math.random() > 0.5d) {
    Exception e = new Exception("fuck");
    e.printStackTrace();
    throw e;
    } else {
    System.out.println("ok");
    return "ok";
    }
    });
    new Thread(ft).start();
    while (!ft.isDone()) {
    System.out.println("等待完成");
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    try {
    System.out.println(ft.get());
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

Thread 使用代理模式来实现的

代理模式:

代理模式是指我们写的方法该写什么就写什么,比如Thread,我们不需要关系下列代码是要在线程中使用还是在原线程中使用,该写什么就写什么。而在系统中创建一个新线程的工作完全交给Thread类来完成。(与计算机交互的工作很多,但是我们在写线程的时候完全没做过这些工作。)

线程状态

image-20220518194558325

  • 创建状态

  • 就绪状态

  • 阻塞状态

  • 运行状态

    有可能进入就绪状态,因为CPU去执行别的线程了。

  • 死亡状态

线程方法

  • 终止线程

    不建议使用手动停止线程,

    最好使用标志位让线程自己停止。

  • seelp

  • yield

    让出执行权限。

  • join

    合并线程。

    将某个线程作为本线程的一部分,先执行完成后再执行本线程。

  • state

    查看线程状态

  • priority

    线程优先级 1-10越大优先级越大。

    默认值是5

    getPriority() setPriority(int x)

  • daemon

    守护线程。设置为true后就是守护线程。

    setDaemon(true)

线程同步机制

synchronized

通过队列和锁来完成同步执行。

  • synchronized 无论如何都会有一个锁对象
    • 静态方法 class
    • 方法 则是this对象

如果要锁请锁

死锁

一个线程需要两个以上的锁资源,但是无法获得全部并且持有至少一个其他线程所需要的所资源。互相均无法完成并持续持有锁资源的情况。

解决:

尽量不要在获取一个所资源后再去获得其他锁资源,获得的锁资源最好是并行的。

四个必要條件:

  • 互斥条件:每个资源只能被一个进程使用。
  • 请求与保持:一个进程因请求资源而阻塞时不会释放其他资源。
  • 不剥夺條件:为使用完之前不可剥夺线程的资源。
  • 循环等待條件:若干个资源形成一种首尾相接的循环等待关系。

Lock显示同步锁

接口 java.util.concurrent.locks.Lock

实现类:

  • ReentrantLock

方法:

  • lock.lock()
  • lock.unlcok();

lock与synchronized类似

lock对象相当于synchronize(这里)的对象。

所以同样要注意lock的生命周期。

线程通讯

  • wait()

    表示线程一直等待,知道其他线程通知,与sleep不同,不会释放锁。

  • wait(long timeout)

    指定等待的毫秒数。

  • notify()

    唤醒一个处于等待状态的线程。

  • notifyAll()

    唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度。

并发协作模型

管程法

消费者生产者模型

image-20220519115824291

通过对于缓冲区的控制来控制生产者线程和消费者线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package org.akachi;

/**
* 生产者-消费者线程模型-->>利用缓存
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/19 12:00
*/
public class TestPC {
public static void main(String[] args) {
SyncContainer container = new SyncContainer();
new Producer(container).start();
new Consumer(container).start();
}
}

/**
* 生产者
*/
class Producer extends Thread{
private SyncContainer container;
Producer(SyncContainer container){
this.container=container;
}
@Override
public void run(){
for (int i=0;i<100;i++){
this.container.push(new Chicken().setProductId(i));
System.out.println("Producer-->" + i);

}
}
}
//消费者
class Consumer extends Thread{
private SyncContainer container;
Consumer(SyncContainer container){
this.container=container;
}
@Override
public void run(){
for (int i=0;i<100;i++) {
System.out.println("consumer-->" + container.get().getProductId());
}
}
}
//产品
class Chicken{
private Integer productId;

public Integer getProductId() {
return productId;
}

public Chicken setProductId(Integer productId) {
this.productId = productId;
return this;
}
}
//缓冲区
class SyncContainer {
Chicken[] chickens = new Chicken[10];
private int count;

//insert
public synchronized void push(Chicken chicken) {
System.out.println("SyncContainer.put.count-->"+count);
if (count == chickens.length) {
//阻塞生产者
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
chickens[count] = chicken;
count++;
this.notifyAll();
}
public synchronized Chicken get(){
System.out.println("SyncContainer.get.count-->"+count);
if(count==0){
//通知消費者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
count--;
Chicken chicken = chickens[count];
this.notifyAll();
return chicken;
}
}

信号模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package org.akachi;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/19 12:36
*/
public class TestPc2 {
public static void main(String[] args) {
TV tv = new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}

/**
* 生产者
*/
class Player extends Thread {
private TV tv;

Player(TV tv) {
this.tv = tv;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
String name = "節目"+i;
System.out.println("錄電視:"+name);
this.tv.play(name);
}
}
}

class Watcher extends Thread {
private TV tv;

Watcher(TV tv) {
this.tv = tv;
}

@Override
public void run() {
while (true) {
System.out.println("看電視:" + tv.watch());
}
}
}

class TV {
boolean flag = false;
private String name;

synchronized String watch() {
if (!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String name = this.name;
this.name = "沒有節目";
flag = !flag;
this.notifyAll();
return name;
}

synchronized void play(String name) {
if (flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.name = name;
flag = !flag;
this.notifyAll();
}
}

相对而言比较简单。

线程池

降低线程创建和销毁的开销。

  • 提高响应速度
  • 降低资源消耗
  • 便于管理
    • corePoolSize: 核心线程池大小
    • maximumPoolSize 最大线程数
    • keepAliveTime: 多久会终止无任务的线程

在Callable时就用到了线程池

创建和使用

ExecutorService

使用Executors.newFixedThreadPool来创建一个线程池。

创建:

image-20220519160957427

  • fixed 固定数量

  • cached 浮动数量 60秒未使用销毁

  • single 确保线程顺序单个的执行

  • scheduled 定时周期性调度任务

  • newWorkStealingPool 窃取工池

    可以把工作拆分

    实现的是ForkJoinPool

使用:

  • 提交Callable接口的实现类使用submit();
  • 提交Runnable接口的实现类使用execute();

并发编程

java帮助文档包:

  • java.util.concurrent 并发编程
  • java.util.concurrent.atomic 原子性
  • java.util.concurrent.locks 锁
  • java.util.function 函数式编程

在使用Thread没有返回值。

Runnable比Callable性能低。

进程和线程

  • 线程是一个程序执行的路径。

  • 进程是多个线程的集合,由一个程序所启动的。

    java默认由两个线程 main和GC。GC是守护线程。

  • 进程其实是由操作系统启动的。

    Thread.start0是一个native方法。

查看CPU核数:

1
System.out.println(Runtime.getRuntime().availableProcessors());

知识

线程有几个状态:

  • NEW

    尚未启动的线程状态

  • RUNNABLE 就绪

    可运行的线程状态,可能正在等待虚拟机执行

  • BLOCKED 阻塞

    Object.wait可以进入

    会释放锁

  • WAITING 处于执行状态的线程,正在等待另一个线程执行操作。

    调用Thread.join()那么当前线程就会进入这个状态

  • TIMED_WAITING

    设定了超时的等待中的线程

  • TERMINATED

    结束的

wait/seep的区别

  • sleep定式等待

    TimeUnit.DAYS.sleep(timeout)

    可以在任何地方sleep。

  • wait

    等待并且会释放锁,必须在同步代码块中调用。

技巧:

  • Runnable 是一个@FunctionalInterface函数式接口

    所以可以匿名内部内lambda表达式来执行

Lock 接口

使用方法:

1
2
3
Lock l = ...; l.lock(); 
try { // access the resource protected by this lock
} finally { l.unlock(); }

实现类:

  • ReentrantLock

    可重入锁

    1
    2
    3
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }

    实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package akache.org.study.thread;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    import static java.util.concurrent.Executors.*;

    /**
    * @Author akachi
    * @Email zsts@hotmail.com
    * @Date 2022/5/20 18:42
    */
    public class LockTest {
    public static void main(String[] args) {
    LockShop lockShop = new LockShop(100);
    ExecutorService es = newFixedThreadPool(3);
    es.submit(() -> {
    for (int i = 0; i < 100; i++) {
    lockShop.sell();
    }
    });es.submit(() -> {
    for (int i = 0; i < 100; i++) {
    lockShop.sell();
    }
    });
    }
    }
    class LockShop {
    LockShop(Integer number){
    this.number = number;
    }
    Integer number;

    public void sell(){
    Lock lock = new ReentrantLock(true);
    lock.lock();
    try {
    if (number > 0) {
    System.out.println(String.format("卖出了第%s张票,卖给了%s。", number,Thread.currentThread().getName()));
    number--;
    }
    }catch (Exception e){
    e.printStackTrace();
    }finally {
    lock.unlock();
    }
    }
    }

    相对于Synchronized

    Lock可以判断是否获取到了锁

    而Synchronized是可重入、非公平、不可终端的锁。

    • 公平锁

      不能插队

    • 非公平锁

      可以插队

  • ReentrantReadWriteLock.ReadLock

    读锁

  • ReentrantReadWriteLock.WriteLock

    写锁

tryLoc方法

锁是什么?

生产者和消费者模型

在操作资源时需要用“判断等待、业务、通知”这样的模型来执行。

生产者和消费者模型中的if判断需要使用while循环。

虛假唤醒

通过while循环来判断

1
2
3
4
5
6
7
while(productCount==0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

wait

1
2
public final void wait()
throws InterruptedException

导致当前线程等待,直到另一个线程调用该对象的notify()方法或notifyAll()方法。换句话说,这个方法的行为就好像简单地执行呼叫wait(0)

当前的线程必须拥有该对象的显示器。 该线程释放此监视器的所有权,并等待另一个线程通知等待该对象监视器的线程通过调用notify方法或notifyAll方法notifyAll 。 然后线程等待,直到它可以重新获得监视器的所有权并恢复执行。

像在一个参数版本中,中断和虚假唤醒是可能的,并且该方法应该始终在循环中使用:

1
2
3
4
5
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}

该方法只能由作为该对象的监视器的所有者的线程调用。有关线程可以成为监视器所有者的方式的说明,请参阅notify方法。

  • 异常

    IllegalMonitorStateException - 如果当前线程不是对象监视器的所有者。

    InterruptedException - 如果任何线程在当前线程等待通知之前或当前线程中断当前线程。 当抛出此异常时,当前线程的中断状态将被清除。

  • 另请参见:

    notify()notifyAll()

使用Lock 替代Synchronized

image-20220520212224318

image-20220520212430497

可以声明多个condition指定启用某个方法来达到调度线程的顺序。

关于锁的8个问题:

  • 同一个类的两个synchronized方法先调用会线执行。

    因为synchronized方法等于使用this,也就是调用者作为锁对象。

    所以谁先拿到谁执行。

  • 普通方法不受锁的影响

  • 静态方法以class对象为锁。

    class是全局唯一。

  • 某一时刻只有一个线程能持有锁。

并发集合

CopyOnWirteArrayList

使用CopyOnWrite cow的ArrayList

COW 计算机程序设计领域的一种优化策略。用于提高效率

在写入时避免覆盖造成问题。

  • COW比Vector牛逼在哪里?

    Vector使用了Syncronized,导致执行时实际上是单线程的。

add源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean add(E e) {
//创建锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//获得存储数组
Object[] elements = getArray();
//获得长度
int len = elements.length;
//拷贝这个数组并且长度+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
//在最新的位置插入元素
newElements[len] = e;
//放回数组
setArray(newElements);
//返回成功
return true;
} finally {
//解锁
lock.unlock();
}
//官方不觉得有失败的可能
}

常用的辅助类

CountDownLatch

减法计数器,创建CountDOwnLatch时指定一个数字。

使用CountDownLatch.await()后在其他工作线程调用CountDownLatch.countDown方法进行减法,减完时线程就会启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package akache.org.study.thread;

import java.util.concurrent.CountDownLatch;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/22 17:40
*/
public class CountDownLatchTest {

public static void main(String[] args) {
CountDownLatch driveLock = new CountDownLatch(10);
new Thread(() -> {
try {
driveLock.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("司机发车。");
}, "dirive").start();

new Thread(() -> {
int i = 1;
dirveOver:
while (true) {
if (driveLock.getCount() == 0) {
System.out.println(String.format("码头工人装满了货车第%d个箱子不用装了。等待货车就位;", i));
i = 0;
break dirveOver;
}
System.out.println(String.format("码头工人装载了第%d个箱子。", i++));
driveLock.countDown();
}
}).start();
}
}

CyclicBarrier

创建CyclicBarrier可以指定number和一个Runnable

执行nubmer次计awiat会自动解锁继续执行。并且会执行创建CyclicBarrier时创建的线程

Semaphore

创建Semaphore时指定许可数量,线程先获得许可数量然后执行。

执行完毕后释放许可数量其他线程再进入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package akache.org.study.thread;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/22 18:59
*/
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
final List list = new CopyOnWriteArrayList();
for (int i = 0;i<1000;i++){
new Thread(()->{
try {
//获得许可
semaphore.acquire();
list.add(Thread.currentThread().getName());

System.out.println(String.format("%s入场",Thread.currentThread().getName()));
System.out.println(String.format("当前在执行的线程集合%s",list));
Thread.sleep(new Double(Math.random()*50000).intValue());
//释放许可
System.out.println(String.format("%s出场",Thread.currentThread().getName()));
list.remove(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
},String.format("第%d位",i)).start();
}
}
}

ReadWriteLock 读写锁

  • 读锁可以共享但排斥写锁
  • 写锁不能共享并且排队

写个缓存案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package akache.org.study.thread;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/22 19:42
*/
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
myCache.put("test", "test");
for (int i = 0; i < 40; i++) {
final Integer count = i;
new Thread(() -> {
if (count == 20) {
myCache.put("test", "test2");
} else {
System.out.println(String.format("线程%s的读取结果是%s",count,myCache.get("test")));
}
},"线程"+i).start();
}
}
}
class MyCache{
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
Map<String,Object> cacheMap = new HashMap();
public Object get(String key){
readLock.lock();
try {
System.out.println(String.format("%s开始读取",Thread.currentThread().getName()));
Thread.sleep(new Double(Math.random()*20000).intValue());
return cacheMap.get(key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readLock.unlock();
}
return null;
}
public MyCache put(String key,Object value){
writeLock.lock();
try {
System.out.println(String.format("%s开始写入",Thread.currentThread().getName()));
Thread.sleep(10000);
System.out.println(String.format("%s写入完毕",Thread.currentThread().getName()));
cacheMap.put(key,value);
return this;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
}
return null;
}
}

问题

  • 可能会造成锁的饥饿问题

    一直有读锁写锁抢占不到资源。ReentrantReadWriteLock(false)的情况下也并未测试到这个情况。

  • 读时无法进行写操作

锁降级

获取写锁=>获取读锁=>释放写锁=>释放读锁

通过一上方式可以将写锁降级为读锁。

也就是说已经持有写锁的线程能够继续持有读锁,当然其他人不能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock read = readWriteLock.readLock();
Lock write = readWriteLock.writeLock();
write.lock();
read.lock();
try{
System.out.println("成功获取读锁在持有写锁的情况下");
}catch (Exception e){
e.printStackTrace();
}finally {
write.unlock();
System.out.println("释放写锁");
read.unlock();
System.out.println("释放读锁");
}

放过来先获取读锁再获取写锁就不行会死锁。

阻塞队列 BlockingQueue

image-20220523002754860

  • 通过一个共享的队列由一端进行输入一段进行输出。
  • 放满或者取空之后都会阻塞线程。

分类

  • ArrayBlockingQueue

    定长有界阻塞队列

  • LinkedBlockingQueue

    由链表实现的有界阻塞队列。默认长度为Integer.MAX_VALUE

  • DelayQueue

    使用优先级队列实现的延迟无界阻塞队列。

  • PriorityBlockingQueue

    支持优先级排序的无界阻塞队列

  • SynchronousQueue

    不存储元素的阻塞队列,也就是单个。

  • LinkdTransferQueue

    链表无界阻塞队列

  • LinkedBlockingDeque

    链表双向阻塞队列

核心方法

image-20220523003846333

image-20220523003904847

BlockingQueueDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package akache.org.study.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/23 0:51
*/
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(20);
new Thread(() -> {
try {
int i =0;
while (true){
Thread.sleep(new Double(Math.random()*100).intValue());
System.out.println(String.format("插入第%d个元素",i));
blockingQueue.put(i++);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "producer").start();
new Thread(() -> {
try {
while (true){
Thread.sleep(new Double(Math.random()*100).intValue());
System.out.println(String.format("取得第%d个元素",blockingQueue.take()));
System.out.println(String.format("队列中一共有%d个元素",blockingQueue.stream().count()));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer").start();
}
}

ThreadPool

基本概念

线程池不仅能降低调度的开销还能防止过度调度。

并且能节约销毁和创建线程带来的开销。

问题:
  • 线程池怎么做到切换运行的Callable的?

线程池架构

image-20220523100503345

线程池使用方式

  • Executors.newFixedThreadPool(2)

    固定的线程

  • Executors.newSingleThreadExecutor()

    创建一个线程

  • Executors.newCachedThreadPool()

    可变的线程

线程池的底层原理

线程池的七个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
  • int corePoolSize

    常驻线程数

  • int maximumPoolSize

    最大线程数

  • long keepAliveTime

    保持活跃时间

  • TimeUnit unit

    时间单位

  • BlockingQueue workQueue

    阻塞队列 ,workQueue the queue to use for holding tasks before they are executed.

    在executed前的对象会被保存在这。 经过测试的确是执行前的。

  • ThreadFactory threadFactory

    线程创建工厂

  • RejectedExecutionHandler handler

    拒绝策略

线程池的工作原理

image-20220523110012928

  1. 进入线程先进核心池
  2. 多了进阻塞队列BlockingQueue
  3. BlockingQueue也满了后执行拒绝策略。

image-20220523112247644

  • 默认 AbortPolicy 直接异常。

  • CallerRunsPolicy 回退到调用者。

    也就是说使用调用线程来执行,相当于调用了一个Thread.join()

  • DiscardOldestPolicy 插队并且删除队列中等待最久的。

  • DiscardPoolic 直接抛弃。

自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package akache.org.study.thread;

import java.util.concurrent.*;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/23 10:11
*/
public class ThreadPoolDemo {
static Integer x = 0;

public static synchronized Integer getX() {
return x++;
}

public static void main(String[] args) throws InterruptedException {

ExecutorService executorService = new ThreadPoolExecutor(4,
16,
3,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(128),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 300; i++) {
// System.out.println("执行了"+i+"个线程");
// final Integer count = i;
executorService.submit(() -> {
// System.out.println(String.format(">线程[%d]开始运行,线程名字[%s].",count,Thread.currentThread().getName()));
try {
Thread.sleep(new Double(Math.random() * 5000).intValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行了完成了" + ThreadPoolDemo.getX() + "个线程");
// System.out.println(String.format("<线程[%d]结束运行,线程名字[%s].",count,Thread.currentThread().getName()));
});
}
}
}

  • Executors.defaultThreadFactory(), //线程创建工厂
  • new ThreadPoolExecutor.CallerRunsPolicy()); //拒绝策略

分支合并框架

image-20220523173333419

Fork/Join 可以拆分和合并任务。

image-20220523173617806

Demo

用RecursiveTask来处理斐波那契数列。

使用多线程来解决递归问题。

值得注意的地方是这里使用compute去执行了一颗-2的树。

而另一个join的线程会去执行-1并且会拆分出一棵-2的树,然后去执行-3。

所以使用compute使当前线程执行-2的树会创造一颗相对平衡的由线程组成的树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package akache.org.study.thread.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/23 17:41
*/
public class ForkDemo {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Integer index = 20;
ForkJoinTask<Integer> returned = forkJoinPool.submit(new Fibonacci(index));
try {
System.out.println(String.format("第%d个斐波那契数是%d",index,returned.get()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
forkJoinPool.shutdown();
}
}
class Fibonacci extends RecursiveTask<Integer>{
Integer index=0;
Fibonacci(Integer index) {
this.index=index;
}

@Override
protected Integer compute() {
if(index<=1){
return index;
}else {
System.out.println(String.format("创建了线程%s来处理第%d个数", Thread.currentThread().getName(), index));
return new Fibonacci(index - 2).compute() + new Fibonacci(index - 1).fork().join();
}
}
}

异步回调

image-20220523185435458

我们要获得异步执行的方法值,如果它正在执行久等他。

runAsync()

无返回值异步调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package akache.org.study.thread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/23 19:06
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
System.out.println("main start");
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步调用:"+Thread.currentThread().getName());
});
try {
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main end");
}
}

supplyAsync()

有返回值的异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package akache.org.study.thread.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* @Author akachi
* @Email zsts@hotmail.com
* @Date 2022/5/23 19:13
*/
public class SupplyCompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
System.out.println("执行异步:"+Thread.currentThread().getName());
if(Math.random()>0.5d){
throw new RuntimeException("test");
}
return 0;
});
int z = 0;
try {
z = future.whenComplete((t,u)->{
System.out.println("t:"+t);
System.out.println("u:"+u);
}).get();;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("返回了"+z);
}
}

在调用get之前先调用whenComplete,

实际上是个BiConsumer 我们需要实现它的andThen方法。

这个方法会获得t u两个参数,

  • t 是返回值
  • u 是异常

在线程执行过程中如果获得异常或返回值我们都可以在这里处理。

最终再调用get获得返回值。

线程池的源码分析

ThreadPoolExecutor 的关键参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 用户做位运算
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //最大容量

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //111 正常接收任务
private static final int SHUTDOWN = 0 << COUNT_BITS; //000 会执行完当前与阻塞队列中的任务
private static final int STOP = 1 << COUNT_BITS; //001 showdownNow() 不接受新任务并且终端正在执行的任务,阻塞队列中的任务也无视。
private static final int TIDYING = 2 << COUNT_BITS; //101 线程池即将停止
private static final int TERMINATED = 3 << COUNT_BITS; //011 线程池执行完了

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } //得到线程池的状态
private static int workerCountOf(int c) { return c & CAPACITY; } //得到当前线程的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }

execute

执行execute做了什么

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();//获得控制器
// 获得工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 可以创建核心线程数(Runnable,是否是核心线程)
if (addWorker(command, true))
//return
return;
//如果if没进去,代表创建核心线程数失败。,重新获取ctl
//应为我们是盘点后进来的所以既然失败了,证明有人抢先了。
c = ctl.get();
}
//判断线程池是不是Running状态,如果是将Runnable添加到阻塞队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断是否是Running状态,如果不是Running就删除任务
if (! isRunning(recheck) && remove(command))
//拒绝策略
reject(command);
//如果是工作状态就获取工作线程个数,,如果是0个
else if (workerCountOf(recheck) == 0)
// 添加一个空的工作线程
// 核心线程也会超时
addWorker(null, false);
}
//如果都不是 那么创建非核心线程
else if (!addWorker(command, false))
//如果还是败了执行拒绝策略
reject(command);
}

image-20220524001344560

实际上这些代买就是这个流程中指的部分

addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
private boolean addWorker(Runnable firstTask, boolean core) {
//标记出口
retry:
//死循环
for (;;) {
//
int c = ctl.get();
int rs = runStateOf(c);

// 不是Running状态
if (rs >= SHUTDOWN &&
//有个例外:(rs 是shutdown 首个任务是空的 但工作队列未被清空)
! (rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty()))
//返回构建工厂失败
return false;

for (;;) {
//获得工作线程数量
int wc = workerCountOf(c);
//工作线程不能大于里路上线
if (wc >= CAPACITY ||
//core 是个 bool
//如果他是核心线程则返回核心线程size 否则返回maximumsize
//如果wc >=他们 也不再创建
wc >= (core ? corePoolSize : maximumPoolSize))
//返回失败
return false;
// cas 工作线程+1
if (compareAndIncrementWorkerCount(c))
//跳出去
break retry;
//如果cas失败了 重新获取ctl(因为有并发)
c = ctl.get(); // Re-read ctl
//判断状态是否一致
if (runStateOf(c) != rs)
//如果有变化退出外侧操作否则进入死循环重新操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//到此为止都是+1
// worker开始
boolean workerStarted = false;
// work 添加
boolean workerAdded = false;
//就是work
Worker w = null;
try {
//创建一个worker
w = new Worker(firstTask);
//获得这个线程
final Thread t = w.thread;
//如果线程t不为空
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//线程池的锁。
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
// 是运行状态
if (rs < SHUTDOWN ||
//或者是Shutdown 但是FirstTask为空
(rs == SHUTDOWN && firstTask == null)) {
//确认t是否已经被启动
if (t.isAlive()) // precheck that t is startable
//非法线程状态
throw new IllegalThreadStateException();
//再workes中添加这个workerw
workers.add(w);
//获取这个workers的size
int s = workers.size();
//如果s已经大于最大线程池大小了
if (s > largestPoolSize)
//最大线程池数量修改
largestPoolSize = s;
//已经添加的状态
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//如果成功添加
if (workerAdded) {
//启动它
t.start();
//修改启动状态
workerStarted = true;
}
}
} finally {
//如果没启动
if (! workerStarted)
//失败这里面回去把w从wokers中remove掉
addWorkerFailed(w);
}
//返回成功或失败
return workerStarted;
}

Work

线程通讯