반응형
Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- HIVE
- 코테
- 코엑스맛집
- Data Engineering
- java
- Linux
- 코딩
- 맛집
- 삼성역맛집
- bigdata engineer
- 코딩테스트
- Iceberg
- 영어
- Data Engineer
- 알고리즘
- 자바
- Spark
- pyspark
- Kafka
- HDFS
- bigdata engineering
- 백준
- hadoop
- Apache Kafka
- apache iceberg
- BigData
- 개발
- 여행
- Trino
- 프로그래머스
Archives
- Today
- Total
지구정복
[Airflow] Airflow Assets 본문
728x90
반응형
airflow 3버전 이상부터 사용할 수 있는 기능
아래와 같이 두 개의 dag가 있다.
a dag: task11 -> task12
b dag: task21 -> task22
이때 a dag가 끝나자마자 b dag를 실행시키고 싶을 경우
a dag의 최종 결과물(특정 디렉터리에 파일 생성)이 생성되면 b dag가 트리거되도록 실행하는 기능이다.
아래는 예제 코드
from airflow.sdk import asset
# 1️⃣ First asset DAG
@asset(
schedule="@daily", # runs every day
)
def first_asset():
"""
This asset returns some data.
This return value is stored as XCom (key='return_value')
in the DAG 'first_asset', task 'first_asset'.
"""
data = {"a": 1, "b": 2}
return data
# 2️⃣ Second asset DAG, triggered by first_asset
@asset(
schedule=first_asset, # run when first_asset finishes
)
def second_asset(context):
"""
This asset reads the XCom (return value) from first_asset.
"""
ti = context["ti"] # TaskInstance
# Pull the return value from first_asset via cross-DAG XCom
upstream_data = ti.xcom_pull(
dag_id="first_asset", # DAG id created by @asset above
task_ids="first_asset", # task id = function name by default
key="return_value", # return value is stored under 'return_value'
include_prior_dates=True, # also look at previous runs if needed
)
# Use the data from first_asset
print(f"Got data from first_asset: {upstream_data}")
# Optionally return something new
transformed = {k: v * 10 for k, v in upstream_data.items()}
return transformed
이때 위 코드에서 include_prior_dates=True 무조건 이렇게 써야된다.
아니면 에러난다.
이 의미는 가장 최근에 스케줄되어 실행된 dag에서 task의 xcom 값이 없을 경우 이전 스케줄 때 만들어진 xcom값들을 쓴다는(=include)의미이다.
그리고 asset=dag와 같고,
asset엔 하나의 task가 존재한다.
따라서 위 코드에서 dag_id와 task_id가 같은 것을 확인할 수 있다.
또한 asset이 실행되는 것은 materialize됐다고 표현한다.(=구체화됐다)
여기에 하나 더 asset을 추가할 수 있다.
from airflow.sdk import asset
# 1️⃣ First asset DAG
@asset(
schedule="@daily", # runs every day
)
def first_asset():
"""
This asset returns some data.
The return value is stored as XCom (key='return_value')
in DAG 'first_asset', task 'first_asset'.
"""
data = {"a": 1, "b": 2}
return data
# 2️⃣ Second asset DAG (runs after first_asset)
@asset(
schedule=first_asset, # trigger when first_asset asset is updated
)
def second_asset(context):
"""
This asset reads the XCom (return value) from first_asset.
"""
ti = context["ti"] # TaskInstance
upstream_data = ti.xcom_pull(
dag_id="first_asset",
task_ids="first_asset",
key="return_value",
include_prior_dates=True,
)
print(f"[second_asset] got data from first_asset: {upstream_data}")
# Do something with the data
result = {k: v * 10 for k, v in upstream_data.items()}
return result
# 3️⃣ Third asset DAG (runs in parallel with second_asset)
@asset(
schedule=first_asset, # same schedule as second_asset → parallel
)
def third_asset(context):
"""
This asset also uses the return value from first_asset,
but does something different.
"""
ti = context["ti"]
upstream_data = ti.xcom_pull(
dag_id="first_asset",
task_ids="first_asset",
key="return_value",
include_prior_dates=True,
)
print(f"[third_asset] got data from first_asset: {upstream_data}")
# Different processing logic
summed = sum(upstream_data.values())
return {"sum": summed}
그리고 현재 비슷한 asset이 두 개 있는데 이거를 asset.multi를 사용해서 합칠 수 있다.
from airflow.sdk import asset, Asset
# 1️⃣ Define the logical assets
first_asset = Asset("first_asset")
second_asset = Asset("second_asset")
third_asset = Asset("third_asset")
# 2️⃣ First asset producer
@asset(schedule="@daily", outlets=[first_asset])
def build_first_asset():
"""
Produce the first asset.
NOTE: In the asset API, the returned Python value is *not* what downstream receives.
The real asset content must be written to the storage the Asset represents.
"""
data = {"a": 1, "b": 2}
# ⬇ 여기에서 실제로 데이터를 저장해야 함 (예: S3, DB, local file 등)
return data # 단순 예시 (XCom에는 저장되지만 자산 데이터는 아님)
# 3️⃣ Multi-asset: second_asset, third_asset
@asset.multi(
schedule=None,
outlets=[second_asset, third_asset], # 공식 문서 기준 정확한 사용법
)
def combined_assets(first_asset):
"""
This task runs AFTER first_asset because it receives first_asset as a parameter.
It splits first_asset into second_asset and third_asset.
"""
# ⚠ 중요:
# 이 first_asset 인자는 자산의 실제 내용을 나타냅니다.
# 자산 → 파일/테이블/S3 등 저장소에서 직접 데이터를 읽어야 합니다.
#
# 여기서는 데모를 위해 "추측"으로 데이터를 직접 생성합니다.
upstream_data = {"a": 1, "b": 2} # 실제로는 first_asset 저장소에서 읽어야 함
# logic for each new asset
second_result = {k: v * 10 for k, v in upstream_data.items()}
third_result = {"sum": sum(upstream_data.values())}
# ⬇ 여기서 second_asset, third_asset 실제 저장 위치에 데이터를 저장해야 함
# (예: write_to_s3(second_asset, second_result))
# multi-asset에서는 return 값을 사용하지 않는 것이 정상입니다.
# 데이터는 outlets 에 대응하는 저장소에 직접 써야 합니다.
728x90
반응형
'데이터 엔지니어링 정복 > Airflow' 카테고리의 다른 글
| [Airflow] Airflow Flower | Hook | Task Group | Xcom | Branching | provider (0) | 2025.12.26 |
|---|---|
| [Airflow] SqoopOperator (0) | 2025.07.02 |
| [Airflow] airflow db upgrade error (0) | 2025.03.20 |
| [Airflow] Docker를 이용해서 Apache Airflow설치후 워크플로 만들기 (3) | 2022.03.08 |
Comments