多线程
Java 程序天生就是多线程的
一个 Java 程序从 main()方法开始执行,然后按照既定的代码逻辑执行,看 似没有其他线程参与,但实际上 Java 程序天生就是多线程程序,因为执行 main() 方法的是一个名称为 main 的线程。
[6] Monitor Ctrl-Break //监控 Ctrl-Break 中断信号的
[5] Attach Listener //内存 dump,线程 dump,类信息统计,获取系统属性等
[4] Signal Dispatcher // 分发处理发送给 JVM 信号的线程
[3] Finalizer // 调用对象 finalize 方法的线程
[2] Reference Handler//清除 Reference 的线程
[1] main //main 线程,用户程序入口
一个线程是进程中的执行流
一个进程可以同时包括多个线程
实现线程的两种方式
Thread(主要是继承extends)
thread 对象需要一个任务来执行,任务是指线程在启动时执行的工作,该工作的功能代码被卸载run()方法中
run方法必须使用一下语法格式
public class ThreadDemo extends Thread {
int count =10;
@Override
public void run() {
while (true){
System.out.println("-----------"+count);
if(--count==0){
return;
}
}
}
public static void main(String[] args) {
ThreadDemo threadDemo=new ThreadDemo();
threadDemo.start();
System.out.println("--------主方法结束");
}
}
线程必须用start()方法启动
Runnable(主要是实现 implements)
实质上Thread类实现了Runnable 接口,其中的run方法正式对Runnable接口中的run()方法的具体实现;
构造方法:
public Thread(Runnable target);
public Thread(Runnable target,String threadName);
public class RunnableDemo implements Runnable {
int count =10;
@Override
public void run() {
while (true){
System.out.println("-----------"+count);
if(--count==0){
return;
}
}
}
public static void main(String[] args) {
RunnableDemo threadDemo=new RunnableDemo();
new Thread(threadDemo).start();
System.out.println("--------主方法结束");
}
}
线程的生命周期
image.png就绪状态:
sleep();
wait();
等待输入/输出完成
可称为运行状态方法:
notify();
notifyAll();
interrupt();
线程休眠结束;
输入输出结束;
image.png
线程的休眠:
@Override
public void run() {
while (true){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-----------"+count);
if(--count==0){
return;
}
}
}
线程的加入
public class JoinTest extends JFrame {
/**
*
*/
private static final long serialVersionUID = 1L;
private Thread threadA; // 定义两个线程
private Thread threadB;
final JProgressBar progressBar = new JProgressBar(); // 定义两个进度条组件
final JProgressBar progressBar2 = new JProgressBar();
int count = 0;
public static void main(String[] args) {
init(new JoinTest(), 100, 100);
}
public JoinTest() {
super();
// 将进度条设置在窗体最北面
getContentPane().add(progressBar, BorderLayout.NORTH);
// 将进度条设置在窗体最南面
getContentPane().add(progressBar2, BorderLayout.SOUTH);
progressBar.setStringPainted(true); // 设置进度条显示数字字符
progressBar2.setStringPainted(true);
// 使用匿名内部类形式初始化Thread实例子
threadA = new Thread(new Runnable() {
int count = 0;
public void run() { // 重写run()方法
while (true) {
progressBar.setValue(++count); // 设置进度条的当前值
try {
Thread.sleep(100); // 使线程A休眠100毫秒
threadB.join(); // 使线程B调用join()方法
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
threadA.start(); // 启动线程A
threadB = new Thread(new Runnable() {
int count = 0;
public void run() {
while (true) {
progressBar2.setValue(++count); // 设置进度条的当前值
try {
Thread.sleep(100); // 使线程B休眠100毫秒
} catch (Exception e) {
e.printStackTrace();
}
if (count == 100) // 当count变量增长为100时
break; // 跳出循环
}
}
});
threadB.start(); // 启动线程B
}
// 设置窗体各种属性方法
public static void init(JFrame frame, int width, int height) {
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(width, height);
frame.setVisible(true);
}
}
线程中断
JDK早已废除了stop方法,不建议使用stop()来停止一个线程的运行。现在提倡在run()方法中无限循环,然后使用一个布尔标记循环停止
通知线程要停止了(体现线程的协作)
interrupt();
isInterrupted();
Thread.interrupted(); 会将中断表示为改为false;
//不推荐
private boolean isContinue=false;
public void setContinue(boolean aContinue) {
isContinue = aContinue;
}
@Override
public void run() {
while (!isContinue){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"-----------"+count);
--count;
}
}
@Override
public void run() {
//推荐用法
// while (!isInterrupted()) {
while (!Thread.interrupted()) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-----------" + count);
--count;
}
}
public static void main(String[] args) throws InterruptedException {
ThreadDemo threadDemo = new ThreadDemo();
threadDemo.setName("线程A");
threadDemo.start();
Thread.sleep(4000);
threadDemo.interrupt();
System.out.println("--------主方法结束");
}
如果线程中使用了sleep() 和wait() 方法,可以使用interrupt()方法退出循环但是会抛出InterruptedException异常,只要捕获异常,并处理中断业务即可。
public class InterruptedSwing extends JFrame {
private static final long serialVersionUID = 1L;
Thread thread;
public static void main(String[] args) {
init(new InterruptedSwing(), 100, 100);
}
public InterruptedSwing() {
super();
final JProgressBar progressBar = new JProgressBar(); // 创建进度条
// 将进度条放置在窗体合适位置
getContentPane().add(progressBar, BorderLayout.NORTH);
progressBar.setStringPainted(true); // 设置进度条上显示数字
thread = new Thread(new Runnable() {
int count = 0;
public void run() {
while (true) {
progressBar.setValue(++count); // 设置进度条的当前值
try {
Thread.sleep(1000); // 使线程休眠1000豪秒
// 捕捉InterruptedException异常
} catch (InterruptedException e) {
System.out.println("当前线程序被中断");
break;
}
}
}
});
thread.start(); // 启动线程
thread.interrupt(); // 中断线程
}
public static void init(JFrame frame, int width, int height) {
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(width, height);
frame.setVisible(true);
}
}
线程优先级
public class PriorityTest extends JFrame {
private static final long serialVersionUID = 1L;
private Thread threadA;
private Thread threadB;
private Thread threadC;
private Thread threadD;
public PriorityTest() {
getContentPane().setLayout(new GridLayout(4, 1));
// 分别实例化4个线程
final JProgressBar progressBar = new JProgressBar();
final JProgressBar progressBar2 = new JProgressBar();
final JProgressBar progressBar3 = new JProgressBar();
final JProgressBar progressBar4 = new JProgressBar();
getContentPane().add(progressBar);
getContentPane().add(progressBar2);
getContentPane().add(progressBar3);
getContentPane().add(progressBar4);
progressBar.setStringPainted(true);
progressBar2.setStringPainted(true);
progressBar3.setStringPainted(true);
progressBar4.setStringPainted(true);
threadA = new Thread(new MyThread(progressBar));
threadB = new Thread(new MyThread(progressBar2));
threadC = new Thread(new MyThread(progressBar3));
threadD = new Thread(new MyThread(progressBar4));
//优先级不在1-10之内 会出现ILLegalArgumentException异常
setPriority("threadA", 5, threadA);
setPriority("threadB", 5, threadB);
setPriority("threadC", 4, threadC);
setPriority("threadD", 3, threadD);
}
// 定义设置线程的名称、优先级的方法
public static void setPriority(String threadName, int priority,
Thread t) {
t.setPriority(priority); // 设置线程的优先级
t.setName(threadName); // 设置线程的名称
t.start(); // 启动线程
}
public static void main(String[] args) {
init(new PriorityTest(), 100, 100);
}
public static void init(JFrame frame, int width, int height) {
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
frame.setSize(width, height);
frame.setVisible(true);
}
private final class MyThread implements Runnable { // 定义一个实现Runnable接口的类
private final JProgressBar bar;
int count = 0;
private MyThread(JProgressBar bar) {
this.bar = bar;
}
public void run() { // 重写run()方法
while (true) {
bar.setValue(count += 10); // 设置滚动条的值每次自增10
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("当前线程序被中断");
}
}
}
}
}
线程安全
两个线程同时存取单一对象的数据;
类锁:多个对象之间互不干扰,锁的其实是class对象 ;
private static synchronized void synClass(){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
}
System.out.println("---------");
}
方法锁
int num = 10; // 设置当前总票数
public void run() {
while (true) {
if (num > 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"---" + num--);
}
}
}
public static void main(String[] args) {
ThreadSafeTest t = new ThreadSafeTest(); // 实例化类对象
Thread tA = new Thread(t,"A"); // 以该类对象分别实例化4个线程
Thread tB = new Thread(t,"B");
Thread tC = new Thread(t,"C");
Thread tD = new Thread(t,"D");
tA.start(); // 分别启动线程
tB.start();
tC.start();
tD.start();
}
JAVA提供了synchronized关键字来防止资源冲突
public void run() {
while (true) {
synchronized ("") {
if (num > 0) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "---" + num--);
}
}
}
}
synchronized(object){
}
Object 为任意一个对象,每个对象都存在一个标志位,并具有两个值,0 和1 ,一个线程运行到同步块时首先检查该对象的标志位,如果为0状态,表明此同步块中存在其他线程在运行,这时该线程处于就绪状态,直到处于同步块中的线程执行完同步块中代码时 状态改为1,线程才能执行同步块代码
同步方法就是将synchronized 方法上面;
public synchronized void doinit(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
volatile关键字,最轻量的同步机制
只适合一写多读.
守护线程的使用
private static class UseThread extends Thread{
@Override
public void run() {
try {
while (!isInterrupted()) {
System.out.println(Thread.currentThread().getName()
+ " I am extends Thread.");
}
System.out.println(Thread.currentThread().getName()
+ " interrupt flag is " + isInterrupted());
} finally {
//守护线程中finally不一定起作用
System.out.println(" .............finally");
}
}
}
public static void main(String[] args) throws InterruptedException {
UseThread useThread = new UseThread();
useThread.setDaemon(true);
useThread.start();
Thread.sleep(5);
// useThread.interrupt();
}
ThreadLocal
每个线程都有自己的副本,保证隔离;
引发的内存泄漏分析
强引用 Object o=new Object
软引用
弱引用 weakReference 只要发生GC 就会被回收
虚引用
public class UseThreadLocal {
private static ThreadLocal<Integer> intLocal
= new ThreadLocal<Integer>(){
@Override
protected Integer initialValue() {
System.out.println("this initialvalue is running......");
return 1;
}
};
private static ThreadLocal<String> stringThreadLocal =new ThreadLocal<String>(){
@Override
protected String initialValue() {
System.out.println("this initialvalue is running......");
return " threadName is :"+Thread.currentThread().getName() ;
}
};
/**
* 运行3个线程
*/
public void StartThreadArray(){
Thread[] runs = new Thread[3];
for(int i=0;i<runs.length;i++){
runs[i]=new Thread(new TestThread(i));
}
for(int i=0;i<runs.length;i++){
runs[i].start();
}
}
/**
*类说明:测试线程,线程的工作是将ThreadLocal变量的值变化,并写回,看看线程之间是否会互相影响
*/
public static class TestThread implements Runnable{
int id;
public TestThread(int id){
this.id = id;
}
public void run() {
System.out.println(Thread.currentThread().getName()+":start");
Integer s = intLocal.get();
s = s+id;
intLocal.set(s);
System.out.println(Thread.currentThread().getName()
+":"+ intLocal.get());
System.out.println(stringThreadLocal.get());
//当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度
intLocal.remove();
}
}
public static void main(String[] args){
UseThreadLocal test = new UseThreadLocal();
test.StartThreadArray();
}
}
ThreadLocal的线程不安全
public class ThreadLocalUnsafe implements Runnable{
//对象中同一个对象引用
public static Number number = new Number(0);
//去掉静态即可
// public Number number = new Number(0);
public void run() {
//每个线程计数加一
number.setNum(number.getNum()+1);
//将其存储到ThreadLocal中
value.set(number);
SleepTools.ms(2);
//输出num值
System.out.println(Thread.currentThread().getName()+"="+value.get().getNum());
}
public static ThreadLocal<Number> value = new ThreadLocal<Number>() {
};
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new ThreadLocalUnsafe()).start();
}
}
private static class Number {
public Number(int num) {
this.num = num;
}
private int num;
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return "Number [num=" + num + "]";
}
}
}
线程开发工具类
fork/join
算法中有 快速排序,归并排序,外部排序用到fork/join 概念
image.png
RecursiveTask接受返回值;
RecursiveAction不接受返回值;
CountDownLatch
控制器:控制main线程将会等待所有Woker结束后才能继续执行
image.png
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
/*初始化线程*/
private static class InitThread implements Runnable{
public void run() {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work......");
latch.countDown();
for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ........continue do its work");
}
}
}
/*业务线程等待latch的计数器为0完成*/
private static class BusiThread implements Runnable{
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId()
+" do business-----");
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
CyclicBarrier
到达一个节点,然后在一起执行
CountDownLatch 总数可以和线程数不一样
CyclicBarrier 必须等于线程数
image.png
public class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(4,new CollectThread());
//存放子线程工作结果的容器
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<>();
public static void main(String[] args) {
for(int i=0;i<4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/*汇总的任务*/
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
/*相互等待的子线程*/
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId()+"",id);
try {
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do something ");
//汇总1次
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
//汇总2次
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Semaphore
线程流控
useful.release(); 可以凭空new出来一个连接 放进池子
所以用两个 来控制
image.png
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
//两个指示器,分别表示池子还有可用连接和已用连接
private final Semaphore useful,useless;
//存放数据库连接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/*归还连接*/
public void returnConnect(Connection connection) throws InterruptedException {
if(connection!=null) {
System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
+"可用连接数:"+useful.availablePermits());
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/*从池子拿连接*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
Callable、Future和FutureTask
image.png通过线程拿到返回结果
public class UseFuture {
/*实现Callable接口,允许有返回值*/
private static class UseCallable implements Callable<Integer>{
private int sum;
@Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算!");
// Thread.sleep(1000);
for(int i=0 ;i<5000;i++){
if(Thread.currentThread().isInterrupted()) {
System.out.println("Callable子线程计算任务中断!");
return null;
}
sum=sum+i;
System.out.println("sum="+sum);
}
System.out.println("Callable子线程计算结束!结果为: "+sum);
return sum;
}
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();
//包装
FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
Random r = new Random();
new Thread(futureTask).start();
Thread.sleep(1);
if(r.nextInt(100)>50){
System.out.println("Get UseCallable result = "+futureTask.get());
}else{
System.out.println("Cancel................. ");
futureTask.cancel(true);
}
}
}
乐观锁 ---- 成不成功无所谓,先执行;
悲观锁 ---- 先占有 在执行, 经常造成死锁;
原子操作CAS--无锁化编程
CAS(Compare And Swap)
类似事务,要么 全部完成,要么不成功;
一般处理器 提供 CAS指令()
CAS的原理
利用了现代处理器都支持的CAS的指令,
循环这个指令,直到成功为止
CAS的问题
ABA问题: 拿到 A1 去电脑内存中值为A3
开销问题: 循环的开销
只能保证一个共享变量的原子操作
image.png
Jdk中相关原子操作类的使用
更新基本类型类:AtomicBoolean,AtomicInteger,AtomicLong
更新数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference
原子更新字段类: AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater