티스토리 뷰
Airflow 학습 로드맵: 상세 다이어그램과 실습 가이드
1. 기본 DAG 생성 및 실행
개념 다이어그램
[Airflow Scheduler] → [DAG 파싱] → [DAG Bag] → [실행 가능 Task] → [Executor] → [작업 실행]
↑ ↑ ↑ ↑
[Web Server] ←──── [Metadata DB] ← [실행 상태] ← [Task 실행 결과]
상세 설명
DAG는 작업들의 흐름을 정의하는 Directed Acyclic Graph. 각 Task는 Operator로 구현되며, 의존성에 따라 순차적으로 실행.
실습 코드```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
기본 DAG 구조
with DAG(
'my_first_dag',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
) as dag:
def start_process():
print("프로세스 시작")
return "시작 완료"
start_task = PythonOperator(
task_id='start_process',
python_callable=start_process
)
data_task = BashOperator(
task_id='download_data',
bash_command='echo "데이터 다운로드 중..." && sleep 2'
)
def finish_process():
print("모든 작업 완료")
return "완료"
end_task = PythonOperator(
task_id='finish_process',
python_callable=finish_process
)
작업 흐름 정의
start_task >> data_task >> end_task
실행 방법```bash
DAG 파일을 ~/airflow/dags/에 저장
cp my_first_dag.py ~/airflow/dags/
Airflow 웹서버 실행
airflow webserver --port 8080
스케줄러 실행
airflow scheduler
수동으로 DAG 실행
airflow dags trigger my_first_dag
2. 다양한 Operator 사용해보기
Operator 종류 다이어그램
[Operator 계층 구조]
├── BaseOperator
│ ├── BashOperator (쉘 명령어 실행)
│ ├── PythonOperator (Python 함수 실행)
│ ├── EmailOperator (이메일 전송)
│ ├── FileSensor (파일 감지)
│ ├── TimeSensor (시간 감지)
│ ├── HttpSensor (HTTP 응답 감지)
│ └── Database Operators
│ ├── PostgresOperator
│ ├── MySqlOperator
│ └── BigQueryOperator
실습 코드```python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_delta import TimeDeltaSensor
def process_data():
import pandas as pd
데이터 처리 로직
data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
df = pd.DataFrame(data)
df.to_csv('/tmp/processed_data.csv', index=False)
return "데이터 처리 완료"
다양한 Operator 사용 예제
file_sensor = FileSensor(
task_id='wait_for_input',
filepath='/tmp/input_data.csv',
poke_interval=30,
timeout=300,
mode='poke'
)
wait_task = TimeDeltaSensor(
task_id='wait_5_seconds',
delta=timedelta(seconds=5)
)
bash_task = BashOperator(
task_id='cleanup_files',
bash_command='rm -f /tmp/temp_*.csv && echo "정리 완료"'
)
python_task = PythonOperator(
task_id='process_data_task',
python_callable=process_data
)
email_task = EmailOperator(
task_id='send_notification',
to='team@company.com',
subject='작업 완료 알림',
html_content='<p>데이터 처리 작업이 완료되었습니다.</p>'
)
작업 흐름
file_sensor >> wait_task >> python_task >> bash_task >> email_task
3. XCom을 이용한 데이터 전달
XCom 데이터 흐름 다이어그램
[Task A] → [XCom Push] → [Metadata DB] → [XCom Pull] → [Task B]
↓ ↓ ↓ ↓ ↓
데이터 생성 데이터 저장 중간 저장소 데이터 조회 데이터 사용
실습 코드```python
def extract_data(kwargs):
"""데이터 추출 및 XCom 저장"""
sample_data = [
{'id': 1, 'name': 'John', 'value': 100},
{'id': 2, 'name': 'Jane', 'value': 200},
{'id': 3, 'name': 'Doe', 'value': 300}
]
여러 가지 방식으로 XCom에 데이터 저장
kwargs['ti'].xcom_push(key='raw_data', value=sample_data)
kwargs['ti'].xcom_push(key='data_count', value=len(sample_data))
return "데이터 추출 완료"
def transform_data(kwargs):
"""데이터 변환 - XCom에서 데이터 가져오기"""
ti = kwargs['ti']
다양한 방식으로 XCom 데이터 가져오기
raw_data = ti.xcom_pull(key='raw_data', task_ids='extract_data')
data_count = ti.xcom_pull(key='data_count', task_ids='extract_data')
print(f"📊 처리할 데이터: {data_count}건")
데이터 변환
transformed_data = []
for item in raw_data:
transformed_item = {
'user_id': item['id'],
'user_name': item['name'].upper(),
'score': item['value'] * 1.1 10% 증가
}
transformed_data.append(transformed_item)
변환된 데이터 저장
ti.xcom_push(key='transformed_data', value=transformed_data)
return "데이터 변환 완료"
def load_data(kwargs):
"""데이터 적재 - 최종 결과 사용"""
ti = kwargs['ti']
final_data = ti.xcom_pull(key='transformed_data', task_ids='transform_data')
print("최종 데이터:")
for item in final_data:
print(f" - {item['user_name']}: {item['score']:.1f}점")
데이터베이스나 파일에 저장하는 로직
return "데이터 적재 완료"
Task 정의
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
provide_context=True
)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
provide_context=True
)
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
provide_context=True
)
extract_task >> transform_task >> load_task
4. Branching 구현
Branching 흐름 다이어그램
[Start Task] → [Branch Operator] → [Condition Check]
↓
[True] → [Task A] → [Join Task] → [End Task]
[False] → [Task B] → [Join Task]
실습 코드```python
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
def decide_branch(kwargs):
"""분기 결정 로직"""
ti = kwargs['ti']
data_count = ti.xcom_pull(key='data_count', task_ids='extract_data')
데이터 양에 따라 다른 분기 선택
if data_count > 100:
print("📈 대량 데이터 - 상세 분석 진행")
return 'detailed_analysis'
elif data_count > 0:
print("📊 일반 데이터 - 기본 분석 진행")
return 'basic_analysis'
else:
print("⚠️ 데이터 없음 - 알림 전송")
return 'send_alert'
def detailed_analysis(kwargs):
"""상세 분석"""
print("🔍 상세 분석 수행 중...")
return "상세 분석 완료"
def basic_analysis(kwargs):
"""기본 분석"""
print("📋 기본 분석 수행 중...")
return "기본 분석 완료"
def send_alert(kwargs):
"""알림 전송"""
print("🚨 데이터 없음 알림 전송")
return "알림 전송 완료"
Branching Task 정의
branch_task = BranchPythonOperator(
task_id='decide_branch',
python_callable=decide_branch,
provide_context=True
)
분기 Task들
detailed_task = PythonOperator(
task_id='detailed_analysis',
python_callable=detailed_analysis
)
basic_task = PythonOperator(
task_id='basic_analysis',
python_callable=basic_analysis
)
alert_task = PythonOperator(
task_id='send_alert',
python_callable=send_alert
)
합쳐지는 지점 (모든 분기가 여기로 모임)
join_task = DummyOperator(
task_id='join_branches',
trigger_rule='none_failed_or_skipped'
)
end_task = DummyOperator(task_id='end_process')
작업 흐름 정의
extract_task >> branch_task
branch_task >> detailed_task >> join_task
branch_task >> basic_task >> join_task
branch_task >> alert_task >> join_task
join_task >> end_task
5. Variables 사용
Variables 구조 다이어그램
[Airflow Variables]
├── Environment Variables
├── Airflow UI에서 설정한 Variables
├── Code에서 동적으로 설정한 Variables
└── Secret Backend Variables
실습 코드```python
from airflow.models import Variable
def use_variables(kwargs):
"""Variables 사용 예제"""
1. 기본 Variables 사용
api_url = Variable.get("API_ENDPOINT", default_var="https://api.default.com")
database_url = Variable.get("DATABASE_URL")
2. JSON Variables 파싱
config_str = Variable.get("APP_CONFIG", "{}")
config = json.loads(config_str)
3. Variable 설정 (주의: 일반적으로 UI에서 설정)
Variable.set("TEMP_VALUE", "임시값")
print(f"API URL: {api_url}")
print(f"DB URL: {database_url}")
print(f"Config: {config}")
return "Variables 사용 완료"
Variables를 사용하는 Task
config_task = PythonOperator(
task_id='use_config_variables',
python_callable=use_variables,
provide_context=True
)
UI에서 Variables 설정 방법:
Admin → Variables → "+" 버튼 클릭
Key: API_ENDPOINT, Value: https://api.example.com/data
Key: DATABASE_URL, Value: postgresql://user:pass@localhost/db
Key: APP_CONFIG, Value: {"timeout": 30, "retries": 3}
6. 실전 프로젝트 완성
ETL 파이프라인 아키텍처
[External API] → [Extract Task] → [Raw Data] → [Transform Task]
↓ ↓ ↓ ↓
[데이터 수집] [XCom 저장] [임시 저장] [데이터 가공]
↓ ↓ ↓ ↓
[Load Task] → [Database] → [Monitoring] → [Alert System]
종합 실전 프로젝트```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
from datetime import datetime, timedelta
import pandas as pd
import requests
import json
class ETLPipeline:
"""완전한 ETL 파이프라인 클래스"""
def __init__(self):
self.config = self.load_config()
def load_config(self):
"""설정 로드"""
return {
'api_url': Variable.get("WEATHER_API_URL"),
'api_key': Variable.get("WEATHER_API_KEY"),
'db_conn_id': 'weather_db',
'cities': ['Seoul', 'Busan', 'Jeju']
}
def extract_weather_data(self, kwargs):
"""날씨 데이터 추출"""
execution_date = kwargs['execution_date']
date_str = execution_date.strftime('%Y-%m-%d')
all_data = []
for city in self.config['cities']:
url = f"{self.config['api_url']}?city={city}&date={date_str}&key={self.config['api_key']}"
response = requests.get(url, timeout=30)
if response.status_code == 200:
city_data = response.json()
city_data['city'] = city
city_data['extract_date'] = date_str
all_data.append(city_data)
else:
print(f"⚠️ {city} 데이터 추출 실패: {response.status_code}")
kwargs['ti'].xcom_push(key='raw_weather_data', value=all_data)
return f"{len(all_data)}개 도시 데이터 추출 완료"
def transform_weather_data(self, kwargs):
"""날씨 데이터 변환"""
ti = kwargs['ti']
raw_data = ti.xcom_pull(key='raw_weather_data', task_ids='extract_weather_data')
transformed_data = []
for city_data in raw_data:
transformed = {
'city': city_data['city'],
'date': city_data['extract_date'],
'temperature': city_data.get('main', {}).get('temp', 0),
'humidity': city_data.get('main', {}).get('humidity', 0),
'weather': city_data.get('weather', [{}])[0].get('main', 'Unknown'),
'description': city_data.get('weather', [{}])[0].get('description', 'Unknown'),
'wind_speed': city_data.get('wind', {}).get('speed', 0),
'processed_at': datetime.now().isoformat()
}
transformed_data.append(transformed)
ti.xcom_push(key='transformed_weather_data', value=transformed_data)
return "데이터 변환 완료"
def load_to_database(self, kwargs):
"""데이터베이스 적재"""
ti = kwargs['ti']
weather_data = ti.xcom_pull(key='transformed_weather_data', task_ids='transform_weather_data')
pg_hook = PostgresHook(postgres_conn_id=self.config['db_conn_id'])
for data in weather_data:
insert_query = """
INSERT INTO weather_data
(city, date, temperature, humidity, weather, description, wind_speed, processed_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (city, date)
DO UPDATE SET
temperature = EXCLUDED.temperature,
humidity = EXCLUDED.humidity,
weather = EXCLUDED.weather,
description = EXCLUDED.description,
wind_speed = EXCLUDED.wind_speed,
processed_at = EXCLUDED.processed_at
"""
pg_hook.run(insert_query, parameters=(
data['city'], data['date'], data['temperature'],
data['humidity'], data['weather'], data['description'],
data['wind_speed'], data['processed_at']
))
return f"{len(weather_data)}건 데이터베이스 적재 완료"
def generate_daily_report(self, kwargs):
"""일일 리포트 생성"""
ti = kwargs['ti']
weather_data = ti.xcom_pull(key='transformed_weather_data', task_ids='transform_weather_data')
df = pd.DataFrame(weather_data)
report = {
'total_records': len(df),
'avg_temperature': df['temperature'].mean(),
'max_temperature': df['temperature'].max(),
'min_temperature': df['temperature'].min(),
'cities_covered': df['city'].nunique()
}
print("📊 일일 날씨 리포트:")
for key, value in report.items():
print(f" {key}: {value}")
return "리포트 생성 완료"
DAG 설정
with DAG(
'weather_etl_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='0 6 * * *', 매일 오전 6시
catchup=True,
default_args={
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': True
}
) as dag:
pipeline = ETLPipeline()
extract_task = PythonOperator(
task_id='extract_weather_data',
python_callable=pipeline.extract_weather_data,
provide_context=True
)
transform_task = PythonOperator(
task_id='transform_weather_data',
python_callable=pipeline.transform_weather_data,
provide_context=True
)
load_task = PythonOperator(
task_id='load_to_database',
python_callable=pipeline.load_to_database,
provide_context=True
)
report_task = PythonOperator(
task_id='generate_daily_report',
python_callable=pipeline.generate_daily_report,
provide_context=True
)
작업 흐름
extract_task >> transform_task >> load_task >> report_task
7. 모니터링과 문제 해결
모니터링 체계 다이어그램
[Airflow UI] → [DAG Runs] → [Task Instances] → [Logs] → [Metrics]
↓ ↓ ↓ ↓ ↓
[실행 현황] [실행 기록] [작업 상태] [로그 분석] [성능 지표]
↓ ↓ ↓ ↓ ↓
[Alert System] → [Email] → [Slack] → [PagerDuty] → [Monitoring Tools]
모니터링과 디버깅 코드```python
def monitor_dag_execution(kwargs):
"""DAG 실행 모니터링"""
ti = kwargs['ti']
dag_run = kwargs['dag_run']
print(f"🔍 DAG 실행 모니터링:")
print(f" DAG ID: {dag_run.dag_id}")
print(f" 실행 ID: {dag_run.run_id}")
print(f" 실행 시간: {dag_run.execution_date}")
print(f" 상태: {dag_run.state}")
추가 모니터링 정보
task_instances = dag_run.get_task_instances()
success_count = sum(1 for ti in task_instances if ti.state == 'success')
failed_count = sum(1 for ti in task_instances if ti.state == 'failed')
print(f" 성공 작업: {success_count}")
print(f" 실패 작업: {failed_count}")
return "모니터링 완료"
def error_handling(kwargs):
"""에러 처리 및 복구"""
try:
정상 작업 수행
result = some_risky_operation()
return result
except Exception as e:
print(f"❌ 에러 발생: {str(e)}")
에러 정보 기록
error_info = {
'error_type': type(e).__name__,
'error_message': str(e),
'task_id': kwargs['ti'].task_id,
'execution_date': kwargs['execution_date'].isoformat(),
'timestamp': datetime.now().isoformat()
}
에러 로그 저장
with open('/tmp/error_log.json', 'a') as f:
f.write(json.dumps(error_info) + '\n')
재시도 또는 대체 작업 수행
return fallback_operation()
def send_alert_on_failure(context):
"""실패 시 알림 전송"""
ti = context['ti']
dag_run = context['dag_run']
alert_message = f"""
🚨 Airflow 작업 실패 알림
========================
DAG: {dag_run.dag_id}
Task: {ti.task_id}
실행 시간: {dag_run.execution_date}
에러: {str(context['exception'])}
로그: {ti.log_url}
"""
print(alert_message)
실제로는 이메일이나 슬랙으로 알림 전송
모니터링 Task 추가
monitor_task = PythonOperator(
task_id='monitor_execution',
python_callable=monitor_dag_execution,
provide_context=True,
trigger_rule='all_done' 성공/실패 관계없이 실행
)
에러 핸들링 설정
error_prone_task = PythonOperator(
task_id='error_prone_operation',
python_callable=error_handling,
provide_context=True,
on_failure_callback=send_alert_on_failure,
retries=2,
retry_delay=timedelta(minutes=5)
)
최종 작업 흐름
extract_task >> transform_task >> load_task >> report_task >> monitor_task
체계적으로 학습하면 Airflow의 모든 주요 기능을 실무에서 활용할 수 있다. 각 단계별로 코드를 직접 실행하고 수정해보면서 이해하는 것이 가장 중요.
1단계: Airflow의 핵심 개념 완벽 이해
DAG(Directed Acyclic Graph)란 무엇인가?
실제 예시로 이해하기: DAG는 방향성이 있고 순환하지 않는 그래프로, 작업들의 흐름을 정의.
예를 들어, 매일 아침 출근 준비 과정을 DAG로 표현.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def wake_up():
print("6:30 AM - 기상")
return "기상 완료"
def take_shower():
print("6:40 AM - 샤워")
return "샤워 완료"
def get_dressed():
print("6:50 AM - 옷 입기")
return "옷 입기 완료"
def eat_breakfast():
print("7:00 AM - 아침 식사")
return "아침 식사 완료"
def leave_home():
print("7:30 AM - 출발")
return "출근 완료"
DAG 정의
with DAG(
'morning_routine',
start_date=datetime(2024, 1, 1),
schedule_interval='0 6 * * 1-5', 월-금 오전 6시
catchup=False
) as dag:
wake_up_task = PythonOperator(
task_id='wake_up',
python_callable=wake_up
)
shower_task = PythonOperator(
task_id='take_shower',
python_callable=take_shower
)
dress_task = PythonOperator(
task_id='get_dressed',
python_callable=get_dressed
)
breakfast_task = PythonOperator(
task_id='eat_breakfast',
python_callable=eat_breakfast
)
leave_task = PythonOperator(
task_id='leave_home',
python_callable=leave_home
)
작업 순서 정의: 기상 → 샤워 → 옷입기 → 아침식사 → 출발
wake_up_task >> shower_task >> dress_task >> breakfast_task >> leave_task
실행 결과:
6:30 AM - 기상
6:40 AM - 샤워
6:50 AM - 옷 입기
7:00 AM - 아침 식사
7:30 AM - 출발
Operator의 역할과 종류
Operator는 실제 작업을 수행하는 단위입니다. 다양한 Operator를 상황에 맞게 사용
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.sensors.filesystem import FileSensor
파일 처리 예제
def process_data():
import pandas as pd
데이터 처리 로직
df = pd.read_csv('/data/input.csv')
processed_df = df.groupby('category').sum()
processed_df.to_csv('/data/output.csv')
return "데이터 처리 완료"
다양한 Operator 사용 예제
with DAG('operator_examples', ...) as dag:
1. BashOperator: 쉘 명령어 실행
download_data = BashOperator(
task_id='download_data',
bash_command='curl -o /data/input.csv https://example.com/data.csv'
)
2. FileSensor: 파일 존재 확인 (30초마다 체크, 최대 5분 대기)
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/input.csv',
poke_interval=30,
timeout=300
)
3. PythonOperator: Python 함수 실행
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data
)
4. EmailOperator: 이메일 전송
send_report = EmailOperator(
task_id='send_report',
to='team@company.com',
subject='데이터 처리 완료',
html_content='<p>일일 데이터 처리가 완료되었습니다.</p>'
)
download_data >> wait_for_file >> process_task >> send_report
2단계: Task 간 데이터 공유와 의존성 관리
XCom을 이용한 데이터 전달
XCom(Cross-communication)은 Task 간에 작은 데이터를 주고받을 때 사용
def extract_user_data(kwargs):
"""가상의 API에서 사용자 데이터 추출"""
users = [
{'id': 1, 'name': '김철수', 'age': 25, 'city': '서울'},
{'id': 2, 'name': '이영희', 'age': 30, 'city': '부산'},
{'id': 3, 'name': '박민수', 'age': 35, 'city': '대구'}
]
XCom에 데이터 저장
kwargs['ti'].xcom_push(key='raw_users', value=users)
return f"{len(users)}명의 사용자 데이터 추출 완료"
def process_user_data(kwargs):
"""사용자 데이터 처리"""
ti = kwargs['ti']
이전 Task에서 데이터 가져오기
raw_users = ti.xcom_pull(key='raw_users', task_ids='extract_user_data')
print("=== 원본 데이터 ===")
for user in raw_users:
print(user)
데이터 처리: 30세 이상만 필터링
filtered_users = [user for user in raw_users if user['age'] >= 30]
처리 결과 저장
ti.xcom_push(key='processed_users', value=filtered_users)
return f"{len(filtered_users)}명 필터링 완료"
def generate_report(kwargs):
"""최종 리포트 생성"""
ti = kwargs['ti']
processed_users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
print("=== 최종 리포트 ===")
print("30세 이상 사용자 목록:")
for user in processed_users:
print(f"- {user['name']} ({user['age']}세, {user['city']})")
return "리포트 생성 완료"
DAG 설정
with DAG('user_data_pipeline', ...) as dag:
extract = PythonOperator(
task_id='extract_user_data',
python_callable=extract_user_data,
provide_context=True kwargs에 context 정보 전달
)
process = PythonOperator(
task_id='process_user_data',
python_callable=process_user_data,
provide_context=True
)
report = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
provide_context=True
)
extract >> process >> report
실행 결과:
=== 원본 데이터 ===
{'id': 1, 'name': '김철수', 'age': 25, 'city': '서울'}
{'id': 2, 'name': '이영희', 'age': 30, 'city': '부산'}
{'id': 3, 'name': '박민수', 'age': 35, 'city': '대구'}
=== 최종 리포트 ===
30세 이상 사용자 목록:
- 이영희 (30세, 부산)
- 박민수 (35세, 대구)
3단계: 조건부 실행과 에러 처리
Branching과 조건부 작업 흐름
from airflow.operators.python_operator import BranchPythonOperator
def check_data_quality(kwargs):
"""데이터 품질 검사"""
ti = kwargs['ti']
users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
간단한 품질 검사: 데이터가 있고, 최소 1명 이상인지 확인
if users and len(users) > 0:
print("데이터 품질 양호")
return 'load_to_database' 다음 task로 이동
else:
print("데이터 품질 문제 발견")
return 'send_quality_alert' 알림 task로 이동
def load_to_database(kwargs):
"""데이터베이스에 적재"""
ti = kwargs['ti']
users = ti.xcom_pull(key='processed_users', task_ids='process_user_data')
실제로는 데이터베이스 연결 코드가 들어갑니다
print(f"📊 {len(users)}명의 데이터를 데이터베이스에 적재")
return "데이터베이스 적재 완료"
def send_quality_alert(kwargs):
"""품질 문제 알림"""
print("데이터 품질 문제로 관리자에게 알림 전송")
실제로는 이메일이나 슬랙 알림이 전송됩니다
return "품질 알림 전송 완료"
def finalize_process(kwargs):
"""최종 처리"""
print("모든 프로세스 완료")
return "파이프라인 완료"
with DAG('quality_control_pipeline', ...) as dag:
... extract, process tasks 생략 ...
quality_check = BranchPythonOperator(
task_id='check_data_quality',
python_callable=check_data_quality,
provide_context=True
)
load_db = PythonOperator(
task_id='load_to_database',
python_callable=load_to_database,
provide_context=True
)
send_alert = PythonOperator(
task_id='send_quality_alert',
python_callable=send_quality_alert
)
finalize = PythonOperator(
task_id='finalize_process',
python_callable=finalize_process
)
의존성 설정: quality_check 결과에 따라 load_db 또는 send_alert 실행
process >> quality_check
quality_check >> load_db >> finalize
quality_check >> send_alert >> finalize
4단계: 실전 데이터 파이프라인 프로젝트
종합 예제: 일일 판매 데이터 처리 파이프라인
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
import requests
def download_daily_sales(kwargs):
"""일일 판매 데이터 다운로드"""
execution_date = kwargs['execution_date']
date_str = execution_date.strftime('%Y-%m-%d')
가상의 API에서 데이터 다운로드
url = f"https://api.sales.com/data?date={date_str}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
filename = f"/data/sales_{date_str}.json"
with open(filename, 'w') as f:
json.dump(data, f)
kwargs['ti'].xcom_push(key='sales_file', value=filename)
return f"{len(data)}건의 판매 데이터 다운로드 완료"
else:
raise Exception(f"데이터 다운로드 실패: {response.status_code}")
def validate_sales_data(kwargs):
"""판매 데이터 유효성 검사"""
ti = kwargs['ti']
filename = ti.xcom_pull(key='sales_file', task_ids='download_daily_sales')
with open(filename, 'r') as f:
sales_data = json.load(f)
기본 유효성 검사
required_fields = ['sale_id', 'product_id', 'amount', 'sale_date']
valid_records = []
invalid_records = []
for record in sales_data:
if all(field in record for field in required_fields):
valid_records.append(record)
else:
invalid_records.append(record)
print(f"유효한 레코드: {len(valid_records)}건")
print(f"무효한 레코드: {len(invalid_records)}건")
if invalid_records:
print("무효한 레코드 샘플:")
for record in invalid_records[:3]:
print(record)
ti.xcom_push(key='valid_sales', value=valid_records)
ti.xcom_push(key='invalid_count', value=len(invalid_records))
return f"유효성 검사 완료 ({len(valid_records)}건 유효)"
def calculate_daily_stats(kwargs):
"""일일 통계 계산"""
ti = kwargs['ti']
valid_sales = ti.xcom_pull(key='valid_sales', task_ids='validate_sales_data')
df = pd.DataFrame(valid_sales)
다양한 통계 계산
daily_stats = {
'total_sales': len(valid_sales),
'total_amount': df['amount'].sum(),
'avg_amount': df['amount'].mean(),
'max_amount': df['amount'].max(),
'min_amount': df['amount'].min(),
'top_products': df['product_id'].value_counts().head(5).to_dict()
}
ti.xcom_push(key='daily_stats', value=daily_stats)
return "통계 계산 완료"
def load_to_data_warehouse(kwargs):
"""데이터 웨어하우스에 적재"""
ti = kwargs['ti']
valid_sales = ti.xcom_pull(key='valid_sales', task_ids='validate_sales_data')
daily_stats = ti.xcom_pull(key='daily_stats', task_ids='calculate_daily_stats')
PostgreSQL 연결
pg_hook = PostgresHook(postgres_conn_id='data_warehouse')
판매 데이터 적재
for sale in valid_sales:
insert_query = """
INSERT INTO sales (sale_id, product_id, amount, sale_date)
VALUES (%s, %s, %s, %s)
ON CONFLICT (sale_id) DO NOTHING
"""
pg_hook.run(insert_query, parameters=(
sale['sale_id'], sale['product_id'], sale['amount'], sale['sale_date']
))
통계 데이터 적재
stats_query = """
INSERT INTO daily_stats (stat_date, total_sales, total_amount,
avg_amount, max_amount, min_amount)
VALUES (%s, %s, %s, %s, %s, %s)
"""
execution_date = kwargs['execution_date']
pg_hook.run(stats_query, parameters=(
execution_date.date(),
daily_stats['total_sales'],
daily_stats['total_amount'],
daily_stats['avg_amount'],
daily_stats['max_amount'],
daily_stats['min_amount']
))
return "데이터 웨어하우스 적재 완료"
def generate_daily_report(kwargs):
"""일일 리포트 생성"""
ti = kwargs['ti']
daily_stats = ti.xcom_pull(key='daily_stats', task_ids='calculate_daily_stats')
invalid_count = ti.xcom_pull(key='invalid_count', task_ids='validate_sales_data')
report_content = f"""
📊 일일 판매 리포트
====================
• 총 판매건수: {daily_stats['total_sales']:,}건
• 총 판매금액: {daily_stats['total_amount']:,.0f}원
• 평균 거래액: {daily_stats['avg_amount']:,.0f}원
• 최대 거래액: {daily_stats['max_amount']:,.0f}원
• 최소 거래액: {daily_stats['min_amount']:,.0f}원
• 무효 데이터: {invalid_count}건
"""
print(report_content)
리포트 파일로 저장
execution_date = kwargs['execution_date']
report_filename = f"/reports/sales_report_{execution_date.strftime('%Y%m%d')}.txt"
with open(report_filename, 'w') as f:
f.write(report_content)
return "일일 리포트 생성 완료"
메인 DAG 정의
with DAG(
'daily_sales_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='0 1 * * *', 매일 오전 1시
catchup=True,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True
}
) as dag:
download_task = PythonOperator(
task_id='download_daily_sales',
python_callable=download_daily_sales,
provide_context=True
)
validate_task = PythonOperator(
task_id='validate_sales_data',
python_callable=validate_sales_data,
provide_context=True
)
stats_task = PythonOperator(
task_id='calculate_daily_stats',
python_callable=calculate_daily_stats,
provide_context=True
)
load_task = PythonOperator(
task_id='load_to_data_warehouse',
python_callable=load_to_data_warehouse,
provide_context=True
)
report_task = PythonOperator(
task_id='generate_daily_report',
python_callable=generate_daily_report,
provide_context=True
)
작업 흐름 정의
download_task >> validate_task >> [stats_task, load_task]
stats_task >> report_task
끝.
