티스토리 뷰

카테고리 없음

Python 단계별 샘플 예제

자바바라 2025. 9. 21. 07:16

## 1단계: Python 기초 완성

### 1. 변수, 데이터 타입 이해
python
 샘플 1: 기본 데이터 타입과 연산
def demonstrate_basic_types():
    """기본 데이터 타입 이해"""
     문자열
    dag_name = "sales_pipeline"
    schedule = "@daily"
    
     숫자
    retry_count = 3
    timeout_seconds = 300.5
    
     불리언
    is_active = True
    is_production = False
    
     None
    last_run_result = None
    
    print(f"DAG: {dag_name}, Schedule: {schedule}")
    print(f"Retries: {retry_count}, Timeout: {timeout_seconds}s")
    print(f"Active: {is_active}, Production: {is_production}")
    print(f"Last run: {last_run_result}")
    
     타입 확인
    print(f"Type of dag_name: {type(dag_name)}")
    print(f"Type of retry_count: {type(retry_count)}")
    print(f"Type of is_active: {type(is_active)}")

 샘플 2: 데이터 타입 변환
def type_conversion_example():
    """데이터 타입 변환"""
     문자열 → 숫자
    retry_str = "3"
    retry_int = int(retry_str)
    print(f"String '{retry_str}' to int: {retry_int}")
    
     숫자 → 문자열
    timeout = 300
    timeout_str = str(timeout)
    print(f"Int {timeout} to string: '{timeout_str}'")
    
     불리언 변환
    truthy_values = [1, "hello", [1, 2], True]
    falsy_values = [0, "", [], False, None]
    
    print("Truthy values:")
    for val in truthy_values:
        print(f"  {val} -> {bool(val)}")
    
    print("Falsy values:")
    for val in falsy_values:
        print(f"  {val} -> {bool(val)}")

 샘플 3: 복합 데이터 타입
def complex_types_example():
    """복합 데이터 타입"""
     튜플 (immutable)
    task_config = ("extract_data", "python_operator", 300)
    print(f"Task config tuple: {task_config}")
    print(f"Task name: {task_config[0]}, Type: {task_config[1]}")
    
     리스트 (mutable)
    task_list = ["extract", "transform", "load"]
    task_list.append("validate")   추가
    task_list.remove("transform")   제거
    print(f"Task list: {task_list}")
    
     세트 (중복 제거)
    unique_tasks = set(["extract", "transform", "load", "extract"])
    print(f"Unique tasks: {unique_tasks}")

 실행
demonstrate_basic_types()
print("\n" + "="*50 + "\n")
type_conversion_example()
print("\n" + "="*50 + "\n")
complex_types_example()


### 2. 리스트, 딕셔너리 활용
python
 샘플 1: 리스트 기본 연산
def list_operations():
    """리스트 기본 연산"""
    tasks = ["extract", "transform", "load"]
    
     추가
    tasks.append("validate")
    tasks.insert(1, "pre_process")
    
     제거
    removed = tasks.pop()   마지막 요소
    tasks.remove("pre_process")
    
     슬라이싱
    first_two = tasks[:2]
    last_one = tasks[-1:]
    
    print(f"All tasks: {tasks}")
    print(f"First two: {first_two}")
    print(f"Last one: {last_one}")
    print(f"Removed: {removed}")
    
     리스트 컴프리헨션
    task_lengths = [len(task) for task in tasks]
    print(f"Task name lengths: {task_lengths}")

 샘플 2: 딕셔너리 활용
def dictionary_operations():
    """딕셔너리 기본 연산"""
     DAG 설정
    dag_config = {
        "dag_id": "sales_pipeline",
        "schedule_interval": "@daily",
        "start_date": "2024-01-01",
        "retries": 3,
        "retry_delay": 300
    }
    
     값 접근
    print(f"DAG ID: {dag_config['dag_id']}")
    print(f"Schedule: {dag_config.get('schedule_interval', 'Not set')}")
    
     값 수정
    dag_config["retries"] = 5
    dag_config["timeout"] = 600   새로운 키 추가
    
     모든 항목 순회
    print("\nDAG Configuration:")
    for key, value in dag_config.items():
        print(f"  {key}: {value}")
    
     딕셔너리 컴프리헨션
    config_lengths = {k: len(str(v)) for k, v in dag_config.items()}
    print(f"\nConfig value lengths: {config_lengths}")

 샘플 3: 중첩 데이터 구조
def nested_data_structures():
    """중첩 데이터 구조"""
     파이프라인 정의
    pipeline = {
        "name": "data_processing",
        "tasks": [
            {
                "task_id": "extract_data",
                "operator": "PythonOperator",
                "params": {"source": "database"}
            },
            {
                "task_id": "transform_data",
                "operator": "PythonOperator", 
                "params": {"method": "normalize"},
                "dependencies": ["extract_data"]
            }
        ],
        "schedule": {
            "interval": "@daily",
            "start_time": "02:00"
        }
    }
    
     중첩 데이터 접근
    print(f"Pipeline: {pipeline['name']}")
    print(f"First task: {pipeline['tasks'][0]['task_id']}")
    print(f"Schedule: {pipeline['schedule']['interval']} at {pipeline['schedule']['start_time']}")
    
     tasks 순회
    print("\nTasks:")
    for task in pipeline["tasks"]:
        deps = task.get("dependencies", [])
        print(f"  {task['task_id']} -> depends on: {deps}")

 실행
list_operations()
print("\n" + "="*50 + "\n")
dictionary_operations() 
print("\n" + "="*50 + "\n")
nested_data_structures()


### 3. 조건문, 반복문 작성
python
 샘플 1: 기본 조건문
def conditional_statements():
    """기본 조건문"""
    task_status = "success"
    retry_count = 2
    max_retries = 3
    
     if-elif-else
    if task_status == "success":
        print("Task completed successfully")
    elif task_status == "failed" and retry_count < max_retries:
        print("Task failed, retrying...")
    elif task_status == "failed":
        print("Task failed, no more retries")
    else:
        print("⏳ Task is running...")
    
     3항 연산자
    message = "Proceed" if task_status == "success" else "Wait"
    print(f"Decision: {message}")

 샘플 2: 다양한 반복문
def loop_examples():
    """다양한 반복문"""
    tasks = ["extract", "transform", "load", "validate"]
    
     for loop
    print("Task list:")
    for task in tasks:
        print(f"  - {task}")
    
     enumerate로 인덱스 함께
    print("\nTask with index:")
    for i, task in enumerate(tasks, 1):
        print(f"  {i}. {task}")
    
     while loop
    print("\nCountdown:")
    count = 3
    while count > 0:
        print(f"  Starting in {count}...")
        count -= 1
    print("  Go!")

 샘플 3: 반복문과 조건문 결합
def conditional_loops():
    """반복문과 조건문 결합"""
    task_results = [
        {"task": "extract", "status": "success", "duration": 120},
        {"task": "transform", "status": "failed", "duration": 180},
        {"task": "load", "status": "success", "duration": 90},
        {"task": "validate", "status": "running", "duration": 60}
    ]
    
     성공한 태스크만 필터링
    successful_tasks = []
    for result in task_results:
        if result["status"] == "success":
            successful_tasks.append(result["task"])
    
    print(f"Successful tasks: {successful_tasks}")
    
     실패한 태스크 상세 정보
    print("\nFailed tasks:")
    for result in task_results:
        if result["status"] == "failed":
            print(f"  {result['task']} failed after {result['duration']}s")
    
     break와 continue
    print("\nProcessing until first failure:")
    for result in task_results:
        if result["status"] == "failed":
            print(f"  Stopped at failed task: {result['task']}")
            break
        print(f"  Processed: {result['task']}")

 실행
conditional_statements()
print("\n" + "="*50 + "\n")
loop_examples()
print("\n" + "="*50 + "\n")
conditional_loops()


## 2단계: 중급 Python

### 1. 클래스 정의 및 객체 생성
python
 샘플 1: 기본 클래스
class DAGConfig:
    """DAG 설정을 관리하는 클래스"""
    
    def __init__(self, dag_id, schedule_interval="@daily", start_date=None):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval
        self.start_date = start_date or "2024-01-01"
        self.tasks = []
        self.default_args = {}
    
    def add_task(self, task_name, operator_type, kwargs):
        """태스크 추가"""
        task_config = {
            "task_name": task_name,
            "operator_type": operator_type,
            kwargs
        }
        self.tasks.append(task_config)
        print(f"Added task: {task_name}")
    
    def set_default_args(self, kwargs):
        """기본 인자 설정"""
        self.default_args.update(kwargs)
    
    def display_config(self):
        """설정 정보 출력"""
        print(f"\nDAG: {self.dag_id}")
        print(f"Schedule: {self.schedule_interval}")
        print(f"Start: {self.start_date}")
        print(f"Default args: {self.default_args}")
        print("Tasks:")
        for task in self.tasks:
            print(f"  - {task['task_name']} ({task['operator_type']})")

 사용 예제
config = DAGConfig("sales_pipeline", "@hourly", "2024-01-01")
config.set_default_args(retries=3, owner="data_team")
config.add_task("extract", "PythonOperator", python_callable="extract_data")
config.add_task("transform", "PythonOperator", python_callable="transform_data")
config.display_config()

 샘플 2: 상속과 다형성
class BaseTask:
    """기본 태스크 클래스"""
    
    def __init__(self, task_id):
        self.task_id = task_id
        self.dependencies = []
    
    def add_dependency(self, task):
        """의존성 추가"""
        self.dependencies.append(task)
        return self
    
    def execute(self):
        """태스크 실행 (추상 메서드)"""
        raise NotImplementedError("Subclasses must implement execute()")

class PythonTask(BaseTask):
    """Task that calls a Python callable (modelled on Airflow's PythonOperator)."""

    def __init__(self, task_id, python_callable, **kwargs):
        super().__init__(task_id)
        self.python_callable = python_callable
        # Keyword arguments forwarded to python_callable by execute().
        self.kwargs = kwargs

    def execute(self):
        """Call ``python_callable(**kwargs)``, print the outcome and return its result.

        Any exception from the callable is printed and re-raised.
        """
        print(f"Executing Python task: {self.task_id}")
        try:
            result = self.python_callable(**self.kwargs)
        except Exception as e:
            print(f"Task {self.task_id} failed: {e}")
            raise
        print(f"Task {self.task_id} completed: {result}")
        return result

class BashTask(BaseTask):
    """Task modelled on Airflow's BashOperator; the command is only printed, not run."""

    def __init__(self, task_id, bash_command):
        super().__init__(task_id)
        self.bash_command = bash_command

    def execute(self):
        """Pretend to run the Bash command and return a summary string."""
        print(f"Executing Bash task: {self.task_id}")
        print(f"Command: {self.bash_command}")
        # A real implementation would run the command, e.g. with subprocess.run().
        return f"Bash command executed: {self.bash_command}"

 사용 예제
def sample_function():
    return "Hello from Python"

python_task = PythonTask("python_task", sample_function)
bash_task = BashTask("bash_task", "echo 'Hello from Bash'")

python_task.execute()
bash_task.execute()

 샘플 3: 클래스 메서드와 정적 메서드
class TaskFactory:
    """태스크 생성 팩토리 클래스"""
    
    task_registry = {}   클래스 변수
    
    @classmethod
    def register_task_type(cls, task_type, task_class):
        """태스크 타입 등록"""
        cls.task_registry[task_type] = task_class
        print(f"Registered task type: {task_type}")
    
    @classmethod
    def create_task(cls, task_type, task_id, kwargs):
        """태스크 생성"""
        if task_type not in cls.task_registry:
            raise ValueError(f"Unknown task type: {task_type}")
        
        task_class = cls.task_registry[task_type]
        return task_class(task_id, kwargs)
    
    @staticmethod
    def validate_task_id(task_id):
        """태스크 ID 유효성 검사"""
        if not task_id or not isinstance(task_id, str):
            return False
        return len(task_id) <= 50   Airflow 제한

 팩토리 사용
TaskFactory.register_task_type("python", PythonTask)
TaskFactory.register_task_type("bash", BashTask)

task1 = TaskFactory.create_task("python", "extract_data", python_callable=sample_function)
task2 = TaskFactory.create_task("bash", "cleanup", bash_command="rm -rf /tmp/*")

print(f"Task1 ID valid: {TaskFactory.validate_task_id('extract_data')}")
print(f"Task2 ID valid: {TaskFactory.validate_task_id('')}")

task1.execute()
task2.execute()


### 2. 데코레이터 이해 및 사용
python
 샘플 1: 기본 데코레이터
def retry_on_failure(max_retries=3):
    """실패시 재시도 데코레이터"""
    def decorator(func):
        def wrapper(*args, kwargs):
            for attempt in range(max_retries):
                try:
                    print(f"Attempt {attempt + 1}/{max_retries}")
                    result = func(*args, kwargs)
                    print("Success!")
                    return result
                except Exception as e:
                    print(f"Failed: {e}")
                    if attempt == max_retries - 1:
                        print("All retries exhausted")
                        raise
                    print("Retrying...")
        return wrapper
    return decorator

@retry_on_failure(max_retries=2)
def unstable_api_call():
    """Simulate a flaky API call: raises ConnectionError about 70% of the time."""
    import random

    failed = random.random() < 0.7
    if not failed:
        return "API response data"
    raise ConnectionError("API connection failed")

 샘플 2: 파라미터를 가진 데코레이터  
def timeout(seconds):
    """타임아웃 데코레이터"""
    def decorator(func):
        def wrapper(*args, kwargs):
            import signal
            from functools import wraps
            
            def timeout_handler(signum, frame):
                raise TimeoutError(f"Function timed out after {seconds} seconds")
            
            @wraps(func)
            def wrapped(*args, kwargs):
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(seconds)
                try:
                    result = func(*args, kwargs)
                    signal.alarm(0)   알람 취소
                    return result
                except TimeoutError:
                    raise
                finally:
                    signal.alarm(0)
            
            return wrapped(*args, kwargs)
        return wrapper
    return decorator

@timeout(3)  # 3-second timeout
def long_running_task():
    """Sleep for 5 seconds, longer than the 3-second timeout, so TimeoutError is raised."""
    import time
    time.sleep(5)  # 5s sleep -> exceeds the timeout
    return "Task completed"

 샘플 3: 클래스 데코레이터
class LogExecution:
    """실행 로깅 데코레이터 클래스"""
    
    def __init__(self, func):
        self.func = func
        self.execution_count = 0
    
    def __call__(self, *args, kwargs):
        self.execution_count += 1
        print(f"Executing {self.func.__name__} (count: {self.execution_count})")
        
        start_time = time.time()
        try:
            result = self.func(*args, kwargs)
            end_time = time.time()
            
            print(f"{self.func.__name__} completed in {end_time - start_time:.2f}s")
            return result
        except Exception as e:
            end_time = time.time()
            print(f"{self.func.__name__} failed after {end_time - start_time:.2f}s: {e}")
            raise

@LogExecution
def important_task():
    """Simulate one second of important work, then return its result."""
    from time import sleep

    sleep(1)
    return "Important result"

 실행
print("=== Retry Decorator ===")
try:
    result = unstable_api_call()
    print(f"Final result: {result}")
except Exception as e:
    print(f"Final error: {e}")

print("\n=== Timeout Decorator ===")
try:
    result = long_running_task()
    print(f"Result: {result}")
except TimeoutError as e:
    print(f"Timeout: {e}")

print("\n=== Class Decorator ===")
important_task()
important_task()   두 번 실행해서 count 확인


### 3. 컨텍스트 매니저 구현
python
 샘플 1: 기본 컨텍스트 매니저
class DatabaseConnection:
    """데이터베이스 연결 컨텍스트 매니저"""
    
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None
    
    def __enter__(self):
        """연결 시작"""
        print(f"🔗 Connecting to {self.connection_string}")
         실제로는 데이터베이스 연결 코드
        self.connection = {"connected": True, "string": self.connection_string}
        return self.connection
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """연결 종료"""
        if self.connection:
            print("Closing database connection")
            self.connection["connected"] = False
        
        if exc_type:
            print(f"Error occurred: {exc_val}")
             False를 반환하면 예외가 전파됨, True면 억제됨
            return False

 샘플 2: contextlib 사용
from contextlib import contextmanager
import time

@contextmanager
def timer_context(name):
    """실행 시간 측정 컨텍스트 매니저"""
    start_time = time.time()
    print(f"Starting {name}...")
    
    try:
        yield   여기서 실제 코드 실행
    except Exception as e:
        end_time = time.time()
        print(f"{name} failed after {end_time - start_time:.2f}s: {e}")
        raise
    else:
        end_time = time.time()
        print(f"{name} completed in {end_time - start_time:.2f}s")

 샘플 3: 중첩 컨텍스트 매니저
class TaskExecutionContext:
    """태스크 실행 컨텍스트 매니저"""
    
    def __init__(self, task_name, max_retries=3):
        self.task_name = task_name
        self.max_retries = max_retries
        self.retry_count = 0
    
    def __enter__(self):
        """태스크 시작"""
        print(f"Starting task: {self.task_name}")
        self.start_time = time.time()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """태스크 종료"""
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        
        if exc_type:
            self.retry_count += 1
            if self.retry_count < self.max_retries:
                print(f"{self.task_name} failed, retrying... (attempt {self.retry_count + 1})")
                return True   예외 억제하고 재시도
            else:
                print(f"{self.task_name} failed after {self.retry_count} retries: {exc_val}")
                return False
        else:
            print(f"{self.task_name} completed in {duration:.2f}s")
            return True

 실행
print("=== Basic Context Manager ===")
with DatabaseConnection("postgresql://localhost:5432/mydb") as conn:
    print(f"Connection status: {conn['connected']}")
     여기서 실제 데이터베이스 작업 수행
    print("Performing database operations...")

print("\n=== Contextlib Timer ===")
with timer_context("Data Processing"):
    time.sleep(0.5)   실제 작업 시뮬레이션
    print("Processing data...")

print("\n=== Retry Context Manager ===")
def unreliable_operation():
    """불안정한 작업"""
    import random
    if random.random() < 0.6:
        raise ValueError("Random failure")
    return "Success"

with TaskExecutionContext("Unreliable Operation", max_retries=2) as context:
    result = unreliable_operation()
    print(f"Operation result: {result}")


## 3단계: 고급 Python

### 1. 멀티스레딩 이해
python
 샘플 1: 기본 스레딩
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def basic_threading():
    """기본 스레딩 예제"""
    
    def task_worker(task_name, duration):
        """작업 수행 함수"""
        print(f"{task_name} started on thread {threading.current_thread().name}")
        time.sleep(duration)
        print(f"{task_name} completed after {duration}s")
        return f"{task_name}_result"
    
     단일 스레드
    print("=== Single Thread ===")
    start_time = time.time()
    result1 = task_worker("Task1", 1)
    result2 = task_worker("Task2", 2)
    single_thread_time = time.time() - start_time
    
     멀티 스레드
    print("\n=== Multi Thread ===")
    start_time = time.time()
    
    thread1 = threading.Thread(target=task_worker, args=("Task1", 1))
    thread2 = threading.Thread(target=task_worker, args=("Task2", 2))
    
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    multi_thread_time = time.time() - start_time
    
    print(f"\nSingle thread time: {single_thread_time:.2f}s")
    print(f"Multi thread time: {multi_thread_time:.2f}s")

 샘플 2: ThreadPoolExecutor
def thread_pool_example():
    """스레드 풀 예제"""
    
    def process_data_chunk(chunk_id, processing_time):
        """데이터 청크 처리"""
        print(f"Processing chunk {chunk_id}...")
        time.sleep(processing_time)
        result = f"processed_chunk_{chunk_id}"
        print(f"Chunk {chunk_id} processed")
        return result
    
     처리할 데이터 청크들
    chunks = [
        (1, 0.5), (2, 1.0), (3, 0.3), (4, 0.8), (5, 0.2)
    ]
    
    print("=== Processing with ThreadPool ===")
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=3) as executor:
         futures 생성
        futures = []
        for chunk_id, processing_time in chunks:
            future = executor.submit(process_data_chunk, chunk_id, processing_time)
            futures.append(future)
        
         결과 수집
        results = []
        for future in futures:
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Chunk processing failed: {e}")
    
    total_time = time.time() - start_time
    print(f"\nAll chunks processed in {total_time:.2f}s")
    print(f"Results: {results}")

 샘플 3: 스레드 안전성
import threading
from queue import Queue

def thread_safe_example():
    """스레드 안전한 작업 큐"""
    
    class TaskQueue:
        def __init__(self):
            self.queue = Queue()
            self.lock = threading.Lock()
            self.results = []
        
        def add_task(self, task):
            """태스크 추가 (스레드 안전)"""
            with self.lock:
                self.queue.put(task)
                print(f"➕ Added task: {task}")
        
        def worker(self, worker_id):
            """작업자 스레드"""
            while not self.queue.empty():
                try:
                    task = self.queue.get_nowait()
                    print(f"Worker {worker_id} processing: {task}")
                    
                     작업 시뮬레이션
                    time.sleep(0.3)
                    result = f"processed_{task}"
                    
                    with self.lock:
                        self.results.append(result)
                        print(f"Worker {worker_id} completed: {task}")
                    
                    self.queue.task_done()
                    
                except Exception as e:
                    break
    
     테스트
    task_queue = TaskQueue()
    
     태스크 추가
    for i in range(10):
        task_queue.add_task(f"task_{i}")
    
     작업자 스레드 시작
    workers = []
    for i in range(3):
        worker = threading.Thread(target=task_queue.worker, args=(i,))
        workers.append(worker)
        worker.start()
    
     모든 작업자 대기
    for worker in workers:
        worker.join()
    
    print(f"\nFinal results: {task_queue.results}")

 실행
basic_threading()
print("\n" + "="*50 + "\n")
thread_pool_example()
print("\n" + "="*50 + "\n")
thread_safe_example()


### 2. 비동기 프로그래밍 이해
python
 샘플 1: 기본 async/await
import asyncio

async def basic_async():
    """기본 비동기 함수"""
    
    async def fetch_data(source, delay):
        """데이터 비동기 조회"""
        print(f"Fetching data from {source}...")
        await asyncio.sleep(delay)
        return f"data_from_{source}"
    
    async def process_data(data, process_time):
        """데이터 비동기 처리"""
        print(f"⚙️ Processing {data}...")
        await asyncio.sleep(process_time)
        return f"processed_{data}"
    
     순차적 실행
    print("=== Sequential Execution ===")
    start_time = time.time()
    
    data1 = await fetch_data("API", 1.0)
    processed1 = await process_data(data1, 0.5)
    
    data2 = await fetch_data("Database", 0.8)
    processed2 = await process_data(data2, 0.3)
    
    seq_time = time.time() - start_time
    print(f"Sequential time: {seq_time:.2f}s")
    
     병렬 실행
    print("\n=== Parallel Execution ===")
    start_time = time.time()
    
    task1 = asyncio.create_task(fetch_data("API", 1.0))
    task2 = asyncio.create_task(fetch_data("Database", 0.8))
    
    data1, data2 = await asyncio.gather(task1, task2)
    
    process_task1 = asyncio.create_task(process_data(data1, 0.5))
    process_task2 = asyncio.create_task(process_data(data2, 0.3))
    
    processed1, processed2 = await asyncio.gather(process_task1, process_task2)
    
    parallel_time = time.time() - start_time
    print(f"Parallel time: {parallel_time:.2f}s")
    
    print(f"\nResults: {processed1}, {processed2}")

 샘플 2: 비동기 웹 요청
async def async_web_requests():
    """비동기 웹 요청"""
    import aiohttp
    
    async def fetch_url(session, url, timeout=5):
        """URL 비동기 조회"""
        try:
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    content = await response.text()
                    return f"{url}: {len(content)} bytes"
                else:
                    return f"{url}: HTTP {response.status}"
        except asyncio.TimeoutError:
            return f"{url}: Timeout"
        except Exception as e:
            return f"{url}: Error {e}"
    
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2", 
        "https://httpbin.org/delay/3",
        "https://httpbin.org/status/404"
    ]
    
    print("=== Async Web Requests ===")
    start_time = time.time()
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    total_time = time.time() - start_time
    print(f"Total time: {total_time:.2f}s")
    
    for result in results:
        print(f"  {result}")

 샘플 3: 비동기 파이프라인
async def async_pipeline():
    """비동기 데이터 파이프라인"""
    
    async def extract():
        print("Extracting data...")
        await asyncio.sleep(1.0)
        return ["data1", "data2", "data3"]
    
    async def transform(data):
        print(f"Transforming {data}...")
        await asyncio.sleep(0.5)
        return [f"transformed_{item}" for item in data]
    
    async def load(transformed_data):
        print(f"Loading {transformed_data}...")
        await asyncio.sleep(0.8)
        return f"loaded_{len(transformed_data)}_items"
    
    async def validate(result):
        print(f"Validating {result}...")
        await asyncio.sleep(0.3)
        return f"validated_{result}"
    
    print("=== Async ETL Pipeline ===")
    start_time = time.time()
    
     파이프라인 실행
    raw_data = await extract()
    transformed_data = await transform(raw_data)
    loaded_result = await load(transformed_data)
    final_result = await validate(loaded_result)
    
    total_time = time.time() - start_time
    print(f"Pipeline completed in {total_time:.2f}s")
    print(f"Final result: {final_result}")

 실행
async def main():
    await basic_async()
    print("\n" + "="*50 + "\n")
    await async_web_requests()
    print("\n" + "="*50 + "\n")
    await async_pipeline()

 asyncio.run(main())


### 3. 고급 데코레이터 사용
python
 샘플 1: 파라미터를 가진 데코레이터
def retry_with_backoff(max_retries=3, initial_delay=1, backoff_factor=2):
    """백오프를 사용한 재시도 데코레이터"""
    def decorator(func):
        def wrapper(*args, kwargs):
            import time
            delay = initial_delay
            
            for attempt in range(max_retries):
                try:
                    return func(*args, kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        print(f"All {max_retries} retries failed")
                        raise
                    
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {delay}s before retry...")
                    time.sleep(delay)
                    delay *= backoff_factor   지수 백오프
            
            return None
        return wrapper
    return decorator

@retry_with_backoff(max_retries=4, initial_delay=1, backoff_factor=2)
def unreliable_operation():
    """Simulate a flaky operation that raises ConnectionError about 80% of the time."""
    from random import random

    if random() >= 0.8:
        return "Operation successful"
    raise ConnectionError("Temporary failure")

 샘플 2: 여러 데코레이터 조합
def log_execution(func):
    """실행 로깅 데코레이터"""
    def wrapper(*args, kwargs):
        print(f"Starting {func.__name__}")
        start_time = time.time()
        
        try:
            result = func(*args, kwargs)
            end_time = time.time()
            print(f"{func.__name__} completed in {end_time - start_time:.2f}s")
            return result
        except Exception as e:
            end_time = time.time()
            print(f"{func.__name__} failed after {end_time - start_time:.2f}s: {e}")
            raise
    return wrapper

def validate_input(*validators):
    """Decorator factory that validates arguments before calling the function.

    ``validators[i]`` (when truthy) is called with positional argument ``i`` and
    must return a truthy value, otherwise ``ValueError`` is raised. Positional
    arguments without a matching validator are not checked, and values passed
    by keyword are not seen by the positional validators. A keyword argument
    named ``timeout`` must be > 0; other keyword arguments are not checked.
    """
    from functools import wraps

    def decorator(func):
        @wraps(func)  # keep func's __name__/__doc__ on the wrapper
        def wrapper(*args, **kwargs):
            # Positional arguments: validators are matched by index.
            for i, (validator, arg) in enumerate(zip(validators, args)):
                if validator and not validator(arg):
                    raise ValueError(f"Invalid argument {i}: {arg}")

            # Keyword arguments (minimal sample: only 'timeout' is checked).
            if "timeout" in kwargs and kwargs["timeout"] <= 0:
                raise ValueError("Timeout must be positive")

            return func(*args, **kwargs)
        return wrapper
    return decorator

@log_execution
@validate_input(lambda endpoint: isinstance(endpoint, str) and endpoint, lambda value: value > 0)
@retry_with_backoff(max_retries=3)
def api_call(endpoint, timeout=30):
    """Simulate an API call that raises ConnectionError about 60% of the time."""
    print(f"Calling {endpoint} with timeout {timeout}")
    import random

    if random.random() >= 0.6:
        return f"Response from {endpoint}"
    raise ConnectionError("API unavailable")

 샘플 3: 클래스 데코레이터 with 파라미터
def add_methods(*methods):
    """클래스에 메서드 추가 데코레이터"""
    def decorator(cls):
        for method_name, method_func in methods:
            setattr(cls, method_name, method_func)
        return cls
    return decorator

def create_status_method(status):
    """Build a method that reports the ``status`` value captured at creation time."""
    def status_method(self):
        # ``self`` is unused: the reported status comes from the closure.
        return "Status: {}".format(status)

    return status_method

@add_methods(
    ("get_running_status", create_status_method("running")),
    ("get_success_status", create_status_method("success")),
    ("get_failed_status", create_status_method("failed")),
)
class Task:
    """A named task whose status accessors are attached by ``add_methods``."""

    def __init__(self, name):
        self.name = name

    def execute(self):
        """Return a message describing the execution of this task."""
        return "Executing {}".format(self.name)

 실행
print("=== Retry with Backoff ===")
try:
    result = unreliable_operation()
    print(f"Result: {result}")
except Exception as e:
    print(f"Final error: {e}")

print("\n=== Combined Decorators ===")
try:
    result = api_call("https://api.example.com/data", timeout=10)
    print(f"API result: {result}")
except Exception as e:
    print(f"API call failed: {e}")

print("\n=== Class Decorator ===")
task = Task("test_task")
print(task.execute())
print(task.get_running_status())
print(task.get_success_status())
print(task.get_failed_status())

끝.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG more
«   2026/02   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
글 보관함