《实战JAVA——高并发程序设计》part1:

2019-06-23  本文已影响0人  让我再睡会儿啊

重点1:常见错误


ArrayList在并发下的问题:

eg.

package org.chain.current.demo.exceptiondemo;

import java.util.ArrayList;

/**
 * Demonstrates two threads appending to one shared {@link ArrayList} with no
 * synchronization between them.
 *
 * <p>The missing synchronization is deliberate: this class exists to show the
 * problem, so the size printed by {@link #main} should not be expected to be
 * 2000000.
 */
public class ArrayListIssue {

    /** Appends the integers 0 to 999999, in order, to the list it was given. */
    static class ArrayListThread implements Runnable {

        private final ArrayList<Integer> list;

        /**
         * @param target the list every call to {@link #run()} appends to
         */
        ArrayListThread(ArrayList<Integer> target) {
            this.list = target;
        }

        @Override
        public void run() {
            int next = 0;
            while (next < 1000000) {
                list.add(next);
                next++;
            }
        }
    }

    /**
     * Starts two writers on the same list, waits for both, then prints the
     * resulting list size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Integer> shared = new ArrayList<>(10);
        Thread[] writers = {
            new Thread(new ArrayListThread(shared)),
            new Thread(new ArrayListThread(shared))
        };
        for (Thread writer : writers) {
            writer.start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        System.out.println("process end...,the list size is " + shared.size());
    }
}

HashMap在并发下的问题:

eg.

package org.chain.current.demo.exceptiondemo;

import java.util.HashMap;

/**
 * Demonstrates two threads calling {@code put} on one shared {@link HashMap}
 * with no synchronization between them.
 *
 * <p>The missing synchronization is deliberate: one writer inserts the even
 * keys and the other the odd keys below 10000, and {@link #main} prints the
 * map size that results.
 */
public class HashMapIssue {

    /** Inserts every second integer from {@code startNum} up to 9999 as a string key. */
    static class HashMapThread implements Runnable {
        int startNum;
        HashMap<String, String> hashMap;

        /**
         * @param startNum first key to insert; later keys are startNum + 2, + 4, ...
         * @param hashMap  the map that receives the entries
         */
        HashMapThread(int startNum, HashMap<String, String> hashMap) {
            this.hashMap = hashMap;
            this.startNum = startNum;
        }

        @Override
        public void run() {
            int key = startNum;
            while (key < 10000) {
                hashMap.put(Integer.toString(key), "test");
                key += 2;
            }
        }
    }

    /**
     * Starts the even-key and odd-key writers on one map, waits for both,
     * then prints the map size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        HashMap<String, String> shared = new HashMap<>(16);
        Thread[] writers = {
            new Thread(new HashMapThread(0, shared)),
            new Thread(new HashMapThread(1, shared))
        };
        for (Thread writer : writers) {
            writer.start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        System.out.println("process end the map size is " + shared.size());
    }
}

ps:JDK8对HashMap内部做了大规模的调整,不会再出现程序无法结束(死循环)的问题;但HashMap仍然不是线程安全的,并发写入后得到的size仍可能小于预期。


错误的加锁

eg.

package org.chain.current.demo.exceptiondemo;

/**
 * A lock has to be taken on the right object.
 *
 * <p>This demo deliberately locks on the wrong one: the monitor is the boxed
 * {@code Integer} counter itself, and {@code integer++} assigns a new
 * {@code Integer} object to that field on every increment.
 */
public class LockObjectDemo {

    /** Increments a boxed counter 10000 times while synchronizing on the counter itself. */
    static class BadLockOnInteger implements Runnable {

        Integer integer = 0;

        @Override
        public void run() {
            int remaining = 10000;
            while (remaining-- > 0) {
                // Intentionally wrong: the object referenced by `integer` changes
                // after each increment, so the two threads may hold different monitors.
                // The correct variant locks on a stable object instead:
                //     synchronized (this) { integer++; }
                synchronized (integer) {
                    integer++;
                }
            }
        }
    }

    /**
     * Runs the same task instance on two threads, waits for both, then prints
     * the counter.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        BadLockOnInteger task = new BadLockOnInteger();
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println("process is end the integer is " + task.integer);
    }
}

重点2:并行模式


并行流水线

如果我们要计算(B+C)*B/2,必须按照算术优先级的顺序依次计算,单次计算内部无法并行。
可以借鉴生产流水线的思想,把计算过程拆分为:
1.p1=B+C
2.p2=p1*B
3.p3=p2/2
eg.

package org.chain.current.demo.parallelpattern;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Pipeline demo that evaluates (B+C)*B/2 in three stages connected by queues:
 * stage one computes B+C, stage two multiplies by B, stage three halves the
 * product and prints the accumulated log line.
 *
 * <p>The producer appends an end-of-stream marker after its last message; each
 * stage forwards that marker downstream and then stops, so {@link #main}
 * returns once every message has been printed.
 */
public class FlowLineDemo {
    private static final BlockingQueue<Msg> qOne = new LinkedBlockingQueue<>();
    private static final BlockingQueue<Msg> qTwo = new LinkedBlockingQueue<>();
    private static final BlockingQueue<Msg> qThree = new LinkedBlockingQueue<>();

    /** Number of messages the producer feeds into the pipeline. */
    private static final int MESSAGE_COUNT = 10000;

    /** End-of-stream marker; stages recognise it by identity, never by content. */
    private static final Msg END_OF_STREAM = new Msg();

    /** Work item carried through the pipeline; {@code a} and {@code b} are reused per stage. */
    static class Msg {
        float a;
        float b;
        StringBuilder log = new StringBuilder();
    }

    /** Stage one: replaces {@code a} with B+C and keeps the original B in {@code b}. */
    static class ProcessOne implements Runnable {
        @Override
        public void run() {
            try {
                for (Msg temp = qOne.take(); temp != END_OF_STREAM; temp = qOne.take()) {
                    float a = temp.a;
                    float b = temp.b;
                    temp.a = (a + b);
                    temp.b = a;
                    temp.log.append("P1:B=").append(a).append(",C=").append(b).append(",(B+C)=").append(temp.a).append("<<- ->>");
                    qTwo.put(temp);
                }
            } catch (InterruptedException e) {
                // Stop the stage but keep the interrupt visible to whoever owns the thread.
                Thread.currentThread().interrupt();
            } finally {
                // The queues are unbounded, so offer() cannot block; always tell the next stage to stop.
                qTwo.offer(END_OF_STREAM);
            }
        }
    }

    /** Stage two: replaces {@code a} with (B+C)*B. */
    static class ProcessTwo implements Runnable {
        @Override
        public void run() {
            try {
                for (Msg temp = qTwo.take(); temp != END_OF_STREAM; temp = qTwo.take()) {
                    float a = temp.a;
                    float b = temp.b;
                    temp.a = a * b;
                    temp.log.append("P2:(B+C)=").append(a).append(",B=").append(b).append(",(B+C)*B=").append(temp.a).append("<<- ->>");
                    qThree.put(temp);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                qThree.offer(END_OF_STREAM);
            }
        }
    }

    /** Stage three: halves the product and prints the complete log line. */
    static class ProcessThree implements Runnable {
        @Override
        public void run() {
            try {
                for (Msg temp = qThree.take(); temp != END_OF_STREAM; temp = qThree.take()) {
                    // Capture the product before halving so the log shows the input of this stage.
                    float product = temp.a;
                    temp.a = product / 2;
                    temp.log.append("P3:(B+C)*B=").append(product).append(",(B+C)*B/2=").append(temp.a);
                    System.out.println("flow end," + temp.log);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Generates random (B, C) pairs in [0, 10) and feeds them to stage one. */
    static class Producer implements Runnable {
        @Override
        public void run() {
            Random random = new Random();
            try {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    Msg msg = new Msg();
                    msg.a = random.nextInt(10);
                    msg.b = random.nextInt(10);
                    qOne.put(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Signal end of input even when interrupted, so the stages can finish.
                qOne.offer(END_OF_STREAM);
            }
        }
    }

    /**
     * Starts the producer and the three stages, then waits for all of them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new ProcessOne());
        Thread t3 = new Thread(new ProcessTwo());
        Thread t4 = new Thread(new ProcessThree());
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    }
}

并行搜索

比如在一个数组中查询某个数,可以将数组分段利用多线程并行搜索
eg

package org.chain.current.demo.producerandconsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Parallel search demo: the array is cut into contiguous segments and each
 * segment is scanned by its own task; the first index found is published
 * through a shared {@link AtomicInteger} so the other tasks can stop early.
 */
public class parallelSearch {
    private static int[] arr;

    /** Index of the first match published by any task, or -1 while nothing has been found. */
    private static AtomicInteger result = new AtomicInteger(-1);

    /**
     * Scans {@code arr[start, end)} for {@code val}.
     *
     * @param val   value to look for
     * @param start first index to inspect (inclusive)
     * @param end   index to stop at (exclusive)
     * @return the index held in the shared result once any task has found a match
     *         (not necessarily inside this segment), or -1 if this segment was
     *         exhausted without a match being published
     */
    private static Integer search(int val, int start, int end) {
        for (int i = start; i < end; i++) {
            // Another task already succeeded: stop scanning and report its index.
            if (result.get() != -1) {
                return result.get();
            }
            if (val == arr[i]) {
                // Only the first finder wins; a later finder returns the winner's index.
                result.compareAndSet(-1, i);
                return result.get();
            }
        }
        return -1;
    }

    /** Task that searches one segment of the array. */
    static class SearchTask implements Callable<Integer> {
        private int val;
        private int start;
        private int end;

        /**
         * @param val   value to look for
         * @param start first index of the segment (inclusive)
         * @param end   end of the segment (exclusive)
         */
        public SearchTask(int val, int start, int end) {
            this.val = val;
            this.start = start;
            this.end = end;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("ThreadName:" + Thread.currentThread().getName() + ",start:" + start + ",end:" + end);
            return search(val, start, end);
        }
    }

    /**
     * Searches the whole array for {@code val} using a fixed pool of
     * {@code threadNum} threads.
     *
     * @param val       value to look for
     * @param threadNum pool size; also the divisor used to size the segments
     * @return an index at which {@code val} occurs, or -1 if it does not occur
     * @throws ExecutionException   if a search task fails
     * @throws InterruptedException if the caller is interrupted while waiting
     */
    private static int PSearch(int val, int threadNum) throws ExecutionException, InterruptedException {
        // Forget the outcome of any previous search; otherwise a stale index is returned.
        result.set(-1);
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            // With more threads than elements the quotient is 0, which would never advance the loop.
            int step = Math.max(1, arr.length / threadNum);
            for (int i = 0; i < arr.length; i += step) {
                int end = i + step;
                // Fold a short tail into the current segment rather than creating a tiny last one.
                boolean lastSegment = (end + step) > arr.length;
                if (lastSegment) {
                    end = arr.length;
                }
                futures.add(executorService.submit(new SearchTask(val, i, end)));
                if (lastSegment) {
                    break;
                }
            }
        } finally {
            // Already-submitted tasks still run to completion after shutdown().
            executorService.shutdown();
        }
        for (Future<Integer> future : futures) {
            int found = future.get();
            if (found >= 0) {
                return found;
            }
        }
        return -1;
    }

    /**
     * Fills the shared array with {@code i} random values in [0, 10).
     *
     * @param i length of the array to create
     */
    private static void initArray(int i) {
        arr = new int[i];
        Random random = new Random();
        for (int j = 0; j < i; j++) {
            arr[j] = random.nextInt(10);
        }
    }

    /**
     * Builds a random 10-element array and searches it for 5 with 3 threads.
     *
     * @param args unused
     * @throws ExecutionException   if a search task fails
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        initArray(10);
        System.out.println("arr:" + Arrays.toString(arr));
        System.out.println("result:" + PSearch(5, 3));
    }
}

并行排序

回顾了基础的排序算法,冒泡排序,奇偶交换排序,插入排序,希尔排序。
利用并发实现奇偶交换排序和希尔排序。
eg1:奇偶交换排序并发版

package org.chain.current.demo.parallelpattern.sort;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Odd-even transposition sort, parallel version.
 *
 * <p>Each phase compares the disjoint pairs (i, i+1) starting at an even or an
 * odd index; every pair is handled by its own pool task and a
 * {@link CountDownLatch} makes the driver wait for the whole phase before the
 * next one starts.
 */
public class ParallelOddEvenSortDemo {

    private static int[] arr;

    private static ExecutorService pool = Executors.newCachedThreadPool();

    /** 1 if any pair was swapped during the current phase, otherwise 0. */
    private static int exchFlag = 1;

    private static synchronized int getExchFlag() {
        return exchFlag;
    }

    private static synchronized void setExchFlag(int v) {
        exchFlag = v;
    }

    /** Compares {@code arr[i]} with {@code arr[i + 1]} and swaps them if out of order. */
    private static class ExchangeTask implements Runnable {
        int i;
        CountDownLatch cdl;

        /**
         * @param i   index of the left element of the pair
         * @param cdl latch counted down once this pair has been processed
         */
        ExchangeTask(int i, CountDownLatch cdl) {
            this.i = i;
            this.cdl = cdl;
        }

        @Override
        public void run() {
            if (arr[i] > arr[i + 1]) {
                int left = arr[i];
                arr[i] = arr[i + 1];
                arr[i + 1] = left;
                setExchFlag(1);
            }
            cdl.countDown();
        }
    }

    /**
     * Sorts {@code arr} in ascending order by alternating even and odd phases
     * until an even phase and the following odd phase both complete without a swap.
     *
     * @throws InterruptedException if the caller is interrupted while waiting for a phase
     */
    private static void parallelOddEvenSort() throws InterruptedException {
        // A previous run leaves the flag at 0; without this reset a later call would skip sorting.
        setExchFlag(1);
        int start = 0;
        while (getExchFlag() == 1 || start == 1) {
            setExchFlag(0);
            // Number of pairs (i, i+1) with i = start, start+2, ... below length-1.
            // Clamped so an empty array cannot produce a negative latch count.
            int pairs = Math.max(0, (arr.length - start) / 2);
            CountDownLatch cdl = new CountDownLatch(pairs);
            for (int i = start; i < arr.length - 1; i += 2) {
                pool.submit(new ExchangeTask(i, cdl));
            }
            cdl.await();
            start = 1 - start;
        }
    }

    /**
     * Sorts an array obtained from {@code DemoUtil.init(10, 100)} and prints
     * it before and after.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sorting
     */
    public static void main(String[] args) throws InterruptedException {
        try {
            arr = DemoUtil.init(10, 100);
            System.out.println("the origin arr is:" + Arrays.toString(arr));
            parallelOddEvenSort();
            System.out.println("the sorted arr is:" + Arrays.toString(arr));
        } finally {
            // Shut the pool down even when sorting fails, so its threads do not linger.
            pool.shutdown();
        }
    }
}

eg2:希尔排序(下面给出的是串行版本的代码)

package org.chain.current.demo.parallelpattern.sort;

import java.util.Arrays;

/**
 * Serial Shell sort demo.
 */
public class ShellSortDemo {

    /**
     * Sorts {@code arr} in place in ascending order, using the gap sequence
     * h = 3h + 1 (1, 4, 13, ...) from the largest suitable gap down to 1.
     *
     * @param arr the array to sort
     */
    public static void shellSort(int[] arr) {
        // Find the largest gap of the sequence for this array length.
        int gap = 1;
        while (gap <= arr.length / 3) {
            gap = gap * 3 + 1;
        }
        for (; gap > 0; gap = (gap - 1) / 3) {
            // Insertion sort over elements that are `gap` positions apart.
            for (int idx = gap; idx < arr.length; idx++) {
                int current = arr[idx];
                if (current >= arr[idx - gap]) {
                    continue;
                }
                int pos = idx - gap;
                for (; pos >= 0 && arr[pos] > current; pos -= gap) {
                    arr[pos + gap] = arr[pos];
                }
                arr[pos + gap] = current;
            }
        }
    }

    /**
     * Sorts an array obtained from {@code DemoUtil.init(19, 100)} and prints
     * it before and after.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int[] data = DemoUtil.init(19, 100);
        System.out.println("origin array:" + Arrays.toString(data));
        shellSort(data);
        System.out.println("sorted array:" + Arrays.toString(data));
    }
}

点我获取代码

上一篇下一篇

猜你喜欢

热点阅读