遵义网站建设公司,化妆品企业网站案例大全,唐山室内设计公司排名,绘图软件1 基础概念
1.1 生产者消费者概念
生产者消费者是设计模式的一种。让生产者和消费者基于一个容器来解决强耦合问题。 生产者 消费者彼此之间不会直接通讯的#xff0c;而是通过一个容器#xff08;队列#xff09;进行通讯。 所以生产者生产完数据后扔到容器中#xff0c…1 基础概念
1.1 生产者消费者概念
生产者消费者是设计模式的一种。让生产者和消费者基于一个容器来解决强耦合问题。 生产者 消费者彼此之间不会直接通讯的而是通过一个容器队列进行通讯。 所以生产者生产完数据后扔到容器中不通用等待消费者来处理。 消费者不需要去找生产者要数据直接从容器中获取即可。 而这种容器最常用的结构就是队列。
1.2 JUC阻塞队列的存取方法
常用的存取方法都是来自于JUC包下的BlockingQueue 生产者存储方法 add(E) // 添加数据到队列如果队列满了无法存储抛出异常 offer(E) // 添加数据到队列如果队列满了返回false offer(E,timeout,unit) // 添加数据到队列如果队列满了阻塞timeout时间如果阻塞一段时间依然没添加进入返回false put(E) // 添加数据到队列如果队列满了挂起线程等到队列中有位置再扔数据进去死等 消费者取数据方法 remove() // 从队列中移除数据如果队列为空抛出异常 poll() // 从队列中移除数据如果队列为空返回null么的数据 poll(timeout,unit) // 从队列中移除数据如果队列为空挂起线程timeout时间等生产者扔数据再获取 take() // 从队列中移除数据如果队列为空线程挂起一直等到生产者扔数据再获取 2 ArrayBlockingQueue
2.1 ArrayBlockingQueue的基本使用
ArrayBlockingQueue在初始化的时候必须指定当前队列的长度。 因为**ArrayBlockingQueue**是基于数组实现的队列结构数组长度不可变必须提前设置数组长度信息。 public static void main(String[] args) throws ExecutionException, InterruptedException, IOException { // 必须设置队列的长度 ArrayBlockingQueue queue new ArrayBlockingQueue(4); // 生产者扔数据 queue.add(1); queue.offer(2); queue.offer(3,2,TimeUnit.SECONDS); queue.put(2); // 消费者取数据 System.out.println(queue.remove()); System.out.println(queue.poll()); System.out.println(queue.poll(2,TimeUnit.SECONDS)); System.out.println(queue.take()); } 2.2 生产者方法实现原理
生产者添加数据到队列的方法比较多需要一个一个查看
2.2.1 ArrayBlockingQueue的常见属性
ArrayBlockingQueue中的成员变量 lock 就是一个ReentrantLock count 就是当前数组中元素的个数 iterms 就是数组本身 # 基于putIndex和takeIndex将数组结构实现为了队列结构 putIndex 存储数据时的下标 takeIndex 去数据时的下标 notEmpty 消费者挂起线程和唤醒线程用到的Condition看成sync的wait和notify notFull 生产者挂起线程和唤醒线程用到的Condition看成sync的wait和notify 2.2.2 add方法实现
add方法本身就是调用了offer方法如果offer方法返回false直接抛出异常
public boolean add(E e) {
if (offer(e))
return true;
else
// 抛出的异常
throw new IllegalStateException(Queue full);
} 2.2.3 offer方法实现
public boolean offer(E e) {
// 要求存储的数据不允许为null为null就抛出空指针
checkNotNull(e);
// 当前阻塞队列的lock锁final ReentrantLock lock this.lock;
// 为了保证线程安全加锁
lock.lock();
try {
// 如果队列中的元素已经存满了
if (count items.length)
// 返回false
return false;
else {
// 队列没满执行enqueue将元素添加到队列中
enqueue(e);
// 返回true
return true;
}
} finally {
// 操作完释放锁
lock.unlock();
}
}
//
private void enqueue(E x) {
// 拿到数组的引用
final Object[] items this.items;
// 将元素放到指定位置
items[putIndex] x;
// 对inputIndex进行操作并且判断是否已经等于数组长度需要归位if (putIndex items.length)
// 将索引设置为0
putIndex 0;
// 元素添加成功进行操作。
count;
// 将一个Condition中阻塞的线程唤醒。
notEmpty.signal();
}
2.2.4 offer(time,unit)方法
生产者在添加数据时如果队列已经满了阻塞一会。 ● 阻塞到消费者消费了消息然后唤醒当前阻塞线程 ● 阻塞到了time时间再次判断是否可以添加不能直接告辞。
// 如果线程在挂起的时候如果对当前阻塞线程的中断标记位进行设置此时会抛出异常直接结束
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空检验
checkNotNull(e);
// 将时间单位转换为纳秒
long nanos unit.toNanos(timeout);
// 加锁
final ReentrantLock lock this.lock;
// 允许线程中断并排除异常的加锁方式
lock.lockInterruptibly();
try {// 为什么是while虚假唤醒
// 如果元素个数和数组长度一致队列慢了
while (count items.length) {
// 判断等待的时间是否还充裕
if (nanos 0)
// 不充裕直接添加失败
return false;
// 挂起等待会同时释放锁资源对标sync的wait方法
// awaitNanos会挂起线程并且返回剩余的阻塞时间
// 恢复执行时需要重新获取锁资源
nanos notFull.awaitNanos(nanos);
}
// 说明队列有空间了enqueue将数据扔到阻塞队列中
enqueue(e);
return true;
} finally {
// 释放锁资源
lock.unlock();
}
}
2.2.5 put方法
如果队列是满的 就一直挂起直到被唤醒或者被中断
public void put(E e) throws InterruptedException {checkNotNull(e);
final ReentrantLock lock this.lock;
lock.lockInterruptibly();
try {
while (count items.length)
// await方法一直阻塞直到被唤醒或者中断标记位
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
2.3 消费者方法实现原理
2.3.1 remove方法
// remove方法就是调用了poll
public E remove() {
E x poll();
// 如果有数据直接返回
if (x ! null)
return x;
// 没数据抛出异常
else
throw new NoSuchElementException();
}
2.4.2 poll方法
// 拉取数据
public E poll() {
// 加锁操作
final ReentrantLock lock this.lock;
lock.lock();
try {
// 如果没有数据直接返回null如果有数据执行dequeue取出数据并返回
return (count 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//
// 取出数据
private E dequeue() {
// 将成员变量引用到局部变量
final Object[] items this.items;
// 直接获取指定索引位置的数据
E x (E) items[takeIndex];
// 将数组上指定索引位置设置为null
items[takeIndex] null;// 设置下次取数据时的索引位置
if (takeIndex items.length)
takeIndex 0;
// 对count进行--操作
count--;
// 迭代器内容先跳过
if (itrs ! null)
itrs.elementDequeued();
// signal方法会唤醒当前Condition中排队的一个Node。
// signalAll方法会将Condition中所有的Node全都唤醒
notFull.signal();
// 返回数据。
return x;
}
2.4.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 转换时间单位
long nanos unit.toNanos(timeout);
// 竞争锁
final ReentrantLock lock this.lock;
lock.lockInterruptibly();
try {
// 如果没有数据
while (count 0) {if (nanos 0)
// 没数据也无法阻塞了返回null
return null;
// 没数据挂起消费者线程
nanos notEmpty.awaitNanos(nanos);
}
// 取数据
return dequeue();
} finally {
lock.unlock();
}
}
2.4.4 take方法
public E take() throws InterruptedException {
final ReentrantLock lock this.lock;
lock.lockInterruptibly();
try {
// 虚假唤醒
while (count 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
2.4.5 虚假唤醒
阻塞队列中如果需要线程挂起操作判断有无数据的位置采用的是while循环 为什么不能换成if 肯定是不能换成if逻辑判断 线程A线程B线程E线程C。 其中ABE生产者C属于消费者 假如线程的队列是满的
// E拿到锁资源还没有走while判断
while (count items.length)
// A醒了
// B挂起
notFull.await();
enqueue(e)
C此时消费一条数据执行notFull.signal()唤醒一个线程A线程被唤醒 E走判断发现有空余位置可以添加数据到队列E添加数据走enqueue 如果判断是ifA在E释放锁资源后拿到锁资源直接走enqueue方法。 此时A线程就是在putIndex的位置覆盖掉之前的数据造成数据安全问题