IT@程序员猿媛程序员程序园

Java7并发编程实战手册-笔记

2019-05-04  本文已影响3人  编程小世界

Java7并发编程实战手册

线程管理

Thread/Runnable/Thread.State

线程的信息获取和设置

线程的中断

sleep/yield

join

daemon

UncaughtExceptionHandler

Thread#setDefaultUncaughtExceptionHandler

ThreadLocal/InheritableThreadLocal

ThreadGroup

uncaughtException

ThreadFactory

进群:697699179可以获取Java各类入门学习资料!

这是我的微信公众号【编程study】各位大佬有空可以关注下,每天更新Java学习方法,感谢!

学习中遇到问题有不明白的地方,推荐加小编Java学习群:697699179内有视频教程 ,直播课程 ,等学习资料,期待你的加入

线程同步基础

synchronized 同步方法 this

synchronized 属性对象 object

同步代码块中使用条件

Object#wait/notify/notifyAll

虚假唤醒(while)

Lock

ReentrantLock

try/finally

ReadWriteLock

公平性 fair

Condition

while

lock/unlock之间

线程同步辅助类

Semaphore

内部计数器

acquire/release

acquireUninterruptibly

忽略线程中断且不会抛出任何异常

CountDownLatch

内部计数器被初始化之后就不能被再次初始化,唯一能改值的是countDown

CylicBarrier

BrokenBarrierException

reset

Phaser

在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步

Phaser(3) 指定参与阶段同步的线程数是3个

被Phaser类置于休眠的线程不会响应中断事件

awaitAdvanceInterruptibly(int phaser),被中断会抛出InterruptedException

onAdavance,阶段改变时被自动执行

Exchanger

只能同步两个线程

exchange调用后将休眠直到其他的线程到达

线程执行器

ThreadPoolExecutor/Executors

shutdown,当执行完所有待运行的任务后,它将结束执行,调用完毕立即返回,结合awaitTermination判断是否线程池是否关闭

awaitTermination(long timeout, TimeUnit unit)

提供很多方法获取自身状态的信息

newCachedThreadPool/newFixedThreadPool

Future/Callbale

submit

call() throws Exception

ThreadPoolExecutor#invokeAny

返回第一个完成任务且没有抛出异常的任务的执行结果

ThreadPoolExecutor#invokeAll

等待所有任务的完成

landon:是否可以用这个方法做场景心跳

while(!isDone)  {    -longstartTime = getCurrent()    - 场景调度器提交场景任务,等待所有场景任务执行完毕(invokeAll),这样亦可以保证潜在的顺序问题,因为每次都是将当前的tick执行完毕    - 而非之前submit,如果一次tick执行超过50ms则下次循环又会向线程池提交任务,会出现同一个任务多个线程执行的潜在情况    -longendTime = getCurrent()    -longsleepTime = startTime + TickInterval - endTime    -if(sleepTime >0) sleep(sleepTime)    -elseprintln("busy,oneTick executeTime:"+ sleepTime)  }

ScheduledThreadPoolExecutor

schedule

scheduleAtFixedRate

scheduleWithFixDelay

Future

cancel

get

FutureTask

done,允许在执行器中的任务执行结束之后还可以执行一些代码

FutureTask implements RunnableFuture

FutureTask(Callable callable)

CompletionService

submit,提交任务

poll/take,获取任务已经完成的Future对象

即任务完成后将Future对象放到一个完成的阻塞队列中

RejectedExecutionHandler

Fork/Join框架

分治

fork-将一个任务拆分成更小的多个任务

join-等待子任务的完成执行

工作窃取算法-work-stealing algorithm

ForkJoinPool

ForkJoinTask

RecursiveAction 任务无返回结果

RecursiveTask 任务有返回结果

递归

if(problemsize>defaultsize){    tasks = divide(task)    execute(tasks) }else{resolve problem using another algorithm}

Task extends RecursiveAction // 无返回结果

compute

if(...)// divide{Taskt1 =newTask(...)Taskt2 =newTask(...)  invokeAll(t1,t2)// 同步调用,执行创建的多个子任务}else{...}

ForkJoinPool#execute(task) --默认创建一个线程数等于计算机cpu数目的线程池

合并任务的结果

if(problemsize>defaultsize)  tasks = divide(task)  execute(tasks)  groupResultsreturnresultelseresolve problemreturnresult

ForkJoinTask#get 等待返回任务计算结果

异步运行任务

同步方式如invokeAll,任务被挂起,直到任务被发送到fork/join线程池中执行。该方式允许ForkJoinPool采用工作窃取算法

异步方式如fork时(立即返回),无法使用该算法

ForkJoinTask#V join()

get和join有区别

任务中抛出异常

ForkJoinTask#isCompletedAbnormally 检查主任务或者子任务是否抛出了异常

getException 获取异常信息

任务抛出运行时异常,会影响其父任务...父任务..

取消任务

在任务开始前可以取消

例:在数字数组中寻找一组数字,拆分为更小的问题,但仅关心数字的一次出现。当我们找到他时,就会取消其他子任务

可存储发送到线程池中的所有任务,当发现当前任务找到数字后,取消非当前任务的所有任务

如果任务正在运行或者已经执行结束,则不能取消,cancel返回false。因此可以尝试去取消所有的任务而不用担心可能带来的间接影响

并发集合

ConcurrentLinkedDeque 非阻塞

getFirst.../peekFirst.../removeFirst.../pollFirst... - 双端队列

LinkedBlockingDeque 阻塞

put/take/poll... 双端队列

PriorityBlockingQueue

队列的元素必须实现Comparable接口

按照排序结果决定插入元素的位置

DelayQueue

元素必须实现Delayed接口

public interface Delayed extends Comparable

两个待实现方法

compareTo(Delayed o)

getDelay(Timeunit unit)

从队列取元素时,到期的元素会返回(未来的元素等待到期_延迟时间)

landon:是否可以用于游戏服务器中的计时器 如果有多个timer 按照到期时间排队_

ConcurrentSkipListMap

根据键值排序所有元素

内部机制-Skip List

# firstEntry/lastEntry/subMap/...

landon:可以和TreeMap做比较,一个线程不安全,一个线程安全

ThreadLocalRandom

# current,该方法是一个静态方法,如果调用线程还未关联随机数对象,就会初始化一个(localInit)

Atomic Variable

compareAndSet,这是是最主要的方法

判断内存变量的值是否是expect,如果是说明未被其他线程改过,可以直接用update新值更新

否则说明被其他线程改过,进而可以选择下一步处理方式

AtomicReference#public final boolean compareAndSet(V expect, V update)

CAS

AtomicIntegerArray -原子数组

LinkedTransferQueue

AtomicReference--实现单例

publicclassSingleton{privatestaticfinalAtomicReference INSTANCE =newAtomicReference();privateSingleton(){}publicstaticSingletongetInstance(){for(;;) {                    Singleton current = INSTANCE.get();if(current !=null) {returncurrent;                    }                    current =newSingleton();if(INSTANCE.compareAndSet(null, current)) {returncurrent;                  }          }        }}

定制并发类

定制ThreadPoolExecutor

继承该类

覆写_记得调用super

shutdown

shutdownNow

输出如等待执行的任务数目 getQueue().size...

getCompletedTaskCount,获得已执行过的任务数

getActiveCount,获得正在执行的任务数

beforeExecute

afterExecute

实现基于优先级的Executor类

ThreadPoolExector参数传入PriorityBlockingQueue

如果是fixed两个线程,那么前2个任务是被2个线程执行的;后面的排队任务按照优先级顺序执行

实现ThreadFactory接口生成定制线程

覆写newThread方法

返回的Thread可以是定制的Thread对象(MyThread extends Thread)

可外部直接调用Threadfactory#newThread返回线程

在Executor对象中使用ThreadFactory

线程池参数中指定线程工厂

Executors内部有一个DefaultThreadFactory

Executors$DefaultThreadFactory

定制运行在定时线程池中的任务

继承ScheduledThreadPoolExecutor

覆写protected RunnableScheduledFuture decorateTask(

Runnable runnable, RunnableScheduledFuture task)

可自定义一个调度类extends FutureTask implements RunnableScheduledFuture参考ScheduledThreadPoolExecutor$ScheduledFutureTask

通过实现ThreadFactory接口为Fork/Join框架生成定制线程

ForkJoinPool内部实现

一个任务队列,存放等待被执行的任务

一个执行这些任务的线程池

ForkJoinWorkerThread持有一个ForkJoinPool.WorkQueue workQueue

work-stealing mechanics

MyWorkerThread extends ForkJoinWorkerThread

onStart

onTermination

调用super

MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory

newThread

定制运行在Fork/Join框架中的任务

MyWorkTask extends ForkJoinTask(Void)

getRawResult

exec

调用compute抽象方法

实现定制Lock类

ReentrantLock内部有一个很重要的类Sync(AQS)

abstract static class Sync extends AbstractQueuedSynchronizer

static final class FairSync extends Sync

static final class NonfairSync extends Sync

自定义实现一个MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer

tryActuire

tryRelease

自定义实现MyLock implements Lock

lock

unlock

tryLock

newCondition

实现基于优先级的传输队列

MyPriorityTransferQueue extends PriorityBlockingQueue implements TransferQueuetryTransfer

transfer

hasWaitingConsumer

getWaitingConsumerCount

take

实现自己的原子对象

MyCounter extends AtomicInteger

for(;;)    {intvalue=get();if(value==10)returnfalse;else{intnewValue =value+1;      boolean changed = compareAndSet(value,newValue);if(changed)returntrue;      }    }

测试并发应用程序

监控Lock接口

ReentrantLock内部方法都是protected的,所以可以继承

MyLock extends ReentrantLock

调用Thread getOwner()

调用Collection getQueuedThreads()

...

监控Phaser类

getPhase

getRegisteredParties

getArrivedParties

...

监控执行器框架

ThreadPoolExecutor

getPoolSize

getCorePoolSize

getActiveCount

getTaskCount

...

监控Fork/Join池

getPoolSize

getParallelism

getActiveThreadCount

getStealCount

...

输出高效的日志信息

输出必要的信息

为消息设定恰当的级别

使用FindBugs分析并发代码

配置Eclipse调试并发代码

可选择Default suspend policy for new breakpoints的值为Suspend VM

默认为Suspend Thread

配置NetBeans调试并发代码

使用MultithreadedTC测试并发代码

MultithreadedTestCase

waitForTick

上一篇下一篇

猜你喜欢

热点阅读