并发(Concurrency)

2018-03-03  本文已影响0人  李军_6eaa
Synchronization有哪些作用?
对atomic的理解
Synchronization,volatile,AtomicLong正确使用的几个小例子
// Broken! - How long would you expect this program to run?
// Reads and writes of a boolean are atomic, but without synchronization there
// is no guarantee of reliable communication between threads: a value written
// by one thread may never become visible to another.
public class StopThread {
    // Plain (non-volatile) flag shared by the main thread and the background thread.
    private static boolean stopRequested;
    public static void main(String[] args)
            throws InterruptedException {
        Thread backgroundThread = new Thread(() -> {
            int i = 0;
            // Unsynchronized read: nothing forces this thread to ever observe
            // the write performed by main, so the loop may spin forever.
            while (!stopRequested)
                i++;
        });
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        // Unsynchronized write: not guaranteed to be seen by backgroundThread.
        stopRequested = true;
    }
}
// Fix #1: properly synchronized cooperative thread termination.
// Synchronization is not guaranteed to work unless both the read and the
// write of the flag are synchronized on the same lock.
public class StopThread {
    private static boolean stopRequested;

    /** Sets the stop flag while holding the class lock. */
    private static void requestStop() {
        synchronized (StopThread.class) {
            stopRequested = true;
        }
    }

    /** Reads the stop flag under the same class lock used by requestStop(). */
    private static boolean stopRequested() {
        synchronized (StopThread.class) {
            return stopRequested;
        }
    }

    /** Busy-loops until a stop has been requested. */
    private static void spinUntilStopped() {
        int spins = 0;
        while (!stopRequested()) {
            spins++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(StopThread::spinUntilStopped);
        worker.start();
        TimeUnit.SECONDS.sleep(1);
        requestStop();
    }
}
// Fix #2: cooperative thread termination with a volatile field.
// volatile guarantees reliable communication between threads (visibility),
// but it does not provide mutual exclusion.
public class StopThread {
    private static volatile boolean stopRequested;

    public static void main(String[] args) throws InterruptedException {
        // Spin until the main thread's write to the volatile flag becomes visible.
        Runnable spin = () -> {
            int counter = 0;
            while (!stopRequested) {
                counter++;
            }
        };
        Thread backgroundThread = new Thread(spin);
        backgroundThread.start();
        TimeUnit.SECONDS.sleep(1);
        stopRequested = true;
    }
}
// Broken - requires synchronization!
// The increment operator (++) is not atomic: first it reads the value, and
// then it writes back a new value equal to the old value plus one. volatile
// makes each individual read and write visible to other threads, but it does
// not make the read-modify-write sequence atomic, so two threads can
// interleave and obtain the same serial number.
private static volatile int nextSerialNumber = 0;
public static int generateSerialNumber() {
    return nextSerialNumber++;
}
// Lock-free synchronization with java.util.concurrent.atomic.
// That package provides primitives for lock-free, thread-safe programming on
// single variables.
private static final AtomicLong nextSerialNum = new AtomicLong();
/** Returns the current serial number and atomically advances the counter. */
public static long generateSerialNumber() {
    // getAndIncrement() performs the read and the increment as one atomic step.
    return nextSerialNum.getAndIncrement();
}
使用synchronization时,应该注意些什么?
Obtain the lock, examine the shared data, transform it as necessary, and drop the lock. 
If you must perform some time-consuming activity, find a way to move it out of the synchronized region
// Broken - invokes alien method from synchronized block!
// Set wrapper that notifies registered observers whenever an element is added.
public class ObservableSet<E> extends ForwardingSet<E> {
    public ObservableSet(Set<E> set) { super(set); }

    // Registered observers; every access in this class synchronizes on this list.
    private final List<SetObserver<E>> observers
            = new ArrayList<>();

    /** Registers an observer to be notified of subsequent additions. */
    public void addObserver(SetObserver<E> observer) {
        synchronized(observers) {
            observers.add(observer);
        }
    }

    /** Unregisters an observer; returns whether it was registered. */
    public boolean removeObserver(SetObserver<E> observer) {
        synchronized(observers) {
            return observers.remove(observer);
        }
    }

    // Notifies every observer while still holding the lock on the observer
    // list. This is the defect being demonstrated: the callback is code this
    // class does not control, and it runs inside the synchronized region.
    private void notifyElementAdded(E element) {
        synchronized(observers) {
            for (SetObserver<E> observer : observers)
                observer.added(this, element); // alien method
        }
    }

    /** Adds the element and notifies observers only if the set actually changed. */
    @Override public boolean add(E element) {
        boolean added = super.add(element);
        if (added)
            notifyElementAdded(element);
        return added;
    }

    /** Adds each element via add(), so observers are notified per element. */
    @Override public boolean addAll(Collection<? extends E> c) {
        boolean result = false;
        for (E element : c)
            result |= add(element);  // Calls notifyElementAdded
        return result;
    }
}

/** Callback interface for objects that want to observe additions to an ObservableSet. */
@FunctionalInterface public interface SetObserver<E> {
    // Invoked when an element is added to the observable set
    void added(ObservableSet<E> set, E element);
}

// Demo: an observer that only prints each added element works fine.
public static void main(String[] args) {
    ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());

    // A harmless observer: it never calls back into the set.
    SetObserver<Integer> printer = (observed, value) -> System.out.println(value);
    set.addObserver(printer);

    int next = 0;
    while (next < 100) {
        set.add(next);
        next++;
    }
}
// Throws ConcurrentModificationException: the observer removes itself from the
// observer list while notifyElementAdded is still iterating over that list.
set.addObserver(new SetObserver<>() {
    public void added(ObservableSet<Integer> s, Integer e) {
        System.out.println(e);
        if (e == 23)
            s.removeObserver(this); // called from inside the notification loop
    }
});

// Uses a background thread needlessly, which results in a deadlock: the
// background thread blocks in removeObserver waiting for the lock on the
// observer list, while the notifying thread holds that lock and waits in get()
// for the background task to finish.
set.addObserver(new SetObserver<>() {
   public void added(ObservableSet<Integer> s, Integer e) {
      System.out.println(e);
      if (e == 23) {
         ExecutorService exec =
               Executors.newSingleThreadExecutor();
         try {
            // Blocks until the removal task completes.
            exec.submit(() -> s.removeObserver(this)).get();
         } catch (ExecutionException | InterruptedException ex) {
            throw new AssertionError(ex);
         } finally {
            exec.shutdown();
         }
      }
   }
});
// Fix #1: alien method moved outside of the synchronized block (an "open call").
private void notifyElementAdded(E element) {
    // Copy the observer list while holding the lock ...
    final List<SetObserver<E>> observersCopy;
    synchronized (observers) {
        observersCopy = new ArrayList<>(observers);
    }
    // ... then notify from the private copy with the lock released.
    for (SetObserver<E> current : observersCopy) {
        current.added(this, element);
    }
}
// Fix #2: thread-safe observable set with CopyOnWriteArrayList.
// The observer list is rarely modified and often traversed, which is the
// use case CopyOnWriteArrayList is suited for.
private final List<SetObserver<E>> observers = new CopyOnWriteArrayList<>();

/** Registers an observer; no explicit locking is needed. */
public void addObserver(SetObserver<E> observer) {
    observers.add(observer);
}

/** Unregisters an observer; returns whether it was registered. */
public boolean removeObserver(SetObserver<E> observer) {
    boolean wasRegistered = observers.remove(observer);
    return wasRegistered;
}

/** Notifies every observer of the newly added element. */
private void notifyElementAdded(E element) {
    observers.forEach(observer -> observer.added(this, element));
}
使用Concurrency时,应该注意什么?
// Create a work queue backed by a single-thread executor
ExecutorService exec = Executors.newSingleThreadExecutor();
// Submit a runnable for execution
exec.execute(runnable);
// Tell the executor to terminate gracefully
exec.shutdown();
// Wait for a particular task to complete:
//   use the get method
// Wait for any or all of a collection of tasks to complete:
//   use the invokeAny or invokeAll methods
// Wait for the executor service to terminate:
//   use the awaitTermination method
// Retrieve the results of tasks one by one as they complete:
//   use an ExecutorCompletionService
// Schedule tasks to run at a particular time or to run periodically:
//   use a ScheduledThreadPoolExecutor
// For a small program, or a lightly loaded server:
//   use Executors.newCachedThreadPool
// For a heavily loaded production server:
//   use Executors.newFixedThreadPool
说说你对Thread和executor framework的理解
说说你对fork-join的理解
考虑到正确使用wait 和 notify的困难性,应该首先考虑使用higher-level concurrency utilities
higher-level concurrency utilities分为哪几类?
concurrent collections小例子
// This method simulates the behavior of String.intern
// Concurrent canonicalizing map atop ConcurrentMap - not optimal:
// putIfAbsent is invoked on every call, even when the string is already present.
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

public static String intern(String s) {
    // putIfAbsent returns the existing mapping, or null if s was just inserted.
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}
// Concurrent canonicalizing map atop ConcurrentMap - faster!
// Tries a plain get first and falls back to putIfAbsent only on a miss.
public static String intern(String s) {
    String cached = map.get(s);
    if (cached != null) {
        return cached;
    }
    // Miss: try to install s; another thread may have installed an equal string first.
    String winner = map.putIfAbsent(s, s);
    return (winner != null) ? winner : s;
}
说说你对Synchronizers(同步器)的理解
说说你对CountDownLatch的理解
举一个使用CountDownLatch的例子
// Simple framework for timing concurrent execution.
// Runs the action on `concurrency` workers submitted to the executor and
// returns the elapsed nanoseconds between releasing the workers and the last
// one finishing. Three latches coordinate the run:
//   ready - counted down once by each worker when it has started
//   start - counted down once by the timer to release all workers together
//   done  - counted down once by each worker when it has finished
// NOTE(review): this appears to require an executor that can run `concurrency`
// tasks at the same time, otherwise ready.await() would never return - confirm.
public static long time(Executor executor, int concurrency,
            Runnable action) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(concurrency);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done  = new CountDownLatch(concurrency);

    for (int i = 0; i < concurrency; i++) {
        executor.execute(() -> {
            ready.countDown(); // Tell timer we're ready
            try {
                start.await(); // Wait till peers are ready
                action.run();
            } catch (InterruptedException e) {
                // Restore the interrupt status so the executor's thread can see it.
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();  // Tell timer we're done
            }
        });
    }

    ready.await();     // Wait for all workers to be ready
    long startNanos = System.nanoTime();
    start.countDown(); // And they're off!
    done.await();      // Wait for all workers to finish
    return System.nanoTime() - startNanos;
}
为了维护遗产代码,在使用wait 和 notify时,应该注意什么?
// The standard idiom for using the wait method.
// wait is always invoked inside a loop that re-tests the condition, so the
// condition is checked both before waiting and again after every wakeup.
synchronized (obj) {
    while (<condition does not hold>)
        obj.wait(); // (Releases lock, and reacquires on wakeup)
    ... // Perform action appropriate to condition
}
thread safety的级别有哪些?
// Private lock object: the lock is a private final field, so code outside this
// class cannot synchronize on it.
private final Object lock = new Object();
public void foo() {
    // Critical section guarded by the private lock rather than by `this`.
    synchronized(lock) {
        ...
    }
}
什么叫做Lazy initialization
在使用Lazy initialization时,应该注意些什么?
lazy initialization有哪些好的技术?
// Double-check idiom for lazy initialization of instance fields.
// The local variable (result) ensures that the volatile field is read only
// once in the common case where it is already initialized; the original note
// reports this as roughly 1.4 times faster than not using the local variable.
private volatile FieldType field;

/**
 * Returns the lazily initialized value of field, computing it on first use.
 *
 * The first check runs without locking. If it sees null, the lock is taken and
 * the field is checked again. The value returned on that path is always read
 * under the lock, so a value installed by another thread between the two
 * checks is returned instead of the stale null seen by the first check.
 *
 * @return the value stored in field after initialization
 */
private FieldType getField() {
    FieldType result = field;
    if (result != null) {  // First check (no locking)
        return result;
    }
    synchronized (this) {
        if (field == null) {  // Second check (with locking)
            field = computeFieldValue();
        }
        // Re-read under the lock: another thread may have initialized the
        // field after the first check but before this thread got the lock.
        return field;
    }
}
// Lazy initialization holder class idiom for static fields.
private static class FieldHolder {
    // Computed when FieldHolder is initialized, which the JVM defers until the
    // class is first used.
    static final FieldType field = computeFieldValue();
}

// The first call accesses FieldHolder.field and thereby triggers initialization
// of FieldHolder; no explicit synchronization appears in this accessor.
private static FieldType getField() { return FieldHolder.field; }
// Single-check idiom - can cause repeated initialization!
// There is no locking: two threads that both read null will each call
// computeFieldValue() and each store a result.
private volatile FieldType field;

private FieldType getField() {
    // Read the volatile field once into a local.
    FieldType result = field;
    if (result == null)
        field = result = computeFieldValue();
    return result;
}
不要依赖thread scheduler
// Awful CountDownLatch implementation - busy-waits incessantly!
public class SlowCountDownLatch {
    // Number of countDown() calls still needed before await() returns.
    private int remaining;

    /**
     * Creates a latch that opens after the given number of countDown() calls.
     *
     * @throws IllegalArgumentException if count is negative
     */
    public SlowCountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException(count + " < 0");
        }
        remaining = count;
    }

    /** Spins, repeatedly taking the lock, until the count has reached zero. */
    public void await() {
        boolean open = false;
        while (!open) {
            synchronized (this) {
                open = (remaining == 0);
            }
        }
    }

    /** Decrements the count by one unless it is already zero. */
    public synchronized void countDown() {
        if (remaining == 0) {
            return;
        }
        remaining--;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读