如何封装同步队列的线程池
同步队列
看到同步队列,第一想到的是AQS。
队列同步器(AQS)是用来构建锁或者其他同步组件的基础框架,使用一个int型变量代表同步状态,通过内置的队列来完成线程的排队工作。
AQS使用步骤:
①子类通过继承AQS并实现其抽象方法来管理同步状态,对于同步状态的更改通过提供的getState()、setState(int state)、compareAndSetState(int expect, int update)来进行操作,因为使用CAS操作保证同步状态的改变是原子的。
②子类被推荐定义为自定义同步组件的静态内部类,同步器本身并没有实现任何的同步接口,仅仅是定义了若干状态获取和释放的方法来提供自定义同步组件的使用。
③同步器既可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同类型的同步组件)
ASQ定义了两种资源共享的方式:
(1)独占,只有一个线程能执行,如ReentrantLock;
(2)共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
关于同步器的几个重要方法 :
(1)sHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
(2)tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
(3)tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
(4)tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
(5)tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
AQS的本质,其实是CLH锁队列。
简单的介绍可以看看这个:AQS与CLH的简单介绍
线程池
简单的介绍可以看这个:线程池介绍
那怎么把这2个东西组合起来?
1、既然AQS帮我们完成了那么多事情,那我们直接用它来确保队列同步就行。实现一个同步锁。
public class CusLock {
public CusLock() {
sync = new Sync();
}
public void lock(){
sync.lock();
}
public void unlock(){
sync.unlock();
}
private final Sync sync;
static class Sync extends AbstractQueuedSynchronizer{
void lock(){
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
}else {
acquire(1);
}
}
void unlock(){
release(1);
}
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1;
boolean success = false;
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
success = true;
}
return success;
}
@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if(arg == 0){
throw new IllegalMonitorStateException();
}
setState(0);
setExclusiveOwnerThread(null);
return true;
}
@Override
protected boolean isHeldExclusively() {
return Thread.currentThread() == getExclusiveOwnerThread();
}
}
}
2、实现个订单任务,利用上面的同步锁来控制任务是否完成。
public class SynOrder {
private CusLock lock = new CusLock();
public void getOrderNo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
3、测试一下
public class Main {
public static void main(String[] args) throws InterruptedException {
SynOrder synOrder = new SynOrder();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synOrder.getOrderNo();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synOrder.getOrderNo();
}
});
System.out.println("t1:"+t1.getName());
System.out.println("t2:"+t2.getName());
t1.start();
t2.start();
Thread.currentThread().join();
}
}
4、建立线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
由于线程池是执行线程的,所以我们的order需要改一改:
public class SynOrder implements Runnable {
private CusLock lock = new CusLock();
public void getOrderNo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
@Override
public void run() {
getOrderNo();
}
}
for(int i=0;i<15;i++){
SynOrder synOrder = new SynOrder();
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();