做网站要固定ip,兰州新区城乡建设管理局网站,ui中国设计网,百度网站的网址是什么3 LinkedBlockingQueue
3.1 LinkedBlockingQueue的底层实现
查看LinkedBlockingQueue是如何存储数据#xff0c;并且实现链表结构的。
// Node对象就是存储数据的单位
static class NodeE {
// 存储的数据
E item;
// 指向下一个数据的指针
NodeE next;
//…3 LinkedBlockingQueue
3.1 LinkedBlockingQueue的底层实现
查看LinkedBlockingQueue是如何存储数据并且实现链表结构的。
// Node对象就是存储数据的单位
static class NodeE {
// 存储的数据
E item;
// 指向下一个数据的指针
NodeE next;
// 有参构造
Node(E x) { item x; }
}
查看LinkedBlockingQueue的有参构造 // 可以手动指定LinkedBlockingQueue的长度如果没有指定默认为Integer.MAX_VALUE public LinkedBlockingQueue(int capacity) { if (capacity 0) throw new IllegalArgumentException(); this.capacity capacity; // 在初始化时构建一个item为null的节点作为head和last // 这种node可以成为哨兵Node // 如果没有哨兵节点那么在获取数据时需要判断head是否为null才能找next // 如果没有哨兵节点那么在添加数据时需要判断last是否为null才能找next last head new NodeE(null); } 查看LinkedBlockingQueue的其他属性
// 因为是链表没有想数组的length属性基于AtomicInteger来记录长度
private final AtomicInteger count new AtomicInteger();
// 链表的头取
transient NodeE head;
// 链表的尾存
private transient NodeE last;
// 消费者的锁
private final ReentrantLock takeLock new ReentrantLock();
// 消费者的挂起操作以及唤醒用的condition
private final Condition notEmpty takeLock.newCondition();
// 生产者的锁
private final ReentrantLock putLock new ReentrantLock();
// 生产者的挂起操作以及唤醒用的condition
private final Condition notFull putLock.newCondition();
3.2 生产者方法实现原理
3.2.1 add方法
你懂得还是走offer方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException(Queue full);
}
3.2.2 offer方法
public boolean offer(E e) {
// 非空校验
if (e null) throw new NullPointerException();
// 拿到存储数据条数的count
final AtomicInteger count this.count;
// 查看当前数据条数是否等于队列限制长度达到了这个长度直接返回false
if (count.get() capacity)
return false;
// 声明c作为标记存在
int c -1;
// 将存储的数据封装为Node对象
NodeE node new NodeE(e);
// 获取生产者的锁。
final ReentrantLock putLock this.putLock;
// 竞争锁资源
putLock.lock();
try {// 再次做一个判断查看是否还有空间
if (count.get() capacity) {
// enqueue扔数据
enqueue(node);
// 将数据个数 1
c count.getAndIncrement();
// 拿到count的值 小于 长度限制
// 有生产者在基于await挂起这里添加完数据后发现还有空间可以存储数据
// 唤醒前面可能已经挂起的生产者
// 因为这里生产者和消费者不是互斥的写操作进行的同时可能也有消费者在消费数据。
if (c 1 capacity)
// 唤醒生产者
notFull.signal();
}
} finally {
// 释放锁资源
putLock.unlock();
}
// 如果c 0代表添加数据之前队列元素个数是0个。
// 如果有消费者在队列没有数据的时候来消费此时消费者一定会挂起线程
if (c 0)
// 唤醒消费者
signalNotEmpty();
// 添加成功返回true失败返回-1
return c 0;
}
//
private void enqueue(NodeE node) {
// 将当前Node设置为last的next并且再将当前Node作为last
last last.next node;
}
//
private void signalNotEmpty() {
// 获取读锁
final ReentrantLock takeLock this.takeLock;
takeLock.lock();
try {
// 唤醒。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
sync - wait / notify
3.2.3 offer(time,unit)方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 非空检验
if (e null) throw new NullPointerException();
// 将时间转换为纳秒
long nanos unit.toNanos(timeout);// 标记
int c -1;
// 写锁数据条数
final ReentrantLock putLock this.putLock;
final AtomicInteger count this.count;
// 允许中断的加锁方式
putLock.lockInterruptibly();
try {
// 如果元素个数和限制个数一致直接准备挂起
while (count.get() capacity) {
// 挂起的时间是不是已经没了
if (nanos 0)
// 添加失败返回false
return false;
// 挂起线程
nanos notFull.awaitNanos(nanos);
}
// 有空余位置enqueue添加数据
enqueue(new NodeE(e));
// 元素个数 1
c count.getAndIncrement();
// 当前添加完数据还有位置可以添加数据唤醒可能阻塞的生产者
if (c 1 capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();}
// 如果之前元素个数是0唤醒可能等待的消费者
if (c 0)
signalNotEmpty();
return true;
}
3.2.4 put方法
public void put(E e) throws InterruptedException {
if (e null) throw new NullPointerException();
int c -1;
NodeE node new NodeE(e);
final ReentrantLock putLock this.putLock;
final AtomicInteger count this.count;
putLock.lockInterruptibly();
try {
while (count.get() capacity) {
// 一直挂起线程等待被唤醒
notFull.await();
}
enqueue(node);
c count.getAndIncrement();
if (c 1 capacity)
notFull.signal();
} finally {putLock.unlock();
}
if (c 0)
signalNotEmpty();
}
3.3 消费者方法实现原理
从remove方法开始查看消费者获取数据的方式
3.3.1 remove方法
public E remove() {
E x poll();
if (x ! null)
return x;
else
throw new NoSuchElementException();
}
3.3.2 poll方法
public E poll() {
// 拿到队列数据个数的计数器
final AtomicInteger count this.count;// 当前队列中数据是否0
if (count.get() 0)
// 说明队列没数据直接返回null即可
return null;
// 声明返回结果
E x null;
// 标记
int c -1;
// 获取消费者的takeLock
final ReentrantLock takeLock this.takeLock;
// 加锁
takeLock.lock();
try {
// 基于DCL确保当前队列中依然有元素
if (count.get() 0) {
// 从队列中移除数据
x dequeue();
// 将之前的元素个数获取并--
c count.getAndDecrement();
if (c 1)
// 如果依然有数据继续唤醒await的消费者。
notEmpty.signal();
}
} finally {
// 释放锁资源
takeLock.unlock();
}// 如果之前的元素个数为当前队列的限制长度
// 现在消费者消费了一个数据多了一个空位可以添加
if (c capacity)
// 唤醒阻塞的生产者
signalNotFull();
return x;
}
//
private E dequeue() {
// 拿到队列的head位置数据
NodeE h head;
// 拿到了head的next因为这个是哨兵Node需要拿到的head.next的数据
NodeE first h.next;
// 将之前的哨兵Node.next置位null。help GC。
h.next h;
// 将first置位新的head
head first;
// 拿到返回结果first节点的item数据也就是之前head.next.item
E x first.item;
// 将first数据置位null作为新的head
first.item null;
// 返回数据
return x;
}
//
private void signalNotFull() {
final ReentrantLock putLock this.putLock;
putLock.lock();
try {
// 唤醒生产者。
notFull.signal();
} finally {
putLock.unlock();
}
}
3.3.3 poll(time,unit)方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 返回结果
E x null;
// 标识
int c -1;
// 将挂起实现设置为纳秒级别
long nanos unit.toNanos(timeout);
// 拿到计数器
final AtomicInteger count this.count;
// take锁加锁
final ReentrantLock takeLock this.takeLock;takeLock.lockInterruptibly();
try {
// 如果没数据进到while
while (count.get() 0) {
if (nanos 0)
return null;
// 挂起当前线程
nanos notEmpty.awaitNanos(nanos);
}
// 剩下内容和之前一样。
x dequeue();
c count.getAndDecrement();
if (c 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c capacity)
signalNotFull();
return x;
}
3.3.4 take方法
public E take() throws InterruptedException {
E x;int c -1;
final AtomicInteger count this.count;
final ReentrantLock takeLock this.takeLock;
takeLock.lockInterruptibly();
try {
// 相比poll(time,unit)方法这里的出口只有一个就是中断标记位抛出异常否则一直等待
while (count.get() 0) {
notEmpty.await();
}
x dequeue();
c count.getAndDecrement();
if (c 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c capacity)
signalNotFull();
return x;
}