더북(TheBook)

지금까지 설명한 내용을 간단한 예제로 이해해 보자. 이 예제는 Partitioner를 제거한 후 셔플링이 일어나는지 알아본 4.2.2절 예제와 유사하다. 코드는 다음과 같다.

val list = List.fill(500)(scala.util.Random.nextInt(10))
val listrdd = sc.parallelize(list, 5) ---- 무작위로 생성한 정수 배열을 분산해 파티션 다섯 개로 구성된 RDD를 생성한다.
val pairs = listrdd.map(x => (x, x*x)) ---- RDD를 Pair RDD로 매핑한다.
val reduced = pairs.reduceByKey((v1, v2)=>v1+v2) ---- 각 키별로 RDD 값을 합산한다.
val finalrdd = reduced.mapPartitions( ---- RDD의 각 파티션을 매핑해 키-값 쌍 내용을 문자열로 구성한다.
              iter => iter.map({case(k,v)=>"K="+k+",V="+v}))
finalrdd.collect()

 

예제가 딱히 실용적이거나 실제로 유용하지는 않지만 RDD 의존 관계의 개념을 설명하기에 충분하다. 예제에서 생성한 RDD 계보(즉, DAG)는 그림 4-4와 같다. 둥근 사각형은 각 RDD의 파티션을 의미한다. RDD 사이에 그린 굵은 화살표는 RDD를 생성한 변환 연산을 의미한다. 각 변환 연산자는 새로운 RDD(MapPartitionsRDD, ShuffledRDD 등)를 생성하며, 새로운 RDD는 이전 RDD의 자식 RDD가 된다.

▲ 그림 4-4 RDD 의존 관계를 그린 DAG 예

신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.