[컴] spark cluster mode 에서 yarn 을 사용할 때 최적의 설정값 계산

spark/ spark-summit /최적화

spark cluster mode 에서 yarn 을 사용할 때 최적의 설정값 계산

ref. 1,2 의 일부 번역이다.

하나의 application 에 있는 모든 spark executor 는 같은 수의 고정된 core 수와 같은 크기의 고정된 heap size를 갖는다.

이 고정된 core 수는 --executor-cores parameter 나 spark.executor.cores 로 설정할 수 있고, heap size 는 --executor-memoryspark.executor.memory로 알 수 있다.

--executor-cores 5 라고 하면, 한 executor 가 동시에 최대 5개의 task 를 수행할 수 있는 것이다. heap size 는 “Spark 가 cache 할 수 있는 data 양”, grouping, aggregations, joins 등에 사용되는 “shuffle 자료구조들의 최대 사이즈들” 에 영향을 준다.

--num-executorsspark.executor.instances 는 spark application 이 요청할 수 있는 executor 들의 수를 조정한다.

  • 한 executor 의 core 수
    • --executor-cores parameter
    • spark.executor.cores in spark-defaults.conf
  • 한 executor 의 heap size
    • --executor-memory
    • spark.executor.memory
  • spark application 이 요청할 수 있는 executor 들의 수
    • --num-executors
    • spark.executor.instances

YARN과 연계된 설정

관련된 YARN 속성들은 다음 2가지 이다.

  • yarn.nodemanager.resource.memory-mb : 각 노드의 모든 container들이 사용하는 메모리 합의 최대값을 정한다.
  • yarn.nodemanager.resource.cpu-vcores : 각 노드의 모든 container들이 사용하는 cores 개수 합의 최대값을 정한다.

executor core 5개는 YARN 에게 5개의 가상 코어를 요청할 것이다.

메모리 요청은 좀 더 복잡하다.

  • --executor-memory/spark.executor.memory 는 executor heap size 를 조정하지만, JVM들이 heap 의 메모리 일부를 사용할 수 있다. 예를 들어, interend String 들과 direct byte buffer들에서 사용할 수 있다.
    • 각 executor 가 Yarn 에게 요청하는 full memory 양은 executor memory 와 memoryOverhead 의 합이다. 그래서 executor memory 를 정할 때, spark.yarn.executor.memoryOverhead 를 위한 값을 남겨줘야 한다. 기본값은 spark.executor.memory * 7% 이며, 최소치는 384 이다.(max(384, spark.executor.memory * 7%))
  • YARN 은 아마 요청된 메모리보다 살짝 반올림할 것이다. YARN의 yarn.scheduler.minimum-allocation-mb 은 요청으로 온 값의 최소값을 정해주고 yarn.scheduler.increment-allocation-mb 속성은 증가치(increment) 값을 조정할 수 있게 해준다.

그림

  • application master(AM) 이 녀석은 executor 가 아니다.(non-executor container) AM 은 YARN에게 container 를 요청할 수 있다. 이 AM도 resource 를 사용한다. 이것도 예산에 넣어야 한다.
    • yarn-client mode 에서 AM 의 기본 스펙은 1 vCore/ 1024MB 이다.
    • yarn-cluster mode 에서 AM 은 driver 를 실행한다. 그래서 --driver-memory--driver-cores 속성으로 사용하는 resource 양을 늘릴 수 있다.
  • 너무 많은 메모리를 이용해서 executor 들을 실행하는 것은 과도한 garbage collection delay 를 일으키게 된다. 대략적인 추측으로, 64GB는 하나의 executor 에 대한 좋은 상단 limit 이다.(즉, 하나의 executor 는 64GB 이하의 memory 를 사용하는 것이 좋다.)
  • HDFS client 는 많은 thread 가 동시에 수행되는 것에 대해 문제가 있다. 대략적은 추측은, 하나의 executor 에 5개 이하의 task 에서 완전한 write 처리량(full write throughput)을 달성할 수 있다. 그래서 executor 당 core 수를 5개 밑으로 유지하는 것이 좋다.
    • ref. 2 에서는 core 수를 4개가 제일 좋다고 이야기한다. 4개까지는 1개를 늘릴때마다 100% 증가한다면, 5번째는 30~50%의 향상만 있었다고 한다.(spark 1.5이전 버전)
  • 아주 작은 executor들을 실행하는 것은 하나의 JVM 에서 여러개의 task들을 실행하는 데서 오는 이득을 버리게 된다.

예시

6개의 node가 있는 cluster 가 있고, 각 node는 NodeManager 가 돌고 있다. 각 노드는 16 cores/64GB meory 이다. 각 NodeManager 가용량은 15 core / 63GB 가 될 것이다.(yarn.nodemanager.resource.memory-mbyarn.nodemanager.resource.cpu-vcores) 1 core / 1GB memory 를 남기는 이유는 node 마다 ’OS’와 ’Hadoop 데몬’을 실행하기 위한 리소스가 필요하기 때문이다.

이제 spark 에 option 으로 --num-executors 6 --executor-cores 15 --executor-memory 63G 를 사용하면 될 것 같지만, 아니다.

  • executor memory overhead 부분을 남겨줘야 하고,
  • AM(application manager) 가 node 들중 하나의 core를 사용할 것이다. 그래서 모든 노드가 15core를 사용할 수는 없다.
  • executor 당 15core 는 HDFS I/O 처리량 이 나쁘게 나올 수 있다.(위에서 하나의 executor 가 5개 이하의 task에서 full write throughput 을 달성할수 있다고 했다.)

그래서 --num-executors 17 --executor-cores 5 --executor-memory 19G 이 더 나은 설정이다.

  • 이 설정이 AM이 실행되는 1개의 node 만 2개의 executor을 갖고, 나머지 모든 노들들이 3개의 executor 를 갖게 된다.
  • 메모리는 63GB 을 3개로 나눈값은 21GB 이지만 executor.memoryOverhead 값으로 1.47GB(21 * 0.07) 을 고려해서 빼주자.

shuffle 과 storage [ref. 2]

spark executor 는 2개의 메모리 영역이 있다.

  • storage : caching 을 위한 공간
  • shuffle : pre-shuffle 을 aggregation 하는 공간

이 설정값들은 developer 가 설정할 수 있는 값은 아니다.

executor 를 위해 10GB를 할당받은 경우, JVM 이 10GB 를 사용할 수 있게 되는데, 이때 이중에 몇% 를 storage 로 사용할 것인가를 이 spark.storage.safetyFraction 값으로 정하게 된다. 대체로 약 전체 메모리의 90% 정도를 사용하고, 10% 정도를 ’안전(safety)’을 위해 남겨둔다. 이 ’안전’부분이 Out-of-Memroy 을 막는데 도움을 준다.

spark.shuffle.safetyFraction 이라는 설정값도 있다. 기본값은 80% 이다. 즉 JVM heap의 80%를 사용한다는 이야기다. 이것도 Out-of-memory 이슈를 피하는 데 도움을 준다.

이 safetyFraction 안에서 비율을 조정하게 되는데, 이 때 spark.storage.memoryFractionspark.shuffle.memoryFraction 가 사용된다. 기본값은 60%(storage), 20%(shuffle) 이다.

그래서 10gb 를 spark 가 할당받으면, 이중에 실제로 90%정도인 9gb만 사용하게 되고 이중에 60%인 5.4gb 정도를 caching 을 위한 용도(storage)로 사용하게 되는 것이다.

5.4gb = 10gb * 90% * 60%

shuffle 이라는 label 이 붙은 것은 pre-reduce aggregation 을 위해 사용된다. reduceByKey 를 사용하면, local에서 먼저 reduce 가 발생한다. 그리고 결과를 저장하기 위해 shuffle memory 가 사용된다. 만약 local 의 reduce by key 결과가 이 메모리 영역보다 크면 spark 는 disk 에 쓰기 시작한다.

1.6gb = 10gb * 80% * 20%

간단히 보면, executor memory 를 10gb 잡으면 이중 54% 정도는 storage 로 16%는 shuffle 로 쓰인다고 보면 될 듯 하다.

rdd programming 에서 .persist, .cache 라고 사용하면, data 가 memory에 persisted 되거나 cached 된다. rdd.cache()rdd.persist(storageLevel.MEMORY_ONLY) 와 같다. 이 persist method에서 사용하는 option 으로 어느 storage level 에서 저장할 지 정할 수 있다.

  • MEMORY_ONLY : java obejct 로 data 를 저장하는 방법, space efficient 하지 않다. int 나 string 같은 java object 가 많은 다른 packing 들이 되어있다. 하지만 이것이 빠르다. 만약 data를 모두 MEMORY_ONLY 로 cached 할 수 있으면, 이것을 선택하면 된다.
  • MEMORY_ONLY_SER : serialized format 으로 data 를 저장한다. 잠재적으로 더 많은 data 를 memory 에 넣을 수 있게 해준다. 약간의 비용이 있는데, data 가 사용할 때 serialized/deserialized 되어야 한다는 점이다. 그래서 이것은 좀 더 CPU intensive 하다. 이것은 일반적으로 data 를 caching 하는 좋은 방법이다.
  • MEMROY_AND_DISK_SER : 이것은 data 를 serialized format 으로 저장한다. 이것은 MEMORY_ONLY_SER 와 비슷한 장점을 갖고, 거기에 더해서 메모리에 맞지 않는 것이 있을때 local disk 에 serialized format 으로 저장하게 된다. 이것은 heavy processing 이 있거나 또는 caching 이전에 joins/reduces 가 있다면, 사용하면 좋다.
  • OFF_HEAP: 2.0 이상에서 존재
    • Off-heap 은 JVM's Garbage Collector mechanism 에 의해 관리되지 않는다. 그래서 application 에서 처리해줘야 한다. 
    • storage format 으로 구성된 on-heap space 와의 차이는 object 들이 자동으로 JVM 에 의해 serialized/deserialized 되는데, off-heap 에서는 application 이 이 작업을 해줘야 한다.
    • Apache Spark and off-heap memory on waitingforcode.com - articles about Apache Spark

NOTE : spark 에서는 기본 java serializer 가 기본 설정인데 이보다는 Kryo Serializer 가 더 좋다.

cached data 가 얼마나 되는지는 spark web ui 의 storage tab 을 통해 파악할 수밖에 없다. 정확한 size 의 executor 들을 찾는데 spark ui 는 매우 중요하다.

executors 의 크기를 정하는 법[ref. 2]

  1. The JVM heap 은 64GB 보다 크지 않아야 한다. 40 gb 이 적절해 보인다. 이것은 garbage collection 때문이다. 만약 executor가 너무 커지면, GC 는 application performance 을 잡아먹을 것이다. G1C1같은 새로운 GC 방법들은 가능하지만, 이 가이드는 우리가 전통적인 방법을 사용할 것이라는 것을 가정하고 하는 이야기이다.
  2. executor는 4core 를 넘으면 안된다. 시행착오를 통해서 볼때, 4core 일때 가장 완전한 이득을 얻는다. 그래서 1core 가 100% 의 산출량을 갖는다고 한다면, 4 cores 는 400% 가 넘는 산출량(throughput)이 된다. 5 core 에서는 투입한것 대비 이득이 대략 430-450% 정도이다.
  3. 적은수의 large executor 들이 일반적으로 많은 수의 small executor 들보다 낫다. 많은 executors 를 가질수록, network 를 통해서 보내야 하는 data 들이 더 많아 진다. 이것에 대한 하나의 따르라는 경고(caveat)가 있다, 가끔 YARN 으로 부터 하나의 node 에서 4 core 와 40gb 를 얻기가 어렵다. 그래서 만약 cluster 가 highly utilized 라면, job 을 시작하기위해 더 많은 시간을 써야 한다.

업무에 따른 설정[ref. 2]

여러 타입의 업무들이 있다.

  1. ETL
  2. caching 을 하는 Data 분석
  3. 머신러닝

ETL(Extract, Transform, Load)

  • ETL 에서는 일반적으로 제한된 reduce 들이 있고, 많은 job 들이 map only job 들이다. (me: data 변환이 주이기 때문에, 총합등, data 들 전체와 관련된 계산들이 적기 때문에?)
  • 이런 이유로, data locality 를 더 잘 하기 위해서 많은 작은 executor 들을 이용하는 것이 훨씬 낫다.
  • 추가로, ETL에서 caching data 는 자주 일어나지 않는다. 그래서 우리는 executor들의 storage 부분을 완전히 없앨 필요가 있다. 좋은 시작점은 2core 를 가진 많은 4gb executor 들을 요청하는 것이다. 그리고 spark.storage.memoryFraction를 0으로 하고, spark.shuffle.memoryFraction 를 1로 하는 것이다. 이것은 executor 이 그 공간을 join 과 reduce 를 위한 공간으로 사용하게 해준다. (me: caching 은 같은 data 를 가지고 여러 내용을 뽑아낼때, 즉, 같은 map job 을 한 이후 여러 다른 reduce job 을 할때 필요?)
  • 추가로, 우리는 관련된 data set의 총size 를 고려한다. 좋은 시작점은 disk 에 있는 1GB data 당 1gb RAM 을 잡는 것이다. 일반적인 40gb 보다 작은 dataset에서 spark-summit command 는 아래와 비슷할 것이다.
spark-sumbit --class MyClass --executor-memory 4g --executor-cores 2 --num-executors 10 --conf spark.storage.memoryFraction=0 --conf spark.shuffle.memoryFraction=1 /path/to/app.jar

데이터 분석

Caching 을 사용하는 데이터 분석(Data Analysis)

데이터 분석에서는 종종 같은 dataset 들에 대해 여러번 query 한다. 이런 경우, 우리는 caching 을 이용하고, HDFS read 를 제한한다. caching 을 사용한 데이터 분석은 “경험에 바탕을 둔 방법(rules of thumb)” 에 더 의존한다. executor 들의 수에 대한 좋은 시작점은 1GB dataset 에 대해 2gb RAM 을 사용하는 것이다. 40gb dataset 에 대한 spark-submit command 의 예시는 아래 처럼 될 것이다.(me: 각 executor 가 20gb 의 dataset 을 감당?)

spark-sumbit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 /path/to/app.jar

머신러닝(Machine Learning)

Machine Learning 은 실제로 한가지 작은 예외를 두면 “caching 을 사용하는 data analysis” 와 같은 아이디어를 를 이용할 수 있다.

model 들을 building 할 때, applicaiton driver 는 작업에 더 많은 압박을 받는다. driver 에 더 많은 memory/cores 들을 추가하는 것은 application 이 좀 더 부드럽게 동작하게 도와준다. 또한, driver memory 는 당신이 Out-of-memory 에러를 받을 때만 필요하다.(?) 40gb dataset 에 대한 spark-submit command 의 모습은 아래와 비슷할 것이다.

spark-sumbit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 --conf spark.driver.memory=2g --conf spark.driver.cores=2 /path/to/app.jar

실제 확인, spark web ui 의 executor tab

가장 중요한 점은 Spark UI 로 가서 memory 사용을 monitor 하는 것이다. storage tab 은 당신의 application 이 사용하는 resource 의 양을 정하려 할때 중요하다.

spark-shell --master yarn-client --executor-memory 1g --num-executors 2

spark web ui 에서 ‘Executors tab’ 에 가서, shuffle write 와 storage memory 를 잘 봐야 한다.

storage memory 를 보면 1g의 executor-memory 를 요청했는데 530mb 정도만 잡혀있다. 위에서 이야기한 대로 대략 54%정도가 storage memory 로 잡힌다.

shuffle write 는 shuffle operation 전에 메모리보다 data 가 커서 disk 에 쓰여지는 data 의 양이다.(spill to disk)

storage memory 는 각각의 executor 가 caching 을 위해 사용가능한 메모리양이다.(사용량/전체사용가능량)

이 2개의 칼럼들은 우리가 너무 많거나 적은 executor 를 가질는 것을 결정하는 데 도움을 준다.

우리가 많은 양의 shuffle write 를 갖는다면, 우리는 아마도 spark.shuffle.memoryFraction 을 높여야 한다. 중간의 disk 로의 write 를 줄이기 위해서 만약 우리가 cache 를 전부 쓰고 있다면, 우리는 executor size 를 올리기를 원할 수 있다.

이것은 좋은 스파크의 메모리를 관리를 이해하는데 좋은 경험을 준다. 처음에 executor 들의 size를 어떻게 할 것인지 그리고 조정(calibration) 할 것인지에 대해 이해하는데 좋은 경험을 준다.(glimpse)

See Also

  1. 쿠…sal: [컴] spark-submit 과 assembly jar

Reference

  1. How-to: Tune Your Apache Spark Jobs (Part 2) - Cloudera Blog
  2. Spark on YARN - Executor Resource Allocation Optim… - Cloudera Community - 246569, 2016-07 : spark 1.5 이전에 대한 설정

댓글 없음:

댓글 쓰기