JAVA 多线程与高并学习笔发记(五)——synchronize

2022-07-03  本文已影响0人  简单一点点

在 Java 中,线程同步使用最多的方法是使用 synchronized 关键字,每个 Java 对象都隐含有一把锁,称为 Java 内置锁。使用 synchronized 调用相当于获取 syncObject 的内置锁,可以对临界区代码段进行排他性保护。

synchronized 同步方法

当使用 synchronized 关键字修饰一个方法时,该方法被声明为同步方法。例如:

public synchronized void seldPlus() {
    amount++;
}

这样只允许一个线程访问方法,其它线程需要执行同一个方法的化,只能等待和排队。

synchronized 同步块

有时候直接在方法中使用 synchronized 关键字会占用多个临界区资源,影响执行效率,这时候可以考虑使用同步块。

考虑下面的方法。

public class TwoPlus {
    private int sum1 = 0;
    private int sum2 = 0;

    public synchronized void plus(int val1, int val2) {
        this.sum1 += val1;
        this.sum2 += val2;
    }
}

其中的同步方法包含对两个临界资源的操作,需要全部占用才能进入,但他们两个其实是互相独立的,这里可以改造成同步块:

public class TwoPlus {
    private int sum1 = 0;
    private int sum2 = 0;
    private Integer sum1Lock = new Integer(1); // 同步锁一
    private Integer sum2Lock = new Integer(2); // 同步锁二

    public void plus(int val1, int val2) {
        // 同步块1
        synchronized(this.sum1Lock) {
            this.sum1 += val1;
        }
        // 同步块2
        synchronized(this.sum2Lock) {
            this.sum2 += val2;
        }

    }
}

实际上,同步方法就是把整个方法作为一个同步代码块,并使用 this 对象锁。

静态的同步方法

普通的 synchronized 实例方法,同步锁为当前对象的 this 的监视锁,如果是静态方法呢?

静态方法属于 Class 实例而不是单个 Object 实例,因此静态方法的同步锁对应的是 Class 对象的监视锁。为了区分,一般将 Object 对象的监视锁叫做对象锁,将 Class 对象的监视锁称为类锁。

由于每个类只有一个 Class 实例,因此使用类锁作为 synchronized 的同步锁时会造成同一个JVM 内的所有线程只能互斥地进入临界区段。

synchronized 使用了Java 中的内置锁,相关原理在下一章再详细介绍,这里先看下一个同步的应用,生产者——消费者问题。

生产者——消费者问题

生产者——消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Boundeded-Buffer Problem),是一个多线程同步问题的经典案例。

生产者——消费者问题描述了两类访问共享缓冲区的线程在实际运行时会发生的问题,生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。消费者线程的主要功能是从缓冲区提取(或消耗)数据。

生产者-消费者问题的关键是:

解决该问题的方案被称为“生产者——消费者”模式。

先看一个线程不安全版本的生产者——消费者问题的实现版本。

首先定义数据缓冲区类:

public class NotSafeDataBuffer<T> {

    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    // 保存数量
    private AtomicInteger amount = new AtomicInteger(0);

    // 向数据区增加一个元素
    public void add(T element) throws Exception {
        if(amount.get() > MAX_AMOUNT) {
            System.out.println("队列已经满了!");
            return;
        }
        dataList.add(element);
        System.out.println(element + "");
        amount.incrementAndGet();

        // 如果数据不一致,就抛出异常
        if(amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" +dataList.size());
        }
    }

    public T fetch() throws Exception {
        if(amount.get() <= 0) {
            System.out.println("队列已经空了!");
            return null;
        }

        T element = dataList.remove(0);
        System.out.println(element + "");
        amount.decrementAndGet();
        // 如果数据不一致,就抛出异常
        if(amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}

然后定义生产者类。

public class Producer implements Runnable {

    // 生产的时间间隔
    public static final int PRODUCE_GAP = 200;
    // 总次数
    static final AtomicInteger TURN = new AtomicInteger(0);
    // 生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);
    // 生产者名称
    String name = null;
    // 生产者动作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Producer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
    }

    @Override
    public void run() {
        while(true) {
            try {
                // 执行生产动作
                Object out = action.call();
                // 输出生产结果
                if(out != null) {
                    System.out.println("第"  + TURN.get() + "轮生产: " + out);
                }

                try {
                    Thread.sleep(gap);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                TURN.incrementAndGet();
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}

接下来定义消费者类。

public class Consumer implements Runnable{
    // 消费的时间间隔
    public static final int PRODUCE_GAP = 100;
    // 总次数
    static final AtomicInteger TURN = new AtomicInteger(0);
    // 消费者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);
    // 消费者名称
    String name = null;
    // 消费者动作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Consumer(Callable action, int gap) {
        this.action = action;
        this.gap = gap;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
    }

    @Override
    public void run() {
        while(true) {
            TURN.incrementAndGet();

            try {
                // 执行消费动作
                Object out = action.call();
                if(out != null) {
                    System.out.println("第"  + TURN.get() + "轮生产: " + out);
                }

                try {
                    Thread.sleep(gap);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }

}

最后将几个类关联起来,模拟生产和消费的动作。

public class NotSafePetStore {

    private static NotSafeDataBuffer<String> notSafeDataBuffer = new NotSafeDataBuffer<>();

    // 生产者执行的动作
    static Callable<String> produceAction = () -> {
        // 随机生成一个字符串作为商品
        String goods = getRandomString(6);
        // 将商品加上共享数据区
        try {
            notSafeDataBuffer.add(goods);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return goods;
    };

    static Callable<String> consumerAction = () -> {
        // 获取商品
        String goods = null;
        try {
            goods = notSafeDataBuffer.fetch();
        } catch(Exception e) {
            e.printStackTrace();
        }
        return goods;
    };

    public static void main(String[] args) throws InterruptedException {
        System.setErr(System.out);
        // 同时并发执行的线程数
        final int THREAD_TOTAL = 20;
        // 线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
        for(int i = 0; i < 5; i++) {
            // 生产者 500 ms 生产一个商品
            threadPool.submit(new Producer(produceAction, 500));
            // 消费者 1500 ms 消费一个商品
            threadPool.submit(new Consumer(consumerAction, 1500));
        }
    }

    public static String getRandomString(int length) {
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();

        for(int i=0;i<length;i++){
            int number=random.nextInt(62);
            sb.append(str.charAt(number));
        }
        return sb.toString();
    }
}

运行程序,可以看到明显的报错。

producer-consumer.png

换成安全版本,其实很简单,把数据缓冲区的取出和加入方法加上 synchronized 关键字即可。

public class NotSafeDataBuffer<T> {

    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    // 保存数量
    private AtomicInteger amount = new AtomicInteger(0);

    // 向数据区增加一个元素
    public synchronized void add(T element) throws Exception {
        if(amount.get() > MAX_AMOUNT) {
            System.out.println("队列已经满了!");
            return;
        }
        dataList.add(element);
        System.out.println(element + "");
        amount.incrementAndGet();

        // 如果数据不一致,就抛出异常
        if(amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" +dataList.size());
        }
    }

    public synchronized T fetch() throws Exception {
        if(amount.get() <= 0) {
            System.out.println("队列已经空了!");
            return null;
        }

        T element = dataList.remove(0);
        System.out.println(element + "");
        amount.decrementAndGet();
        // 如果数据不一致,就抛出异常
        if(amount.get() != dataList.size()) {
            throw new Exception(amount + "!=" + dataList.size());
        }
        return element;
    }
}

再次运行程序,会发现报错消失了。

上一篇下一篇

猜你喜欢

热点阅读