多线程笔记 一

2018-05-11  本文已影响56人  骑着乌龟追小兔

Callable and Future

callable 返回的结果通常会封装成一个Future对象;
Callable Interface
public interface Callable<V> {
V call() throws Exception;
}
Future Interface
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
示例
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
public static void main(String[] args) throws Exception {

ExecutorService es = Executors.newSingleThreadExecutor();

System.out.println("Time At Task Submission : " + new Date());

Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So //we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " +result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}

Future 方法介绍

CountDownLatch


  1. CountDownLatch 初始化需要有固定的数量
  2. await方法将会一直阻塞,如果countDown() 调用次数依然没有超过初始化的givennumber,在countdown 次数执行完毕之后,所有等待的线程都会被释放,在此之后的任何调用都会迅速返回结果。
  3. 这是一个一次性策略,如果你需要重新设置数量没那么请使用CyclicBarrier。
关键方法
示例代码
import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;

    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch (Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch (NumberFormatException ne) {
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + "
                    threads");
        } catch (Exception err) {
            err.printStackTrace();
        }
    }
}

备注:

  1. CountDownLatch 在主线程初始化,count为5
  2. 通过调用await 方法阻塞主线程.
  3. 总共创建了 五个DoSomethingInAThread对象,每个对象都通过执行countDown 减少 count。
  4. 一旦couter 变成0 那么主线程状态就会被重新恢复。

多线程基础


class CountAndPrint implements Runnable {
    private final String name;
    CountAndPrint(String name) {
        this.name = name;
    }
    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }
    public static void main(String[] args) {
// Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }
// Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

加锁同步措施

锁是同步机制的基础,一般用来同步代码块或者关键字

固有锁
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
        ++count; // a non-atomic operation
        }
        }

重入锁

    int count = 0; // shared among multiple threads
    Lock lockObj = new ReentrantLock();
    public void doSomething() {
        try {
            lockObj.lock();
            ++count; // a non-atomic operation
        } finally {
            lockObj.unlock(); // sure to release the lock without fail
        }
    }

加锁的同时允许被打断
    class Locky {
        int count = 0; // shared among multiple threads
        Lock lockObj = new ReentrantLock();
        public void doSomething() {
            try {
                try {
                    lockObj.lockInterruptibly();
                    ++count; // a non-atomic operation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stopping
                }
            } finally {
                if (!Thread.currentThread().isInterrupted()) {
                    lockObj.unlock(); // sure to release the lock without fail
                }
            }
        }
    }
只有在符合某些条件下才会被加锁
    public class Locky2 {
        int count = 0; // shared among multiple threads
        Lock lockObj = new ReentrantLock();
        public void doSomething() {
            boolean locked = lockObj.tryLock(); // returns true upon successful lock
            if (locked) {
                try {
                    ++count; // a non-atomic operation
                } finally {
                    lockObj.unlock(); // sure to release the lock without fail
                }
            }
        }
    }

Semaphore(信号灯)

Semaphore是一个高度同步的类,其维护一组许可,这些许可可以被请求以及释放。Semaphore 可以想象成一个一组许可的计数器。当有线程请求的时候计数器-1,有线程释放的时候计数器+1. 如果计数器 为0那么当有线程尝试发起请求的时候,那么这个线程将会一直阻塞,只到许可条件产生。
Semaphore初始化方式
   class Pool {
        /*
        * Note that this DOES NOT bound the amount that may be released!
        * This is only a starting value for the Semaphore and has no other
        * significant meaning UNLESS you enforce this inside of the
        * getNextAvailableItem() and markAsUnused() methods
        */
        private static final int MAX_AVAILABLE = 100;
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        /**
         * Obtains the next available item and reduces the permit count by 1.
         * If there are no items available, block.
         */
        public Object getItem() throws InterruptedException {
            available.acquire();
            return getNextAvailableItem();
        }
        /**
         * Puts the item into the pool and add 1 permit.
         */
        public void putItem(Object x) {
            if (markAsUnused(x))
                available.release();
        }
        private Object getNextAvailableItem() {
// Implementation
        }
        private boolean markAsUnused(Object o) {
// Implementation
        }
    }

Runnable Object

Runnable interface 定义一个run()方法,在其中的代码都在新的线程执行。
Runnable对象可以传递给Thread 构造器。Thread 的 start 方法被调用便意味着 runnable 对象被执行了。

示例代码
  public class HelloRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("Hello from a thread");
        }
        public static void main(String[] args) {
            new Thread(new HelloRunnable()).start();
        }
    }

   public static void main(String[] args) {
        Runnable r = () -> System.out.println("Hello world");
        new Thread(r).start();
    }

Runnable vs Thread subclass


创建一个死锁

死锁通常发生在两个互相等待结束的竞争操作,但是并没有。在java中一个lock 与每个对象都有关系。为了避免多线程同步修改一个对象这种情况,我们可以使用一个叫做synchronized 的关键字,不过还是要付出一些代价的。错误的使用synchronized可能会产生死锁。
考虑到有两个线程在操作一个实例,我们给线程定义为1,2,然后假设我们有两个资源文件R1,R2。线程11去请求R1而且需要R2,但是R2 被线程2 占用了 线程2 需要线程1,这样下去就比较刺激了 线程1 得到了R1 ,线程2 得到R2 。双方都在等待互相释放对象,那么神奇的死锁便产生了。
示例代码
 public class Example2 {
        public static void main(String[] args) throws InterruptedException {
            final DeadLock dl = new DeadLock();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
// TODO Auto-generated method stub
                    dl.methodA();Java® Notes for Professionals 676
                }
            });
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
// TODO Auto-generated method stub
                    try {
                        dl.method2();
                    } catch (InterruptedException e) {
// TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            t1.setName("First");
            t2.setName("Second");
            t1.start();
            t2.start();
        }
    }
    class DeadLock {
        Object mLock1 = new Object();
        Object mLock2 = new Object();
        public void methodA() {
            System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
            synchronized (mLock1) {
                System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                    method2();
                } catch (InterruptedException e) {
// TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        public void method2() throws InterruptedException {
            System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
            synchronized (mLock2) {
                System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
                Thread.sleep(100);
                method3();
            }
        }
        public void method3() throws InterruptedException {
            System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
            synchronized (mLock1) {
                System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
            }
        }
    }

创建一个Thread线程实例

在java 中主要有两种方式创建线程,一般来说之间新建一个线程然后执行比较容易。两种方式的主要区别就是在哪儿创建执行代码。 在java中一个线程就是一个对象,Thread的实例或者子类对象。所以第一种创建线程的方式就是创建其子类,然后复现run 方法。
示例代码
class MyThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                System.out.println("Thread running!");
            }
        }
    }
    
    MyThread t = new MyThread();
Thread 类还可以接受一个String 值作为构造方法,这在多线程编程调试的时候会特别好用

    class MyThread extends Thread {
        public MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                System.out.println("Thread running! ");
            }
        }
    }
    
    MyThread t = new MyThread("Greeting Producer");
第二种创建线程的方式是用一个runnable 对象。然后就会在一个单独的线程中执行runnable中run 的操作。
Thread t = new Thread(aRunnable);
当然你还可以这样定义
Thread t = new Thread(operator::hardWork, "Pi operator");
一般来说,这两种创建方式你都可以放心使用,但是明智的做法应该采用后者。
下面我们来说一下本文要说的第四种
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
所以我们来总结一下,线程可以通过如下构造
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

最后一个允许我们定义个一个期望的size 来创建新的线程
通常来说创建很多相同属性的线程会让你十分痛苦。这时候就该 java.util.concurrent.ThreadFactory 登场了。这个接口可以通过工厂模式极大的减少我们创建线程的过程,该接口使用十分简单newThread(Runnable)。
 class WorkerFactory implements ThreadFactory {
        private int id = 0;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Worker " + id++);
        }
    }
    

原子操作

原子操作指的是一次操作期间这个对象只能被执行一次,在执行操作期间其他线程是没有机会观察或者改变状态。
我们来看一下负面案例
   private static int t = 0;
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count
        is for demonstration purposes.
        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            });
        }
        executorService.shutdown();
    }
这个案例存在两个问题。第一个就是++1 这个行为不是原子的。他包含多个操作:获取值,+1,赋值。这就是为什么当我们允许这个样例的时候,我们基本上看不到t:100.第二个问题两个线程有可能同时获取值然后+1赋值。当我们假设当前t=10,然后两个线程同时操作t。两个线程都对t 设置成11,既然这样后面执行的程序可以获取t的值了,即便线程1还没有关闭。
为了避免这种情况,我们一般使用java.util.concurrent.atomic.AtomicInteger,该类可以为我们提供很多原子操作。

    private static AtomicInteger t = new AtomicInteger(0);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count
        is for demonstration purposes.
        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                int currentT = t.incrementAndGet();
                System.out.println(MessageFormat.format("t: {0}", currentT));
            });
        }
        executorService.shutdown();
    }

incrementAndGet 方法会自动递增然后返回最新值。这样就消除了之前的那种竞争条件 race condition。但是要注意在这个案例中输出的依旧是无序的,这是因为我们对于打印输出没有做任何处理,这个案例仅仅展示如何使用AtomicInteger来避免竞争条件。
上一篇下一篇

猜你喜欢

热点阅读