<<java编程思想>>笔记:并发2
2018-09-29 本文已影响12人
烛火的咆哮
- java中的原子操作类
原子操作是指程序编译后,对应于一条cpu操作指令,即原子操作时最小的不可再分指令集,编程中的原子操作是线程安全的,不需要使用进行线程同步和加锁机制来确保原子操作的线程同步
public class AtomicIntergerTest implements Runnable { // 创建一个值为0的Integer类型原子类
private AtomicInteger i = new AtomicInteger(0);
public int getValue() { // 获取Integer类型原子类的值
return i.get();
}
private void evenIncrement() { // 值增加2
i.addAndGet(2);
}
public void run() {
while (true) {
evenIncrement();
}
}
//
public static void main(String[] args) {
// 创建一个定时任务,5秒钟终止运行
new Timer().schedule(new TimerTask() {
public void run() {
System.err.println("abort");
System.exit(0);
}
}, 5000);
ExecutorService exec = Executors.newCachedThreadPool();
AtomicIntergerTest at = new AtomicIntergerTest();
exec.execute(at);
while (true) {
int val = at.getValue();
// 奇数
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}
- 变为原子操作后就不需要修饰为synchronized了
- 线程本地存储
- 为了防止多个线程对同一个共享资源操作碰撞,可以使用线程局部变量,机制是为共享的变量在每个线程中创建一个存储区存储变量副本
- 线程局部变量ThreadLocal实例通常是勒种的private static 字段,他们希望将状态与某个先相关联,例子如下:
public class UniqueThreadGenerator {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
// 创建一个线程局部变量
private static final ThreadLocal<Integer> uniqueNum = new ThreadLocal<Integer>() {
// 覆盖线程局部变量的initialvalue方法,为线程局部变量初始化赋值
protected Integer initialValue() {
return uniqueId.getAndIncrement();
}
};
public static int getCurrentThreadId() {
// 获取线程局部变量的当前线程副本中的值
return uniqueId.get();
}
}
线程局部变量ThreadLocal类中有下面四个方法:
-
T get();返回此线程局部变量的当前线程副本中的值;
-
protected T initialValue();返回此线程局部变量的当前线程的初始值
-
void remove(); 移除此线程局部变量当前线程的值
-
void set(T value); 将此线程局部变量的当前线程副本中的值设置为指定值
-
只要线程是活动的,并且线程局部变量实例是可访问的,每个线程都保持对其线程局部变量副本的隐式引用,在线程消失后,其线程局部变量实例的所有副本都会被垃圾回收
-
ThreadLocal通常作为静态存储区域,在创建ThreadLocal时,只能使用get()和set()方法访问该对象的内容
- 线程的yield,sleep和wait的区别
a. yield:
- 正在运行的线程让出cpu时间片,让线程调度器运行其他优先级相同的线程,使用该方法的线程有可能立即进入执行状态.
- yield不释放对象锁,即yield线程对象中其他synchronized的数据不能被别的线程使用
- 相当于暗示线程调度器我的任务已经做的差不多了
b. sleep - 当前正在运行的线程进入阻塞状态,在sleep时间内,该线程肯定不会再运行,sleep可以使优先级低的线程得到执行的机会,也可以让同优先级和高优先级的线程有执行机会.
- sleep的线程如果持有对象的线程锁,则sleep期间也不会释放线程锁,即sleep线程对象中其他synchronized的数据不能被别的线程使用.
- sleep方法可以在非synchronized线程同步的方法中调用,因为sleep不释放锁
- 相当于睡了一会,起来了该干嘛干嘛,毫不影响
c. wait - 正在运行的线程进入等待队列中,wait是object方法,线程调用wait方法后会释放掉所有持有的对象锁,即wait线程对象中,其他synchronized的数据可以被别的线程使用
- wait(),notify()和notifyAll()方法必须只能在synchronized线程同步方法中调用,因为在调用这些方法之前必须首先获得线程锁,如果在非线程同步方法中调用,编译时没有问题,运行时会提示 LLLegalMonitorStateException异常,异常信息是当前线程不是方法的监视器对象所有者
- 即彻底的被抛弃,只能等待代码的显示调用才能进入就绪队列,
- 线程间的通信
- java中通过wait(),notify()和notifyAll()方法通信,j5后还引入了signal()和signalAll()进行线程通信
a. wait(),notify()和notifyAll()方法: - wait是一个线程进入阻塞状态,wait也可以像sleep那样指定时间,notify与notifyAll用于唤醒处于阻塞状态的线程,进入可运行状态,这三个方法都在根类当中,
b. await(),signal(),signalAll()进行线程通信:
- j5中引入了线程锁Lock来实现线程同步,使用Condition将对象的监视器分解成截然不同的对象,以便通过这些对象与任一线程锁Lock组合使用,使用signal与signalAll唤醒处于阻塞中的线程
- 使用ScheduledThreadPoolExeecutor实现定时任务
- 不多bb 看例子
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
class DemoSchedule {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 创建并在给定延迟后时间启动一词性操作
public void schedule(Runnable event, long delay) {
scheduler.schedule(event, delay, TimeUnit.SECONDS);
}
// 创建并在给定延迟时间后,每个给定时间周期性执行的操作
public void repeat(Runnable event, long initiadelay, long period) {
scheduler.scheduleAtFixedRate(event, initiadelay, period, TimeUnit.SECONDS);
}
}
class OnetimeSchedule implements Runnable {
public void run() {
System.out.println("one time schedule task:");
}
}
class RepeatSchedule implements Runnable {
public void run() {
System.out.println("repeat schedule task: ");
}
}
/**
* 定时任务示例
* @author zhdpx
*
*/
public class TestSchedule{
public static void main(String[] args) {
DemoSchedule scheduler = new DemoSchedule ();
scheduler.schedule(new OnetimeSchedule(), 10);
scheduler.repeat(new RepeatSchedule(), 5,5);
}
}
- 需要先创建定时任务模型,在通过类对象调用定时模型,塞入需要执行线程
- 信号量Semaphore
- 一个正常的线程同步锁只允许同一时间只有一个任务访问一个资源,而计数器信号量Semaphore运行多个任务在同一时间访问一个资源,Semaphore是一个计数信号量,维护一个许可集,通常用于限制可以访问某些资源的线程数目,
- 在访问资源前,每个线程必须首先从信号量获取许可,从而保证可以使用该资源,使用完毕后,需返还信号量
- 若当前信号量中没有资源访问许可,则信号量会阻塞调用的线程,直到获取一资源许可,否则,线程将被中断.
public class Pool<T> {
// 限制的线程数目
private int size;
// 存放资源集合
private List<T> items = new ArrayList<T>();
// 标记资源是否被使用
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
// 创建信号量对象
available = new Semaphore(size, true);
for (int i = 0; i < size; ++i) {
try {
// 加载资源对象那个并存放到集合中
items.add(classObject.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
// TODO: handle exception
}
}
}
// 访问资源
public T checkout() throws InterruptedException {
// 获取许可
available.acquire();
return getItem();
}
// 释放资源
public void checkIn(T x) {
if (releaseItem(x))
// 释放一个资源许可,将其返回给信号量
available.release();
}
// 获取资源
private synchronized T getItem() {
for (int i = 0; i < size; i++) {
// 资源没有被使用
if (!checkedOut[i]) {
// 标记资源被使用
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
// 资源不在资源集合中
if (index == -1)
return false;
// 资源正在被使用
if (checkedOut[index]) {
// 将资源标记为不再使用
checkedOut[index] = false;
return true;
}
return false;
}
public static void main(String[] args) {
}
}
- Exchanger线程同步交换器
- 可以用来完成线程间的数据交换,提供了一个同步点,在这个同步点,一对线程可以交换数据,每个线程通过exchagge方法的入口提供数据给他的伙伴线程,并接受他的伙伴线程提供的资源,并返回,当两个线程通过Exchagnger交换了对象,这个交换对于两个线程来说都是安全的,
public class TestExchanger {
Cup emptyCup = new Cup(0);
Cup fullCup = new Cup(100);
Exchanger<Cup> exchanger = new Exchanger<Cup>();
// 服务员类
class Waiter implements Runnable {
private int addSpeed = 1;
public Waiter(int addspeed) {
this.addSpeed = addspeed;
}
public void run() {
while (emptyCup != null) {
try {
// 如果被子已满,则与顾客交换,服务员获得空杯
if (emptyCup.isFull()) {
emptyCup = exchanger.exchange(emptyCup);
System.out.println("waiter : " + emptyCup.getCapacity());
} else {
emptyCup.addWaterToCup(addSpeed);
System.out.println(
"waiter add " + addSpeed + " and current capacity is: " + emptyCup.getCapacity());
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
}
} catch (InterruptedException e) {
e.printStackTrace();
// TODO: handle exception
}
}
}
}
// 顾客类
class Customer implements Runnable {
int drinkSpeed = 1;
public Customer(int drinkSpeed) {
this.drinkSpeed = drinkSpeed;
}
public void run() {
while (fullCup != null) {
try {
// 如果杯子已空,则与服务员进行交换,顾客获得装满水的杯子
if (fullCup.isEmpty()) {
fullCup = exchanger.exchange(fullCup);
System.out.println("Customer: " + fullCup.getCapacity());
} else {
fullCup.drinkWaterFromCup(drinkSpeed);
System.out.println(
"Customer drink " + drinkSpeed + " and current capacity is : " + fullCup.getCapacity());
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
}
} catch (InterruptedException e) {
e.printStackTrace();// TODO: handle exception
}
}
}
}
public static void main(String[] args) {
TestExchanger test = new TestExchanger();
new Thread(test.new Waiter(3)).start();
new Thread(test.new Customer(6)).start();
}
}
class Cup {
private int capacity = 0;
public Cup(int capacity) {
this.capacity = capacity;
}
public int getCapacity() {
return capacity;
}
public void addWaterToCup(int i) {
capacity += i;
capacity = capacity > 100 ? 100 : capacity;
}
public void drinkWaterFromCup(int i) {
capacity += i;
capacity = capacity < 0 ? 0 : capacity;
}
public boolean isFull() {
return capacity == 100 ? true : false;
}
public boolean isEmpty() {
return capacity == 0 ? true : false;
}
}
代码目测记事本手撸,小错误很多,都修正了下
原帖传送门