Java 线程同步与实现

2022-04-23  本文已影响0人  Drew_MyINTYRE

为何要使用 Java 线程同步?

当多个线程同时操作一个可共享的资源变量时,将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。

Java 中提供了很多线程同步操作,比如:synchronized 关键字、wait/notifyAllReentrantLockCondition、一些并发包下的工具类、SemaphoreThreadLocalAbstractQueuedSynchronizer 等。本文主要说明一下这几种同步方式的使用及优劣。

ReentrantLock 可重入锁

对于同一个线程,可以继续调用加锁的方法,而不会被挂起。可重入锁内部维护一个计数器,对于同一个线程调用 lock 方法,计数器 +1,调用 unlock 方法,计数器-1。怎么理解呢?看看下面的例子:

private ReentrantLock lock = new ReentrantLock();

public void execute() {
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + " lock!");
        try {
            anotherLock();
            Thread.sleep(5000l);
        } catch (InterruptedException e) {
            System.err.println(Thread.currentThread().getName() + " interrupted");
            Thread.currentThread().interrupt();
        }
    } finally {
        lock.unlock();
    }
}

public void anotherLock() {
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + " lock again!");
    } finally {
        lock.unlock();
    }
}

输出:

Thread-0 lock!
Thread-0 lock again!

在一个加锁方法 execute() 中调用另外一个加锁方法 anotherLock() 并不会被挂起(不用等待锁,就不需要被挂起),可以直接调用(调用 execute 方法时计数器+1,然后内部又调用了 anotherLock 方法,计数器 +1,变成了2)。

synchronized

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        Thread {
            execute()
        }.apply {
            name = "thread-A"
        }.start()

        Thread {
            execute()
        }.apply {
            name = "thread-B"
        }.start()
    }

    @Synchronized
    fun execute() {
        Log.i("WWE", "${Thread.currentThread().name} -> synchronized called")
        try {
            anotherSynchronized()
            Thread.sleep(1500)
        } catch (ex: InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    @Synchronized
    fun anotherSynchronized() {
        Log.i("WWE", "${Thread.currentThread().name} -> anotherSynchronized called")
    }
}

使用 synchronized 代码块同步关键代码即可,没有必要同步整个方法,同步是一种高开销的操作,因此应该尽量减少同步的内容。

关于 Lock 对象和 synchronized 两种锁选择的考量:

1,最好两个都不用,使用 java.util.concurrent 包提供的机制,能够帮助用户处理所有与锁相关的代码。

2,如果 synchronized 关键字能满足用户的需求,就用 synchronized,因为它能简化代码。

3,如果需要更高级的功能,就用 ReentrantLock 类,此时要注意及时释放锁,否则会出现死锁,通常在 finally 代码释放锁。

ReentrantLock 有提供 tryLock 方法,可以设置超时时间,如果超过了这个时间还没有获取到锁,就会放弃。ReentrantLock 可以使用多个 Condition,可以中断一个试图获得锁的线程,ReentrantLock 可以选择公平锁和非公平锁,ReentrantLock 可以获得正在等待线程的个数,计数器等;

Condition 条件对象

对于一个已经拿到了 Lock 锁的线程,如果该线程需要等待某个条件才会执行,这种情况就考虑使用 Condition 条件对象。

Condition 可以替代传统的线程间通信,用 await() 替换 wait(),用 signal() 替换 notify(),用 signalAll() 替换 notifyAll()

为什么方法名不直接叫 wait()/notify()/nofityAll()?因为 Object 的这几个方法是 final 的,不可重写!

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val lock = ReentrantLock()
        val condition = lock.newCondition()

        Thread {
            lock.lock()
            try {
                // do sth
                try {
                    condition.await()
                    Log.i("WWE", "${Thread.currentThread().name} -> i waked up, more strong!")
                } catch (ex: InterruptedException) {
                    Thread.currentThread().interrupt()
                }
            } finally {
                lock.unlock()
            }
        }.apply {
            name = "thread-A"
        }.start()

        Thread {
            lock.lock()
            try {
                // do sth
                try {
                    Thread.sleep(3000)
                    Log.i("WWE", "${Thread.currentThread().name} -> wake up from dream")
                } catch (ex: InterruptedException) {
                    Thread.currentThread().interrupt()
                }
                condition.signalAll()
            } finally {
                lock.unlock()
            }
        }.apply {
            name = "thread-B"
        }.start()
    }
}

输出:

2022-04-22 21:32:20.790 13761-13786/com.dev I/WWE: thread-B -> wake up from dream
2022-04-22 21:32:20.790 13761-13785/com.dev I/WWE: thread-A -> i waked up, more strong!

这个例子中 thread-A 执行到 condition.await() 时,thread-A 会被挂起,直到thread-B 调用了 condition.signalAll() 方法之后,thread-A 才会重新被激活执行。

这里需要注意的是 thread-A 调用 Condition 的 await() 方法之后,thread-A 线程释放锁,然后马上加入到 Condition 的等待队列中,由于 thread-A 释放了锁,thread-B 获得锁并执行,thread-B 执行 signalAll() 方法之后,Condition中的等待队列 thread-A 被取出并加入到 AQS 中,接下来 thread-B 执行完毕之后释放锁,由于 thread-A 已经在 AQS 的等待队列中,所以 thread-A 被唤醒,继续执行。

Condition 是被绑定到 Lock 上的,要创建一个 LockCondition 必须用 newCondition() 方法。传统线程的通信方式,Condition 都可以实现。Condition 的强大之处在于它可以为多个线程间建立不同的 Condition

wait&notify/notifyAll 方式

class MainActivity : AppCompatActivity() {

    private val obj = Object()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        Thread {
            doWait()
        }.apply {
            name = "thread-A"
        }.start()

        Thread {
            doNotify()
        }.apply {
            name = "thread-B"
        }.start()
    }

    private fun doWait() {
        synchronized(obj) {
            try {
                Log.i("WWE", "${Thread.currentThread().name} #doWait")
                obj.wait()
                Log.i("WWE", "${Thread.currentThread().name} wake up")
            } catch (ex: InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private fun doNotify() {
        synchronized(obj) {
            try {
                Log.i("WWE", "${Thread.currentThread().name} #doNotify")
                Thread.sleep(3000)
                obj.notifyAll()
                Log.i("WWE", "${Thread.currentThread().name} notifyAll")
            } catch (ex: InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

输出:

2022-04-23 01:02:31.160 15948-15987/com.dev I/WWE: thread-A doWait()
2022-04-23 01:02:31.161 15948-15988/com.dev I/WWE: thread-B doNotify()
2022-04-23 01:02:34.163 15948-15988/com.dev I/WWE: thread-B notifyAll
2022-04-23 01:02:34.163 15948-15987/com.dev I/WWE: thread-A wake up

这里需要注意的是 调用 wait/notifyAll 方法的时候一定要获得当前线程的锁,否则会发生 IllegalMonitorStateException 异常。

thread.join() 方法

class MainActivity : AppCompatActivity() {
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val thread = Thread {
            Log.i("WWE", "${Thread.currentThread().name} run")
        }.apply {
            name = "thread-A"
        }
        thread.start()

        try {
            thread.join()
        } catch (ex: InterruptedException) {
            ex.printStackTrace()
        }

        Log.i("WWE", "${Thread.currentThread().name} run")
    }
}

输出:

2022-04-23 01:10:06.870 16135-16159/com.dev I/WWE: thread-A run
2022-04-23 01:10:06.870 16135-16135/com.dev I/WWE: main run

Thread.yield() 方法

Yield 方法可以暂停当前正在执行的线程对象,让其它有相同优先级的线程执行。它是一个静态方法而且 只保证当前线程放弃 CPU 占用,而不能保证使其它线程一定能占用 CPU,执行 yield() 的线程有可能在进入到暂停状态后马上又被执行。

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        Thread {
            doSth()
        }.apply {
            name = "Thread-A"
        }.start()

       Thread {
            doSth()
        }.apply {
            name = "Thread-B"
        }.start()

        Thread.sleep(3000)

        Log.i("WWE", "${Thread.currentThread().name} run")
    }

    @Synchronized
    private fun doSth() {
        for(i in 0..3) {
            Log.i("WWE", "${Thread.currentThread().name} run")
            if("Thread-A" == Thread.currentThread().name && i == 1) {
                Thread.yield()
            }
        }
    }
}

Thread.sleep() 方法

在指定的时间内无法被唤醒,同时也不会释放对象锁(如果当前已经持有锁),该方法告诉操作系统在指定时间内不需为该线程分配执行时间片,实际上,调用 sleep() 方法时并不要求持有任何锁,也就不需要包裹在 synchronized 中。

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        for (i in 0 until 2) {
            Thread {
                try {
                    Log.i("WWE", "${Thread.currentThread().name} before")
                    Thread.sleep(5000)
                    Log.i("WWE", "${Thread.currentThread().name} after")
                } catch (ex: InterruptedException) {
                    ex.printStackTrace()
                }
            }.apply {
                name = "Thread-$i"
            }.start()
        }

        Log.i("WWE", "${Thread.currentThread().name} run")
    }
}

输出:

2022-04-23 02:48:45.438 19188-19188/com.dev I/WWE: main run
2022-04-23 02:48:45.438 19188-19211/com.dev I/WWE: Thread-0 before
2022-04-23 02:48:45.439 19188-19212/com.dev I/WWE: Thread-1 before
2022-04-23 02:48:50.441 19188-19212/com.dev I/WWE: Thread-1 after
2022-04-23 02:48:50.441 19188-19211/com.dev I/WWE: Thread-0 after

ThreadLocal

ThreadLocal 是一种把变量放到线程本地的方式来实现线程同步的。ThreadLocal 与同步机制都是为了解决多线程中相同变量的访问冲突问题。

class MainActivity : AppCompatActivity() {

    // SimpleDateFormat 不是一个线程安全的类,可以使用 ThreadLocal 实现同步
    private val dateFormatThreadLocal = object : ThreadLocal<SimpleDateFormat>() {
        override fun initialValue(): SimpleDateFormat? {
            return SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        }
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        Thread {
            Log.i(
                "WWE", "${Thread.currentThread().name} -> ${
                    dateFormatThreadLocal.get().format(
                        Date()
                    )
                }"
            )
        }.apply {
            name = "thread-A"
        }.start()

        Thread {
            Log.i(
                "WWE", "${Thread.currentThread().name} -> ${
                    dateFormatThreadLocal.get().format(
                        Date()
                    )
                }"
            )
        }.apply {
            name = "thread-B"
        }.start()
    }
}

输出:

2022-04-23 01:48:46.363 17102-17127/com.dev I/WWE: thread-A -> 2022-04-23 01:48:46
2022-04-23 01:48:46.364 17102-17128/com.dev I/WWE: thread-B -> 2022-04-23 01:48:46

Semaphore 信号量

Semaphore 用于控制在同一个时间允许访问线程的个数,保证线程可以被合理的使用,可以使用构造器初始化同一时间允许被访问线程的个数:

class MainActivity : AppCompatActivity() {
    private val semaphore = Semaphore(2)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        for (i in 0 until 5) {
            Thread {
                try {
                    semaphore.acquire()
                    Log.i("WWE", "${Thread.currentThread().name} run")
                    Thread.sleep(5000)
                    semaphore.release()
                } catch (ex: InterruptedException) {
                    ex.printStackTrace()
                }
            }.apply {
                name = "Thread-$i"
            }.start()
        }
    }
}

输出:

2022-04-23 02:33:55.669 18447-18474/com.dev I/WWE: Thread-1 run
2022-04-23 02:33:55.669 18447-18476/com.dev I/WWE: Thread-3 run
2022-04-23 02:34:00.670 18447-18475/com.dev I/WWE: Thread-2 run
2022-04-23 02:34:00.670 18447-18473/com.dev I/WWE: Thread-0 run
2022-04-23 02:34:05.671 18447-18477/com.dev I/WWE: Thread-4 run

可以看出同一时间内,只有2个线程可以被同时访问,因为构造函数里传的是2。

CountDownLatch

CountDownLatch 是一个计数器,它的构造方法中需要设置一个数值,用来设定计数的次数。每次调用 countDown() 方法之后,这个计数器都会减去1,CountDownLatch 会一直阻塞着调用 await() 方法的线程,直到计数器的值变为0。

class MainActivity : AppCompatActivity() {
    private val countDownLatch = CountDownLatch(5)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        for(i in 0 until 5) {
            Thread {
                Log.i("WWE", "${Thread.currentThread().name} sleep before")
                try {
                    Thread.sleep(5000)
                } catch (ex: InterruptedException) {
                    ex.printStackTrace()
                }
                Log.i("WWE", "${Thread.currentThread().name} sleep after")
                countDownLatch.countDown()
            }.apply {
                name = "Thread-$i"
            }.start()
        }

        try {
            countDownLatch.await()
        } catch (ex: InterruptedException) {
            ex.printStackTrace()
        }

        Log.i("WWE", "${Thread.currentThread().name} run")
    }
}

输出:

2022-04-23 05:30:20.426 20564-20588/com.dev I/WWE: Thread-0 sleep before
2022-04-23 05:30:20.426 20564-20592/com.dev I/WWE: Thread-4 sleep before
2022-04-23 05:30:20.426 20564-20589/com.dev I/WWE: Thread-1 sleep before
2022-04-23 05:30:20.426 20564-20591/com.dev I/WWE: Thread-3 sleep before
2022-04-23 05:30:20.426 20564-20590/com.dev I/WWE: Thread-2 sleep before
2022-04-23 05:30:25.429 20564-20589/com.dev I/WWE: Thread-1 sleep after
2022-04-23 05:30:25.429 20564-20590/com.dev I/WWE: Thread-2 sleep after
2022-04-23 05:30:25.429 20564-20588/com.dev I/WWE: Thread-0 sleep after
2022-04-23 05:30:25.430 20564-20592/com.dev I/WWE: Thread-4 sleep after
2022-04-23 05:30:25.430 20564-20591/com.dev I/WWE: Thread-3 sleep after
2022-04-23 05:30:25.430 20564-20564/com.dev I/WWE: main run

当线程调用 CountDownLatch 的 await 方法时,便会尝试获取 共享锁,不过一开始通常获取不到锁,于是线程被阻塞。共享锁 可获取到的条件是 锁计数器 的值为 0,而 锁计数器 的初始值为 count,当每次调用 CountDownLatch 对象的 countDown 方法时,也可以把 锁计数器 - 1。通过这种方式,调用 count 次 countDown 方法之后,锁计数器 就为 0 了,于是之前等待的线程就会继续运行了,并且此时如果再有线程想调用 await 方法时也会被立刻放行,不会再去做任何阻塞操作了。

使用原子变量实现线程同步

什么是原子操作呢?

原子操作就是指将 读取变量修改变量保存变量 看成一个整体来操作,即这几种行为要么同时完成,要么都不完成。

java.util.concurrent.atomic 包中提供了创建原子类型变量的工具类,使用该类可以简化线程同步。比如:其中 AtomicInteger 以原子方式更新 int 的值:

class Bank {
    private AtomicInteger account = new AtomicInteger(100);

    public AtomicInteger getAccount() {
        return account;
    }

    public void save(int money) {
        account.addAndGet(money);
    }
}

AbstractQueuedSynchronizer

AQS 是很多同步工具类的基础,比如:ReentrantLock 里的公平锁和非公平锁,Semaphore 里的公平锁和非公平锁,CountDownLatch 里的锁等他们的底层都是使用 AbstractQueuedSynchronizer 完成的。在实际开发当中,应当尽量远离底层结构。下面基于 AbstractQueuedSynchronizer 自定义实现一个独占锁。

class MySynchronizer : AbstractQueuedSynchronizer() {

    override fun tryAcquire(arg: Int): Boolean {
        if (compareAndSetState(0, 1)) {
            exclusiveOwnerThread = Thread.currentThread()
            return true
        }
        return false
    }

    override fun tryRelease(arg: Int): Boolean {
        state = 0
        exclusiveOwnerThread = null
        return true
    }

    fun lock() {
        acquire(1)
    }

    fun unlock() {
        release(1)
    }
}

class MainActivity : AppCompatActivity() {
    private val mySynchronizer = MySynchronizer()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        Thread {
            mySynchronizer.lock()
            try {
                Log.i("WWE", "${Thread.currentThread().name} run before")
                try {
                    Thread.sleep(5000)
                } catch (ex: InterruptedException) {
                    Thread.currentThread().interrupt()
                }
                Log.i("WWE", "${Thread.currentThread().name} run after")
            } finally {
                mySynchronizer.unlock()
            }
        }.apply {
            name = "Thread-A"
        }.start()

        Thread {
            mySynchronizer.lock()
            try {
                Log.i("WWE", "${Thread.currentThread().name} run")
            } finally {
                mySynchronizer.unlock()
            }
        }.apply {
            name = "Thread-B"
        }.start()

        Log.i("WWE", "${Thread.currentThread().name} run")
    }
}

输出:

2022-04-23 07:26:52.498 22603-22603/com.dev I/WWE: main run
2022-04-23 07:26:52.498 22603-22628/com.dev I/WWE: Thread-B run
2022-04-23 07:26:52.498 22603-22627/com.dev I/WWE: Thread-A run before
2022-04-23 07:26:57.499 22603-22627/com.dev I/WWE: Thread-A run after

使用阻塞队列实现线程同步

LinkedBlockingQueue 是一个基于链表的队列,先进先出的顺序(FIFO),范围任意的 blocking queue。

上一篇下一篇

猜你喜欢

热点阅读