Spark Architecture: Shuffle
Shuffle 이란 무엇일까? 하루에 발생한 전화량을 계산한다고 가정해보자. 이 경우 “day”를 key로 설정할것이고, 각각의 value는 1로 설정할것이다. 그리고 각각의 key에 대해서 value들을 합할것이다. 하지만 데이터를 클러스터에 저장할때, 같은 key에 대한 value들을 어떻게 계산할 수 있을까? 유일한 방법은 같은 key에 대한 value들이 같은 머신에 있도록 하는 방법 뿐이다.
이 주제를 논의하기에 앞서, 이 글에서는 MapReduce naming covention을 따르도록 하겠다. shuffle에서 source executor에서 데이터를 내보내는 task를 “mapper” 라 부르고 target executor로 데이터를 consume하는 task를 “reducer” 라 하고 둘 사이에 발생하는 것을 “shuffle” 이라 부르겠다.
Shuffling은 일반적으로 2가지 중요한 compression parameter가 존재한다 : spark.shuffle.compress
- engine이 shuffle output을 압축할지 말지, spark.shuffle.spill.compress
- intermediate shuffle spill file을 압축할지 말지. 두가지의 기본 설정은 true이다 그리고 spark.io.compression.codec
은 snappy로 설정되어있다.
Spark에는 많은 shuffle 구현방법이 존재한다. 어떤 구현을 사용할지는 spark.shuffle.manager
파라미터로 결정된다. 3가지 옵션이 존재한다 : hash,sort,tungsten-sort 그리고 “sort” 옵션이 기본값(>= spark 1.2.0)이다.
Hash Shuffle
Spark 1.2.0 이전에는 shuffle의 기본 옵션이었다. 많은 단점이 있는데, 파일의 수가 많아 지는것이 원인이다. 각각의 mapper task가 각각의 reducer에 대해서 독립적인 파일을 생성하고, 결과적으로 M*R 개의 파일을 생성한다. 이때 M은 mapper의 수이고 R은 reducer의 수이다. mapper와 reducer 수가 커지면 output buffer size, open file 수, 파일들을 생성하고 삭제하는 속도 등의 관점에서 이는 큰 문제가 된다.
이를 최적화하는 방법이 하나 존재한다, spark.shuffle.consolidateFiles
파라미터로 결정이 되는데 기본값은 false이다. true로 설정이 되면 “mapper” output 파일들이 하나로 모이게 된다. 만약 클러스터가 E개의 executor가 존재하고 그리고 C개의 core가 존재하고 각각의 task가 T개의 CPU를 요청한다면, execution slot의 수는 E*C/T
가 될것이다. 그리고 shuffle 하는 동안 생성되는 파일 수는 E*C/T*R
이다. 만약 100개의 executor가 존재하고 10개의 core가 있고 task당 1개의 core를 할당하고 46000 “reducer” 가 있다고 가정하면, 2 billion 파일수에서 46million 파일 수로 줄일 수 있다. reducer 각각에 대해서 새로운 파일을 생성하는 대신에, output file의 pool 을 생성한다. map task가 데이터를 출력하기 시작하면, 이 pool에서 R개의 파일로 구성된 그룹을 요청한다. 이 작업이 끝나면, R개의 파일로 구성된 그룹을 다시 pool에 돌려준다. 각각의 executor가 C/T task를 병렬로 실행하기 때문에, 오직 C/T개의 output file 그룹만 생성하게 된다. 각각의 group은 R개의 파일로 구성된다. 처음 C/T 개의 “map” task가 끝나면 다음 “map” task는 pool 존재하는 그룹을 재사용하게 된다.
장점
- 빠르다 - sorting이 필요없고 hash table이 유지된다.
- 데이터를 소팅하는 메모리 overhead가 없다.
- IO overhead가 없다 - 데이터는 HDD에 정확히 한번 기록되고 정확히 한번 읽어진다.
단점
- 파티션양이 커질때, 퍼포먼스는 떨어진다.(많은 output file이 생성 되므로)
- 다량의 파일은 random IO에 대한 IO skew를 야기한다. 일반적으로 sequential IO보다 100배 느리다.
그리고 물론 데이터가 파일에 저장될때, 직렬화되어 저장되고 압축된다. 데이터를 읽을때는 반대로 압축해제된 후 역직렬화된다. fetch 쪽에서 중요한 parameter는 spark.reducer.maxSizeInFlight
(기본 48MB) 이고 원격 executor 각각의 reducer에서 요청하는 데이터의 양을 의미한다. 이 사이즈는 executor에 의해 정확히 5개의 병렬 요청으로 나누어지고, 이는 처리속도를 증가시키기 위한것이다. 이 사이즈를 증가시키면, reducer는 map task output의 데이터를 더 큰 단위로 요청할것이다. 이는 성능을 개선하지만, reducer process의 메모리 사용량 또한 증가시킨다.
만약 reducer측에서 레코드 순서가 중요하지 않다면, reducer는 map output에 대한 의존을 가진 iterator를 리턴하지만 순서가 중요하다면, 모든 데이터를 가져와서 ExternalSorter를 사용해서 정렬할것이다.
Sort Shuffle
Spark 1.2.0 부터 기본 shuffle 알고리즘이다. 일반적으로, Hadoop MapReduce에 사용되는 shuffle logic을 구현하기 위한 시도이다. hash shuffle에서는 reducer 각각에 대해서 하나의 독립적 파일을 생성하지만, sort shuffle은 reducer id로 정렬된 하나의 파일을 생성한다. 이 방식으로 “reducer x”와 연관된 데이터 블록의 주소를 읽어서 한번의 fseek을 하여 데이터 chunk를 읽어들일 수 있다. 물론 reducer의 수가 작을때는, 독립적 파일에 해싱하는것이 sorting 보다는 빠를 것이다, 그래서 sort shuffle은 “fallback” plan이 있다 : “reducer”의 수가 spark.shuffle.sort.bypassMergeThreshold
(기본 200) 보다 작으면 fallback plan을 사용하여 데이터를 독립적인 파일로 해싱하고 이 파일들을 단일 파일로 합친다. 이 로직은 BypassMergeSortShuffleWriter 클래스에 구현되어있다.
웃긴사실은 데이터를 “map” 쪽에서 정렬하지만 reduce쪽에서 결과를 머지 하지않는것이다 - 데이터 ordering이 필요한 경우 데이터를 다시 정렬한다. Cloudera는 이 아이디어에 대해 재밌는 입장을 내놓았다. http://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/. 그들은 mapper에서 미리 설정된 데이터를 이용하는 로직을 구현하는 과정을 시작했다. reduce 쪽에서 정렬하는것은 TimSort 를 사용하고 이는 pre-sorted output을 이용하는 훌륭한 알고리즘이다. N개의 원소로 구성된 M개의 sorted array를 merging하는 작업의 시간복잡도는 O(MNlogM)
이다(MinHeap을 사용하는 경우).
만약 전체 “map” output을 저장할 충분한 메모리가 없으면 어떻게 할까? 중간 결과를 disk로 보내야할것이다. spark.shuffle.spill
로 spilling을 활성화할수 있다(기본 활성화). 만약 비활성화 한다면 map output을 보관할 충분한 메모리가 없고, OOM errror가 발생한다.
disk에 저장하기 전에 map output을 저장하는데 사용될수 있는 메모리 양은 “JVM Heap Size” * spark.shuffle.memoryFraction
* spark.shuffle.safetyFraction
값이다. 기본값은 “JVM Heap Size” * 0.2 * 0.8 = “JVM Heap Size” * 0.16이 된다. 만약 한 executor에서 여러 스레드를 실행한다면( spark.executor.cores / spark.task.cpus 값을 1이상으로 설정) task당 “map” output을 저장하는데 사용되는 평균 메모리는 “JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus 가 된다. 2 core 나머지 디폴트 값을 적용시 0.08 * “JVM Heap Size” 가 된다.
Spark은 “map” output data를 메모리에 저장하기 위해서 내부적으로 AppendOnlyMap 구조를 사용한다. Spark은 hash table을 구현한 scala로 자체적으로 구현된 버전을 사용한다. open hashing을 사용하고 quadratic probing 을 사용하여, key와 value를 같은 array에 보관한다. hash function으로 Google Guava library에서 murmur3_32을 사용한다.
hash table을 사용해서 “combiner” 로직을 table에 in place하게 적용할수 있게 된다 - 존재하는 key에 추가된 새로운 값은 기존값과 “combine” logic을 하는데 사용되고 “combine”의 output은 새로운 값으로 저장된다.
spilling이 발생할때, “sorter”를 “AppendOnlyMap”에 저장된 데이터에 대해서 실행하고, TimSort를 실행한다음 이 데이터가 disk로 기록된다.
Sorted output은 spilling이 발생하거나 mapper output이 더이상 없을때 disk로 기록이 된다. 실제로 disk로 기록이 될지는 file buffer cache와 같은 OS의 세팅에 달려있다. spark은 단지 “write” instruction을 보낼 뿐이다.
각각의 spill file은 독립적으로 disk로 쓰여진다, merging은 데이터가 “reducer”에 의해서 요청이 될 때만 수행되고, Hadoop MapReduce에서 일어나는 것처럼 “on-disk merger”를 실행하지 않는다. 여러 spill file들로 부터 dynamic하게 데이터를 수집한후, Java PriorityQueue class에 의해 구현된 Min Heap을 사용해서 머지한다.
장점
- “map” side에서 생성된 파일의 수가 적다.
- random IO 작업이 적게 발생하고, 대부분 sequential write,read
단점
- hashing보다 느리다. bypassMergeThreshold parameter를 클러스터에 맞게 튜닝할 수 있다, 일반적으로 대부분의 클러스터에게 디폴트값은 너무 높게 설정되어있다.
- Spark shuffle의 임시 데이터를 위해서 SSD 드라이브를 사용 할 경우, hash shuffle이 더 좋을 수 있다.
Unsafe Shuffle or Tungsten Sort
spark.shuffle.manager = tungsten-sort 로 설정해서 활성화 할 수 있다. 이 셔플에서 구현된 최적화는 다음과 같다.
- deserialize 과정 없이, serialized binary data에 대해서 직접적으로 동작시킨다. 자신의 데이터를 복사하기 위해서 unsafe memory copy 함수를 사용한다, serialized data에 대해서는 잘 동작한다.
- compressed record pointer와 partition id 배열들을 정렬하는 ShuffleExternalSorter 를 사용한다. record당 8바이트 공간을 사용함으로써, CPU cache와 더 효과적으로 동작한다.
- 레코드들이 deserialize 되지 않으므로, serialized data를 spilling 하는 작업이 직접적으로 수행되지 않는다. (no deserialize-compare-serialized-spill logic)
- shuffle compression codec이 serialized stream의 concatenation을 지원할때 자동으로 Extra spill-merging 최적화가 적용된다. 오직 fast merging이 활성화 되어있을때(
shuffle.unsafe.fastMergeEnabled
), Spark의 LZF serializer에 의해서 지원된다.
다음 단계의 최적화로 이 알고리즘은 off-heap storage buffer 를 사용한다.
각각의 데이터 spill에 대해서, described pointer array를 정렬하고 indexed partition file에 기록하고 이 파일들을 하나의 파일로 머지한다.
장점
- performance 최적화
단점
- mapper 쪽에서 데이터 순서를 다루는것이 아직 불가능
- 아직 off-heap sorting buffer를 제공해주지 않음
- 아직 stable 하지 않음