[컴] Spark RDD programming

spark 스파크 / rdd 사용법

Spark RDD programming

high level 에서는, 모든 Spark application 은 하나의 driver program 으로 구성된다.

이 drive program 은

  • user 의 main function 을 실행해주고,
  • cluster 에서 다양한 parallel operations(병렬 작업들)을 수행해 준다.

RDD는 element들의 collection 이다. 그리고 병렬로 수행되어지는 cluster 의 node들에 partition되어 있다.(구분되어 나눠져있다.)

RDD 를 만드는 2가지 방법

  1. 작성한 driver program 에 존재하는 collection 을 parallelizing 하거나
  2. 외부 storage system 에 있는 dataset 을 참고한다.
    • 가능한 외부 storage system
      • Hadoop InputFormat 을 지원하는 모든 data source
      • shared filesystem, HDFS, HBase 등

me: 대략적으로 보면, scala나 java 를 이용하는 것이 나을 듯 하다. python 은 지원하지 않는 기능이 있을 수 있다.

parallelize

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

외부의 Dataset 들

spark 는 다음을 지원

  • text file 들을 지원
    • 모든 Spark 의 file-based input method 들은 directory, 압축파일, wildcard 를 지원한다.
  • SequenceFiles
  • Hadoop InputFormat
# python example
# 'data.txt', 'hdfs://test.txt', 's3a://test.txt', ...

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

val distFile = sc.textFile("data.txt")
val distFile = sc.textFile('data.txt')
val distFile = sc.textFile('/my/*.txt')
val distFile = sc.textFile('/my/*.gz')
// java example
JavaRDD<String> distFile = sc.textFile("data.txt");

몇가지 주의할 점들:

  • local filesystem 의 path 를 사용할때는 여러 worker node 들이 같은 경로로 접근한다는 사실을 기억하자. 그러기 때문에 파일을 모든 worker 에 copy 를 하거나, network-mounted shared file system 을 사용해야 한다.
  • 모든 Spark 의 file-based input method 들은 directory, 압축파일, wildcard 를 지원한다.
  • textFile method 는 2번째 optional parameter(minPartitions=None)를 갖는데, ’partition 의 수’를 정할 수 있다. 기본적으로 Spark 는 파일의 각 block 을 하나의 partition 으로 만든다.(HDFS 에서는 block 의 기본값이 128MB 이다.) Note: block 보다 작은 단위로 partition 을 만들 수는 없다.

Spark 의 API 는 다른 여러 data format 들을 지원한다.

  • sparkContext.wholeTextFiles 는 (filename, content) pair 를 return 해준다.
    • 여러개의 작은 text file 들을 가진 directory 를 읽을 때 쓸 수 있다.
    • textFile 은 “각 file 에 있는 line” 당 1개의 record 를 return 해준다.
  • For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text.
  • For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).
  • RDD.saveAsObjectFile and SparkContext.objectFile (python 에선 RDD.saveAsPickleFile and SparkContext.pickleFile) 를 이용해서 RDD 를 간단한 format 으로 저장할 수 있다. java, scala 에서는 Avro format 을 사용하는 것이 더 효과적이다.

안티 패턴 spark[ref. 2]

  • 비동기적으로 update 하는 연산: 일괄 분석을 염두에 두고 설계했기 때문에 공유된 데이터를 비동기적으로 update하는 연산(예: 온라인 트랜잭션 처리 등)에는 적합하지 않다(spark streaming 은 다른 이야기다.)
  • 대량의 데이터가 아닌 경우
    • 스파크는 잡(job)과 태스크(task)를 시작하는 데 상당한 시간을 소모하기 때문에 대량의 데이터를 처리하는 작업이 아니라면 굳이 스파크를 사용할 필요가 없다.
    • 소량의 데이터를 처리할 때는 스파크 같은 분산 시스템보다 간단한 관계형 데이터베이스나 잘 짜인 스크립트가 훨씬 더 빠르다.

RDD operations

See Also

  1. Difference between DataFrame, Dataset, and RDD in Spark - Stack Overflow, 2015
  2. https://stackoverflow.com/a/50825051, 2018

Reference

  1. RDD Programming Guide - Spark 3.1.2 Documentation
  2. 스파크를 다루는 기술 Spark in Action, 페타 제체비치, 마르코 보나치 저, 2018

댓글 없음:

댓글 쓰기