ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Java] 스트림과 병렬 처리 - 수집, 병렬 처리
    CSE/Java 2015. 9. 10. 18:07
    스트림과 병렬 처리는 여러 절로 구성되어 있습니다.








    11. 수집(collect())

     스트림은 요소들을 필터링 또는 매핑한 후 요소들을 수집하는 최종 처리 메소드인 collect()를 제공하고 있습니다.


     이 메소드를 이용하면 필요한 요소만 켈렉션으로 담을 수 있고, 요소들을 그룹핑 한 후 집계(리덕션) 할 수 있습니다.




     11. 1 필터링한 요소 수집

      Stream의 collect(Collector<T, A, R> collector) 메소드는 필터링 또는 매핑된 요소들을 새로운 컬렉션에 수집하고, 이 컬렉션을 리턴합니다.




     리턴 타입

     메소드(파라미터) 

     인터페이스 

     R 

     collect(Collector<T, A, R> collector) 

     Stream 



      파라미터인 Collector(수집기)는 어떤 요소를 어떤 컬렉션에 수집할 것인지를 결정합니다.


      Collector의 타입 파라미터 T는 요소이고, A는 누적기(Accumulator)입니다.


      그리고 R은 요소가 저장될 컬렉션입니다.


      풀어서 해석하면 T 요소를 A 누적기가 R에 저장한다는 의미입니다. Collector의 구현 객체는 다음과 같이 Collectors 클래스의 다양한 정적 메소드를 이용해서 얻을 수 있습니다.




     리턴 타입

     Collectors의 정적 메소드 

     설명 

     Collector<T, ?, List<T>> 

     toList() 

     T를 List에 저장 

     Collector<T, ?, Set<T>> 

     toSet() 

     T를 Set에 저장  

     Collector<T, ?, Collection<T>> 

     toCollection(

        Supplier(Collection<T>>

     )

     T를 Supplier가 제공한 Collection에 저장 

     Collector<T, ?, Map<K, U>> 

     toMap(

        Function<T, K> keyMapper,

        Function<T, U> valueMapper

     ) 

     T를 K와 U로 매핑해서 K를 키로, U를 값으로 Map에 저장 

     Collector<T, ?, ConcurrentMap<K, U>> 

     toConcurrentMap(

        Function<T, K> keyMapper,

        Function<T, U> valueMapper

     ) 

     T를 K와 U로 매핑해서 K를 키로, U를 값으로 ConcurrentMap에 저장 




      리턴값인 Collector를 보면 A가 ?로 되어 있는데, 이것은 Collector가 R(컬렉션)에 T(요소)를 저장하는 방법을 알고 있어 A(누적기)가 필요 없기 때문입니다.


      Map과 ConcurrentMap의 차이점은 Map은 스레드에 안전하지 않고, ConcurrentMap은 스레드에 안전합니다. 멀티 스레드 환경에서 사용하려면 ConcurrentMap을 얻는 것이 좋습니다. 다음 코드는 전체 학생 중에서 남학생만 필터링해서 별도의 List를 생성합니다.





    1
    2
    3
    4
    5
    6
    Stream<Student> totalStream = totalList.stream();
    Stream<Student> maleStream = totalStream.filter( s -> s.getSex() == Student.Sex.MALE);
    Collector<Student, ?, List<Student>> collector = Collectors.toList();
     
    List<student> maleList = maleStream.collect(collector);
     
    cs




      전체 학생 List에서 Stream을 얻습니다. 남학생만 필터링해서 Stream을 얻습니다. List에 Student를 수집하는 Collector를 얻습니다. Stream에서 collect() 메소드로 Student를 수집해서 새로운 List를 얻습니다. 상기 코드에서 변수를 생략하면 다음과 같이 간단하게 작성할 수 있습니다.



    1
    2
    3
    4
    List<Student> maleList = totalList.stream()
        .filter(s -> s.getSex() == Student.Sex.MALE)
        .collect(Collectors.toList());
     
    cs



      * ToListExam.java



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.HashSet;
     
    public class ToListExam {
     
        public static void main(String[] args) {
            List<Student> studentList = Arrays.asList(
                    new Student("Jolie"19, Student.Sex.FEMALE),
                    new Student("Anne"21, Student.Sex.FEMALE),
                    new Student("Martin"17, Student.Sex.MALE),
                    new Student("Pierre"15, Student.Sex.MALE),
                    new Student("Garcons"19, Student.Sex.MALE)
            );
            
            // 남학생들만 묶어서 List 생성
            List<Student> maleList = studentList.stream()
                    .filter( s -> s.getSex() == Student.Sex.MALE)
                    .collect(Collectors.toList());
            
            maleList.stream()
                .forEach(s -> System.out.println(s.getName()));
            
            System.out.println();
            
            // 여학생들만 묶어서 HashSet 생성
            Set<Student> femaleSet = studentList.stream()
                    .filter( s -> s.getSex() == Student.Sex.FEMALE)
                    .collect(Collectors.toCollection(HashSet :: new));
            
            femaleSet.stream()
                .forEach(s -> System.out.println(s.getName()));
        }
     
    }
     
    cs






      * Student.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
     
    package stream;
     
    public class Student implements Comparable<Student> {
        public enum Sex {
            MALE, FEMALE
        }
     
        public enum City {
            Seoul, Pusan
        }
     
        private String name;
        private int score;
        private Sex sex;
        private City city;
     
        public Student(String name, int score, Sex sex) {
            this.name = name;
            this.score = score;
            this.sex = sex;
        }
     
        public Student(String name, int score, Sex sex, City city) {
            this.name = name;
            this.score = score;
            this.sex = sex;
            this.city = city;
        }
     
        public Sex getSex() {
            return sex;
        }
     
        public void setSex(Sex sex) {
            this.sex = sex;
        }
     
        public City getCity() {
            return city;
        }
     
        public void setCity(City city) {
            this.city = city;
        }
     
        public String getName() {
            return name;
        }
     
        public void setName(String name) {
            this.name = name;
        }
     
        public int getScore() {
            return score;
        }
     
        public void setScore(int score) {
            this.score = score;
        }
     
        @Override
        public int compareTo(Student o) {
            return Integer.compare(score, o.score);
        }
    }
     
    cs









     












     11.2 사용자 정의 컨테이너에 수집하기

      이번에는 List, Set, Map과 같은 컬렉션이 아니라 사용자 정의 컨테이너 객체에 수집하는 방법에 대해 알아보도록 합시다.


      스트림은 요소들을 필터링, 또는 매핑해서 사용자 정의 컨테이너 객체에 수집할 수 있도록 다음과 같이 collect() 메소드를 추가적으로 제공합니다.




     인터페이스

     리턴 타입 

     메소드(파라미터)

     Stream 

     R 

     collect(Supplier<R>, BiConsumer<R, ? super T>, Biconsumer<R, R>) 

     IntStream 

     R 

     collect(Supplier<R>, ObjIntConsumer<R>, Biconsumer<R, R>) 

     LongStream 

     R 

     collect(Supplier<R>, ObjLongConsumer<R>, Biconsumer<R, R>) 

     DoubleStream 

     R 

     collect(Supplier<R>, ObjDoubleConsumer<R>, Biconsumer<R, R>) 



      - 첫 번째 Supplier는 요소들이 수집될 컨테이너 객체(R)을 생성하는 역할을 합니다. 순차 처리(싱글 스레드) 스트림에서는 단 한번 Supplier가 실행되고 하나의 컨테이너 객체를 생성합니다. 병렬 처리(멀티 스레드) 스트림에서는 여러 번 Supplier가 실행되고 스레드별로 여러개의 컨테이너 객체를 생성합니다. 하지만 최종적으로 하나의 컨테이너 객체로 결합합니다.


      - 두 번째 XXXConsumer는 컨테이너 객체(R)에 요소(T)를 수집하는 역할을 합니다. 스트림에서 요소를 컨테이너에 수집할 때마다 XXXConsumer가 실행됩니다.


      - 세 번째 XXXConsumer는 컨테이너 객체(R)를 결합하는 역할을 합니다. 순차 처리 스트림에서는 호출되지 않고, 병렬 처리 스트림에서만 호출되어 스레드별로 생성된 컨테이너 객체를 결합해서 최종 컨테이너 객체를 완성합니다.



     

      리턴 타입 R은 요소들이 최종 수집된 컨테이너 객체입니다. 순차 처리 스트림에서는 리턴 객체가 첫 번째 Supplier가 생성한 객체지만, 병렬 처리 스트림에서는 최종 결합된 컨테이너 객체가 됩니다. 병렬 처리는 다음 절에서 살펴보도록 하고 여기서는 순차 처리를 이용해서 사용자 정의 객체에 요소를 수집하는 것을 살펴보기로 합시다. 학생들 중에서 남학생만 수집하는 MaleStudent 컨테이너가 다음과 같이 정의되어 있다고 가정합시다.



      * MaleStudent.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
     
    package stream;
     
    import java.util.ArrayList;
    import java.util.List;
     
    public class MaleStudent {
        private List<Student> list;
     
        public MaleStudent() {
            list = new ArrayList<Student>();
            System.out.println("[" + Thread.currentThread().getName() + "] MaleStudent()");
        }
     
        public void accumulate(Student student) {
            list.add(student);
            System.out.println("[" + Thread.currentThread().getName() + "] accumulate()");
        }
     
        public void combine(MaleStudent other) {
            list.addAll(other.getList());
            System.out.println("[" + Thread.currentThread().getName() + "] combine()");
        }
     
        public List<Student> getList() {
            return list;
        }
     
    }
     
    cs




      이제 스트림에서 읽은 남학생을 MaleStudent에 수집하는 코드를 보도록 합시다.


    1
    2
    3
    4
    5
    6
    7
    8
    9
    Stream<Student> studentStream = studentList.stream();
    Stream<Student> maleStream = studentStream.filter(s -> s.getSex() == Student.Sex.MALE);
     
    Supplier<MaleStudent> supplier = () -> new MaleStudent();
    BiConsumer<MaleStudent, Student> accumulator = (ms, s) -> ms.accumulate(s);
    BiConsumer<MaleStudent, MaleStudent> combiner = (ms1, ms2) -> ms1.combine(ms2);
     
    MaleStudent maleStudent = maleStream.collect(supplier, accumulator, combiner);
     
    cs




      Line 1: 전체 학생 List에서 Stream을 얻습니다.

      Line 2: 남학생만 필터링해서 Stream을 얻습니다.

      Line 4: MaleStudent를 공급하는 Supplier를 얻습니다.

      Line 5: MaleStudent와 Student를 파라미터로 받아서 MaleStudent의 accumulate() 메소드로 Student를 수집하는 BiConsumer를 얻습니다. 

      Line 6: 두 개의 MaleStudent를 파라미터로 받아 combine() 메소드로 결합하는 BiConsumer를 얻습니다.

      Line 8: supplier가 제공하는 MaleStudent에 accumulater가 Student를 수집해서 최종 처리된 MaleStudent를 얻습니다.


      싱글 스레드에서는 combiner는 사용되지 않습니다. 상기 코드에서 변수를 생략하면 다음과 같이 간단하게 작성할 수 있습니다.



    1
    2
    3
    4
    5
    6
    7
    8
    MaleStudent maleStudent = studentList.stream()
        .filter(s -> s.getSex() == Student.Sex.MALE)
        .collect(
            () -> new MaleStudent(),
            (r, t) -> r.accumulate(t),
            (r1, r2) -> r1.combine(r2)
        );
     
    cs




      람다식을 메소드 참조로 변경하면 다음과 같이 더욱 간단하게 작성 할 수 있습니다.



    1
    2
    3
    4
    MaleStudent maleStudent = studentList.stream()
        .filter(s -> s.getSex() == Student.Sex.MALE)
        .collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine);
     
    cs





      다음 예제는 순차 스트림을 이용해서 사용자 정의 컨테이너인 MaleStudent에 남학생만 수집합니다.



     * MaleStudentExam.java





    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
     
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
     
    public class MaleStudentExam {
     
        public static void main(String[] args) {
            List<Student> studentList = Arrays.asList(
                new Student("Man1"11, Student.Sex.MALE),    
                new Student("Tmp1"11, Student.Sex.FEMALE),    
                new Student("Man3"11, Student.Sex.MALE),    
                new Student("Tmp2"11, Student.Sex.FEMALE),    
                new Student("Man5"11, Student.Sex.MALE)     
            );
            
            MaleStudent maleStudent = studentList.stream()
                    .filter(s -> s.getSex() == Student.Sex.MALE)
                    .collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine);
            
            maleStudent.getList().stream()
                .forEach(s -> System.out.println(s.getName()));
        }
     
    }
     
    cs










       실행 결과를 보면 순차 처리를 담당하는 스레드는 main 스레드임을 알 수 있습니다. MaleStudent() 생성자가 딱 한 번 호출되었기 때문에 한 개의 MaleStudent 객체가 생성되었고, accumulate() 가 세 번 호출 되었기 때문에 요소들이 3번 수집되었습니다. 그래서 collect()가 리턴한 최종 MaleStudent에는 남학생 세 명이 저장되어 있는 것을 볼 수 있습니다.





     11.3 요소를 그룹핑해서 수집

      collect() 메소드는 단순히 요소를 수집하는 기능 이외에 컬렉션의 요소들을 그룹핑해서 Map 객체를 생성하는 기능도 제공합니다.


      collect()를 호출할 때 Collectors의 groupingBy() 또는 groupingByConcurrent()가 리턴하는 Collector를 파라미터로 대입하면 됩니다.


      groupingBy()는 스레드에 안전하지 않은 Map을 생성하지만, groupingByConcurrent()는 스레드에 안전한 ConcurrentMap을 생성합니다.


      다음 코드는 학생들을 성별로 그룹핑하고 나서, 같은 그룹에 속하는 학생 List를 생성한 후, 성별을 키로, 학생 List를 값으로 갖는 Map을 생성합니다. 


      collect()의 파라미터로 groupingBy(Function<T, K> classifier)를 사용하였습니다.









    1
    2
    3
    4
    5
    6
    7
    Stream<Student> studentStream = studentList.stream();
     
    Function<Student, Student.Sex> classifier = Student :: getSex;
    Collector<Student, ?, Map<Student.Sex, List<Student>>> collector = 
        Collectors.groupingBy(classifier);
     
    Map<Student.Sex, List<Student>> mapBySex = studentStream.collect(collector);
    cs

      



      Line 1: 전체 학생 List에서 Stream을 얻습니다.

      Line 3: Student를 Student.Sex로 매핑하는 Function을 얻습니다.

      Line 4-5: Student.Sex가 키가 되고, 그룹핑된 List<Student>가 값인 Map을 생성하는 Collector를 얻습니다.

      Line 7: Stream의 collect() 메소드로 Student를 Student.Sex 별로 그룹핑해서 Map을 얻습니다.



      상기 코드에서 변수를 생략하면 다음과 같이 간단하게 작성할 수 있습니다.



    1
    2
    3
    Map<Student.Sex, List<Student>> mapBySex = studentList.stream()
        .collect(Collectors.groupingBy(Student :: getSex));
     
    cs



     

      * GroupingByExam.java



    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
     
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
     
    public class GroupingByExam {
     
        public static void main(String[] args) {
            List<Student> list = Arrays.asList(new Student("Kush"40, Student.Sex.MALE, Student.City.Seoul),
                    new Student("Pierre"22, Student.Sex.MALE, Student.City.Seoul),
                    new Student("Jolie"18, Student.Sex.FEMALE, Student.City.Seoul),
                    new Student("Jane"18, Student.Sex.FEMALE, Student.City.Pusan),
                    new Student("Sozi"22, Student.Sex.FEMALE, Student.City.Pusan));
     
            Map<Student.Sex, List<Student>> mapBySex = 
                    list.stream().collect(Collectors.groupingBy(Student::getSex));
            
            System.out.println("[남학생] ");
            mapBySex.get(Student.Sex.MALE).stream()
                .forEach(s -> System.out.println("\t" + s.getName()));
            
            System.out.println("[여학생] ");
            mapBySex.get(Student.Sex.FEMALE).stream()
                .forEach(s -> System.out.println("\t" + s.getName()));
            
            System.out.println();
            
            Map<Student.City, List<String>> mapByCity = list.stream()
                    .collect(
                        Collectors.groupingBy(
                            Student :: getCity,
                            Collectors.mapping(Student :: getName, Collectors.toList())
                        )
                    );
            
            System.out.print("[서울] ");
            mapByCity.get(Student.City.Seoul).stream()
                .forEach(s -> System.out.println("\t" + s));
            
            System.out.print("[부산] ");
            mapByCity.get(Student.City.Pusan).stream()
                .forEach(s -> System.out.println("\t" + s));
            
            
        }
     
    }
     
    cs








     












     11.4 그룹핑 후 매핑 및 집계

      Collectors.groupingBy() 메소드는 그룹핑 후, 매핑이나 집계(평균, 카운팅, 연결, 최대, 최소, 합계)를 할 수 있도록 두 번째 파라미터로 Collector를 가질 수 있습니다.


      이전 예제에서 그룹핑된 학생 객체를 학생 이름으로 매핑하기 위해 mapping() 메소드로 Collector를 얻었습니다.


      Collectors는 mapping() 메소드 이외에도 집계를 위해 다양한 Collector 를 리턴하는 다음과 같은 메소드를 제공하고 있습니다.







     * GroupingAndReductionExam.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
     
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
     
    public class GroupingAndReductionExam {
     
        public static void main(String[] args) {
            List<Student> list = Arrays.asList(new Student("Kush"40, Student.Sex.MALE, Student.City.Seoul),
                    new Student("Pierre"22, Student.Sex.MALE, Student.City.Seoul),
                    new Student("Jolie"18, Student.Sex.FEMALE, Student.City.Seoul),
                    new Student("Jane"18, Student.Sex.FEMALE, Student.City.Pusan),
                    new Student("Sozi"22, Student.Sex.FEMALE, Student.City.Pusan));
            
            // 성별로 평균 점수를 저장하는 Map 얻기
            Map<Student.Sex, Double> mapBySex = list.stream()
                    .collect(
                        Collectors.groupingBy(
                            Student :: getSex,
                            Collectors.averagingDouble(Student :: getScore)
                    )
                );
            
            System.out.println("남학생 평균 점수: " + mapBySex.get(Student.Sex.MALE));
            System.out.println("여학생 평균 점수: " + mapBySex.get(Student.Sex.FEMALE));
            
            
            // 성별을 쉼표로 구분한 이름을 저장하는 Map 얻기
            Map<Student.Sex, String> mapByName = list.stream()
                    .collect(
                        Collectors.groupingBy(
                            Student :: getSex,
                            Collectors.mapping(Student :: getName, Collectors.joining(","))
                        )
                    );
            
            System.out.println("남학생 전체 이름: " + mapByName.get(Student.Sex.MALE));
            System.out.println("여학생 전체 이름: " + mapByName.get(Student.Sex.FEMALE));
        }
     
    }
     
    cs












    12. 병렬 처리

     병렬 처리(Parallel Operation)란 멀티 코어 CPU 환경에서 하나의 작업을 분할해서 각각의 코어가 병렬적으로 처리하는 것을 말하는 데, 병렬처리의 목적은 작업 처리 시간을 줄이기 위한 것입니다.


     자바 8부터 요소를 병렬 처리할 수 있도록 하기 위해 병렬 스트림을 제공하기 때문에 컬렉션(배열)의 전체 요소 처리 시간을 줄여 줍니다.





     12.1 동시성(Concurrency)과 병렬성(Parallelism)

      멀티 스레드는 동시성 또는 병렬성으로 실행되기 때문에 이 용어들에 대해 정확히 이해하는 것이 좋습니다.


      이 둘은 멀티 스레드의 동작 방식이라는 점에서 동일하지만 서로 다른 목적을 가지고 있습니다.


      동시성은 멀티 작업을 위해 멀티 스레드가 번갈아가며 실행하는 성질을 말하고, 병렬성은 멀티 작업을 위해 멀티 코어를 이용해서 동시에 실행하는 성질을 말합니다.


      싱글 코어 CPU를 이용한 멀티 작업은 병렬적으로 실행되는 것처럼 보이지만, 사실은 번갈아가며 실행하는 동시성 작업입니다.



     




      병렬성은 데이터 병렬성과 작업 병렬성으로 구분할 수 있습니다.




       데이터 병렬성(Data Parallelism)

     데이터 병렬성은 전체 데이터를 쪼개어 서브 데이터들로 만들고 이 서브 데이터들을 병렬 처리해서 작업을 빨리 끝내는 것을 말합니다. 자바 8에서 지원하는 병렬 스트림은 데이터 병렬성을 구현한 것입니다. 멀티 코어의 수만큼 대용량 요소를 서브 요소들로 나누고, 각각의 서브 요소들을 분리된 스레드에서 병렬 처리 시킵니다. 예를 들어 쿼드 코어(Quad Core) CPU일 경우 4개의 서브 요소들로 나누고, 4개의 스레드가 각각의 서브 요소들을 병렬 처리합니다.




       작업 병렬성(Task Parallelism)

     작업 병렬성은 서로 다른 작업을 병렬 처리하는 것을 말합니다. 작업 병렬성의 대표적인 예는 웹 서버입니다. 웹 서버는 각각의 브라우저에서 요청한 내용을 개별 스레드에서 병렬로 처리합니다.







     12.2 포크조인(ForkJoin) 프레임워크

      병렬 스트림은 요소들을 병렬 처리하기 위해 포크조인(ForkJoin) 프레임워크를 사용합니다.


      병렬 스트림을 이용하면 런타임 시에 포크조인 프레임워크가 동작하는데, 포크 단계에서는 전체 데이터를 서브 데이터로 분리합니다. 그리고 나서 서브 데이터를 멀티 코어에서 병렬로 처리합니다.


      조인 단계에서는 서브 결과를 결합해서 최종 결과를 만들어 냅니다. 예를 들어 쿼드 코어 CPU에서 병렬 스트림으로 작업을 처리한 경우, 스트림의 요소를 N개라고 보았을 때 포크 단계에서는 전체 요소를 4등분 합니다. 그리고 1등분씩 개별 코어에서 처리하고 조인 단계에서는 3번의 결합과정을 거쳐 최종 결과를 산출합니다.








      병렬 처리 스트림은 실제로 포크 단계에서 그림처럼 차례대로 요소를 4등분 하지 않습니다. 이해하기 쉽도록 하기 위해 위 그림은 차례대로 4등분 했지만, 내부적으로 서브 요소를 나누는 알고리즘이 있습니다. 포크조인 프레임워크는 포크와 조인 기능 이외에 스레드풀인 ForkJoinPool을 제공합니다. 각각의 코어에서 서브 요소를 처리하는 것은 개별 스레드가 해야 하므로 스레드 관리가 필요합니다. 포크조인 프레임워크는 ExecutorService의 구현 객체인 ForkJoinPool을 사용해서 작업 스레드를 관리합니다. 


















     12.3 병렬 스트림 생성

       병렬 처리를 위해 코드에서 포크조인 프레임워크를 직접 사용할 수는 있지만, 병렬 스트림을 이용할 경우에는 백그라운드에서 포크조인 프레임워크가 사용되기 때문에 개발자는 매우 쉽게 병렬 처리를 할 수 있습니다. 병렬 스트림은 다음 두 가지 메소드로 얻을 수 있습니다.






      parallelStream() 메소드는 컬렉션으로부터 병렬 스트림을 바로 리턴합니다. 


      parallel() 메소드는 순처 처리 스트림을 병렬 처리 스트림으로 변환해서 리턴합니다. 


      어떤 방법으로 병렬 스트림을 얻더라도 이후 요소 처리 과정은 병렬 처리됩니다. 내부적으로 전체 요소를 서브 요소들로 나누고, 이 서브 요소들을 개별 스레드가 처리합니다. 서브 처리 결과가 나오면 결합해서 마지막 최종 처리 결과를 리턴합니다.


      * MaleStudentExam.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
     
    public class MaleStudentExam {
     
        public static void main(String[] args) {
            List<Student> studentList = Arrays.asList(
                new Student("Man1"11, Student.Sex.MALE),    
                new Student("Tmp1"11, Student.Sex.FEMALE),    
                new Student("Man3"11, Student.Sex.MALE),    
                new Student("Tmp2"11, Student.Sex.FEMALE),    
                new Student("Man5"11, Student.Sex.MALE)     
            );
            
            MaleStudent maleStudent = studentList.parallelStream()
                    .filter(s -> s.getSex() == Student.Sex.MALE)
                    .collect(MaleStudent :: new, MaleStudent :: accumulate, MaleStudent :: combine);
            
            maleStudent.getList().stream()
                .forEach(s -> System.out.println(s.getName()));
        }
     
    }
     
    cs









      실행 결과를 보면 main 스레드와 ForkJoinPool에서 3개의 스레드가 사용되어 총 4개의 스레드가 동작합니다.


      각각의 스레드가 하나의 서브 작업이라고 본다면 총 4개의 서브 작업으로 분리되었다고 생각하면 됩니다.


      각 서브 작업은 남학생을 누적시킬 MaleStudent 객체를 별도로 생성하기 때문에 MaleStudent() 생성자가 5번 실행되었습니다.


      하지만 전체 학생 중에서 남학생이 3명 밖에 없으므로 accumulate()는 3번밖에 호출되지 않았습니다. 누적이 완료된 5개의 객체는 MaleStudent 객체는 4번의 결합으로 최종 MaleStudent가 만들어지므로 combine() 메소드가 4번 호출되었습니다.









     12.4 병렬 처리 기능

      스트림 병렬 처리가 스트림 순차 처리보다 항상 실행 성능이 좋다고 판단해서는 안 됩니다. 병렬 처리에 영향을 미치는 다음 3가지 요인을 잘 살펴보아야 합니다.



       요소의 수와 요소당 처리 시간

     컬렉션에 요소의 수가 적고 요소당 처리 시간이 짧으면 순차 처리가 오히려 병렬 처리보다 빠를 수 있습니다. 병렬 처리는 스레드풀 생성, 스레드 생성이라는 추가적인 비용이 발생하기 때문입니다.




       스트림 소스의 종류

     ArrayList, 배열은 인덱스로 요소를 관리하기 때문에 포크 단계에서 요소를 쉽게 분리할 수 있어 병렬 처리 시간이 절약됩니다. 반면에 HashSet, TreeSet은 요소 분리가 쉽지 않고, LinkedList 역시 링크를 따라가야 하므로 요소 분리가 쉽지 않습니다. 따라서 이 소스들은 ArrayList, 배열보다는 상대적으로 병렬 처리가 늦습니다.





       코어(Core)의 수

     싱글 코어 CPU일 경우에는 순차 처리가 빠릅니다. 병렬 스트림을 사용할 경우 스레드의 수만 증가하고 동시성 작업으로 처리되기 때문에 좋지 못한 결과를 보여줍니다. 코어의 수가 많으면 많을수록 병렬 작업 처리 속도는 빨라집니다.



      다음 예제는 work() 메소드의 실행 시간(요소당 처리 시간)을 조정함으로써 순차 처리와 병렬 처리 중 어떤 것이 전체 요소를 빨리 처리하는지 테스트합니다.




     * SequenceVsParallelExam.java


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
     
    package stream;
     
    import java.util.Arrays;
    import java.util.List;
     
    public class SequenceVsParallelExam {
     
        public static void work(int value) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static long testSequencial(List<Integer> list) {
            long start = System.nanoTime();
            list.stream().forEach((a) -> work(a));
            long end = System.nanoTime();
            long runTime = end - start;
            
            return runTime;
        }
        
        public static long testParallel(List<Integer> list) {
            long start = System.nanoTime();
            list.stream().parallel().forEach((a) -> work(a));
            long end = System.nanoTime();
            long runTime = end - start;
            
            return runTime;
        }
        
        public static void main(String[] args) {
            List<Integer> list = Arrays.asList(0123456789101112131415);
            
            long timeSequencial = testSequencial(list);
            
            long timeParallel = testParallel(list);
            
            if (timeSequencial < timeParallel) {
                System.out.println("순차처리가 더 빠름");
            } else {
                System.out.println("병렬처리가 더 빠름");
            }
        }
     
    }
     
    cs






     실행 결과 병렬 처리가 더 빠른것으로 판명났습니다. 다음 예제는 스트림 소스가 ArrayList인 경우와 LinkedList일 경우 대용량 데이터의 병렬 처리 성능을 테스트한 것입니다.




      * ArrayListVsLinkedListExam.java

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
     
    package stream;
     
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
     
    public class ArrayListVsLinkedListExam {
     
        public static void work(int value) {
        }
     
        public static long testParallel(List<Integer> list) {
            long start = System.nanoTime();
            list.stream().parallel().forEach((a) -> work(a));
            long end = System.nanoTime();
            long time = end - start;
     
            return time;
        }
     
        public static void main(String[] args) {
            List<Integer> arrayList = new ArrayList<Integer>();
            List<Integer> linkedList = new LinkedList<Integer>();
     
            for (int i = 0; i < 1000000; i++) {
                arrayList.add(i);
                linkedList.add(i);
            }
     
            long arrListListParallel = testParallel(arrayList);
            long linkListListParallel = testParallel(linkedList);
     
            arrListListParallel = testParallel(arrayList);
            linkListListParallel = testParallel(linkedList);
     
            if (arrListListParallel < linkListListParallel) {
                System.out.println("ArrayList처리가 더 빠름");
            } else {
                System.out.println("LinkedList처리가 더 빠름");
            }
     
        }
     
    }
     
    cs




     






     * 이 포스트은 서적 '이것이 자바다' 를 참고하여 작성한 포스트입니다.

    댓글

Designed by Tistory.