더북(TheBook)

코드 마지막에 있는 stateSnapshots 메서드를 호출하지 않을 경우 mapWithState 메서드의 결과에는 현재 미니배치의 주기 사이에 거래를 주문했던 고객 ID와 누적 거래액만 포함된다. stateSnapshots 메서드는 updateStateByKey와 마찬가지로 전체 상태(즉, 모든 고객 ID)를 포함한 DStream을 반환한다.

StateSpec 객체에는 매핑 함수 외에도 파티션 개수, Partitioner 객체, 초기 상태 값을 담은 RDD, 제한 시간 등을 설정할 수 있다. 특히 초기 상태 값을 지정하는 기능을 잘 활용하면 종료된 스트리밍 작업을 재시작할 때도 종료 전 상태 값을 유지하고 재사용할 수 있다. 예를 들어 예제 애플리케이션에서는 하루의 주식 시장을 마감할 때 고객 목록과 누적 거래액을 저장해 두면, 다음 날에는 전날의 마지막 상태부터 다시 시작해 거래액을 계속 누적할 수 있다.

제한 시간을 설정하는 기능도 꽤 쓸 만하다. 제한 시간을 설정하면 스파크 스트리밍은 이 시간을 초과해 만료(expire)된 특정 상태 값을 삭제한다. 예제 애플리케이션에서는 고객이 접속한 세션의 만료 여부를 계산하는 데 이 기능을 사용할 수 있다. 동일한 기능을 updateStateByKey로 구현하려면, 매핑 함수에서 만료 여부를 직접 계산해야 한다.

마지막으로 StateSpec 객체는 빌더 패턴으로 구현되었다. 다시 말해 StateSpec 메서드를 연달아 호출해 매개변수를 설정할 수 있다.

StateSpec.function(updateAmountState).numPartitions(10).timeout(Minutes(30))

 

mapWithState는 기능뿐만 아니라 성능 면에서도 우수하다. mapWithStateupdateStateByKey보다 키별 상태를 열 배 더 많이 유지할 수 있으며, 처리 속도는 여섯 배나 더 빠르다2(새로 유입된 데이터가 없는 키를 연산에서 제외하는 아이디어가 큰 역할을 했다).

 

2 이 결과는 데이터브릭스에서 측정했다(http://mng.bz/42QD).

신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.