程序员

多线程实现与同步工具包详解

2019-04-19  本文已影响0人  帅大叔的简书

多线程学习

概念

简述如下:

并发:指一个CPU可以异步的处理多个进程
并行:则是一个CPU同时处理多个进程
进程:程序运行的执行过程,是一个程序的实例。每个进程都有自己的虚拟地址空间和控制线程
线程:是进程的一个执行单元,是操作系统调度器(Schduler)分配处理器时间的基础单元。

一句话总结:

线程是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来处理。

Java代码实现方式

完整代码如下:

import java.util.concurrent.*;

public class Demo1 {

    /**
     * 第一种实现方式:
     * 继承Thread类创建线程
     */
    public class MyThread extends Thread{
        private int ticket = 10;
        @Override
        public void run() {
            for (int i=0;i<=10;i++){
                if(ticket>0){
                    System.out.println(Thread.currentThread().getName()+"工号 还有 "+ticket--+" 张传单需要派发");
                }else{
                    System.out.println(Thread.currentThread().getName()+"工号 的传单全部派发完了,可以下班了");
                }
            }
        }
    }

    /**
     * 第二种实现方式:
     * 实现Runnable接口创建线程
     */
    public class MyRunable implements Runnable{
        private int ticket = 10;
        @Override
        public void run() {
            for (int i=0;i<10;i++){
                //加锁互斥
                synchronized (this){
                    if(ticket>0){
                        System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
                    }else {
                        System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
                        break;
                    }
                }
            }
        }
    }

    /**
     * 第三种实现方式:
     * 实现Callable接口通过FutureTask包装器来创建Thread线程
     */
    public class MyCallRunable implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            int sum=0;
            for (int i = 0; i <= 100; i++) {
                sum += i;
            }
            return sum;
        }
    }


    /**
     * 第四种方式:
     * 线程池的方式,使用ExecutorService、Callable、Future实现有返回结果的线程
     */
    public class OddCallRunable implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            int sum=0;
            for (int i = 0; i <= 100; i++) {
                if(i%2 == 0)
                    sum += i;
            }
            return sum;
        }
    }
    public class EvenCallRunable implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            int sum=0;
            for (int i = 0; i <= 100; i++) {
                if(i%2 == 1)
                    sum += i;
            }
            return sum;
        }
    }


    /**
     * 总结:
     * 方式一、继承Thread类,因为Java 是单继承,继承Thread的类不能再继承其他类,并且继承Thread 其本身就是执行线程,每个子线程之间相互独立,资源不共享
     * 方式二、实现Runnable接口,资源共享,多个线程可以对共享数据进行操作
     * 方式三、实现Callable接口通过FutureTask包装器来创建Thread线程,可以接收一个返回值
     * 方式四、使用ExecutorService、Callable、Future实现有返回结果的线程
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("第一种方式:");
        MyThread thread1 = new Demo1().new MyThread();
        MyThread thread2 = new Demo1().new MyThread();
        MyThread thread3 = new Demo1().new MyThread();
        thread1.start();
        thread2.start();
        thread3.start();
        Thread.sleep(1000);
        System.out.println();
        System.out.println("第二种方式,");
        MyRunable myRunable = new Demo1().new MyRunable();
        Thread threadTwo1 = new Thread(myRunable);
        Thread threadTwo2 = new Thread(myRunable);
        Thread threadTwo3 = new Thread(myRunable);
        threadTwo1.start();
        threadTwo2.start();
        threadTwo3.start();

        Thread.sleep(1000);
        System.out.println();
        System.out.println("第三种方式:");
        MyCallRunable myCallRunable = new Demo1().new MyCallRunable();
        FutureTask<Integer> futureTask = new FutureTask<>(myCallRunable);
        Thread threadCall = new Thread(futureTask);
        threadCall.start();
        Integer integer = futureTask.get();
        System.out.println("result sum="+integer);



        Thread.sleep(1000);
        System.out.println();
        System.out.println("第四种方式:");
        // 创建一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 创建多个有返回值的任务
        OddCallRunable oddCallRunable = new Demo1().new OddCallRunable();
        EvenCallRunable evenCallRunable = new Demo1().new EvenCallRunable();
        Future<Integer> submit1 = pool.submit(oddCallRunable);
        Future<Integer> submit2 = pool.submit(evenCallRunable);

        // 关闭线程池
        pool.shutdown();
        System.out.println("1-100 偶数和:"+submit1.get());
        System.out.println("1-100 奇数和:"+submit2.get());
        System.out.println("1-100 总和:"+(submit1.get()+submit2.get()));

    }
}

线程的类型

线程的常用方法

synchronized关键字

由于每个线程执行的过程是不可控的,所以很可能导致最终的结果与实际上的愿望相违背或者直接导致程序出错。
比如,当多个线程同时访问临界资源(一个对象,对象中的属性,一个文件,一个数据库等)时,就可能会产生线程安全问题。

临界资源就是共享资源。比如共享变量等

怎么解决线程安全?专业术语叫序列化访问临界资源

线程安全解决方案

synchronized 就是来解决线程安全的方案之一,互斥锁:顾名思义,能到达到互斥访问目的的锁,即在同一时刻,只能有一个线程访问临界资源,也称作同步互斥访问
如果对临界资源加上互斥锁,当一个线程在访问该临界资源时,其他线程便只能等待。
使用synchronized关键字来标记一个方法或者代码块,当某个线程调用该对象的synchronized方法或者访问synchronized代码块时,这个线程便获得了该对象的锁,其他线程暂时无法访问这个方法,只有等待这个方法执行完毕或者代码块执行完毕,这个线程才会释放该对象的锁,其他线程才能执行这个方法或者代码块。
就比如上述代码实现的方式二,片段代码如下,

public void run() {
    for (int i=0;i<10;i++){
        //加锁互斥
        synchronized (this){
            if(ticket>0){
                System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
            }else {
                System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
                break;
            }
        }
    }
}

synchronized (this) 中的this 就是一把锁,以当前对象作为锁,多个窗口售票,但却保证了卖票的数量不会出错。
上述的例子是修饰一段代码块,同样的也是可以修饰方法,看情况选择,修饰代码块比较灵活,比如一个方法里面可能你只有一小部分需要同步的话,就没必要放在方法上

synchronized的作用域

Lock

从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问,那就是Lock。
可以说Lock就是synchronized的增强版,区别:

public class Demo2 {
    public static void main(String[] args) {
        MyRunable myRunable = new MyRunable();
        Thread thread1 = new Thread(myRunable);
        Thread thread2 = new Thread(myRunable);
        Thread thread3 = new Thread(myRunable);

        thread1.start();
        thread2.start();
        thread3.start();
    }
}
class MyRunable implements Runnable{
    private static int ticket = 10;
    private static Lock lock = new ReentrantLock();
    @Override
    public void run() {
        while (true){
            //加锁互斥
            try {
                lock.lock();
                if(ticket>0){
                    System.out.println(Thread.currentThread().getName()+"窗口 卖出倒数第"+ticket--+"张回家车票");
                }else {
                    System.out.println(Thread.currentThread().getName()+"窗口 车票已售罄");
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }
    }
}

wait()、notify/notifyAll()

这三个方法是用于线程间通信的基础方法,但实际上,它们不是Thread类中的方法,而是Object类中的本地方法
当线程执行wait()方法时候,会释放当前的锁,然后让出CPU,进入等待状态。
只有当 notify/notifyAll() 被执行时候,才会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。
notify方法只唤醒一个等待(对象的)线程并使该线程开始执行。所以如果有多个线程等待一个对象,这个方法只会唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。notifyAll 会唤醒所有等待(对象的)线程,尽管哪一个线程将会第一个处理取决于操作系统的实现
需求:交替打印0-100 的奇偶数

// 打印的数据体
public class Number {
    int i=1;
    volatile boolean isOdd=false;

    public int getI() {
        return i;
    }

    public void setI(int i) {
        this.i = i;
    }

    public boolean isOdd() {
        return isOdd;
    }

    public void setOdd(boolean odd) {
        isOdd = odd;
    }
}

//奇数线程
public class EvenRunable implements Runnable{
    Number number;
    int maxNumber;
    public EvenRunable(Number number, int maxNumber) {
        this.number=number;
        this.maxNumber=maxNumber;
    }

    @Override
    public void run() {
        while (number.getI() <= maxNumber) {
            synchronized (number) {
                if (!number.isOdd) {
                    System.out.println("奇数i=" + number.getI());
                    number.setI(number.getI()+1);
                    number.setOdd(true);
                    //唤醒其他线程
                    number.notify();
                } else {
                    try {
                        //释放锁,阻塞
                        number.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

// 偶数线程
public class OddRunable implements Runnable {
    Number number;
    int maxNumber;

    public OddRunable(Number number, int maxNumber) {
        this.number = number;
        this.maxNumber = maxNumber;
    }

    @Override
    public void run() {
        while (number.getI() <= maxNumber) {
            synchronized (number) {
                if (number.isOdd) {
                    System.out.println("偶数i=" + number.getI());
                    number.setI(number.getI()+1);
                    number.setOdd(false);
                    number.notify();
                } else {
                    try {
                        number.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

// 运行
public class Main {
    public static void main(String[] args) {
        Number number = new Number();
        int maxNumber=100;
        EvenRunable evenRunable = new EvenRunable(number,maxNumber);
        OddRunable oddRunable = new OddRunable(number,maxNumber);
        Thread evenThread = new Thread(evenRunable);
        Thread oddThread = new Thread(oddRunable);
        oddThread.start();
        evenThread.start();
    }
}

//输出
奇数i=1
偶数i=2
奇数i=3
.....
偶数i=98
奇数i=99
偶数i=100

JUC(java.util.concurrent)包同步工具

AtomicInteger

AtomicInteger原子类型,线程安全,如何保证线程安全,它的部分源码如下:

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

private volatile int value;

我们看到value使用了volatile修饰符,volatile相当于synchronized的弱实现,也就是说volatile实现了类似synchronized的语义,却又没有锁机制。
它确保对volatile字段的更新以可预见的方式告知其他的线程。

例子:多线程顺序打印abc
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo5 implements Runnable{
    private static AtomicInteger currentCount = new AtomicInteger(0);

    private static final Integer MAX_COUNT = 30;

    private static String [] chars = {"a", "b", "c"};

    private String name;

    public Demo5(String name) {
        this.name =  name;
    }
    @Override
    public void run() {
        while(currentCount.get()<MAX_COUNT){
            if(this.name.equals(chars[currentCount.get()%3])){
                printAndPlusOne(this.name + "\t" + currentCount);
            }
        }
    }

    public void printAndPlusOne(String content){
        System.out.println(content);
        currentCount.getAndIncrement();
    }
    
    public static void main(String [] args){
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Demo5("a"));
        executorService.submit(new Demo5("b"));
        executorService.submit(new Demo5("c"));
        executorService.shutdown();
    }
}

CyclicBarrier

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
回环的意思是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

例子:多线程顺序打印abc
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo6 implements Runnable{

    //其参数表示屏障拦截的线程数量
    private static int threadNum = 3;
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum);
    private static Integer currentCount = 0;

    private static final Integer MAX_COUNT = 30;

    private static String [] chars = {"a", "b", "c"};

    private String name;

    public Demo6(String name) {
        this.name =  name;
    }

    @Override
    public void run() {
        while(currentCount<MAX_COUNT){
            while(this.name.equals(chars[currentCount%3]))
                printAndPlusOne(this.name + "\t" + currentCount);
            try {
                //使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。等threadNum 个全部await之后再全部同时执行
                cyclicBarrier.await();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public void printAndPlusOne(String name){
        System.out.println(name);
        currentCount ++;
    }

    public static void main(String [] args){
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Demo6("a"));
        executorService.submit(new Demo6("b"));
        executorService.submit(new Demo6("c"));
        executorService.shutdown();


    }
}

参考自:https://blog.csdn.net/u013968384/article/details/82584944

CountDownLatch

倒计时计数器,countDown() 方法调用递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
调用await()方法的线程会被挂起,它会等待直到计数值为0才继续执行

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo8 {

    public static void main(String [] args) throws InterruptedException {
        //赛道个数,一人一道
        int count = 6;
        //指挥官
        final CountDownLatch masterCountDownLatch = new CountDownLatch(1);
        //闭锁,可实现计数器递减
        final CountDownLatch countDownLatch = new CountDownLatch(count);
        AtomicInteger atomicInteger = new AtomicInteger(1);
        System.out.println("校园400米赛跑,即将开始");
        Thread.sleep(1*1000);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < count ; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+"准备好了");
                    //比赛选择准备
                    masterCountDownLatch.await();
                    System.out.println(Thread.currentThread().getName()+"拼命奔跑中");
                    Thread.sleep((long)(Math.random()*10000));
                    if(atomicInteger.decrementAndGet() >= 0){
                        System.out.println("冠军诞生了 "+Thread.currentThread().getName()+"首先到达终点");
                    }else{
                        System.out.println("其次 "+Thread.currentThread().getName()+"到达终点");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //闭锁减一
                countDownLatch.countDown();
            });
        }
        Thread.sleep(5*1000);
        System.out.println("预备,开始比赛");
        // 准备结束,开始比赛,处于等待的线程继续执行任务
        masterCountDownLatch.countDown();

        countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
        System.out.println("比赛结束");
        executorService.shutdown();
    }
}

控制台打印:

校园400米赛跑,即将开始
pool-1-thread-1准备好了
pool-1-thread-2准备好了
pool-1-thread-3准备好了
pool-1-thread-4准备好了
pool-1-thread-5准备好了
pool-1-thread-6准备好了
预备,开始比赛
pool-1-thread-1拼命奔跑中
pool-1-thread-2拼命奔跑中
pool-1-thread-6拼命奔跑中
pool-1-thread-5拼命奔跑中
pool-1-thread-3拼命奔跑中
pool-1-thread-4拼命奔跑中
冠军诞生了 pool-1-thread-4首先到达终点
其次 pool-1-thread-6到达终点
其次 pool-1-thread-1到达终点
其次 pool-1-thread-2到达终点
其次 pool-1-thread-5到达终点
其次 pool-1-thread-3到达终点
比赛结束

Semaphore

信号量,是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
构造器参数 permits(许可数),定义资源可以并发访问的最大个数。
常用方法:

并发访问如下demo:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class Demo7{

    public static void main(String [] args) throws InterruptedException {
        int clientTotal = 12;
        // 同时并发执行的线程数,这里
        int threadTotal = 5;
        int count = 0;
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量,此处用于控制并发的线程数
        final Semaphore semaphore = new Semaphore(threadTotal);
        //闭锁,可实现计数器递减
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //执行此方法用于获取执行许可,当总计未释放的许可数不超过threadTotal时,
                    //允许通行,否则线程阻塞等待,直到获取到许可。
                    semaphore.acquire();
                    //任务
                    goToWC();
                    //释放许可
                    semaphore.release();

                } catch (Exception e) {
                    //log.error("exception", e);
                    e.printStackTrace();

                }
                //闭锁减一
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();//线程阻塞,直到闭锁值为0时,阻塞才释放,继续往下执行
        executorService.shutdown();
    }
    //上厕所
    public static void goToWC() throws InterruptedException {
        System.out.println(new SimpleDateFormat("HH:mm:ss    ").format(new Date())+Thread.currentThread().getName()+"-正在使用厕所 ");
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName()+"上完厕所了,回去上班");
    }
}

上面的场景是这样的,一个楼层有5个厕所。现在有12 个人要上厕所。只能排队
控制台打印输出,如下

14:12:28    pool-1-thread-5-正在使用厕所 
14:12:28    pool-1-thread-7-正在使用厕所 
14:12:28    pool-1-thread-3-正在使用厕所 
14:12:28    pool-1-thread-2-正在使用厕所 
14:12:28    pool-1-thread-1-正在使用厕所 
pool-1-thread-5上完厕所了,回去上班
pool-1-thread-1上完厕所了,回去上班
pool-1-thread-2上完厕所了,回去上班
pool-1-thread-3上完厕所了,回去上班
14:12:30    pool-1-thread-9-正在使用厕所 
14:12:30    pool-1-thread-6-正在使用厕所 
pool-1-thread-7上完厕所了,回去上班
14:12:30    pool-1-thread-10-正在使用厕所 
14:12:30    pool-1-thread-11-正在使用厕所 
14:12:30    pool-1-thread-4-正在使用厕所 
pool-1-thread-6上完厕所了,回去上班
pool-1-thread-9上完厕所了,回去上班
14:12:32    pool-1-thread-8-正在使用厕所 
14:12:32    pool-1-thread-12-正在使用厕所 
pool-1-thread-4上完厕所了,回去上班
pool-1-thread-10上完厕所了,回去上班
pool-1-thread-11上完厕所了,回去上班
pool-1-thread-8上完厕所了,回去上班
pool-1-thread-12上完厕所了,回去上班

通过前面打印的时间,可知只有threadTotal 个获得许可,其他只能等待获得许可的线程释放许可才能继续执行。
这几个demo算是对多线程有一个了解吧,多线程还有更难的,一起加油!!!

上一篇下一篇

猜你喜欢

热点阅读