1 多线程带来的安全风险
1.1 安全性问题
共享数据
1.2 活跃性问题
死锁,饥饿,活锁
1.3 性能问题
服务时间过长,响应时间不灵敏,吞吐率过低,资源消耗过高,可伸缩性较低
synchronized(独占锁),volatile,显式锁,原子变量
2 多线程编程
加锁的含义不仅是互斥行为,还包括内存可见性。
2.1 ThreadLocal
为每个使用该变量的线程都存有一份独立的副本。
ThreadLocalRandom 返回特定于当前线程的Random类实例。
2.2 Thread
join() 等待调用join方法的线程终止。
yield() 使当前线程重新回到可执行状态,其只能使同优先级或更高优先级的线程有执行的机会.
sleep() 在指定时间内让当前正在执行的线程暂停执行即进入阻塞状态,但不会释放“锁标志”。
interrupt()
如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常,但如果wait线程没有获取到锁,不会抛出异常,获取到锁之后才会抛出异常;
如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,被设置中断标志的线程将继续正常运行,不受影响;
interrupt() 修改了被调用线程的中断状态来告知那个线程它被中断了;但并不能真正的中断线程,需要被调用的线程自己进行配合才行;当线程被阻塞的时候,比如被Object.wait, Thread.join和Thread.sleep三种方法之一时,调用它的interrput()方法会产生一个InterruptedException异常。
isInterrupted()
interrupted() 返回当前中断状态,和清除中断状态,不能恢复false为true,只能把true清除为false。
注意:捕捉到InterruptedException后,中断状态仍然为false.因此抛出时,需要再次调用interrupt()将中断状态设为true.
2.3 synchronized内部锁
wait()
notifyAll()
notify()
注:内部锁和条件的局限--不能中断一个正在试图获得该锁的线程;试图获得锁时不能设定超时;每个锁仅有单一的条件,可能是不够的。
2.4 volatile
volatile变量不会被缓存在寄存器或其他处理器不可见的地方,因此读取volatile变量总会返回最新写入的值.访问volatile变量不会执行加锁操作,不会使线程阻塞.不足以确保递增操作的原子性。
2.5 原子包
java.util.concurrent.atomic包
2.6 Lock
ReentrantLock
ReentrantReadWriteLock
tryLock() 尝试获得锁而不阻塞;也可设置一定时间的阻塞;
lockInterruptibly()
2.7 Condition
await() 将该线程放到条件的等待集中;可设置等待时间;
awaitUninterruptibly() 如果线程被中断,不会抛出InterruptedException;
signalAll() 解除该线程的等待集中的所有线程的阻塞状态;
signal() 从该条件的等待集中随机选择一个线程,解除其阻塞状态。
2.8 同步器
2.8.1 阻塞队列
支持生产者,消费者这种设计模式;可以通过阻塞队列在设计中构建资源管理机制。
put() take() 会阻塞,可队列当做线程管理工具
add() remove() element() 会抛出异常
offer() poll() peek() 可能返回false或null,给出错误提示
LinkedBlockingQueue,ArrayBlockingQueue都是FIFO队列,两者分别与LinkedList与ArrayList类似,但比同步List拥有更好的并发性能。
PriorityBlockingQueue按优先级排列的队列
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
TransferQueue -> LinkedTransfterQueue
SynchronousQueue
2.8.2 闭锁Latch
CountDownLatch:使一个或多个线程等待一组事件发生。
2.8.3 CyclicBarrier
类似于闭锁,它能阻塞一组线程直到某个事件发生;
栅是循环的。
2.8.4 Exchanger
当两个线程在同一个数据缓冲区的两个实例上工作的时候,可以使用交换器。
2.8.5 信号量Semaphore
用作互斥锁或资源池或将任一容器变成有界阻塞容器。
2.9 Future FutureTask
Future保存异步计算的结果;
FutureTask可以将Callable转换成Future和Runnable,它实现了两者的接口。
Future.get的行为取决于任务的状态。已完成,立即返回结果;否则将阻塞直到任务进入完成状态。
2.10 Executor
为什么使用线程池
一个线程池中包含了许多了准备运行的空间线程,run方法结束后,线程不会死亡,而是准备为下一个请求提供服务;
减少并发线程的数目。
ExecutorService
newCachedThreadPool 必要时创建新线程,空闲线程被保留60s
newFixedThreadPool 包含固定的线程数;空闲线程会被一直保留
newSingleThreadPool 只有一个线程
submit(Runnable r)
submit(Runnable r, T result)
submit(Callable c)
shutdown()
ScheduledExecutorService 将定时任务与线程池功能结合
newScheduledThreadPool 使用给定的线程数来调度任务
newSingleScheduledThreadPool 使用一个单独线程来调度任务
schedule(Callable<T> task, long time, TimeUnit unit) schedule(Runnable task, long time, TimeUnit unit)
scheduleAtFixedRate(Runnable task, long initalDelay, long delay, TimeUnit unit) 周期性地调用任务
scheduleWithFixedRate(Runnable task, long initalDelay, long delay, TimeUnit unit) 在一次调用完成和下一次调用开始有时间长度为delay的延迟
ScheduledExecutorService与TimerTask的对比
Timer类负责管理延迟任务以及周期任务,
然而在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。
另外,TimerTask抛出一个未检查的异常,那么Timer并不捕获异常,将终止定时线程。
线程池能够弥补这个缺陷
控制任务组
invokeAll()
invokeAny()
ExecutorCompletionService 将任务的结果按可获得顺序保存
take() 移除下一个已完成的结果,无则阻塞
poll() 移除下一个已完成的记过,无则返回null
RecursiveTask<T> / RecursiveAction
invokeAll() 接收到很多任务并阻塞
join() 生成结果
get() 可能抛出异常
compute() 不抛异常
2.10 线程安全的集合
ConcurrentHashMap
1.6 1.7中的实现
主要思路:分段锁,但是对于需要扫描整个Map的方法(size(),containValue())是弱一致性的。
Segements继承了ReentrantLock,类似于HashMap,内部是一个Entry数组,数组中的元素是一个链表。HashEntry的实现如下:
1 static final class HashEntry{2 final int hash;3 final K key;4 volatile V value;5 volatile HashEntry next;6 }
创建分段锁:
1,7中的分段锁除了第一个外,其他使用延迟初始化机制。每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。
ensureSegment可能在并发环境下被调用,ensureSegment未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。
put/putIfAbsent/pullAll
put方法会被定位到代理到具体的Segment上,1.7中ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化 - 在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。
在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。
在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。
put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过Unsafe的putOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。
rehash
相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。
remove
get与containsKey
get与containsKey都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这里体现了ConcurrentHashMap的弱一致性。
size、containsValue
这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment(通过Unsafe的getObjectVolatile()以保证原子读语义),获得对应的值以及所有Segment的modcount之和。如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。
当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的Segment,否则容易出现其他线程创建Segment并进行put,remove等操作。
另外
ConcurrentHashMap并不允许key或者value为null,原因是在ConcurrentHashMap中,一旦value出现null,则代表HashEntry的key/value没有映射完成就被其他线程所见,需要特殊处理。在JDK6中,get方法的实现中就有一段对HashEntry.value == null的防御性判断。
弱一致性的原因
get几乎是一个无锁的操作,虽然get中有一个readValueUnderLock调用,不过这句执行到的几率极小;
clear一个segment一个segment的清理,导致之前清理的segment可能又被填充了数据;
迭代器则是在遍历过程中,如果已经遍历的数组上的内容变化了,迭代器不会抛出ConcurrentModificationException异常。如果未遍历的数组上的内容发生了变化,则有可能反映到迭代过程中。
1.8中的实现
重要属性
sizeCtl:-1代表正在初始化;-N 表示有N-1个线程正在进行扩容操作;正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,始终为当前容量的0.75倍。
1 /** 2 * 盛装Node元素的数组 它的大小是2的整数次幂 3 */ 4 transient volatile Node[] table; 5 6 /**14 hash表初始化或扩容时的一个控制位标识量。 7 负数代表正在进行初始化或扩容操作 8 -1代表正在初始化 9 -N 表示有N-1个线程正在进行扩容操作10 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小11 */12 private transient volatile int sizeCtl; 13 14 // 以下两个是用来控制扩容的时候 单线程进入的变量15 private static int RESIZE_STAMP_BITS = 16;16 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;17 18 static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点19 static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点
transfer
要点:支持并发扩容,但并没有加锁。
细节:读方法发现访问的node为ForwardingNode时,则会通过ForwardingNode的find方法,去nextTable中查找相关元素;
写方法发现访问的node为ForwardingNode时,则会先帮助其扩容。
如果加入扩容时,遇到TransferInde为0,即所有node分配完毕,或者扩容线程已经达到了最大扩容线程数,则放弃扩容,立即返回。
put
如果发现这个节点是null,则直接放入;
如果发现这个节点是ForwardingNode,则帮助其扩容;
否则,在该节点加锁,将节点按照红黑树的方式加入,或者加入链表的尾端(若链表的长度大于8,则转换为红黑树)。
treeIfBin
这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode.
get
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。
size
size():基于baseCount
sumCount():如果addCount()并发过大导致baseCount()可见性失败,会进行一些特殊处理。详情请参阅:https://blog.csdn.net/chenssy/article/details/73521950
xmappingCount():基于sumCount()
Travser
如果发现是ForwardingNode,则到nextTable上的两个桶上去遍历,之后再回到tab上遍历。
CopyOnWriteArrayList
1. 所有的修改线程对底层数组进行复制;读时构建迭代器,它包含一个对当前数组的引用;
2. 在某些情况下它提供了更好的性能,并且在迭代期间不需要对容器进行加锁或复制;
3. 缺点: 内存占用问题(写入时复制,两份对象内存),数据一致性问题。
CopyOnWriteArraySet
基于CopyOnWriteArrayList实现的。
ConcurrentSkipListMap
基于跳表实现的线程安全的SortedMap。算法复杂度还是log(n)级别的。
ConcurrentSkipListSet
基于ConcurrentSkipListMap实现的SortedSet。
ConcurrentLinkedQueue
基于CAS实现的非阻塞的队列.
插入新元素时检查尾节点处于稳定状态还是中间状态.
每一个Node使用普通的volatile类型引用,并通过基于反射的AtomicReferenceFieldUpdater来进行更新.
ConcurrentLinkedDeque
线程安全的双端队列,当然也可以当栈使用。由于是linked的,所以大小不受限制的。
注: 以上集合返回弱一致的迭代器,不一定能反应它们被构造之后的所有的修改,也不会抛出ConcurrentModificationException异常。
ConcurrentStack
基于链表Node<E>和AtomicReference实现的非阻塞栈.
Collections.synchronizedMap Collections.synchronizedList
1. 和Hashtable一样,实现上在调用map所有方法时,都对整个map进行同步;
2. 但是这个HashMap不能随便地进行迭代,因为它只是简单包装了HashMap,而回看HashMap的实现,我们可以发现,对于冲突的key,形成一个链表,明显如果有一个线程在历遍HashMap,另一个线程在做删除操作,则很有可能出错。