[Spark] A roundup of frequently used PySpark code!
pyspark3 --master yarn --deploy-mode client \
--executor-memory 20g \
--executor-cores 5 --num-executors 30 \
--conf spark.pyspark.python=/usr/bin/python3.7 \
--conf spark.pyspark.driver.python=/usr/bin/python3.7 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3.7 \
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=/usr/bin/python3.7 \
--conf spark.driver.maxResultSize=3g \
--driver-memory 8g
%config Completer.use_jedi = False  # Jupyter Notebook tab-key autocompletion
sc.stop()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
import pandas as pd
import re
import numpy as np
# Spark config settings for the user session
# Recommended values (a rough sizing sketch follows below)
# spark.executor.instances = {number of cluster datanodes} X n
# spark.executor.cores = 5 => '5' is the sweet spot
# spark.executor.memory = 4 X n => if the Spark job fails with an executor Out of Memory error, increase it in multiples of 4
# Normally unneeded partitions of a table are pruned via a filter against the Hive Metastore; this setting falls back to fetching all partitions when that filtering throws an error
# Class used to serialize objects (i.e. turn them into bytes, usually when they are shipped to another node over the network). Kryo performs better than the default serializer.
# Maximum buffer size for the Kryo serializer
# Spark 3 partition optimization options ->
# .config("spark.sql.adaptive.skewJoin.enabled", "true")
# .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
# Spark2
spark = SparkSession.builder.appName("model") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.config("spark.executor.instances", 10) \
.config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config("spark.kryoserializer.buffer.max", "512m") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
spark = SparkSession.builder.appName("model") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.config("spark.executor.instances", 10) \
.config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config("spark.kryoserializer.buffer.max", "512m") \
.enableHiveSupport() \
.getOrCreate()
# Spark3
- spark static allocation
sc.stop()
spark = SparkSession.builder.appName("model") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.config("spark.executor.instances", 10) \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config("spark.kryoserializer.buffer.max", "512m") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
- spark dynamic allocation
spark = SparkSession.builder.appName("ji_test") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.shuffle.service.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", 0) \
.config("spark.dynamicAllocation.initialExecutors", 1) \
.config("spark.dynamicAllocation.maxExecutors", 5) \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config("spark.kryoserializer.buffer.max", "512m") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
.enableHiveSupport() \
.getOrCreate()
spark = SparkSession.builder.appName("check_ji") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.config("spark.executor.instances", 10) \
.config('spark.sql.hive.metastorePartitionPruningFallbackOnException', 'true') \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config("spark.kryoserializer.buffer.max", "512m") \
.config("spark.sql.broadcastTimeout", "600") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "false") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
.config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1") \
.config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "20000") \
.config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
.config("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin", "0.2") \
.config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.getOrCreate()
sc = spark.sparkContext
# Setting the Hive table properties
spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
# Change the log level so only ERROR logs are shown
sc.setLogLevel("ERROR")
# SparkSession-related snippets
spark = SparkSession.builder.appName("tt") \
.config("spark.executor.memory", "32g").config("spark.executor.cores", 5)\
.config("spark.kryoserializer.buffer.max", "512mb").config("spark.executor.instances", 50)\
.config("spark.dynamicAllocation.enabled", False).getOrCreate()
spark = SparkSession.builder \
.appName('test') \
.config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY") \
.config("spark.executor.instances", 6) \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.enableHiveSupport() \
.getOrCreate()
spark = SparkSession.builder \
.appName('test') \
.config("spark.executor.instances", 30) \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.enableHiveSupport() \
.getOrCreate()
spark = SparkSession.builder \
.appName('test') \
.config("spark.executor.instances", 10) \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", 5) \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
- config setting
# Create a Spark session
spark = SparkSession.builder.appName("FilterData").getOrCreate()
spark.conf.set('spark.sql.adaptive.enabled', 'true')
# spark.conf.set('<key>', '<value>')
- Check the sc config values and verify that Spark works
conf = sc.getConf().getAll()
for i in conf:
    print(i)

# Check that Spark actually runs
rdd = sc.parallelize(range(1, 1000000000))
rdd.count()

q = "show databases"
spark.sql(q).show()
# Query a Hive table
spark.sql("select count(*) from cva.t_qir_mst").show()
df = spark.sql("select * from cva.t_qir_mst")
df.count()
df.select("rate_attach").groupby("rate_attach").count().show()
- Get HDFS configuration values
# Get the Hadoop configuration object
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
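Individual properties can then be read from that object; a small usage sketch (fs.defaultFS and dfs.replication are just common Hadoop keys chosen for illustration, not anything specific to this cluster):
print(hadoop_conf.get("fs.defaultFS"))
print(hadoop_conf.get("dfs.replication"))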
- Query Hive subdirectories with Spark SQL
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('test') \
.config("mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.getOrCreate()
sc = spark.sparkContext
spark.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true") |
spark.sql("DROP TABLE IF EXISTS " + db + "." + name)
spark.sql("""CREATE TABLE IF NOT EXISTS dev.master (
no STRING,
model_cd STRING,
year LONG,
pf STRING,
line_apply_date TIMESTAMP
) USING PARQUET OPTIONS (PATH 'hdfs://nameservice1/user/hive/warehouse/dev.db/master')""")
spark_df.write.mode("overwrite").format("parquet").insertInto(db + "." + name)
# Declare the query in advance and run it later
# Define query to get repair order part data
def query(anal_date):
query = """
SELECT dd,
order_id,
order_num,
CASE WHEN repair_order_open_date < '1960-01-01 00:00:00' OR SUBSTR(order_open_date,1,4) = '9999' THEN NULL ELSE order_open_date END AS rorder_open_date,
part_num,
part_qty
FROM cva.order_part
WHERE create_date > add_months([:ANAL_DATE:],-48)
AND create_date <= [:ANAL_DATE:]
"""
query = query.replace('[:ANAL_DATE:]', '"' + anal_date + '"')
query = query.replace('\n', ' ')
query = re.sub(' +', ' ', query)
query = query.lstrip()
query = query.rstrip()
return query
anal_date='2022-10-31'
print(query(anal_date))
spark.sql(query(anal_date)).show()
# CREATE statement
spark.sql("""
create external table if not exists pqms.test (
id LONG,
name STRING)
stored as parquet
LOCATION 'hdfs://nameservice1/data/test/test'
""")
###
df = spark.sql("select * from cva.new_full")
df.count()
df.write.mode("overwrite").format("parquet").option("path", "hdfs://nameservice1/data/test/test").saveAsTable("s.test")
- Run commands on the PySpark executors
import subprocess

#==================== <1> ====================
logger.info(">>>>> 1. Getting vcrm list and password")
try:
    logger.info(">>>>> 1-1. Fetching vcrm file name from HDFS")
    # Fetch file names from HDFS
    cmd = "hdfs dfs -ls /data/test/" + yesDay + " | awk '{print $8}'"
    files = str(subprocess.check_output(cmd, shell=True)).strip().split('\\n')
    # Convert the file list to an RDD
    rddFiles = sc.parallelize(files)
    # Get the password on the executors
    rddResult1 = rddFiles.map(lambda item: getPwd(item))
except Exception as e:
    logger.error(">>>>> 1-1. Error: %s", e)
logger.info(">>>>> 1. Finished Getting vcrm list and password")
- Dynamically create variable names
exec(f"{df_name} = spark.createDataframe[(),('filename', 'data')]") exec(f"{df_name} = spark.read.parquet('hdfs://nameservice1/data/test2/a_{ser}/pdate={yesDay}/').where(sss=={ser}).select('filename', 'data')") |
- Ship your own Python virtual environment to every executor
import os

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
sc.stop()
conf = SparkConf().set("spark.yarn.dist.archives", "py37.tar.gz#environment")
sc = SparkContext(conf=conf)
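The same distribution can also be expressed through the SparkSession builder; a sketch assuming the same py37.tar.gz archive (built with venv-pack/conda-pack, app name arbitrary):
import os

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.appName("venv_dist") \
.config("spark.yarn.dist.archives", "py37.tar.gz#environment") \
.config("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./environment/bin/python") \
.getOrCreate()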
- Find out each executor node's Python version and path
import sys

# Distribute a task to each executor to collect its Python version and path
def get_python_version_and_path(index):
    return [(index, sys.version, sys.executable)]

# Generate dummy records so work is spread across the executors.
# The exact executor count may not be knowable here, so create at least as many records as expected.
num_executors = spark.sparkContext.getConf().get("spark.executor.instances")
data = list(range(int(num_executors)))

# Create an RDD and run the function on each partition (executor) with mapPartitionsWithIndex
rdd = spark.sparkContext.parallelize(data, numSlices=int(num_executors))
python_versions = rdd.mapPartitionsWithIndex(lambda index, _: iter(get_python_version_and_path(index)))

# Collect and print the results
results = python_versions.collect()
for result in results:
    print(f"Executor ID: {result[0]}, Python Version: {result[1]}, Path: {result[2]}")
- Print the RDD data held on each executor
num_executors = int(sc.getConf().get("spark.executor.instances"))
executor_cores = int(sc.getConf().get("spark.executor.cores"))
num_partitions = num_executors * executor_cores

# Create the RDD
first_rdd = sc.parallelize(ccsFileList, num_partitions)

# glom() returns each partition's data as a list, one list per partition on its executor
glommed_rdd = first_rdd.glom()
print(glommed_rdd.collect())
- subprocess
import subprocess

file_exist_cmd = f"ls /tmp/data/date={yes_day}/{service_num}/*"
file_exist_output = subprocess.run(file_exist_cmd, shell=True, capture_output=True, text=True)
if file_exist_output.returncode == 2:   # ls exits with 2 when no matching file exists; skip (this sits inside a loop)
    continue
print(file_exist_output.stdout.strip().split('\n'))
- Find the executor's working directory
def readEachRow(parq_name):
    data = []
    # Working directory of this executor's container
    tmp_pwd = subprocess.check_output(["pwd"])
    pwd = tmp_pwd.decode('cp949').replace('\n', '') + "/"
    # Pull the parquet file from HDFS into the executor's working directory
    output_hdfs_cmd = subprocess.check_output(["hdfs", "dfs", "-get", parq_name, pwd])
    # output_unparq_cmd = subprocess.check_output(...)
    result = pwd
    data.append(result)
    return data

# get_file = subprocess.check_output(["hdfs", "dfs", "-get", parq_name, "./"])
file_list_rdd = sc.parallelize(output_list_test, par_num)
result_rdd = file_list_rdd.flatMap(lambda parq_name: readEachRow(parq_name))
result_rdd.top(3)
# ['/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000168/',
#  '/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000162/',
#  '/hadoop/dfs/9/yarn/nm/usercache/master/appcache/application_1704735675669_8866/container_e38_1704735675669_8866_01_000162/']
- When Spark configs take effect
https://spidyweb.tistory.com/330
spark.conf.set("", "")