| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
- Kafka
- 개발
- 코딩테스트
- Trino
- bigdata engineering
- Apache Kafka
- Linux
- Spark
- Data Engineer
- pyspark
- bigdata engineer
- HDFS
- 알고리즘
- 영어
- Iceberg
- java
- hadoop
- apache iceberg
- BigData
- Data Engineering
- 백준
- 삼성역맛집
- HIVE
- 코딩
- 여행
- 맛집
- 코엑스맛집
- 자바
- 코테
- 프로그래머스
- Today
- Total
지구정복
[Spark] Structured Streaming 정리 본문
공식문서내용 번역: https://spark.apache.org/docs/latest/streaming/getting-started.html
1. 워드카운트 예제
개념을 배우기 전에 예제를 통해서 무엇인지 간단히 알아본다.
아래와 같이 스파크 세션 생성(파이썬 사용)
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
다음으로 Socket의 9999포트를 통해 데이터를 받는다고 가정하고,
socket 9999포트에서 데이터를 받을 수 있게 Streaming DataFrame을 아래와 같이 만든다.
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
위 코드에서 lines라는 Streaming DataFrame은 스트리밍으로 들어오는 텍스트 데이터를 무한정 받을 수 있다.
이 테이블은 기본적으로 'value'라는 하나의 컬럼을 가지게 된다.
그리고 스트리밍 텍스트 데이터를 value 컬럼의 데이터(하나의 행)가 되게 된다.
이제 Transformation 을 수행한다.
select, alias, explode, split이다.
select는 말 그대로 하나의 컬럼을 조회한다.
alias는 컬럼의 이름을 바꿔준다.
explode는 배열의 요소들을 풀어서 각각의 행으로 만들어준다.
예를들면 [apple, banana, kiwi] 가 있을 때 이를 아래와 같이 만들어준다.
fruits
------
apple
banana
kiwi
split는 특정 문자를 구분으로 단어로 나눠준다.
예를들면 a= "applie is the best fruit" 이란 문장이 있을 경우
split( a, " " ) -> 띄어쓰기로 문장을 나눈다.
new_a = [apple, is, the, best, fruit] 가 만들어진다.
따라서 위 코드에서 words는 lines로 들어오는 스트리밍 텍스트 데이터를 ,(컴마)를 구분자로 split해서 리스트로 만든다음
explode에서 이를 'value'컬럼의 행으로 만들어준 뒤
alias로 'value'컬럼의 이름을 'word'으로 변경한 DataFrame이다.
그 다음에 words 데이터프레임에 groupBy를 'word' 컬럼에 적용하여
같은 word의 count()를 세어서
이 값을 행으로 하는 새로운 데이터 프레임인 wordCounts 를 만들어준다.
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
그리고 위 코드에서 Spark의 lazy execution특성에 따라 outputMode("complete")코드때문에 실제 코드들이 실행된다.
wordCounts 데이터프레임을 Console로 출력해준다.
query.awaitTermination()
위 코드 덕분에 직접 잡을 끝내지 않는이상 해당 스파크 잡은 백그라운드에서 계속 실행된다.
이제 실제 동작을 확인하기 위해 같은 리눅스 서버에서 두 개의 터미널을 켠다.
첫 번째 터미널에서는 socket 9999포트로 스트리밍 데이터 전송을 위해 같은 서버에서 Netcat 리눅스 유틸리티를 실행한다.
$ nc -lk 9999
그리고 두 번째 터미널에서는 아래와 같이 스파크 스트리밍 잡도 실행한다.
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
이제 첫 번째 터미널에서 아래와 같이 스트리밍 데이터를 입력한다.
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
두 번째 터미널의 결과를 보면 다음과 같이 출력될 것이다.
# TERMINAL 2: RUNNING structured_network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
2. Programming Model
2.1. 기본 개념
기본 개념은 스트리밍 데이터프레임을 끊임없이 데이터가 append되는 테이블이라 이해하면 된다.
즉, 배치 데이터를 수집하는 테이블인데 그 시간 간격이 아주 짧다고 생각하면 된다.
이때 스트리밍 데이터프레임에 저장되는 방식은 3가지가 있다.
-Complete Mode: 새로운 스트리밍 데이터가 들어오면 해당 데이터로 데이터프레임이 덮어씌워진다.
-Append Mode: 새로운 스트리밍 데이터가 들어오면 해당 데이터가 데이터프레임의 마지막 행에 추가된다.
-Update Mode: 새로운 스트리밍 데이터가 들어오면 데이터프레임에 해당 데이터가 이미 존재하고 값이 다르다면 새롭게 들어온 데이터로 update된다. 만약 새로운 스트리밍 데이터에 집계함수가 적용되지 않는다면 append모드와 동일하다.
위에서 봤던 word count예제에서는 마지막 데이터 프레임인 'query'는 complete모드이다.
따라서 스트리밍 데이터가 들어와서 wordCounts 데이터 프레임을 거쳐 'query' 데이터프레임으로 오게되면
query 데이터 프레임은 이전 query 데이터프레임을 덮어씌우게 된다.
(단순히 wordCounts 데이터프레임을 출력해주는 용도이므로)
(이미지)
https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png
https://spark.apache.org/docs/latest/img/structured-streaming-model.png
2.2. Event-time과 Late Data 처리방법
스트리밍 데이터 자체에 해당 데이터가 발생한 시간이 적혀있는 경우가 있다.
이때 스트리밍 데이터가 스파크에 도착한 시간을 사용하면 안되고
실제 해당 데이터가 발생한 시점의 데이터를 사용해야 한다.
이런 경우 보통 스트리밍 데이터에 시간과 관련된 열이 있고, 이 열의 데이터를 집계함수를 활용해서 집계할 수 있다.
이때 활용되는 것이 window집계이다.
또한 가끔은 네트워크 등의 문제로 특정 시점의 데이터가 밀려서 들어오는 경우도 있다.
만약 늦게 도착하는 데이터가 무제한으로 있다면, Spark는 과거 모든 윈도우를 계속 열어놓고 상태 정보를 저장해야 한다.
그렇게되면 메모리(또는 디스크) 사용량이 커지고, 결국 리소스 부족으로 오류가 발생할 수 있다.
따라서 너무 늦게 도착하는 데이터의 경우 그냥 버릴 수 있는 기능이 있다.
spark의 watermark이다.
아래와 같이 설정하면 10분 늦게 도착한 데이터는 그냥 버린다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
spark = SparkSession.builder.appName("WatermarkExample").getOrCreate()
# 샘플 데이터 소스 (예: Kafka, 파일 등)
# 여기서는 스트리밍 데이터가 'eventTime'(타임스탬프 타입), 'value' 컬럼을 가진다고 가정
# 스트리밍 데이터프레임 읽기 (예시)
input_stream = spark.readStream.format("socket")\
.option("host", "localhost")\
.option("port", 9999)\
.load()
# 예시를 위해 eventTime과 value 컬럼 분리 가정 (예: CSV 형태)
from pyspark.sql.functions import split, expr
# 문자열 데이터를 "eventTime,value" 형식으로 받는다고 가정 (예: "2024-06-01 10:00:05,10")
split_cols = split(input_stream.value, ",")
events = input_stream.withColumn("eventTime", split_cols.getItem(0).cast("timestamp"))\
.withColumn("value", split_cols.getItem(1).cast("integer"))
# 워터마크를 설정해서, 10분 이상 늦은 데이터는 무시
# 그리고 1분 윈도우로 value를 집계
agg = events.withWatermark("eventTime", "10 minutes")\
.groupBy(window(col("eventTime"), "1 minute"))\
.sum("value")
# 결과 출력
query = agg.writeStream.format("console")\
.outputMode("append")\
.option("truncate", False)\
.start()
query.awaitTermination()
예시
2024-06-01 10:00:05,10
2024-06-01 10:00:30,15
2024-06-01 10:01:10,5
2024-06-01 09:49:00,20 <-- 너무 늦은 데이터 (워터마크 10분보다 더 늦음)
2.3. Fault Tolerance Semantics
각 스트리밍 데이터는 딱 한 번씩만 수집해야 한다.
그래서 보통 스트리밍 데이터의 원천에서 메시지 큐인 kafka나 kinesis 등으로 데이터를 보내고
스파크 스트리밍은 여기서 데이터를 소비한다.
이때 kafka나 Kinesis등은 보통 Offset이란 것이 존재해서 어디까지 스트리밍 데이터가 소비됐는지 표시가 된다.
스파크는 이러한 오프셋이 있는 스트리밍 데이터를 처리할 때 이 오프셋값을 체크포인팅하고 write-ahead logs로 기록해둬서
추후에 다시 재실행했을 때 어디서부턴 읽어서 처리하면 되는지 알 수 있다.
이러한 스트리밍 데이터의 소스(Kafka, kinesis 등)들은 idempotent(멱등성)하게 설계되어있다.
(멱등성: 어떤 연산을 여러 번 적용해도 결과가 변하지 않는 성질
전등 스위치를 '켜짐'상태에서 여러번 눌러도 항상 '켜짐'인 상태)
그리고 이러한 멱등성이 보장되는 데이터 원천과 Spark의 Structured Streaming의 Checkpoint, write-ahead 기능으로 '딱 한 번만 수집한다'를 보장할 수 있다.
3. 사용가능한 APIs와 DataFreames & DataSets 개념
3.1. Streaming DataFrames과 Streaming DataSets 만들기
이를 위해선 스트리밍 스파크 세션을 생성해야 한다.
이때 여러 Input Sources들이 올 수 있다.
-File Source: 특정 디렉터리 안에 있는 스트리밍 데이터를 읽는다.
CSV, JSON, ORC, PARQUET 등의 파일 가능
HDFS&YARN기반의 Spark라면 HDFS디렉터리가 된다.
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
-Kafka : kafka에 있는 Topic과 연결가능하다.
-Socket(For testing): 위 테스트처럼 소켓을 통해 가능하나 테스트용으로만 사용된다.
-rate(For testing): 정해진 행 수의 데이터만 1초당 가져온다.
1. 스키마 추론(Schema inference)
기본적으로, 파일 기반 스트리밍 소스에서는 스파크가 자동으로 스키마를 추론하지 않고, 사용자가 명시적으로 스키마를 지정해야 한다.
이는 스트리밍 쿼리 도중 실패가 발생해도 일관된 스키마를 보장하기 위함이다.
다만, 임시적이고 간단한 경우에는 spark.sql.streaming.schemaInference 설정을 true로 바꾸면 자동 스키마 추론을 다시 사용할 수 있다.
2. 파티션 발견(Partition discovery)
데이터가 /key=value/ 형태의 서브디렉터리로 나뉘어 있을 때, 스파크는 자동으로 이 파티션 디렉터리를 인식하고, 하위 디렉터리까지 재귀적으로 탐색한다.
사용자가 제공한 스키마에 파티션 컬럼(key)이 포함되어 있으면, 파일 경로에 있는 파티션 값(value)을 기반으로 해당 컬럼 값을 채워넣는다.
파티션으로 사용되는 디렉터리 구조는 쿼리가 시작할 때 반드시 존재해야 하며, 실행 도중에는 바뀌면 안 된다.
예를 들어, /data/year=2015/ 디렉터리가 있을 때 /data/year=2016/ 를 새로 추가하는 것은 허용된다.
하지만 파티션 컬럼명을 바꾸는 것은 허용되지 않는다. 예를 들어 /data/date=2016-04-17/ 와 같이 변경하는 것은 잘못된 예이다.
3.2. Streaming DataFrame/DataSets 조작하기
3.2.1. Selection, Projection, Aggregation
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
또한 아래처럼 임시뷰를 만들어서 SQL문법을 사용할 수 있다.
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") # returns another streaming DF
만약 어떤 데이터프레임이 스트리밍 데이터프레임인지 알고 싶다면 아래 명령어 실행
df.isStreaming()
3.2.2. Event Time에 대한 Window Operations
이전 wordcount 예제의 내용을 조금 바꿔서 이제 10분의 Window time, 5분 간격으로 다음 window time으로 넘어가도록 설정했을 때의 워드 개수를 출력해준다고 하자.
즉, 아래처럼 10분의 window 타임이 있다.
10:00~10:10,
10:05~10:15,
10:10~10:20,
10:15~10:25
위와 같이 5분 간격으로 다음 Window로 넘어가서 모든 Window가 겹치게하는 이유는
먼저 5분 간격이 아니라 10분 간격으로 다음 Window로 넘어간다면 경계에 있는 데이터가 누락될 수도 있다.
또한 조금 더 촘촘한 데이터 Aggregation이 가능하다.
(이미지)
https://spark.apache.org/docs/latest/img/structured-streaming-window.png
3.2.3. 늦게 들어오는 데이터(Late Data)와 Watermarking 방법
데이터 처리할 때, 이벤트가 실제 발생한 시간(예: 12:04)에 맞춰 결과를 내야 한다. 그런데 어떤 데이터가 늦게 와서 12:11에 받을 수도 있다. 이럴 때도 늦게 온 데이터의 원래 시간인 12:04에 맞춰 12:00~12:10 구간 결과를 업데이트한다.
문제는 늦은 데이터가 계속 들어오면 시스템이 오래된 데이터를 오래 메모리에 저장해야 해서 부담이 커진다.
그래서 스파크는 워터마크(watermark) 기능을 도입했다. 워터마크는 '얼마나 늦은 데이터까지 처리할지' 기준을 정한다.
https://spark.apache.org/docs/latest/img/structured-streaming-late-data.png
예를 들어, 워터마크를 5분으로 설정하면,
- 12:00~12:10 구간 데이터는 최대 5분 늦게 오는 데이터까지 반영한다.
- 5분보다 더 늦게 오는 데이터는 무시해서 오래된 상태를 메모리에서 지울 수 있다.
즉, 워터마크를 사용하면 늦은 데이터는 일정 시간까지 반영하면서도 시스템 부담은 줄일 수 있다.
설정하는 방법은 아래와 같다.
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
이때 워터마크의 10분은 마지막 수집된 데이터(max event data) - 10분이다.
이 예제에서는 쿼리의 워터마크를 이벤트 시간 역할을 하는 컬럼인 “timestamp”를 기준으로 지정하고, 데이터가 늦게 도착할 수 있는 허용 시간 한계(watermark)를 10분으로 설정했다.
만약 이 쿼리를 Update 출력 모드(outputMode)로 실행하면, Spark 엔진은 결과 테이블에서 각 윈도우의 집계를 이벤트 시간인 “timestamp” 기준으로 현재 시간보다 10분 늦은 상태(워터마크)까지 계속해서 업데이트한다.
즉, 최신 이벤트 시간에서 10분 전까지의 윈도우는 상태를 유지하며, 늦게 도착하는 데이터가 있어도 집계 결과를 수정해준다. 하지만 그보다 오래된 윈도우는 상태를 정리(clean-up)해서 메모리 부담을 줄이고, 더 이상 업데이트하지 않는다.
이 방식은 늦게 도착하는 데이터도 최대 10분까지는 반영하면서, 불필요하게 오래된 상태를 계속 유지하지 않아 효율적인 스트리밍 처리가 가능하도록 돕는다.
https://spark.apache.org/docs/latest/img/structured-streaming-watermark-update-mode.png
그림에서 보듯이, 엔진이 추적하는 최대 이벤트 시간은 파란 점선으로 표시되고, 워터마크는 매 트리거(trigger) 시작 시점에 '최대 이벤트 시간 - 10분'으로 설정되어 빨간 선으로 표시된다.
예를 들어 스파크에 (12:14, dog) 데이터가 수집되면, 다음 트리거의 워터마크를 12:04로 설정한다.
이 워터마크 덕분에 스파크는 늦게 도착하는 데이터를 10분 더 허용하며, 중간 집계 상태(intermediate state)를 유지할 수 있다.
예를 들어 (12:09, cat) 데이터는 순서가 뒤바뀌고 늦은 데이터지만, 워터마크 허용시간인 12:04 이후에 도착했기 때문에 스파크는 중간 집계 상태를 계속 유지하며, 해당 데이터가 속한 윈도우(12:00 - 12:10, 12:05 - 12:15)의 집계 수치를 올바르게 업데이트한다.
하지만 워터마크가 12:11로 갱신되면, (12:00 - 12:10) 윈도우의 중간 상태는 정리(clean-up)되고, 그 이후 도착하는 모든 데이터(예: (12:04, donkey))는 너무 늦었다고 간주되어 무시된다.
또한, 각 트리거가 끝날 때마다 업데이트된 집계 결과(보라색 행)가 Update 모드에 따라 외부 저장소(sink)에 기록된다.
일부 저장소(예: 파일)는 Update 모드가 요구하는 세밀한 업데이트를 지원하지 않을 수 있다.
이 경우, 최종 집계 결과만 저장하는 Append 모드를 사용하면 된다. 이 부분은 아래 그림으로도 설명된다.
참고로, 비스트리밍 데이터셋(non-streaming Dataset)에 withWatermark를 적용해도 아무런 효과가 없으며(batch 쿼리에는 영향을 미치지 않으므로), 무시된다.
앞서 설명한 Update 모드와 유사하게, 엔진은 각 윈도우별 중간 집계 상태(intermediate counts)를 내부에 유지한다.
다만, 이 중간 집계 값들은 결과 테이블(Result Table)에 즉시 업데이트되지 않고, 외부 저장소(sink)에도 바로 쓰이지 않는다.
엔진은 늦게 도착하는 데이터를 10분 동안 기다렸다가,
워터마크보다 과거인 윈도우의 중간 상태는 정리(clean-up)하고,
그때 최종 집계 결과(final counts)를 결과 테이블과 외부 저장소에 추가(append) 한다.
예를 들어, 12:00 - 12:10 윈도우의 최종 집계 값은 워터마크가 12:11로 갱신된 이후에야 결과 테이블에 추가된다.
즉, 12:10이 지났다고해서 12:00-12:10 윈도우의 집계결과가 테이블에 추가되지 않고,
12:20-12:30 윈도우때 테이블에 실제로 추가된다.
3.2.4. Time Window의 종류
3가지 종류의 time window가 있다.
https://spark.apache.org/docs/latest/img/structured-streaming-time-window-types.jpg
-Tumbling Window
텀블링 윈도우는 고정 크기이며 겹치지 않는다.
즉, 각각의 윈도우는 서로 겹치지 않으며, 하나의 입력 데이터는 오직 하나의 윈도우에만 속한다.
-Sliding Window
슬라이딩 윈도우도 고정된 크기를 가지는 점에서는 텀블링 윈도우와 비슷하다.
하지만 슬라이딩 간격(윈도우가 이동하는 간격)이 윈도우 크기보다 작으면, 여러 윈도우가 서로 겹치게 된다.
이 경우 한 입력 데이터가 여러 개의 윈도우에 동시에 속할 수 있다.
텀블링 윈도우와 슬라이딩 윈도우는 위 예제들에서 설명한 것처럼 window 함수를 사용하여 구현한다.
-Session Window
세션 윈도우는 앞의 두 종류와 달리 크기가 고정되어 있지 않고, 입력 데이터에 따라 동적으로 결정된다.
세션 윈도우는 입력이 처음 도착하면 윈도우가 시작되고, 이후에 입력이 “지정된 시간 간격(gap duration)” 내에 계속 도착하면 윈도우 크기가 확장된다.
만약 이후 지정된 시간 동안 새로운 입력이 없으면 윈도우가 닫힌다.
세션 윈도우는 session_window 함수를 사용하며, 이 함수의 사용법은 window 함수와 매우 유사하다.
events = ... # streaming DataFrame of schema { timestamp: Timestamp, userId: String }
# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window(events.timestamp, "5 minutes"),
events.userId) \
.count()
고정된 값(static value) 대신 입력 행(row)에 따라 동적으로 계산되는 표현식(expression) 으로 gap duration(세션 윈도우의 간격)을 지정할 수도 있다.
이때, gap duration 값이 음수이거나 0인 행들은 집계에서 제외(필터링)된다.
동적 gap duration을 사용할 경우, 세션 윈도우가 닫히는 시점은 더 이상 최신 입력에만 의존하지 않는다.
세션 윈도우의 범위는 쿼리 실행 도중 각 이벤트의 시작 시간(event start time)과 그때 평가된 gap duration 값을 기준으로 산출된 모든 이벤트 구간들의 합집합(union)이다.
즉, 세션 윈도우가 동적으로 확장되고 여러 이벤트가 겹쳐서 하나의 세션 윈도우가 만들어질 수 있다.
from pyspark.sql import functions as sf
events = ... # streaming DataFrame of schema { timestamp: Timestamp, userId: String }
session_window = session_window(events.timestamp, \
sf.when(events.userId == "user1", "5 seconds") \ #user1은 5초 간격 갭 지정
.when(events.userId == "user2", "20 seconds") \ #user2는 20초 간격 갭 지정
.otherwise("5 minutes")) #그 외 사용자는 5분 갭 지정
# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window,
events.userId) \
.count()
위에 주어진 코드에서 세션 윈도우는 userId 값에 따라 세 가지 서로 다른 세션 간격(gap duration) 으로 구분된다.
-user1 은 세션 간격이 5초로 설정된다.
-user2 는 세션 간격이 20초로 설정된다.
-그 외 모든 사용자는 세션 간격이 5분으로 설정된다.
따라서 각각의 사용자 그룹별로 서로 다른 기준의 세션 윈도우가 만들어지고,
각 그룹별(즉, user1 세션, user2 세션, 기타 사용자 세션)로 집계가 진행된다.
세션 윈도우(Session Window)를 스트리밍 쿼리에서 사용할 때 몇 가지 제한 사항이 있다.
-업데이트 모드(Update mode) 출력은 지원되지 않는다.
즉, 세션 윈도우를 사용하는 스트리밍 쿼리는 Update 모드로 결과를 내보낼 수 없다.
-그룹화(grouping) 키에는 반드시 세션 윈도우(session_window) 이외에 최소한 한 개 이상의 추가 컬럼이 포함되어야 한다.
세션 윈도우만 단독으로 그룹화 키에 포함하는 것은 스트리밍에서는 허용되지 않는다.
반면, 배치(batch) 쿼리에서는 세션 윈도우만을 그룹화 키로 하는 ‘글로벌 윈도우(global window)’ 사용이 가능하다.
세션 윈도우 집계는 기본적으로 파티션별로 부분 집계를 안 한다.
부분 집계를 하면 성능이 좋아질 수 있지만, 로컬 파티션에서 추가로 정렬하는 과정이 필요해서 기본으로는 안 켜져 있다.
만약 한 파티션에 같은 그룹 키를 많이 가지고 있으면, 설정을 켜서 부분 집계도 사용하면 성능이 크게 좋아진다.
이 설정이 바로 spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition 이다.
3.2.5. Time Window에서 시간 표현
어떤 상황에서는 윈도우 단위로 모은 데이터에 대해 다시 시간 정보를 꺼내서, 타임스탬프가 필요한 작업을 해야 할 때가 있다.
예를 들어, 5분 단위로 집계한 결과를 다시 1시간 단위로 묶는 연쇄 윈도우 집계(chained time window aggregation) 를 할 때 그렇다.
이를 수행하는 방법은 두 가지가 있다.
-window_time: SQL 함수에 시간 윈도우 컬럼을 인자로 넘겨 사용하는 방법
-window: SQL 함수에 시간 윈도우 컬럼을 인자로 넘겨 사용하는 방법
window_time 함수는 시간 윈도우를 대표하는 시점을 타임스탬프로 변환해 준다.
사용자는 이 결과를 window 함수 등 타임스탬프가 필요한 곳에 넘겨 후속 처리를 할 수 있다.
words = ... # { timestamp: Timestamp, word: String } 스키마를 가진 스트리밍 데이터프레임
# 10분 크기 윈도우, 5분 슬라이드로 단어별 집계
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# 위 결과를 다시 1시간 크기 window로 묶어 집계
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
window 함수는 타임스탬프 컬럼뿐 아니라 시간 윈도우 컬럼 자체도 입력받을 수 있다.
이 기능을 활용하면, 여러 단위의 연쇄 윈도우 집계가 가능하다.
아래 방법이 더 간편하다.
# 위와 같은 예시지만 window_time 대신 window 컬럼을 직접 넣는 방식
anotherWindowedCounts = windowedCounts.groupBy(
window(windowedCounts.window, "1 hour"),
windowedCounts.word
).count()
-워터마크가 집계 상태를 정리하기 위한 조건
워터마크로 집계 상태(aggregation state)를 정리하려면 다음 조건을 만족해야 한다.
1. 출력 모드는 Append 모드 또는 Update 모드여야 한다.
Complete 모드는 모든 결과를 계속 보존해야 해서 워터마크로 상태를 정리할 수 없다.
2. 집계에는 반드시 이벤트 시간 컬럼(event-time) 이나 이벤트 시간 기반 윈도우가 포함되어 있어야 한다.
3. withWatermark는 집계에 사용하는 타임스탬프 컬럼과 동일한 컬럼에 적용되어야 한다.
예컨대 df.withWatermark("time", "1 min").groupBy("time2").count() 처럼
집계 컬럼과 워터마크 컬럼이 다르면 안 된다.
4. withWatermark는 집계 연산 이전에 호출되어야 한다.
df.groupBy("time").count().withWatermark("time", "1 min") 는 올바르지 않다.
-워터마크 집계의 의미보장(Semantic Guarantees)
워터마크를 2시간으로 설정했다면,
최신 데이터 이벤트 시간 기준으로 2시간 이내에 도착한 데이터는 무조건 놓치지 않고 집계 결과에 포함된다는 뜻이다.
하지만 2시간보다 훨씬 늦게 도착한 데이터에 대해서는
반드시 무시된다고 할 수 없고, 가끔은 처리되기도 하고, 가끔은 무시되기도 한다.
즉, 늦은 데이터가 너무 많아질수록 처리될 가능성은 줄어든다고 이해하면 된다.
3.2.6. Join Operations
Structured Streaming은 스트리밍 Dataset/DataFrame과 정적(static) Dataset/DataFrame 간의 조인뿐만 아니라, 두 개의 스트리밍 Dataset/DataFrame 간의 조인도 지원한다.
스트리밍 조인의 결과는 이전에 다룬 스트리밍 집계처럼 점진적으로 생성된다.
여기서는 두 경우에서 지원되는 조인 유형(내부(inner), 외부(outer), 세미(semi) 등)에 대해 살펴본다.
참고로, 스트리밍 Dataset/DataFrame을 사용하는 조인의 결과는, 동일한 데이터를 가진 정적 Dataset/DataFrame과 조인했을 때 결과와 완전히 동일하다.
-Stream&Static Joins
Spark 2.0부터, 스트리밍 DataFrame/Dataset과 정적 DataFrame/Dataset 간의 조인(내부 조인 및 일부 외부 조인)을 지원한다.
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # 정적 데이터프레임과 내부 동등 조인
streamingDf.join(staticDf, "type", "left_outer") # 정적 데이터프레임과 왼쪽 외부 조인
스트림-정적 조인은 상태(state)를 관리하지 않아도 되며, 별도의 상태 관리가 필요 없다.
다만 일부 종류의 스트림-정적 외부 조인은 아직 지원되지 않는 점 참고.
-Inner Join과 Watermark 사용
모든 컬럼과 다양한 조인 조건에 대해 내부 조인을 지원한다.
하지만 스트리밍이 계속 실행되면, 과거 입력 데이터를 모두 저장해야 하기 때문에 상태(state)가 무한히 커질 위험이 있다.
이를 방지하기 위해서는,
너무 오래된 데이터가 미래 데이터와 더 이상 매칭되지 않도록 추가 조인 조건을 설정해야 하며,
그렇게 하면 오래된 상태 정보를 안전하게 삭제할 수 있다.
이를 위해 조인 시 다음 두 가지를 추가로 정의해야 한다.
1. 양쪽 입력 데이터에 워터마크 지연 시간(watermark delays)을 설정
예를 들어 최대 몇 시간까지 늦은 이벤트를 허용할지 지정한다.
2. 이벤트 시간(event-time) 기반의 조건을 추가해 오래된 행들이 더 이상 조인에 필요 없음을 알려야 한다
방법 1: 시간 범위 조건으로 조인을 제한한다.
예: JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
방법 2: 이벤트 시간 단위로 윈도우를 나누어 같은 윈도우끼리만 조인한다.
예: JOIN ON leftTimeWindow = rightTimeWindow
예시: 광고 노출 및 클릭 데이터 조인
광고가 노출된 시간 스트림(impressions)과
사용자의 광고 클릭 스트림(clicks)을 조인해서,
언제 광고 노출이 실제 클릭으로 이어졌는지 연결시키고자 한다.
상태 정리를 위해 다음과 같이 워터마크와 시간 조건을 지정한다.
*워터마크 지연 시간
광고 노출 최대 2시간 지연 허용
광고 클릭 최대 3시간 지연 허용
*이벤트 시간 조건
클릭은 노출 시점으로부터 0초에서 1시간 이내 발생해야 한다
from pyspark.sql.functions import expr
impressions = spark.readStream...
clicks = spark.readStream...
# 워터마크 설정 (늦은 데이터 허용 시간)
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# 이벤트 시간 조건을 활용하여 조인 수행
# impressionTime ≤ clickTime ≤ impressionTime + 1 hour
joinedStream = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
- 조인 조건으로 인해, 클릭이 노출 시점 이후 최대 1시간 이내에 발생한 데이터만 결과에 포함된다.
- 워터마크 설정 덕분에 너무 오래된 노출이나 클릭은 상태에서 정리(clean-up)되어 메모리 사용이 과도하게 늘어나지 않는다.
- 만약 클릭이나 노출 데이터가 워터마크보다 늦게 도착하면 (너무 늦게 오는 데이터), 조인 결과에 포함되지 않을 수 있다.
예시
| impressionAdId | impressionTime | clickAdId | clickTime | 조인 결과 생성 여부 |
| ad123 | 2024-06-01 10:00 | ad123 | 2024-06-01 10:30 | 조인 결과 생성 (30분차이, 조건 만족) |
| ad123 | 2024-06-01 10:00 | ad123 | 2024-06-01 11:15 | 조인 결과 미생성 (1시간 이상 지남) |
| ad456 | 2024-06-01 9:50 | ad456 | 2024-06-01 12:00 | 조인 결과 미생성 (워터마크 기준 지나침) |
결과
- 결과는 광고 노출 로그와 클릭 로그가 일치하면서, 클릭이 노출 후 1시간 이내에 발생한 경우만 포함된 조인된 스트림이 된다.
- 이 결과는 실시간으로 계속 업데이트되어 나오는 스트리밍 데이터이다.
-Watermaking이 필요한 Outer Joins
- 내부 조인(inner join)에서는 워터마크와 이벤트 시간 조건(event-time constraints)이 선택 사항일 수 있지만,
- 외부 조인(outer join)에서는 워터마크와 이벤트 시간 조건을 반드시 지정해야 한다.
- 이유는 외부 조인에서는 일치하는 데이터가 없는 쪽에 NULL 값을 포함하는 결과를 생성해야 하는데,
엔진이 "더 이상 앞으로 일치하는 데이터가 오지 않을 시점"을 알아야 올바른 NULL 결과를 만들 수 있기 때문이다. - 그러므로 워터마크와 이벤트 시간 조건을 통해 언제 과거 데이터가 더 이상 매칭될 가능성이 없는지 판단하고,
- 그 시점에 NULL 결과를 확정짓는다.
그래서 외부 조인 쿼리는 앞서 광고 수익화 예제와 거의 유사하나,
추가로 조인 타입을 leftOuter (또는 rightOuter, fullOuter 등외부 조인 종류) 로 지정하는 차이가 있다.
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # 조인 종류: 내부(inner), 왼쪽 외부(leftOuter), 오른쪽 외부(rightOuter), 전체 외부(fullOuter), 왼쪽 세미(leftSemi) 가능
)
주의사항은 다음과 같다.
- 외부 조인에서 일치하는 매칭이 없다는 결과를 내기 위해서, 일정 시간 동안 기다려야 한다.
- 이 기다림 시간은 워터마크와 시간 조건 때문에 발생하며,
- 그리고 새 데이터가 들어오지 않으면 결과 출력도 늦어진다.
- 따라서, 입력 스트림 중 하나가 잠시 멈추면, 외부 조인의 결과 출력이 늦어질 수 있다는 점을 유의해야 한다.
-Watermarking된 Semi Joins
- 세미 조인은 두 테이블(또는 스트림) 중에서 왼쪽(Left) 데이터 중 오른쪽(Right) 데이터와 매칭되는 값만 반환하는 조인이다.
- 따라서 왼쪽 세미 조인(left semi join) 이라고도 부른다.
- 외부 조인과 마찬가지로,
세미 조인에서도 워터마크(watermark)와 이벤트 시간(event-time) 조건을 반드시 지정해야 한다. - 이유는, 왼쪽 데이터 중에서 앞으로 오른쪽 데이터와 매칭되지 않을(즉, 앞으로는 연결될 가능성이 없는) 데이터를 미리 제거(evict)하기 위해 엔진이 이 정보를 알고 있어야 하기 때문이다.
-Streaming Join 쿼리에서 제공되는 Join 타입
| Left Input | Right Input | 조인 타입 (Join Type) | 지원 여부 및 특징 |
| Static | Static | 모든 타입(All types) | 지원됨 (스트리밍 데이터가 아니므로 상태 관리 불필요) |
| Stream | Static | Inner | 지원됨, 상태 관리 불필요 |
| Stream | Static | Left Outer | 지원됨, 상태 관리 불필요 |
| Stream | Static | Right Outer | 지원 안 됨 |
| Stream | Static | Full Outer | 지원 안 됨 |
| Stream | Static | Left Semi | 지원됨, 상태 관리 불필요 |
| Static | Stream | Inner | 지원됨, 상태 관리 불필요 |
| Static | Stream | Left Outer | 지원 안 됨 |
| Static | Stream | Right Outer | 지원됨, 상태 관리 불필요 |
| Static | Stream | Full Outer | 지원 안 됨 |
| Static | Stream | Left Semi | 지원 안 됨 |
| Stream | Stream | Inner | 조건부 지원, 양쪽 워터마크와 시간 조건 필요 (상태 관리 위해) |
| Stream | Stream | Left Outer | 조건부 지원, 오른쪽 워터마크와 시간 조건 필수, 왼쪽 워터마크는 옵션 |
| Stream | Stream | Right Outer | 조건부 지원, 왼쪽 워터마크와 시간 조건 필수, 오른쪽 워터마크는 옵션 |
| Stream | Stream | Full Outer | 조건부 지원, 한 쪽 워터마크와 시간 조건 필수, 다른 쪽 워터마크는 옵션 |
| Stream | Stream | Left Semi | 조건부 지원, 오른쪽 워터마크와 시간 조건 필수, 왼쪽 워터마크는 옵션 |
- 조인은 여러 번 연결해서(cascaded) 사용할 수 있으며, 예를 들어 df1.join(df2, ...).join(df3, ...).join(df4, ...) 처럼 사용 가능하다.
- Spark 2.4 기준으로, 조인은 Append 출력 모드에서만 지원된다.
다른 출력 모드는 아직 스트리밍 조인을 지원하지 않는다. - 조인 전후로 mapGroupsWithState 와 flatMapGroupsWithState 를 사용할 수 없다.
- Append 모드에서는 조인 이전이나 이후에 집계(aggregation), 중복 제거(deduplication), 스트림-스트림 조인도 사용할 수 있다.
예제 1: 시간 윈도우 집계 후 스트림-스트림 내부 조인
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
clicks 스트림과 impressions 스트림을 각각 1시간 단위 윈도우별로 집계한 후, 해당 윈도우를 기준으로 내부 조인하는 예시다.
예제 2: 시간 범위 조건을 가진 스트림-스트림 외부 조인 후 집계
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
- 광고 노출 스트림과 클릭 스트림을,
‘클릭이 노출 이후 최대 1시간 이내에 발생하는’ 조건으로 왼쪽 외부 조인한 뒤, - 조인된 결과를 클릭 시간 기준으로 1시간 단위 윈도우별 집계하는 예시이다.
3.2.7. Streaming Deduplication (스트리밍 중복제거)
- 데이터 스트림 내에서 중복된 레코드를 고유 식별자(unique identifier)를 사용하여 제거(deduplicate) 할 수 있다.
- 자세한 동작은 정적 데이터에서 고유 식별자를 이용한 중복 제거와 동일하다.
- 쿼리는 중복 필터링을 위해 이전 레코드들의 필요한 정보를 일정 기간 저장하여 관리한다.
- 중복 제거는 워터마크를 사용하거나 사용하지 않고 모두 가능하다.
워터마크 없이 중복 제거하기
- 언제 중복 데이터가 올지 범위를 알 수 없기 때문에
- 쿼리는 모든 과거 기록 데이터를 상태(state)로 계속 저장한다.
- 예시:
streamingDf.dropDuplicates(["guid"])
워터마크를 사용한 중복 제거
- 중복 데이터가 최대 얼마까지 늦게 도착할지를 알고 있다면,
- 이벤트 시간 컬럼(eventTime)에 워터마크를 설정하고, guid와 eventTime 두 컬럼을 기준으로 중복 제거를 수행할 수 있다.
- 워터마크가 지나간 오래된 상태 데이터는 쿼리가 자동으로 정리(clean-up)하기 때문에 상태 크기를 제한할 수 있다.
- 예시:
streamingDf \
.withWatermark("eventTime", "10 seconds") \
.dropDuplicates(["guid", "eventTime"])
- 워터마크 지연 시간이 예를 들어 '1시간'으로 설정되면,
- 그 시간 윈도우 안에서 발생한 중복 이벤트는 정확하게 제거된다.
- 워터마크를 사용하면, 이벤트 시간 컬럼만으로는 유니크 식별자로 쓰기 어려운 경우에도 중복 제거가 가능하다.
(예: 같은 레코드라도 이벤트 시간이 조금씩 다를 때, 비가역적 환경에서 유용) - 사용자는 중복 이벤트 간 최대 시간 차이보다 워터마크 지연 시간을 크게 설정하는 게 좋다.
- 반드시 워터마크(delay threshold)를 설정한 스트리밍 DataFrame/Dataset에서만 가능하다.
- 예시:
streamingDf \
.withWatermark("eventTime", "10 hours") \
.dropDuplicatesWithinWatermark(["guid"])
3.2.8. 여러 watermarks를 다루는 방법
- 스트리밍 쿼리에서 여러 입력 스트림이 있을 때, 각 스트림마다 워터마크(늦은 데이터 허용 기준)를 다르게 설정할 수 있다.
- 스파크는 각 스트림의 워터마크 중에서 하나를 골라서 전체 쿼리에 사용하는데, 기본 설정은 가장 작은 워터마크를 선택한다.
→ 이렇게 하면 느린 스트림에 맞춰서 안전하게 데이터를 안 버리고 처리하지만, 결과가 느리게 나올 수 있다. - 만약 더 빨리 결과를 받고 싶으면, 설정을 바꿔서 가장 큰 워터마크를 선택할 수도 있다.
→ 이 경우 빠른 스트림 속도에 맞춰 빨리 처리하지만, 느린 스트림의 데이터는 일부 버려질 수 있다.
| 설정 옵션 | 의미 | 특징 |
| 기본 (min) | 여러 워터마크 중 가장 작은 값을 전역 워터마크로 사용 | 느린 스트림에 맞춰 안전하게 처리, 처리 지연 발생 |
| spark.sql.streaming.multipleWatermarkPolicy = max | 여러 워터마크 중 가장 큰 값을 전역 워터마크로 사용 | 빠른 처리 가능, 느린 스트림 데이터 손실 위험 존재 |
3.2.9. 임의의 상태 기반 연산 (Arbitrary Stateful Operations)
많은 실무 상황에서는 단순한 집계(aggregation) 이상의 복잡한 상태 기반 작업이 필요하다.
예를 들어, 이벤트 스트림에서 사용자의 세션(session) 을 추적하는 작업이 대표적이다.
이런 경우에는 특정한 형태의 데이터를 상태(state)로 저장하고, 들어오는 스트림 이벤트마다 상태를 업데이트하는 복잡한 연산이 요구된다.
Spark 2.2 버전부터는 mapGroupsWithState 와 flatMapGroupsWithState 라는 구 연산자를 제공한다.
이 둘을 사용하면, 그룹화된 Dataset에 사용자 정의 코드를 적용하여 상태를 업데이트할 수 있다.
더 자세한 내용과 예제는 공식 API 문서(Scala/Java) 참고 가능하다.
Spark 4.0부터는 보다 새롭고 향상된 transformWithState 연산자 사용을 권장한다.
이 연산자는 복잡한 상태 기반 애플리케이션 개발에 더욱 적합하다.
자세한 내용은 공식 심층 문서를 참고하면 된다.
Spark가 출력 모드(output mode)의 의미론(semantics)에 맞게 상태 함수를 실행하도록 강제하지는 않으나, 상태 함수는 출력 모드에 부합하도록 설계되어야 한다.
예를 들어,
Update 모드에서는 변화된(업데이트된) 데이터만 출력하는 방식이다.
그래서 워터마크가 허용하는 지연 시간보다 오래되어 ‘지나간’ 데이터는 다시 출력하지 않는 것이 맞다.
쉽게 말해, 너무 늦게 도착한 데이터는 업데이트 결과에 포함하지 않는다.
이렇게 해야 오래된 데이터로 인해 결과가 계속 갱신되는 것을 막고, 일관된 결과를 유지할 수 있다.
Append 모드는 결과를 ‘추가’만 하는 방식이라, 늦게 도착한 오래된 데이터라도 출력할 수 있다.
즉, Append 모드에서는 워터마크 밖의 데이터도 결과로 나타날 수 있다.
Update 모드에서는 워터마크를 벗어나거나 지연된 오래된 데이터는 출력하지 않아야 한다.
Append 모드에서는 이런 오래된 데이터도 출력할 수 있다.
3.2.10. 스트리밍 DataFrame/Dataset에서 지원되지 않는 연산들 (Unsupported Operations)
스트리밍 DataFrame/Dataset에서는 몇 가지 연산이 지원되지 않는다. 주요 내용을 소개하면 다음과 같다.
1. 지원되지 않는 주요 연산
limit 또는 처음 N개 행(take first N rows) 가져오기
→ 스트리밍 데이터에서 이 기능은 지원하지 않는다.
distinct (중복 제거)
→ 스트리밍 Dataset에서 distinct 연산도 지원되지 않는다.
정렬(sorting)
→ 집계(aggregation) 후, 그리고 Complete 출력 모드에서만 가능하다.
외부 조인 일부 유형
→ 스트리밍 Dataset에서는 몇몇 외부 조인이 지원되지 않는다.
→ 자세한 내용은 ‘Join Operations’ 섹션의 지원 매트릭스를 참고해야 한다.
여러 상태 연산(stateful operations) 연속 처리
→ Update 모드와 Complete 모드에서는 여러 개의 상태 연산을 연속으로 처리하는 것을 지원하지 않는다.
mapGroupsWithState / flatMapGroupsWithState 이후 다른 상태 연산
→ Append 모드에서는 지원되지 않는다.
2. 권장하는 해결책
상태 연산을 여러 개 연속으로 사용해야 한다면, 스트리밍 쿼리를 여러 개로 나누어 각 쿼리에 하나씩 상태 연산을 둔다.
각 쿼리별로 exactly-once 보장을 달성하면 되며, 마지막 쿼리는 선택 사항이다.
3. 스트리밍 Dataset에서 작동하지 않는 Dataset 메서드 (즉시 실행하는 동작)
다음 메서드들은 스트리밍 Dataset에서는 실행 불가하고, 대신 스트리밍 쿼리를 명시적으로 시작해 사용해야 한다.
count()
→ 스트리밍에서는 단일 값 카운트를 반환할 수 없다. 대신 ds.groupBy().count() 를 사용해야 하며, 스트리밍 집계된 결과를 포함한다.
foreach()
→ 데이터 처리를 위해선 ds.writeStream.foreach(...) 를 사용해야 한다.
show()
→ 콘솔 싱크(console sink)를 통해 결과를 출력해야 한다.
4. 에러 메시지
이들 지원 안 되는 연산을 시도하면,
"operation XYZ is not supported with streaming DataFrames/Datasets" 와 같은 AnalysisException 에러가 발생한다.
5. 지원이 어려운 이유
일부 기능은 스트리밍 데이터에 매우 어려운 연산이기 때문에 효율적으로 구현하기 힘들다.
예를 들어, 입력 스트림에서 바로 정렬하는 것은 스트림에 들어온 모든 데이터를 계속 저장해야 하므로 매우 어렵다.
3.2.11. 상태 저장소(State Store)
상태 저장소는 버전이 관리되는 키-값 저장소(key-value store)로, 읽기와 쓰기 작업을 모두 지원한다.
Structured Streaming에서는 상태 저장소 프로바이더(state store provider)를 사용해 여러 마이크로배치에 걸쳐 상태 기반 연산을 처리한다.
기본적으로 두 가지 내장 상태 저장소 프로바이더 구현체가 있다.
사용자는 필요시 직접 StateStoreProvider 인터페이스를 구현해 자신만의 상태 저장소 프로바이더를 만들 수도 있다.
1. HDFS 상태 저장소 프로바이더 (HDFS state store provider)
기본 구현체이며, 상태 데이터는 처음에 메모리 맵에 저장되고 이후 HDFS 호환 파일 시스템에 파일 형태로 저장된다.
상태 업데이트는 트랜잭션 단위로 실행되며, 각 업데이트 세트마다 상태 저장소 버전(version) 이 증가한다.
이 버전을 기반으로 상태 업데이트 재시도(retries) 시 올바른 상태 버전을 사용하여 정확한 처리가 가능하다.
2. RocksDB 상태 저장소 구현 (RocksDB state store implementation)
Spark 3.2부터 새로 추가된 내장 상태 저장소이다.
만약 스트리밍 쿼리에서 수백만 개 이상의 키를 저장해야 하는 상태 기반 연산(streaming aggregation, dropDuplicates, stream-stream join, mapGroupsWithState 등)이 있다면, 기본 HDFSBackedStateStore는 상태 데이터를 JVM 메모리에 저장하기 때문에 대규모 JVM 가비지 컬렉션(GC) 지연 문제가 발생해 마이크로배치 처리 시간에 큰 변동이 생길 수 있다.
이럴 때는 RocksDB 기반 상태 관리를 선택하는 것이 좋다.
RocksDB는 상태를 JVM 메모리가 아닌 네이티브 메모리(native memory) 와 로컬 디스크에 효율적으로 관리한다.
또한, 상태 변경은 자동으로 스트럭처드 스트리밍 체크포인트 위치에 저장되어 완전한 내결함성(fault-tolerance)을 보장한다.
RocksDB 상태 저장소 활성화 방법
spark.sql.streaming.stateStore.providerClass = org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
위 설정을 통해 RocksDB 상태 저장소를 사용할 수 있다.
RocksDB 상태 저장소 프로바이더 관련 세부 설정도 존재한다.
| 설정 이름 | 설명 | 기본값 |
| spark.sql.streaming.stateStore.rocksdb.compactOnCommit | 커밋 시 RocksDB 인스턴스에서 범위 압축(range compaction)을 수행할지 여부 | FALSE |
| spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled | RocksDB StateStore 커밋 시 스냅샷 대신 변경 로그(changelog)를 업로드할지 여부 | FALSE |
| spark.sql.streaming.stateStore.rocksdb.blockSizeKB | RocksDB BlockBasedTable의 블록당 사용자 데이터 크기(KB 단위), SST 파일 기본 포맷과 관련 있음 | 4 |
| spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | 블록 캐시 크기(MB 단위) | 8 |
| spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | RocksDB 인스턴스 로드 시 락(lock) 획득 대기 시간(밀리초 단위) | 60000 |
| spark.sql.streaming.stateStore.rocksdb.maxOpenFiles | RocksDB에서 사용할 수 있는 최대 오픈 파일 수. -1인 경우 모든 파일을 항상 오픈 | -1 |
| spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | RocksDB 로드 시 히스토그램 및 티커 통계 초기화 여부 | TRUE |
| spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows | 상태 저장소 내 총 행(row) 수 추적 여부 (성능 고려사항 참고) | TRUE |
| spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB | RocksDB의 MemTable 최대 크기(MB). -1은 RocksDB 기본값 사용 | -1 |
| spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber | RocksDB의 활성 및 불변 MemTable 최대 개수. -1은 RocksDB 기본값 사용 | -1 |
| spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage | 단일 노드 내 RocksDB 인스턴스들의 전체 메모리 사용량 제한 여부 | FALSE |
| spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB | 단일 노드 내 RocksDB 인스턴스가 사용할 수 있는 최대 메모리(MB 단위) | 500 |
| spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio | 단일 노드 내 메모리 중 쓰기 버퍼(write buffers)가 차지할 비율 | 0.5 |
| spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio | 단일 노드 내 메모리 중 우선순위가 높은 풀(high priority pool)이 차지할 비율 | 0.1 |
| spark.sql.streaming.stateStore.rocksdb.allowFAllocate | 로그 등에 대해 디스크 공간을 미리 할당하는 fallocate 사용 여부. 많은 작은 상태 저장소 앱에서 디스크 공간 절약을 위해 비활성화할 수 있음 | TRUE |
| spark.sql.streaming.stateStore.rocksdb.compression | RocksDB에서 사용하는 압축 타입(lz4, snappy 등). Java API의 getCompressionType()으로 변환됨 | lz4 |
3.2.12. RocksDB 상태 저장소 메모리 관리 (RocksDB State Store Memory Management)
RocksDB는 메모리 내에서 MemTable(쓰기 버퍼), 블록 캐시(block cache), 그리고 필터/인덱스 블록(filter/index blocks) 등 다양한 객체를 위해 메모리를 할당한다.
만약 메모리 사용에 제한이 없다면, 노드 내 여러 RocksDB 인스턴스에서 사용하는 메모리가 무한정 증가하여 메모리 부족(Out-Of-Memory, OOM) 문제가 발생할 수 있다.
메모리 사용 제한 방법
RocksDB는 노드 내 구동 중인 모든 DB 인스턴스의 전체 메모리 사용량을 제한할 수 있는 Write Buffer Manager 기능을 제공한다.
Spark Structured Streaming 환경에서는 아래 설정을 통해 이 기능을 활성화할 수 있다:
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage = true
또한 RocksDB 인스턴스에서 사용할 수 있는 최대 메모리 용량을 다음 설정을 통해 정할 수 있다:
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB
이 값은 고정 숫자(MB 단위) 또는 노드 물리 메모리 대비 비율로 지정 가능하다.
개별 RocksDB 인스턴스별 메모리 제한은 다음 두 설정으로 조절할 수 있다:
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber
기본적으로는 RocksDB 내부 기본값이 적용된다.
주의사항
boundedMemoryUsage 설정은 소프트 리밋(soft limit) 으로 동작한다.
즉, RocksDB가 사용할 총 메모리 사용량이 일시적으로 이 한도를 초과할 수 있다.
특히, 상위 레벨 읽기 작업을 위해 할당된 블록이 모두 사용 중일 때 초과 가능성 있음.
현재로서는 엄격한(strict) 메모리 한도를 적용하는 기능이 없다.
엄격한 한도를 적용하면 쿼리 실패가 발생할 수 있고, 상태 데이터를 다른 노드로 재분배하는 기능도 지원하지 않기 때문이다.
3.2.13. RocksDB 상태 저장소 변경 로그 체크포인팅 (RocksDB State Store Changelog Checkpointing)
최근 버전의 Spark에서는 RocksDB 상태 저장소에 대해 변경 로그(changelog) 체크포인팅 기능이 추가되었다.
기존 방식인 증분 스냅샷 체크포인팅(incremental snapshot checkpointing) 은 RocksDB 인스턴스의 매니페스트 파일(manifest files)과 새로 생성된 SST 파일(RocksDB의 저장 파일)을 내구성 있는 저장소에 업로드하는 방식이었다.
새로운 변경 로그 체크포인팅은 RocksDB 인스턴스 전체 파일을 업로드하지 않고, 마지막 체크포인트 이후 상태에 발생한 변경 사항만을 업로드하여 내구성을 보장한다.
스냅샷은 백그라운드에서 주기적으로 저장되어, 장애 복구와 변경 로그 정리에 활용된다.
변경 로그 체크포인팅은 전체 파일을 스냅샷으로 캡처하고 업로드하는 비용을 줄여 스트리밍 쿼리의 전체 지연(latency)을 상당히 감소시킨다.
활성화 및 호환성
변경 로그 체크포인팅은 기본적으로 비활성화되어 있다.
활성화하려면 아래 설정을 true 로 지정한다.
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled = true
변경 로그 체크포인팅은 기존 체크포인팅 방식과 완벽하게 호환된다.
RocksDB 상태 저장소 프로바이더는 두 가지 체크포인팅 방식을 자유롭게 전환할 수 있도록 지원한다.
예를 들어, 기존 버전의 Spark에서 사용하던 쿼리를 최신 버전에서 변경 로그 체크포인팅으로 쉽게 마이그레이션 가능
반대로 최신 버전에서 변경 로그 체크포인팅을 해제하면 예전 방식으로 안전하게 돌아갈 수 있다.
단, 체크포인팅 메커니즘 변경 시 스트리밍 쿼리를 재시작해야 적용된다.
재시작 과정에서 성능 저하 같은 문제는 발생하지 않는다.
성능 관련 고려 사항
상태 저장소의 성능을 높이려면, 전체 행(row) 수 추적(trackTotalNumberOfRows)을 비활성화 하는 방법도 있다.
행 수 추적 기능은 쓰기 작업 시 추가 조회(overhead)를 발생시키므로,
상태 운영자의 지표(metrics) 중 numRowsUpdated, numRowsRemoved 값이 큰 경우에는 비활성화해보는 것이 권장된다.
이 설정은 스트리밍 쿼리 재시작 시 변경할 수 있어,
성능과 모니터링(관측성, observability) 사이에서 조절할 수 있다.
비활성화하면 상태 내 행 수(numTotalStateRows)는 항상 0으로 보고된다.
3.2.14. 상태 저장소(State Store)와 작업 위치(Task Locality) 설정
스트럭처드 스트리밍에서 상태 기반 연산(Stateful operation)은 이벤트 상태를 실행 중인 Executor의 상태 저장소(State Store)에 저장한다.
상태 저장소는 메모리나 디스크 공간 등 자원을 사용한다.
따라서, 스트리밍 배치(batch)가 바뀌더라도 같은 Executor에서 같은 상태 저장소 프로바이더(State Store Provider)를 계속 사용하는 것이 효율적이다.
상태 저장소 위치 변경 시 문제점
상태 저장소 프로바이더가 다른 Executor로 이동하면 기존의 체크포인트 상태(checkpointed states)를 다시 로드해야 한다.
이 로딩 작업은 외부 저장소와 상태 크기에 따라 비용(시간)이 크며, 마이크로배치 처리 지연(latency)을 늘릴 수 있다.
특히 상태 데이터가 매우 큰 경우, 상태 저장소 프로바이더를 새로 로드하는 작업은 매우 느리고 비효율적이다.
스파크의 작업 위치 제어
스파크는 RDD의 preferred location (선호 위치) 기능을 이용해 가능한 한 같은 Executor에서 상태 저장소를 실행하려고 시도한다.
만약 다음 배치도 이전과 같은 Executor에서 실행된다면, 상태를 재사용하여 로딩 시간을 절약할 수 있다.
그러나 preferred location은 엄격한 강제 조건은 아니다.
시스템 상황에 따라 다른 Executor에 작업이 배정될 수 있고, 이 경우 상태 저장소는 체크포인트에서 다시 로딩된다.
이전 배치의 상태 저장소는 즉시 해제되지 않고, 스파크는 주기적으로 유지보수 작업을 실행해 사용하지 않는 상태 저장소를 해제한다.
사용자 설정으로 위치 제어 가능
예를 들어 spark.locality.wait 같은 설정을 통해 스파크가 데이터 로컬 작업 실행을 얼마나 오래 기다릴지 설정할 수 있다.
이런 설정을 적절히 조절하면, 상태 저장소 프로바이더가 여러 배치에서 동일 Executor 내에서 실행될 확률을 높일 수 있다.
특히 기본 HDFS 상태 저장소 프로바이더를 사용할 경우, 상태 저장소의 캐시 적중률(loadedMapCacheHitCount)과 캐시 미스율(loadedMapCacheMissCount) 메트릭을 통해 상태 로딩 효율을 모니터링할 수 있다.
캐시 미스가 낮을수록 불필요한 상태 로딩이 줄어 처리 효율이 좋아진다.
필요하다면 로컬리티 대기 시간을 늘려 상태 저장소 재로딩을 줄일 수 있다.
상태 데이터 소스(State Data Source) (실험적 기능)
Apache Spark는 체크포인트 내 상태 저장소를 조작할 수 있는 스트리밍 상태 관련 데이터 소스를 제공한다.
이를 이용해 기존 스트리밍 쿼리의 상태 정보를 배치 쿼리(batch query)로 조회할 수 있다.
다만 Spark 4.0 기준으로 이 데이터 소스는 읽기(read) 기능만 지원하며, 아직 실험적(experimental) 단계라 소스 옵션이나 결과 동작이 변경될 수 있으니 주의해야 한다.
3.3. Streaming Queries 시작하기
최종 결과를 담은 DataFrame 또는 Dataset을 정의한 후에는, 스트리밍 처리를 실제로 시작해야 한다.
스트리밍 처리는 Dataset.writeStream() 메서드를 통해 반환되는 DataStreamWriter 객체를 사용해 시작할 수 있다.
DataStreamWriter에서 지정해야 할 주요 항목
-출력 싱크(Output sink) 정보
결과를 저장할 위치, 데이터 포맷(format) 등 출력 대상에 관한 상세 정보를 지정한다.
-출력 모드(Output mode)
결과를 어떻게 출력할지 정한다. 예) Append, Update, Complete 모드 등
-쿼리 이름(Query name)
선택 사항이며, 쿼리를 구분하기 위한 고유 이름을 지정할 수 있다.
-트리거 간격(Trigger interval)
선택 사항이며, 스트리밍 처리를 주기적으로 실행할 간격을 지정한다.
지정하지 않으면, 이전 배치 처리가 끝나는 즉시 새 데이터를 확인하며 바로 실행한다.
만약 지정한 시간이 지났는데 이전 작업이 끝나지 않으면, 끝나는 즉시 바로 실행된다.
-체크포인트 위치(Checkpoint location)
일부 출력 싱크에서는 end-to-end 장애 복구 보장을 위해 체크포인트 저장 위치를 지정해야 한다.
체크포인트 디렉터리는 HDFS 호환의 내결함성 파일 시스템에 위치해야 한다.
체크포인트 의미와 동작은 다음 섹션에서 자세히 다룬다.
3.3.1. Output Modes 출력모드
스트리밍 쿼리 결과를 외부에 쓸 때 사용할 수 있는 출력 모드에는 몇 가지 종류가 있다.
1. Append 모드 (기본값)
기본 모드로, 이전 실행 이후 새로 추가된 행(new rows)만 외부에 출력한다.
결과 테이블에 추가된 행이 변경되지 않는 쿼리에서만 지원된다.
이 모드는 각 행이 한 번만 출력됨을 보장한다(장애 복구 가능한 저장소일 경우).
예) select, where, map, flatMap, filter, join 등을 포함하는 단순한 쿼리에서 사용 가능하다.
2. Complete 모드
매 트리거마다 전체 결과 테이블 전체를 외부로 다시 출력한다.
집계(aggregation) 쿼리에서 주로 사용된다.
3. Update 모드 (Spark 2.1.1 이후 지원)
마지막 트리거 이후 수정된 행만 결과로 출력한다.
앞으로 더 많은 정보가 추가될 예정이다.
쿼리 유형별 지원되는 출력 모드 및 참고사항
| 쿼리 타입 | 지원되는 Output Modes |
설명 | |
| Queries with aggregation | Aggregation on event-time with watermark |
Append, Update, Complete |
Append 모드는 워터마크를 이용해 오래된 집계 상태(aggregation state)를 버린다. 다만, 윈도우 기반 집계 결과는 withWatermark() 에서 지정한 늦은 데이터 허용 시간(late threshold)만큼 결과 출력이 지연된다. 즉, 워터마크가 넘어가야(즉, 윈도우가 확정되어야) 해당 윈도우에 속한 행들이 결과 테이블에 단 한 번만 추가될 수 있다. 자세한 내용은 ‘늦은 데이터(Late Data)’ 섹션을 참고하면 좋다. Update 모드도 워터마크를 이용해 오래된 집계 상태를 정리한다. Complete 모드는 정의상 결과 테이블에 모든 데이터를 보존해야 하므로, 오래된 집계 상태를 버리지 않고 모두 유지한다. |
| Other aggregations | Complete, Update |
워터마크가 정의되어 있지 않은 경우(예: 다른 카테고리에만 정의된 경우), 오래된 집계 상태(aggregation state)는 정리(삭제)되지 않는다. 이 상황에서는 Append 모드가 지원되지 않는다. 그 이유는 집계 결과가 계속 업데이트 될 수 있어서, Append 모드의 의미인 “한 번만 결과가 추가되는 것”과 맞지 않기 때문이다. |
|
| Queries with mapGroupsWithState | Update | mapGroupsWithState 가 포함된 쿼리에서는 집계(aggregation)를 사용할 수 없다. | |
| Queries with flatMapGroupsWithState | Append operation mode |
Append | flatMapGroupsWithState 이후에는 집계가 허용된다. 즉, flatMapGroupsWithState 적용 후에는 집계를 적용할 수 있다. |
| Update operation mode |
Update | flatMapGroupsWithState 가 포함된 쿼리 자체에서는 집계를 사용할 수 없다. | |
| Queries with joins | Append | Update 모드와 Complete 모드는 아직 지원되지 않는다. | |
| Other queries | Append, Update |
Complete 모드는 지원되지 않는다. 이유는 결과 테이블에 전체 미집계(unaggregated) 데이터를 모두 유지하는 것이 현실적으로 불가능하기 때문이다. |
|
3.3.2. Output Sinks
1. 파일 싱크(File Sink)
결과를 지정한 디렉터리에 저장한다.
지원 포맷 예: parquet, orc, json, csv 등
writeStream
.format("parquet") // 또는 orc, json, csv 등
.option("path", "path/to/destination/dir")
.start()
내결함성(fault-tolerant) 보장 (Exactly-once)
파티션 테이블에 쓸 수 있으며, 시간 기준 파티셔닝 시 유용
출력 파일의 TTL(retention) 설정 가능 (예: "12h", "7d")
2. 카프카 싱크(Kafka Sink)
결과를 하나 이상의 Kafka 토픽에 저장한다.
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
출력 모드: Append, Update, Complete 지원
내결함성: 최소 한 번(at-least-once) 보장
자세한 내용은 Kafka Integration Guide 참고
3. Foreach 싱크 (Foreach Sink)
결과 레코드에 대해 임의의 사용자 지정 연산을 수행한다.
writeStream
.foreach(...) // 사용자 함수 지정
.start()
출력 모드: Append, Update, Complete 지원
내결함성: 최소 한 번(at-least-once) 보장
4. 콘솔 싱크(Console Sink) - 디버깅용
트리거마다 결과를 콘솔(표준 출력)로 출력한다.
Append, Complete 모드 지원
소량 데이터 디버깅용으로만 사용 권장 (매번 전체 결과를 드라이버 메모리에 저장)
writeStream
.format("console")
.start()
5. 메모리 싱크(Memory Sink) - 디버깅용
결과를 메모리에 테이블 형식으로 저장
Append, Complete 모드 지원
소량 데이터 디버깅용 (전체 결과를 드라이버 메모리에 저장)
쿼리 이름으로 테이블 이름 지정 가능
writeStream
.format("memory")
.queryName("tableName")
.start()
내결함성 미지원 (Complete 모드 재시작 시 테이블 재생성)
추가 설명
일부 싱크는 내결함성을 보장하지 않으므로, 디버깅이나 임시용으로만 사용해야 한다.
위 싱크들 중, 파일 싱크, Kafka 싱크, Foreach 싱크 등은 내결함성을 지원한다.
요약표
| 싱크 종류 | 지원 모드 | 주요 옵션 및 특징 | 내결함성 | 비고 |
| 파일 싱크 | Append | 경로(path), 파일 포맷 지정, TTL 설정 가능 | 예 (Exactly-once) | 파티셔닝 가능, 시간 파티션 유용 |
| Kafka 싱크 | Append, Update, Complete | Kafka 서버, 토픽 지정 | 예 (At-least-once) | Kafka 공식 가이드 참고 필요 |
| Foreach 싱크 | Append, Update, Complete | 사용자 지정 함수 실행 | 예 (At-least-once) | 사용자 로직 구현 필요 |
| 콘솔 싱크 | Append, Complete | 출력 행 수, 출력 자르기 옵션 | 아니오 | 디버깅용, 소량 데이터에 적합 |
| 메모리 싱크 | Append, Complete | 쿼리 이름 지정해서 메모리 테이블 생성 | 아니오 (Complete 모드 재시작 시 재생성) | 디버깅용, 소량 데이터에 적합 |
위와 같이 설정 후 start() 를 호출해야 쿼리가 실제로 시작된다.
이때 반환되는 StreamingQuery 객체를 통해 실행 중인 쿼리를 제어할 수 있다.
아래 예시를 살펴본다.
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
.writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
.writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
.start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
3.3.3. Foreach 와 ForeachBatch 사용하기
1. ForeachBatch
foreachBatch는 스트리밍 쿼리의 마이크로배치(micro-batch) 단위 출력 데이터에 대해 사용자 정의 함수를 실행할 수 있게 해준다.
Spark 2.4부터 Scala, Java, Python에서 지원한다.
함수는 두 개의 파라미터를 받는다:
-DataFrame 또는 Dataset (해당 마이크로배치의 출력 데이터)
-epoch_id (마이크로배치 고유 ID)
def foreach_batch_function(df, epoch_id):
# 배치 데이터 처리 및 저장 로직 구현
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
foreachBatch를 쓰는 이유
-기존 배치 데이터 처리 로직 재활용
많은 저장소 시스템에서 스트리밍 싱크는 없지만, 배치용 저장 기능은 이미 존재하는 경우
foreachBatch를 사용하면 마이크로배치 출력에 배치 쓰기 로직을 재활용 가능
-여러 위치에 결과 쓰기
출력 데이터를 여러 곳에 저장할 때, 각각 쓰기를 다르게 할 수 있다.
다만, 쓰기 시 출력 DataFrame이 다시 계산 될 수 있으므로, persist()로 캐시한 후 여러 위치에 쓰고 unpersist()하는 것이 효율적이다.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // 저장 위치 1
batchDF.write.format(...).save(...) // 저장 위치 2
batchDF.unpersist()
}
DataFrame 추가 연산 적용 가능
스트리밍 DataFrame은 일부 연산이 제한되는데, foreachBatch 사용 시 마이크로배치 단위로 실행해 다양한 DataFrame 연산을 적용 가능
다만, 전체 쿼리 동작 의미론(semantics)은 직접 고려해야 한다.
주의사항
-기본적으로 foreachBatch는 최소 한 번(at-least-once) 쓰기 보장을 제공한다.
하지만 batchId를 활용해 출력 결과 중복 제거(deduplication) 로직을 구현하면, 정확히 한 번(exactly-once) 보장도 가능하다.
-연속 처리 모드(continuous processing mode)에서는 foreachBatch를 사용할 수 없다.
그 대신, continuous 모드에서는 foreach를 써야 한다.
-상태 기반 스트리밍 쿼리에서, 하나의 마이크로배치 내에서 같은 DataFrame에 대해 여러 액션(ex. count, collect)을 실행하면 쿼리가 여러 번 실행되며, 상태가 여러 번 로드되어 성능 저하가 발생한다.
-이런 경우 foreachBatch 안에서 persist와 unpersist를 꼭 호출해 중복 계산을 방지하는 것이 권장된다.
2. Foreach
foreach는 스트리밍 데이터의 각 행에 사용자 정의 로직을 적용한다.
foreachBatch와 다르게, 배치 데이터 작성기(batch data writer)가 없거나 계속 실행되는 continuous 모드에서 사용할 수 있습니다.
Foreach 사용 방법
1. 함수 형태로 사용하기 (간단하지만 중복제거 불가능)
각 행(row)을 입력으로 받아 처리하는 단순 함수 방식
하지만 실패 시 재처리로 인해 중복된 출력이 발생할 수 있어 정확한 중복 제거는 불가능하다.
def process_row(row):
# 행 단위로 저장하거나 처리하는 로직 작성
pass
query = streamingDF.writeStream.foreach(process_row).start()
2. 객체(Object) 형태로 사용하기 (복잡하지만 중복제거 구현 가능)
open, process, close 메서드를 가진 클래스 구현
좀 더 세밀한 제어 가능
class ForeachWriter:
def open(self, partition_id, epoch_id):
# 커넥션 열기 등 초기화 작업 (선택적)
return True
def process(self, row):
# 행 단위로 데이터 쓰기 (필수 구현)
def close(self, error):
# 커넥션 닫기 등 정리 작업 (선택적)
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
실행 방식 (Execution semantics)
쿼리가 실행되면, 작업 단위(task)별로 ForeachWriter 객체의 인스턴스가 하나씩 생성됨
작업 하나는 하나의 데이터 파티션(partition)을 담당
각 파티션, 각 마이크로배치(epoch)마다 아래 순서로 메서드 호출
open(partitionId, epochId) 호출
이 함수가 True를 반환해야 해당 배치 데이터를 처리
해당 배치의 모든 행(row)에 대해 process(row) 호출
데이터 처리가 끝나면 close(error) 호출
처리 중 에러가 있으면 error 파라미터에 값이 전달됨
open()이 없거나 실패하면 process()와 close()가 호출되지 않을 수 있음
JVM이나 파이썬 프로세스가 비정상 종료되면 close()가 호출되지 않을 수도 있음
주의사항 (중복 제거 관련)
Spark는 (partitionId, epochId) 조합에 대해 항상 동일한 출력 결과를 보장하지 않기 때문에, 이 값만으로 중복 제거를 구현할 수 없다.
(예: 파티션 수 변경, Spark 최적화 등으로 인해 달라질 수 있음)
만약 정확한 중복 제거가 필요하다면, foreachBatch를 사용하는 것이 좋다.
3.3.4. Streaming Table APIs
Spark 3.1부터는 테이블을 스트리밍 DataFrame으로 읽고, 스트리밍 DataFrame을 테이블로 쓸 수 있는 기능이 추가되었다.
이를 위해 다음 두 가지 API를 사용할 수 있다.
DataStreamReader.table()
→ 기존 테이블을 스트리밍 DataFrame으로 읽어온다.
DataStreamWriter.toTable()
→ 스트리밍 DataFrame을 테이블로 저장한다.
spark = ... # spark session
# Create a streaming DataFrame
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 10) \
.load()
# Write the streaming DataFrame to a table
df.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.toTable("myTable")
# Check the table result
spark.read.table("myTable").show()
# Transform the source dataset and write to a new table
spark.readStream \
.table("myTable") \
.select("value") \
.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.format("parquet") \
.toTable("newTable")
# Check the new table result
spark.read.table("newTable").show()
3.3.5. Spark Structured Streaming의 트리거(Triggers)
트리거는 스트리밍 쿼리가 데이터를 언제, 어떻게 처리할지 타이밍을 결정하는 설정이다.
주로 마이크로배치 방식인지, 연속 처리(continuous processing) 방식인지 지정한다.
| 트리거 타입 | 설명 |
| unspecified (기본값) | 트리거를 명시하지 않으면 기본적으로 마이크로배치 모드로 실행된다. 이전 배치가 끝나는 즉시 다음 배치가 시작됨. |
| 고정 간격 마이크로배치 | 사용자가 지정한 간격마다 마이크로배치를 시작. 이전 배치가 간격 내에 완료되면 다음 배치까지 기다림. 지연되면 바로 다음 배치 실행. 새로운 데이터 없으면 배치 실행 안 함. |
| 한 번만 실행하는 마이크로배치 (Deprecated) |
한 번의 마이크로배치로 모든 데이터를 처리하고 자동 종료. 주기적 클러스터 실행 및 종료에 유용하지만, 지금은 Available-now 모드 사용 권장. |
| Available-now 마이크로배치 | 현재 사용 가능한 데이터를 모두 처리한 후 종료. 여러 배치로 나눠 처리해 확장성 높음. 워터마크도 업데이트하며 상태 관리가 용이. 일부 소스는 지원 안 할 수 있음. |
| 연속 처리 모드 (experimental) | 낮은 지연시간을 목표로 하는 새로운 연속 처리 모드. 아직 실험적 기능임. 자세한 내용은 Continuous Processing 참고. |
# 기본 트리거: 가능한 빠르게 마이크로배치 실행
df.writeStream \
.format("console") \
.start()
# 2초 간격 마이크로배치 트리거
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
# 한 번만 실행하는 트리거 (Deprecated, 사용 권장 안함)
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
# Available-now 트리거: 현재 가능한 데이터 모두 처리 후 종료
df.writeStream \
.format("console") \
.trigger(availableNow=True) \
.start()
# 연속 처리 트리거: 1초 단위 체크포인트 (실험적)
df.writeStream \
.format("console") \
.trigger(continuous='1 second') \
.start()
3.3.6. 스트리밍 쿼리 관리 (Managing Streaming Queries)
Spark에서 스트리밍 쿼리를 시작하면, StreamingQuery 객체가 생성된다.
이 객체를 통해 스트리밍 쿼리를 모니터링하고 제어할 수 있다.
주요 StreamingQuery 메서드
query = df.writeStream.format("console").start() # 쿼리 시작 및 StreamingQuery 객체 반환
- query.id()
- 스트리밍 쿼리 고유 식별자를 반환한다.
- 체크포인트 재시작 시에도 변하지 않는다.
- query.runId()
- 현재 실행 세션별 고유 ID를 반환한다.
- 재시작할 때마다 새로 생성된다.
- query.name()
- 쿼리 이름(자동 생성 또는 사용자가 지정한 이름)을 반환한다.
- query.explain()
- 쿼리의 실행 계획과 상세 내용을 출력한다.
- query.stop()
- 실행 중인 쿼리를 중지한다.
- query.awaitTermination()
- 쿼리가 종료될 때까지(중지되거나 오류 발생 시) 대기한다.
- query.exception()
- 쿼리에서 오류로 인해 종료된 경우, 해당 예외 정보를 반환한다.
- query.recentProgress
- 최근 여러 번의 진행 상태(progress) 업데이트 목록을 확인할 수 있다.
- query.lastProgress
- 가장 최신의 진행 상태 정보를 확인한다.
여러 쿼리 동시 실행 및 관리
하나의 SparkSession에서 여러 스트리밍 쿼리를 동시에 시작할 수 있으며, 이들은 클러스터 리소스를 공유하며 병렬 실행된다.
다음 메서드를 통해 현재 실행 중인 쿼리를 관리할 수 있다.
spark = ... # 현재 SparkSession
spark.streams.active # 현재 활성화된 스트리밍 쿼리 목록 반환
spark.streams.get(id) # 고유 ID로 특정 쿼리 객체 가져오기
spark.streams.awaitAnyTermination() # 활성 쿼리 중 하나가 종료될 때까지 대기
3.3.7. 스트리밍 쿼리 고장 복구와 체크포인트 (Recovering from Failures with Checkpointing)
스트리밍 중 장애 발생이나 의도적인 종료 후에도, 이전 쿼리의 상태(state)와 진행 상황(progress)을 복구하여 중단된 지점부터 다시 실행할 수 있다.
이를 위해 체크포인트(checkpointing)와 write-ahead 로그를 사용한다.
체크포인트 위치는 HDFS같은 내결함성 있는 파일 시스템 경로여야 하며, 쿼리 시작 시 DataStreamWriter 옵션으로 지정한다.
aggDF.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
3.3.8. 스트리밍 쿼리 재시작 시 "변경" 허용 범위 (Recovery Semantics after Changes)
같은 체크포인트 위치를 사용해 쿼리를 재시작할 때, 일부 변경은 허용되지 않거나, 변경 효과가 불명확할 수 있다.
여기서 “허용(allowed)” 은 변경이 가능하지만 결과 의미가 명확하지 않을 수 있는 경우,
“비허용(not allowed)” 은 쿼리 실패나 예측 불가능한 오류가 날 수 있어 하지 말아야 할 경우를 뜻한다.
sdf는 스트리밍 DataFrame/Dataset을 의미한다.
1. 입력 소스 변경
입력 소스 수나 유형(예: 다른 종류나 위치) 변경은 허용되지 않는다.
2. 입력 소스 파라미터 변경
일부 변경은 허용되나, 결과 의미는 소스와 쿼리에 따라 달라진다.
예:
속도 제한(rate limits) 추가/변경은 허용됨.
토픽이나 파일 구독 변경은 일반적으로 허용되지 않음(결과 예측 불가).
3. 출력 싱크 변경
일부 싱크 간 변경은 허용되나, 케이스별로 다름.
예:
파일 싱크 → 카프카 싱크는 가능
카프카 싱크 → 파일 싱크는 불가
카프카 웨이트 → foreach(또는 그 반대)는 가능
4. 출력 싱크 파라미터 변경
일부 파라미터 변경은 가능하나 의미는 싱크와 쿼리에 따른다.
예:
파일 싱크의 출력 경로 변경은 불가
카프카 토픽 변경은 가능
사용자 정의 Foreach 싱크 코드 변경은 가능하나 결과 의미는 코드에 의존
5. 프로젝션(projection), 필터(filter), 맵(map) 등 연산 변경
일부 경우는 허용됨.
예:
필터 추가/삭제 가능
같은 출력 스키마 내 프로젝션 변경 가능
다른 출력 스키마 변경은 싱크가 허용하는 경우에만 가능
6. 상태 기반 연산(schema 변경 불가)
상태 데이터를 유지하며 동작하는 연산은
재시작 시 상태 데이터 스키마 변경(추가, 삭제, 수정)을 하면 안 된다.
상태 스키마 변경 불가 연산 예시
스트리밍 집계 (groupBy/agg 중 그룹 키나 집계 방식 변경 불가)
스트리밍 중복 제거(dropDuplicates 컬럼 변경 불가)
스트림-스트림 조인 (스키마, 조인 컬럼, 조인 타입 변경 불가)
임의 상태 연산 (mapGroupsWithState, flatMapGroupsWithState 사용자 정의 상태 스키마 변경 불가)
다만 상태 매핑 함수 내부 로직 변경은 가능
복잡한 상태 스키마 변경이 필요하면 직접 인코딩/디코딩을 이용해 버전 관리하는 방식을 써야 함 (예: Avro 인코딩)
요약
체크포인트를 활용해 스트리밍 쿼리 중단 후 복구는 가능하나,
입력, 출력, 상태 연산 쪽에 큰 변경은 제한되며 신중히 해야 한다.
변경 가능 여부와 의미는 소스, 싱크, 쿼리 구조에 따라 다르므로 문서와 테스트가 필요하다.