第6章| Java并发容器和框架

ConcurrentHashMap的实现原理与使用

ConcurrentHashMap是线程安全且高效的HashMap

为什么使用ConcurrentHashMap

原因:

  • 线程不安全的HashMap可能导致程序死循环
    比如HashMap在并发执行put操作时会引发死循环,因为多线程操作会导致HashMap的Entry链表形成环形结构,此时Entry的next节点永远不为空,会导致死循环获取Entry

  • HashTable虽然是线程安全的,但是效率低下
    HashTable使用synchronized保证线程安全,在线程竞争激烈情况下HashTable效率低下。

  • ConcurrentHashMap的分段锁技术提升了并发访问率
    在HashTable中,当出现并发访问时所有线程都得竞争同一把锁。而ConcurrentHashMap里有多把锁,每把锁锁定其中一部分数据,那么就可以在并发访问时提高访问效率。

  • 查看HashMap、HashTable和ConcurrentHashMap底层源码实现

ConcurrentHashMap的结构

  1. ConcurrentHashMap由Segment(可重入锁)数组结构和HashEntry数组结构构成,一个ConcurrentHashMap里有一个Segment数组。
  2. Segment数组的结构与HashMap相似,是一种数组和链表结构。一个Segment包含一个HashEntry数组,HashEntry是一个链表结构的元素。对HashEntry进行修改时,必须先获得他对应的Segment锁,具体结构图如下:

ConcurrentHashMap结构图

ConcurrentHashMap的初始化

通过源码可知,Segments数组的长度ssize是通过concurrencyLevel计算得出的。为了能通过按位与的散列算法来定位Segments数组的索引,必须保证segments数组的长度是2的N次方,所以必须计算出一个大于或等于concurrentLevel的最小的2的N次方作为segments数组的长度。
其中计算公式为hash%length==hash&(length-1),公式成立的前提为长度为2的N次方

定位Segment

  1. 在插入和读取元素时,必须先通过散列算法定位到Segment,然后使用变种hash算法对元素的hashCode进再散列
  2. 进行再散列的目的是减少散列冲突,使元素能够更均匀分布在不同的Segment上,提高容器的存取效率。

ConcurrentLinkedQueue

基于链接节点的无届线程安全队列,采用“先进先出”的规则排序节点。

入队列

  1. 将入队节点设置成为当前队列尾节点的下一个节点;
  2. 更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点;如果tail节点的next节点为空,则将入队节点设置成tail的next节点。也就是说tail节点不总是尾节点。

出队列

  1. 出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。
  2. 当head节点有元素,直接弹出head节点里的元素,而且不更新head节点;当head节点没有元素时,出队操作才更新head节点。
  3. 获取head节点的元素,判断是否为空,如果为空,表示另一个线程已经进行了一次出队列操作将该节点的元素取走,如果不为空,则使用CAS将head节点设置为null并返回head节点的元素。

阻塞队列

阻塞队列常用于生产者和消费者场景,生产者是向队列添加元素的线程,消费者是从队列里取元素的线程。

阻塞队列不可用时,支持阻塞的插入和移除方法提供了以下四种处理方式

处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:队列满时,继续插入数据则抛出IllegalStateException;队列空时,从队列取数据则抛出NoSuchElementException;
  • 返回特殊值:插入元素时,成功返回true;移除元素时,如果没有则返回null。
  • 一直阻塞:队列满时,继续插入数据会一直阻塞生产者线程直到队列可用或响应中断退出;队列空时,从队列取数据会阻塞消费者线程直到队列不空。
  • 超时退出:队列满时,继续插入数据会阻塞生产者线程一段时间,超过指定时间则退出。

Java里的阻塞队列

  1. ArrayBlockingQueue
    用数组实现的有界阻塞队列,FIFO;
  2. LinkedBlockingQueue
    用链表实现的无界阻塞队列,默认和最大长度为Integer.MAX_VALUE,FIFO;
  3. PriorityBlockingQueue
    支持优先级的无界阻塞队列,默认自然顺序排序,也可以自定义顺序规则;
  4. DelayQueue
    支持延时获取元素的无界阻塞队列,在创建元素时可指定延时,只有延迟期满才能从队列中提取元素。
    适用场景:
    ① 设计缓存系统:用DelayQueue保存缓存元素的有效期,用一个循环线程循环查询DelayQueue,能获取到说明缓存有效期到了;
    ② 调度定时任务:用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。
  5. SynchronousQueue
    不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素,默认用非公平性性策略访问队列。
  6. LinkedTransferQueue
    由链表结构组成的无界阻塞TransferQueue队列,主要多了tryTransfer和transfer方法。
    ① transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,transfer方法将元素存放在队列tail节点,并等到该元素被消费者消费了才返回。
    ② tryTransfer方法:如果当前有消费者正在等待接收元素,tryTransfer方法可以把生产者传入的元素立刻传输给消费者;如果没有消费者在等待接收元素,tryTransfer方法将元素存放在队列tail节点,并返回false。
    区别:tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费了才返回。
  7. LinkedBlockingDeque
    由链表结构组成的双向阻塞队列,可以从队列的两端插入和移除元素。

阻塞队列的实现原理

使用等待 / 通知模式实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private final Condition notEmpty;
private final Condition notFull;

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
  • 当生产者往满的队列中添加元素时会阻塞生产者(await),当消费者消费了一个队列中的元素后,会通知生产者当前队列可用(signal)。
  • 当消费者从空的队列中取元素时会阻塞消费者(await),当生产者添加了一个元素后,会通知消费者当前队列可用(signal)。

Fork/Join 框架

Fork/Join 框架是Java 7 提供的一个用于并行执行任务的框架,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果。

  1. 分割任务;
  2. 执行任务并合并结果。
码哥 wechat
欢迎关注个人订阅号:「码上行动GO」