더북(TheBook)

zipPartitions 변환 연산자의 첫 번째 인자 목록에 preservesPartitioning이라는 선택 인수를 추가로 전달할 수 있다(기본 값: false). 조인 함수가 데이터의 파티션을 보존한다고 판단했다면 이 인수를 true로 설정할 수 있다. 반대로 이 인수를 false로 설정하면 결과 RDD의 Partitioner를 제거하며, 이후 다른 변환 연산자를 실행할 때 셔플링이 발생한다.

간단한 예제로 zipPartitions를 사용하는 방법을 살펴보자. 먼저 두 RDD를 다음과 같이 생성한다. 첫 번째 RDD(rdd1)에는 정수 열 개를 파티션 열 개에 분산 저장하고, 두 번째 RDD(rdd2)에는 문자열 여덟 개를 파티션 열 개에 저장한다. 그런 다음 두 RDD의 파티션을 zipPartitions로 결합해 각 요솟값을 조합한 문자열을 만든다.

scala> val rdd1 = sc.parallelize(1 to 10, 10)
scala> val rdd2 = sc.parallelize((1 to 8).map(x=>"n"+x), 10)
scala> rdd1.zipPartitions(rdd2, true)((iter1, iter2) => {
        iter1.zipAll(iter2, -1, "empty")
        .map({case(x1, x2)=>x1+"-"+x2})
    }).collect()
res1: Array[String] = Array(1-empty, 2-n1, 3-n2, 4-n3, 5-n4, 6-empty, 7-n5, 8-n6, 9-n7, 10-n8)

 

이 코드에서는 스칼라의 zipAll 함수를 사용해 두 Iterator를 결합했다. zipAll 함수를 사용한 이유는 컬렉션의 크기가 달라도 이들을 결합할 수 있기 때문이다. 첫 번째 Iterator가 두 번째보다 요소가 더 많으면 zipAll 함수는 첫 번째 Iterator의 나머지 요소에 empty라는 일종의 모조 값(dummy value)을 결합한다(두 번째 Iterator 요소가 더 많으면 -1을 사용한다). 결과 RDD를 살펴보면 rdd2의 파티션 1번과 6번이 빈 것을 알 수 있다. 이 empty 값을 어떻게 처리할지는 활용 사례마다 다를 것이다. 또 drop이나 flatMap 등 다른 Iterator 함수를 사용해 파티션에 포함된 요소 개수를 변경할 수도 있다. Iterator 함수가 잘 기억나지 않는다면 4.2.4절을 참고하자.

신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.