Java MulitThreadUtils 多线程工具类

2019-07-09  本文已影响0人  iarchitect

工具类:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
 * Helpers for running a batch of {@link Callable} tasks either one after another on the
 * calling thread or concurrently on a worker pool.
 *
 * <p>NOTE(review): {@link #ANSYNCHTHREADPOOL} is a {@link ThreadLocal}, so every thread that calls
 * {@link #submits(Collection, int)} lazily gets its own pool of up to {@value #MAX_POOL_THREADS}
 * workers. That model is kept for backward compatibility; confirm it is intended before sharing
 * this class between many caller threads.
 */
public class MulitThreadUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(MulitThreadUtils.class);

    /** Upper bound on the number of worker threads in each pool. */
    private static final int MAX_POOL_THREADS = 20;

    /** How long an idle worker is kept before it is reclaimed, in hours. */
    private static final long KEEP_ALIVE_HOURS = 1L;

    /**
     * Per-thread worker pool intended for IO-heavy tasks, created lazily on the first
     * {@code get()} of each thread. A pool installed via {@code set(...)} is returned as is.
     */
    public static ThreadLocal<ExecutorService> ANSYNCHTHREADPOOL = new ThreadLocal<ExecutorService>() {
        @Override
        public ExecutorService get() {
            ExecutorService pool = super.get();
            if (pool == null) {
                pool = newWorkerPool();
                super.set(pool);
            }
            return pool;
        }
    };

    /**
     * Creates a pool with no core threads and at most {@value #MAX_POOL_THREADS} workers fed
     * through a direct hand-off queue.
     *
     * <p>Two details differ from a bare {@code ThreadPoolExecutor}:
     * <ul>
     *   <li>Workers are daemon threads, so idle workers waiting out their keep-alive time do not
     *       prevent the JVM from exiting.</li>
     *   <li>When all workers are busy the task is run by the submitting thread
     *       ({@link ThreadPoolExecutor.CallerRunsPolicy}) instead of being rejected; without this
     *       any batch larger than the pool fails with {@link RejectedExecutionException}.</li>
     * </ul>
     */
    private static ExecutorService newWorkerPool() {
        ThreadFactory defaults = Executors.defaultThreadFactory();
        ThreadFactory daemonWorkers = task -> {
            Thread worker = defaults.newThread(task);
            worker.setDaemon(true);
            return worker;
        };
        return new ThreadPoolExecutor(0, MAX_POOL_THREADS, KEEP_ALIVE_HOURS, TimeUnit.HOURS,
                new SynchronousQueue<>(), daemonWorkers, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Runs the given tasks and collects their results.
     *
     * <ul>
     *   <li>{@code null} or empty input: returns an immutable empty list.</li>
     *   <li>Exactly one task: it is called directly on the calling thread and the timeout is
     *       ignored; if it throws, the failure is logged and an empty list is returned.</li>
     *   <li>{@code oneThreadTimeoutSecond == 0}: tasks are called one after another on the calling
     *       thread in iteration order; a task that throws is logged and contributes no entry, so
     *       the result may be shorter than the input.</li>
     *   <li>Otherwise: tasks are handed to the pool from {@link #ANSYNCHTHREADPOOL} via
     *       {@code invokeAll}; the result has one entry per task in iteration order, with
     *       {@code null} for a task that failed or was cancelled by the timeout. Tasks that
     *       overflow the pool run on the calling thread and cannot be cut short by the timeout.</li>
     * </ul>
     *
     * @param callables              tasks to run; may be {@code null} or empty
     * @param oneThreadTimeoutSecond {@code 0} for serial execution, otherwise the timeout in
     *                               seconds passed to {@code invokeAll} (it bounds the whole
     *                               batch, not each individual task)
     * @return the collected results, or an empty list if the batch could not be run (for example
     *         because the calling thread was interrupted); never {@code null}
     */
    public static <T> List<T> submits(Collection<Callable<T>> callables, int oneThreadTimeoutSecond) {
        if (callables == null || callables.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            if (callables.size() == 1) {
                // A single task gains nothing from a hand-off to the pool.
                List<T> single = new ArrayList<>(1);
                single.add(callables.iterator().next().call());
                return single;
            }
            if (oneThreadTimeoutSecond == 0) {
                return runSerially(callables);
            }
            return runConcurrently(callables, oneThreadTimeoutSecond);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while executing tasks", e);
        } catch (Exception e) {
            LOGGER.error("Failed to execute tasks", e);
        }
        return Collections.emptyList();
    }

    /** Calls every task on the current thread; tasks that throw are logged and skipped. */
    private static <T> List<T> runSerially(Collection<Callable<T>> callables) {
        List<T> results = new ArrayList<>(callables.size());
        for (Callable<T> task : callables) {
            try {
                results.add(task.call());
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                LOGGER.error("Task failed during serial execution; its result is omitted", e);
            }
        }
        LOGGER.debug("serial execute");
        return results;
    }

    /** Runs all tasks through the calling thread's pool and maps failed or cancelled tasks to null. */
    private static <T> List<T> runConcurrently(Collection<Callable<T>> callables, int timeoutSeconds)
            throws InterruptedException {
        ExecutorService pool = ANSYNCHTHREADPOOL.get();
        List<Future<T>> futures = pool.invokeAll(callables, timeoutSeconds, TimeUnit.SECONDS);
        List<T> results = new ArrayList<>(futures.size());
        for (Future<T> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                results.add(null);
                LOGGER.error("Interrupted while reading a task result", e);
            } catch (CancellationException e) {
                // invokeAll cancels every task that has not finished when the timeout expires.
                results.add(null);
                LOGGER.warn("Task did not finish within {} second(s) and was cancelled", timeoutSeconds, e);
            } catch (ExecutionException e) {
                results.add(null);
                LOGGER.error("Task failed during concurrent execution", e);
            }
        }
        return results;
    }
}

使用方式:

/**
 * Callable wrapper around a single {@code MediaKbSearch#searchMediaKb} invocation, so that the
 * search can be handed to an executor or to {@code MulitThreadUtils.submits}.
 */
class QueryMediaKb implements Callable<PageInfo<Map<String, String>>> {

   /** Search object the call is delegated to. */
   private MediaKbSearch kbSearch;

   /** Condition map passed unchanged to the search. */
   private Map<String, Object> condition;

   /**
    * @param mks             search object to query
    * @param searchCondition condition map forwarded to {@code searchMediaKb}
    */
   public QueryMediaKb(MediaKbSearch mks, Map<String, Object> searchCondition) {
      kbSearch = mks;
      condition = searchCondition;
   }

   /**
    * Runs the search with the stored condition.
    *
    * @return whatever {@code searchMediaKb} returns for the stored condition
    * @throws Exception any exception raised by the search is propagated
    */
   @Override
   public PageInfo<Map<String, String>> call() throws Exception {
      PageInfo<Map<String, String>> page = kbSearch.searchMediaKb(condition);
      return page;
   }
}
// Build one QueryMediaKb task per element of kbSearchList; every task receives the same
// searchCondition.
List<Callable<PageInfo<Map<String, String>>>> callables=new ArrayList<>();
for (MediaKbSearch mks : kbSearchList) {
   callables.add(new QueryMediaKb(mks,searchCondition));
}

// Hand the tasks to MulitThreadUtils.submits with 5 as oneThreadTimeoutSecond. The value is
// non-zero, so the serial (== 0) branch of submits is not used. The returned list holds the
// page results collected by submits.
List<PageInfo<Map<String, String>>> mediaKbSearchResultList = MulitThreadUtils.submits(callables,5);
上一篇下一篇

猜你喜欢

热点阅读