多线程笔记 二

2018-05-11  本文已影响39人  骑着乌龟追小兔

1.Exclusive write / Concurrent read access 互斥读写

有时候我们会对一份数据同时进行读和写的操作
ReadWriteLock 接口还有他的实现类ReentrantReadWriteLock 可以让我们实现如下场景的功能:
  1. 可能有任意数量的同步读取操作。如果有至少一个读取操作获得允许,那么就不会产生写入操作。
  2. 最多只能有一个写操作,如果已经有一个写操作已经被允许那么就不能进行读操作。
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;


    public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions. protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access protected static int data = 0;

        /**
         * This will write to the data, in an exclusive access
         */
        public static void writeToData() {
            RW_LOCK.writeLock().lock();
            try {
                data++;
            } finally {
                RW_LOCK.writeLock().unlock();
            }
        }

        public static int readData() {
            RW_LOCK.readLock().lock();
            try {
                return data;
            } finally {
                RW_LOCK.readLock().unlock();
            }
        }
    }
备注:如上场景我们应该使用AtomicInteger,但是我们在这边只是用来举例,这个锁操作并不关心这个数据是否是一个原子类型的变量。
在读操作这一边的锁是非常有必要的,虽然这个操作看起来像是针对普通读操作的。事实上如果你不在读文件时候进加锁,那么任何操作都有可能会出错:
  1. 基本类型的写入操作在任何行虚拟机上都不保证是原子类型的操作。在写入一个64bits 的 long型数据最后只会有32bits。

为了更高的性能要求,还有一种更快类型的锁,叫做StampedLock ,除此之外还有一些一起继承乐观锁的类型。这个所以与ReadWriteLock工作情况区别很大。


Producer-Consumer 生产者-消费者模型

    public class Producer implements Runnable {
        private final BlockingQueue<ProducedData> queue;

        public Producer(BlockingQueue<ProducedData> queue) {
            this.queue = queue;
        }

        public void run() {
            int producedCount = 0;
            try {
                while (true) {
                    producedCount++;                //put throws an InterruptedException when the thread is interrupted                queue.put(new ProducedData());            }        } catch (InterruptedException e) {            // the thread has been interrupted: cleanup and exit            producedCount--;            //re-interrupt the thread in case the interrupt flag is needeed higher up            Thread.currentThread().interrupt();        }        System.out.println("Produced " + producedCount + " objects");    } }


                }
                
    public class Consumer implements Runnable {
        private final BlockingQueue<ProducedData> queue;

        public Consumer(BlockingQueue<ProducedData> queue) {
            this.queue = queue;
        }

        public void run() {
            int consumedCount = 0;
            try {
                while (true) {                //put throws an InterruptedException when the thread is interrupted                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);                // process data                consumedCount++;            }        } catch (InterruptedException e) {            // the thread has been interrupted: cleanup and exit            consumedCount--;            //re-interrupt the thread in case the interrupt flag is needeed higher up            Thread.currentThread().interrupt();        }        System.out.println("Consumed " + consumedCount + " objects");    } }

                } 
                
       public class ProducerConsumerExample {
        static class ProducedData {            // empty data object    }
            public static void main(String[] args) throws InterruptedException {
                BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);        // choice of queue determines the actual behavior: see various BlockingQueue implementations
                Thread producer = new Thread(new Producer(queue));
                Thread consumer = new Thread(new Consumer(queue));
                producer.start();
                consumer.start();
                Thread.sleep(1000);
                producer.interrupt();
                Thread.sleep(10);
                consumer.interrupt();
            }
        }
    }            

使用synchronized / volatile 对读写操作可见性的影响

   class Counter {
        private Integer count = 10;

        public synchronized void incrementCount() {
            count++;
        }

        public Integer getCount() {
            return count;
        }
    }
public synchronized Integer getCount() {  return count; }


获取你的程序中的所有线程状态

代码片段 Code snippet
import java.util.Set;

    public class ThreadStatus {
        public static void main(String args[]) throws Exception {
            for (int i = 0; i < 5; i++) {
                Thread t = new Thread(new MyThread());
                t.setName("MyThread:" + i);
                t.start();
            }
            int threadCount = 0;
            Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
            for (Thread t : threadSet) {
                if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                    System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                    ++threadCount;
                }
            }
            System.out.println("Thread count started by Main thread:" + threadCount);
        }
    }

    class MyThread implements Runnable {
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

解释:

Thread.getAllStackTraces().keySet()返回包含application 和 系统的所有线程。如果你只对你创建的线程的状态感兴趣,那么遍历Thread set 然后通过检查 Thread Group 来判断线程是否属于app的线程。

使用ThreadLocal

   private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

    public static MyUserContext getContext() {
        return contexts.get(); // get returns the variable unique to this thread
    }

    public void doGet(...) {
        MyUserContext context = magicGetContextFromRequest(request);
        contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
        try {
// business logic
        } finally {
            contexts.remove(); // 'ensure' removal of thread-local variable
        }
    }

使用共享全局队列的多producer/consumer 案例


import java.util.concurrent.*;
import java.util.Random;
    public class ProducerConsumerWithES {
        public static void main(String args[]) {
            BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
            ExecutorService pes = Executors.newFixedThreadPool(2);
            ExecutorService ces = Executors.newFixedThreadPool(2);
            pes.submit(new Producer(sharedQueue, 1));
            pes.submit(new Producer(sharedQueue, 2));
            ces.submit(new Consumer(sharedQueue, 1));
            ces.submit(new Consumer(sharedQueue, 2));
            pes.shutdown();
            ces.shutdown();
        }
    }
    /* Different producers produces a stream of integers continuously to a shared queue,
    which is shared between all Producers and consumers */
    class Producer implements Runnable {
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
        private Random random = new Random();
        public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.threadNo = threadNo;
            this.sharedQueue = sharedQueue;
        }
        @Override
        public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
            while (true) {
                try {
                    int number = random.nextInt(1000);
                    System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                    sharedQueue.put(number);
                    Thread.sleep(200);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
        public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.sharedQueue = sharedQueue;
            this.threadNo = threadNo;
        }
        @Override
        public void run() {
// Consumer consumes numbers generated from Producer threads continuously
            while(true){
                try {
                    int num = sharedQueue.take();
                    System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    }

Produced:497:by thread:1
Produced:300:by thread:2
Consumed: 497:by thread:1
Consumed: 300:by thread:2
Produced:64:by thread:2
Produced:984:by thread:1
Consumed: 64:by thread:1
Consumed: 984:by thread:2
Produced:102:by thread:2
Produced:498:by thread:1
Consumed: 102:by thread:1
Consumed: 498:by thread:2
Produced:168:by thread:2
Produced:69:by thread:1
Consumed: 69:by thread:2
Consumed: 168:by thread:1
  1. sharedQueue,是一个LinkedBlockingQueue,在生产者和消费者线程之间共享
  2. 生产者线程每隔200ms 生产一个数字 然后持续的添加入队列
  3. 消费者从sharedQueue 持续消耗数字
  4. 这个程序实现无需 synchronized或者锁结构。 BlockingQueue 是实现这个模型的关键。

使用Threadpool 相加两个 int 类型的数组

    public static void testThreadpool() {
        int[] firstArray = { 2, 4, 6, 8 };
        int[] secondArray = { 1, 3, 5, 7 };
        int[] result = { 0, 0, 0, 0 };
        ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
        for (int i = 0; i < result.length; i++) {
            final int worker = i;
            pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
        }
// Wait for all Workers to finish:
        try {
// execute all submitted tasks
            pool.shutdown();
// waits until all workers finish, or the timeout ends
            pool.awaitTermination(12, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            pool.shutdownNow(); //kill thread
        }
        System.out.println(Arrays.toString(result));
    }
说明:
  1. 这个案例只是单纯展示用。在实际使用中,我们不会仅仅为了这点任务就使用线程池。
  2. Java7 中你将看到使用匿名内部类而不是lamda 来实现这个任务。

Pausing Execution 暂停执行处理器时间对其他线程可用。sleep 方法有两个复写方法在Thread 类制作。

  1. 指定sleep 时间
public static void sleep(long millis) throws InterruptedException

  1. 指定sleep时间
public static void sleep(long millis, int nanos)

Thread 源码介绍
这对于系统核心的调度是非常重要的。这个可能产生不可预测的结果,而且有些实现甚至不考虑nano s参数。我们建议在 用try catch 包住 Thread.sleep 操作并且catch InterruptedException. 异常。

线程中断/终止线程

案例
用中断线程来打断任务执行
   class TaskHandler implements Runnable {
            private final BlockingQueue<Task> queue;
            TaskHandler(BlockingQueue<Task> queue) {
                this.queue = queue;
            }
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) { // 
   
                    try {
                        Task task = queue.take(); // blocking call, responsive to interruption
                        handle(task);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            private void handle(Task task) {
// actual handling
            }
        }
    }
等待程序执行完毕,延迟设置打断flag
  class MustFinishHandler implements Runnable {
        private final BlockingQueue<Task> queue;

        MustFinishHandler(BlockingQueue<Task> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            boolean shouldInterrupt = false;
            while (true) {
                try {
                    Task task = queue.take();
                    if (task.isEndOfTasks()) {
                        if (shouldInterrupt) {
                            Thread.currentThread().interrupt();
                        }
                        return;
                    }
                    handle(task);
                } catch (InterruptedException e) {
                    shouldInterrupt = true; // must finish, remember to set interrupt flag when we're
                    done
                }
            }
        }

        private void handle(Task task) {
        // actual handling
        }
    }

固定的任务列表不过在中断时候可能会提前退出。

  class GetAsFarAsPossible implements Runnable {
        private final List<Task> tasks = new ArrayList<>();

        @Override
        public void run() {
            for (Task task : tasks) {
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                handle(task);
            }
        }

        private void handle(Task task) {
        // actual handling
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读