반응형
Notice
Recent Posts
Recent Comments
Link
관리 메뉴

지구정복

[NiFi] Data Pipeline | log -> Json -> gzip.parquet -> HDFS -> Iceberg Table 본문

데이터 엔지니어링 정복/NiFi

[NiFi] Data Pipeline | log -> Json -> gzip.parquet -> HDFS -> Iceberg Table

noohhee 2025. 4. 22. 14:42
728x90
반응형

 

 

 

NiFi 1.15.2

Spark 3.1.4

Iceberg 1.3.1

Hive 3.1.3

 

 

 

 

 

 

 

 

 

마지막 HDFS에 저장된 gzip.parquet 파일을 읽어서 Iceberg Table로 적재하는 PySpark Streaming을 실행한다.

 

미리 Iceberg Table은 생성되어야 있어야 하므로 PySpark로 만들어준다.

#iceberg table 생성쿼리
q="""
CREATE TABLE iceberg_test_db.test_table_name (
  data STRING,
  log_timestamp timestamp_ntz
)
USING iceberg
PARTITIONED BY (days(log_timestamp))
TBLPROPERTIES (
    'read.parquet.vectorization.enabled' = 'true',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.metadata.previous-versions-max' = '5',
    'format-version' = '2',
    'format' = 'parquet'
)
"""
spark.sql(q).show()

 

아래는 PySpark Streaming 코드이다.

spark = SparkSession.builder \
    .appName("LOG_Original_To_Iceberg") \
    .config("spark.driver.cores", "4") \
    .config("spark.driver.memory", "4g") \
    .enableHiveSupport() \
    .getOrCreate()
    
import glob
import subprocess
import os

# 파일 경로 설정
input_path = "/user/poc/tmp/test_table_name/*.gzip.parquet"


# 스트리밍 읽기
streaming_df = spark.readStream.schema("data STRING, log_timestamp TIMESTAMP").parquet(input_path)


# Iceberg에 데이터 적재하는 함수 정의
def write_to_iceberg(df, epoch_id):
    df.write.format("iceberg") \
        .mode("append") \
        .save("iceberg_test_db.test_table_name")
     
    # 처리 완료 후 원본 Parquet 파일 삭제
    try:
        # HDFS 디렉터리 삭제
        delete_path = "/user/poc/tmp/test_table_name/*"
        command = ["hdfs", "dfs", "-rm", "-r", delete_path]
        result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(f"Deleted HDFS directory: {delete_path}")
        print("Output:", result.stdout.decode())
    except subprocess.CalledProcessError as e:
        print(f"Error deleting HDFS directory {delete_path}: {e.stderr.decode()}")
     
     
# 스트리밍 쿼리 시작
query = (streaming_df
         .writeStream
         .foreachBatch(write_to_iceberg)
         .outputMode("append")
         .trigger(processingTime="10 minutes")  # 여기서 10분 단위로 트리거 설정
         .start())
         
# 스트리밍 쿼리 실행 대기
query.awaitTermination()

 

728x90
반응형
Comments