1. 程式人生 > >(2)Stream流編程——Webflux響應式編程利器

(2)Stream流編程——Webflux響應式編程利器

box rate 結果 信息 -m sorted ffffff letter har

Stream流編程

1. 是什麽,不是什麽

是一個高級的叠代器,不是一個數據結構、不是一個集合、不會存放數據、關註的是怎麽把數據高效處理

2. 創建/中間操作/終止操作

1) 創建
技術分享圖片

代碼演示

        List<String> list = new ArrayList<>();

        // 從集合創建
        list.stream();
        list.parallelStream();

        // 從數組創建
        Arrays.stream(new int[] { 2, 3, 5 });

        // 創建數字流
        IntStream.of(1, 2, 3);
        IntStream.rangeClosed(1, 10);

        // 使用random創建一個無限流
        new Random().ints().limit(10);
        Random random = new Random();

        // 自己產生流
        Stream.generate(() -> random.nextInt()).limit(20);

2) 中間操作
技術分享圖片

        String str = "my name is AlgerFan";

        System.out.println("--------------filter------------");
        // 把每個單詞的長度調用出來
        Stream.of(str.split(" ")).filter(s -> s.length() > 2)
                .map(String::length).forEach(System.out::println);

        System.out.println("--------------flatMap------------");
        // flatMap A->B屬性(是個集合), 最終得到所有的A元素裏面的所有B屬性集合
        // intStream/longStream 並不是Stream的子類, 所以要進行裝箱 boxed
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
                .forEach(i -> System.out.println((char) i.intValue()));

        System.out.println("--------------peek------------");
        // peek 用於debug. 是個中間操作,和 forEach 是終止操作
        Stream.of(str.split(" ")).peek(System.out::println)
                .forEach(System.out::println);

        System.out.println("--------------limit------------");
        // limit 使用, 主要用於無限流
        new Random().ints().filter(i -> i > 100 && i < 1000).limit(5)
                .forEach(System.out::println);

3) 終止操作
技術分享圖片

        String str = "my name is AlgerFan";

        System.out.println("-------並行流parallel--------");
        // 使用並行流
        str.chars().parallel().forEach(i -> System.out.print((char) i));
        System.out.println();
        // 使用 forEachOrdered 保證順序
        str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
        System.out.println();

        System.out.println("-------collect收集到list--------");
        // 收集到list
        List<String> list = Stream.of(str.split(" "))
                .collect(Collectors.toList());
        System.out.println(list);

        System.out.println("-------使用 reduce 拼接字符串--------");
        // 使用 reduce 拼接字符串
        Optional<String> letters = Stream.of(str.split(" "))
                .reduce((s1, s2) -> s1 + "|" + s2);
        System.out.println(letters.orElse(""));

        System.out.println("-------帶初始化值的reduce--------");
        // 帶初始化值的reduce
        String reduce = Stream.of(str.split(" ")).reduce("",
                (s1, s2) -> s1 + "|" + s2);
        System.out.println(reduce);

        System.out.println("-------計算所有單詞總長度--------");
        // 計算所有單詞總長度
        Integer length = Stream.of(str.split(" ")).map(s -> s.length())
                .reduce(0, (s1, s2) -> s1 + s2);
        System.out.println(length);

        System.out.println("-------max 的使用--------");
        // max 的使用
        Optional<String> max = Stream.of(str.split(" "))
                .max((s1, s2) -> s1.length() - s2.length());
        System.out.println(max.get());

        System.out.println("-------使用 findFirst 短路操作--------");
        // 使用 findFirst 短路操作
        OptionalInt findFirst = new Random().ints().findFirst();
        System.out.println(findFirst.getAsInt());

3. 並行流

以上已經接觸了parallel()並行流,能夠多線程的處理數據

4. 收集器

示例代碼:

        // 測試數據
        List<Student> students = Arrays.asList(
                new Student("小明", 10, Gender.MALE, Grade.ONE),
                new Student("大明", 9, Gender.MALE, Grade.THREE),
                new Student("小白", 8, Gender.FEMALE, Grade.TWO),
                new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
                new Student("小紅", 7, Gender.FEMALE, Grade.THREE),
                new Student("小黃", 13, Gender.MALE, Grade.ONE),
                new Student("小青", 13, Gender.FEMALE, Grade.THREE),
                new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
                new Student("小王", 6, Gender.MALE, Grade.ONE),
                new Student("小李", 6, Gender.MALE, Grade.ONE),
                new Student("小馬", 14, Gender.FEMALE, Grade.FOUR),
                new Student("小劉", 13, Gender.MALE, Grade.FOUR));

        // 得到所有學生的年齡列表
        // s -> s.getAge() --> Student::getAge , 不會多生成一個類似 lambda$0這樣的函數
        Set<Integer> ages = students.stream().map(Student::getAge)
                .collect(Collectors.toCollection(TreeSet::new));
        System.out.println("所有學生的年齡:" + ages);

        // 統計匯總信息
        IntSummaryStatistics agesSummaryStatistics = students.stream()
                .collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("年齡匯總信息:" + agesSummaryStatistics);

        // 分塊
        Map<Boolean, List<Student>> genders = students.stream().collect(
                Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
        System.out.println("男女學生列表:" + genders);

        // 分組
        Map<Grade, List<Student>> grades = students.stream()
                .collect(Collectors.groupingBy(Student::getGrade));
        System.out.println("學生班級列表:" + grades);

        // 得到所有班級學生的個數
        Map<Grade, Long> gradesCount = students.stream().collect(Collectors
                .groupingBy(Student::getGrade, Collectors.counting()));
        System.out.println("班級學生個數列表:" + gradesCount);

測試結果

所有學生的年齡:[6, 7, 8, 9, 10, 13, 14]
年齡匯總信息:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女學生列表:{false=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE], [name=小紫, age=9, gender=FEMALE, grade=TWO], [name=小馬, age=14, gender=FEMALE, grade=FOUR]], true=[[name=小明, age=10, gender=MALE, grade=ONE], [name=大明, age=9, gender=MALE, grade=THREE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE], [name=小劉, age=13, gender=MALE, grade=FOUR]]}
學生班級列表:{FOUR=[[name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小馬, age=14, gender=FEMALE, grade=FOUR], [name=小劉, age=13, gender=MALE, grade=FOUR]], ONE=[[name=小明, age=10, gender=MALE, grade=ONE], [name=小黃, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE]], THREE=[[name=大明, age=9, gender=MALE, grade=THREE], [name=小紅, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE]], TWO=[[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小紫, age=9, gender=FEMALE, grade=TWO]]}
班級學生個數列表:{FOUR=3, ONE=4, THREE=3, TWO=2}

5. 運行機制

演示一個測試代碼
        Random random = new Random();
        // 隨機產生數據
        Stream<Integer> stream = Stream.generate(random::nextInt)
                // 產生300個 ( 無限流需要短路操作. )
                .limit(300)
                // 第1個無狀態操作,print(s)執行耗時操作5s
                .peek(s -> print("peek: " + s))
                // 第2個無狀態操作
                .filter(s -> {
                    print("filter: " + s);
                    return s > 1000000;
                })
                // 有狀態操作
                /*.sorted((i1, i2) -> {
                    print("排序: " + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })*/
                // 又一個無狀態操作
                .peek(s -> {
                    print("peek2: " + s);
                });

        // 終止操作
        stream.count();

分析以上代碼,發現Stream創建了一個256長度的數組

  1. 所有操作是鏈式調用, 一個元素只叠代一次
  2. 每一個中間操作返回一個新的流. 流裏面有一個屬性sourceStage
    指向同一個 地方,就是Head
  3. Head->nextStage->nextStage->... -> null
  4. 有狀態操作會把無狀態操作階段,單獨處理
  5. 並行環境下, 有狀態的中間操作不一定能並行操作.
  6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream)
    但是他們不創建流, 他們只修改 Head的並行標誌

(2)Stream流編程——Webflux響應式編程利器