티스토리 뷰
1단계: Python 기초 완성
1. 변수, 데이터 타입 이해
python
샘플 1: 기본 데이터 타입과 연산
def demonstrate_basic_types():
    """Show Python's basic scalar types using pipeline-style settings.

    Prints one line per group of sample values, followed by the runtime
    type of a string, an int and a bool. Returns ``None``.
    """
    # Strings
    dag_name = "sales_pipeline"
    schedule = "@daily"

    # Numbers: an int and a float
    retry_count = 3
    timeout_seconds = 300.5

    # Booleans
    is_active = True
    is_production = False

    # None stands for "no value yet"
    last_run_result = None

    print(f"DAG: {dag_name}, Schedule: {schedule}")
    print(f"Retries: {retry_count}, Timeout: {timeout_seconds}s")
    print(f"Active: {is_active}, Production: {is_production}")
    print(f"Last run: {last_run_result}")

    # Inspect the runtime types
    print(f"Type of dag_name: {type(dag_name)}")
    print(f"Type of retry_count: {type(retry_count)}")
    print(f"Type of is_active: {type(is_active)}")
샘플 2: 데이터 타입 변환
def type_conversion_example():
    """Demonstrate explicit conversions between str, int and bool.

    Prints the result of ``int()`` on a numeric string, ``str()`` on an
    int, and ``bool()`` on a set of truthy and falsy sample values.
    Returns ``None``.
    """
    # String -> number
    retry_str = "3"
    retry_int = int(retry_str)
    print(f"String '{retry_str}' to int: {retry_int}")

    # Number -> string
    timeout = 300
    timeout_str = str(timeout)
    print(f"Int {timeout} to string: '{timeout_str}'")

    # Boolean conversion: non-empty / non-zero values are truthy
    truthy_values = [1, "hello", [1, 2], True]
    falsy_values = [0, "", [], False, None]

    print("Truthy values:")
    for val in truthy_values:
        print(f" {val} -> {bool(val)}")

    print("Falsy values:")
    for val in falsy_values:
        print(f" {val} -> {bool(val)}")
샘플 3: 복합 데이터 타입
def complex_types_example():
    """Demonstrate tuples, lists and sets.

    Prints a tuple and two of its elements, a list after one append and
    one removal, and a set built from values containing a duplicate.
    Returns ``None``.
    """
    # Tuple (immutable)
    task_config = ("extract_data", "python_operator", 300)
    print(f"Task config tuple: {task_config}")
    print(f"Task name: {task_config[0]}, Type: {task_config[1]}")

    # List (mutable)
    task_list = ["extract", "transform", "load"]
    task_list.append("validate")  # add
    task_list.remove("transform")  # remove
    print(f"Task list: {task_list}")

    # Set (duplicates collapse); print order of a set is not guaranteed
    unique_tasks = {"extract", "transform", "load", "extract"}
    print(f"Unique tasks: {unique_tasks}")
# Run the three examples in order, separated by a divider line
demonstrate_basic_types()
print("\n" + "="*50 + "\n")
type_conversion_example()
print("\n" + "="*50 + "\n")
complex_types_example()
2. 리스트, 딕셔너리 활용
python
샘플 1: 리스트 기본 연산
def list_operations():
    """Demonstrate basic list operations.

    Covers append/insert, pop/remove, slicing and a list comprehension,
    printing the intermediate results. Returns ``None``.
    """
    tasks = ["extract", "transform", "load"]

    # Add elements
    tasks.append("validate")
    tasks.insert(1, "pre_process")

    # Remove elements
    removed = tasks.pop()  # last element
    tasks.remove("pre_process")

    # Slicing
    first_two = tasks[:2]
    last_one = tasks[-1:]

    print(f"All tasks: {tasks}")
    print(f"First two: {first_two}")
    print(f"Last one: {last_one}")
    print(f"Removed: {removed}")

    # List comprehension
    task_lengths = [len(task) for task in tasks]
    print(f"Task name lengths: {task_lengths}")
샘플 2: 딕셔너리 활용
def dictionary_operations():
    """Demonstrate basic dictionary operations.

    Covers key access (``[]`` and ``.get`` with a default), updating and
    adding keys, iterating over items, and a dict comprehension.
    Returns ``None``.
    """
    # DAG settings
    dag_config = {
        "dag_id": "sales_pipeline",
        "schedule_interval": "@daily",
        "start_date": "2024-01-01",
        "retries": 3,
        "retry_delay": 300,
    }

    # Read values
    print(f"DAG ID: {dag_config['dag_id']}")
    print(f"Schedule: {dag_config.get('schedule_interval', 'Not set')}")

    # Modify values
    dag_config["retries"] = 5
    dag_config["timeout"] = 600  # add a new key

    # Iterate over every item
    print("\nDAG Configuration:")
    for key, value in dag_config.items():
        print(f" {key}: {value}")

    # Dict comprehension: length of each value's string form
    config_lengths = {k: len(str(v)) for k, v in dag_config.items()}
    print(f"\nConfig value lengths: {config_lengths}")
샘플 3: 중첩 데이터 구조
def nested_data_structures():
    """Demonstrate access to nested dicts and lists.

    Builds a pipeline description (dict containing a list of task dicts
    and a schedule dict), prints selected nested values, then lists each
    task with its dependencies. Returns ``None``.
    """
    # Pipeline definition
    pipeline = {
        "name": "data_processing",
        "tasks": [
            {
                "task_id": "extract_data",
                "operator": "PythonOperator",
                "params": {"source": "database"},
            },
            {
                "task_id": "transform_data",
                "operator": "PythonOperator",
                "params": {"method": "normalize"},
                "dependencies": ["extract_data"],
            },
        ],
        "schedule": {
            "interval": "@daily",
            "start_time": "02:00",
        },
    }

    # Access nested data
    print(f"Pipeline: {pipeline['name']}")
    print(f"First task: {pipeline['tasks'][0]['task_id']}")
    print(f"Schedule: {pipeline['schedule']['interval']} at {pipeline['schedule']['start_time']}")

    # Iterate over the tasks; "dependencies" is optional, so default to []
    print("\nTasks:")
    for task in pipeline["tasks"]:
        deps = task.get("dependencies", [])
        print(f" {task['task_id']} -> depends on: {deps}")
# Run the three examples in order, separated by a divider line
list_operations()
print("\n" + "="*50 + "\n")
dictionary_operations()
print("\n" + "="*50 + "\n")
nested_data_structures()
3. 조건문, 반복문 작성
python
샘플 1: 기본 조건문
def conditional_statements():
    """Demonstrate an if/elif/else chain and a conditional expression.

    Uses fixed sample values (status ``"success"``, 2 of 3 retries used)
    and prints the branch taken plus a derived decision.
    Returns ``None``.
    """
    task_status = "success"
    retry_count = 2
    max_retries = 3

    # if / elif / else
    if task_status == "success":
        print("Task completed successfully")
    elif task_status == "failed" and retry_count < max_retries:
        print("Task failed, retrying...")
    elif task_status == "failed":
        print("Task failed, no more retries")
    else:
        print("⏳ Task is running...")

    # Conditional (ternary) expression
    message = "Proceed" if task_status == "success" else "Wait"
    print(f"Decision: {message}")
샘플 2: 다양한 반복문
def loop_examples():
    """Demonstrate ``for``, ``enumerate`` and ``while`` loops.

    Prints the task list, the same list numbered from 1, and a
    three-step countdown. Returns ``None``.
    """
    tasks = ["extract", "transform", "load", "validate"]

    # Plain for loop
    print("Task list:")
    for task in tasks:
        print(f" - {task}")

    # enumerate supplies the index as well (starting at 1 here)
    print("\nTask with index:")
    for i, task in enumerate(tasks, 1):
        print(f" {i}. {task}")

    # while loop
    print("\nCountdown:")
    count = 3
    while count > 0:
        print(f" Starting in {count}...")
        count -= 1
    print(" Go!")
샘플 3: 반복문과 조건문 결합
def conditional_loops():
    """Demonstrate combining loops with conditions.

    Filters the successful tasks from a fixed list of result records,
    reports the failed ones, and shows ``break`` by processing records
    only up to the first failure. Returns ``None``.
    """
    task_results = [
        {"task": "extract", "status": "success", "duration": 120},
        {"task": "transform", "status": "failed", "duration": 180},
        {"task": "load", "status": "success", "duration": 90},
        {"task": "validate", "status": "running", "duration": 60},
    ]

    # Keep only the tasks that succeeded
    successful_tasks = [
        result["task"] for result in task_results
        if result["status"] == "success"
    ]
    print(f"Successful tasks: {successful_tasks}")

    # Details of the failed tasks
    print("\nFailed tasks:")
    for result in task_results:
        if result["status"] == "failed":
            print(f" {result['task']} failed after {result['duration']}s")

    # break: stop at the first failed task
    print("\nProcessing until first failure:")
    for result in task_results:
        if result["status"] == "failed":
            print(f" Stopped at failed task: {result['task']}")
            break
        print(f" Processed: {result['task']}")
# Run the three examples in order, separated by a divider line
conditional_statements()
print("\n" + "="*50 + "\n")
loop_examples()
print("\n" + "="*50 + "\n")
conditional_loops()
2단계: 중급 Python
1. 클래스 정의 및 객체 생성
python
샘플 1: 기본 클래스
class DAGConfig:
    """Hold the settings and task list of a DAG.

    Attributes:
        dag_id: Identifier passed to the constructor.
        schedule_interval: Schedule string, ``"@daily"`` by default.
        start_date: Start date; falls back to ``"2024-01-01"`` when falsy.
        tasks: List of task-configuration dicts, in insertion order.
        default_args: Dict of default arguments.
    """

    def __init__(self, dag_id, schedule_interval="@daily", start_date=None):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval
        self.start_date = start_date or "2024-01-01"
        self.tasks = []
        self.default_args = {}

    def add_task(self, task_name, operator_type, **kwargs):
        """Append a task configuration.

        Args:
            task_name: Name stored under ``"task_name"``.
            operator_type: Operator label stored under ``"operator_type"``.
            **kwargs: Extra settings merged into the same dict.
        """
        task_config = {
            "task_name": task_name,
            "operator_type": operator_type,
            **kwargs,
        }
        self.tasks.append(task_config)
        print(f"Added task: {task_name}")

    def set_default_args(self, **kwargs):
        """Merge the given keyword arguments into ``default_args``."""
        self.default_args.update(kwargs)

    def display_config(self):
        """Print the DAG settings followed by one line per task."""
        print(f"\nDAG: {self.dag_id}")
        print(f"Schedule: {self.schedule_interval}")
        print(f"Start: {self.start_date}")
        print(f"Default args: {self.default_args}")
        print("Tasks:")
        for task in self.tasks:
            print(f" - {task['task_name']} ({task['operator_type']})")
# Usage example
config = DAGConfig("sales_pipeline", "@hourly", "2024-01-01")
config.set_default_args(retries=3, owner="data_team")
config.add_task("extract", "PythonOperator", python_callable="extract_data")
config.add_task("transform", "PythonOperator", python_callable="transform_data")
config.display_config()
샘플 2: 상속과 다형성
class BaseTask:
    """Base class for tasks: an id plus a list of dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.dependencies = []

    def add_dependency(self, task):
        """Record ``task`` as a dependency and return ``self`` for chaining."""
        self.dependencies.append(task)
        return self

    def execute(self):
        """Run the task; subclasses must override this.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement execute()")
class PythonTask(BaseTask):
    """Task that calls a Python callable (imitates a PythonOperator)."""

    def __init__(self, task_id, python_callable, **kwargs):
        """Store the callable and the keyword arguments to call it with."""
        super().__init__(task_id)
        self.python_callable = python_callable
        self.kwargs = kwargs

    def execute(self):
        """Call the stored callable with the stored keyword arguments.

        Returns:
            Whatever the callable returns.

        Raises:
            Exception: Any exception from the callable is printed and
                re-raised unchanged.
        """
        print(f"Executing Python task: {self.task_id}")
        try:
            result = self.python_callable(**self.kwargs)
            print(f"Task {self.task_id} completed: {result}")
            return result
        except Exception as e:
            print(f"Task {self.task_id} failed: {e}")
            raise
class BashTask(BaseTask):
    """Task that holds a shell command (imitates a BashOperator)."""

    def __init__(self, task_id, bash_command):
        super().__init__(task_id)
        self.bash_command = bash_command

    def execute(self):
        """Print the command and return a confirmation string.

        The command is only printed here, not run; a real implementation
        would use something like ``subprocess.run()``.
        """
        print(f"Executing Bash task: {self.task_id}")
        print(f"Command: {self.bash_command}")
        return f"Bash command executed: {self.bash_command}"
# Usage example
def sample_function():
    """Return a fixed greeting; used as the callable of a Python task."""
    return "Hello from Python"
# Build one task of each kind, then run them in creation order
python_task = PythonTask("python_task", sample_function)
bash_task = BashTask("bash_task", "echo 'Hello from Bash'")
for _task in (python_task, bash_task):
    _task.execute()
샘플 3: 클래스 메서드와 정적 메서드
class TaskFactory:
    """Factory that creates tasks from a registry of task types."""

    task_registry = {}  # class variable: maps task-type name -> task class

    @classmethod
    def register_task_type(cls, task_type, task_class):
        """Register ``task_class`` under the name ``task_type``."""
        cls.task_registry[task_type] = task_class
        print(f"Registered task type: {task_type}")

    @classmethod
    def create_task(cls, task_type, task_id, **kwargs):
        """Instantiate the class registered for ``task_type``.

        Args:
            task_type: Name used at registration time.
            task_id: Passed as the first constructor argument.
            **kwargs: Forwarded to the constructor as keyword arguments.

        Raises:
            ValueError: If ``task_type`` has not been registered.
        """
        if task_type not in cls.task_registry:
            raise ValueError(f"Unknown task type: {task_type}")
        task_class = cls.task_registry[task_type]
        return task_class(task_id, **kwargs)

    @staticmethod
    def validate_task_id(task_id):
        """Return True when ``task_id`` is a non-empty str of at most 50 chars."""
        if not task_id or not isinstance(task_id, str):
            return False
        # NOTE(review): the original author labels 50 as an Airflow limit;
        # confirm against the Airflow version in use.
        return len(task_id) <= 50
# Using the factory
TaskFactory.register_task_type("python", PythonTask)
TaskFactory.register_task_type("bash", BashTask)
task1 = TaskFactory.create_task("python", "extract_data", python_callable=sample_function)
task2 = TaskFactory.create_task("bash", "cleanup", bash_command="rm -rf /tmp/*")
print(f"Task1 ID valid: {TaskFactory.validate_task_id('extract_data')}")
print(f"Task2 ID valid: {TaskFactory.validate_task_id('')}")
task1.execute()
task2.execute()
2. 데코레이터 이해 및 사용
python
샘플 1: 기본 데코레이터
def retry_on_failure(max_retries=3):
    """Build a decorator that retries the wrapped function on failure.

    Args:
        max_retries: Total number of attempts. With a value of 0 or less
            the wrapped function is never called and ``None`` is returned.

    Returns:
        A decorator. The decorated function returns the first successful
        result and re-raises the last exception once all attempts fail.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    print(f"Attempt {attempt + 1}/{max_retries}")
                    result = func(*args, **kwargs)
                    print("Success!")
                    return result
                except Exception as e:
                    print(f"Failed: {e}")
                    if attempt == max_retries - 1:
                        print("All retries exhausted")
                        raise
                    print("Retrying...")
        return wrapper
    return decorator
@retry_on_failure(max_retries=2)
def unstable_api_call():
    """Simulate an unstable API call that fails about 70% of the time.

    Raises:
        ConnectionError: When the random draw falls below 0.7.
    """
    import random
    if random.random() < 0.7:
        raise ConnectionError("API connection failed")
    return "API response data"
샘플 2: 파라미터를 가진 데코레이터
def timeout(seconds):
    """Build a decorator that aborts the wrapped call after ``seconds``.

    Relies on ``signal.SIGALRM`` and ``signal.alarm``.
    NOTE(review): per the ``signal`` module documentation this is
    Unix-only, works only in the main thread, and ``signal.alarm`` takes
    whole seconds — confirm for the target platform.

    Args:
        seconds: Alarm delay handed to ``signal.alarm``.

    Returns:
        A decorator. The decorated function raises ``TimeoutError`` when
        the alarm fires before it returns.
    """
    import signal
    from functools import wraps

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def timeout_handler(signum, frame):
                raise TimeoutError(f"Function timed out after {seconds} seconds")

            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the alarm and put the old handler back so
                # later code is not interrupted by a stale timer.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous_handler)
        return wrapper
    return decorator
@timeout(3)  # 3-second timeout
def long_running_task():
    """Simulate a slow job that outlasts its 3-second timeout."""
    import time
    time.sleep(5)  # sleeps 5 seconds, so the timeout fires first
    return "Task completed"
샘플 3: 클래스 데코레이터
class LogExecution:
    """Class-based decorator that logs each call and counts executions.

    Attributes:
        func: The wrapped callable.
        execution_count: Number of times the wrapper has been called.
    """

    def __init__(self, func):
        import functools

        self.func = func
        self.execution_count = 0
        # Copy __name__/__doc__ so the wrapper looks like the wrapped function
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        """Call the wrapped function, printing its duration or failure.

        Returns the wrapped function's result; exceptions are printed and
        re-raised unchanged.
        """
        # Imported here because no module-level ``import time`` precedes
        # the first call to a decorated function in this file.
        import time

        self.execution_count += 1
        print(f"Executing {self.func.__name__} (count: {self.execution_count})")
        start_time = time.time()
        try:
            result = self.func(*args, **kwargs)
            end_time = time.time()
            print(f"{self.func.__name__} completed in {end_time - start_time:.2f}s")
            return result
        except Exception as e:
            end_time = time.time()
            print(f"{self.func.__name__} failed after {end_time - start_time:.2f}s: {e}")
            raise
@LogExecution
def important_task():
    """Simulate an important job: sleep one second, return a fixed result."""
    import time
    time.sleep(1)
    return "Important result"
# Run the decorator examples
print("=== Retry Decorator ===")
try:
    result = unstable_api_call()
    print(f"Final result: {result}")
except Exception as e:
    print(f"Final error: {e}")

print("\n=== Timeout Decorator ===")
try:
    result = long_running_task()
    print(f"Result: {result}")
except TimeoutError as e:
    print(f"Timeout: {e}")

print("\n=== Class Decorator ===")
important_task()
important_task()  # run twice to see the count increase
3. 컨텍스트 매니저 구현
python
샘플 1: 기본 컨텍스트 매니저
class DatabaseConnection:
    """Context manager that simulates a database connection.

    The "connection" is a plain dict with ``"connected"`` and ``"string"``
    keys; a real implementation would open an actual connection instead.
    """

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None

    def __enter__(self):
        """Open the simulated connection and return it."""
        print(f"🔗 Connecting to {self.connection_string}")
        # Placeholder for real database connection code
        self.connection = {"connected": True, "string": self.connection_string}
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection and report any error from the with-body."""
        if self.connection:
            print("Closing database connection")
            self.connection["connected"] = False
        if exc_type:
            print(f"Error occurred: {exc_val}")
        # Returning False lets the exception propagate; True would suppress it
        return False
샘플 2: contextlib 사용
from contextlib import contextmanager
import time
@contextmanager
def timer_context(name):
    """Context manager that prints how long the with-body took.

    Args:
        name: Label used in the printed messages.

    Yields:
        Nothing. Exceptions from the with-body are reported with the
        elapsed time and then re-raised.
    """
    start_time = time.time()
    print(f"Starting {name}...")
    try:
        yield  # the with-body runs here
    except Exception as e:
        end_time = time.time()
        print(f"{name} failed after {end_time - start_time:.2f}s: {e}")
        raise
    else:
        end_time = time.time()
        print(f"{name} completed in {end_time - start_time:.2f}s")
샘플 3: 중첩 컨텍스트 매니저
class TaskExecutionContext:
    """Context manager that times a task and tracks failed attempts.

    A context manager cannot re-run its with-body, so "retrying" here
    only means the exception is suppressed while ``retry_count`` is below
    ``max_retries``; the caller must enter the same instance again to
    actually retry.

    Attributes:
        task_name: Label used in printed messages.
        max_retries: Failure count at which exceptions start propagating.
        retry_count: Number of failed with-bodies seen so far.
    """

    def __init__(self, task_name, max_retries=3):
        self.task_name = task_name
        self.max_retries = max_retries
        self.retry_count = 0

    def __enter__(self):
        """Record the start time and return this context object."""
        print(f"Starting task: {self.task_name}")
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Report the outcome; suppress failures until retries run out."""
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        if exc_type:
            self.retry_count += 1
            if self.retry_count < self.max_retries:
                print(f"{self.task_name} failed, retrying... (attempt {self.retry_count + 1})")
                return True  # suppress the exception so the caller can retry
            else:
                print(f"{self.task_name} failed after {self.retry_count} retries: {exc_val}")
                return False
        else:
            print(f"{self.task_name} completed in {duration:.2f}s")
            return True
# Run the context-manager examples
print("=== Basic Context Manager ===")
with DatabaseConnection("postgresql://localhost:5432/mydb") as conn:
    print(f"Connection status: {conn['connected']}")
    # Real database work would happen here
    print("Performing database operations...")

print("\n=== Contextlib Timer ===")
with timer_context("Data Processing"):
    time.sleep(0.5)  # simulate real work
    print("Processing data...")

print("\n=== Retry Context Manager ===")
def unreliable_operation():
    """Simulate an unreliable job that fails about 60% of the time.

    Raises:
        ValueError: When the random draw falls below 0.6.
    """
    import random
    if random.random() < 0.6:
        raise ValueError("Random failure")
    return "Success"
with TaskExecutionContext("Unreliable Operation", max_retries=2) as context:
    result = unreliable_operation()
    # Kept inside the with-body: if the operation fails, this line is
    # skipped and ``result`` is never read while unbound.
    print(f"Operation result: {result}")
3단계: 고급 Python
1. 멀티스레딩 이해
python
샘플 1: 기본 스레딩
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def basic_threading():
    """Compare running two sleeping tasks sequentially and on two threads.

    Prints progress for each task and the wall-clock time of both runs.
    Returns ``None``.
    """
    def task_worker(task_name, duration):
        """Sleep for ``duration`` seconds and return a result label."""
        print(f"{task_name} started on thread {threading.current_thread().name}")
        time.sleep(duration)
        print(f"{task_name} completed after {duration}s")
        return f"{task_name}_result"

    # Single thread: the tasks run one after another
    print("=== Single Thread ===")
    start_time = time.time()
    task_worker("Task1", 1)
    task_worker("Task2", 2)
    single_thread_time = time.time() - start_time

    # Multiple threads: the sleeps overlap
    print("\n=== Multi Thread ===")
    start_time = time.time()
    thread1 = threading.Thread(target=task_worker, args=("Task1", 1))
    thread2 = threading.Thread(target=task_worker, args=("Task2", 2))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    multi_thread_time = time.time() - start_time

    print(f"\nSingle thread time: {single_thread_time:.2f}s")
    print(f"Multi thread time: {multi_thread_time:.2f}s")
샘플 2: ThreadPoolExecutor
def thread_pool_example():
    """Process five simulated data chunks on a pool of three threads.

    Results are collected in submission order; a failing chunk is
    reported and skipped. Prints the total time and the results.
    Returns ``None``.
    """
    def process_data_chunk(chunk_id, processing_time):
        """Sleep for ``processing_time`` seconds and return a result label."""
        print(f"Processing chunk {chunk_id}...")
        time.sleep(processing_time)
        result = f"processed_chunk_{chunk_id}"
        print(f"Chunk {chunk_id} processed")
        return result

    # Chunks to process: (chunk_id, processing_time) pairs
    chunks = [
        (1, 0.5), (2, 1.0), (3, 0.3), (4, 0.8), (5, 0.2)
    ]

    print("=== Processing with ThreadPool ===")
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit every chunk, keeping the futures in submission order
        futures = [
            executor.submit(process_data_chunk, chunk_id, processing_time)
            for chunk_id, processing_time in chunks
        ]

        # Collect the results
        results = []
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Chunk processing failed: {e}")

    total_time = time.time() - start_time
    print(f"\nAll chunks processed in {total_time:.2f}s")
    print(f"Results: {results}")
샘플 3: 스레드 안전성
import threading
from queue import Queue
def thread_safe_example():
    """Drain a shared task queue with three worker threads.

    Ten tasks are queued, three workers process them, and the collected
    results are printed. Returns ``None``.
    """
    from queue import Empty

    class TaskQueue:
        """Task queue plus a lock-protected list of results."""

        def __init__(self):
            self.queue = Queue()
            self.lock = threading.Lock()
            self.results = []

        def add_task(self, task):
            """Queue ``task`` while holding the lock."""
            with self.lock:
                self.queue.put(task)
                print(f"➕ Added task: {task}")

        def worker(self, worker_id):
            """Process tasks until the queue is empty."""
            while True:
                # Fetch directly instead of checking empty() first: another
                # worker may take the last item between check and fetch.
                try:
                    task = self.queue.get_nowait()
                except Empty:
                    break
                try:
                    print(f"Worker {worker_id} processing: {task}")
                    # Simulate the work
                    time.sleep(0.3)
                    result = f"processed_{task}"
                    with self.lock:
                        self.results.append(result)
                    print(f"Worker {worker_id} completed: {task}")
                finally:
                    # Mark the item done even if processing raised
                    self.queue.task_done()

    # Test
    task_queue = TaskQueue()

    # Add the tasks
    for i in range(10):
        task_queue.add_task(f"task_{i}")

    # Start the worker threads
    workers = []
    for i in range(3):
        worker = threading.Thread(target=task_queue.worker, args=(i,))
        workers.append(worker)
        worker.start()

    # Wait for every worker to finish
    for worker in workers:
        worker.join()

    print(f"\nFinal results: {task_queue.results}")
# Run the three examples in order, separated by a divider line
basic_threading()
print("\n" + "="*50 + "\n")
thread_pool_example()
print("\n" + "="*50 + "\n")
thread_safe_example()
2. 비동기 프로그래밍 이해
python
샘플 1: 기본 async/await
import asyncio
async def basic_async():
    """Compare awaiting coroutines one by one with running them concurrently.

    Fetches and processes two simulated data sources, first sequentially
    and then with ``asyncio.gather``, printing the elapsed time of each
    approach and the final results. Returns ``None``.
    """
    async def fetch_data(source, delay):
        """Simulate fetching from ``source`` after ``delay`` seconds."""
        print(f"Fetching data from {source}...")
        await asyncio.sleep(delay)
        return f"data_from_{source}"

    async def process_data(data, process_time):
        """Simulate processing ``data`` for ``process_time`` seconds."""
        print(f"⚙️ Processing {data}...")
        await asyncio.sleep(process_time)
        return f"processed_{data}"

    # Sequential execution: each await finishes before the next starts
    print("=== Sequential Execution ===")
    start_time = time.time()
    data1 = await fetch_data("API", 1.0)
    processed1 = await process_data(data1, 0.5)
    data2 = await fetch_data("Database", 0.8)
    processed2 = await process_data(data2, 0.3)
    seq_time = time.time() - start_time
    print(f"Sequential time: {seq_time:.2f}s")

    # Concurrent execution: both fetches, then both processing steps, overlap
    print("\n=== Parallel Execution ===")
    start_time = time.time()
    task1 = asyncio.create_task(fetch_data("API", 1.0))
    task2 = asyncio.create_task(fetch_data("Database", 0.8))
    data1, data2 = await asyncio.gather(task1, task2)
    process_task1 = asyncio.create_task(process_data(data1, 0.5))
    process_task2 = asyncio.create_task(process_data(data2, 0.3))
    processed1, processed2 = await asyncio.gather(process_task1, process_task2)
    parallel_time = time.time() - start_time
    print(f"Parallel time: {parallel_time:.2f}s")
    print(f"\nResults: {processed1}, {processed2}")
샘플 2: 비동기 웹 요청
async def async_web_requests():
    """Fetch several URLs concurrently with aiohttp and print a summary.

    Every URL yields one summary string (size, HTTP status, timeout or
    error), so a single failing request does not abort the others.
    Returns ``None``.
    """
    import aiohttp

    async def fetch_url(session, url, timeout=5):
        """Return a one-line summary for ``url``.

        ``timeout`` is the total number of seconds allowed for the request.
        """
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number
            request_timeout = aiohttp.ClientTimeout(total=timeout)
            async with session.get(url, timeout=request_timeout) as response:
                if response.status == 200:
                    content = await response.text()
                    return f"{url}: {len(content)} bytes"
                else:
                    return f"{url}: HTTP {response.status}"
        except asyncio.TimeoutError:
            return f"{url}: Timeout"
        except Exception as e:
            return f"{url}: Error {e}"

    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/status/404",
    ]

    print("=== Async Web Requests ===")
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    total_time = time.time() - start_time

    print(f"Total time: {total_time:.2f}s")
    for result in results:
        print(f" {result}")
샘플 3: 비동기 파이프라인
async def async_pipeline():
    """Run a simulated extract/transform/load/validate pipeline.

    The four stages are awaited one after another, each feeding the
    next; the total time and the final result are printed.
    Returns ``None``.
    """
    async def extract():
        """Return three raw data items after a simulated delay."""
        print("Extracting data...")
        await asyncio.sleep(1.0)
        return ["data1", "data2", "data3"]

    async def transform(data):
        """Prefix every item with ``transformed_``."""
        print(f"Transforming {data}...")
        await asyncio.sleep(0.5)
        return [f"transformed_{item}" for item in data]

    async def load(transformed_data):
        """Return a label stating how many items were loaded."""
        print(f"Loading {transformed_data}...")
        await asyncio.sleep(0.8)
        return f"loaded_{len(transformed_data)}_items"

    async def validate(result):
        """Prefix the load result with ``validated_``."""
        print(f"Validating {result}...")
        await asyncio.sleep(0.3)
        return f"validated_{result}"

    print("=== Async ETL Pipeline ===")
    start_time = time.time()

    # Run the pipeline stage by stage
    raw_data = await extract()
    transformed_data = await transform(raw_data)
    loaded_result = await load(transformed_data)
    final_result = await validate(loaded_result)

    total_time = time.time() - start_time
    print(f"Pipeline completed in {total_time:.2f}s")
    print(f"Final result: {final_result}")
# Entry point: run the three async examples one after another
async def main():
    """Await each async example in turn, printing a divider between them."""
    await basic_async()
    print("\n" + "="*50 + "\n")
    await async_web_requests()
    print("\n" + "="*50 + "\n")
    await async_pipeline()


asyncio.run(main())
3. 고급 데코레이터 사용
python
샘플 1: 파라미터를 가진 데코레이터
def retry_with_backoff(max_retries=3, initial_delay=1, backoff_factor=2):
    """Build a decorator that retries with an exponentially growing delay.

    Args:
        max_retries: Total number of attempts.
        initial_delay: Seconds to wait after the first failure.
        backoff_factor: Multiplier applied to the delay after each wait.

    Returns:
        A decorator. The decorated function returns the first successful
        result and re-raises the last exception once all attempts fail.
    """
    import functools
    import time

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        print(f"All {max_retries} retries failed")
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {delay}s before retry...")
                    time.sleep(delay)
                    delay *= backoff_factor  # exponential backoff
            # Only reached when max_retries <= 0: func is never called
            return None
        return wrapper
    return decorator
@retry_with_backoff(max_retries=4, initial_delay=1, backoff_factor=2)
def unreliable_operation():
    """Simulate an unreliable job that fails about 80% of the time.

    Raises:
        ConnectionError: When the random draw falls below 0.8.
    """
    import random
    if random.random() < 0.8:
        raise ConnectionError("Temporary failure")
    return "Operation successful"
샘플 2: 여러 데코레이터 조합
def log_execution(func):
    """Decorator that prints when ``func`` starts and how long it took.

    The wrapped function's result is returned unchanged; exceptions are
    reported with the elapsed time and re-raised.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        print(f"Starting {func.__name__}")
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            end_time = time.time()
            print(f"{func.__name__} completed in {end_time - start_time:.2f}s")
            return result
        except Exception as e:
            end_time = time.time()
            print(f"{func.__name__} failed after {end_time - start_time:.2f}s: {e}")
            raise
    return wrapper
def validate_input(*validators):
    """Build a decorator that validates arguments before the call.

    Args:
        *validators: One predicate per positional argument, matched by
            position. A falsy entry skips validation for that position.

    Returns:
        A decorator. The decorated function raises ``ValueError`` when a
        positional argument fails its predicate, or when a ``timeout``
        keyword argument is not positive.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            # Validate positional arguments
            for i, arg in enumerate(args):
                if i < len(validators) and validators[i]:
                    if not validators[i](arg):
                        raise ValueError(f"Invalid argument {i}: {arg}")
            # Validate keyword arguments (simple implementation)
            for key, value in kwargs.items():
                if key == 'timeout' and value <= 0:
                    raise ValueError("Timeout must be positive")
            return func(*args, **kwargs)
        return wrapper
    return decorator
# Decorators apply bottom-up: retry is innermost, then input validation,
# and logging wraps the whole call.
@log_execution
@validate_input(lambda x: isinstance(x, str) and x, lambda x: x > 0)
@retry_with_backoff(max_retries=3)
def api_call(endpoint, timeout=30):
    """Simulate an API call that fails about 60% of the time.

    Raises:
        ConnectionError: When the random draw falls below 0.6.
    """
    print(f"Calling {endpoint} with timeout {timeout}")
    import random
    if random.random() < 0.6:
        raise ConnectionError("API unavailable")
    return f"Response from {endpoint}"
샘플 3: 클래스 데코레이터 with 파라미터
def add_methods(*methods):
    """Build a class decorator that attaches extra methods to a class.

    Args:
        *methods: ``(method_name, function)`` pairs; each function is set
            on the class under the given name.

    Returns:
        A decorator that returns the same class after adding the methods.
    """
    def decorator(cls):
        for method_name, method_func in methods:
            setattr(cls, method_name, method_func)
        return cls
    return decorator
def create_status_method(status):
    """Return a method that reports the fixed ``status`` as a string."""
    def status_method(self):
        return f"Status: {status}"
    return status_method
@add_methods(
    ("get_running_status", create_status_method("running")),
    ("get_success_status", create_status_method("success")),
    ("get_failed_status", create_status_method("failed")),
)
class Task:
    """Named task; the decorator above adds three status-reporting methods."""

    def __init__(self, name):
        self.name = name

    def execute(self):
        """Return a string stating that this task is executing."""
        return f"Executing {self.name}"
# Run the advanced decorator examples
print("=== Retry with Backoff ===")
try:
    result = unreliable_operation()
    print(f"Result: {result}")
except Exception as e:
    print(f"Final error: {e}")

print("\n=== Combined Decorators ===")
try:
    result = api_call("https://api.example.com/data", timeout=10)
    print(f"API result: {result}")
except Exception as e:
    print(f"API call failed: {e}")

print("\n=== Class Decorator ===")
task = Task("test_task")
print(task.execute())
print(task.get_running_status())
print(task.get_success_status())
print(task.get_failed_status())
끝.
