Memory in Action - 浅谈Java 内存模型

2017-10-31  本文已影响0人  语落心生

作为Java多线程的难题之一,经常为面试时或者工作时所提及,最近由题主为大家送上Java内存模型
Question 1: Java 中能创建 volatile 数组吗?

volatile 关键字
用在多线程,同步变量。 线程为了提高效率,将某成员变量(如A)拷贝了一份(如B),线程中对A的访问其实访问的是B。只在某些动作时才进行A和B的同步。因此存在A和B不一致的情况。volatile就是用来避免这种情况的。volatile告诉jvm, 它所修饰的变量不保留拷贝,直接访问主内存中的(也就是上面说的A)

public class JoinThread {

    volatile long vl = 0L; //使用volatile声明64位的long型变量

    public void setVl(long l){
        vl = l; //单个volatile变量的写
    }

    public void getAndIncrement(){
        vl++; //复合(多个)volatile变量的读/写
    }

    public long getVl() {
        return vl;  //单个volatile变量的读
    }

    public static void main(String[] args){
        JoinThread joinThread = new JoinThread();
        joinThread.getAndIncrement();
        System.out.println(joinThread.getVl());
    }
}

Notices:对volatile变量的单个读/写,看成是使用同一个监视器锁对这些单个读/写操作做了同步。即我们调用getAndIncrement方法访问,无论我们是否复制了l变量,在对vl进行单个变量的读写时,指向的都是vl对应内存地址的值

synchronized 关键字
同步块在Java中是同步在某个对象上。所有同步在一个对象上的同步块在同时只能被一个线程进入并执行操作。所有其他等待进入该同步块的线程将被阻塞,直到执行该同步块中的线程退出。

so,我们不妨为需要监视的变量加上一个线程,在同步块内调用getAndIncrement方法

public class JoinThread {

    long vl = 0L;

    public synchronized void setVl(long l){
        vl = l;
    }

    public void getAndIncrement(){
        long temp = getVl();
        temp +=1L;
        setVl(temp);
    }

    public synchronized long getVl() {
        return vl;
    }

    public static void main(String[] args){
        JoinThread joinThread = new JoinThread();
        joinThread.getAndIncrement();
        System.out.println(joinThread.getVl());
    }
}

Notices:对一个volatile变量的单个读/写操作,与对一个vl变量的读/写操作使用同一个监视器锁来同步的执行效果相同

Question 2 : 如何调用 wait()方法的?使用if 块还是循环?为什么?

参考Effective Java中的生产者与消费者进行解答

生产者与消费者: 生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。

以下是我们需要去解决的两个问题

Firstly , 对于问题1,我们试着调用wait()方法,当Proucer对象每打印一次数据之后,调阻塞一次线程,便于切换到Consumer对象打印一次数据。

Producer.java

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {


    private final Vector sharedQueue;

    private final int SIZE;

    public Producer(Vector sharedQueue,int size){

        this.sharedQueue = sharedQueue;
        this.SIZE = size;

    }

    @Override
    public void run() {
        for(int i=0;i<7;i++){
            System.out.println("Producted: "+i);

            try{
                produce(i);
            }catch (InterruptedException e){
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
            }

        }
    }

    private void produce(int i)throws InterruptedException{
        //wait if queue is full
        while (sharedQueue.size() == SIZE) {
            synchronized (sharedQueue) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait(); //使当前线程等待,直到另一个线程调用此对象的 [notify()](https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#notify())方法或[notifyAll()](https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#notifyAll())方法。
            }
        }

        //producing element and notify consumers
        synchronized (sharedQueue) {
            sharedQueue.add(i);
        }
    }

}

Consumer.java

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Consumer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed: " + consume());
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }

    private int consume() throws InterruptedException {
        //wait if queue is empty
        while (sharedQueue.isEmpty()) {
            synchronized (sharedQueue) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait();
            }
        }

        //Otherwise consume element and notify waiting producer
        synchronized (sharedQueue) {
            return (Integer) sharedQueue.remove(0);//删除此向量中指定位置的元素。将任何后续元素移动到左侧(从其索引中减去一个元素),将索引0置为第一个移动首元素的元素
        }
    }


}

ProducerConsumer.java

import java.util.Vector;

public class ProducerConsumer {

    public static void main(String[] args){
        Vector shareQueue = new Vector();

        int size = 4;
        Thread prodThread = new Thread(new Producer(shareQueue,size),"Producer");
        Thread consThread = new Thread(new Consumer(shareQueue,size),"Consumer");


        prodThread.start();
        consThread.start();
    }

}

console ouput

/usr/lib/jvm/java1.8/bin/java -javaagent:/home/complone/Downloads/idea-IC-172.4155.36/lib/idea_rt.jar=57719:/home/complone/Downloads/idea-IC-172.4155.36/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java1.8/jre/lib/charsets.jar:/usr/lib/jvm/java1.8/jre/lib/deploy.jar:/usr/lib/jvm/java1.8/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java1.8/jre/lib/ext/localedata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunec.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java1.8/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java1.8/jre/lib/javaws.jar:/usr/lib/jvm/java1.8/jre/lib/jce.jar:/usr/lib/jvm/java1.8/jre/lib/jfr.jar:/usr/lib/jvm/java1.8/jre/lib/jfxswt.jar:/usr/lib/jvm/java1.8/jre/lib/jsse.jar:/usr/lib/jvm/java1.8/jre/lib/management-agent.jar:/usr/lib/jvm/java1.8/jre/lib/plugin.jar:/usr/lib/jvm/java1.8/jre/lib/resources.jar:/usr/lib/jvm/java1.8/jre/lib/rt.jar:/home/complone/IdeaProjects/WordCount/out/production/WordCount ProducerConsumer
Producted: 0
Producted: 1
Producted: 2
Producted: 3
Producted: 4
Queue is full Producer is waiting , size: 4
Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Queue is empty Consumer is waiting , size: 0

Secondly,对于问题2,用notifyAll()方法,当Producer线程结束后,无论取出一次或者几次数据,先调用notifyAll()通知正在等待的Consumer,不必等到一轮循环结束,再开始取出Consumer中的数据

Producer.java

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {


    private final Vector sharedQueue;

    private final int SIZE;

    public Producer(Vector sharedQueue,int size){

        this.sharedQueue = sharedQueue;
        this.SIZE = size;

    }

    @Override
    public void run() {
        for(int i=0;i<7;i++){
            System.out.println("Producted: "+i);

            try{
                produce(i);
            }catch (InterruptedException e){
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
            }

        }
    }

    private void produce(int i)throws InterruptedException{
        //wait if queue is full
        while (sharedQueue.size() == SIZE) {
            synchronized (sharedQueue) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait(); //使当前线程等待,直到另一个线程调用此对象的 [notify()](https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#notify())方法或[notifyAll()](https://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#notifyAll())方法。
            }
        }

        //producing element and notify consumers
        synchronized (sharedQueue) {
            sharedQueue.add(i);
            sharedQueue.notifyAll();//唤醒正在等待对象监视器的所有线程。线程通过调用其中一种wait方法等待对象的监视器
        }
    }

}

Consumer.java

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Consumer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed: " + consume());
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }

    private int consume() throws InterruptedException {
        //wait if queue is empty
        while (sharedQueue.isEmpty()) {
            synchronized (sharedQueue) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait();
            }
        }

        //Otherwise consume element and notify waiting producer
        synchronized (sharedQueue) {
            sharedQueue.notifyAll();
            return (Integer) sharedQueue.remove(0);//删除此向量中指定位置的元素。将任何后续元素移动到左侧(从其索引中减去一个元素),将索引0置为第一个移动首元素的元素
        }
    }


}

console ouput

/usr/lib/jvm/java1.8/bin/java -javaagent:/home/complone/Downloads/idea-IC-172.4155.36/lib/idea_rt.jar=57719:/home/complone/Downloads/idea-IC-172.4155.36/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java1.8/jre/lib/charsets.jar:/usr/lib/jvm/java1.8/jre/lib/deploy.jar:/usr/lib/jvm/java1.8/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java1.8/jre/lib/ext/localedata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunec.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java1.8/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java1.8/jre/lib/javaws.jar:/usr/lib/jvm/java1.8/jre/lib/jce.jar:/usr/lib/jvm/java1.8/jre/lib/jfr.jar:/usr/lib/jvm/java1.8/jre/lib/jfxswt.jar:/usr/lib/jvm/java1.8/jre/lib/jsse.jar:/usr/lib/jvm/java1.8/jre/lib/management-agent.jar:/usr/lib/jvm/java1.8/jre/lib/plugin.jar:/usr/lib/jvm/java1.8/jre/lib/resources.jar:/usr/lib/jvm/java1.8/jre/lib/rt.jar:/home/complone/IdeaProjects/WordCount/out/production/WordCount ProducerConsumer
Producted: 0
Producted: 1
Producted: 2
Producted: 3
Producted: 4
Queue is full Producer is waiting , size: 4
Consumed: 0
Consumed: 1
Consumed: 2
Consumed: 3
Queue is empty Consumer is waiting , size: 0

console ouput

/usr/lib/jvm/java1.8/bin/java -javaagent:/home/complone/Downloads/idea-IC-172.4155.36/lib/idea_rt.jar=37187:/home/complone/Downloads/idea-IC-172.4155.36/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java1.8/jre/lib/charsets.jar:/usr/lib/jvm/java1.8/jre/lib/deploy.jar:/usr/lib/jvm/java1.8/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java1.8/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java1.8/jre/lib/ext/localedata.jar:/usr/lib/jvm/java1.8/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunec.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java1.8/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java1.8/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java1.8/jre/lib/javaws.jar:/usr/lib/jvm/java1.8/jre/lib/jce.jar:/usr/lib/jvm/java1.8/jre/lib/jfr.jar:/usr/lib/jvm/java1.8/jre/lib/jfxswt.jar:/usr/lib/jvm/java1.8/jre/lib/jsse.jar:/usr/lib/jvm/java1.8/jre/lib/management-agent.jar:/usr/lib/jvm/java1.8/jre/lib/plugin.jar:/usr/lib/jvm/java1.8/jre/lib/resources.jar:/usr/lib/jvm/java1.8/jre/lib/rt.jar:/home/complone/IdeaProjects/WordCount/out/production/WordCount ProducerConsumer
Producted: 0
Producted: 1
Producted: 2
Producted: 3
Producted: 4
Queue is full Producer is waiting , size: 4
Consumed: 0
Producted: 5
Queue is full Producer is waiting , size: 4
Consumed: 1
Producted: 6
Queue is full Producer is waiting , size: 4
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Queue is empty Consumer is waiting , size: 0

内存模型小结 - 主存与缓存机制

meomry.gif

同时我们就可以观察到volatile的特性:访问寄存器比访问内存单元要快,编译器会优化减少内存的读取,可能会读脏数据。声明变量为volatile,编译器不再对访问该变量的代码优化,仍然从内存读取,使访问稳定。

下面,让我们使用一个同步块标记讲解一下 线程安全-死锁

线程锁
锁是 Java 并发编程中最重要的同步机制,它可以让等待在临界区的线程互斥执行。

import static java.lang.Thread.sleep;

/**
 * Created by 26645 on 2017/11/14.
 */
public class DeadLockSample {

    private final Object obj1 = new Object();
    private final Object obj2 = new Object();

    private void testDeadlock(){
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                caLock12();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                caLock21();
            }
        });
        t1.start();
        t2.start();
    }


    private void caLock21(){
        synchronized (obj2){
            sleep();
            synchronized (obj1){
                sleep();
            }
        }
    }

    private void caLock12(){
        synchronized (obj1){
            sleep();
            synchronized (obj2){
                sleep();
            }
        }
    }

    private void sleep(){
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        DeadLockSample test = new DeadLockSample();
        test.testDeadlock();
    }
}

死锁的解决方案

Notices: 优化建议

我们从上文可知,用sychronized修饰的方法或者语句块在代码执行完之后锁自动释放.而sleep()静态方法,它也能使线程暂停一段时间。但原因在于,sleep并不释放锁,当前线程放弃CPU,开始线程挂起,在挂起中不会释放锁。那么如果我们不想像消费者-生产者机制那样创建两个单独的线程,使用wait()notifyAll()方法通知对方线程何时释放资源。Bingo!介绍我们的第三种方法interrupt()

import static java.lang.Thread.sleep;

/**
 * Created by 26645 on 2017/11/14.
 */
public class DeadLockSample {

    private final Object obj1 = new Object();
    private final Object obj2 = new Object();

    private void testDeadlock(){
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                caLock12();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                caLock21();
            }
        });
        t1.start();
        t1.interrupt();
        t2.start();
    }


    private void caLock21(){
        synchronized (obj2){
            sleep();
            synchronized (obj1){
                sleep();
            }
        }
    }

    private void caLock12(){
        synchronized (obj1){
            sleep();

            synchronized (obj2){
                sleep();
            }
        }
    }

    private void sleep(){
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        DeadLockSample test = new DeadLockSample();
        test.testDeadlock();
    }
}

线程1被打断,锁占用的资源被释放,线程2获取不到占用着的线程1的锁,被迫终止

console output

D:\java\jdk1.8.0_51\bin\java "-javaagent:D:\idea\IntelliJ IDEA 2017.1.4\lib\idea_rt.jar=49300:D:\idea\IntelliJ IDEA 2017.1.4\bin" -Dfile.encoding=UTF-8 -classpath D:\java\jdk1.8.0_51\jre\lib\charsets.jar;D:\java\jdk1.8.0_51\jre\lib\deploy.jar;D:\java\jdk1.8.0_51\jre\lib\ext\access-bridge-64.jar;D:\java\jdk1.8.0_51\jre\lib\ext\cldrdata.jar;D:\java\jdk1.8.0_51\jre\lib\ext\dnsns.jar;D:\java\jdk1.8.0_51\jre\lib\ext\jaccess.jar;D:\java\jdk1.8.0_51\jre\lib\ext\jfxrt.jar;D:\java\jdk1.8.0_51\jre\lib\ext\localedata.jar;D:\java\jdk1.8.0_51\jre\lib\ext\nashorn.jar;D:\java\jdk1.8.0_51\jre\lib\ext\sunec.jar;D:\java\jdk1.8.0_51\jre\lib\ext\sunjce_provider.jar;D:\java\jdk1.8.0_51\jre\lib\ext\sunmscapi.jar;D:\java\jdk1.8.0_51\jre\lib\ext\sunpkcs11.jar;D:\java\jdk1.8.0_51\jre\lib\ext\zipfs.jar;D:\java\jdk1.8.0_51\jre\lib\javaws.jar;D:\java\jdk1.8.0_51\jre\lib\jce.jar;D:\java\jdk1.8.0_51\jre\lib\jfr.jar;D:\java\jdk1.8.0_51\jre\lib\jfxswt.jar;D:\java\jdk1.8.0_51\jre\lib\jsse.jar;D:\java\jdk1.8.0_51\jre\lib\management-agent.jar;D:\java\jdk1.8.0_51\jre\lib\plugin.jar;D:\java\jdk1.8.0_51\jre\lib\resources.jar;D:\java\jdk1.8.0_51\jre\lib\rt.jar;E:\kotlin_learn\out\production\kotlin_learn;C:\Users\26645\.IntelliJIdea2017.1\config\plugins\Kotlin\kotlinc\lib\kotlin-runtime.jar;C:\Users\26645\.IntelliJIdea2017.1\config\plugins\Kotlin\kotlinc\lib\kotlin-reflect.jar;E:\kotlin_learn\junit-4.12.jar DeadLockSample
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at DeadLockSample.sleep(DeadLockSample.java:51)
    at DeadLockSample.caLock12(DeadLockSample.java:41)
    at DeadLockSample.access$000(DeadLockSample.java:6)
    at DeadLockSample$1.run(DeadLockSample.java:15)
    at java.lang.Thread.run(Thread.java:748)

那么如何优雅的中断线程呢?这里就需要引入java的volatile关键字

public class testThread implements Runnable{
    private volatile boolean flag = true; // 设置变量为线程可见
    public synchronized  void stop() {
        // TODO Auto-generated method stub
        flag = false;
    }
    @Override
    public synchronized void run() {
        // TODO Auto-generated method stub
        if(flag){
           // Todo Somethings
        }
    }
}


上一篇下一篇

猜你喜欢

热点阅读