java8-流

2018-10-14  本文已影响11人  lesline

2、java8: 流

reactor

流是JavaAPI的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。

示例

示例一:取热量小于400,并按卡路里排序

  1. Java7
    public static List<String> getLowCaloricDishesNamesInJava7(List<Dish> dishes){
        List<Dish> lowCaloricDishes = new ArrayList<>();
        for(Dish d: dishes){
            if(d.getCalories() < 400){
                lowCaloricDishes.add(d);
            }
        }
        List<String> lowCaloricDishesName = new ArrayList<>();
        Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
            public int compare(Dish d1, Dish d2){
                return Integer.compare(d1.getCalories(), d2.getCalories());
            }
        });
        for(Dish d: lowCaloricDishes){
            lowCaloricDishesName.add(d.getName());
        }
        return lowCaloricDishesName;
    }
  1. java8实现
    public static List<String> getLowCaloricDishesNamesInJava8(List<Dish> dishes){
        return dishes.stream()
                .filter(d -> d.getCalories() < 400)
                .sorted(comparing(Dish::getCalories))
                .map(Dish::getName)
                .collect(toList());
    }

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():

流概念

简短的定义就是“从支持数据处理操作的源生成的元素序列”。


流的构成.png

特点:
只遍历一次:一个流只能遍历一次
内部迭代:steams库使用内部迭代把迭代做了,不需要通过用户编写代码进行外部迭代

操作

中间操作和终端操作

中间操作和终端操作.png

操作分类

筛选:filter/distinct
截短流:limit
跳过元素:skip
映射:map、flatmap
查找:anyMatch/noneMatch/findAny/findFirst
归约:reduce/collect

map/flatmap的区别:

        List<Integer> mapResult = Stream.of(1, 2)
                .map(number -> number + 1).collect(toList());

        List<Integer> flatMapResult = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4))
                .flatMap(numbers -> numbers.stream()).collect(toList());

map:
操作流中的每一个元素,(入参:出参=1:1)
flatmap:
仍操作流中的每一个元素,但流中的元素是列表,flatmap的Function入参是列表,出参是是流,将流合并作为执行结果是)(入参:出参=1:n)
一言以蔽之,flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

        Arrays.asList("Hello", "World").stream()
                .map(line -> line.split(""))
                .flatMap(line -> Arrays.stream(line))
                .distinct()
                .forEach(System.out::println);

执行过程如下图:


flatmap执行过程.png

归约求最大值、最小值、和

        //求和
        List<Integer> numbers = Arrays.asList(3, 4, 5, 1, 2);
        int sum = numbers.stream().reduce(0, (a, b) -> a + b);
        int sum1 = numbers.stream().reduce(0,  (a, b) -> Integer.sum(a, b));
        int sum2 = numbers.stream().reduce(0, Integer::sum);
          int sum3 = numbers.stream().mapToInt(Integer::intValue).sum();



        //最大值
        int max = numbers.stream().reduce(0, Integer::max);

        //最小值
        Optional<Integer> min = numbers.stream().reduce(Integer::min);

构建流

由值、数组、集合生成流水
由文件生成流
由函数生成流:创建无限流

用流收集数据

收集器

收集器用于将stream中的元素做汇总,传递给collect方法的参数是Collector接口的一个实现。

collect(Collectors.toList()):按顺序给每个元素生成一个列表;
collect(Collectors.groupingBy(Transaction::getCurrency)):生成一个map,它的键是货币,值是等于该货币的列表。
按币种对交易进行分组:

    //建立累积交易分组的Map
        Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
        //迭代Transaction的List
        for (Transaction transaction : transactions) {
            //提取Transaction的货币
            Currency currency = transaction.getCurrency();
            List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
            //如果分组Map中没有这种货币的条目,就创建一个
            if (transactionsForCurrency == null) {
                transactionsForCurrency = new ArrayList<>();
                transactionsByCurrencies.put(currency, transactionsForCurrency);
            }
            //将当前遍历的Transaction加入同一货币的Transaction的List
            transactionsForCurrency.add(transaction);
        }
        Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));

预定义的收集器
从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。
它们主要提供了三大功能:

归约和汇总

          //总和
        long howManyDishes = menu.stream().collect(counting());

        //平均
        double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
        
          //最大值
        Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
        Optional<Dish> mostCaloricDish = menu.stream().collect(maxBy(dishCaloriesComparator));

        //汇总 最大、最小、平均、总和  IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}
        IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));

连接字符串

        //连接字符串 1,2,3
        String str = Arrays.asList(1, 2, 3).stream().map(String::valueOf).collect(joining(","));
        int sum1 = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
        int sum2 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (i, j) -> i + j));
        int sum3 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));
        int sum4 = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();
        int sum5 = menu.stream().mapToInt(Dish::getCalories).sum();

注意:stream接口的collect和reduce方法通常可以获取相同的结果,尽可能的为手头的问题探索不同的解决方案,但在通用的方案中,始终选择最专门化的一个。

分组

分组示例:

        //在鱼类型分类
        Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
        
        //把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。
        Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
                groupingBy(dish -> {
                    if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                    else return CaloricLevel.FAT;
                }));

多级分组:

  Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishedByTypeAndCaloricLevel = menu.stream().collect(
                groupingBy(Dish::getType,
                        groupingBy((Dish dish) -> {
                            if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                            else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                            else return CaloricLevel.FAT;
                        })
                )
        );

多级分组过程:


多级分组.png

按子组收集数据

        //每类菜有多少个
        Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

        //每类菜的总卡路里数
        Map<Dish.Type, Integer> sumCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                summingInt(Dish::getCalories)));

        //每类菜的最高热量的菜
        Map<Dish.Type, Optional<Dish>> mostCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                maxBy(Comparator.comparingInt(Dish::getCalories))));

        //每类菜的最高热量的菜-去除Optional
        Map<Dish.Type, Dish> mostCaloricDishesByTypeWithoutOptional = menu.stream().collect(
                groupingBy(Dish::getType,
                        collectingAndThen(
                                maxBy(Comparator.comparingInt(Dish::getCalories)),
                                Optional::get)));

分区

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。

        Map<Boolean, List<Dish>> partitionByVegeterian = menu.stream().collect(partitioningBy(Dish::isVegetarian));

结果:{false=[pork, beef, chicken, prawns, salmon], true=[French fries, rice, season fruit, pizza]}

并行流

可以通过对收集源调用parallelStream方法来把集合转换为并行流。
并行流内部使用了默认的ForkJoinPool(分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
// 使用这个属性可以修改默认的线程数
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”20”);
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

public static long parallelSum(List<Long> list) {
    return list.stream().parallel().reduce(Long::sum).get();
}

使用并行流时注意事项:
1、要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高
2、尽量不要有共享变更的修改,如果有,要注意线程安全(加锁的话,会出现中午执行)

上一篇下一篇

猜你喜欢

热点阅读