spark rdd programming
대체로 ref. 1 에 대한 번역이다.
Spark RDD operations
RDD 들은 2개의 operation 을 지원한다.
- transformation : 존재하는 data 로 부터 dataset을 만드는 동작
map
같은 동작- spark 의 모든 transformation 은 lazy operation 이다. dataset 에 어떤 transformation 이 필요한지만 기억해 뒀다가, action 이 ‘driver program 에 return 할 result’ 를 필요로 할때 계산을 시작한다.
- 이렇기에, ‘
map
을 통해 만들어 지는 dataset’ 이 reduce 에서 사용되어지고, driver 로는 단지 reduce 의 결과값만 return 하게 된다. 거대한 mapped dataset 이 아니라. - Transformations
- action : dataset에 대한 연산후에 drive program 에 값을 return 해주는 동작
reduce
같은 동작collect
,count
,first
, …- Actions
기본적으로 각 transformed RDD 는 그것에 대한 action 을 실행할 때마다 다시 계산되어질 수 있다. 그러나 이 RDD 를 persist
method 를 이용해서 memory 에 계속 유지(persist) 할 수 있다. persist
method 가 실행되면 Spark 는 다음에 그것을 query 할 때 더 빠른 access 를 위해서, cluster 에서 element 들을 유지한다. disk 에 RDD 를 저장하는 방법도 제공하고 있고, 여러 node에 복사해 놓는 방법도 제공한다.
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)
lineLengths.persist()
를 사용하면, lineLengths 가 처음 계산되는 때에 memory 에 저장하게 된다.(즉, lazy operation 이기에 reduece 에 의해 dataset 이 computed 되는 그 순간)
spark 에 function 넘기기
Spark 의 API 들은 cluster 에서 실행하기 위해 driver program 에서 function 들을 parameter로 넘기는 방법에 크게 의존하고 있다. 이것을 하기 위한 2가지 추천하는 방법
- Anonymous function 을 사용하는 방법 : 짧은 코드에 사용할 수 있다.
- global singleton object 를 만들고, 그 안의 static method 를 이용하는 방법
closure 사용시 주의
Understanding closures | RDD Programming Guide - Spark 3.1.2 Documentation
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
- job 들을 실행하기 위해, Spark 는 RDD operation 들의 processing 을 task 들로 분리한다.
- 각 task들은 executor 에 의해 수행된다.
- 수행에 앞서서 Spark 는 task 의 closure 를 계산한다(computes).
- executor 가 RDD 를 이용한 계산을 하기 위해 closure 의 ‘변수’, ‘method’ 는 executor 들에게 visible 해야만 한다.
key-value pairs
- 몇몇 spark operation 은 “key-value pais 의 RDD” 에서만 가능하다.
- 대표적인 것이 “shuffle” operation 들, 예를 들면 key 에 의한 grouping 또는 aggregating
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
counts.sortByKey() // to sort the pairs alphabetically
counts.collect() // returns an array of objects.
Shuffle operation
셔플을 하는 동안에 무슨 일이 일어나는지 이해해 보자.
reduceByKey
를 한번 보자. 이것을 수행하면, 새로운 RDD 가 만들어진다. 이 RDD 는 하나의 key 에 대한 모든 value 들이 하나의 tuple 로 묶여있다. 이 tuple 은 key 와 그 key와 관련된 모든 value 들에 대한 reduce fuction 의 수행결과 를 갖는다.
문제는 하나의 key 에 대한 모든 값들이 항상 하나의 partition에 있는 것이 아니라는 것이다. 심지어 같은 machine 에 없을 수 있다. 그러나 그것들은 결과를 계산하기 위해 함께 위치해야 한다.
Spark에서, 특정 작업을 위한 필요한 위치에 있기 위해서 데이터는 일반적으로 여러 partiton 들에 분산되지는 않는다. 계산중, 하나의 task 는 하나의 partiton 에서 동적할 것이다. 그래서, 실행하려는 한번의 reduceByKey
reduce task 애서 사용할 모든 data 를 조직하기 위해 Spark 는 all-to-all operation의 수행이 필요하다.
스파크는 모든 키에 대한 값들을 찾기위해 모든 partiton 들에서 읽어야만 한다. 그리고 나서 모든 값들을 가져와서 각 key에 대한 최종 결과를 계산해야 한다. 이것을 shuffle 이라고 한다.
shuffle 을 일으키는 operation들은 다음과 같다.
- repartition, coalesce
- ’ByKey
- join, cogroup
정렬
mapPartitons
을 사용해 각 partition을 sort 할 수 있다.repartitionAnsSortWithinPartitions
은 repartition 하는 동시에 파티션들을 정렬할 수 있다.sortBy
는 globally 정렬된 RDD 를 만들때
성능 영향
- Shuffle(셔플) 은 디스크 I/O, 데이터 직렬화(serialization), 네트워크 I/O를 포함하기 때문에 비용이 많이 든다.
- 셔플에 대한 데이터를 구성하기 위해,
- spark 는 tasks 들의 집합(set)을 만든다.(여기서의 map, reduce는 spark 의 transformation, action 이라고 보면 된다.)
map
task : data 를 조직하기 위한 task 들reduce
task 들: 그것을 aggregate 하기 위한 task들
내부적으로, 개별 map task들에서 나온 결과들은 그들이 fit 하지 않을때 까지, 메모리에 보관된다. 그리고 나서, 이 결과들은 target partition 에 기반해서 정렬되고, 하나의 파일로 쓰여진다(written). reduce side 에서, task들은 관련있는 정렬된 block 들을 읽는다.(tasks read the relevant sorted blocks)
특정 shuffle operation 은 많은 양의 heap memory 를 사용할 수 있다. 왜냐하면, 그들은 records를 전송하기 ‘전’ 또는 ’후’에 records 를 정리하기 위해 in-emory 자료구조들을 사용하기 때문이다.
구체적으로, reduceByKey
그리고 aggregateByKey
가 map side에서 이러한 자료구조를 만들고, ’ByKey operation들은 이것들을 reduce side 에서 만든다.
데이터가 메모리보다 크면, Spark는 이러한 테이블을 disk 로 넘길 것이다. 이것이 디스크 I/O에 대한 추가 오버헤드와 증가된 garbage collection 를 유발한다.
Shuffle은 또한 디스크에 많은 중간 과정 파일(intermediate files)을 생성한다.
Spark 1.3부터 이러한 파일은 해당 RDD 들이 더 이상 사용되지 않고 garbabe collected 될 때까지 보존된다. 이것은 관련된 것들이(lineage)가 다시 계산되는 경우 shuffle 파일을 다시 만들 필요가 없도록 하기 위해서 이다.
application 이 이 RDD들에 대한 reference들을 유지하거나, GC 가 자주 시작되지 않는다면, garbage collection은 오랜 시간 동안 일어나지 않을 수 있다. 이렇게 되면, 오랜시간 수행되는 Spark job 들은 많은 양의 disk 공간을 소모할 수 있다. 임시 저장 디렉토리는 Spark context 를 설정할때 spark.local.dir
로 정할 수 있다.
셔플 동작은 다양한 configuration parameters를 변경하여 맞출 수 있다. (참조: Shuffle Behavior | Configuration - Spark 3.1.2 Documentation)
RDDs, Dataframes, Datasets
RDD, DataFrames, SparkSQL 의 속도 비교
1.pyspark - RDD, DataFrames, Spark SQL: 360-degree compared? - Stack Overflow, 2018 1. Comparing performance of Spark DataFrames API to Spark RDD | Adsquare, 2015 1. Spark RDDs vs DataFrames vs SparkSQL - part 1
See Also
- Difference between DataFrame, Dataset, and RDD in Spark - Stack Overflow, 2015
- 쿠…sal: [컴] RDD programming
댓글 없음:
댓글 쓰기