
[Spark] Let's organize frequently used PySpark code!

noohhee 2025. 4. 22. 14:03

 

pyspark3 --master yarn --deploy-mode client \
--executor-memory 20g \
--executor-cores 5 --num-executors 30 \
--conf spark.pyspark.python=/usr/bin/python3.7 \
--conf spark.pyspark.driver.python=/usr/bin/python3.7 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3.7 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=/usr/bin/python3.7 \
--conf spark.driver.maxResultSize=3g

--driver-memory 8g

 

 

 

%config Completer.use_jedi = False   # Jupyter Notebook tab-key autocompletion

sc.stop() 

 

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
import pandas as pd
import re
import numpy as np

 

 

Spark config settings for the user

Recommended values (a small worked example follows below)

# spark.executor.instances = {number of cluster datanodes} x n
# spark.executor.cores = 5 => '5' is optimal
# spark.executor.memory = 4 x n => if a Spark job fails with executor Out of Memory, increase it in multiples of 4
# Setting that, when using a filter against the Hive Metastore to prune a table's unneeded partitions, falls back to fetching all partitions if an error occurs
# Class used when serializing objects (the byte-level serialization that happens when data moves to other nodes over the network). Kryo performs better than the default serializer
# Maximum buffer size of the Kryo serializer
# Partition optimization options in Spark 3 ->
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \

 

# Spark2
spark = SparkSession.builder.appName("model") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", 5) \
    .config("spark.executor.instances", 10) \
    .config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

 

spark = SparkSession.builder.appName("model") \ 

    .config("spark.executor.memory", "4g") \ 

    .config("spark.executor.cores", 5) \ 

    .config("spark.executor.instances", 10) \ 

    .config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \ 

    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \ 

    .config("spark.kryoserializer.buffer.max", "512m") \ 

    .enableHiveSupport() \ 

    .getOrCreate() 

 

 

 

# Spark3 

Spark static allocation
sc.stop()
spark = SparkSession.builder.appName("model") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", 5) \
    .config("spark.executor.instances", 10) \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext

spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
Spark dynamic allocation
spark = SparkSession.builder.appName("ji_test") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 0) \
    .config("spark.dynamicAllocation.initialExecutors", 1) \
    .config("spark.dynamicAllocation.maxExecutors", 5) \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
    .enableHiveSupport() \
    .getOrCreate()

 

 

spark = SparkSession.builder.appName("check_ji") \ 

    .config("spark.executor.memory", "4g") \ 

    .config("spark.executor.cores", 5) \ 

    .config("spark.executor.instances", 10) \ 

    .config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \ 

    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \ 

    .config("spark.kryoserializer.buffer.max", "512m") \ 

    .config("spark.sql.broadcastTimeout", "600") \ 

    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \ 

    .config("spark.sql.adaptive.enabled", "true") \ 

    .config("spark.sql.adaptive.skewJoin.enabled", "true") \ 

    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ 

    .config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false") \ 

    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \ 

    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1") \ 

    .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "20000") \ 

    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \ 

    .config("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin", "0.2") \ 

    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \ 

    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m") \ 

    .config("spark.sql.adaptive.skewJoin.enabled", "true") \ 

    .getOrCreate() 

sc = spark.sparkContext 

 

 

#Setting the hive table properties 

spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true") 

 

 

Change the log level so that only ERROR logs are shown

sc.setLogLevel("ERROR") 

 

 

# SparkSession-related
spark = SparkSession.builder.appName("tt") \
                .config("spark.executor.memory", "32g").config("spark.executor.cores", 5)\
                .config("spark.kryoserializer.buffer.max", "512mb").config("spark.executor.instances", 50)\
                .config("spark.dynamicAllocation.enabled", False).getOrCreate()

 

spark = SparkSession.builder \
    .appName('test') \
    .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY") \
    .config("spark.executor.instances", 6) \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", 5) \
    .enableHiveSupport() \
    .getOrCreate()

spark = SparkSession.builder \
    .appName('test') \
    .config("spark.executor.instances", 30) \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", 5) \
    .enableHiveSupport() \
    .getOrCreate()

spark = SparkSession.builder \
    .appName('test') \
    .config("spark.executor.instances", 10) \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", 5) \
    .getOrCreate()

 

 

sc = spark.sparkContext 

sc.setLogLevel("WARN")

 

 

  • config setting 
# Create a Spark session 
spark = SparkSession.builder.appName("FilterData").getOrCreate() 
spark.conf.set('spark.sql.adaptive.enabled', 'true') 
# spark.conf.set("<key>", "<value>")

 

 

 

  • Check the sc config values / verify operation
conf = sc.getConf().getAll()
for i in conf:
    print(i)




# Verify Spark is working
df = sc.parallelize(range(1,1000000000)) 
df.count() 


q="show databases" 
spark.sql(q).show() 

 

 

 

 

# Querying Hive tables

spark.sql("select count(*) from cva.t_qir_mst").show()

df = spark.sql("select * from cva.t_qir_mst")

df.count()

df.select("rate_attach").groupby("rate_attach").count().show()

 

 

  • Getting HDFS config values
# Get the Hadoop config
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
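
A minimal sketch (my own, with an assumed path) of what this handle can be used for: read a config value and list an HDFS directory through the JVM gateway.

# Sketch: read a Hadoop config value and list an HDFS path (the path is an assumed example).
print(hadoop_conf.get("fs.defaultFS"))

jvm = spark.sparkContext._jvm
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
for status in fs.listStatus(jvm.org.apache.hadoop.fs.Path("/data/test")):
    print(status.getPath().toString())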

 

 

 

  • Querying Hive subdirectories with Spark SQL

from pyspark.sql import HiveContext

spark = SparkSession.builder \
    .appName('test') \
    .config("mapreduce.input.fileinputformat.input.dir.recursive", "true") \
    .getOrCreate()

sc = spark.sparkContext
spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")

 

 

 

spark.sql("DROP TABLE IF EXISTS " + db + "." + name)  

spark.sql("""CREATE TABLE IF NOT EXISTS dev.master ( 

no STRING, 

model_cd STRING, 

year LONG, 

pf STRING, 

line_apply_date TIMESTAMP 

) USING PARQUET OPTIONS (PATH 'hdfs://nameservice1/user/hive/warehouse/dev.db/master')""") 

spark_df.write.mode("overwrite").format("parquet").insertInto(db + "." + name) 

 

 

 

 

# Declare a query in advance and run it

# Define query to get repair order part data
def query(anal_date):

    query = """
            SELECT  dd,
                    order_id,
                    order_num,
                    CASE WHEN repair_order_open_date < '1960-01-01 00:00:00' OR SUBSTR(order_open_date,1,4) = '9999' THEN NULL ELSE order_open_date END AS rorder_open_date,
                    part_num,
                    part_qty
            FROM cva.order_part
            WHERE create_date > add_months([:ANAL_DATE:],-48)
              AND create_date <= [:ANAL_DATE:]
            """

    query = query.replace('[:ANAL_DATE:]', '"' + anal_date + '"')
    query = query.replace('\n', ' ')
    query = re.sub(' +', ' ', query)
    query = query.lstrip()
    query = query.rstrip()

    return query


anal_date = '2022-10-31'

print(query(anal_date))

spark.sql(query(anal_date)).show()

 

 

 

 

# create
spark.sql("""
create external table if not exists pqms.test (
    id LONG,
    name STRING)
stored as parquet
LOCATION 'hdfs://nameservice1/data/test/test'
""")

###

df = spark.sql("select * from cva.new_full")

df.count()

df.write.mode("overwrite").format("parquet").option("path", "hdfs://nameservice1/data/test/test").saveAsTable("s.test")

 

 

 

Running commands on PySpark executors

# ==================== <1> ====================
logger.info( ">>>>> 1. Getting vcrm list and password" )
try:
    logger.info( ">>>>> 1-1. Fetching vcrm file name from HDFS" )
    # fetch file names from HDFS
    cmd = "hdfs dfs -ls /data/test/" + yesDay + " | awk '{print $8}'"
    files = str(subprocess.check_output(cmd, shell=True)).strip().split('\\n')
    # Convert files to an RDD
    rddFiles = sc.parallelize( files )
    # Get the passwords
    rddResult1 = rddFiles.map( lambda item: getPwd(item) )
except Exception as e:
    logger.error(">>>>> 1-1. Error ", e)
logger.info( ">>>>> 1. Finished Getting vcrm list and password" )

 

 

  • Creating variable names dynamically

exec(f"{df_name} = spark.createDataFrame([], 'filename STRING, data STRING')")

exec(f"{df_name} = spark.read.parquet('hdfs://nameservice1/data/test2/a_{ser}/pdate={yesDay}/').where('sss == {ser}').select('filename', 'data')")

 

 

 

 

  • Distributing your own Python virtual environment to all executors

import os

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

sc.stop()
conf = (SparkConf().set("spark.yarn.dist.archives", "py37.tar.gz#environment"))
sc = SparkContext(conf = conf)
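
The same thing with a SparkSession builder, as a sketch that assumes py37.tar.gz was packed beforehand (e.g. with conda-pack or venv-pack) and sits in the submit directory:

import os
from pyspark.sql import SparkSession

# Assumption: py37.tar.gz exists locally; "#environment" is the alias it is unpacked under on YARN.
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

spark = SparkSession.builder.appName("venv_ship") \
    .config("spark.yarn.dist.archives", "py37.tar.gz#environment") \
    .getOrCreate()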

 

 

 

 

  • Finding the Python version and path on executor nodes

import sys

# Distribute the work across executors and collect each one's Python version and path
def get_python_version_and_path(index):
    return [(index, sys.version, sys.executable)]


# Generate dummy data, one record per executor, so that each executor performs some work.
# Since we cannot know the exact number of executors here, create more records than the expected number.
num_executors = spark.sparkContext.getConf().get("spark.executor.instances")
data = list(range(int(num_executors)))


# Create an RDD and run on each partition (executor) using mapPartitionsWithIndex
rdd = spark.sparkContext.parallelize(data, numSlices=int(num_executors))
python_versions = rdd.mapPartitionsWithIndex(lambda index, _: iter(get_python_version_and_path(index)))


# Collect and print the results
results = python_versions.collect()
for result in results:
    print(f"Executor ID: {result[0]}, Python Version: {result[1]}, Path: {result[2]}")

 

 

 

 

  • Printing the RDD data held on each executor
num_executors = int(sc.getConf().get("spark.executor.instances")) 
executor_cores = int(sc.getConf().get("spark.executor.cores")) 
num_partitions = num_executors * executor_cores 


# Create RDD 
first_rdd = sc.parallelize( ccsFileList, num_partitions ) 


# To return each partition's data in each Executors 
glommed_rdd = first_rdd.glom() 
print(glommed_rdd.collect()) 

 

 

 

 

 

  • subprocess
import subprocess


file_exist_cmd = f"ls /tmp/data/date={yes_day}/{service_num}/*"


file_exist_output = subprocess.run(file_exist_cmd, shell=True, capture_output=True, text=True)


# inside a loop: skip this service if the files do not exist (ls exit code 2)
if file_exist_output.returncode == 2:
    continue


print( file_exist_output.stdout.strip().split('\n') )
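
The same kind of existence check against HDFS (rather than the local filesystem) can be sketched with hdfs dfs -test -e, which returns exit code 0 when the path exists; the path below reuses yes_day and service_num from the snippet above and is only illustrative:

# Sketch: check whether an HDFS path exists from Python (assumes the hdfs CLI is on PATH).
import subprocess

hdfs_path = f"/data/test/date={yes_day}/{service_num}"
check = subprocess.run(["hdfs", "dfs", "-test", "-e", hdfs_path], capture_output=True, text=True)

if check.returncode == 0:
    print(f"{hdfs_path} exists")
else:
    print(f"{hdfs_path} does not exist")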

 

 

 

 

  • Finding the executor's working directory
def readEachRow(parq_name):
    data = []
    tmp_pwd = subprocess.check_output( ["pwd"] )
    pwd = tmp_pwd.decode('cp949').replace('\n', '') + "/"

    output_hdfs_cmd = subprocess.check_output( ["hdfs", "dfs", "-get", parq_name, pwd] )
    # output_unparq_cmd = subprocess.check_output( ... )
    result = pwd
    data.append( result )
    return data
    #get_file = subprocess.check_output( ["hdfs", "dfs", "-get", {parq_name}, "./"] )


file_list_rdd = sc.parallelize( output_list_test, par_num )


result_rdd = file_list_rdd.flatMap( lambda parq_name: readEachRow(parq_name) )


result_rdd.top(3)


['/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000168/',
'/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000162/',
'/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000162/']

 

 

 

  • Summary of when Spark configs take effect

https://spidyweb.tistory.com/330 

 

spark.conf.set("", "") 

 

 

 

 

 

 
