Java 多线程 概念
线程就是独立执行的路径;
线程 是程序中执行的线程。 Java虚拟机允许应用程序同时执行多个执行线程。
线程分为守护线程和非守护线程,当一个进程中的非守护线程全部停止运行后守护线程也会停止运行。
线程可以通过Thread 或实现Runnable
创建线程 创建方式 继承Thread class Thread 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this .minPrime = minPrime; } public void run () { . . . } }
1 2 PrimeThread p = new PrimeThread(143 ); p.start();
实现 Runnable接口 Runnable实现代码
1 2 3 4 5 6 7 8 9 10 11 class PrimeRun implements Runnable { long minPrime; PrimeRun(long minPrime) { this .minPrime = minPrime; } public void run () { . . . } }
1 2 PrimeRun p = new PrimeRun(143 ); new Thread(p).start();
Runnable 的使用方式与Thread不同的地方在于,同一个对象可以启动多个线程。在多线程的情况下使用Runnable有优势。
实现 Callable接口
使用Executors.newFixedThreadPool(1)
来创建一个服务ExecutorService
提交执行 Future<Bolean> = ser.submit( es)
获取结果 Boolean r1 = result1.get()
关闭服务 ser.shutdownNow();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package org.akachi;import com.sun.org.apache.xpath.internal.operations.Bool;import java.util.concurrent.*;import static java.util.concurrent.Executors.*;public class CallableTest implements Callable <String > { int i = 100 ; public static void main (String[] args) { CallableTest callableTest = new CallableTest(); ExecutorService executorService = newFixedThreadPool(4 ); Future<String> stringFuture= executorService.submit(callableTest); try { System.out.println(stringFuture.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public String call () throws Exception { while (i>0 ){ System.out.println(Thread.currentThread().getName()+"-->" +i--); Thread.sleep(10 ); } return "threadName:" +Thread.currentThread().getName(); } }
Callable 与Runnable都是为了执行线程而设计的,不同的地方在于Callable允许返回值并且可以将异常传递回去而Runnable只能执行。
可以返回值
可以抛出异常
使用call替换run方法。
使用FutureTask來調用Runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package akache.org.study.thread;import java.util.concurrent.*;import static java.util.concurrent.Executors.*;public class CallableTest { public static void main (String[] args) { FutureTask<String> ft = new FutureTask<String>(() -> { Thread.sleep(1100 ); if (Math.random() > 0.5d ) { Exception e = new Exception("fuck" ); e.printStackTrace(); throw e; } else { System.out.println("ok" ); return "ok" ; } }); new Thread(ft).start(); while (!ft.isDone()) { System.out.println("等待完成" ); try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } try { System.out.println(ft.get()); } catch (Exception e) { e.printStackTrace(); } } }
Thread 使用代理模式来实现的 代理模式:
代理模式是指我们写的方法该写什么就写什么,比如Thread,我们不需要关系下列代码是要在线程中使用还是在原线程中使用,该写什么就写什么。而在系统中创建一个新线程的工作完全交给Thread类来完成。(与计算机交互的工作很多,但是我们在写线程的时候完全没做过这些工作。)
线程状态
创建状态
就绪状态
阻塞状态
运行状态
有可能进入就绪状态,因为CPU去执行别的线程了。
死亡状态
线程方法
线程同步机制 synchronized 通过队列和锁来完成同步执行。
synchronized 无论如何都会有一个锁对象
如果要锁请锁
死锁 一个线程需要两个以上的锁资源,但是无法获得全部并且持有至少一个其他线程所需要的所资源。互相均无法完成并持续持有锁资源的情况。
解决:
尽量不要在获取一个所资源后再去获得其他锁资源,获得的锁资源最好是并行的。
四个必要條件:
互斥条件:每个资源只能被一个进程使用。
请求与保持:一个进程因请求资源而阻塞时不会释放其他资源。
不剥夺條件:为使用完之前不可剥夺线程的资源。
循环等待條件:若干个资源形成一种首尾相接的循环等待关系。
Lock显示同步锁 接口 java.util.concurrent.locks.Lock
实现类:
方法:
lock.lock()
lock.unlcok();
lock与synchronized类似
lock对象相当于synchronize(这里)的对象。
所以同样要注意lock的生命周期。
线程通讯
并发协作模型 管程法 消费者生产者模型
通过对于缓冲区的控制来控制生产者线程和消费者线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package org.akachi;public class TestPC { public static void main (String[] args) { SyncContainer container = new SyncContainer(); new Producer(container).start(); new Consumer(container).start(); } } class Producer extends Thread { private SyncContainer container; Producer(SyncContainer container){ this .container=container; } @Override public void run () { for (int i=0 ;i<100 ;i++){ this .container.push(new Chicken().setProductId(i)); System.out.println("Producer-->" + i); } } } class Consumer extends Thread { private SyncContainer container; Consumer(SyncContainer container){ this .container=container; } @Override public void run () { for (int i=0 ;i<100 ;i++) { System.out.println("consumer-->" + container.get().getProductId()); } } } class Chicken { private Integer productId; public Integer getProductId () { return productId; } public Chicken setProductId (Integer productId) { this .productId = productId; return this ; } } class SyncContainer { Chicken[] chickens = new Chicken[10 ]; private int count; public synchronized void push (Chicken chicken) { System.out.println("SyncContainer.put.count-->" +count); if (count == chickens.length) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } chickens[count] = chicken; count++; this .notifyAll(); } public synchronized Chicken get () { System.out.println("SyncContainer.get.count-->" +count); if (count==0 ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count--; Chicken chicken = chickens[count]; this .notifyAll(); return chicken; } }
信号模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package org.akachi;public class TestPc2 { public static void main (String[] args) { TV tv = new TV(); new Player(tv).start(); new Watcher(tv).start(); } } class Player extends Thread { private TV tv; Player(TV tv) { this .tv = tv; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { String name = "節目" +i; System.out.println("錄電視:" +name); this .tv.play(name); } } } class Watcher extends Thread { private TV tv; Watcher(TV tv) { this .tv = tv; } @Override public void run () { while (true ) { System.out.println("看電視:" + tv.watch()); } } } class TV { boolean flag = false ; private String name; synchronized String watch () { if (!flag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String name = this .name; this .name = "沒有節目" ; flag = !flag; this .notifyAll(); return name; } synchronized void play (String name) { if (flag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this .name = name; flag = !flag; this .notifyAll(); } }
相对而言比较简单。
线程池 降低线程创建和销毁的开销。
提高响应速度
降低资源消耗
便于管理
corePoolSize: 核心线程池大小
maximumPoolSize 最大线程数
keepAliveTime: 多久会终止无任务的线程
在Callable时就用到了线程池
创建和使用 ExecutorService
使用Executors.newFixedThreadPool
来创建一个线程池。
创建:
fixed 固定数量
cached 浮动数量 60秒未使用销毁
single 确保线程顺序单个的执行
scheduled 定时周期性调度任务
newWorkStealingPool 窃取工池
可以把工作拆分
实现的是ForkJoinPool
使用:
提交Callable接口的实现类使用submit();
提交Runnable接口的实现类使用execute();
并发编程 java帮助文档包:
java.util.concurrent 并发编程
java.util.concurrent.atomic 原子性
java.util.concurrent.locks 锁
java.util.function 函数式编程
在使用Thread没有返回值。
Runnable比Callable性能低。
进程和线程
查看CPU核数:
1 System.out.println(Runtime.getRuntime().availableProcessors());
知识 线程有几个状态:
wait/seep的区别
技巧:
Lock 接口 使用方法:
1 2 3 Lock l = ...; l.lock(); try { } finally { l.unlock(); }
实现类:
tryLoc方法 锁是什么? 生产者和消费者模型 在操作资源时需要用“判断等待、业务、通知”这样的模型来执行。
生产者和消费者模型中的if判断需要使用while循环。
虛假唤醒 通过while循环来判断
1 2 3 4 5 6 7 while (productCount==0 ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
wait 1 2 public final void wait() throws InterruptedException
导致当前线程等待,直到另一个线程调用该对象的notify()
方法或notifyAll()
方法。换句话说,这个方法的行为就好像简单地执行呼叫wait(0)
。
当前的线程必须拥有该对象的显示器。 该线程释放此监视器的所有权,并等待另一个线程通知等待该对象监视器的线程通过调用notify
方法或notifyAll
方法notifyAll
。 然后线程等待,直到它可以重新获得监视器的所有权并恢复执行。
像在一个参数版本中,中断和虚假唤醒是可能的,并且该方法应该始终在循环中使用:
1 2 3 4 5 synchronized (obj) { while (<condition does not hold>) obj.wait(); ... // Perform action appropriate to condition }
该方法只能由作为该对象的监视器的所有者的线程调用。有关线程可以成为监视器所有者的方式的说明,请参阅notify
方法。
使用Lock 替代Synchronized
可以声明多个condition指定启用某个方法来达到调度线程的顺序。
关于锁的8个问题:
并发集合 CopyOnWirteArrayList 使用CopyOnWrite cow的ArrayList
COW 计算机程序设计领域的一种优化策略。用于提高效率
在写入时避免覆盖造成问题。
add源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
常用的辅助类 CountDownLatch 减法计数器,创建CountDOwnLatch时指定一个数字。
使用CountDownLatch.await()后在其他工作线程调用CountDownLatch.countDown方法进行减法,减完时线程就会启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package akache.org.study.thread;import java.util.concurrent.CountDownLatch;public class CountDownLatchTest { public static void main (String[] args) { CountDownLatch driveLock = new CountDownLatch(10 ); new Thread(() -> { try { driveLock.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("司机发车。" ); }, "dirive" ).start(); new Thread(() -> { int i = 1 ; dirveOver: while (true ) { if (driveLock.getCount() == 0 ) { System.out.println(String.format("码头工人装满了货车第%d个箱子不用装了。等待货车就位;" , i)); i = 0 ; break dirveOver; } System.out.println(String.format("码头工人装载了第%d个箱子。" , i++)); driveLock.countDown(); } }).start(); } }
CyclicBarrier 创建CyclicBarrier可以指定number和一个Runnable
执行nubmer次计awiat会自动解锁继续执行。并且会执行创建CyclicBarrier时创建的线程
Semaphore 创建Semaphore时指定许可数量,线程先获得许可数量然后执行。
执行完毕后释放许可数量其他线程再进入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package akache.org.study.thread;import java.util.List;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.Semaphore;public class SemaphoreTest { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); final List list = new CopyOnWriteArrayList(); for (int i = 0 ;i<1000 ;i++){ new Thread(()->{ try { semaphore.acquire(); list.add(Thread.currentThread().getName()); System.out.println(String.format("%s入场" ,Thread.currentThread().getName())); System.out.println(String.format("当前在执行的线程集合%s" ,list)); Thread.sleep(new Double(Math.random()*50000 ).intValue()); System.out.println(String.format("%s出场" ,Thread.currentThread().getName())); list.remove(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); },String.format("第%d位" ,i)).start(); } } }
ReadWriteLock 读写锁
写个缓存案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package akache.org.study.thread;import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockTest { public static void main (String[] args) { MyCache myCache = new MyCache(); myCache.put("test" , "test" ); for (int i = 0 ; i < 40 ; i++) { final Integer count = i; new Thread(() -> { if (count == 20 ) { myCache.put("test" , "test2" ); } else { System.out.println(String.format("线程%s的读取结果是%s" ,count,myCache.get("test" ))); } },"线程" +i).start(); } } } class MyCache { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true ); Lock readLock = readWriteLock.readLock(); Lock writeLock = readWriteLock.writeLock(); Map<String,Object> cacheMap = new HashMap(); public Object get (String key) { readLock.lock(); try { System.out.println(String.format("%s开始读取" ,Thread.currentThread().getName())); Thread.sleep(new Double(Math.random()*20000 ).intValue()); return cacheMap.get(key); } catch (InterruptedException e) { e.printStackTrace(); }finally { readLock.unlock(); } return null ; } public MyCache put (String key,Object value) { writeLock.lock(); try { System.out.println(String.format("%s开始写入" ,Thread.currentThread().getName())); Thread.sleep(10000 ); System.out.println(String.format("%s写入完毕" ,Thread.currentThread().getName())); cacheMap.put(key,value); return this ; } catch (InterruptedException e) { e.printStackTrace(); }finally { writeLock.unlock(); } return null ; } }
问题
锁降级 获取写锁=>获取读锁=>释放写锁=>释放读锁
通过一上方式可以将写锁降级为读锁。
也就是说已经持有写锁的线程能够继续持有读锁,当然其他人不能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); Lock read = readWriteLock.readLock(); Lock write = readWriteLock.writeLock(); write.lock(); read.lock(); try { System.out.println("成功获取读锁在持有写锁的情况下" ); }catch (Exception e){ e.printStackTrace(); }finally { write.unlock(); System.out.println("释放写锁" ); read.unlock(); System.out.println("释放读锁" ); }
放过来先获取读锁再获取写锁就不行会死锁。
阻塞队列 BlockingQueue
通过一个共享的队列由一端进行输入一段进行输出。
放满或者取空之后都会阻塞线程。
分类
核心方法
BlockingQueueDemo 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package akache.org.study.thread;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueDemo { public static void main (String[] args) { BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(20 ); new Thread(() -> { try { int i =0 ; while (true ){ Thread.sleep(new Double(Math.random()*100 ).intValue()); System.out.println(String.format("插入第%d个元素" ,i)); blockingQueue.put(i++); } } catch (InterruptedException e) { e.printStackTrace(); } }, "producer" ).start(); new Thread(() -> { try { while (true ){ Thread.sleep(new Double(Math.random()*100 ).intValue()); System.out.println(String.format("取得第%d个元素" ,blockingQueue.take())); System.out.println(String.format("队列中一共有%d个元素" ,blockingQueue.stream().count())); } } catch (InterruptedException e) { e.printStackTrace(); } }, "consumer" ).start(); } }
ThreadPool 基本概念 线程池不仅能降低调度的开销还能防止过度调度。
并且能节约销毁和创建线程带来的开销。
问题:
线程池架构
线程池使用方式
Executors.newFixedThreadPool(2)
固定的线程
Executors.newSingleThreadExecutor()
创建一个线程
Executors.newCachedThreadPool()
可变的线程
线程池的底层原理 线程池的七个参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0 }<br> * {@code keepAliveTime < 0 }<br> * {@code maximumPoolSize <= 0 }<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null
int corePoolSize
常驻线程数
int maximumPoolSize
最大线程数
long keepAliveTime
保持活跃时间
TimeUnit unit
时间单位
BlockingQueue workQueue
阻塞队列 ,workQueue the queue to use for holding tasks before they are executed.
在executed前的对象会被保存在这。 经过测试的确是执行前的。
ThreadFactory threadFactory
线程创建工厂
RejectedExecutionHandler handler
拒绝策略
线程池的工作原理
进入线程先进核心池
多了进阻塞队列BlockingQueue
BlockingQueue也满了后执行拒绝策略。
自定义线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package akache.org.study.thread;import java.util.concurrent.*;public class ThreadPoolDemo { static Integer x = 0 ; public static synchronized Integer getX () { return x++; } public static void main (String[] args) throws InterruptedException { ExecutorService executorService = new ThreadPoolExecutor(4 , 16 , 3 , TimeUnit.MINUTES, new ArrayBlockingQueue<>(128 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0 ; i < 300 ; i++) { executorService.submit(() -> { try { Thread.sleep(new Double(Math.random() * 5000 ).intValue()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行了完成了" + ThreadPoolDemo.getX() + "个线程" ); }); } } }
Executors.defaultThreadFactory(), //线程创建工厂
new ThreadPoolExecutor.CallerRunsPolicy()); //拒绝策略
分支合并框架
Fork/Join 可以拆分和合并任务。
Demo 用RecursiveTask来处理斐波那契数列。
使用多线程来解决递归问题。
值得注意的地方是这里使用compute去执行了一颗-2的树。
而另一个join的线程会去执行-1并且会拆分出一棵-2的树,然后去执行-3。
所以使用compute使当前线程执行-2的树会创造一颗相对平衡的由线程组成的树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package akache.org.study.thread.forkjoin;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;public class ForkDemo { public static void main (String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); Integer index = 20 ; ForkJoinTask<Integer> returned = forkJoinPool.submit(new Fibonacci(index)); try { System.out.println(String.format("第%d个斐波那契数是%d" ,index,returned.get())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } forkJoinPool.shutdown(); } } class Fibonacci extends RecursiveTask <Integer > { Integer index=0 ; Fibonacci(Integer index) { this .index=index; } @Override protected Integer compute () { if (index<=1 ){ return index; }else { System.out.println(String.format("创建了线程%s来处理第%d个数" , Thread.currentThread().getName(), index)); return new Fibonacci(index - 2 ).compute() + new Fibonacci(index - 1 ).fork().join(); } } }
异步回调
我们要获得异步执行的方法值,如果它正在执行久等他。
runAsync() 无返回值异步调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package akache.org.study.thread.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureDemo { public static void main (String[] args) { System.out.println("main start" ); CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{ try { Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步调用:" +Thread.currentThread().getName()); }); try { completableFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("main end" ); } }
supplyAsync() 有返回值的异步调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package akache.org.study.thread.completable;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class SupplyCompletableFutureDemo { public static void main (String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("执行异步:" +Thread.currentThread().getName()); if (Math.random()>0.5d ){ throw new RuntimeException("test" ); } return 0 ; }); int z = 0 ; try { z = future.whenComplete((t,u)->{ System.out.println("t:" +t); System.out.println("u:" +u); }).get();; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("返回了" +z); } }
在调用get之前先调用whenComplete,
实际上是个BiConsumer 我们需要实现它的andThen方法。
这个方法会获得t u两个参数,
在线程执行过程中如果获得异常或返回值我们都可以在这里处理。
最终再调用get获得返回值。
线程池的源码分析 ThreadPoolExecutor 的关键参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; }
execute 执行execute做了什么
源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
实际上这些代买就是这个流程中指的部分
addWorker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null &&!workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Work 线程通讯