Java多线程和Executors框架
2018-04-26 本文已影响131人
siriusing
Java多线程
正确的编程方法:首先使代码正确运行,然后再提高代码的速度。
常见创建线程的三种方式:
- Thread
//1. Thread方式
MyThread thread=new MyThread();
thread.start();
- Runnable
//2. Runnable
Runnable runnable=new Runnable() {
@Override
public void run() {
for(int i=0;i<100;++i){
System.out.println(Thread.currentThread().getName()+Thread.currentThread().getId()+"---"+i);
}
}
};
new Thread(runnable).start();
- Callable
//3. Callable
Callable<Integer> callable=new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int i=0;i<100;++i){
System.out.println(Thread.currentThread().getName()+Thread.currentThread().getId()+"---"+i);
}
return 15;
}
};
FutureTask<Integer> futureTask=new FutureTask(callable);
new Thread(futureTask).start();
try {
Integer rt = futureTask.get();
System.out.println(rt);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
区别:
- 性质:
- Runnable和Callable是接口
- Thread 是类,Thread 实现了 Runnable
- 调用:
- Callable 调用call(), 有返回值
- Runnable 调用run(),Runnable 无返回值,
- 异常:
- call 可以抛出异常到拥有者线程,也就是异常可知
- Thread和Runnable的异常无法在其他线程catch
- Callable 可以得到一个返回一次性任务FutureTask对象,表示一次性的异步计算
接口和类的区别:
- 接口可以多implement,Java类只能单继承
- 接口的Class文件中superClass是空的,而类的superClass只要不是Object就一定非空
- 接口是不涉及实现的,所以对象的attributes中没有Code属性,而Class有
线程安全
什么是线程安全?
当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。
----- Java 并发编程实战
这里的正确性包括:
- 不变性条件
- 用于判断状态是有效的还是无效的,也就是状态的
状态空间
- 用于判断状态是有效的还是无效的,也就是状态的
如:long a ,a in [Long.MIN_VALUE,LONG.MAX_VALUE],
那么当a>Long.MAX_VALUE时,就不满足不变性条件
- 后验条件
- 判断状态迁移是否有效
如:两个属性定义满足:a+b=5;
那么{a=2,b=3}就是有效状态,{a=3,b=3}就是无效状态
这里其实Java并发编程关注的就是共享对象的状态转换
。
线程安全类封装了必要的同步机制,因此客户端无须进一步采取同步措施
编写线程安全的代码,其核心在于要对状态操作进行管理,特别是对
共享的
和可变的
状态的访问。
- 无状态的对象一定是线程安全的。
- 线程间共享对象,实际上是对象所有权的传递,所有权唯一
封装线程安全类的一些手段:
- 尽量使得属性是
final
的,因为无状态的对象一定是线程安全的。 - 尽量不对外发布状态,因为发布后所有权的转移是不可控的。
//反例:
private List<Integer> myList=new ArrayList<>();
public List getList(){
//对外发布对象,外部可以得到内部对象的引用,之后对这个list的操作不受本类控制,myList的所有权不可控
return this.myList;
}
//正例
private List<Integer> myList=new ArrayList<>();
public List getList(){
//拷贝一份
ArrayList<Integer> list = new ArrayList<>();
for(Integer i:myList){
list.add(new Integer(i));
}
//拒绝修改
return Collections.unmodifiableList(myList);
}
- 线程封闭。保证只有一个线程在访问当前变量。最具体的表现就是方法中不对外发布的局部变量
- 加锁。其实是为了保证对象所有权在一个时间点只有一个线程能得到。
- 独立的状态,单独加锁就好了
- 组合的状态,状态的转换必须满足后验条件,必须用同一把锁来锁住。
/**
* 不变性条件: blood,lostBlood in [0,100]
* 后验条件:blood+lostBlood=100
*/
public class Person {
private int blood=100;
private int lostBlood=0;
public void injured(int k){
//必须是同一把锁
synchronized(this){
blood-=k;
lostBlood+=k;
}
}
}
- 记得把类和其方法的线程安全性写入文档
Servlet
Servlet在非分布式环境下默认是单例的,可以配置为多实例。所以要注意它的线程安全问题。
-
创建的时机:
- 第一次访问Servlet的时候创建
- web服务器启动时创建
配置:
<load-on-startup>1</load-on-startup>
- 创建:tomcat等服务器
Executors 框架
- Executor接口
- ExecutorService 接口扩展Executor接口,增加了生命周期方法
实现一个支持并发的小型服务器:
public class TaskServer {
private static final int NTHREADS=100;
private static final int PORT=80;
//固定大小的线程池
private static final Executor exec= Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(PORT);
while(true){
//阻塞
final Socket s=serverSocket.accept();
Runnable task=new Runnable() {
@Override
public void run() {
handleRequest(s);
}
};
exec.execute(task);
}
}
private static void handleRequest(Socket s) {
//do something
}
}
- ExecutorService 和Callable:支持返回值和捕捉异常
Callable task= new Callable() {
@Override
public Object call() throws Exception {
//do something and return
int rt=1;
return rt;
}
};
//提交上去的任务得到一个future,也即一次性任务
Future future = exec.submit(task);
//阻塞方法,只有任务执行完毕才会返回值,且一次完成,之后返回都是一致的
int rt=funture.get();
- ComCompletionService
ComCompletionService包装Executor对象,并且内置一个已完成队列。还有调度逻辑。
会将已完成的任务放到BlockQueue中,要用的时候直接take
image.png
package cc.siriuscloud;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
public class Renderer {
private static final int NTHREADS=100;
private final ExecutorService executor;
public Renderer() {
this.executor = Executors.newFixedThreadPool(NTHREADS);
}
@Test
public void renderPage() {
List<String> infos = getList();
//包装成completionService
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
for (final String item : infos) {
//提交任务
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int rt = Integer.parseInt(item);
synchronized (this){
//模仿耗时操作
Random random = new Random();
wait(Math.abs(random.nextInt(1000)));
}
return rt;
}
});
}
// 执行完的任务结果放在已完成任务队列中,
try {
for (int t = 0, n = infos.size(); t < n; ++t) {
//取出一个已完成的任务,没有任何结果时阻塞
Future<Integer> f = completionService.take();
Integer rt = f.get();
System.out.println(" 结果是:rt=" + rt);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private List<String> getList() {
ArrayList<String> list = new ArrayList<>();
list.add("1000");
list.add("110");
list.add("11");
list.add("1");
return list;
}
}
参考: