예를 들어 각 고객이 구매한 제품의 전체 목록을 가져오려면 다음과 같이 aggregateByKey를 호출한다.
scala> val prods = transByCust.aggregateByKey(List[String]())( ---- ❶ zeroValue에 빈 리스트를 지정한다. (prods, tran) => prods ::: List(tran(3)), ---- ❷ 제품을 리스트에 추가한다. (prods1, prods2) => prods1 ::: prods2) ---- ❸ 키가 같은 두 리스트를 이어 붙인다. scala> prods.collect() res0: Array[(String, List[String])] = Array((88,List(47.149.147.123, 74.211.5.196, ...), (82,List(8.140.151.84, 23.130.185.187, ...), ...)
Note
스칼라에서는 ::: 연산자로 두 리스트를 이어 붙일 수 있다.
❶의 zeroValue에는 빈 List를 사용했다. ❷에서 전달된 aggregateByKey의 첫 번째 함수는 RDD의 각 파티션별로 요소를 병합하며, ❸의 두 번째 함수는 최종 결과를 병합한다. 그러나 이 함수의 동작 원리를 제대로 파악하려면 먼저 데이터 파티셔닝을 이해해야 한다.