Python Deep Dive

Python Deep Dive: Advanced Programming Mastery

Table of Contents

  1. Advanced Python Features
  2. Object-Oriented Programming
  3. Metaclasses and Descriptors
  4. Decorators and Context Managers
  5. Concurrency and Parallelism
  6. Memory Management
  7. Performance Optimization
  8. Design Patterns in Python
  9. Testing and Debugging
  10. Python Internals

Advanced Python Features {#advanced-features}

Generators and Iterators

# Custom Iterator
class NumberSquares:
    def __init__(self, max_num):
        self.max_num = max_num
        self.current = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.current >= self.max_num:
            raise StopIteration
        else:
            result = self.current ** 2
            self.current += 1
            return result

# Generator Function
def fibonacci_generator(n):
    """Generate Fibonacci sequence up to n terms."""
    a, b = 0, 1
    count = 0
    while count < n:
        yield a
        a, b = b, a + b
        count += 1

# Generator Expression
squares = (x**2 for x in range(10))

# Advanced Generator with send()
def accumulator():
    """Generator that accumulates sent values."""
    total = 0
    while True:
        value = yield total
        if value is not None:
            total += value

# Usage
acc = accumulator()
next(acc)  # Initialize
print(acc.send(10))  # 10
print(acc.send(5))   # 15
print(acc.send(3))   # 18

# Generator for large datasets
def read_large_file(file_path):
    """Memory-efficient file reader."""
    with open(file_path, 'r') as file:
        for line in file:
            yield line.strip()

# Coroutine example
def grep_coroutine(pattern):
    """Coroutine that searches for pattern in sent lines."""
    print(f"Searching for '{pattern}'")
    try:
        while True:
            line = yield
            if pattern in line:
                print(f"Found: {line}")
    except GeneratorExit:
        print("Coroutine closing")

# Usage
searcher = grep_coroutine("error")
next(searcher)  # Prime the coroutine
searcher.send("This is an error message")
searcher.send("Normal log entry")
searcher.close()

Advanced Function Features

from functools import wraps, lru_cache, singledispatch, partial
from typing import Callable, Any, TypeVar, Generic

# Function annotations and type hints
def process_data(data: list[dict], 
                transform: Callable[[dict], dict],
                filter_func: Callable[[dict], bool] = None) -> list[dict]:
    """Process data with transformation and optional filtering."""
    result = [transform(item) for item in data]
    if filter_func:
        result = [item for item in result if filter_func(item)]
    return result

# Closures and nonlocal
def create_counter(start: int = 0):
    """Create a counter function with closure."""
    count = start

    def counter(increment: int = 1):
        nonlocal count
        count += increment
        return count

    def reset():
        nonlocal count
        count = start

    counter.reset = reset
    return counter

# Function factory
def create_validator(min_val: float = None, max_val: float = None):
    """Create a validation function."""
    def validator(value: float) -> bool:
        if min_val is not None and value < min_val:
            return False
        if max_val is not None and value > max_val:
            return False
        return True

    return validator

# Advanced decorator with parameters
def retry(max_attempts: int = 3, delay: float = 1.0, 
          exceptions: tuple = (Exception,)):
    """Retry decorator with configurable parameters."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(delay)
                    continue
            raise last_exception
        return wrapper
    return decorator

# Single dispatch (method overloading)
@singledispatch
def process_item(item):
    """Generic item processor."""
    raise NotImplementedError(f"Cannot process {type(item)}")

@process_item.register
def _(item: str):
    return item.upper()

@process_item.register
def _(item: int):
    return item * 2

@process_item.register
def _(item: list):
    return [process_item(x) for x in item]

# Partial application
def power(base: float, exponent: float) -> float:
    return base ** exponent

square = partial(power, exponent=2)
cube = partial(power, exponent=3)

print(square(5))  # 25
print(cube(3))    # 27

Object-Oriented Programming {#oop}

Advanced Class Features

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import ClassVar, Protocol

# Abstract base classes
class Shape(ABC):
    """Abstract shape class."""

    @abstractmethod
    def area(self) -> float:
        pass

    @abstractmethod
    def perimeter(self) -> float:
        pass

    def describe(self) -> str:
        return f"{self.__class__.__name__}: Area={self.area():.2f}, Perimeter={self.perimeter():.2f}"

class Rectangle(Shape):
    def __init__(self, width: float, height: float):
        self.width = width
        self.height = height

    def area(self) -> float:
        return self.width * self.height

    def perimeter(self) -> float:
        return 2 * (self.width + self.height)

# Dataclasses
@dataclass
class Point:
    x: float
    y: float
    z: float = 0.0

    def distance_from_origin(self) -> float:
        return (self.x**2 + self.y**2 + self.z**2)**0.5

@dataclass
class Person:
    name: str
    age: int
    email: str = ""
    skills: list[str] = field(default_factory=list)
    _id: int = field(default_factory=lambda: id(object()))

    def __post_init__(self):
        if not self.email:
            self.email = f"{self.name.lower().replace(' ', '.')}@example.com"

# Property decorators and descriptors
class Temperature:
    def __init__(self, celsius: float = 0):
        self._celsius = celsius

    @property
    def celsius(self) -> float:
        return self._celsius

    @celsius.setter
    def celsius(self, value: float):
        if value < -273.15:
            raise ValueError("Temperature cannot be below absolute zero")
        self._celsius = value

    @property
    def fahrenheit(self) -> float:
        return (self._celsius * 9/5) + 32

    @fahrenheit.setter
    def fahrenheit(self, value: float):
        self.celsius = (value - 32) * 5/9

    @property
    def kelvin(self) -> float:
        return self._celsius + 273.15

# Class and static methods
class MathUtils:
    PI: ClassVar[float] = 3.14159

    def __init__(self, precision: int = 2):
        self.precision = precision

    @classmethod
    def create_high_precision(cls):
        """Factory method for high precision instance."""
        return cls(precision=10)

    @staticmethod
    def is_prime(n: int) -> bool:
        """Check if number is prime."""
        if n < 2:
            return False
        for i in range(2, int(n**0.5) + 1):
            if n % i == 0:
                return False
        return True

    def circle_area(self, radius: float) -> float:
        area = self.PI * radius**2
        return round(area, self.precision)

# Multiple inheritance and MRO
class Flyable:
    def fly(self):
        return "Flying"

class Swimmable:
    def swim(self):
        return "Swimming"

class Duck(Flyable, Swimmable):
    def __init__(self, name: str):
        self.name = name

    def speak(self):
        return "Quack"

# Method Resolution Order
print(Duck.__mro__)
duck = Duck("Donald")
print(duck.fly(), duck.swim(), duck.speak())

Metaclasses and Advanced Features

# Basic metaclass
class SingletonMeta(type):
    """Metaclass that creates singleton instances."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Database(metaclass=SingletonMeta):
    def __init__(self, connection_string: str = "default"):
        self.connection_string = connection_string
        self.connected = False

    def connect(self):
        self.connected = True
        return f"Connected to {self.connection_string}"

# Attribute validation metaclass
class ValidatedMeta(type):
    def __new__(cls, name, bases, attrs):
        # Add validation to all methods
        for key, value in attrs.items():
            if callable(value) and not key.startswith('_'):
                attrs[key] = cls._add_validation(value)
        return super().__new__(cls, name, bases, attrs)

    @staticmethod
    def _add_validation(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Pre-execution validation
            if hasattr(self, '_validate'):
                self._validate()
            return func(self, *args, **kwargs)
        return wrapper

# Descriptor for attribute validation
class ValidatedAttribute:
    def __init__(self, validator_func, error_message="Invalid value"):
        self.validator = validator_func
        self.error_message = error_message
        self.private_name = None

    def __set_name__(self, owner, name):
        self.private_name = f'_{name}'

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.private_name, None)

    def __set__(self, obj, value):
        if not self.validator(value):
            raise ValueError(self.error_message)
        setattr(obj, self.private_name, value)

class Person:
    name = ValidatedAttribute(
        lambda x: isinstance(x, str) and len(x) > 0,
        "Name must be non-empty string"
    )
    age = ValidatedAttribute(
        lambda x: isinstance(x, int) and 0 <= x <= 150,
        "Age must be integer between 0 and 150"
    )

    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

# Protocol (structural typing)
class Drawable(Protocol):
    def draw(self) -> str: ...

class Circle:
    def __init__(self, radius: float):
        self.radius = radius

    def draw(self) -> str:
        return f"Drawing circle with radius {self.radius}"

def render_shape(shape: Drawable) -> str:
    return shape.draw()

circle = Circle(5)
print(render_shape(circle))  # Works due to structural typing

Decorators and Context Managers {#decorators}

Advanced Decorators

import functools
import time
from typing import Callable, Any

# Class-based decorator
class RateLimiter:
    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Remove old calls outside time window
            self.calls = [call_time for call_time in self.calls 
                         if now - call_time < self.time_window]

            if len(self.calls) >= self.max_calls:
                raise Exception(f"Rate limit exceeded: {self.max_calls} calls per {self.time_window}s")

            self.calls.append(now)
            return func(*args, **kwargs)

        return wrapper

# Decorator with state
class CallCounter:
    def __init__(self, func: Callable):
        self.func = func
        self.count = 0
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"Call #{self.count} to {self.func.__name__}")
        return self.func(*args, **kwargs)

    def reset(self):
        self.count = 0

# Decorator factory with complex logic
def memoize_with_ttl(ttl_seconds: int = 300):
    """Memoization with time-to-live."""
    def decorator(func: Callable) -> Callable:
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key
            key = str(args) + str(sorted(kwargs.items()))
            now = time.time()

            # Check if cached result exists and is still valid
            if key in cache:
                result, timestamp = cache[key]
                if now - timestamp < ttl_seconds:
                    return result
                else:
                    del cache[key]

            # Calculate and cache result
            result = func(*args, **kwargs)
            cache[key] = (result, now)
            return result

        wrapper.cache_clear = cache.clear
        wrapper.cache_info = lambda: f"Cache size: {len(cache)}"
        return wrapper

    return decorator

# Usage examples
@RateLimiter(max_calls=5, time_window=60)
def api_call(data):
    return f"Processing {data}"

@CallCounter
def greet(name):
    return f"Hello, {name}!"

@memoize_with_ttl(ttl_seconds=10)
def expensive_calculation(n):
    time.sleep(1)  # Simulate expensive operation
    return n ** 2

Context Managers

import contextlib
import tempfile
import os
from typing import Generator

# Class-based context manager
class DatabaseTransaction:
    def __init__(self, connection):
        self.connection = connection
        self.transaction = None

    def __enter__(self):
        self.transaction = self.connection.begin()
        return self.transaction

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.transaction.commit()
        else:
            self.transaction.rollback()
        return False  # Don't suppress exceptions

# Function-based context manager
@contextlib.contextmanager
def temporary_directory() -> Generator[str, None, None]:
    """Create and cleanup temporary directory."""
    temp_dir = tempfile.mkdtemp()
    try:
        yield temp_dir
    finally:
        import shutil
        shutil.rmtree(temp_dir)

@contextlib.contextmanager
def timer(description: str = "Operation") -> Generator[None, None, None]:
    """Time the execution of a code block."""
    start_time = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - start_time
        print(f"{description} took {elapsed:.4f} seconds")

@contextlib.contextmanager
def suppress_stdout():
    """Suppress stdout temporarily."""
    import sys
    original_stdout = sys.stdout
    try:
        sys.stdout = open(os.devnull, 'w')
        yield
    finally:
        sys.stdout.close()
        sys.stdout = original_stdout

# Multiple context managers
@contextlib.contextmanager
def file_manager(filename: str, backup: bool = True):
    """Manage file with optional backup."""
    backup_name = f"{filename}.backup" if backup else None

    # Create backup if requested
    if backup and os.path.exists(filename):
        import shutil
        shutil.copy2(filename, backup_name)

    try:
        with open(filename, 'w') as f:
            yield f
    except Exception:
        # Restore backup on error
        if backup_name and os.path.exists(backup_name):
            import shutil
            shutil.move(backup_name, filename)
        raise
    else:
        # Remove backup on success
        if backup_name and os.path.exists(backup_name):
            os.remove(backup_name)

# Usage examples
with temporary_directory() as temp_dir:
    print(f"Working in {temp_dir}")
    # Do work in temporary directory

with timer("Database query"):
    time.sleep(0.1)  # Simulate database operation

with file_manager("config.txt", backup=True) as f:
    f.write("new configuration")

Concurrency and Parallelism {#concurrency}

Threading

import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, RLock, Semaphore, Event, Condition

# Thread-safe counter
class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    def decrement(self):
        with self._lock:
            self._value -= 1

    @property
    def value(self):
        with self._lock:
            return self._value

# Producer-Consumer pattern
class ProducerConsumer:
    def __init__(self, max_size: int = 10):
        self.queue = queue.Queue(maxsize=max_size)
        self.shutdown = threading.Event()

    def producer(self, items: list):
        """Produce items and add to queue."""
        for item in items:
            if self.shutdown.is_set():
                break
            self.queue.put(item)
            print(f"Produced: {item}")
            time.sleep(0.1)

        # Signal end of production
        self.queue.put(None)

    def consumer(self, worker_id: int):
        """Consume items from queue."""
        while not self.shutdown.is_set():
            try:
                item = self.queue.get(timeout=1)
                if item is None:  # End signal
                    break

                print(f"Consumer {worker_id} processing: {item}")
                time.sleep(0.2)  # Simulate processing
                self.queue.task_done()

            except queue.Empty:
                continue

    def run(self, items: list, num_consumers: int = 3):
        """Run producer-consumer system."""
        threads = []

        # Start producer
        producer_thread = threading.Thread(
            target=self.producer, 
            args=(items,)
        )
        threads.append(producer_thread)
        producer_thread.start()

        # Start consumers
        for i in range(num_consumers):
            consumer_thread = threading.Thread(
                target=self.consumer,
                args=(i,)
            )
            threads.append(consumer_thread)
            consumer_thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()

# Thread pool example
def process_data(data):
    """Simulate data processing."""
    result = sum(x**2 for x in data)
    time.sleep(0.1)
    return result

def parallel_processing(datasets: list):
    """Process multiple datasets in parallel."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit all tasks
        future_to_data = {
            executor.submit(process_data, data): data 
            for data in datasets
        }

        # Collect results as they complete
        results = []
        for future in as_completed(future_to_data):
            data = future_to_data[future]
            try:
                result = future.result()
                results.append((data, result))
            except Exception as exc:
                print(f"Data {data} generated exception: {exc}")

        return results

Asyncio

import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Any

# Basic async functions
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    """Fetch URL asynchronously."""
    try:
        async with session.get(url) as response:
            return {
                'url': url,
                'status': response.status,
                'content': await response.text()
            }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

async def fetch_multiple_urls(urls: List[str]) -> List[Dict[str, Any]]:
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Async file operations
async def process_file_async(file_path: str) -> int:
    """Process file asynchronously."""
    async with aiofiles.open(file_path, 'r') as file:
        content = await file.read()
        # Simulate processing
        await asyncio.sleep(0.1)
        return len(content.split())

async def process_multiple_files(file_paths: List[str]) -> Dict[str, int]:
    """Process multiple files concurrently."""
    tasks = [process_file_async(path) for path in file_paths]
    results = await asyncio.gather(*tasks)
    return dict(zip(file_paths, results))

# Async context manager
class AsyncDatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    async def __aenter__(self):
        # Simulate async connection
        await asyncio.sleep(0.1)
        self.connection = f"Connected to {self.connection_string}"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Simulate async cleanup
        await asyncio.sleep(0.1)
        self.connection = None

    async def execute(self, query: str):
        await asyncio.sleep(0.05)  # Simulate query execution
        return f"Executed: {query}"

# Producer-Consumer with asyncio
class AsyncQueue:
    def __init__(self, maxsize: int = 0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.active_producers = 0
        self.lock = asyncio.Lock()

    async def producer(self, items: List[Any], producer_id: int):
        """Produce items asynchronously."""
        async with self.lock:
            self.active_producers += 1

        try:
            for item in items:
                await self.queue.put((producer_id, item))
                print(f"Producer {producer_id} produced: {item}")
                await asyncio.sleep(0.1)
        finally:
            async with self.lock:
                self.active_producers -= 1
                if self.active_producers == 0:
                    await self.queue.put(None)  # End signal

    async def consumer(self, consumer_id: int):
        """Consume items asynchronously."""
        while True:
            item = await self.queue.get()
            if item is None:
                # Re-queue end signal for other consumers
                await self.queue.put(None)
                break

            producer_id, data = item
            print(f"Consumer {consumer_id} processing {data} from producer {producer_id}")
            await asyncio.sleep(0.2)  # Simulate processing

    async def run(self, producer_data: List[List[Any]], num_consumers: int = 2):
        """Run async producer-consumer system."""
        # Start producers
        producer_tasks = [
            asyncio.create_task(self.producer(data, i))
            for i, data in enumerate(producer_data)
        ]

        # Start consumers
        consumer_tasks = [
            asyncio.create_task(self.consumer(i))
            for i in range(num_consumers)
        ]

        # Wait for all tasks
        await asyncio.gather(*producer_tasks, *consumer_tasks)

# Usage example
async def main():
    # Fetch URLs
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/headers",
        "https://httpbin.org/ip"
    ]

    results = await fetch_multiple_urls(urls)
    print(f"Fetched {len(results)} URLs")

    # Database operations
    async with AsyncDatabaseConnection("postgresql://localhost") as db:
        result = await db.execute("SELECT * FROM users")
        print(result)

    # Producer-Consumer
    queue_system = AsyncQueue()
    producer_data = [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ]
    await queue_system.run(producer_data, num_consumers=2)

# Run the async code
if __name__ == "__main__":
    asyncio.run(main())

This deep dive into Python covers advanced concepts that separate intermediate programmers from experts. Mastering these concepts will help you write more efficient, maintainable, and pythonic code.