JUC
1.开启多线程的方式:
-
继承Thread类实现run方法
public class ThreadTest extends Thread{ //开启线程的方法3:Thread , Runable , Callable @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(i+Thread.currentThread().getName()); } } }
创建线程的2种方式:
ThreadTest threadTest = new ThreadTest(); threadTest.start();
new ThreadTest().start();
-
实现Runnable接口,实现run方法
public class RunableTest implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }
创建线程的3种方法:
RunableTest runableTest = new RunableTest(); new Thread(runableTest).start();
new Thread(new RunableTest()).start();
new Thread(()->{ System.out.println(Thread.currentThread().getName()); }).start();
-
继承Callable接口(adj. 可随时支取的,请求即付的,随时可偿还的;)就是有返回值的!!!
callable接口的要借助线程池来实现
public class CallableTest implements Callable<Integer> { @Override public Integer call() throws Exception { return 1024; } public static void main(String[] args) throws ExecutionException, InterruptedException { CallableTest callableTest = new CallableTest(); //创建服务 ExecutorService executorService = Executors.newFixedThreadPool(5); //提交线程 Future<Integer> submit = executorService.submit(callableTest); //获取结果 Integer integer = submit.get(); System.out.println(integer); //关闭服务 executorService.shutdown(); } }
同步代码块:就是被加上锁的那一段代码
2.sleep和wait之间的区别:
-
来自不同的类:
sleep来自Thread
wait来自Object
-
释放锁的问题:
sleep不会释放锁
wait会释放锁
-
使用的范围
wait必须使用在同步代码块中
sleep可以使用在任何地方
-
捕获异常
wait不必捕获异常
sleep必须捕获异常
3.Lock和synchronized:
synchronized:
卖ticket的例子sale
public class synchronizedTest {
private int ticket = 1000;
private boolean flag = true;
public synchronized void sale(){
if (ticket>=0){
System.out.println(Thread.currentThread().getName()+"===>买走了第"+ticket--+"张");
}else {
flag=false;
System.out.println("票卖完了");
}
}
public static void main(String[] args) {
synchronizedTest synchronizedTest = new synchronizedTest();
new Thread(()->{
while (synchronizedTest.flag){
synchronizedTest.sale();
}
},"A").start();
new Thread(()->{
while (synchronizedTest.flag){
synchronizedTest.sale();
}
},"B").start();
new Thread(()->{
while (synchronizedTest.flag){
synchronizedTest.sale();
}
},"C").start();
}
}
Lock:
常用的实现类为ReentrantLock()
其实所有的锁都叫可重入锁。
public class LockTest {
private int ticket = 1000;
ReentrantLock lock = new ReentrantLock();
public void sale(){
lock.lock();
try {
if (ticket>0){
System.out.println(Thread.currentThread().getName()+"===>买走了第"+ticket--+"张");
}
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
LockTest lockTest = new LockTest();
new Thread(()->{
for (int i = 0; i < 30; i++) {
lockTest.sale();
}
}).start();
new Thread(()->{
for (int i = 0; i < 30; i++) {
lockTest.sale();
}
}).start();
new Thread(()->{
for (int i = 0; i < 30; i++) {
lockTest.sale();
}
}).start();
}
}
Synchronized 和 Lock 的区别:
- Synchronized 是内置的java关键字,lock只是一个类
- synchronized是基于JVM层面的,Lock是基于JDK层面的
- Synchronized无法获取锁的状态,Lock可获取锁的状态
- synchronized是会自动上锁的,而lock需要自己手动释放锁,不然会死锁
- synchronized可重入锁,不可以中断,lock可重入锁,可中断
- synchronized适合少量代码同步问题,Lock适合大量同步代码同步的问题。
- lock锁可以更加细粒度的去控制整个程序。
4.生产者和消费者
synchronized版:
- 判断等待
- 业务
- 通知
public class PCSynchronized {
private int number = 0;
public synchronized void add() throws InterruptedException {
if (number!=0){
wait();
}
System.out.println(Thread.currentThread().getName()+"加"+number++);
this.notifyAll();
}
public synchronized void subtract() throws InterruptedException {
if (number!=1){
wait();
}
System.out.println(Thread.currentThread().getName()+"减"+number--);
this.notifyAll();
}
public static void main(String[] args) {
PCSynchronized pcSynchronized = new PCSynchronized();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
pcSynchronized.add();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
pcSynchronized.subtract();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
try {
pcSynchronized.subtract();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
}
}
正常步骤:等待通知,执行业务,通知其他线程
如果只是执行AB线程是没有问题的,但是如果我加入了c线程就会出现虚假唤醒。
- wait 方法让线程在这个方法这里停止运行,一直等待被唤醒,下次唤醒时,就顺着wait向下执行。wait之前都不会执行!!!!!
所以为什么会虚假唤醒?线程被唤醒,同时对应判断的值也发生了改变,但是我们使用的是if,if只会判断一次,再线程次被唤醒if是无法二次判断的。
解决虚假唤醒的方法:
吧if换成while()
Lock:
package Point03PC;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PCLock {
private int number = 5;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void add() throws InterruptedException {
lock.lock();
try {
while (number==5){
condition.await();
}
System.out.println(Thread.currentThread().getName()+"加"+number++);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void subtract() throws InterruptedException {
lock.lock();
try {
while (number==0){
condition.await();
}
System.out.println(Thread.currentThread().getName()+"减"+number--);
condition.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
PCLock pcLock = new PCLock();
new Thread(()->{
try {
for (int i = 0; i < 40; i++) {
pcLock.add();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 40; i++) {
pcLock.add();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i < 40; i++) {
pcLock.subtract();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
}
}
lock锁的实现类为ReentrantLock
其中的Condition类我是用来控制线程的堵塞等待和唤醒。
5.8锁现象:
-
标准情况下谁先获得锁谁就先执行。
-
没有锁的方法是不会被锁住的。
-
两个对象,两个同步方法中,不同对象的锁是不会互通的。一个对象的锁是不会影响到另外一个对象。
-
静态的方法全局唯一, 类一加载就有了!锁的是Class
总结:
new this 具体的一个手机
static Class 唯一的一个模板
6、集合类不安全
ConcurrentModificationException(并发修改异常)
list不安全:
- ArrayList是线程不安全的
List<String> list = new ArrayList<>();
解决方案:
List<String> list = new Vector<>();
底层是数组加上synchronized锁
List<String> list = Collections.synchronizedList(new ArrayList<>());
通过这个Collections工具类将线程不安全的ArrayList变成了线程安全的。
List<String> list = new CopyOnWriteArrayList<>();
CopyOnWrite写入时复制的计算机思想
低层是使用lock锁实现的。
set不安全:
-
set实现类Hashset是线程不安全的。
HashSet<String> set = new HashSet<String>();
解决方法:
Set<String> safeSet = Collections.synchronizedSet(set);
CopyOnWriteArraySet<String> safeSet = new CopyOnWriteArraySet<>();
Map 不安全:
- Map实现类HashMap是线程不安全的。
Map<String, Object> map = new HashMap<>();
解决方案:
Map<String, Object> map1 = new HashMap<>(); Map<String, Object> map = Collections.synchronizedMap(map1);
Map<String, Object> map1 = new HashMap<>(); Map<String, Object> map = new ConcurrentHashMap<String, Object>(map1);
7.Callable:
我们之前使用Callable是使用ExecutorService。
这里探究的是如何使用Thread来执行Callable。
思考过程:
-
Thread的构造方法里面只能放Runable
-
一个类实现什么接口,那么这个类就是对应的接口!!!如实现了HttpServlet的类就是servlet!!!
-
我们想要使用Thread来启动Callable我们就必须在Runable的实现类中去寻找!!
4.
TIM截图20200325201257.png
- 在Runable的实现类中找到了FutureTask,这个实现类的构造方法放入的是Callable
![](https://img.haomeiwen.com/i20044989/13f50bd14133dc8e.png)
public class CallableTest implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println(1024); return 1024; } public static void main(String[] args) throws ExecutionException, InterruptedException { CallableTest callableTest = new CallableTest(); FutureTask futureTask = new FutureTask<Integer>(callableTest); new Thread(futureTask).start(); Integer res = (Integer) futureTask.get();//获取结果 System.out.println(res); } }
![](https://img.haomeiwen.com/i20044989/f5ea0e11d9553c81.png)
注意点:
-
Callable
接口类似于Runnable,因为它们都是为其实例可能由另一个线程执行的类设计的。 -
Runnable不返回结果,也不能抛出被检查的异常。
8、常用的辅助类(必会)
辅助类来检查线程是否都执行完成。
1.CountDownLatch(减法计数器)
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"======>");
countDownLatch.countDown();//-1
},String.valueOf(i)).start();
}
System.out.println("hhh我不堵塞");
countDownLatch.await();//堵塞,等待计数器到0
System.out.println("结束");
}
}
- 计数器减一
countDownLatch.countDown();
- countDownLatch.await();之后的东西都会被堵塞,直到等待计数器到0
countDownLatch.await();
![](https://img.haomeiwen.com/i20044989/a1f705ef45aa6c7a.png)
![](https://img.haomeiwen.com/i20044989/34dba41cfb8a3966.png)
2.CyclicBarrier(加分计数器)
public class CyclicBarrierTest {
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
new Thread(()->{
System.out.println("集其了"+Thread.currentThread().getName()+"个龙珠======>");
try {
cyclicBarrier.await();//堵塞等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
-
堵塞
cyclicBarrier.await();//堵塞等待
![](https://img.haomeiwen.com/i20044989/4a860e2350a06f3e.png)
3.Semaphore
达到的效果就是限流,通一时间只有指定数量的线程可以运行
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
TimeUnit.SECONDS.sleep(2);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
9.读写锁
读写锁和lock的ReentrantLock是一个东西,只是读写锁粒度更细。
读写锁和lock锁之间的关系:
![](https://img.haomeiwen.com/i20044989/39b44a73326fd604.png)
10、阻塞队列:
![](https://img.haomeiwen.com/i20044989/0260a1d38f6bb75b.png)
方法 | 抛出异常 | 有返回值,不抛出异常 | 堵塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put() | offer(,) |
移除 | remove | poll() | take() | poll(,) |
检测队首元素 | element | peek |
BlockingQueue<String> ArrayQueue = new ArrayBlockingQueue<String>(4);
BlockingQueue<String> LinkedQueue = new LinkedBlockingDeque<String>(4);
SychronousQueue
是一个容量为一的堵塞队列,进去一个出来一个。
对应的方法为put和take
BlockingQueue<String> SynchronousQueue = new SynchronousQueue<String>();
11.线程池:
3大方法,7大参数,4大拒绝策略
线程池的好处:
- 线程的复用
- 可以控制最大并发数
- 便于管理线程
- 3大方法
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newCachedThreadPool();
- 7大参数
通过源码分析发现三大方法的实现是用ThreadPoolExecutor来实现的。
所以本质就是ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
.......
}
-
corePoolSize 核心线程
-
maximumPoolSize 最大线程
-
keepAliveTime 处于活跃状态的时间
-
unit 时间单位
-
workQueue 堵塞队列,任务等待
-
threadFactory 线程工厂,创建线程
-
RejectedExecutionHandler 拒绝策略
四大拒绝策略:RejectedExecutionHandler
拒绝策略会在线程池满了的情况使用
- AbortPolicy(),不处理任务抛出异常
- CallerRunsPolicy(),从哪里来回哪里去
- DiscardOldestPolicy(),和最先执行的线程去争抢资源
- DiscardPolicy(),丢弃任务,不抛出异常
线程池大小的设置:(CPU密集型,io密集型)
1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
2、IO 密集型 > 判断你程序中十分耗IO的线程,就设置其线程数量的2倍。
四大函数式接口:
Function函数式接口
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
断定型接口:有一个输入参数,返回值只能是 布尔值!
public interface Predicate<T> {
/**
* Evaluates this predicate on the given argument.
*
* @param t the input argument
* @return {@code true} if the input argument matches the predicate,
* otherwise {@code false}
*/
boolean test(T t);
}
Consumer 消费型接口
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
}
Supplier 供给型接口
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
13.Stream流式计算
14.ForkJoin
分支合并,程序并行执行,增加运行速度。
将大问题分解成为一个一个独立的小问题,递归去执行
其中用到的思想就是分治法的思想,分而治之。
ForkJoin 特点:工作窃取
提前完成的线程会去窃取未完成线程的一个任务来执行。(双端队列)
例子 求和计算
package Point12_FrokJoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTest extends RecursiveTask<Long> {
/**
* 求和计算
*/
private Long start;
private Long end;
private Long temp = 10000L;
public ForkJoinTest(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
Long sum = 0L;
if ((end-start)<temp){
for (Long i = start; i <= end; i++) {
sum+=i;
}
//System.out.println(Thread.currentThread().getName());
return sum;
}else {
Long middle = (start + end)/2;
ForkJoinTest forkJoinTest1 = new ForkJoinTest(start,middle);
forkJoinTest1.fork();
ForkJoinTest forkJoinTest2 = new ForkJoinTest(middle+1,end);
forkJoinTest2.fork();
return forkJoinTest1.join()+forkJoinTest2.join();
}
}
}
package Point12_FrokJoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test3();
}
public static void test() throws ExecutionException, InterruptedException {
// 结果为:500000000500000000
// 时间为:4012
Long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTest forkJoinTest = new ForkJoinTest(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTest);
Long res = submit.get();
System.out.println("结果为:"+res);
Long end = System.currentTimeMillis();
System.out.println("时间为:"+(end-start));
}
public static void test2() throws ExecutionException, InterruptedException {
// 结果为:500000000500000000
// 时间为:2997
Long start = System.currentTimeMillis();
Long sum = 0L;
for (int i = 0; i <= 10_0000_0000; i++) {
sum+=i;
}
System.out.println("结果为:"+sum);
Long end = System.currentTimeMillis();
System.out.println("时间为:"+(end-start));
}
public static void test3() throws ExecutionException, InterruptedException {
// 结果为:500000000500000000
// 时间为:165
Long start = System.currentTimeMillis();
Long res = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
System.out.println("结果为:"+res);
Long end = System.currentTimeMillis();
System.out.println("时间为:"+(end-start));
}
}
15、异步回调
Future 设计的初衷: 对将来的某个事件的结果进行建模