반응형
Notice
Recent Posts
Recent Comments
Link
관리 메뉴

지구정복

[Airflow] Airflow Assets 본문

데이터 엔지니어링 정복/Airflow

[Airflow] Airflow Assets

noohhee 2025. 11. 30. 21:21
728x90
반응형

 

airflow 3버전 이상부터 사용할 수 있는 기능

 

아래와 같이 두 개의 dag가 있다.

a dag: task11 -> task12

b dag: task21 -> task22

 

이때 a dag가 끝나자마자 b dag를 실행시키고 싶을 경우

a dag의 최종 결과물(특정 디렉터리에 파일 생성)이 생성되면 b dag가 트리거되도록 실행하는 기능이다.

 

 

 

 

아래는 예제 코드

from airflow.sdk import asset


# 1️⃣ First asset DAG
@asset(
    schedule="@daily",          # runs every day
)
def first_asset():
    """
    This asset returns some data.
    This return value is stored as XCom (key='return_value')
    in the DAG 'first_asset', task 'first_asset'.
    """
    data = {"a": 1, "b": 2}
    return data


# 2️⃣ Second asset DAG, triggered by first_asset
@asset(
    schedule=first_asset,       # run when first_asset finishes
)
def second_asset(context):
    """
    This asset reads the XCom (return value) from first_asset.
    """
    ti = context["ti"]  # TaskInstance

    # Pull the return value from first_asset via cross-DAG XCom
    upstream_data = ti.xcom_pull(
        dag_id="first_asset",        # DAG id created by @asset above
        task_ids="first_asset",      # task id = function name by default
        key="return_value",          # return value is stored under 'return_value'
        include_prior_dates=True,    # also look at previous runs if needed
    )

    # Use the data from first_asset
    print(f"Got data from first_asset: {upstream_data}")

    # Optionally return something new
    transformed = {k: v * 10 for k, v in upstream_data.items()}
    return transformed

 

 

이때 위 코드에서 include_prior_dates=True 무조건 이렇게 써야된다.

아니면 에러난다.

이 의미는 가장 최근에 스케줄되어 실행된 dag에서 task의 xcom 값이 없을 경우 이전 스케줄 때 만들어진 xcom값들을 쓴다는(=include)의미이다.

 

그리고 asset=dag와 같고,

asset엔 하나의 task가 존재한다.

 

따라서 위 코드에서 dag_id와 task_id가 같은 것을 확인할 수 있다.

또한 asset이 실행되는 것은 materialize됐다고 표현한다.(=구체화됐다)

 

 

여기에 하나 더 asset을 추가할 수 있다.

from airflow.sdk import asset


# 1️⃣ First asset DAG
@asset(
    schedule="@daily",          # runs every day
)
def first_asset():
    """
    This asset returns some data.
    The return value is stored as XCom (key='return_value')
    in DAG 'first_asset', task 'first_asset'.
    """
    data = {"a": 1, "b": 2}
    return data


# 2️⃣ Second asset DAG (runs after first_asset)
@asset(
    schedule=first_asset,       # trigger when first_asset asset is updated
)
def second_asset(context):
    """
    This asset reads the XCom (return value) from first_asset.
    """
    ti = context["ti"]  # TaskInstance

    upstream_data = ti.xcom_pull(
        dag_id="first_asset",
        task_ids="first_asset",
        key="return_value",
        include_prior_dates=True,
    )

    print(f"[second_asset] got data from first_asset: {upstream_data}")

    # Do something with the data
    result = {k: v * 10 for k, v in upstream_data.items()}
    return result


# 3️⃣ Third asset DAG (runs in parallel with second_asset)
@asset(
    schedule=first_asset,       # same schedule as second_asset → parallel
)
def third_asset(context):
    """
    This asset also uses the return value from first_asset,
    but does something different.
    """
    ti = context["ti"]

    upstream_data = ti.xcom_pull(
        dag_id="first_asset",
        task_ids="first_asset",
        key="return_value",
        include_prior_dates=True,
    )

    print(f"[third_asset] got data from first_asset: {upstream_data}")

    # Different processing logic
    summed = sum(upstream_data.values())
    return {"sum": summed}

 

 

그리고 현재 비슷한 asset이 두 개 있는데 이거를 asset.multi를 사용해서 합칠 수 있다.

from airflow.sdk import asset, Asset


# 1️⃣ Define the logical assets
first_asset = Asset("first_asset")
second_asset = Asset("second_asset")
third_asset = Asset("third_asset")


# 2️⃣ First asset producer
@asset(schedule="@daily", outlets=[first_asset])
def build_first_asset():
    """
    Produce the first asset.
    NOTE: In the asset API, the returned Python value is *not* what downstream receives.
    The real asset content must be written to the storage the Asset represents.
    """
    data = {"a": 1, "b": 2}
    # ⬇ 여기에서 실제로 데이터를 저장해야 함 (예: S3, DB, local file 등)
    return data  # 단순 예시 (XCom에는 저장되지만 자산 데이터는 아님)


# 3️⃣ Multi-asset: second_asset, third_asset
@asset.multi(
    schedule=None,
    outlets=[second_asset, third_asset],   # 공식 문서 기준 정확한 사용법
)
def combined_assets(first_asset):
    """
    This task runs AFTER first_asset because it receives first_asset as a parameter.
    It splits first_asset into second_asset and third_asset.
    """

    # ⚠ 중요:
    # 이 first_asset 인자는 자산의 실제 내용을 나타냅니다.
    # 자산 → 파일/테이블/S3 등 저장소에서 직접 데이터를 읽어야 합니다.
    #
    # 여기서는 데모를 위해 "추측"으로 데이터를 직접 생성합니다.
    upstream_data = {"a": 1, "b": 2}  # 실제로는 first_asset 저장소에서 읽어야 함

    # logic for each new asset
    second_result = {k: v * 10 for k, v in upstream_data.items()}
    third_result = {"sum": sum(upstream_data.values())}

    # ⬇ 여기서 second_asset, third_asset 실제 저장 위치에 데이터를 저장해야 함
    # (예: write_to_s3(second_asset, second_result))

    # multi-asset에서는 return 값을 사용하지 않는 것이 정상입니다.
    # 데이터는 outlets 에 대응하는 저장소에 직접 써야 합니다.

 

 

 

 

728x90
반응형
Comments