데이터 엔지니어링 정복/NiFi
[NiFi] Data Pipeline | log -> Json -> gzip.parquet -> HDFS -> Iceberg Table
noohhee
2025. 4. 22. 14:42
728x90
반응형
NiFi 1.15.2
Spark 3.1.4
Iceberg 1.3.1
Hive 3.1.3
마지막 HDFS에 저장된 gzip.parquet 파일을 읽어서 Iceberg Table로 적재하는 PySpark Streaming을 실행한다.
미리 Iceberg Table은 생성되어야 있어야 하므로 PySpark로 만들어준다.
#iceberg table 생성쿼리
q="""
CREATE TABLE iceberg_test_db.test_table_name (
data STRING,
log_timestamp timestamp_ntz
)
USING iceberg
PARTITIONED BY (days(log_timestamp))
TBLPROPERTIES (
'read.parquet.vectorization.enabled' = 'true',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '5',
'format-version' = '2',
'format' = 'parquet'
)
"""
spark.sql(q).show()
아래는 PySpark Streaming 코드이다.
spark = SparkSession.builder \
.appName("LOG_Original_To_Iceberg") \
.config("spark.driver.cores", "4") \
.config("spark.driver.memory", "4g") \
.enableHiveSupport() \
.getOrCreate()
import glob
import subprocess
import os
# 파일 경로 설정
input_path = "/user/poc/tmp/test_table_name/*.gzip.parquet"
# 스트리밍 읽기
streaming_df = spark.readStream.schema("data STRING, log_timestamp TIMESTAMP").parquet(input_path)
# Iceberg에 데이터 적재하는 함수 정의
def write_to_iceberg(df, epoch_id):
df.write.format("iceberg") \
.mode("append") \
.save("iceberg_test_db.test_table_name")
# 처리 완료 후 원본 Parquet 파일 삭제
try:
# HDFS 디렉터리 삭제
delete_path = "/user/poc/tmp/test_table_name/*"
command = ["hdfs", "dfs", "-rm", "-r", delete_path]
result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(f"Deleted HDFS directory: {delete_path}")
print("Output:", result.stdout.decode())
except subprocess.CalledProcessError as e:
print(f"Error deleting HDFS directory {delete_path}: {e.stderr.decode()}")
# 스트리밍 쿼리 시작
query = (streaming_df
.writeStream
.foreachBatch(write_to_iceberg)
.outputMode("append")
.trigger(processingTime="10 minutes") # 여기서 10분 단위로 트리거 설정
.start())
# 스트리밍 쿼리 실행 대기
query.awaitTermination()
728x90
반응형