티스토리 뷰

Airflow 학습 로드맵: 상세 다이어그램과 실습 가이드

1. 기본 DAG 생성 및 실행
개념 다이어그램

[Airflow Scheduler] → [DAG 파싱] → [DAG Bag] → [실행 가능 Task] → [Executor] → [작업 실행]
       ↑                   ↑           ↑             ↑
[Web Server] ←──── [Metadata DB] ← [실행 상태] ← [Task 실행 결과]

상세 설명
DAG는 작업들의 흐름을 정의하는 Directed Acyclic Graph. 각 Task는 Operator로 구현되며, 의존성에 따라 순차적으로 실행.

실습 코드```python

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

 기본 DAG 구조
with DAG(
    'my_first_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:

    def start_process():
        print("프로세스 시작")
        return "시작 완료"

    start_task = PythonOperator(
        task_id='start_process',
        python_callable=start_process
    )

    data_task = BashOperator(
        task_id='download_data',
        bash_command='echo "데이터 다운로드 중..." && sleep 2'
    )

    def finish_process():
        print("모든 작업 완료")
        return "완료"

    end_task = PythonOperator(
        task_id='finish_process',
        python_callable=finish_process
    )

     작업 흐름 정의
    start_task >> data_task >> end_task


실행 방법```bash

DAG 파일을 ~/airflow/dags/에 저장
cp my_first_dag.py ~/airflow/dags/

 Airflow 웹서버 실행
airflow webserver --port 8080

 스케줄러 실행
airflow scheduler

 수동으로 DAG 실행
airflow dags trigger my_first_dag


 2. 다양한 Operator 사용해보기
Operator 종류 다이어그램

[Operator 계층 구조]
├── BaseOperator
│   ├── BashOperator (쉘 명령어 실행)
│   ├── PythonOperator (Python 함수 실행)
│   ├── EmailOperator (이메일 전송)
│   ├── FileSensor (파일 감지)
│   ├── TimeSensor (시간 감지)
│   ├── HttpSensor (HTTP 응답 감지)
│   └── Database Operators
│       ├── PostgresOperator
│       ├── MySqlOperator
│       └── BigQueryOperator

실습 코드```python

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor

def process_data():
    import pandas as pd
     데이터 처리 로직
    data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
    df = pd.DataFrame(data)
    df.to_csv('/tmp/processed_data.csv', index=False)
    return "데이터 처리 완료"

 다양한 Operator 사용 예제
file_sensor = FileSensor(
    task_id='wait_for_input',
    filepath='/tmp/input_data.csv',
    poke_interval=30,
    timeout=300,
    mode='poke'
)

wait_task = TimeDeltaSensor(
    task_id='wait_5_seconds',
    delta=timedelta(seconds=5)
)

bash_task = BashOperator(
    task_id='cleanup_files',
    bash_command='rm -f /tmp/temp_*.csv && echo "정리 완료"'
)

python_task = PythonOperator(
    task_id='process_data_task',
    python_callable=process_data
)

email_task = EmailOperator(
    task_id='send_notification',
    to='team@company.com',
    subject='작업 완료 알림',
    html_content='<p>데이터 처리 작업이 완료되었습니다.</p>'
)

 작업 흐름
file_sensor >> wait_task >> python_task >> bash_task >> email_task


 3. XCom을 이용한 데이터 전달
XCom 데이터 흐름 다이어그램

[Task A] → [XCom Push] → [Metadata DB] → [XCom Pull] → [Task B]
   ↓           ↓             ↓               ↓           ↓
데이터 생성   데이터 저장   중간 저장소   데이터 조회   데이터 사용

실습 코드```python

def extract_data(kwargs):
    """데이터 추출 및 XCom 저장"""
    sample_data = [
        {'id': 1, 'name': 'John', 'value': 100},
        {'id': 2, 'name': 'Jane', 'value': 200},
        {'id': 3, 'name': 'Doe', 'value': 300}
    ]
    
     여러 가지 방식으로 XCom에 데이터 저장
    kwargs['ti'].xcom_push(key='raw_data', value=sample_data)
    kwargs['ti'].xcom_push(key='data_count', value=len(sample_data))
    
    return "데이터 추출 완료"

def transform_data(kwargs):
    """데이터 변환 - XCom에서 데이터 가져오기"""
    ti = kwargs['ti']
    
     다양한 방식으로 XCom 데이터 가져오기
    raw_data = ti.xcom_pull(key='raw_data', task_ids='extract_data')
    data_count = ti.xcom_pull(key='data_count', task_ids='extract_data')
    
    print(f"📊 처리할 데이터: {data_count}건")
    
     데이터 변환
    transformed_data = []
    for item in raw_data:
        transformed_item = {
            'user_id': item['id'],
            'user_name': item['name'].upper(),
            'score': item['value'] * 1.1   10% 증가
        }
        transformed_data.append(transformed_item)
    
     변환된 데이터 저장
    ti.xcom_push(key='transformed_data', value=transformed_data)
    return "데이터 변환 완료"

def load_data(kwargs):
    """데이터 적재 - 최종 결과 사용"""
    ti = kwargs['ti']
    final_data = ti.xcom_pull(key='transformed_data', task_ids='transform_data')
    
    print("최종 데이터:")
    for item in final_data:
        print(f"  - {item['user_name']}: {item['score']:.1f}점")
    
     데이터베이스나 파일에 저장하는 로직
    return "데이터 적재 완료"

 Task 정의
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    provide_context=True
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    provide_context=True
)

extract_task >> transform_task >> load_task

 4. Branching 구현
Branching 흐름 다이어그램

[Start Task] → [Branch Operator] → [Condition Check]
                                      ↓
             [True] → [Task A] → [Join Task] → [End Task]
             [False] → [Task B] → [Join Task]

실습 코드```python

from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def decide_branch(kwargs):
    """분기 결정 로직"""
    ti = kwargs['ti']
    data_count = ti.xcom_pull(key='data_count', task_ids='extract_data')
    
     데이터 양에 따라 다른 분기 선택
    if data_count > 100:
        print("📈 대량 데이터 - 상세 분석 진행")
        return 'detailed_analysis'
    elif data_count > 0:
        print("📊 일반 데이터 - 기본 분석 진행")
        return 'basic_analysis'
    else:
        print("⚠️ 데이터 없음 - 알림 전송")
        return 'send_alert'

def detailed_analysis(kwargs):
    """상세 분석"""
    print("🔍 상세 분석 수행 중...")
    return "상세 분석 완료"

def basic_analysis(kwargs):
    """기본 분석"""
    print("📋 기본 분석 수행 중...")
    return "기본 분석 완료"

def send_alert(kwargs):
    """알림 전송"""
    print("🚨 데이터 없음 알림 전송")
    return "알림 전송 완료"

 Branching Task 정의
branch_task = BranchPythonOperator(
    task_id='decide_branch',
    python_callable=decide_branch,
    provide_context=True
)

 분기 Task들
detailed_task = PythonOperator(
    task_id='detailed_analysis',
    python_callable=detailed_analysis
)

basic_task = PythonOperator(
    task_id='basic_analysis',
    python_callable=basic_analysis
)

alert_task = PythonOperator(
    task_id='send_alert',
    python_callable=send_alert
)

 합쳐지는 지점 (모든 분기가 여기로 모임)
join_task = DummyOperator(
    task_id='join_branches',
    trigger_rule='none_failed_or_skipped'
)

end_task = DummyOperator(task_id='end_process')

 작업 흐름 정의
extract_task >> branch_task
branch_task >> detailed_task >> join_task
branch_task >> basic_task >> join_task
branch_task >> alert_task >> join_task
join_task >> end_task


 5. Variables 사용
Variables 구조 다이어그램

[Airflow Variables]
├── Environment Variables
├── Airflow UI에서 설정한 Variables
├── Code에서 동적으로 설정한 Variables
└── Secret Backend Variables

실습 코드```python

from airflow.models import Variable

def use_variables(kwargs):
    """Variables 사용 예제"""
    
     1. 기본 Variables 사용
    api_url = Variable.get("API_ENDPOINT", default_var="https://api.default.com")
    database_url = Variable.get("DATABASE_URL")
    
     2. JSON Variables 파싱
    config_str = Variable.get("APP_CONFIG", "{}")
    config = json.loads(config_str)
    
     3. Variable 설정 (주의: 일반적으로 UI에서 설정)
     Variable.set("TEMP_VALUE", "임시값")
    
    print(f"API URL: {api_url}")
    print(f"DB URL: {database_url}")
    print(f"Config: {config}")
    
    return "Variables 사용 완료"

 Variables를 사용하는 Task
config_task = PythonOperator(
    task_id='use_config_variables',
    python_callable=use_variables,
    provide_context=True
)

 UI에서 Variables 설정 방법:
 Admin → Variables → "+" 버튼 클릭
 Key: API_ENDPOINT, Value: https://api.example.com/data
 Key: DATABASE_URL, Value: postgresql://user:pass@localhost/db
 Key: APP_CONFIG, Value: {"timeout": 30, "retries": 3}

6. 실전 프로젝트 완성
ETL 파이프라인 아키텍처

[External API] → [Extract Task] → [Raw Data] → [Transform Task]
     ↓               ↓                  ↓           ↓
[데이터 수집]   [XCom 저장]      [임시 저장]   [데이터 가공]
     ↓               ↓                  ↓           ↓
[Load Task] → [Database] → [Monitoring] → [Alert System]

종합 실전 프로젝트```python

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
from datetime import datetime, timedelta
import pandas as pd
import requests
import json

class ETLPipeline:
    """완전한 ETL 파이프라인 클래스"""
    
    def __init__(self):
        self.config = self.load_config()
    
    def load_config(self):
        """설정 로드"""
        return {
            'api_url': Variable.get("WEATHER_API_URL"),
            'api_key': Variable.get("WEATHER_API_KEY"),
            'db_conn_id': 'weather_db',
            'cities': ['Seoul', 'Busan', 'Jeju']
        }
    
    def extract_weather_data(self, kwargs):
        """날씨 데이터 추출"""
        execution_date = kwargs['execution_date']
        date_str = execution_date.strftime('%Y-%m-%d')
        
        all_data = []
        for city in self.config['cities']:
            url = f"{self.config['api_url']}?city={city}&date={date_str}&key={self.config['api_key']}"
            response = requests.get(url, timeout=30)
            
            if response.status_code == 200:
                city_data = response.json()
                city_data['city'] = city
                city_data['extract_date'] = date_str
                all_data.append(city_data)
            else:
                print(f"⚠️ {city} 데이터 추출 실패: {response.status_code}")
        
        kwargs['ti'].xcom_push(key='raw_weather_data', value=all_data)
        return f"{len(all_data)}개 도시 데이터 추출 완료"
    
    def transform_weather_data(self, kwargs):
        """날씨 데이터 변환"""
        ti = kwargs['ti']
        raw_data = ti.xcom_pull(key='raw_weather_data', task_ids='extract_weather_data')
        
        transformed_data = []
        for city_data in raw_data:
            transformed = {
                'city': city_data['city'],
                'date': city_data['extract_date'],
                'temperature': city_data.get('main', {}).get('temp', 0),
                'humidity': city_data.get('main', {}).get('humidity', 0),
                'weather': city_data.get('weather', [{}])[0].get('main', 'Unknown'),
                'description': city_data.get('weather', [{}])[0].get('description', 'Unknown'),
                'wind_speed': city_data.get('wind', {}).get('speed', 0),
                'processed_at': datetime.now().isoformat()
            }
            transformed_data.append(transformed)
        
        ti.xcom_push(key='transformed_weather_data', value=transformed_data)
        return "데이터 변환 완료"
    
    def load_to_database(self, kwargs):
        """데이터베이스 적재"""
        ti = kwargs['ti']
        weather_data = ti.xcom_pull(key='transformed_weather_data', task_ids='transform_weather_data')
        
        pg_hook = PostgresHook(postgres_conn_id=self.config['db_conn_id'])
        
        for data in weather_data:
            insert_query = """
                INSERT INTO weather_data 
                (city, date, temperature, humidity, weather, description, wind_speed, processed_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (city, date) 
                DO UPDATE SET
                    temperature = EXCLUDED.temperature,
                    humidity = EXCLUDED.humidity,
                    weather = EXCLUDED.weather,
                    description = EXCLUDED.description,
                    wind_speed = EXCLUDED.wind_speed,
                    processed_at = EXCLUDED.processed_at
            """
            pg_hook.run(insert_query, parameters=(
                data['city'], data['date'], data['temperature'], 
                data['humidity'], data['weather'], data['description'],
                data['wind_speed'], data['processed_at']
            ))
        
        return f"{len(weather_data)}건 데이터베이스 적재 완료"
    
    def generate_daily_report(self, kwargs):
        """일일 리포트 생성"""
        ti = kwargs['ti']
        weather_data = ti.xcom_pull(key='transformed_weather_data', task_ids='transform_weather_data')
        
        df = pd.DataFrame(weather_data)
        report = {
            'total_records': len(df),
            'avg_temperature': df['temperature'].mean(),
            'max_temperature': df['temperature'].max(),
            'min_temperature': df['temperature'].min(),
            'cities_covered': df['city'].nunique()
        }
        
        print("📊 일일 날씨 리포트:")
        for key, value in report.items():
            print(f"  {key}: {value}")
        
        return "리포트 생성 완료"

 DAG 설정
with DAG(
    'weather_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 6 * * *',   매일 오전 6시
    catchup=True,
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=10),
        'email_on_failure': True
    }
) as dag:

    pipeline = ETLPipeline()
    
    extract_task = PythonOperator(
        task_id='extract_weather_data',
        python_callable=pipeline.extract_weather_data,
        provide_context=True
    )
    
    transform_task = PythonOperator(
        task_id='transform_weather_data',
        python_callable=pipeline.transform_weather_data,
        provide_context=True
    )
    
    load_task = PythonOperator(
        task_id='load_to_database',
        python_callable=pipeline.load_to_database,
        provide_context=True
    )
    
    report_task = PythonOperator(
        task_id='generate_daily_report',
        python_callable=pipeline.generate_daily_report,
        provide_context=True
    )
    
     작업 흐름
    extract_task >> transform_task >> load_task >> report_task

7. 모니터링과 문제 해결
모니터링 체계 다이어그램

[Airflow UI] → [DAG Runs] → [Task Instances] → [Logs] → [Metrics]
     ↓              ↓             ↓             ↓          ↓
[실행 현황]   [실행 기록]   [작업 상태]   [로그 분석]   [성능 지표]
     ↓              ↓             ↓             ↓          ↓
[Alert System] → [Email] → [Slack] → [PagerDuty] → [Monitoring Tools]

모니터링과 디버깅 코드```python

def monitor_dag_execution(kwargs):
    """DAG 실행 모니터링"""
    ti = kwargs['ti']
    dag_run = kwargs['dag_run']
    
    print(f"🔍 DAG 실행 모니터링:")
    print(f"   DAG ID: {dag_run.dag_id}")
    print(f"   실행 ID: {dag_run.run_id}")
    print(f"   실행 시간: {dag_run.execution_date}")
    print(f"   상태: {dag_run.state}")
    
     추가 모니터링 정보
    task_instances = dag_run.get_task_instances()
    success_count = sum(1 for ti in task_instances if ti.state == 'success')
    failed_count = sum(1 for ti in task_instances if ti.state == 'failed')
    
    print(f"   성공 작업: {success_count}")
    print(f"   실패 작업: {failed_count}")
    
    return "모니터링 완료"

def error_handling(kwargs):
    """에러 처리 및 복구"""
    try:
         정상 작업 수행
        result = some_risky_operation()
        return result
        
    except Exception as e:
        print(f"❌ 에러 발생: {str(e)}")
        
         에러 정보 기록
        error_info = {
            'error_type': type(e).__name__,
            'error_message': str(e),
            'task_id': kwargs['ti'].task_id,
            'execution_date': kwargs['execution_date'].isoformat(),
            'timestamp': datetime.now().isoformat()
        }
        
         에러 로그 저장
        with open('/tmp/error_log.json', 'a') as f:
            f.write(json.dumps(error_info) + '\n')
        
         재시도 또는 대체 작업 수행
        return fallback_operation()

def send_alert_on_failure(context):
    """실패 시 알림 전송"""
    ti = context['ti']
    dag_run = context['dag_run']
    
    alert_message = f"""
    🚨 Airflow 작업 실패 알림
    ========================
    DAG: {dag_run.dag_id}
    Task: {ti.task_id}
    실행 시간: {dag_run.execution_date}
    에러: {str(context['exception'])}
    로그: {ti.log_url}
    """
    
    print(alert_message)
     실제로는 이메일이나 슬랙으로 알림 전송

 모니터링 Task 추가
monitor_task = PythonOperator(
    task_id='monitor_execution',
    python_callable=monitor_dag_execution,
    provide_context=True,
    trigger_rule='all_done'   성공/실패 관계없이 실행
)

 에러 핸들링 설정
error_prone_task = PythonOperator(
    task_id='error_prone_operation',
    python_callable=error_handling,
    provide_context=True,
    on_failure_callback=send_alert_on_failure,
    retries=2,
    retry_delay=timedelta(minutes=5)
)

 최종 작업 흐름
extract_task >> transform_task >> load_task >> report_task >> monitor_task


체계적으로 학습하면 Airflow의 모든 주요 기능을 실무에서 활용할 수 있다. 각 단계별로 코드를 직접 실행하고 수정해보면서 이해하는 것이 가장 중요.

 

1단계: Airflow의 핵심 개념 완벽 이해

 DAG(Directed Acyclic Graph)란 무엇인가?
실제 예시로 이해하기: DAG는 방향성이 있고 순환하지 않는 그래프로, 작업들의 흐름을 정의.

예를 들어, 매일 아침 출근 준비 과정을 DAG로 표현.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def wake_up():
    print("6:30 AM - 기상")
    return "기상 완료"

def take_shower():
    print("6:40 AM - 샤워")
    return "샤워 완료"

def get_dressed():
    print("6:50 AM - 옷 입기")
    return "옷 입기 완료"

def eat_breakfast():
    print("7:00 AM - 아침 식사")
    return "아침 식사 완료"

def leave_home():
    print("7:30 AM - 출발")
    return "출근 완료"

 DAG 정의
with DAG(
    'morning_routine',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 6 * * 1-5',   월-금 오전 6시
    catchup=False
) as dag:
    
    wake_up_task = PythonOperator(
        task_id='wake_up',
        python_callable=wake_up
    )
    
    shower_task = PythonOperator(
        task_id='take_shower',
        python_callable=take_shower
    )
    
    dress_task = PythonOperator(
        task_id='get_dressed',
        python_callable=get_dressed
    )
    
    breakfast_task = PythonOperator(
        task_id='eat_breakfast',
        python_callable=eat_breakfast
    )
    
    leave_task = PythonOperator(
        task_id='leave_home',
        python_callable=leave_home
    )

     작업 순서 정의: 기상 → 샤워 → 옷입기 → 아침식사 → 출발
    wake_up_task >> shower_task >> dress_task >> breakfast_task >> leave_task


실행 결과:

6:30 AM - 기상
6:40 AM - 샤워  
6:50 AM - 옷 입기
7:00 AM - 아침 식사
7:30 AM - 출발


Operator의 역할과 종류

Operator는 실제 작업을 수행하는 단위입니다. 다양한 Operator를 상황에 맞게 사용

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.sensors.filesystem import FileSensor

 파일 처리 예제
def process_data():
    import pandas as pd
     데이터 처리 로직
    df = pd.read_csv('/data/input.csv')
    processed_df = df.groupby('category').sum()
    processed_df.to_csv('/data/output.csv')
    return "데이터 처리 완료"

 다양한 Operator 사용 예제
with DAG('operator_examples', ...) as dag:
    
     1. BashOperator: 쉘 명령어 실행
    download_data = BashOperator(
        task_id='download_data',
        bash_command='curl -o /data/input.csv https://example.com/data.csv'
    )
    
     2. FileSensor: 파일 존재 확인 (30초마다 체크, 최대 5분 대기)
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/input.csv',
        poke_interval=30,
        timeout=300
    )
    
     3. PythonOperator: Python 함수 실행
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )
    
     4. EmailOperator: 이메일 전송
    send_report = EmailOperator(
        task_id='send_report',
        to='team@company.com',
        subject='데이터 처리 완료',
        html_content='<p>일일 데이터 처리가 완료되었습니다.</p>'
    )
    
    download_data >> wait_for_file >> process_task >> send_report

2단계: Task 간 데이터 공유와 의존성 관리

XCom을 이용한 데이터 전달

XCom(Cross-communication)은 Task 간에 작은 데이터를 주고받을 때 사용

def extract_user_data(kwargs):
    """가상의 API에서 사용자 데이터 추출"""
    users = [
        {'id': 1, 'name': '김철수', 'age': 25, 'city': '서울'},
        {'id': 2, 'name': '이영희', 'age': 30, 'city': '부산'},
        {'id': 3, 'name': '박민수', 'age': 35, 'city': '대구'}
    ]
    
     XCom에 데이터 저장
    kwargs['ti'].xcom_push(key='raw_users', value=users)
    return f"{len(users)}명의 사용자 데이터 추출 완료"

def process_user_data(kwargs):
    """사용자 데이터 처리"""
    ti = kwargs['ti']
    
     이전 Task에서 데이터 가져오기
    raw_users = ti.xcom_pull(key='raw_users', task_ids='extract_user_data')
    
    print("=== 원본 데이터 ===")
    for user in raw_users:
        print(user)
    
     데이터 처리: 30세 이상만 필터링
    filtered_users = [user for user in raw_users if user['age'] >= 30]
    
     처리 결과 저장
    ti.xcom_push(key='processed_users', value=filtered_users)
    return f"{len(filtered_users)}명 필터링 완료"

def generate_report(kwargs):
    """최종 리포트 생성"""
    ti = kwargs['ti']
    processed_users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
    
    print("=== 최종 리포트 ===")
    print("30세 이상 사용자 목록:")
    for user in processed_users:
        print(f"- {user['name']} ({user['age']}세, {user['city']})")
    
    return "리포트 생성 완료"

 DAG 설정
with DAG('user_data_pipeline', ...) as dag:
    
    extract = PythonOperator(
        task_id='extract_user_data',
        python_callable=extract_user_data,
        provide_context=True   kwargs에 context 정보 전달
    )
    
    process = PythonOperator(
        task_id='process_user_data',
        python_callable=process_user_data,
        provide_context=True
    )
    
    report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        provide_context=True
    )
    
    extract >> process >> report


실행 결과:

=== 원본 데이터 ===
{'id': 1, 'name': '김철수', 'age': 25, 'city': '서울'}
{'id': 2, 'name': '이영희', 'age': 30, 'city': '부산'}  
{'id': 3, 'name': '박민수', 'age': 35, 'city': '대구'}

=== 최종 리포트 ===
30세 이상 사용자 목록:
- 이영희 (30세, 부산)
- 박민수 (35세, 대구)

 

3단계: 조건부 실행과 에러 처리

 Branching과 조건부 작업 흐름

from airflow.operators.python_operator import BranchPythonOperator

def check_data_quality(kwargs):
    """데이터 품질 검사"""
    ti = kwargs['ti']
    users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
    
     간단한 품질 검사: 데이터가 있고, 최소 1명 이상인지 확인
    if users and len(users) > 0:
        print("데이터 품질 양호")
        return 'load_to_database'   다음 task로 이동
    else:
        print("데이터 품질 문제 발견")
        return 'send_quality_alert'   알림 task로 이동

def load_to_database(kwargs):
    """데이터베이스에 적재"""
    ti = kwargs['ti']
    users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
    
     실제로는 데이터베이스 연결 코드가 들어갑니다
    print(f"📊 {len(users)}명의 데이터를 데이터베이스에 적재")
    return "데이터베이스 적재 완료"

def send_quality_alert(kwargs):
    """품질 문제 알림"""
    print("데이터 품질 문제로 관리자에게 알림 전송")
     실제로는 이메일이나 슬랙 알림이 전송됩니다
    return "품질 알림 전송 완료"

def finalize_process(kwargs):
    """최종 처리"""
    print("모든 프로세스 완료")
    return "파이프라인 완료"

with DAG('quality_control_pipeline', ...) as dag:
    
     ... extract, process tasks 생략 ...
    
    quality_check = BranchPythonOperator(
        task_id='check_data_quality',
        python_callable=check_data_quality,
        provide_context=True
    )
    
    load_db = PythonOperator(
        task_id='load_to_database',
        python_callable=load_to_database,
        provide_context=True
    )
    
    send_alert = PythonOperator(
        task_id='send_quality_alert',
        python_callable=send_quality_alert
    )
    
    finalize = PythonOperator(
        task_id='finalize_process',
        python_callable=finalize_process
    )
    
     의존성 설정: quality_check 결과에 따라 load_db 또는 send_alert 실행
    process >> quality_check
    quality_check >> load_db >> finalize
    quality_check >> send_alert >> finalize


4단계: 실전 데이터 파이프라인 프로젝트

종합 예제: 일일 판매 데이터 처리 파이프라인

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
import requests

def download_daily_sales(kwargs):
    """일일 판매 데이터 다운로드"""
    execution_date = kwargs['execution_date']
    date_str = execution_date.strftime('%Y-%m-%d')
    
     가상의 API에서 데이터 다운로드
    url = f"https://api.sales.com/data?date={date_str}"
    response = requests.get(url)
    
    if response.status_code == 200:
        data = response.json()
        filename = f"/data/sales_{date_str}.json"
        
        with open(filename, 'w') as f:
            json.dump(data, f)
        
        kwargs['ti'].xcom_push(key='sales_file', value=filename)
        return f"{len(data)}건의 판매 데이터 다운로드 완료"
    else:
        raise Exception(f"데이터 다운로드 실패: {response.status_code}")

def validate_sales_data(kwargs):
    """판매 데이터 유효성 검사"""
    ti = kwargs['ti']
    filename = ti.xcom_pull(key='sales_file', task_ids='download_daily_sales')
    
    with open(filename, 'r') as f:
        sales_data = json.load(f)
    
     기본 유효성 검사
    required_fields = ['sale_id', 'product_id', 'amount', 'sale_date']
    valid_records = []
    invalid_records = []
    
    for record in sales_data:
        if all(field in record for field in required_fields):
            valid_records.append(record)
        else:
            invalid_records.append(record)
    
    print(f"유효한 레코드: {len(valid_records)}건")
    print(f"무효한 레코드: {len(invalid_records)}건")
    
    if invalid_records:
        print("무효한 레코드 샘플:")
        for record in invalid_records[:3]:
            print(record)
    
    ti.xcom_push(key='valid_sales', value=valid_records)
    ti.xcom_push(key='invalid_count', value=len(invalid_records))
    
    return f"유효성 검사 완료 ({len(valid_records)}건 유효)"

def calculate_daily_stats(kwargs):
    """일일 통계 계산"""
    ti = kwargs['ti']
    valid_sales = ti.xcom_pull(key='valid_sales', task_ids='validate_sales_data')
    
    df = pd.DataFrame(valid_sales)
    
     다양한 통계 계산
    daily_stats = {
        'total_sales': len(valid_sales),
        'total_amount': df['amount'].sum(),
        'avg_amount': df['amount'].mean(),
        'max_amount': df['amount'].max(),
        'min_amount': df['amount'].min(),
        'top_products': df['product_id'].value_counts().head(5).to_dict()
    }
    
    ti.xcom_push(key='daily_stats', value=daily_stats)
    return "통계 계산 완료"

def load_to_data_warehouse(kwargs):
    """데이터 웨어하우스에 적재"""
    ti = kwargs['ti']
    valid_sales = ti.xcom_pull(key='valid_sales', task_ids='validate_sales_data')
    daily_stats = ti.xcom_pull(key='daily_stats', task_ids='calculate_daily_stats')
    
     PostgreSQL 연결
    pg_hook = PostgresHook(postgres_conn_id='data_warehouse')
    
     판매 데이터 적재
    for sale in valid_sales:
        insert_query = """
            INSERT INTO sales (sale_id, product_id, amount, sale_date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (sale_id) DO NOTHING
        """
        pg_hook.run(insert_query, parameters=(
            sale['sale_id'], sale['product_id'], sale['amount'], sale['sale_date']
        ))
    
     통계 데이터 적재
    stats_query = """
        INSERT INTO daily_stats (stat_date, total_sales, total_amount, 
                               avg_amount, max_amount, min_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    execution_date = kwargs['execution_date']
    pg_hook.run(stats_query, parameters=(
        execution_date.date(),
        daily_stats['total_sales'],
        daily_stats['total_amount'],
        daily_stats['avg_amount'],
        daily_stats['max_amount'],
        daily_stats['min_amount']
    ))
    
    return "데이터 웨어하우스 적재 완료"

def generate_daily_report(kwargs):
    """일일 리포트 생성"""
    ti = kwargs['ti']
    daily_stats = ti.xcom_pull(key='daily_stats', task_ids='calculate_daily_stats')
    invalid_count = ti.xcom_pull(key='invalid_count', task_ids='validate_sales_data')
    
    report_content = f"""
    📊 일일 판매 리포트
    ====================
    • 총 판매건수: {daily_stats['total_sales']:,}건
    • 총 판매금액: {daily_stats['total_amount']:,.0f}원
    • 평균 거래액: {daily_stats['avg_amount']:,.0f}원
    • 최대 거래액: {daily_stats['max_amount']:,.0f}원
    • 최소 거래액: {daily_stats['min_amount']:,.0f}원
    • 무효 데이터: {invalid_count}건
    """
    
    print(report_content)
    
     리포트 파일로 저장
    execution_date = kwargs['execution_date']
    report_filename = f"/reports/sales_report_{execution_date.strftime('%Y%m%d')}.txt"
    
    with open(report_filename, 'w') as f:
        f.write(report_content)
    
    return "일일 리포트 생성 완료"

 메인 DAG 정의
with DAG(
    'daily_sales_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 1 * * *',   매일 오전 1시
    catchup=True,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True
    }
) as dag:

    download_task = PythonOperator(
        task_id='download_daily_sales',
        python_callable=download_daily_sales,
        provide_context=True
    )
    
    validate_task = PythonOperator(
        task_id='validate_sales_data',
        python_callable=validate_sales_data,
        provide_context=True
    )
    
    stats_task = PythonOperator(
        task_id='calculate_daily_stats',
        python_callable=calculate_daily_stats,
        provide_context=True
    )
    
    load_task = PythonOperator(
        task_id='load_to_data_warehouse',
        python_callable=load_to_data_warehouse,
        provide_context=True
    )
    
    report_task = PythonOperator(
        task_id='generate_daily_report',
        python_callable=generate_daily_report,
        provide_context=True
    )
    
     작업 흐름 정의
    download_task >> validate_task >> [stats_task, load_task]
    stats_task >> report_task


끝.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG more
«   2026/02   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
글 보관함