common-lang3源码分析

2019-10-28  本文已影响0人  trillstones

event包

EventUtils

事件绑定和监听的工具类

//给事件源绑定监听器
    public static <L> void addEventListener(final Object eventSource, final Class<L> listenerType, final L listener) 

//绑定事件到某个对象的方法,即发生事件后,可以执行某个对象的方法
     public static <L> void bindEventsToMethod(final Object target, final String methodName, final Object eventSource,
            final Class<L> listenerType, final String... eventTypes) 

concurrent包

AtomicInitializer

用于初始化对象,调用get()方法后,要不只获得一个单例,要不重新出事化一个对象。

AtomicSafeInitializer

用于初始化对象,但是它能保证只初始化一次。

BackgroundInitializer

用于在后台初始化资源,调用start()方法后,可以做其他事情,直到调用get()方法,如果已经初始化完毕,则可以马上获取,否则线程会被阻塞

CallableBackgroundInitializer

带返回值的初始化

// a Callable that performs a complex computation
   Callable<Integer> computationCallable = new MyComputationCallable();
   // setup the background initializer
   CallableBackgroundInitializer<Integer> initializer =
       new CallableBackgroundInitializer(computationCallable);
   initializer.start();
   // Now do some other things. Initialization runs in a parallel thread
   ...
   // Wait for the end of initialization and access the result
   Integer result = initializer.get();
BasicThreadFactory

线程工厂类

 BasicThreadFactory factory = new BasicThreadFactory.Builder()
       .namingPattern("workerthread-%d")
       .daemon(true)
       .priority(Thread.MAX_PRIORITY)
       .build();
   ExecutorService exec = Executors.newSingleThreadExecutor(factory);
ThresholdCircuitBreaker

阈值电子回路。传递一个增量值,如果到达阈值,则表示开启

long threshold = 10L;
   ThresholdCircuitBreaker breaker = new ThresholdCircuitBreaker(10L);
   ...
   public void handleRequest(Request request) {
       long memoryUsed = estimateMemoryUsage(request);
       if (breaker.incrementAndCheckState(memoryUsed)) {
           // actually handle this request
       } else {
           // do something else, e.g. send an error code
       }
   }
AbstractCircuitBreaker

抽象电子回路。提供一些基础实现,另外实现了事件监听,即到达阈值后会告诉监听方。关键代码如下

//添加监听器
 public void addChangeListener(final PropertyChangeListener listener) {
        changeSupport.addPropertyChangeListener(listener);
    }

//状态变更,告诉监听方
 protected void changeState(final State newState) {
        if (state.compareAndSet(newState.oppositeState(), newState)) {
            changeSupport.firePropertyChange(PROPERTY_NAME, !isOpen(newState), isOpen(newState));
        }
    }
ConcurrentUtils

并发工具类。提供一些方法将检查型异常转为非检查型异常。

//不存在则初始化并放入map
 public static <K, V> V createIfAbsent(final ConcurrentMap<K, V> map, final K key,
            final ConcurrentInitializer<V> init) throws ConcurrentException {
        if (map == null || init == null) {
            return null;
        }

        final V value = map.get(key);
        if (value == null) {
            return putIfAbsent(map, key, init.get());
        }
        return value;
    }
ConstantInitializer

调用get()方法,总会返回同一个对象。不用使用同步。

Memoizer

记忆器? 为什么拼写不正确?
主要功能是缓存一个计算结果,下次来取,直接获取结果,不用重新计算

public O compute(final I arg) throws InterruptedException {
        while (true) {
            Future<O> future = cache.get(arg);
            if (future == null) {
                final Callable<O> eval = () -> computable.compute(arg);
                final FutureTask<O> futureTask = new FutureTask<>(eval);
                future = cache.putIfAbsent(arg, futureTask);
                //如果future是null,表示第一次执行
                if (future == null) {
                    future = futureTask;
                //第一次执行
                    futureTask.run();
                }
            }
            try {
                return future.get();
            } catch (final CancellationException e) {
                cache.remove(arg, future);
            } catch (final ExecutionException e) {
                if (recalculate) {
                    cache.remove(arg, future);
                }

                throw launderException(e.getCause());
            }
        }
    }
MultiBackgroundInitializer

名字叫多重后台初始化器。

protected MultiBackgroundInitializerResults initialize() throws Exception {
        Map<String, BackgroundInitializer<?>> inits;
        synchronized (this) {
            // create a snapshot to operate on
            inits = new HashMap<>(
                    childInitializers);
        }

        // start the child initializers
        final ExecutorService exec = getActiveExecutor();
        for (final BackgroundInitializer<?> bi : inits.values()) {
            if (bi.getExternalExecutor() == null) {
                // share the executor service if necessary
                bi.setExternalExecutor(exec);
            }
// 异步提交,提交完就会返回
            bi.start();
        }

        // collect the results
        final Map<String, Object> results = new HashMap<>();
        final Map<String, ConcurrentException> excepts = new HashMap<>();
        for (final Map.Entry<String, BackgroundInitializer<?>> e : inits.entrySet()) {
            try {
//可是在e.getValue().get()的时候会阻塞,其中一个没执行完,就会等待执行完,然后等待下一个执行完
                results.put(e.getKey(), e.getValue().get());
            } catch (final ConcurrentException cex) {
                excepts.put(e.getKey(), cex);
            }
        }

        return new MultiBackgroundInitializerResults(inits, results, excepts);
    }
TimedSemaphore

时间的信号量。超过时间,会自动恢复信号量。不用调用release()来释放信号量

TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECOND, 10);
   StatisticsThread thread = new StatisticsThread(sem);
   thread.start();


 public class StatisticsThread extends Thread {
       //信号量用来限制数据库负载
       private final TimedSemaphore semaphore;

       public StatisticsThread(TimedSemaphore timedSemaphore) {
           semaphore = timedSemaphore;
       }

       public void run() {
           try {
               while (true) {
//获取信号量
                   semaphore.acquire();   // limit database load
//执行查询
                   performQuery();        // issue a query
               }
           } catch(InterruptedException) {
               // fall through
           }
       }
       ...
   }


实现逻辑是:起一个定时器,定时重置acquireCount变量(表示已被使用的信号量)为0,然后其他线程就可以拿到信号量,不用一直被阻塞住(因为之前在acquire()方法里调用了wait()方法)

//起一个定时器,定时执行endOfPeriod
 protected ScheduledFuture<?> startTimer() {
        return getExecutorService().scheduleAtFixedRate(() -> endOfPeriod(), getPeriod(), getPeriod(), getUnit());
    }

//使用synchronized来保证acquireCount的同步访问
 synchronized void endOfPeriod() {
        lastCallsPerPeriod = acquireCount;
        totalAcquireCount += acquireCount;
        periodCount++;
        acquireCount = 0;
//唤醒其他已阻塞的线程
        notifyAll();
    }
上一篇 下一篇

猜你喜欢

热点阅读