欧洲美妇做爰网站,梅州南站,360个人网站怎么推广,效果图参考网站有哪些一、自定义线程池
1#xff09;背景#xff1a;
在 QPS 量比较高的情况下#xff0c;我们不可能说所有的访问都创建一个线程执行#xff0c;这会导致内存占用过高#xff0c;甚至有可能出现 out of memory另外也要考虑 cpu 核数#xff0c;如果请求超过了cpu核数#…一、自定义线程池
1背景
在 QPS 量比较高的情况下我们不可能说所有的访问都创建一个线程执行这会导致内存占用过高甚至有可能出现 out of memory另外也要考虑 cpu 核数如果请求超过了cpu核数那么有一部分线程就会收到限制然后等cpu时间片结束会进行一个上下文切换频繁的上下文也会影响性能。
总和以上我们可以引出线程池的概念也就是结合前面的享元模式创建一批线程复用线程既可以较少内存占用又可以较少线程上下文切换。 2任务数量放不满 Blocking Queue
import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;Slf4j(topic c.TestPool)
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i 0; i 5; i ) {int j i;threadPool.execute(() - {log.debug({}, j);});}}
}
Slf4j(topic c.TestPool)
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueueRunnable blockQueue;// 线程队列private HashSetWorker workers new HashSet();private long timeout;private TimeUnit timeUnit;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize) {this.timeout timeout;this.timeUnit unit;this.capacity capacity;blockQueue new BlockQueue(queueSize);}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() capacity) {Worker worker new Worker(runnable);log.debug(开始创建线程...{}, worker);workers.add(worker);worker.start();} else {log.debug(线程已满...加入队列);blockQueue.put(runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable runnable;}Overridepublic void run() {//执行任务// 1 当任务不为空是执行任务// 2 当任务执行完毕接着从队列中获取任务执行//这里有赋值不用上锁是因为poll跟take实现都加了锁// while ((runnable ! null) || (runnable blockQueue.take()) ! null) {while ((runnable ! null) || (runnable blockQueue.poll(timeout, timeUnit)) ! null) {try {log.debug(正在执行。。。{}, runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable null;}}synchronized (workers) {log.debug({} 被移除了, this);workers.remove(this);}}}
}Slf4j(topic c.TestPool)
class BlockQueueT {// 队列大小private int capacity;//存放队列private DequeT deque new ArrayDeque();// 一把锁private ReentrantLock lock new ReentrantLock();// 等待条件private Condition fullCondition lock.newCondition();private Condition emptyCondition lock.newCondition();public BlockQueue(int capacity) {this.capacity capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos 0) {log.debug(超时结束);return null;}//进行等待nanos emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}// log.debug(删除队列头...);return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 阻塞添加存放任务public void put(T runnable) {try {lock.lock();while (deque.size() capacity) {log.debug(等待加入任务队列 {} ..., runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug(存放到队列尾);deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}
}2任务数量放满 Blocking Queue (改进
1. 带超时时间的 阻塞添加
// 带超时时间的存放任务
public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos unit.toNanos(time);while (deque.size() capacity) {log.debug(等待加入任务队列 {} ..., runnable);try {if (nanos 0) {log.debug(等待加入任务队列超时 {} ..., runnable);return false;}nanos fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug(线程已满...加入队列{},runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}
}
2. 设计模式 之 策略模式
如果存放的队列已经满了之前的版本就进入阻塞死等了那么我们可以改进的方式
带超时时间的等待让调用者放弃执行让调用者自己执行让调用者抛出异常... 等
有很多方式如果写死的话就会有很多else if那么定死了没有扩展性所以使用策略模式将权利下放利用函数式接口让调用者传进来拒绝策略。
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
Slf4j(topic c.TestPool)
public class Test1AndPoolHandle1 {public static void main(String[] args) {ThreadPool threadPool new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) - {// 1、 死等//queue.put(task);// 2、 带超时时间等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3、 让调用者放弃
// log.debug(放弃任务{}, task);// 4、 让调用者抛出异常
// throw new RuntimeException(跑出异常 task);// 5、 让调用者自己执行task.run();});for (int i 0; i 3; i ) {int j i;threadPool.execute(() - {try {Thread.sleep(1000L);log.debug({}, j);} catch (InterruptedException e) {throw new RuntimeException(e);}});}}
}FunctionalInterface
interface RejectFunctionT {void reject(BlockQueueT tBlockQueue, T task);}
Slf4j(topic c.TestPool)
class ThreadPool {// 线程池大小private int capacity;// 队列private BlockQueueRunnable blockQueue;// 线程队列private HashSetWorker workers new HashSet();private long timeout;private TimeUnit timeUnit;private RejectFunctionRunnable rejectFunction;public ThreadPool(int capacity, long timeout, TimeUnit unit, int queueSize, RejectFunctionRunnable rejectFunction) {this.timeout timeout;this.timeUnit unit;this.capacity capacity;blockQueue new BlockQueue(queueSize);this.rejectFunction rejectFunction;}// 执行任务public void execute(Runnable runnable) {synchronized (workers) {if (workers.size() capacity) {Worker worker new Worker(runnable);log.debug(开始创建线程...{}, worker);workers.add(worker);worker.start();} else {
// blockQueue.put(runnable);blockQueue.tryPut(rejectFunction, runnable);}}}class Worker extends Thread{private Runnable runnable;public Worker(Runnable runnable) {this.runnable runnable;}Overridepublic void run() {//执行任务// 1 当任务不为空是执行任务// 2 当任务执行完毕接着从队列中获取任务执行//这里有赋值不用上锁是因为poll跟take实现都加了锁
// while ((runnable ! null) || (runnable blockQueue.take()) ! null) {while ((runnable ! null) || (runnable blockQueue.poll(timeout, timeUnit)) ! null) {try {log.debug(正在执行。。。{}, runnable);runnable.run();} catch (Exception e) {throw new RuntimeException(e);} finally {runnable null;}}synchronized (workers) {log.debug({} 被移除了, this);workers.remove(this);}}}
}Slf4j(topic c.BlockQueue)
class BlockQueueT {// 队列大小private int capacity;//存放队列private DequeT deque new ArrayDeque();// 一把锁private ReentrantLock lock new ReentrantLock();// 等待条件private Condition fullCondition lock.newCondition();private Condition emptyCondition lock.newCondition();public BlockQueue(int capacity) {this.capacity capacity;}public int size() {return deque.size();}// 获取任务 设置超时时间public T poll(long time, TimeUnit unit) {try {lock.lock();long nanos unit.toNanos(time);while (deque.isEmpty()) {try {if (nanos 0) {log.debug(超时结束);return null;}//进行等待nanos emptyCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 获取任务public T take() {try {lock.lock();while (deque.isEmpty()) {try {emptyCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}
// log.debug(删除队列头...);return deque.removeFirst();} finally {fullCondition.signal();lock.unlock();}}// 存放任务public void put(T runnable) {try {lock.lock();while (deque.size() capacity) {log.debug(等待加入任务队列 {} ..., runnable);try {fullCondition.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug(线程已满...加入队列{},runnable);deque.addLast(runnable);emptyCondition.signal();} finally {lock.unlock();}}// 带超时时间的存放任务public boolean offer(T runnable, long time, TimeUnit unit) {try {lock.lock();long nanos unit.toNanos(time);while (deque.size() capacity) {log.debug(等待加入任务队列 {} ..., runnable);try {if (nanos 0) {log.debug(等待加入任务队列超时 {} ..., runnable);return false;}nanos fullCondition.awaitNanos(nanos);} catch (InterruptedException e) {throw new RuntimeException(e);}}log.debug(线程已满...加入队列{},runnable);deque.addLast(runnable);emptyCondition.signal();return true;} finally {lock.unlock();}}public void tryPut(RejectFunctionT rejectFunction , T runnable) {lock.lock();try {if (deque.size() capacity) {rejectFunction.reject(this, runnable);} else {log.debug(线程已满...加入队列{},runnable);deque.addLast(runnable);emptyCondition.signal();}} finally {lock.unlock();}}
}二、ThreadPoolExecutor 1线程池状态
ThreadPoolExecutor 使用 int 的高3位来表示线程池状态低29位表示线程数量。 从数字上比较 RUNNING SHUTDOWN STOP TIDYING TERMINATED
为什么111 比 00小因为是一个整数的高3位所以第一位为1表示负数。 问题
为什么使用一个整数表示线程池状态和线程数量不用两个变量来表示呢
答
因为要保证在对 线程池状态 和 线程数量 进行赋值操作时的原子性
那么他存在一个变量中就只需要一次cas操作就可以保证原子性既可以保存状态信息也可以保存数量信息。
如果存在两个变量中就需要两次cas操作才可以保证原子性才可以对两个变量进行赋值操作。 2构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) 假设 核心线程 2 救急线程1 阻塞队列2
一开始任务阻塞队列空闲线程也空闲任务1 创建 核心线程1 执行任务2 创建 核心线程2 执行核心线程满了任务3 进入 阻塞队列核心线程满了任务4 进入 阻塞队列核心线程满了 阻塞队列满了任务5 创建 救急线程1 执行核心线程满了 阻塞队列满了救急线程满了任务6执行拒绝策略
救急线程空闲一定时间会自动销毁等待下次用到在创建
核心线程一直运行。 触发救急线程创建的前提 是配合有界队列来使用 如果线程到达 最大线程数量 仍然有新的任务这时会执行拒绝策略。
拒绝策略 jdk 提供了 4 种实现其他著名框架也提供了实现。 3newFixedThreadPool 固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable());
}
//可以给第二参数传线程工厂提供创建新线程的名字。
特点
核心线程 最大线程数 没有救急线程因此无需超时时间阻塞队列是无界的没有指定大小可以放任务数量任务
评价
适用于任务量已知相对耗时的任务 4newCachedThreadPool (带缓冲线程池
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable(),threadFactory);
}
特点
核心线程为0最大线程为Integer.MAX_VALUE救急线程的空闲生存时间 60s 也就是全部都是救急线程60s后可以回收救急线程可以无限创建
队列采用 SynchronousQueue 特点是没有容量没有线程来获取的时候放不进去任务类似于 一手交钱、一手交货刚好当前线程池全部都是救急线程每个任务都会创建一个新的线程来获取任务合适
评价
整个线程池会不断创建新的线程任务量有多少线程就会创建多少然后空闲60s后释放线程
适用于任务量比较密集但是任务执行时间比较短的情况 5newSingleThreadExecutor (单线程线程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));
}
使用场景
希望多个任务串行执行。线程数固定为1当任务数量多于1时会存放到阻塞队列中无界。任务执行完毕线程也不会被摧毁释放。
那么跟 固定大小线程池设置成1 或者 自己创建一个线程执行 有什么区别呢
自己创建一个线程执行如果中间发生了异常那么线程销毁了阻塞队列中的任务就不执行了没有什么补救措施但是如果单线程线程池还会创建一个新的线程保证正常工作。newFixedThreadPool(1) 固定大小线程池设置成1返回的对象还是可以修改的 返回的对象还是ThreadPoolExecutor对象可以强转后调用setCorePoolSize 等方法修改。
单线程线程池 newSingleThreadExecutor() 始终线程数量为1不能修改 返回的对象 通过new FinalizableDelegatedExecutorService进行了包装应用了一种装饰器模式只对外暴露 ExecutorService 接口 因此不能调用 ThreadPoolExecutor 中特有方法修改。 6提交任务 1. ALL 2. submit 演示
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
Slf4j(topic c.pool)
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool Executors.newFixedThreadPool(2);//带有返回结果的submit执行可以使用lambda表达式FutureString future pool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(执行);Thread.sleep(1000);return ok;}});//获取值会等待submit执行完相当于保护性暂停模式log.debug({}, future.get());}
}
3. invokeAll 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
Slf4j(topic c.pool)
public class Test2Show {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool Executors.newFixedThreadPool(3);// 调用 invokeAll 执行一个链表中的所有任务然后将所有返回值收集成一个list 类型是Future// 可以设置超时时间 如果规定时间内没有执行完所有任务直接终止ListFutureObject futures pool.invokeAll(Arrays.asList(() - {log.debug(begin);Thread.sleep(1000);return 1;},() - {log.debug(begin);Thread.sleep(500);return 2;},() - {log.debug(begin);Thread.sleep(2000);return 3;}));// 便利所有Future等待所有执行完毕之后输出// 如果线程数量是2 任务3进入队列等 所以等待时间 500 2000// 如果线程数量是3 任务3直接与信工 等待时间 2000futures.forEach(f - {try {log.debug({}, f.get());} catch (ExecutionException | InterruptedException e) {throw new RuntimeException(e);}});}
}4. invokeAny 演示
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
Slf4j(topic c.pool)
public class Test2SubmitShow {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建固定大小线程池ExecutorService pool Executors.newFixedThreadPool(3);// 调用 invokeAny 执行一个链表中的所有任务然后将执行最快的一个任务的返回值返回然后其他任务不执行了// 如果只有一个线程 那么只有第一个任务被执行所以只会返回第一个任务的结果String result pool.invokeAny(Arrays.asList(() - {log.debug(begin);Thread.sleep(1000);log.debug(end);return 1;},() - {log.debug(begin);Thread.sleep(500);log.debug(end);return 2;},() - {log.debug(begin);Thread.sleep(2000);log.debug(end);return 3;}));//最后拿到 的是2 然后1 跟 3的end没输出log.debug({}, result);}
}7关闭线程池
shutdown showdownNow 其他方法 三、设计模式 之 异步模式 之 工作线程
1 定义
让有限的工作线程Worker Thread可以轮流异步处理无限多的任务也可以归类为分工模式。他的典型实现是 线程池体现了经典设计模式中的享元模式。
例如餐厅的服务员线程轮流处理每位客人的点餐任务如果为每位客户分配一位服务员那么成本就太高了
对比另外一种多线程设计模式Thread-Pre-Message 这个是一个任务创建一个新线程处理) 注意不同任务类型应该使用不同的线程池这样能够避免饥饿并能提升效率。
例如如果一个餐厅的工人既要招呼客人任务类型A又要后厨做菜任务类型B) 显然效率很低因为他全都要干
这时候如果分成招呼客人一批人线程池A做菜一批人线程池B更为合理更细致的分工效率跟更高。 2饥饿
这里的饥饿跟 synchronized 那里的饥饿有点区别这里不是因为锁导致的而是因为线程不足导致的现象类似于死锁但是不是锁导致的。
一般是固定大小线程池有饥饿现象。
有两个工人在同一个线程池中表示两个线程。两个工人是全能的既能做饭又能处理点餐有以下两种阶段情景 1、有一个客人点餐工人A处理点餐等待上菜工人B做菜工人A上菜一气呵成分工合作很顺利2、有两个客人同时点餐工人A和工人B同时处理点餐同时等待上菜没有额外工人线程去做菜这时候就死锁了。 情况1 import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;Slf4j(topic c.pool)
public class Test3Executor {static ListString list Arrays.asList(红烧牛肉1, 红烧牛肉2, 牛肉3, 红烧牛肉4);public static void main(String[] args) {ExecutorService pool Executors.newFixedThreadPool(2);// 有人点餐执行任务pool.execute(() - {log.debug(点餐);FutureString future pool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(做菜);Thread.sleep(1000);return list.get(0);}});try {log.debug(做完{}, future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:17:54.256 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:17:54.261 [pool-1-thread-2] DEBUG c.pool - 做菜
// 21:17:55.262 [pool-1-thread-1] DEBUG c.pool - 做完红烧牛肉1
情况2
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;Slf4j(topic c.pool)
public class Test3Executor {static ListString list Arrays.asList(红烧牛肉1, 红烧牛肉2, 牛肉3, 红烧牛肉4);public static void main(String[] args) {ExecutorService pool Executors.newFixedThreadPool(2);// 有人点餐执行任务pool.execute(() - {log.debug(点餐);FutureString future pool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(做菜);Thread.sleep(1000);return list.get(0);}});try {log.debug(做完{}, future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐执行任务pool.execute(() - {log.debug(点餐);FutureString future pool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(做菜);Thread.sleep(1000);return list.get(0);}});try {log.debug(做完{}, future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:19:36.930 [pool-1-thread-1] DEBUG c.pool - 点餐
// 21:19:36.930 [pool-1-thread-2] DEBUG c.pool - 点餐3解决饥饿
解决1不全面
增加核心线程来解决但是如果同时比较多还是没有办法解决。
解决2全面
不同功能的线程放到不同线程池
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;Slf4j(topic c.pool)
public class Test3Executor {static ListString list Arrays.asList(红烧牛肉1, 红烧牛肉2, 牛肉3, 红烧牛肉4);public static void main(String[] args) {ExecutorService cookPool Executors.newFixedThreadPool(1);ExecutorService waitPool Executors.newFixedThreadPool(1);// 有人点餐执行任务waitPool.execute(() - {log.debug(点餐1);FutureString future cookPool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(做菜1);Thread.sleep(1000);return list.get(0);}});try {log.debug(做完1{}, future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});// 有人点餐执行任务waitPool.execute(() - {log.debug(点餐2);FutureString future cookPool.submit(new CallableString() {Overridepublic String call() throws Exception {log.debug(做菜2);Thread.sleep(1000);return list.get(2);}});try {log.debug(做完2{}, future.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}});}
}
// 21:35:52.924 [pool-2-thread-1] DEBUG c.pool - 点餐1
// 21:35:52.924 [pool-2-thread-2] DEBUG c.pool - 点餐2
// 21:35:52.927 [pool-1-thread-1] DEBUG c.pool - 做菜1
// 21:35:53.928 [pool-1-thread-1] DEBUG c.pool - 做菜2
// 21:35:53.928 [pool-2-thread-1] DEBUG c.pool - 做完1红烧牛肉1
// 21:35:54.930 [pool-2-thread-2] DEBUG c.pool - 做完2牛肉34创建多少线程池合适
过小会导致不能充分利用cpu系统资源容易导致饥饿过大会导致频繁发生线程上下文切换占用更多内存
1. CPU 密集型运算
通常采用 cpu核数 1 能够实现最优的CPU利用率 1是保证当线程由于页缺失故障操作系统或其他原因导致暂停时
额外的这个线程能够顶上去保证CPU时钟周期不被浪费。常用于数据分析时。 2. I/O 密集型运算
CPU 不总是在繁忙状态例如当你执行业务计算时这时候会使用CPU资源但是当你执行I/O操作时、远程RPC调用、数据库操作时等这时候CPU资源就空闲了
你可以利用多线程提高他的利用率。
经验公式如下
线程数 核数 * 期望 CPU 利用率 * 总时间CPU计算时间 等待时间 / CPU 计算时间
例如
4 核CPU计算时间是50%其他等待时间是50%期望cpu被100%利用, 套用公式
4 * 100% * 100% / 50% 8 四、任务调度线程池
有些时候我们需要延迟执行任务(延迟几秒后执行)或者需要循环执行任务(每隔几秒执行一次)这时候就需要用到任务调度的线程池了。
JDK5的时候加入任务调度线程池
JDK5之前 可以使用java.util.Timer 来实现定时功能
1. Timer
优点简单易用
缺点所有任务都由一个独立线程执行所有任务都是串行执行的如果中间有一个任务发生异常后面的任务也不会运行了。
Slf4j(topic c.pool)
public class Test4 {public static void main(String[] args) {Timer timer new Timer();TimerTask task1 new TimerTask() {Overridepublic void run() {log.debug(task 1);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}};TimerTask task2 new TimerTask() {Overridepublic void run() {log.debug(task 2);try {Thread.sleep(0);} catch (InterruptedException e) {throw new RuntimeException(e);}}};log.debug(start);// 一秒后执行 task1 task2timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}
// 16:17:24.730 [main] DEBUG c.pool - start
// 16:17:25.734 [Timer-0] DEBUG c.pool - task 1
// 16:17:27.735 [Timer-0] DEBUG c.pool - task 2 2. ScheduledExecutorService
解决了Timer的缺点抛出异常不会终止其他任务还可以设置多个线程参与调度。
如果ScheduledExecutorService1 设置成1那么也是串行执行但是不会终止其他任务
a. 延迟任务
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;Slf4j(topic c.pool)
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool Executors.newScheduledThreadPool(2);pool.schedule(() - {log.debug(task1);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);pool.schedule(() - {log.debug(task2);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, TimeUnit.SECONDS);}
}
// 16:23:45.724 [pool-1-thread-1] DEBUG c.pool - task1
// 16:23:45.724 [pool-1-thread-2] DEBUG c.pool - task2
b. 定时任务
ⅰ. scheduleAtFixedRate
Slf4j(topic c.pool)
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool Executors.newScheduledThreadPool(1);log.debug(start);// 延迟1秒执行然后每隔1秒执行一次// 如果任务的延迟比定时还长那么所有任务会在上一个任务执行完之后 紧接着运行pool.scheduleAtFixedRate(() - {log.debug(task1);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:33:06.263 [main] DEBUG c.pool - start
// 16:33:07.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:09.303 [pool-1-thread-1] DEBUG c.pool - task1
// 16:33:11.303 [pool-1-thread-1] DEBUG c.pool - task1
ⅱ. scheduleWithFixedDelay
Slf4j(topic c.pool)
public class Test4 {public static void main(String[] args) {ScheduledExecutorService pool Executors.newScheduledThreadPool(1);log.debug(start);// 不管任务执行多长时间都会等他工作完成之后再延迟一秒执行一次pool.scheduleWithFixedDelay(() - {log.debug(task1);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}, 1, 1, TimeUnit.SECONDS);}
}
// 16:35:55.014 [main] DEBUG c.pool - start
// 16:35:56.048 [pool-1-thread-1] DEBUG c.pool - task1
// 16:35:59.051 [pool-1-thread-1] DEBUG c.pool - task1
// 16:36:02.051 [pool-1-thread-1] DEBUG c.pool - task1
正确处理线程池异常
try catch 手动捕捉普通线程池可以使用配合feture的返回值get()等待如果有异常返回值会是异常描述 3. 线程池应用
如何让每周日 18:00:00 定时执行任务
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class TestTaskRun {// 如何让每周日 18:00:00 定时执行任务public static void main(String[] args) {// 获取当前时间LocalDateTime now LocalDateTime.now();System.out.println(now now);// 获取本周周日时间LocalDateTime time now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果当前时间 本周周四那么获取的time是不对的需要是下周的if (now.isAfter(time)) {time time.plusWeeks(1);}System.out.println(time time);long period 1000 * 60 * 60 * 24 * 7; // 一周时间long initialDelay Duration.between(now, time).toMillis(); //获取两个时间之间的毫秒差值ScheduledExecutorService pool Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() - {System.out.println(running...);}, initialDelay, period, TimeUnit.MILLISECONDS);}
} 五、Tomcat 线程池 浏览器发出一个请求
首先会经过LimitLatch作用是限流控制最大连接数防止太多连接把服务器压垮类似于J.U.C 中的 Semaphore 后面再讲
当没有超过最大连接数到达第二个组件 Acceptor本质上是一个线程不断循环看有没有新的连接主要就是负责接受socket连接其他不管
然后Poller 也是一个线程死循环看连接上是否有这种可读的事件发生但是也是负责监听是否有如果有交给一个Executor来执行来负责看是否有IO的读写操作
一旦Poller 发现可读会封装一个任务对象socketProcessor实现了runnable提交个Executor线程池处理
Executor 线程池中的工作线程核心救急线程模式最终负责【处理请求】 Tomcat 线程池扩展了 ThreadPoolExecutor 行为稍有不同
如果总线程数达到 最大线程数 并且队列也满了 这时不会立刻抛出 RejectedExecutionException 异常而是再次尝试将任务放入队列如果还失败才抛出 RejectedExecutionException 异常
源码 tomcat- 7.0.42 tomcat 线程池配置 tomcat 线程池相对于 java原生线程池tomcat进行了一些改进主要体现在核心在于Tomcat自己定义的任务队列。
TaskQueue —— offer(Runnable o)方法
public class TaskQueue extends LinkedBlockingQueueRunnable {private transient volatile ThreadPoolExecutor parent null;public TaskQueue() {super();}public void setParent(ThreadPoolExecutor tp) {parent tp;}Overridepublic boolean offer(Runnable o) {//we cant do any checksif (parentnull) {return super.offer(o);}//we are maxed out on threads, simply queue the object// 如果线程数量已经达到最大数量则进入队列等待执行if (parent.getPoolSizeNoLock() parent.getMaximumPoolSize()) {return super.offer(o);}//we have idle threads, just add it to the queue// 执行到这里 最大线程数 当前线程数 核心线程数。// 如果提交的任务数 当前线程数则说明存在空闲线程则提交到任务队列等待执行if (parent.getSubmittedCount() parent.getPoolSizeNoLock()) {return super.offer(o);}//if we have less threads than maximum force creation of a new thread// 执行到这说明提交的任务数已经大于当前线程数需要创建新的线程if (parent.getPoolSizeNoLock() parent.getMaximumPoolSize()) {return false;}//if we reached here, we need to add it to the queuereturn super.offer(o);}
}
submittedCountTomcat自定义的ThreadPoolExecutor中使用该字段来标记当前已提交的任务数提交加一执行完减一。poolSizeNoLock当前执行的线程数 判断当前是否需要创建空闲线程执行任务而不是全放入队列因为LinkedBlockingQueue默认是无界的但也可以设计最大数量 六、Fork / Join线程池
1概念
JDK 1.7 之后加入的新的线程池实现 使用了分治思想适用于能够进行任务拆分的 cpu 密集型运算。所谓任务拆分是一个大任务拆分算法上相同的小任务直到不能再拆分然后求解最后合并类似于归并排序裴波那契数列等思想分治思想的基础上加入了多线程每个任务的分解和合并交给不同线程执行进一步提升运算效率。默认会创建与 cpu 核数大小相同的线程池。 2使用
1. 拆分不完善导致串行
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;Slf4j(topic c.TestForkJoin)
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new MyTask(5)));}
}Slf4j(topic c.MyTask)
// 计算机1 - n之间的和
class MyTask extends RecursiveTaskInteger {private int n;public MyTask(int n) {this.n n;}Overridepublic String toString() {return MyTask{ n n };}Overrideprotected Integer compute() {// 递归结束条件if (n 1) {log.debug(join() {}, n);return 1;}//分解成子问题MyTask myTask new MyTask(n - 1);myTask.fork(); //分配线程执行log.debug(fork() {} {}, n, myTask);//获取线程返回结果Integer join myTask.join();log.debug(join() {} {} {}, n, myTask, n join);return n join; //然后拼接子问题返回答案}
}
// 21:07:44.838 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - fork() 5 MyTask{n4}
// 21:07:44.838 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - fork() 4 MyTask{n3}
// 21:07:44.838 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - fork() 2 MyTask{n1}
// 21:07:44.838 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - fork() 3 MyTask{n2}
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 1
// 21:07:44.841 [ForkJoinPool-1-worker-0] DEBUG c.MyTask - join() 2 MyTask{n1} 3
// 21:07:44.842 [ForkJoinPool-1-worker-3] DEBUG c.MyTask - join() 3 MyTask{n2} 6
// 21:07:44.842 [ForkJoinPool-1-worker-2] DEBUG c.MyTask - join() 4 MyTask{n3} 10
// 21:07:44.842 [ForkJoinPool-1-worker-1] DEBUG c.MyTask - join() 5 MyTask{n4} 15
// 15
2. 分治思想并行
package com.itheima.test8;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;Slf4j(topic c.TestForkJoin)
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool poll new ForkJoinPool(4);// 运行调用很简单System.out.println(poll.invoke(new AddTask(1, 5)));}
}
Slf4j(topic c.AddTask)
// 计算机1 - n之间的和
class AddTask extends RecursiveTaskInteger {private int start;private int end;public AddTask(int start, int end) {this.start start;this.end end;}Overridepublic String toString() {return AddTask{ start start , end end };}Overrideprotected Integer compute() {// 递归结束条件if (start end) {log.debug(join() {}, start);return start;}if (end start 1) {log.debug(join() {}, end start);return end start;}//分解成子问题int mid (start end) / 2;AddTask addTask1 new AddTask(start, mid);addTask1.fork(); //分配线程执行log.debug(fork() {}, addTask1);AddTask addTask2 new AddTask(mid 1, end);addTask2.fork(); //分配线程执行log.debug(fork() {}, addTask2);//获取线程返回结果Integer join addTask1.join() addTask2.join();log.debug(join() {} , {} {}, start, end, join);return join; //然后拼接子问题返回答案}
}七、AQS原理
1概述 2自定义锁
自定义一个同步器组件例如可重入锁那些都是这么做。在你自定义的同步器组件内部定义一个AQS类的内部类所有操作交给这个内部类完成我们的自定义同步组件只需要调用这个内部类的方法
这里要注意acquire()是AQS实现的模板方法可以直接用tryAquire()是自定义的加锁方法 模板会调用此自定义实现方法
它的acquire、release方法是aqs的固定机制比如怎么把线程阻塞怎么加入队列等等而tryxx是自己实现的自己决定是否可重入是否可共享等等
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;Slf4j(topic c.MyLock)
public class Test6Lock {public static void main(String[] args) {MyLock lock new MyLock();new Thread(() - {lock.lock();log.debug(lock);lock.lock();log.debug(lock);try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {log.debug(unlock);lock.unlock();}}).start();
// new Thread(() - {
// lock.lock();
// log.debug(lock);
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } finally {
// log.debug(unlock);
// lock.unlock();
// }
// }).start();}
}// 自定义锁 不可重入锁
class MyLock implements Lock {class MyAQS extends AbstractQueuedSynchronizer {// 独占锁 同步器类Overrideprotected boolean tryAcquire(int arg) {// cas 操作是否可以获得锁if (compareAndSetState(0, 1)) {//然后锁的Owner 是当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}Overrideprotected boolean tryRelease(int arg) {// 释放锁之后清空OwnersetExclusiveOwnerThread(null);// 因为是volatile 所以因为写屏障所以前面不会重排指令到state后面同步到主存setState(0);return true;}Override //是否持有独占锁protected boolean isHeldExclusively() {return getState() 1;}public Condition newCondition() {return new ConditionObject();}}MyAQS sync new MyAQS();Override //加锁 (加锁失败进入队列public void lock() {sync.acquire(1);}Override // 加锁但是可以打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}Override // 尝试加锁一次public boolean tryLock() {return sync.tryAcquire(1);}Override // 尝试加锁带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}Override //释放锁public void unlock() {sync.release(1); //释放锁底层还会唤醒等待的线程}Override //创建条件变量public Condition newCondition() {return sync.newCondition();}
}八、ReentrantLock 原理
1加锁成功流程 非公平锁实现原理
加锁解锁流程
构造器方法默认是非公平锁实现
public ReentrantLock() {sync new NonfairSync();
}
NonfairSync 继承自 AQS原理类似于刚才的AQS实现自定义锁
2加锁失败流程 第一次尝试如果已经被别人加锁了调用acquire
第二次尝试如果这时候加锁的人已经释放则不需要继续往下否则就创建AddWaiter构造一个链表(即head、tail)添加一个Node链表节点加入队列
首次创建会创建两个第一个Node称为 Dummy哑元或哨兵用来占位并不关联线程。黄色三角形表示该Node的 waitStatus 状态默认 0 为正常状态
进入acquireQueue () 逻辑
第三个尝试循环判断当前节点的前驱节点是不是头结点说明他是第一个节点有资格去再次尝试获取锁
进入 shouldParkAfterFailAcquire逻辑
把前驱节点改成状态改成 -1代表前驱节点有资格唤醒后继结点因为如果进行park 需要有人唤醒第一次会方法会返回false
第四次尝试再次尝试获取获取失败再次进入shouldParkAfterFailAcquire因为前驱节点node已经是状态-1所以这次返回true进行一个park阻塞住
3解锁竞争成功流程
如果有多个线程经历上述过程竞争失败变成这个样子 这时候 Thread - 0 释放锁进入 tryRelease 流程如果成功
设置 exclusiveOwnerThread 为 nullstate 0
然后判断头结点是不是不为空并且 waitStatus 状态不为0那么就执行 unparkSuccessor 唤醒距离头结点最近的节点然后他有资格继续尝试获取锁
获取到锁之后设置state 1 并且 exclusiveOwnerThread 当前线程然后将头结点替换成当前节点 】
4解锁竞争失败流程
因为是非公平锁此时有另外一个线程Thread4不是队列中的而是第一次访问的线程那么就会发生一个竞争
Thread-1 如果竞争失败Thread-4 的线程就会设置成 exclusiveOwnerThread state 1
Thread - 1 竞争失败再次进入 acquireQueue 流程获取锁失败重新进入 park 阻塞 公平锁的意思也就是 新线程来的时候会不会直接加入队列中。
5锁重入原理 6可打断原理
不可打断锁原理
打断不会影响到获取锁停止只有获得锁之后才会返回打断标记 可打断原理
如果打断之后直接抛出异常不会继续获取锁了 7公平锁原理 公平锁原理新线程进来获取锁的时候判断是不是队列中有线程等待来判断是否可以进行竞争锁。
8条件变量 - await
每个条件变量其实对应着一个等待队列 其实现类是 ConditionObject 开始Thread-0持有锁调用await进入 COnditionObject 的 addConditionWaiter 流程
创建 新的 Node 状态为 -2Node.CONDITION) 关联 Thread-0加入等待队列尾部 然后进入 AQS 的fullyRelease 流程 释放同步器上的锁也就是当前线程的所有锁
为什么是fullyRelease 不是 因为可能发生了锁重入所以需要释放所有锁
state 0 exclusiveOwnerThread null
然后unpark AQS 队列中的下一个节点竞争锁假设没有其他竞争线程那么Thread - 1 竞争成功。 state 1 exclusiveOwnerThread thread1
9条件变量 - signal
假设 Thread- 1 调用 signal 要来唤醒 条件变量中等待的 Thread - 0 调用 signal 方法然后唤醒 条件变量中的 第一个节点然后条件变量中的当前点 置为null
转移成功 然后加入到等待队列队尾等待
转移失败 因为有些时候有时限的等待超时或者被打断那么就没有必要加入到队列队尾 加入队列获取前一个节点然后设置state 为 -1因为最后一个是0前一个节点设置成-1表示有资格唤醒unpark新加入的线程
如果前一个节点的状态 0 表示被取消了那么需要唤醒当前线程去找能够当做前驱节点的点。