Python Deep Dive: Advanced Programming Mastery
Table of Contents
- Advanced Python Features
- Object-Oriented Programming
- Metaclasses and Descriptors
- Decorators and Context Managers
- Concurrency and Parallelism
- Memory Management
- Performance Optimization
- Design Patterns in Python
- Testing and Debugging
- Python Internals
Advanced Python Features {#advanced-features}
Generators and Iterators
# Custom Iterator
class NumberSquares:
    """Iterator producing 0**2, 1**2, ... up to (max_num - 1)**2.

    The object is its own iterator, so it can be consumed only once.
    """

    def __init__(self, max_num):
        self.max_num = max_num
        self.current = 0  # next base value to be squared

    def __iter__(self):
        return self

    def __next__(self):
        # Guard clause: signal exhaustion once the upper bound is reached.
        if self.current >= self.max_num:
            raise StopIteration
        base = self.current
        self.current = base + 1
        return base ** 2
# Generator Function
def fibonacci_generator(n):
    """Yield the first ``n`` terms of the Fibonacci sequence, starting at 0."""
    current, upcoming = 0, 1
    remaining = n
    # Count down instead of up; the number of iterations is the same.
    while remaining > 0:
        yield current
        current, upcoming = upcoming, current + upcoming
        remaining -= 1
# Generator Expression
# Lazy generator expression: each square is computed only when the object is
# iterated, and the generator can be consumed a single time.
squares = (x**2 for x in range(10))
# Advanced Generator with send()
def accumulator():
    """Running-total generator driven by ``send()``.

    Every ``yield`` hands back the current total. A value sent in is added
    to it; a plain ``next()`` (which delivers ``None``) leaves it unchanged.
    """
    running = 0
    while True:
        received = yield running
        if received is None:
            continue
        running += received
# Usage
# A generator must be advanced to its first ``yield`` before ``send()`` can
# deliver a value; each ``send()`` below returns the updated running total.
acc = accumulator()
next(acc) # Initialize
print(acc.send(10)) # 10
print(acc.send(5)) # 15
print(acc.send(3)) # 18
# Generator for large datasets
def read_large_file(file_path):
    """Lazily yield the lines of ``file_path`` with surrounding whitespace removed.

    The file is read one line at a time, so the whole content is never held
    in memory at once.
    """
    with open(file_path, 'r') as handle:
        yield from (raw_line.strip() for raw_line in handle)
# Coroutine example
def grep_coroutine(pattern):
    """Generator-based coroutine that reports every sent line containing ``pattern``.

    Prime it with ``next()`` before the first ``send()``. ``close()`` raises
    ``GeneratorExit`` inside the loop, which is used to print a closing message.
    """
    print(f"Searching for '{pattern}'")
    try:
        while True:
            line = yield
            if pattern not in line:
                continue
            print(f"Found: {line}")
    except GeneratorExit:
        print("Coroutine closing")
# Usage
# Only the first sent line contains "error", so only it is reported.
# close() makes the coroutine print its closing message.
searcher = grep_coroutine("error")
next(searcher) # Prime the coroutine
searcher.send("This is an error message")
searcher.send("Normal log entry")
searcher.close()
Advanced Function Features
from functools import wraps, lru_cache, singledispatch, partial
from typing import Callable, Any, TypeVar, Generic
# Function annotations and type hints
def process_data(data: list[dict],
                 transform: Callable[[dict], dict],
                 filter_func: Callable[[dict], bool] = None) -> list[dict]:
    """Transform every item, then optionally keep only the items that pass a filter.

    Args:
        data: input records.
        transform: applied to each record; its results form the output.
        filter_func: optional predicate evaluated on the *transformed* records.

    Returns:
        A new list of transformed (and possibly filtered) records.
    """
    transformed = list(map(transform, data))
    if not filter_func:
        return transformed
    return list(filter(filter_func, transformed))
# Closures and nonlocal
def create_counter(start: int = 0):
    """Return a counter function whose state lives in a closure.

    Calling the returned function adds ``increment`` (default 1) and returns
    the new total. Its ``reset`` attribute restores the total to ``start``.
    """
    current = start

    def reset():
        nonlocal current
        current = start

    def counter(increment: int = 1):
        nonlocal current
        current = current + increment
        return current

    # Expose reset as an attribute so callers need only one object.
    counter.reset = reset
    return counter
# Function factory
def create_validator(min_val: float = None, max_val: float = None):
    """Build a range-checking predicate.

    A bound left as ``None`` is not checked. Both bounds are inclusive.
    """
    def validator(value: float) -> bool:
        # Short-circuits exactly like two sequential checks: the upper bound
        # is only compared when the lower-bound test did not already fail.
        out_of_range = (
            (min_val is not None and value < min_val)
            or (max_val is not None and value > max_val)
        )
        return not out_of_range

    return validator
# Advanced decorator with parameters
def retry(max_attempts: int = 3, delay: float = 1.0,
          exceptions: tuple = (Exception,)):
    """Build a decorator that retries the wrapped callable when it raises.

    Args:
        max_attempts: total number of calls to make; must be at least 1.
        delay: seconds to sleep between two attempts.
        exceptions: exception classes that trigger a retry. Any other
            exception propagates immediately.

    Returns:
        A decorator. The decorated function returns the first successful
        result, or re-raises the exception of the final attempt.

    Raises:
        ValueError: if ``max_attempts`` is smaller than 1. Without this check
            the function would never be called and the wrapper would end in
            ``raise None``.
    """
    # Function-scope import keeps this snippet self-contained; the imports
    # preceding it bring in functools and typing names only.
    import time

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        # Bare raise keeps the original traceback intact.
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator
# Single dispatch (function overloading on the type of the first argument)
@singledispatch
def process_item(item):
    """Fallback used when no implementation is registered for ``type(item)``."""
    raise NotImplementedError(f"Cannot process {type(item)}")


# Implementations are registered with an explicit type argument; dispatch
# happens on the runtime type of the first positional argument.
@process_item.register(str)
def _(item: str):
    """Strings are upper-cased."""
    return item.upper()


@process_item.register(int)
def _(item: int):
    """Integers are doubled."""
    return item * 2


@process_item.register(list)
def _(item: list):
    """Lists are processed element by element, recursively."""
    return [process_item(element) for element in item]
# Partial application
def power(base: float, exponent: float) -> float:
    """Return ``base`` raised to ``exponent``."""
    return pow(base, exponent)
# partial() pre-binds the ``exponent`` keyword, leaving ``base`` as the only
# argument the new callables still need.
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(5)) # 25
print(cube(3)) # 27
Object-Oriented Programming {#oop}
Advanced Class Features
import itertools
import math
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import ClassVar, Protocol
# Abstract base classes
class Shape(ABC):
    """Abstract base for shapes; subclasses must supply area and perimeter."""

    @abstractmethod
    def area(self) -> float:
        """Return the enclosed area."""

    @abstractmethod
    def perimeter(self) -> float:
        """Return the length of the boundary."""

    def describe(self) -> str:
        """Return a one-line summary using the concrete class name."""
        return f"{self.__class__.__name__}: Area={self.area():.2f}, Perimeter={self.perimeter():.2f}"
class Rectangle(Shape):
    """Rectangle described by its width and height."""

    def __init__(self, width: float, height: float):
        self.width, self.height = width, height

    def area(self) -> float:
        """Return width times height."""
        return self.width * self.height

    def perimeter(self) -> float:
        """Return twice the sum of width and height."""
        half_perimeter = self.width + self.height
        return 2 * half_perimeter
# Dataclasses
@dataclass
class Point:
    """Point in 3-D space; ``z`` defaults to 0.0 for planar use."""

    x: float
    y: float
    z: float = 0.0

    def distance_from_origin(self) -> float:
        """Return the Euclidean distance from (0, 0, 0)."""
        squared_norm = self.x**2 + self.y**2 + self.z**2
        return squared_norm ** 0.5
@dataclass
class Person:
    """Person record with a derived default e-mail address and a unique id.

    If ``email`` is empty, ``__post_init__`` derives one from ``name``
    (lower-cased, spaces replaced by dots) at ``example.com``.
    """

    name: str
    age: int
    email: str = ""
    skills: list[str] = field(default_factory=list)
    # A process-wide counter hands out distinct ids. The previous default,
    # id(object()), returned the address of an object that was freed at once,
    # so successive instances routinely received the same value.
    # compare=False keeps equality based on the data fields only, so two
    # records with the same content still compare equal.
    _id: int = field(default_factory=itertools.count(1).__next__, compare=False)

    def __post_init__(self):
        if not self.email:
            self.email = f"{self.name.lower().replace(' ', '.')}@example.com"
# Property decorators and descriptors
class Temperature:
    """Temperature stored in degrees Celsius with Fahrenheit and Kelvin views.

    Raises:
        ValueError: whenever a value below absolute zero (-273.15 °C) is
            assigned, including through the constructor.
    """

    def __init__(self, celsius: float = 0):
        # Assign through the property so the constructor enforces the same
        # absolute-zero check as later assignments.
        self.celsius = celsius

    @property
    def celsius(self) -> float:
        """Temperature in degrees Celsius."""
        return self._celsius

    @celsius.setter
    def celsius(self, value: float):
        if value < -273.15:
            raise ValueError("Temperature cannot be below absolute zero")
        self._celsius = value

    @property
    def fahrenheit(self) -> float:
        """Temperature converted to degrees Fahrenheit."""
        return (self._celsius * 9/5) + 32

    @fahrenheit.setter
    def fahrenheit(self, value: float):
        # Delegates to the celsius setter so the range check is applied.
        self.celsius = (value - 32) * 5/9

    @property
    def kelvin(self) -> float:
        """Temperature converted to Kelvin (read-only)."""
        return self._celsius + 273.15
# Class and static methods
class MathUtils:
    """Small collection of maths helpers with configurable rounding."""

    # NOTE: PI carries only five decimals, so circle_area() results are not
    # meaningful beyond that even when a higher rounding precision is set.
    PI: ClassVar[float] = 3.14159

    def __init__(self, precision: int = 2):
        self.precision = precision  # number of decimals used by circle_area()

    @classmethod
    def create_high_precision(cls):
        """Factory method returning an instance that rounds to 10 decimals."""
        return cls(precision=10)

    @staticmethod
    def is_prime(n: int) -> bool:
        """Return True if the integer ``n`` is prime (trial division).

        ``math.isqrt`` gives the exact integer square root. The float-based
        ``int(n ** 0.5)`` loses precision for large values and raises
        OverflowError once ``n`` no longer fits in a float.
        """
        if n < 2:
            return False
        # all() stops at the first divisor that leaves no remainder.
        return all(n % divisor for divisor in range(2, math.isqrt(n) + 1))

    def circle_area(self, radius: float) -> float:
        """Return the area of a circle, rounded to ``self.precision`` decimals."""
        area = self.PI * radius**2
        return round(area, self.precision)
# Multiple inheritance and MRO
class Flyable:
    """Mixin contributing a ``fly`` capability."""

    def fly(self):
        """Return the text describing flight."""
        return "Flying"
class Swimmable:
    """Mixin contributing a ``swim`` capability."""

    def swim(self):
        """Return the text describing swimming."""
        return "Swimming"
class Duck(Flyable, Swimmable):
    """Named duck that inherits ``fly`` and ``swim`` from its two mixins."""

    def __init__(self, name: str):
        self.name = name

    def speak(self):
        """Return the sound a duck makes."""
        return "Quack"
# Method Resolution Order
# __mro__ lists the classes in the order attribute lookup searches them:
# Duck, then its bases in declaration order, then object.
print(Duck.__mro__)
duck = Duck("Donald")
print(duck.fly(), duck.swim(), duck.speak())
Metaclasses and Advanced Features
# Basic metaclass
class SingletonMeta(type):
    """Metaclass that hands out a single shared instance per class.

    The first call constructs the instance. Every later call returns it
    unchanged, ignoring any arguments passed.
    """

    _instances = {}  # class -> its one instance, shared by all classes using this metaclass

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls not in registry:
            registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
class Database(metaclass=SingletonMeta):
    """Database handle created through ``SingletonMeta``."""

    def __init__(self, connection_string: str = "default"):
        self.connected = False
        self.connection_string = connection_string

    def connect(self):
        """Mark the handle as connected and return a status message."""
        self.connected = True
        return f"Connected to {self.connection_string}"
# Attribute validation metaclass
class ValidatedMeta(type):
    """Metaclass that runs ``self._validate()`` before every public method.

    Only plain functions whose name does not start with an underscore are
    wrapped. Other callables found in the class body (nested classes,
    ``staticmethod`` objects, callable instances) are left untouched, because
    replacing them with a ``wrapper(self, ...)`` function would break them.
    """

    def __new__(cls, name, bases, attrs):
        from types import FunctionType

        # Build a fresh namespace instead of rewriting ``attrs`` while it is
        # being iterated.
        namespace = {
            key: cls._add_validation(value)
            if isinstance(value, FunctionType) and not key.startswith('_')
            else value
            for key, value in attrs.items()
        }
        return super().__new__(cls, name, bases, namespace)

    @staticmethod
    def _add_validation(func):
        """Return ``func`` wrapped so the instance's ``_validate`` hook runs first."""
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # The hook is optional: instances without _validate are called directly.
            if hasattr(self, '_validate'):
                self._validate()
            return func(self, *args, **kwargs)
        return wrapper
# Descriptor for attribute validation
class ValidatedAttribute:
    """Data descriptor that validates values before storing them.

    The value is kept on the instance under ``_<attribute name>``. Reading
    an attribute that was never set yields ``None``.
    """

    def __init__(self, validator_func, error_message="Invalid value"):
        self.validator = validator_func
        self.error_message = error_message
        self.private_name = None  # filled in by __set_name__

    def __set_name__(self, owner, name):
        self.private_name = '_' + name

    def __get__(self, obj, objtype=None):
        # Accessed on the class itself: return the descriptor object.
        return self if obj is None else getattr(obj, self.private_name, None)

    def __set__(self, obj, value):
        if self.validator(value):
            setattr(obj, self.private_name, value)
            return
        raise ValueError(self.error_message)
class Person:
    """Person whose ``name`` and ``age`` are checked on every assignment."""

    name = ValidatedAttribute(
        validator_func=lambda x: isinstance(x, str) and len(x) > 0,
        error_message="Name must be non-empty string",
    )
    age = ValidatedAttribute(
        validator_func=lambda x: isinstance(x, int) and 0 <= x <= 150,
        error_message="Age must be integer between 0 and 150",
    )

    def __init__(self, name: str, age: int):
        # Both assignments go through the descriptors, so invalid constructor
        # arguments raise ValueError.
        self.name = name
        self.age = age
# Protocol (structural typing)
class Drawable(Protocol):
    """Structural type: anything with a ``draw() -> str`` method conforms."""

    def draw(self) -> str:
        """Return a textual rendering of the object."""
class Circle:
    """Circle that satisfies ``Drawable`` structurally, without inheriting from it."""

    def __init__(self, radius: float):
        self.radius = radius

    def draw(self) -> str:
        """Return a description of the drawing operation."""
        return f"Drawing circle with radius {self.radius}"
def render_shape(shape: Drawable) -> str:
    """Return whatever ``shape.draw()`` produces."""
    rendering = shape.draw()
    return rendering
# Circle never names Drawable as a base class; having a matching draw()
# method is what makes it acceptable to render_shape().
circle = Circle(5)
print(render_shape(circle)) # Works due to structural typing
Decorators and Context Managers {#decorators}
Advanced Decorators
import functools
import time
from typing import Callable, Any
# Class-based decorator
class RateLimiter:
    """Decorator object allowing at most ``max_calls`` calls per ``time_window`` seconds.

    The call history is stored on the instance, so every function decorated
    with the same instance shares one budget.
    """

    def __init__(self, max_calls: int, time_window: int):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # timestamps of the calls still inside the window

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Keep only the timestamps that still fall inside the window.
            recent = []
            for stamp in self.calls:
                if now - stamp < self.time_window:
                    recent.append(stamp)
            self.calls = recent
            if len(recent) >= self.max_calls:
                raise Exception(f"Rate limit exceeded: {self.max_calls} calls per {self.time_window}s")
            recent.append(now)
            return func(*args, **kwargs)
        return wrapper
# Decorator with state
class CallCounter:
    """Decorator class that counts and announces each call to the wrapped function."""

    def __init__(self, func: Callable):
        self.func = func
        self.count = 0
        # Copies __name__, __doc__ and similar metadata from func onto this
        # instance. It runs after the two assignments above, as before.
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        self.count = self.count + 1
        print(f"Call #{self.count} to {self.func.__name__}")
        return self.func(*args, **kwargs)

    def reset(self):
        """Set the call count back to zero."""
        self.count = 0
# Decorator factory with complex logic
def memoize_with_ttl(ttl_seconds: int = 300):
    """Memoization decorator whose entries expire after ``ttl_seconds``.

    The cache key is the textual form of the positional arguments followed
    by the sorted keyword arguments. The decorated function gains
    ``cache_clear()`` and ``cache_info()`` helpers.
    """
    def decorator(func: Callable) -> Callable:
        cache = {}  # key -> (result, time the result was stored)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(sorted(kwargs.items()))
            now = time.time()
            entry = cache.get(key)
            if entry is not None:
                result, timestamp = entry
                if now - timestamp < ttl_seconds:
                    return result
                # Entry has expired: drop it before recomputing.
                del cache[key]
            result = func(*args, **kwargs)
            cache[key] = (result, now)
            return result

        wrapper.cache_clear = cache.clear
        wrapper.cache_info = lambda: f"Cache size: {len(cache)}"
        return wrapper
    return decorator
# Usage examples
@RateLimiter(max_calls=5, time_window=60)
def api_call(data):
    """Example endpoint limited to 5 calls per 60 seconds by the decorator."""
    return f"Processing {data}"
@CallCounter
def greet(name):
    """Return a greeting; the decorator counts and prints each call."""
    return f"Hello, {name}!"
@memoize_with_ttl(ttl_seconds=10)
def expensive_calculation(n):
    """Return ``n`` squared after a one-second delay; results are cached for 10 seconds."""
    time.sleep(1)  # Simulate expensive operation
    return n ** 2
Context Managers
import contextlib
import tempfile
import os
from typing import Generator
# Class-based context manager
class DatabaseTransaction:
    """Context manager that commits on success and rolls back on any exception.

    ``connection`` must offer ``begin()``, returning an object with
    ``commit()`` and ``rollback()``.
    """

    def __init__(self, connection):
        self.connection = connection
        self.transaction = None

    def __enter__(self):
        started = self.connection.begin()
        self.transaction = started
        return started

    def __exit__(self, exc_type, exc_val, exc_tb):
        failed = exc_type is not None
        finish = self.transaction.rollback if failed else self.transaction.commit
        finish()
        return False  # Don't suppress exceptions
# Function-based context manager
@contextlib.contextmanager
def temporary_directory() -> Generator[str, None, None]:
    """Yield the path of a fresh temporary directory and delete it afterwards.

    The directory tree is removed even when the ``with`` body raises.
    """
    import shutil

    temp_dir = tempfile.mkdtemp()
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)
@contextlib.contextmanager
def timer(description: str = "Operation") -> Generator[None, None, None]:
    """Print how long the ``with`` body took, even if it raised."""
    began = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - began
        print(f"{description} took {elapsed:.4f} seconds")
@contextlib.contextmanager
def suppress_stdout():
    """Discard everything written to ``sys.stdout`` inside the ``with`` body.

    The previous ``sys.stdout`` is always restored on exit.
    """
    import sys

    original_stdout = sys.stdout
    # Keep our own handle to the null device and close exactly that handle.
    # Closing "whatever sys.stdout currently is" would close the real stdout
    # if open() failed, or a stream installed by the body, and could leak
    # the devnull file.
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            yield
        finally:
            sys.stdout = original_stdout
# Context manager combining file handling with backup and restore
@contextlib.contextmanager
def file_manager(filename: str, backup: bool = True):
    """Open ``filename`` for writing, optionally protected by a backup copy.

    With ``backup`` enabled and an existing file, a ``<filename>.backup``
    copy is made first. If the ``with`` body raises an ``Exception`` the
    backup is moved back over the file and the exception is re-raised.
    On success the backup is deleted.
    """
    import shutil

    backup_name = f"{filename}.backup" if backup else None

    def backup_present():
        # True only when backups are enabled and the copy exists on disk.
        return bool(backup_name) and os.path.exists(backup_name)

    if backup and os.path.exists(filename):
        shutil.copy2(filename, backup_name)

    try:
        with open(filename, 'w') as f:
            yield f
    except Exception:
        # Restore backup on error
        if backup_present():
            shutil.move(backup_name, filename)
        raise
    else:
        # Remove backup on success
        if backup_present():
            os.remove(backup_name)
# Usage examples
# The temporary directory exists only inside the block and is removed on exit.
with temporary_directory() as temp_dir:
    print(f"Working in {temp_dir}")
    # Do work in temporary directory
# timer() prints the elapsed time when the block finishes.
with timer("Database query"):
    time.sleep(0.1) # Simulate database operation
# Writes config.txt in the current working directory; an existing file is
# backed up first and restored if the block raises.
with file_manager("config.txt", backup=True) as f:
    f.write("new configuration")
Concurrency and Parallelism {#concurrency}
Threading
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, RLock, Semaphore, Event, Condition
# Thread-safe counter
class ThreadSafeCounter:
    """Integer counter whose reads and updates are guarded by a lock."""

    def __init__(self):
        self._value = 0
        self._lock = Lock()

    def _shift(self, delta):
        # Single place where the value is modified, always under the lock.
        with self._lock:
            self._value += delta

    def increment(self):
        """Add one to the counter."""
        self._shift(1)

    def decrement(self):
        """Subtract one from the counter."""
        self._shift(-1)

    @property
    def value(self):
        """Current count, read while holding the lock."""
        with self._lock:
            return self._value
# Producer-Consumer pattern
class ProducerConsumer:
    """Producer-consumer pipeline built on a bounded ``queue.Queue``.

    ``None`` is the end signal. The producer enqueues it once, and every
    consumer that receives it puts it back before exiting, so the single
    signal stops all consumers instead of only the first one.
    """

    def __init__(self, max_size: int = 10):
        self.queue = queue.Queue(maxsize=max_size)
        self.shutdown = threading.Event()  # set externally to stop early

    def producer(self, items: list):
        """Produce items and add to queue, then enqueue the end signal."""
        for item in items:
            if self.shutdown.is_set():
                break
            self.queue.put(item)
            print(f"Produced: {item}")
            time.sleep(0.1)
        # Signal end of production
        self.queue.put(None)

    def consumer(self, worker_id: int):
        """Consume items from queue until the end signal or shutdown."""
        while not self.shutdown.is_set():
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                if item is None:  # End signal
                    # Hand the signal on to the next consumer. A slot is free
                    # because this consumer has just removed an entry.
                    self.queue.put(None)
                    break
                print(f"Consumer {worker_id} processing: {item}")
                time.sleep(0.2)  # Simulate processing
            finally:
                # Every successful get() is matched by task_done(), including
                # the end signal, so queue.join() stays usable.
                self.queue.task_done()

    def run(self, items: list, num_consumers: int = 3):
        """Run producer-consumer system and block until all threads finish."""
        producer_thread = threading.Thread(
            target=self.producer,
            args=(items,)
        )
        consumer_threads = [
            threading.Thread(target=self.consumer, args=(i,))
            for i in range(num_consumers)
        ]
        threads = [producer_thread, *consumer_threads]
        for thread in threads:
            thread.start()
        # Wait for completion
        for thread in threads:
            thread.join()
        # After a normal finish the queue holds only the end signal re-queued
        # by the last consumer. Remove it so run() can be called again.
        if num_consumers > 0 and not self.shutdown.is_set():
            try:
                self.queue.get_nowait()
            except queue.Empty:
                pass
            else:
                self.queue.task_done()
# Thread pool example
def process_data(data):
    """Return the sum of the squares of ``data`` after a short simulated delay."""
    squared_values = (value ** 2 for value in data)
    total = sum(squared_values)
    time.sleep(0.1)  # stands in for real processing time
    return total
def parallel_processing(datasets: list):
    """Run ``process_data`` on every dataset using a pool of four threads.

    Returns:
        A list of ``(dataset, result)`` pairs in completion order. Datasets
        whose processing raised are reported on stdout and left out.
    """
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit all tasks, remembering which dataset each future belongs to.
        future_to_data = {}
        for data in datasets:
            future_to_data[executor.submit(process_data, data)] = data
        # Collect results as they complete
        for future in as_completed(future_to_data):
            data = future_to_data[future]
            try:
                results.append((data, future.result()))
            except Exception as exc:
                print(f"Data {data} generated exception: {exc}")
    return results
Asyncio
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Any
# Basic async functions
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    """GET ``url`` through ``session`` and summarise the outcome as a dict.

    Returns:
        ``{'url', 'status', 'content'}`` on success, or ``{'url', 'error'}``
        when any exception occurred. The exception is not propagated.
    """
    try:
        async with session.get(url) as response:
            status = response.status
            content = await response.text()
            return {'url': url, 'status': status, 'content': content}
    except Exception as e:
        return {'url': url, 'error': str(e)}
async def fetch_multiple_urls(urls: List[str]) -> List[Dict[str, Any]]:
    """Fetch all ``urls`` concurrently over one shared client session.

    Results are returned in the same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_url(session, url) for url in urls)
        return await asyncio.gather(*pending)
# Async file operations
async def process_file_async(file_path: str) -> int:
    """Read ``file_path`` asynchronously and return its whitespace-separated word count."""
    async with aiofiles.open(file_path, 'r') as file:
        content = await file.read()
        # Simulate processing
        await asyncio.sleep(0.1)
        words = content.split()
        return len(words)
async def process_multiple_files(file_paths: List[str]) -> Dict[str, int]:
    """Count the words of several files concurrently.

    Returns:
        A mapping from each path to the word count of that file.
    """
    word_counts = await asyncio.gather(*map(process_file_async, file_paths))
    return {path: count for path, count in zip(file_paths, word_counts)}
# Async context manager
class AsyncDatabaseConnection:
    """Simulated database connection usable with ``async with``.

    Connecting, querying and disconnecting are all imitated with short
    ``asyncio.sleep`` calls; no real database is contacted.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None  # set while inside the ``async with`` block

    async def __aenter__(self):
        await asyncio.sleep(0.1)  # Simulate async connection
        self.connection = f"Connected to {self.connection_string}"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.1)  # Simulate async cleanup
        self.connection = None

    async def execute(self, query: str):
        """Pretend to run ``query`` and return a confirmation string."""
        await asyncio.sleep(0.05)  # Simulate query execution
        confirmation = f"Executed: {query}"
        return confirmation
# Producer-Consumer with asyncio
class AsyncQueue:
    """Asynchronous producer-consumer pipeline on top of ``asyncio.Queue``.

    ``None`` is the end signal. It is enqueued when the number of registered
    producers drops to zero, and each consumer re-queues it before exiting so
    that one signal stops every consumer.
    """

    def __init__(self, maxsize: int = 0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.active_producers = 0  # producers (plus run()'s guard) not yet finished
        self.lock = asyncio.Lock()  # protects active_producers

    async def _register_producer(self):
        """Count one more party that may still produce items."""
        async with self.lock:
            self.active_producers += 1

    async def _release_producer(self):
        """Drop one registration; the last one to leave sends the end signal."""
        async with self.lock:
            self.active_producers -= 1
            is_last = self.active_producers == 0
        # Enqueue outside the lock so a full queue cannot block other
        # producers that are trying to update the counter.
        if is_last:
            await self.queue.put(None)  # End signal

    async def producer(self, items: List[Any], producer_id: int):
        """Produce items asynchronously as ``(producer_id, item)`` tuples.

        When used outside ``run()``, the end signal is sent as soon as the
        count of running producers reaches zero.
        """
        await self._register_producer()
        try:
            for item in items:
                await self.queue.put((producer_id, item))
                print(f"Producer {producer_id} produced: {item}")
                await asyncio.sleep(0.1)
        finally:
            await self._release_producer()

    async def consumer(self, consumer_id: int):
        """Consume items asynchronously until the end signal arrives."""
        while True:
            item = await self.queue.get()
            if item is None:
                # Re-queue end signal for other consumers
                await self.queue.put(None)
                break
            producer_id, data = item
            print(f"Consumer {consumer_id} processing {data} from producer {producer_id}")
            await asyncio.sleep(0.2)  # Simulate processing

    async def run(self, producer_data: List[List[Any]], num_consumers: int = 2):
        """Run async producer-consumer system until everything is consumed.

        ``run()`` holds a registration of its own while the producers work.
        The counter therefore cannot reach zero (and send the end signal)
        just because one producer finished before another started, and the
        signal is still sent when ``producer_data`` is empty.
        """
        await self._register_producer()
        # Start producers
        producer_tasks = [
            asyncio.create_task(self.producer(data, i))
            for i, data in enumerate(producer_data)
        ]
        # Start consumers
        consumer_tasks = [
            asyncio.create_task(self.consumer(i))
            for i in range(num_consumers)
        ]
        try:
            await asyncio.gather(*producer_tasks)
        finally:
            await self._release_producer()
        # Wait for the consumers to drain the queue and see the end signal.
        await asyncio.gather(*consumer_tasks)
# Usage example
async def main():
    """Demonstrate the async helpers: URL fetching, the database context manager and the queue."""
    # Fetch URLs
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/headers",
        "https://httpbin.org/ip"
    ]
    results = await fetch_multiple_urls(urls)
    print(f"Fetched {len(results)} URLs")

    # Database operations
    async with AsyncDatabaseConnection("postgresql://localhost") as db:
        print(await db.execute("SELECT * FROM users"))

    # Producer-Consumer: three producers with three items each, two consumers.
    producer_data = [list(range(first, first + 3)) for first in (1, 4, 7)]
    queue_system = AsyncQueue()
    await queue_system.run(producer_data, num_consumers=2)
# Run the async code
# asyncio.run() starts an event loop, runs main() to completion and closes
# the loop; the guard keeps this from happening when the file is imported.
if __name__ == "__main__":
    asyncio.run(main())
This deep dive into Python covers advanced concepts that separate intermediate programmers from experts. Mastering these concepts will help you write more efficient, maintainable, and Pythonic code.