本文共 10522 字,大约阅读时间需要 35 分钟。
package com.mylearn.thread.atomic; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * Created by IntelliJ IDEA. * User: yingkuohao * Date: 13-10-30 * Time: 上午10:41 * CopyRight:360buy * Descrption: 测试AtomicInteger * To change this template use File | Settings | File Templates. */ public class AtomicIntegerCompareTest { private int value; public AtomicIntegerCompareTest(int value) { this.value = value; } public synchronized int increase() { return value++; } public static void main(String args[]) { final AtomicIntegerCompareTest test = new AtomicIntegerCompareTest(0); final CountDownLatch latch = new CountDownLatch(3); //保证主线程等待子线程,统计时间 final CountDownLatch latchMain = new CountDownLatch(1); //开启三个线程计算value自增,synchronized,内存屏障的方式 for (int i = 0; i < 3; i++) { Thread t1 = new Thread(new Runnable() { public void run() { try { latchMain.await(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 1000 * 1000; i++) { test.increase(); } latch.countDown(); } }); t1.start(); } latchMain.countDown(); //子线程同时开启 long start = System.nanoTime(); try { latch.await(); //加一个countDownLatch } catch (InterruptedException e1) { e1.printStackTrace(); } long end = System.nanoTime(); System.out.println("synchronized time elapse:" + (end - start)); System.out.println("AtomicIntegerCompareTest.value=" + test.getValue()); final CountDownLatch latch1 = new CountDownLatch(3); final CountDownLatch latch2Start = new CountDownLatch(1); //跑三个线程,通过AtomicInteger来计算自增,CAS final AtomicInteger atomic = new AtomicInteger(0); for (int i = 0; i < 3; i++) { Thread t4 = new Thread() { @Override public void run() { try { latch2Start.await(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 1000 * 1000; i++) { atomic.incrementAndGet(); } latch1.countDown(); } }; t4.start(); } latch2Start.countDown(); long start1 = System.nanoTime(); try { latch1.await(); } catch (InterruptedException e1) { e1.printStackTrace();// } long end1 = System.nanoTime(); System.out.println("AtomicInteger time elapse:" + (end1 - start1)); System.out.println("AtomicIntegerCompareTest.value=" + test.getValue()); } public int getValue() { return value; } public void setValue(int value) { this.value = value; } } |
运行结果:
synchronized time elapse:510183189
AtomicIntegerCompareTest.value=3000000
AtomicInteger time elapse:204934395
AtomicIntegerCompareTest.value=3000000
上例中用了两种方法进行变量的自增:
1. 传统的Synchronized:通过字节码指令monitorenter和monitorexit来保证原子性。
2. Atomic包原子方式:通过CAS方式保证原子性。
可见,后者的性能更加可观。
AtomicInteger提供了几个方法,以getAndIncrement为例,我们来看下CAS原理:
private volatile int value;//volatile关键字 public final int getAndIncrement() { for (;;) { //轮询int current = get(); //获取当前值,返回value,内存可见性 int next = current + 1; //自增 if (compareAndSet(current, next)) //比较当前值有没有发生改变,如果设置成功,返回当前值,否则进行下一次轮询,直至成功 return current; } } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); //通过unsafe包的比较并交换方法来操作,如果当前值(this)和预期值(expect)相等,则把当前值设为目标值(update) } |
Unsafe:
jboolean sun::misc::Unsafe::compareAndSwapInt (jobject obj, jlong offset, jint expect, jint update) { jint *addr = (jint *)((char *)obj + offset); return compareAndSwap (addr, expect, update); }static inline bool compareAndSwap (volatile jlong *addr, jlong old, jlong new_val) { jboolean result = false; spinlock lock; if ((result = (*addr == old))) *addr = new_val; return result; } |
独占锁是一项悲观技术:它假设最坏的情况(如果你不锁门,那么捣蛋鬼就会闯入并搞得一团糟),并且只有在确保其他线程不会造成干扰的情况下才能执行下去。
对于细粒度的操作,还有另一种更高效的方法,也是一种乐观的方法,通过这种方法可以在不发生干扰的情况下完成更新操作。这种方法需要借助冲突检查机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,这个操作将失败,并且可以重试。这种乐观的方法就好像一句谚语:“原谅比准许更容易得到”,其中“更容易”在这里相当于“更高效”
。
现在,几乎所有的处理器中都包含了某种形式的原子读-改-写指令,如比较并交换(Compare -and-swap)或关联加载、条件存储(load-linked、store-conditional)。操作系统和JVM使用这些指令来实现锁和并发的数据结构。
在大多数处理器架构中采用的方法时实现一个比较并交换(CAS)指令。CAS包含了三个操作数:需要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。(这种变化形式被称为比较并设置,无论操作是否成功都会返回)
CAS的含义是:“我认为V的值应该是A,如果是,那么将V的值更新为B,否则不修改并告诉V的实际值为多少”。CAS是一项乐观的技术,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了改变量,那么CAS能检测到这个错误。
CAS思想模拟操作:
package com.mylearn.thread.atomic; import java.util.concurrent.CountDownLatch; /** * Created by IntelliJ IDEA. * User: yingkuohao * Date: 13-11-8 * Time: 下午12:06 * CopyRight:360buy * Descrption: * To change this template use File | Settings | File Templates. */ public class CASTest { private int value = 0; public synchronized int get() { return value; } /** * 模拟硬件compareAndSwap,通过 synchronized保证 * @param expectValue * @param newValue * @return */ public synchronized int compareAndSwap(int expectValue, int newValue) { System.out.println("cureentThread:" +Thread.currentThread().getName()+"value="+value); int oldValue = value; if (oldValue == expectValue) { //对比期望值,如果相等就用新值赋值value = newValue; } return oldValue; //无论成功失败都返回旧值 } /** * 判断CAS是否成功,通过 synchronized保证 * @param expectValue * @param newValue * @return */ public synchronized boolean compareAdnSet(int expectValue, int newValue) { return expectValue == compareAndSwap(expectValue, newValue); } public static void main(String args[]) { final CASTest casTest = new CASTest(); final CountDownLatch countDownLatch = new CountDownLatch(3); Thread thread = new Thread(new Runnable() { public void run() { System.out.println("thead0开始执行"); int i = 0; for (; ; ) { i++; System.out.println("thread0第" + i + "次执行,value="+casTest.get()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } if (casTest.compareAdnSet(0, 1)) { System.out.println("thread0 CAS 成功,cureValue="+casTest.get()); countDownLatch.countDown(); return; } } } }); Thread thread1 = new Thread(new Runnable() { public void run() { System.out.println("thead1开始执行"); int i = 0; for (; ; ) { i++; System.out.println("thread1第" + i + "次执行,value="+casTest.get()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } if (casTest.compareAdnSet(0, 1)) { System.out.println("thread1 CAS 成功,cureValue="+casTest.get()); countDownLatch.countDown(); return; } } } }); Thread thread2 = new Thread(new Runnable() { public void run() { System.out.println("thead2开始执行"); int i = 0; for (; ; ) { i++; System.out.println("thread2第" + i + "次执行,value="+casTest.get()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } if (casTest.compareAdnSet(1, 2)) { //准备修改成2 System.out.println("thread2 CAS 成功,curValue=" + casTest.get()); countDownLatch.countDown(); return; } } } }); thread.start(); thread1.start(); thread2.start(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } System.out.println("fianle:" + casTest.get()); } } |
结果:
thead0开始执行 thead2开始执行 thead1开始执行 thread2第1次执行,value=0 thread0第1次执行,value=0 thread1第1次执行,value=0 cureentThread:Thread-2value=0 thread2第2次执行,value=0 cureentThread:Thread-2value=0 cureentThread:Thread-1value=0 thread2第3次执行,value=1 cureentThread:Thread-0value=1 thread1 CAS 成功,cureValue=1 thread0第2次执行,value=1 cureentThread:Thread-2value=1 thread2 CAS 成功,curValue=2 cureentThread:Thread-0value=2 thread0第3次执行,value=2 cureentThread:Thread-0value=2 thread0第4次执行,value=2 。。。 |
基本上上述代码的蓝色部分就是CAS的核心思想了。Thead0和Thread1都是把value从0改为1,Thread2负责把value从1改为2;但是CAS通过synchronized控制,也就是Thead0和Thrad1只会有一个会成功,第二个线程永远执行不成功,因为当进入compareAndSwap代码时读取value,value拿的总是最新值,已经被第一个线程改变了。CAS的使用就是一个死循环,判断直至CAS成功,返回结果,线程结束。
当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会挂起(这与获取锁的情况不同:当获取锁失败时,线程将被挂起),而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争CAS时失败不会阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。这种灵活性就大大减少了与锁相关的活跃性风险。
CAS的典型使用模式是:首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子方式将V中的值由A变为B。由于CAS能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读-改-写操作序列。
具体参见java.util.concurrent.atomic 包,值得一提的是,这个是基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。这个东东在构造非阻塞算法中用的非常多,比如我们前面看到的ConcurrentLinkedQueue中,队列的push,pop,都可以通过CAS的操作来执行。其中的CAS主要就是来完成的。
Michael-Scott非阻塞算法:ConcurrentLinkedQueue用的就是此算法
package com.mylearn.thread.atomic; import java.util.concurrent.atomic.AtomicReference; /** * Created by IntelliJ IDEA. * User: yingkuohao * Date: 13-11-8 * Time: 下午3:14 * CopyRight:360buy * Descrption: Michael-Scott非阻塞算法中的插入算法 * 当插入一个新的元素时,需要更新两个指针。首先更新当前最后一个元素的next指针,将新节点连接到列表队尾, * 然后更新尾节点,将其指向这个新元素。在这两个操作之间,队列处于一种中间状态;在第二次更新完成后, * 队列将再次处于稳定状态。 * <p/> * 当队列处于稳定状态时,尾节点的next域将为空,如果队列处于中间状态,那么tail.next将为空。因此,任何线程 * 都能通过检查tail.next来获取队列的当前状态。而且,当队列处于中间状态时,可以通过将尾节点向前移动一个节点, * 从而结束其他线程正在执行的插入元素操作,并使得队列恢复为稳定状态。 * To change this template use File | Settings | File Templates. */ public class LinkedQueue<E> { private static class Node<E> { final E item; final AtomicReference<Node<E>> next; private Node(E item, Node<E> next) { this.item = item; this.next = new AtomicReference<Node<E>>(next); } private final Node<E> dummy = new Node<E>(null, null); //哑节点 private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy); //头结点 private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy); //尾节点 public boolean put(E item) { Node<E> newNode = new Node<E>(item, null); while (true) { Node<E> curTail = tail.get(); Node<E> tailNext = curTail.next.get(); if (curTail == tail.get()) { if (tailNext != null) { //A 队列处于中间状态,也就是其他线程走到了C,把tail的next已经赋值了。tail此时指向的是整个队列的倒数第二个节点 //推进尾节点,帮助其他线程完成,帮助其他线程完成D操作,然后继续轮询,等到稳定状态后插入自己持有的节点 tail.compareAndSet(curTail, tailNext); //B } else { // 队列处于稳定状态,tail节点指向最后一个元素 if (curTail.next.compareAndSet(null, newNode)) { //C CAS操作,尝试插入新节点,tail的next指向新节点tail.compareAndSet(curTail, tailNext); //D CAS操作,插入成功,tail节点向后推进,指向tail的next return true; } } } } } /* * put方法再插入新元素之前,将先检查队列是否处于中间状态,(A)。如果是,那么有另一个线程正在插入元素(C和D之间)。 * 此时当前线程不会等待其他线程执行完成,而是帮助它完成操作,并将尾节点向前推进一个节点(B)。然后,他将重复执行这种 * 检查,以免另一个线程已经开始插入新元素,并继续推进尾节点,直到它发现队列处于稳定状态之后,才会开始执行自己的插入操作。 * * 由于C中的CAS将新节点连接到队列尾部,因此如果两个线程同时插入元素,那么这个CAS将失败。在这样的情况下,并不会造成 * 破坏:不会发生任何变化,并且当前的线程只需要重新读取尾节点并重试。如果步骤C成功了,那么插入操作将生效,第二个CAS * 被认为是一个“清理操作”,因为它既可以由执行插入操作的线程来执行,也可以由其他任何线程来执行。如果D失败,那么执行 * 插入操作的线程将返回,而不是重新执行CAS,因为不需重试——另一个线程已经在B中完成了这个工作。 * * 这种方式能够工作,因为在任何线程尝试将一个新节点插入到队列之前,都会首先通过检查tail.next是否非空来判断是否需要 * 清理队列。如果是,他会推进尾节点,直到队列处于稳定状态。 * */ } } |
ABA问题指的是V由A变成B,又由B变成了A。通过CAS检查是会有问题的,其实在这个CAS过程中V发生了改变。
解决方案:增加一个版本号,更新的时候不只更新引用,还更新一个版本号。这样即使A变为B又变为A,版本号也是不同的。
转载地址:http://cwrrb.baihongyu.com/