Java多线程和Executors框架

2018-04-26  本文已影响131人  siriusing

Java多线程

正确的编程方法:首先使代码正确运行,然后再提高代码的速度。

常见创建线程的三种方式:


        //1. Thread方式
        MyThread thread=new MyThread();
        thread.start();
        //2. Runnable
        Runnable runnable=new Runnable() {
            @Override
            public void run() {

                for(int i=0;i<100;++i){
                    System.out.println(Thread.currentThread().getName()+Thread.currentThread().getId()+"---"+i);

                }

            }
        };
        new Thread(runnable).start();
        //3. Callable
        Callable<Integer> callable=new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {

                for(int i=0;i<100;++i){
                    System.out.println(Thread.currentThread().getName()+Thread.currentThread().getId()+"---"+i);
                }
                return 15;
            }
        };

        FutureTask<Integer> futureTask=new FutureTask(callable);
        new Thread(futureTask).start();
        try {
            Integer rt = futureTask.get();
            System.out.println(rt);

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

区别:

接口和类的区别:

线程安全

什么是线程安全?

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

----- Java 并发编程实战

这里的正确性包括:

如:long a ,a in [Long.MIN_VALUE,LONG.MAX_VALUE],
那么当a>Long.MAX_VALUE时,就不满足不变性条件
    如:两个属性定义满足:a+b=5;
    那么{a=2,b=3}就是有效状态,{a=3,b=3}就是无效状态

这里其实Java并发编程关注的就是共享对象的状态转换

线程安全类封装了必要的同步机制,因此客户端无须进一步采取同步措施

编写线程安全的代码,其核心在于要对状态操作进行管理,特别是对共享的可变的状态的访问。

封装线程安全类的一些手段:

//反例:
    private List<Integer> myList=new ArrayList<>();

    public List getList(){
        
        //对外发布对象,外部可以得到内部对象的引用,之后对这个list的操作不受本类控制,myList的所有权不可控
        return this.myList;
    }
//正例
    private List<Integer> myList=new ArrayList<>();
    
    public List getList(){

        //拷贝一份
        ArrayList<Integer> list = new ArrayList<>();
        for(Integer i:myList){

            list.add(new Integer(i));

        }
        //拒绝修改
        return Collections.unmodifiableList(myList);
    }
/**
 * 不变性条件: blood,lostBlood in [0,100]
 * 后验条件:blood+lostBlood=100
 */
public class Person {
    
    private int blood=100;
    private int lostBlood=0;

    public void injured(int k){
        //必须是同一把锁
        synchronized(this){
            blood-=k;
            lostBlood+=k;

        }
    }
}

Servlet

Servlet在非分布式环境下默认是单例的,可以配置为多实例。所以要注意它的线程安全问题。

配置:

 <load-on-startup>1</load-on-startup>

Executors 框架

实现一个支持并发的小型服务器:

public class TaskServer {
    private static final int NTHREADS=100;
    private static final int PORT=80;

    //固定大小的线程池
    private static final Executor exec= Executors.newFixedThreadPool(NTHREADS);


    public static void main(String[] args) throws IOException {

        ServerSocket serverSocket = new ServerSocket(PORT);
        
        while(true){

            //阻塞
            final Socket s=serverSocket.accept();

            Runnable task=new Runnable() {
                @Override
                public void run() {
                    handleRequest(s);
                }
            };

            exec.execute(task);
        }
    }
    private static void handleRequest(Socket s) {
        //do something
    }
}
       
    Callable task= new Callable() {
            
    @Override
    public Object call() throws Exception {
        
        //do something and return

        int rt=1;
        return rt;
        }
    };
    
    //提交上去的任务得到一个future,也即一次性任务
    Future future = exec.submit(task);


    //阻塞方法,只有任务执行完毕才会返回值,且一次完成,之后返回都是一致的
    int rt=funture.get();

ComCompletionService包装Executor对象,并且内置一个已完成队列。还有调度逻辑。

会将已完成的任务放到BlockQueue中,要用的时候直接take


image.png
package cc.siriuscloud;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

public class Renderer {
    private static final int NTHREADS=100;

    private final ExecutorService executor;


    public Renderer() {
        this.executor = Executors.newFixedThreadPool(NTHREADS);
    }


    @Test
    public void renderPage() {

        List<String> infos = getList();

        //包装成completionService
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);

        for (final String item : infos) {
            //提交任务
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {

                    int rt = Integer.parseInt(item);

                    synchronized (this){

                        //模仿耗时操作
                        Random random = new Random();
                        wait(Math.abs(random.nextInt(1000)));
                    }
                    return rt;
                }
            });

        }


        // 执行完的任务结果放在已完成任务队列中,
        try {

            for (int t = 0, n = infos.size(); t < n; ++t) {
                //取出一个已完成的任务,没有任何结果时阻塞
                Future<Integer> f = completionService.take();
                Integer rt = f.get();
                System.out.println(" 结果是:rt=" + rt);

            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    private List<String> getList() {
        ArrayList<String> list = new ArrayList<>();
        list.add("1000");
        list.add("110");
        list.add("11");
        list.add("1");
        return list;

    }
}


参考:

ConcurrentHashMap学习

上一篇 下一篇

猜你喜欢

热点阅读