7 advanced python techniques you should know

Decorators with Arguments and functools.wraps
Standard decorators are fine for basic logging, but production-grade systems demand runtime configuration. You don't want a hardcoded retry limit or a static timeout; you need to tune those parameters per call site.
To pass arguments to a decorator, you have to introduce an extra layer of closure—essentially creating a factory that returns a decorator. If you’re coming from the .NET world, think of this as passing constructor arguments to a custom Attribute, but built entirely out of nested, executable scopes.
But here is the trap that catches even seasoned developers: when you wrap a function, you risk erasing its identity. Without intervention, your decorated function loses its original __name__, docstring, and parameter signature. This wrecks debugging, breaks reflection, and blinds your APM tracing tools. To stop this metadata erasure, you must always decorate your wrapper with functools.wraps.
Let’s look at how this nested closure architecture behaves in a production-ready retry decorator:
import functools
import time
def retry(max_attempts: int, delay: float = 1.0):
"""Decorator that retries a function on exception."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts:
raise
print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
return wrapper
return decorator
@retry(max_attempts=3, delay=0.5)
def unstable_network_call(url: str) -> str:
import random
if random.random() < 0.7:
raise ConnectionError("Transient failure")
return f"Data from {url}"
# Usage
print(unstable_network_call("https://api.example.com"))Context Managers with Classes and contextlib
A context manager ensures resources are cleaned up. You can implement one with a class (__enter__ / __exit__) or more concisely with contextlib.contextmanager.
from contextlib import contextmanager
# Class‑based approach
class ManagedResource:
def __init__(self, name: str):
self.name = name
def __enter__(self):
print(f"Acquiring {self.name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"Releasing {self.name}")
# Handle exceptions if needed
return False # Propagate any exception
with ManagedResource("database connection") as res:
print(f"Working with {res.name}")
# Generator‑based approach using contextmanager
@contextmanager
def temporary_setting(key: str, value: str):
"""Temporarily set an environment‑like variable."""
import os
old = os.environ.get(key)
os.environ[key] = value
try:
yield
finally:
if old is None:
del os.environ[key]
else:
os.environ[key] = old
with temporary_setting("DEBUG", "true"):
print(f"DEBUG is {os.environ.get('DEBUG')}")Generators and Coroutines with send and yield from
Generators can be more than iterators. By using send(), you can push data back into a running generator, turning it into a coroutine. yield from delegates to another generator, making complex pipelines clean.
def running_average():
"""Coroutine that maintains a running average of values sent to it."""
total = 0.0
count = 0
average = None
while True:
x = yield average
total += x
count += 1
average = total / count
avg = running_average()
next(avg) # Prime the coroutine
print(avg.send(10)) # 10.0
print(avg.send(20)) # 15.0
print(avg.send(15)) # 15.0
# yield from for flattening nested iterables
def flatten(nested):
"""Recursively flatten any nested iterable except strings."""
for item in nested:
if isinstance(item, (list, tuple)):
yield from flatten(item)
else:
yield item
print(list(flatten([1, [2, [3, 4], 5], 6]))) # [1, 2, 3, 4, 5, 6]Async/Await with asyncio
async/await enables cooperative concurrency. Combine it with asyncio.gather for concurrent I/O operations.
import asyncio
import aiohttp # Requires: pip install aiohttp
async def fetch_url(session, url: str) -> str:
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls: list[str]) -> list[str]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
]
results = await fetch_all(urls)
for url, content in zip(urls, results):
print(f"{url} returned {len(content)} bytes")
# asyncio.run(main()) # Uncomment to run in an actual scriptMetaclasses in Practice
Metaclasses allow you to control class creation. A common use case is automatic registration of subclasses, useful for plugin systems.
class PluginRegistry(type):
"""Metaclass that registers all non‑abstract subclasses."""
plugins = {}
def __new__(mcs, name, bases, namespace):
cls = super().__new__(mcs, name, bases, namespace)
if not namespace.get("abstract", False):
mcs.plugins[name] = cls
return cls
class PluginBase(metaclass=PluginRegistry):
abstract = True # Base class is not registered
def run(self):
raise NotImplementedError
class ImageResizer(PluginBase):
def run(self):
print("Resizing image")
class TextFormatter(PluginBase):
def run(self):
print("Formatting text")
# Subclasses are automatically registered
print(PluginRegistry.plugins)
# {'ImageResizer': <class '__main__.ImageResizer'>,
# 'TextFormatter': <class '__main__.TextFormatter'>}Concurrency with Thread/Process Pools and Futures
The concurrent.futures module provides a high‑level interface for thread and process pools. Use it to easily parallelize CPU‑bound (ProcessPoolExecutor) or I/O‑bound (ThreadPoolExecutor) work.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import math
import time
def is_prime(n: int) -> bool:
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
def check_primes_in_range(start: int, end: int):
"""Simulate a heavy CPU task."""
time.sleep(0.1) # Fake workload to demonstrate parallelism
return [n for n in range(start, end) if is_prime(n)]
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [
executor.submit(check_primes_in_range, i * 100, (i + 1) * 100)
for i in range(4)
]
for future in as_completed(futures):
primes = future.result()
print(f"Found {len(primes)} primes: {primes[:3]}...")Advanced Type Hints: Protocols and Generics
Python’s typing system supports structural subtyping via Protocol, and generic types let you write reusable, type‑safe containers.
from typing import Protocol, TypeVar, Generic, Iterable
# Structural subtyping: any object with a 'close' method is a Closable
class Closable(Protocol):
def close(self) -> None: ...
def safe_close(resource: Closable) -> None:
print("Closing resource")
resource.close()
class Database:
def close(self) -> None:
print("Database connection closed")
safe_close(Database()) # OK, because Database has close()
# Generic stack with a bound type variable
T = TypeVar('T')
class Stack(Generic[T]):
def __init__(self) -> None:
self._items: list[T] = []
def push(self, item: T) -> None:
self._items.append(item)
def pop(self) -> T:
return self._items.pop()
int_stack = Stack[int]()
int_stack.push(10)
num: int = int_stack.pop() # Type safefunctools Power Tools: lru_cache and singledispatch
functools.lru_cache memoizes function results, while singledispatch allows you to define generic functions that behave differently based on the type of the first argument.
import functools
@functools.lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(100)) # Virtually instant
# singledispatch for type‑based function overloading
@functools.singledispatch
def format_value(arg) -> str:
return f"Unsupported type: {type(arg).__name__}"
@format_value.register
def _(arg: int) -> str:
return f"Integer: {arg}"
@format_value.register
def _(arg: list) -> str:
items = ", ".join(str(i) for i in arg)
return f"List: [{items}]"
@format_value.register(str)
def _(arg: str) -> str:
return f"String: '{arg}'"
print(format_value(42)) # Integer: 42
print(format_value([1,2,3])) # List: [1, 2, 3]
print(format_value("hello")) # String: 'hello'
print(format_value(3.14)) # Unsupported type: float