实现根据key分片的线程池-ShardThreadPoolExecutor

2020-05-31  本文已影响0人  sandy_cheng

实现一个线程池,该线程池可根据任务的key来确定由哪一个线程执行。该线程池是固定大小的线程池,初始化后不可改变线程的数量,主要由阻塞队列(BlockingQueue)及执行者(Work)构成,结构如下:

package com.sandy.util;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.*;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.LockSupport;

import java.util.concurrent.locks.ReentrantLock;

/**

* @author: chengyu

* @create: 2020-05-31 21:21

**/

/**
 * Fixed-size executor that routes every task to a worker thread chosen from the
 * task's shard key, so tasks sharing a key run sequentially on one thread.
 *
 * <p>Each shard owns one bounded {@link BlockingQueue} and at most one worker
 * thread; the worker is started lazily when the first task for that shard
 * arrives. Only {@link Command} instances are accepted by {@link #execute}.
 * Note that the inherited {@code submit}/{@code invokeAll} methods wrap tasks
 * in a {@code RunnableFuture}, which is not a {@code Command}, and are
 * therefore rejected with {@link IllegalArgumentException}.
 */
public class ShardThreadPoolExecutor extends AbstractExecutorService {

    /** How long an idle worker waits on its queue before re-checking the shutdown flags. */
    private static final long IDLE_POLL_MILLIS = 50L;

    /** Guards worker registration/retirement and the termination condition. */
    private final ReentrantLock mainLock = new ReentrantLock();

    /** Wait condition to support awaitTermination; signalled when the last worker retires. */
    private final Condition termination = mainLock.newCondition();

    /** Number of shards, i.e. the maximum number of worker threads. */
    private final int poolSize;

    /** Prefix used for worker thread names. */
    private final String poolName;

    /** One queue per shard; the list index is the shard index. */
    private final List<BlockingQueue<Command>> commanders;

    /** Live workers keyed by shard index. */
    private final Map<Integer, Work> workers = new ConcurrentHashMap<>();

    /** Set once shutdown (orderly or immediate) has been requested. */
    private volatile boolean shutDown;

    /** Set once an immediate shutdown has been requested. */
    private volatile boolean shutDownNow;

    /**
     * Creates an executor.
     *
     * @param poolSize  number of shards / worker threads, must be positive
     * @param poolName  thread name prefix, must not be null or blank
     * @param clazz     {@link BlockingQueue} implementation that exposes a public
     *                  {@code (int capacity)} constructor
     * @param queueSize capacity handed to every shard queue, must be positive
     * @throws IllegalArgumentException if any argument is invalid
     * @throws IllegalStateException    if a queue cannot be instantiated
     */
    public ShardThreadPoolExecutor(int poolSize, String poolName, Class<?> clazz,
                                   Integer queueSize) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive: " + poolSize);
        }
        // Compare by content; the former reference comparison (== "") let blank names through.
        if (poolName == null || poolName.trim().isEmpty()) {
            throw new IllegalArgumentException("poolName must not be blank");
        }
        if (clazz == null || !BlockingQueue.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException("clazz must be a BlockingQueue implementation: " + clazz);
        }
        if (queueSize == null || queueSize <= 0) {
            throw new IllegalArgumentException("queueSize must be positive: " + queueSize);
        }
        this.poolSize = poolSize;
        this.poolName = poolName;
        this.commanders = initCommanders(poolSize, clazz, queueSize);
    }

    /**
     * Creates an executor with one shard per available processor, the name
     * {@code shard-thread-pool-executor} and {@link ArrayBlockingQueue}s of capacity 200.
     */
    public ShardThreadPoolExecutor() {
        this(Runtime.getRuntime().availableProcessors(), "shard-thread-pool-executor",
                ArrayBlockingQueue.class, 200);
    }

    /**
     * Instantiates one bounded queue per shard through the {@code (int)} constructor of {@code clazz}.
     *
     * @throws IllegalStateException if the constructor is missing or fails
     */
    private static List<BlockingQueue<Command>> initCommanders(int poolSize, Class<?> clazz,
                                                               int queueSize) {
        List<BlockingQueue<Command>> queues = new ArrayList<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            try {
                // Safe: the constructor verified that clazz is a BlockingQueue, and the
                // freshly created queue is empty, so no foreign element type can leak out.
                @SuppressWarnings("unchecked")
                BlockingQueue<Command> queue =
                        (BlockingQueue<Command>) clazz.getConstructor(Integer.TYPE).newInstance(queueSize);
                queues.add(queue);
            } catch (ReflectiveOperationException | RuntimeException e) {
                throw new IllegalStateException(e);
            }
        }
        return queues;
    }

    /**
     * Starts an orderly shutdown: queued commands still run, new commands are rejected.
     * Does not wait for completion; use {@link #awaitTermination} for that.
     */
    @Override
    public void shutdown() {
        mainLock.lock();
        try {
            shutDown = true;
            // Workers notice the flag on their next poll and retire once their queue is empty.
            if (workers.isEmpty()) {
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Requests an immediate shutdown: interrupts every worker thread and removes
     * the commands that have not started yet.
     *
     * @return the commands that were still queued
     */
    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> pending = new ArrayList<>();
        mainLock.lock();
        try {
            shutDown = true;
            shutDownNow = true;
            for (Work work : workers.values()) {
                work.thread.interrupt();
            }
            for (BlockingQueue<Command> queue : commanders) {
                queue.drainTo(pending);
            }
            if (workers.isEmpty()) {
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        return pending;
    }

    @Override
    public boolean isShutdown() {
        return shutDown;
    }

    /**
     * @return {@code true} once shutdown was requested and every worker has retired
     */
    @Override
    public boolean isTerminated() {
        return shutDown && workers.isEmpty();
    }

    /**
     * Blocks until the executor has terminated, the timeout elapses, or the caller is interrupted.
     *
     * @return {@code true} if terminated, {@code false} if the timeout elapsed first
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        mainLock.lock();
        try {
            while (!isTerminated()) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Queues {@code command} on the shard selected by its key, starting that shard's
     * worker first if necessary. Blocks while the shard queue is full.
     *
     * @throws IllegalArgumentException   if {@code command} is not a {@link Command}
     * @throws IllegalStateException      if the shard key is {@code null}
     * @throws RejectedExecutionException if the executor is shut down, or the calling
     *                                    thread is interrupted while waiting for queue space
     */
    @Override
    public void execute(Runnable command) {
        if (!(command instanceof Command)) {
            throw new IllegalArgumentException("param must be instance of ShardThreadPoolExecutor Command");
        }
        Command cmd = (Command) command;
        if (cmd.getShardKey() == null) {
            throw new IllegalStateException("shardObject is null");
        }
        int index = getBlockQueueIndex(cmd);
        BlockingQueue<Command> queue = commanders.get(index);
        // The consumer must exist before a potentially blocking put.
        ensureWorker(index, queue);
        try {
            queue.put(cmd);
        } catch (InterruptedException e) {
            // Restore the flag and report the failure instead of silently dropping the command.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while queueing command", e);
        }
        rejectIfStranded(index, queue, cmd);
    }

    /** Starts the worker of shard {@code index} unless it is already running. */
    private void ensureWorker(int index, BlockingQueue<Command> queue) {
        if (shutDown) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        if (workers.containsKey(index)) {
            return;
        }
        mainLock.lock();
        try {
            // Re-check under the lock so concurrent submitters cannot start two workers per shard.
            if (shutDown) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            if (!workers.containsKey(index)) {
                Work work = new Work(index, queue, poolName + "-" + workers.size());
                // The worker cannot retire before it is registered: retiring needs mainLock.
                work.thread.start();
                workers.put(index, work);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Handles a shutdown racing with {@link #execute}: if the command was queued after
     * its consumer went away, take it back out and reject it rather than leave it unrun.
     */
    private void rejectIfStranded(int index, BlockingQueue<Command> queue, Command cmd) {
        if (!shutDown) {
            return;
        }
        mainLock.lock();
        try {
            boolean noConsumer = shutDownNow || !workers.containsKey(index);
            if (noConsumer && queue.remove(cmd)) {
                throw new RejectedExecutionException("executor has been shut down");
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** Maps the command's shard key to a shard index in {@code [0, poolSize)}. */
    private int getBlockQueueIndex(Command command) {
        int hashCode = command.getShardKey().hashCode();
        // |hashCode % poolSize| is always below poolSize, so Math.abs cannot overflow here.
        return Math.abs(hashCode % poolSize);
    }

    /** A task that names the shard it has to run on. */
    public interface Command extends Runnable {

        /** @return the key used to pick the worker thread; must not be {@code null} */
        String getShardKey();
    }

    /** Worker that drains the queue of exactly one shard on its own thread. */
    private final class Work implements Runnable {

        private final int index;

        private final BlockingQueue<Command> queue;

        private final Thread thread;

        Work(int index, BlockingQueue<Command> queue, String threadName) {
            this.index = index;
            this.queue = queue;
            this.thread = new Thread(this, threadName);
        }

        @Override
        public void run() {
            for (;;) {
                Command cmd = nextCommand();
                if (cmd != null) {
                    runCommand(cmd);
                } else if (retire()) {
                    return;
                }
            }
        }

        /**
         * Returns the next command, or {@code null} when the worker should try to retire
         * (immediate shutdown, or orderly shutdown with an empty queue).
         */
        private Command nextCommand() {
            while (!shutDownNow) {
                try {
                    Command cmd = shutDown
                            ? queue.poll()
                            : queue.poll(IDLE_POLL_MILLIS, TimeUnit.MILLISECONDS);
                    if (cmd != null) {
                        return cmd;
                    }
                    if (shutDown) {
                        return null;
                    }
                } catch (InterruptedException ignored) {
                    // Interrupted by shutdownNow or externally: loop to re-check the flags.
                }
            }
            return null;
        }

        /** Runs one command; a failing command must not take the whole shard down with it. */
        private void runCommand(Command cmd) {
            try {
                cmd.run();
            } catch (Throwable t) {
                // Report like an uncaught exception, but keep the worker alive for later commands.
                Thread current = Thread.currentThread();
                current.getUncaughtExceptionHandler().uncaughtException(current, t);
            }
        }

        /**
         * Deregisters this worker and signals termination when it was the last one.
         *
         * @return {@code false} if a command arrived in the meantime and the worker must go on
         */
        private boolean retire() {
            mainLock.lock();
            try {
                if (!shutDownNow && !queue.isEmpty()) {
                    return false;
                }
                workers.remove(index);
                if (workers.isEmpty()) {
                    termination.signalAll();
                }
                return true;
            } finally {
                mainLock.unlock();
            }
        }
    }
}

这部分代码中,shutdown和shutdownNow的实现目前还存在一些问题,后续会继续完善。

使用此工具类很简单,测试代码如下:

// Submit 100 commands that alternate between the shard keys "a" and "b".
ShardThreadPoolExecutor executor = new ShardThreadPoolExecutor();
for (int n = 0; n < 100; n++) {
    // Even iterations go to shard "a", odd ones to shard "b".
    boolean even = (n % 2 == 0);
    String key = even ? "a" : "b";
    String value = even ? "a_a" : "b_b";
    executor.execute(new ShadingCommand<>(key, value));
}

// Request shutdown, then wait up to one second for termination.
executor.shutdown();
executor.awaitTermination(1L, TimeUnit.SECONDS);

其中ShadingCommand需要实现Command接口(该接口定义在ShardThreadPoolExecutor类中),否则execute方法会抛出IllegalArgumentException。使用者可根据任务需要自定义实现类,ShadingCommand的实现如下:

private class ShadingCommandimplements ShardThreadPoolExecutor.Command{

private Stringkey;

private T t;

public ShadingCommand(String key,T t){

this.key = key;

this.t = t;

}

@Override

    public String getShardKey() {

return key;

}

@Override

    public void run() {

try {

Thread.sleep(100L);

}catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Thread.currentThread().getName()+",key:"+key+",value:"+t);

}

}

测试结果:

shard-thread-pool-executor-1,key:b,value:b_b

shard-thread-pool-executor-0,key:a,value:a_a

shard-thread-pool-executor-0,key:a,value:a_a

shard-thread-pool-executor-1,key:b,value:b_b

shard-thread-pool-executor-0,key:a,value:a_a

shard-thread-pool-executor-1,key:b,value:b_b

shard-thread-pool-executor-1,key:b,value:b_b

shard-thread-pool-executor-0,key:a,value:a_a

shard-thread-pool-executor-0,key:a,value:a_a

shard-thread-pool-executor-1,key:b,value:b_b

可以看到,相同key的数据只会被相同线程处理。

上一篇 下一篇

猜你喜欢

热点阅读