15、Java多线程:线程同步

5.1 线程同步机制

线程同步机制是一套用于协调线程之间的数据访问的机制。该机制可以保障线程安全。Java平台提供的线程同步机制包括:锁、volatile关键字、final关键字、static关键字、以及相应的API(如Object.wait()/Object.notify())等。

5.2 锁概述

线程安全问题的产生前提是多个线程并发访问共享数据。所以我们将多个线程对共享数据的并发访问转换为串行访问,即一个共享数据一次只能被一个线程访问。锁就是复用这种思路来保障线程安全的。

锁可以理解为对共享数据进行保护的许可证。如果想要访问这些共享数据,就必须先持有该许可证。而许可证一次只能被一个线程持有,且访问完共享数据后,需要释放其持有的许可证。

线程持有锁后,直到释放锁为止执行的代码,我们称为临界区(Critical Section)。

JVM把锁分为内部锁和显示锁两种。内部锁通过synchronized关键字实现,显示锁通过java.concurrent.locks.Lock接口的实现类来实现。

锁的作用:

锁可以实现对共享数据的安全访问,保障线程的原子性、可见性和有序性。

锁通过互斥保障原子性。一个锁只能被一个线程持有,这就保证临界区的代码一次只能被一个线程执行,使得临界区代码所执行的操作自然而然地具有不可分割的特性,具备了原子性。

可见性的保障是通过写线程冲刷处理器的缓存和读线程刷新处理器缓存这两个动作实现的。在Java平台中,锁的获得隐含着刷新处理器缓存的动作,锁的释放隐含着冲刷处理器缓存的动作。

锁能够保障有序性。

注意:使用锁保障线程的安全性,必须满足以下条件:

这些线程在访问共享数据时,必须使用同一个把锁,当然读也是需要是同一把锁的。

锁相关的概念:

1、 可重入性(Reentrancy):当一个线程持有该锁的时候,能够再次申请并获得该锁;
2、 锁的争用与调度:Java平台中内部锁属于非公平锁,而显示Lock接口锁支持公平锁非公平锁;
3、 锁的粒度:一个锁可以保护的共享数据的数量大小称为锁的粒度保护的共享数据量大,称该锁的粒度粗,否则称该锁的粒度细锁的粒度过大会造成申请锁时不必要的等待,锁的力度过细会增加锁调度的开销;

5.3 内部锁:synchronized关键字

Java中的每个对象都有一个与之关联的内部锁(Intrinsic lock),这种锁也称为监视器(Monitor),它是一种排它锁,可以保障原子性、可见性和有序性。

内部锁是通过synchronized关键字实现的。synchronized关键字可以修饰类、方法、代码块

1、 修饰一个类:其作用的范围是synchronized后面括号括起来的部分,作用的对象是这个类的所有对象;

2、 修饰一个方法:被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象;

3、 修饰一个静态方法:其作用的范围是整个方法,作用的对象是这个类的所有对象;

4、 修饰一个代码块:被修饰的代码块称为同步语句块,其作用范围是大括号{}括起来的代码块,作用的对象是调用这个代码块的对象;

synchronized同步代码块:

package threadSafe.intrinsiclock;

public class Test01 {
    public static void main(String[] args) {
        //创建两个线程,分别调用mm()方法
        //先创建Test01对象,通过对象名调用mm()
        Test01 obj = new Test01();
        new Thread(new Runnable() {
            @Override
            public void run() {
                obj.mm();
            }
        }, "Thread1").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                obj.mm();
            }
        }, "Thread2").start();
    }

    //定义方法
    public void mm() {
        synchronized (this) {    //通常使用this当前对象作为锁对象
            for(int i = 1; i <= 100; i++) {
                System.out.println(Thread.currentThread().getName() + "-->" + i);
            }
        }
    }
}

通过synchronized关键字,在执行代码块的时候会获取该对象的锁,所以线程2必须要等线程1打印完后释放了锁才能获取到锁并执行代码块。所以可以看到,先是线程1的打印信息,再才是线程2的打印信息。

也可对指向常量的对象进行上锁:

package threadSafe.intrinsiclock;

public class Test01 {

    public static final Object OBJ = new Object();

    public static void main(String[] args) {
        ...
    }

    //定义方法
    public void mm() {
        synchronized (OBJ){
            for(int i = 1; i <= 100; i++) {
                System.out.println(Thread.currentThread().getName() + "-->" + i);
            }
        }
    }
}

当锁住静态代码块或者静态方法时,锁的是类,可以简单理解为将字节码文件进行了上锁。有的时候也称这把锁为类锁。

同步代码块和同步方法如何选择:

public void mm() throws InterruptedException {
    System.out.println("任务开始");
    TimeUnit.MILLISECONDS.sleep(500);   //任务准备工作

    synchronized (this) {
        for(int i = 0; i <= 10; i++) {
            System.out.println(Thread.currentThread().getName() + " :" + i);
        }
    }
}

像上面这样的方法,显然如果写成同步方法的话,那么准备任务的时候也获得了锁,这是不好的。所以像上面这样的方法,写成同步代码块比较好。

简而言之:同步方法的粒度粗,并发效率低。同步代码块的粒度细,并发效率高

脏读:

package threadSafe.intrinsiclock;

import java.util.concurrent.TimeUnit;

public class Test05 {

    public static void main(String[] args) {
        SharedValue sharedValue = new SharedValue();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sharedValue.setValue("chenxin", "456");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                sharedValue.readValue();
            }
        }, "Thread2").start();
    }

    static class SharedValue {
        String username = "xiaoxin";
        String password = "123";

        public void setValue(String username, String password) throws InterruptedException {
            this.username = username;
            TimeUnit.SECONDS.sleep(1);
            this.password = password;
        }

        public void readValue() {
            System.out.println("username: " + username);
            System.out.println("password:" + password);
        }
    }
}

出现脏读的原因:对共享数据的修改与对共享数据的读取不同步

线程出现异常会自动释放内部锁:

package threadSafe.intrinsiclock;

public class Test04 {

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                Test04.m1();
            }
        }, "Thread1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                new Test04().m2();
            }
        }, "Thread2").start();
    }

    public synchronized static void m1() {
        for(int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + ": " + i);
            if(i == 50) i = Integer.parseInt("abc"); //产生异常
        }
    }

    public void m2() {
        synchronized (Test04.class) {
            for(int i = 0; i < 100; i++) {
                System.out.println(Thread.currentThread().getName() + ": " + i);
            }
        }
    }
}

可以发现,Thread1打印到50的时候,Thread2开始打印。说明,异常会释放锁,所以Thread2才能获得锁去执行。

死锁:

package threadSafe.intrinsiclock;

public class Test06 {

    public static void main(String[] args) {
        SubThread t1 = new SubThread();
        t1.setName("a");
        t1.start();
        SubThread t2 = new SubThread();
        t2.setName("b");
        t2.start();
    }

    static class SubThread extends Thread {
        private static final Object resource1 = new Object();
        private static final Object resource2 = new Object();
        @Override
        public void run() {
            if("a".equals(Thread.currentThread().getName())) {
                synchronized (resource1) {
                    System.out.println("a获得了resource1,现在申请resource2");
                    synchronized (resource2) {
                        System.out.println("a获得了resource2");
                    }
                }
            }
            if("b".equals(Thread.currentThread().getName())) {
                synchronized (resource2) {
                    System.out.println("b获得了resource2,现在申请resource1");
                    synchronized (resource1) {
                        System.out.println("b获得了resource1");
                    }
                }
            }
        }
    }
}

5.4 轻量级同步机制:volatile关键字

volatile的作用:

使变量在多个线程之间可见。当volatile关键字修饰的变量被改变时,会强制将其值刷新到主存中,并且,如果有其他处理器的缓存保存了该主存地址的值的话,会强制将其值刷新到别的处理器的缓存中。

volatile和synchronized的区别:

1、 volatile是线程同步的轻量级实现,所以volatile的性能肯定比synchronized要好;
2、 volatile只能修饰变量,而synchronized还可以修饰方法、代码块、类;
3、 多线程访问volatile变量不会发生阻塞,而synchronized可能会发生阻塞;
4、 volatile能保证数据的可见性,但是不能保证原子性而synchronized两者都可以保障;
5、 volatile解决的是变量在多个线程之间的可见性问题,而synchronized解决的是多个线程在访问资源时候的同步性问题;

volatile的非原子性:

package threadSafe;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Test01 {

    public static void main(String[] args) {
        //启动两个线程,不断调用getNum()方法
        MyInt myInt = new MyInt();

        for(int i = 1; i <= 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while(true) {
                        System.out.println(Thread.currentThread().getName() + "->" + myInt.getNum());
                        try {
                            TimeUnit.MILLISECONDS.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }

    static class MyInt{
        volatile int num;
        public int getNum() {
            return num++;
        }
    }
}

从结果可以发现,两个线程有的时候打印的是一样的值,说明即便是用volatile修饰了,num++还是不具有原子性。(可以使用原子类来保证原子性,例如AtomicInteger)

5.5 CAS

CAS(Compare And Swap),是由硬件实现的。

CAS可以将read-modify-write这类的操作转换为原子操作。

i++包括三个操作:读取i的值、i+1、将新的值保存到内存。

CAS的原理:在把值写到内存中时,会再次读取该地址的值,如果发现主存中的值与一开始读取到的值不同,则放弃写入(即撤销本次操作);否则就更新进去。

使用CAS实现一个线程安全的计数器:

package cas;

public class CASTest {
    public static void main(String[] args) {
        CASCounter casCounter = new CASCounter();
        for(int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ": " + casCounter.increment());
                }
            }, "Thread" + i).start();
        }
    }

}

class CASCounter{

    private long value;

    private boolean compareAndSwap(long expectedValue, long oldValue, long newValue) {
        if(expectedValue == oldValue) {
            value = newValue;
            return true;
        } else return false;
    }

    public long increment() {
        long oldValue = value;
        long newValue;
        do {
            newValue = value + 1;
        } while (compareAndSwap(value, oldValue, newValue));
        return value;
    }
}

CAS的ABA问题:

CAS实现原子性的背后有一个假设:如果共享变量的当前值与期望值相同,就假设共享变量没有被更改过。

但事实可能不是如此:x初始值位0,A将x修改为10,B将x又修改为0,此刻能否认为x没有被更改过呢?这就是CAS的ABA问题。

如果实际业务需要避免ABA问题,那么我们可以引入一个变量表示版本号,或者称修订号。每进行一次修改,修订号增加1。如果遇到当前版本号与期望版本号不一致,则获取新的版本号并继续修改。此时的过程是这样的。[A, 0] -> [B, 1] -> [C, 2]

5.6 原子变量类

原子变量类是基于CAS实现的。当我们对共享变量进行reda-modify-write的更新操作时,通过原子变量类可以保障操作的原子性和可见性。

read-modify-write操作指的是:对于此次操作,变量的新值依赖于变量的旧值。而不是像那种赋值操作。

前面提到过,volatile只能保障可见性,不能保障原子性。而原子变量类的内部使用的是volatile修饰的变量,并且使用CAS保障了原子性。有时将原子变量类看成是增强的volatile变量。

分组 原子变量类
基础数据型 AtomicInteger、AtomicLong、AtomicBoolean
数组 AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
字段更新器 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
引用型 AtomicReference、AtomicStampedReference、AtomicMarkableReference

AtomicLong:

package atomics.atomicLong;

import java.util.concurrent.atomic.AtomicLong;

//我们想要让整个计算过程只使用这一个计算器,所以这里我们将其设计成单例
public class Indicator {
    //将构造方法私有化
    private Indicator(){}
    //创建一个静态的实例类
    private static Indicator instance = new Indicator();
    //返回上面的那个实例类
    public static Indicator getInstance() {
        return instance;
    }
    //记录请求数
    private AtomicLong requestNum = new AtomicLong(0);
    //记录成功数
    private AtomicLong successNum = new AtomicLong(0);
    //记录失败数
    private AtomicLong failureNum = new AtomicLong(0);
    //请求数增加
    public void requestProcess() {
        requestNum.incrementAndGet();
    }
    //成功数增加
    public void requestProcessSuccess() {
        successNum.incrementAndGet();
    }
    //失败数增加
    public void requestProcessFailure() {
        failureNum.incrementAndGet();
    }
    //获取请求数
    public Long getRequestNum() {
        return requestNum.get();
    }
    //获取成功数
    public Long getRequestSuccessNum() {
        return successNum.get();
    }
    //获取失败数
    public Long getRequestFailureNum() {
        return failureNum.get();
    }
}

atomicArray:

package atomics.atomicIntegerArray;

import java.util.concurrent.atomic.AtomicIntegerArray;

public class Test {

    public static void main(String[] args) {
        //1、创建一个具有指定长度的原子数组
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
        System.out.println(atomicIntegerArray);
        //2、返回指定位置的元素
        System.out.println(atomicIntegerArray.get(0));
        System.out.println(atomicIntegerArray.get(1));
        //3、设置指定位置元素的值
        atomicIntegerArray.set(0, 1);
        System.out.println(atomicIntegerArray.getAndSet(0, 2));  //先获取旧值,再获取新值
        //4、修改某个数组元素的值
        System.out.println(atomicIntegerArray.addAndGet(0, 5));  //先修改,再返回
        System.out.println(atomicIntegerArray.getAndAdd(0, 6));  //先返回,再修改
        //5、CAS操作
        atomicIntegerArray.compareAndSet(0, 13, 222);  //如果0位置的值是22,就修改为222
        System.out.println(atomicIntegerArray.get(0));
        //6、自增/自减
        System.out.println(atomicIntegerArray.incrementAndGet(0));  //先增再获得
        System.out.println(atomicIntegerArray.getAndIncrement(0));  //先获得再增
        System.out.println(atomicIntegerArray.decrementAndGet(0));  //先减再获得
        System.out.println(atomicIntegerArray.getAndDecrement(0));  //先获得再减
    }
}

AtomicIntegerFieldUpdater:字段更新器

AtomicIntegerFieldUpdater可以对原子整数字段进行更新,要求:

  • 字段必须使用volatile修饰,是其在线程间可见。
  • 只能是实例变量,不能是静态变量,也不能用final修饰
package atomics.atomicIntegerField;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class SubThread extends Thread{
    //要更新的user对象
    private User user;
    //创建更新器,对user对象的age字段进行更新
    private AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    public SubThread(User user) {
        this.user = user;
    }

    @Override
    public void run() {
        //在子线程中对user对象的age自增10次
        for(int i = 0; i < 10; i++) {
            updater.incrementAndGet(user);
        }
    }
}

AtomicReference:原子引用对象

package atomics.atomicReference;

import java.util.concurrent.atomic.AtomicReference;

public class Test01 {

    //创建一个reference对象
    static AtomicReference<String> atomicReference = new AtomicReference<>("abc");

    public static void main(String[] args) {
        for(int i = 0; i < 100; i++) {
            int temp = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    atomicReference.set(atomicReference.get() + temp);
                    System.out.println(atomicReference.get());
                }
            }).start();
        }
    }
}

AtomicStampedReference:解决CAS中的ABA问题

package atomics.atomicStampedReference;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * AtomicStampedReference原子类可以用来解决CAS中的ABA问题
 * AtomicStampedReference原子类中有一个整数标记值stamp,每次执行CAS操作时,会比较它的版本。
 */
public class Test01 {

    private static AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference<>("abc", 0);

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicStampedReference.compareAndSet("abc", "def", atomicStampedReference.getStamp(),
                        atomicStampedReference.getStamp()+1);
                System.out.println(Thread.currentThread().getName() + ": " + atomicStampedReference.getReference());
                atomicStampedReference.compareAndSet("def", "abc", atomicStampedReference.getStamp(),
                        atomicStampedReference.getStamp()+1);
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedReference.getStamp();
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(atomicStampedReference.compareAndSet("abc", "ggg", stamp,
                        atomicStampedReference.getStamp()+1));
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(atomicStampedReference.getReference());
    }
}