程序员

java锁(3)Lock接口及LockSupport工具详解

2019-07-18  本文已影响8人  桥头放牛娃

1、Lock接口

1.1、Lock玉synchronized的不同

Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问;
Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
Lock在使用时需要显式地获取和释放锁,虽然其缺少隐式获取锁的便捷性,但却拥有了锁获取与释放的可操作性、可中断的获取锁及超时获取锁等多种synchronized关键字所不具备的同步特性。

1.2、Lock接口实现

public interface Lock {

    //获取锁,若锁不可用,则当前线程将会阻塞,直到获得锁
    void lock();

    //获取锁,当被中断或获取到锁才返回;
    //若锁不可用,则当前线程被阻塞,直到获取锁或被中断
    void lockInterruptibly() throws InterruptedException;

   //尝试获取锁,并立即返回;true:获取锁成功;false:获取锁失败
    boolean tryLock();

   //尝试在指定的超时时间获取锁,当获取到锁时返回true;当超时或被中断时返回false
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

   //释放锁
    void unlock();

   //返回一个和锁绑定的条件队列;
   //在等待条件之前线程必须先获取当前锁,同时await()方法会原子地释放锁
   //并在返回之前重新获取到锁
    Condition newCondition();
}

2、LockSupport工具类

LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具。LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。

2.1、LockSupport的基本特征

重入性:
LockSupport是非重入锁;
面向线程锁:
这是LockSupport很重要的一个特征,也是与synchronized,object,reentrantlock最大的不同,这样也就没有公平锁和非公平的区别,所以无法只是依靠LockSupport实现单例模式。同时面向线程锁的特征在一定程度上降低代码的耦合度。
锁park与解锁unpark顺序可颠倒性:
这个特征虽然是Java锁中比较独特是特征,其实这个特征也是基于LockSupport的状态,类似二元信号量(0和1)实现的。
解锁unpark的重复性:
unpark可重复是指在解锁的时候可以重复调用unpark;同样因为LockSupport基于二元锁状态重复调用unpark并不会影响到下一次park操作;

2.2、LockSupport与其他锁的比较

基于LockSupport提供的核心的功能,即:park阻塞与unpark恢复运行状态;与此功能类似的有Object的wait/notify和ReentrantLock.Condition的await()、signal()等;
锁的实现机制不同:
LockSupport面向的是线程,而Object和ReentrantLock.Condition都是典型的依赖一个对象实现锁机制;

锁的监视器依赖:
LockSupport不需要依赖监视器,在源码中可以发现LockSupport并没有提供pubic的构造器,它的所有方法都是静态的;在Object和ReentrantLock.Condition都是需要new一个自身对象作为监视器介质;

粒度:
粒度上很显然LockSupport粒度更细;

使用灵活度:
LockSupport不需要依赖监视器一定程度上降低耦合而且解锁unpark和锁park顺序灵活;

2.3、LockSupport的实现

public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    //设置阻塞当前线程的操作者,一般保存阻塞操作的线程Thread对象
    //parkBlocker对象为线程Thread.class的属性parkBlocker
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

   //唤醒给定线程;
   //1、若给定线程的许可(permit)不可用,则赋予给定线程许可;
   //2、若给定线程被park阻塞,则唤醒给定线程;
   //3、若给定线程正常运行,则本次unpark会保证下次针对该线程的park不会阻塞该线程
   //4、当线程还未启动,umpark操作不会有任何作用
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    //阻塞当前当前线程,直到获得许可;
    //当许可可用时,park会立即返回;
    //当许可不可用时,当前线程将会阻塞,直到以下事件发生
        //1、其他线程调用unpark唤醒此线程
        //2、其他线程调用Thread#interrupt中断线程
        //3、调用应不可知错误返回
    //park方法不会报告是什么原因导致的调用返回
    //调用者需在返回时自行检查是什么条件导致调用返回
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

   //功能与park基本相同,此方法添加阻塞时间参数
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    //功能与park基本相同,此方法添加阻塞到某个时间点的参数
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    //返回给定线程的parkBlocker参数
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    
    //
    public static void park() {
        UNSAFE.park(false, 0L);
    }

   
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    

    //unsafe对象
    private static final sun.misc.Unsafe UNSAFE;
    //保存parkBlocker属性在Thread中的偏移量
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}

2.5、LockSupport底层实现

LockSupport的Park及Unpark是通过Unsafe的对应类实现的,而Unsafe的相关实现在jvm中,以下是jvm中Unsafe.Park()及Unsafe.Unpark()的cpp实现。

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) {
  Parker* p = NULL;

  if (jthread != NULL) {
    ThreadsListHandle tlh;
    JavaThread* thr = NULL;
    oop java_thread = NULL;
    (void) tlh.cv_internal_thread_to_JavaThread(jthread, &thr, &java_thread);
    if (java_thread != NULL) {
      // This is a valid oop.
      if (thr != NULL) {
        // The JavaThread is alive.
        p = thr->parker();
      }
    }
  } // ThreadsListHandle is destroyed here.

  // 'p' points to type-stable-memory if non-NULL. If the target
  // thread terminates before we get here the new user of this
  // Parker will get a 'spurious' unpark - which is perfectly valid.
  if (p != NULL) {
    HOTSPOT_THREAD_UNPARK((uintptr_t) p);
    p->unpark();
  }
} UNSAFE_END

而Unsafe中的相关实现是通过os_posix.cpp中的Parker::park()及Parker::unpark()实现的。

void Parker::park(bool isAbsolute, jlong time) {

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's
  // an interrupt pending.
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  struct timespec absTime;
  if (time < 0 || (isAbsolute && time == 0)) { // don't wait at all
    return;
  }
  if (time > 0) {
    to_abstime(&absTime, time, isAbsolute, false);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unparking. Also re-check interrupt before trying wait.
  if (Thread::is_interrupted(thread, false) ||
      pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "invariant");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait(&_cond[_cur_index], _mutex);
    assert_status(status == 0, status, "cond_timedwait");
  }
  else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
    assert_status(status == 0 || status == ETIMEDOUT,
                  status, "cond_timedwait");
  }
  _cur_index = -1;

  _counter = 0;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}
void Parker::unpark() {
  int status = pthread_mutex_lock(_mutex);
  assert_status(status == 0, status, "invariant");
  const int s = _counter;
  _counter = 1;
  // must capture correct index before unlocking
  int index = _cur_index;
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "invariant");

  // Note that we signal() *after* dropping the lock for "immortal" Events.
  // This is safe and avoids a common class of futile wakeups.  In rare
  // circumstances this can cause a thread to return prematurely from
  // cond_{timed}wait() but the spurious wakeup is benign and the victim
  // will simply re-test the condition and re-park itself.
  // This provides particular benefit if the underlying platform does not
  // provide wait morphing.

  if (s < 1 && index != -1) {
    // thread is definitely parked
    status = pthread_cond_signal(&_cond[index]);
    assert_status(status == 0, status, "invariant");
  }
}

由以上源码可知,实现park及unpark的关键是Mutex互斥体、Condition信号量、_counter计数器。
park主要流程:
当_counter > 0,则直接调用pthread_mutex_unlock解锁并返回;
当超时时间time>0,则调用pthread_cond_timedwait进行超时等待,直到超时时间到达;
当超时时间time=0,则调用pthread_cond_wait等待;
当wait返回时设置_counter = 0,并调用pthread_mutex_unlock解锁;
unpark主要流程:
调用pthread_mutex_lock获取锁,设置_counter=1,调用pthread_mutex_unlock解锁;
若_counter的原值等于0,则调用pthread_cond_signal进行通知处理;

上一篇下一篇

猜你喜欢

热点阅读