더북(TheBook)

6.4.1 스트리밍 DataFrame 생성

 

스트리밍 DataFrameSparkSessionread 메서드 대신 readStream을 호출해 생성한다. readStream 메서드는 DataFrameReader와 거의 비슷한 메서드를 제공하는 DataStreamReader를 반환한다. DataFrameReader와 달리 DataStreamReader는 연속으로 유입되는 스트림 데이터를 읽어 들인다.

6.1절 예제에서 사용한 ch06input 폴더의 파일들을 정형 스트리밍 API로 로드하는 방법을 알아보자. 일단 먼저 DataFrame에 필요한 암시적 메서드들을 임포트해야 한다.

import spark.implicits._

 

다음으로 DataStreamReadertext 메서드를 사용해 폴더의 파일들을 로드하자. (스파크 버전 2.0.0에서 실행하면 5장 예제와 마찬가지로 AlreadyExistsException 오류가 발생한다. 하지만 실습에는 문제가 없다.)

scala> val structStream = spark.readStream.text("ch06input")
structStream: org.apache.spark.sql.DataFrame = [value: string]

 

출력 결과에서 볼 수 있듯이 text 메서드는 단일 칼럼(value)으로 구성된 DataFrame 객체를 반환한다. isStreaming 메서드를 호출해 이 DataFrame이 스트리밍 DataFrame인지 확인할 수 있다.

scala> structStream.isStreaming
res0: Boolean = true
신간 소식 구독하기
뉴스레터에 가입하시고 이메일로 신간 소식을 받아 보세요.