더북(TheBook)

6.1.4.1 데이터 파싱

예제 데이터를 처리하려면 먼저 파일의 각 줄을 다루기 쉬운 객체(예: 스칼라의 케이스 클래스)로 변환해야 한다. 주문 데이터를 담을 Order 클래스를 다음과 같이 정의하자.

scala> import java.sql.Timestamp
scala> case class Order(time: java.sql.Timestamp, orderId:Long, clientId:Long, symbol:String, amount:Int, price:Double, buy:Boolean)

 

주문 시각 필드에는 java.sql.Timestamp 클래스를 사용했다. Timestamp 클래스는 스파크 DataFrame이 기본으로 지원하는 데이터 타입이므로 DStream에서 DataFrame을 만들 때도 이 Order 클래스를 바로 사용할 수 있다.

그런 다음 filestream DStream의 각 줄을 파싱하고 Order 객체로 구성된 DStream을 생성해야 한다. 이 작업을 완수하는 방법은 다양하지만, 예제에서는 DStream의 모든 RDD를 각 요소별로 처리하는 flatMap 변환 연산자를 사용한다. 유사한 기능의 map 연산자 대신 flatMap을 사용하는 이유는 포맷이 맞지 않는 데이터를 건너뛰기 위해서다. flatMap에 전달할 매핑 함수는 데이터를 포맷대로 파싱할 수 있으면 해당 데이터를 리스트에 담아 반환하고, 파싱이 실패하면 빈 리스트를 반환한다.

신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.