多线程

2020-05-02  本文已影响0人  relax_小罗罗

Java 程序天生就是多线程的

一个 Java 程序从 main()方法开始执行,然后按照既定的代码逻辑执行,看 似没有其他线程参与,但实际上 Java 程序天生就是多线程程序,因为执行 main() 方法的是一个名称为 main 的线程。
[6] Monitor Ctrl-Break //监控 Ctrl-Break 中断信号的
[5] Attach Listener //内存 dump,线程 dump,类信息统计,获取系统属性等
[4] Signal Dispatcher // 分发处理发送给 JVM 信号的线程
[3] Finalizer // 调用对象 finalize 方法的线程
[2] Reference Handler//清除 Reference 的线程
[1] main //main 线程,用户程序入口

一个线程是进程中的执行流
一个进程可以同时包括多个线程

实现线程的两种方式

Thread(主要是继承extends)

thread 对象需要一个任务来执行,任务是指线程在启动时执行的工作,该工作的功能代码被卸载run()方法中
run方法必须使用一下语法格式

public class ThreadDemo extends Thread {
    int count =10;
    @Override
    public void run() {
     while (true){
         System.out.println("-----------"+count);
         if(--count==0){
             return;
         }
     }
    }

    public static void main(String[] args) {
        ThreadDemo threadDemo=new ThreadDemo();
        threadDemo.start();
        System.out.println("--------主方法结束");
    }
}

线程必须用start()方法启动

Runnable(主要是实现 implements)

实质上Thread类实现了Runnable 接口,其中的run方法正式对Runnable接口中的run()方法的具体实现;
构造方法:
public Thread(Runnable target);
public Thread(Runnable target,String threadName);

public class RunnableDemo implements Runnable {
    int count =10;
    @Override
    public void run() {
        while (true){
            System.out.println("-----------"+count);
            if(--count==0){
                return;
            }
        }
    }

    public static void main(String[] args) {
        RunnableDemo threadDemo=new RunnableDemo();
        new Thread(threadDemo).start();
        System.out.println("--------主方法结束");
    }
}

线程的生命周期

image.png

就绪状态:
sleep();
wait();
等待输入/输出完成

可称为运行状态方法:
notify();
notifyAll();
interrupt();
线程休眠结束;
输入输出结束;


image.png

线程的休眠:

  @Override
    public void run() {
     while (true){
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println("-----------"+count);
         if(--count==0){
             return;
         }
     }
    }

线程的加入

public class JoinTest extends JFrame {
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    private Thread threadA; // 定义两个线程
    private Thread threadB;
    final JProgressBar progressBar = new JProgressBar(); // 定义两个进度条组件
    final JProgressBar progressBar2 = new JProgressBar();
    int count = 0;

    public static void main(String[] args) {
        init(new JoinTest(), 100, 100);
    }

    public JoinTest() {
        super();
        // 将进度条设置在窗体最北面
        getContentPane().add(progressBar, BorderLayout.NORTH);
        // 将进度条设置在窗体最南面
        getContentPane().add(progressBar2, BorderLayout.SOUTH);
        progressBar.setStringPainted(true); // 设置进度条显示数字字符
        progressBar2.setStringPainted(true);
        // 使用匿名内部类形式初始化Thread实例子
        threadA = new Thread(new Runnable() {
            int count = 0;

            public void run() { // 重写run()方法
                while (true) {
                    progressBar.setValue(++count); // 设置进度条的当前值
                    try {
                        Thread.sleep(100); // 使线程A休眠100毫秒
                        threadB.join(); // 使线程B调用join()方法
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        threadA.start(); // 启动线程A
        threadB = new Thread(new Runnable() {
            int count = 0;

            public void run() {
                while (true) {
                    progressBar2.setValue(++count); // 设置进度条的当前值
                    try {
                        Thread.sleep(100); // 使线程B休眠100毫秒
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (count == 100) // 当count变量增长为100时
                        break; // 跳出循环
                }
            }
        });
        threadB.start(); // 启动线程B
    }

    // 设置窗体各种属性方法
    public static void init(JFrame frame, int width, int height) {
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        frame.setSize(width, height);
        frame.setVisible(true);
    }
}

线程中断

JDK早已废除了stop方法,不建议使用stop()来停止一个线程的运行。现在提倡在run()方法中无限循环,然后使用一个布尔标记循环停止
通知线程要停止了(体现线程的协作)
interrupt();
isInterrupted();
Thread.interrupted(); 会将中断表示为改为false;

//不推荐
        private boolean isContinue=false;

    public void setContinue(boolean aContinue) {
        isContinue = aContinue;
    }

    @Override
    public void run() {
        while (!isContinue){
            try {

                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"-----------"+count);
            --count;
        }
}
 @Override
    public void run() {
        //推荐用法
      //  while (!isInterrupted()) {
        while (!Thread.interrupted()) {
            try {

                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "-----------" + count);
            --count;
        }


    }

    public static void main(String[] args) throws InterruptedException {
        ThreadDemo threadDemo = new ThreadDemo();
        threadDemo.setName("线程A");
        threadDemo.start();
        Thread.sleep(4000);
        threadDemo.interrupt();

        System.out.println("--------主方法结束");
    }

如果线程中使用了sleep() 和wait() 方法,可以使用interrupt()方法退出循环但是会抛出InterruptedException异常,只要捕获异常,并处理中断业务即可。

public class InterruptedSwing extends JFrame {

    private static final long serialVersionUID = 1L;
    Thread thread;

    public static void main(String[] args) {
        init(new InterruptedSwing(), 100, 100);
    }

    public InterruptedSwing() {
        super();
        final JProgressBar progressBar = new JProgressBar(); // 创建进度条
        // 将进度条放置在窗体合适位置
        getContentPane().add(progressBar, BorderLayout.NORTH);
        progressBar.setStringPainted(true); // 设置进度条上显示数字
        thread = new Thread(new Runnable() {
            int count = 0;

            public void run() {
                while (true) {
                    progressBar.setValue(++count); // 设置进度条的当前值
                    try {
                        Thread.sleep(1000); // 使线程休眠1000豪秒
                        // 捕捉InterruptedException异常
                    } catch (InterruptedException e) {
                        System.out.println("当前线程序被中断");
                        break;
                    }
                }
            }
        });
        thread.start(); // 启动线程
        thread.interrupt(); // 中断线程
    }

    public static void init(JFrame frame, int width, int height) {
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        frame.setSize(width, height);
        frame.setVisible(true);
    }
}

线程优先级

public class PriorityTest extends JFrame {
    private static final long serialVersionUID = 1L;
    private Thread threadA;
    private Thread threadB;
    private Thread threadC;
    private Thread threadD;

    public PriorityTest() {
        getContentPane().setLayout(new GridLayout(4, 1));
        // 分别实例化4个线程
        final JProgressBar progressBar = new JProgressBar();
        final JProgressBar progressBar2 = new JProgressBar();
        final JProgressBar progressBar3 = new JProgressBar();
        final JProgressBar progressBar4 = new JProgressBar();
        getContentPane().add(progressBar);
        getContentPane().add(progressBar2);
        getContentPane().add(progressBar3);
        getContentPane().add(progressBar4);
        progressBar.setStringPainted(true);
        progressBar2.setStringPainted(true);
        progressBar3.setStringPainted(true);
        progressBar4.setStringPainted(true);
        threadA = new Thread(new MyThread(progressBar));
        threadB = new Thread(new MyThread(progressBar2));
        threadC = new Thread(new MyThread(progressBar3));
        threadD = new Thread(new MyThread(progressBar4));
        //优先级不在1-10之内 会出现ILLegalArgumentException异常
        setPriority("threadA", 5, threadA);
        setPriority("threadB", 5, threadB);
        setPriority("threadC", 4, threadC);
        setPriority("threadD", 3, threadD);
    }

    // 定义设置线程的名称、优先级的方法
    public static void setPriority(String threadName, int priority,
                                   Thread t) {
        t.setPriority(priority); // 设置线程的优先级
        t.setName(threadName); // 设置线程的名称
        t.start(); // 启动线程
    }

    public static void main(String[] args) {
        init(new PriorityTest(), 100, 100);
    }

    public static void init(JFrame frame, int width, int height) {
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        frame.setSize(width, height);
        frame.setVisible(true);
    }

    private final class MyThread implements Runnable { // 定义一个实现Runnable接口的类
        private final JProgressBar bar;
        int count = 0;

        private MyThread(JProgressBar bar) {
            this.bar = bar;
        }

        public void run() { // 重写run()方法
            while (true) {
                bar.setValue(count += 10); // 设置滚动条的值每次自增10
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("当前线程序被中断");
                }
            }
        }
    }

}

线程安全

两个线程同时存取单一对象的数据;
类锁:多个对象之间互不干扰,锁的其实是class对象 ;

   private static synchronized void synClass(){
      
         try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
        System.out.println("---------");
    }

方法锁

int num = 10; // 设置当前总票数

    public void run() {
        while (true) {
            if (num > 0) {
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"---" + num--);
            }
        }
    }

    public static void main(String[] args) {
        ThreadSafeTest t = new ThreadSafeTest(); // 实例化类对象
        Thread tA = new Thread(t,"A"); // 以该类对象分别实例化4个线程
        Thread tB = new Thread(t,"B");
        Thread tC = new Thread(t,"C");
        Thread tD = new Thread(t,"D");
        tA.start(); // 分别启动线程
        tB.start();
        tC.start();
        tD.start();
    }

JAVA提供了synchronized关键字来防止资源冲突

 public void run() {
        while (true) {
            synchronized ("") {
                if (num > 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "---" + num--);
                }
            }
        }
    }

synchronized(object){
}
Object 为任意一个对象,每个对象都存在一个标志位,并具有两个值,0 和1 ,一个线程运行到同步块时首先检查该对象的标志位,如果为0状态,表明此同步块中存在其他线程在运行,这时该线程处于就绪状态,直到处于同步块中的线程执行完同步块中代码时 状态改为1,线程才能执行同步块代码
同步方法就是将synchronized 方法上面;

 public synchronized void  doinit(){
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

volatile关键字,最轻量的同步机制

只适合一写多读.

守护线程的使用

 private static class UseThread extends Thread{
        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    System.out.println(Thread.currentThread().getName()
                            + " I am extends Thread.");
                }
                System.out.println(Thread.currentThread().getName()
                        + " interrupt flag is " + isInterrupted());
            } finally {
                //守护线程中finally不一定起作用
                System.out.println(" .............finally");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        UseThread useThread = new UseThread();
        useThread.setDaemon(true);
        useThread.start();
        Thread.sleep(5);
       // useThread.interrupt();
    }

ThreadLocal

每个线程都有自己的副本,保证隔离;
引发的内存泄漏分析
强引用 Object o=new Object
软引用
弱引用 weakReference 只要发生GC 就会被回收
虚引用

public class UseThreadLocal {
    private static ThreadLocal<Integer> intLocal
            = new ThreadLocal<Integer>(){
        @Override
        protected Integer initialValue() {
            System.out.println("this initialvalue is running......");
            return 1;
        }
    };

    private static ThreadLocal<String> stringThreadLocal =new ThreadLocal<String>(){
        @Override
        protected String initialValue() {
            System.out.println("this initialvalue is running......");
            return " threadName  is :"+Thread.currentThread().getName() ;
        }
    };

    /**
     * 运行3个线程
     */
    public void StartThreadArray(){
        Thread[] runs = new Thread[3];
        for(int i=0;i<runs.length;i++){
            runs[i]=new Thread(new TestThread(i));
        }
        for(int i=0;i<runs.length;i++){
            runs[i].start();
        }
    }

    /**
     *类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
     */
    public static class TestThread implements Runnable{
        int id;
        public TestThread(int id){
            this.id = id;
        }
        public void run() {
            System.out.println(Thread.currentThread().getName()+":start");
            Integer s = intLocal.get();
            s = s+id;
            intLocal.set(s);
            System.out.println(Thread.currentThread().getName()
                    +":"+ intLocal.get());
            System.out.println(stringThreadLocal.get());
            //当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度
            intLocal.remove();
        }
    }

    public static void main(String[] args){
        UseThreadLocal test = new UseThreadLocal();
        test.StartThreadArray();
    }
}

ThreadLocal的线程不安全

public class ThreadLocalUnsafe implements Runnable{
    //对象中同一个对象引用
    public static Number number = new Number(0);
    
    //去掉静态即可
  //  public Number number = new Number(0);

    public void run() {
        //每个线程计数加一
        number.setNum(number.getNum()+1);
        //将其存储到ThreadLocal中
        value.set(number);
        SleepTools.ms(2);
        //输出num值
        System.out.println(Thread.currentThread().getName()+"="+value.get().getNum());
    }

    public static ThreadLocal<Number> value = new ThreadLocal<Number>() {
    };

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new ThreadLocalUnsafe()).start();
        }
    }

    private static class Number {
        public Number(int num) {
            this.num = num;
        }

        private int num;

        public int getNum() {
            return num;
        }

        public void setNum(int num) {
            this.num = num;
        }

        @Override
        public String toString() {
            return "Number [num=" + num + "]";
        }
    }

}

线程开发工具类

fork/join

算法中有 快速排序,归并排序,外部排序用到fork/join 概念


image.png

RecursiveTask接受返回值;
RecursiveAction不接受返回值;

CountDownLatch

控制器:控制main线程将会等待所有Woker结束后才能继续执行


image.png
public class UseCountDownLatch {
    
    static CountDownLatch latch = new CountDownLatch(6);

    /*初始化线程*/
    private static class InitThread implements Runnable{

        public void run() {
            System.out.println("Thread_"+Thread.currentThread().getId()
                    +" ready init work......");
            latch.countDown();
            for(int i =0;i<2;i++) {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ........continue do its work");
            }
        }
    }

    /*业务线程等待latch的计数器为0完成*/
    private static class BusiThread implements Runnable{

        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(int i =0;i<3;i++) {
                System.out.println("BusiThread_"+Thread.currentThread().getId()
                        +" do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 2nd......");
                latch.countDown();
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();
        System.out.println("Main do ites work........");
    }
}

CyclicBarrier

到达一个节点,然后在一起执行
CountDownLatch 总数可以和线程数不一样
CyclicBarrier 必须等于线程数


image.png
public class UseCyclicBarrier {

    private static CyclicBarrier barrier
            = new CyclicBarrier(4,new CollectThread());

    //存放子线程工作结果的容器
    private static ConcurrentHashMap<String,Long> resultMap
            = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for(int i=0;i<4;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }

    /*汇总的任务*/
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
                result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result);
            System.out.println("do other business........");
        }
    }

    /*相互等待的子线程*/
    private static class SubThread implements Runnable{

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId()+"",id);
            try {
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" ....do something ");
                    //汇总1次
                barrier.await();
                Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ....do its business ");
                //汇总2次
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

Semaphore

线程流控
useful.release(); 可以凭空new出来一个连接 放进池子
所以用两个 来控制


image.png
public class DBPoolSemaphore {
    
    private final static int POOL_SIZE = 10;
    //两个指示器,分别表示池子还有可用连接和已用连接
    private final Semaphore useful,useless;
    //存放数据库连接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();
    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }
    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }
    
    /*归还连接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if(connection!=null) {
            System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
                    +"可用连接数:"+useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }
    
    /*从池子拿连接*/
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
    
}

Callable、Future和FutureTask

image.png

通过线程拿到返回结果

public class UseFuture {
    
    
    /*实现Callable接口,允许有返回值*/
    private static class UseCallable implements Callable<Integer>{
        private int sum;
        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子线程开始计算!");  
//          Thread.sleep(1000);
            for(int i=0 ;i<5000;i++){
                if(Thread.currentThread().isInterrupted()) {
                    System.out.println("Callable子线程计算任务中断!");
                    return null;
                }
                sum=sum+i;
                System.out.println("sum="+sum);
            }  
            System.out.println("Callable子线程计算结束!结果为: "+sum);  
            return sum; 
        }
    }
    
    public static void main(String[] args) 
            throws InterruptedException, ExecutionException {

        UseCallable useCallable = new UseCallable();
        //包装
        FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
        Random r = new Random();
        new Thread(futureTask).start();

        Thread.sleep(1);
        if(r.nextInt(100)>50){
            System.out.println("Get UseCallable result = "+futureTask.get());
        }else{
            System.out.println("Cancel................. ");
            futureTask.cancel(true);
        }

    }

}

乐观锁 ---- 成不成功无所谓,先执行;
悲观锁 ---- 先占有 在执行, 经常造成死锁;

原子操作CAS--无锁化编程

CAS(Compare And Swap)
类似事务,要么 全部完成,要么不成功;
一般处理器 提供 CAS指令()
CAS的原理
利用了现代处理器都支持的CAS的指令,
循环这个指令,直到成功为止
CAS的问题
ABA问题: 拿到 A1 去电脑内存中值为A3
开销问题: 循环的开销
只能保证一个共享变量的原子操作


image.png

Jdk中相关原子操作类的使用
更新基本类型类:AtomicBoolean,AtomicInteger,AtomicLong
更新数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference
原子更新字段类: AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater

上一篇下一篇

猜你喜欢

热点阅读