Programming 8 min read

7 advanced Python techniques you should know

Adrian Kuczyński
Senior Security Developer

Decorators with Arguments and functools.wraps

Standard decorators are fine for basic logging, but production-grade systems demand runtime configuration. You don't want a hardcoded retry limit or a static timeout; you need to tune those parameters per call site.

To pass arguments to a decorator, you add an extra layer of closure: a factory function that takes the configuration and returns the actual decorator. If you’re coming from the .NET world, think of this as passing constructor arguments to a custom Attribute, but built entirely out of nested, executable scopes.

But here is the trap that catches even seasoned developers: when you wrap a function, you risk erasing its identity. Without intervention, your decorated function loses its original __name__, docstring, and parameter signature. This wrecks debugging, breaks reflection, and blinds your APM tracing tools. To stop this metadata erasure, you must always decorate your wrapper with functools.wraps.

Let’s look at how this nested closure architecture behaves in a production-ready retry decorator:

import functools
import time

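def retry(max_attempts: int, delay: float = 1.0):
    """Decorator that retries a function on exception."""
    if max_attempts < 1:
        # With zero attempts the loop below would never run and silently return None
        raise ValueError("max_attempts must be at least 1")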
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise
                    print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, delay=0.5)
def unstable_network_call(url: str) -> str:
    import random
    if random.random() < 0.7:
        raise ConnectionError("Transient failure")
    return f"Data from {url}"

# Usage
print(unstable_network_call("https://api.example.com"))
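The metadata erasure described above is easy to check for yourself. The sketch below compares two deliberately trivial decorators, one without functools.wraps and one with it. The names plain, preserving, load_a, and load_b are made up for illustration and are not part of the retry example.

```python
import functools
import inspect

def plain(func):
    """Hypothetical decorator that does NOT preserve metadata."""
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def preserving(func):
    """Hypothetical decorator that copies metadata with functools.wraps."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@plain
def load_a(path: str) -> str:
    """Load a file."""
    return path

@preserving
def load_b(path: str) -> str:
    """Load a file."""
    return path

print(load_a.__name__, load_a.__doc__)  # wrapper None
print(load_b.__name__, load_b.__doc__)  # load_b Load a file.
print(inspect.signature(load_a))        # (*args, **kwargs)
print(inspect.signature(load_b))        # (path: str) -> str
```

functools.wraps also stores the original function on the wrapper as __wrapped__, which is how inspect.signature recovers the real parameter list.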

Context Managers with Classes and contextlib

A context manager ensures resources are cleaned up. You can implement one with a class (__enter__ / __exit__) or more concisely with contextlib.contextmanager.

from contextlib import contextmanager

# Class‑based approach
class ManagedResource:
    def __init__(self, name: str):
        self.name = name

    def __enter__(self):
        print(f"Acquiring {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Releasing {self.name}")
        # Handle exceptions if needed
        return False  # Propagate any exception

with ManagedResource("database connection") as res:
    print(f"Working with {res.name}")

# Generator‑based approach using contextmanager
import os  # Module-level import, so the usage below can read os.environ too

@contextmanager
def temporary_setting(key: str, value: str):
    """Temporarily set an environment variable, restoring the old state on exit."""
    old = os.environ.get(key)
    os.environ[key] = value
    try:
        yield
    finally:
        if old is None:
            # pop() tolerates the body having already removed the key
            os.environ.pop(key, None)
        else:
            os.environ[key] = old

with temporary_setting("DEBUG", "true"):
    print(f"DEBUG is {os.environ.get('DEBUG')}")
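The cleanup guarantee matters most when the body of the with block fails. Here is a minimal sketch showing that the finally branch still runs when an exception is raised inside the block. The events list and the tracked helper are illustrative names, not part of the examples above.

```python
from contextlib import contextmanager

events = []  # Records the order in which things happen

@contextmanager
def tracked(name: str):
    """Hypothetical context manager that logs entry and exit."""
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        # Runs whether the with-body finished normally or raised
        events.append(f"exit {name}")

try:
    with tracked("job"):
        raise ValueError("boom")
except ValueError:
    events.append("handled")

print(events)  # ['enter job', 'exit job', 'handled']
```

Note that the exit step is recorded before the exception reaches the except clause: the context manager cleans up first, then the error propagates.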

Generators and Coroutines with send and yield from

Generators can be more than iterators. By using send(), you can push data back into a running generator, turning it into a coroutine. yield from delegates to another generator, making complex pipelines clean.

def running_average():
    """Coroutine that maintains a running average of values sent to it."""
    total = 0.0
    count = 0
    average = None
    while True:
        x = yield average
        total += x
        count += 1
        average = total / count

avg = running_average()
next(avg)  # Prime the coroutine
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(15))  # 15.0

# yield from for flattening nested iterables
def flatten(nested):
    """Recursively flatten nested lists and tuples."""
    for item in nested:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)
        else:
            yield item

print(list(flatten([1, [2, [3, 4], 5], 6])))  # [1, 2, 3, 4, 5, 6]
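yield from does more than forward items: it also hands the sub-generator's return value back to the delegating generator. The sketch below uses that to collect per-chunk subtotals while still streaming every value. chunk_sum and all_chunks are hypothetical helpers written for this illustration.

```python
def chunk_sum(values):
    """Yield each value, then return their sum to the delegating generator."""
    total = 0
    for value in values:
        yield value
        total += value
    return total  # Becomes the value of the `yield from` expression

def all_chunks(chunks, totals):
    """Stream every value from every chunk, recording each chunk's subtotal."""
    for chunk in chunks:
        subtotal = yield from chunk_sum(chunk)
        totals.append(subtotal)

totals = []
flat = list(all_chunks([[1, 2], [3, 4, 5]], totals))

print(flat)    # [1, 2, 3, 4, 5]
print(totals)  # [3, 12]
```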

Async/Await with asyncio

async/await enables cooperative concurrency. Combine it with asyncio.gather for concurrent I/O operations.

import asyncio
import aiohttp  # Requires: pip install aiohttp

async def fetch_url(session, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
    ]
    results = await fetch_all(urls)
    for url, content in zip(urls, results):
        print(f"{url} returned {len(content)} bytes")

# asyncio.run(main())  # Uncomment to run in an actual script
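If you want to see the concurrency without installing aiohttp or touching the network, a standard-library-only sketch along these lines should do. fake_io is a stand-in that uses asyncio.sleep to simulate an I/O wait, and the delays are arbitrary.

```python
import asyncio
import time

async def fake_io(label: str, delay: float) -> str:
    """Stand-in for a network call: waits, then returns its label."""
    await asyncio.sleep(delay)
    return label

async def demo():
    start = time.perf_counter()
    # Both coroutines wait at the same time, so total time is close to the
    # longest delay rather than the sum of the two.
    results = await asyncio.gather(fake_io("a", 0.3), fake_io("b", 0.3))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(demo())
print(results)  # ['a', 'b']
print(f"Took {elapsed:.2f}s")
```

asyncio.gather returns results in the order the awaitables were passed in, regardless of which one finishes first.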

Metaclasses in Practice

Metaclasses allow you to control class creation. A common use case is automatic registration of subclasses, useful for plugin systems.

class PluginRegistry(type):
    """Metaclass that registers all non‑abstract subclasses."""
    plugins = {}

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        if not namespace.get("abstract", False):
            mcs.plugins[name] = cls
        return cls

class PluginBase(metaclass=PluginRegistry):
    abstract = True  # Base class is not registered
    def run(self):
        raise NotImplementedError

class ImageResizer(PluginBase):
    def run(self):
        print("Resizing image")

class TextFormatter(PluginBase):
    def run(self):
        print("Formatting text")

# Subclasses are automatically registered
print(PluginRegistry.plugins)
# {'ImageResizer': <class '__main__.ImageResizer'>,
#  'TextFormatter': <class '__main__.TextFormatter'>}
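Once the registry exists, the usual next step is looking plugins up by name. The following is a self-contained sketch of that idea: it repeats the metaclass so it runs on its own, and the Upper and Reverse plugins and the run_plugin helper are hypothetical additions for illustration.

```python
class PluginRegistry(type):
    """Metaclass that registers every class not marked abstract."""
    plugins = {}

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        # Only the class's own namespace is checked, so subclasses of an
        # abstract base are still registered.
        if not namespace.get("abstract", False):
            mcs.plugins[name] = cls
        return cls

class PluginBase(metaclass=PluginRegistry):
    abstract = True

    def run(self, text: str) -> str:
        raise NotImplementedError

class Upper(PluginBase):
    def run(self, text: str) -> str:
        return text.upper()

class Reverse(PluginBase):
    def run(self, text: str) -> str:
        return text[::-1]

def run_plugin(name: str, text: str) -> str:
    """Instantiate a registered plugin by class name and run it."""
    try:
        plugin_cls = PluginRegistry.plugins[name]
    except KeyError:
        raise ValueError(f"Unknown plugin: {name!r}") from None
    return plugin_cls().run(text)

print(run_plugin("Upper", "hello"))    # HELLO
print(run_plugin("Reverse", "hello"))  # olleh
```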

Concurrency with Thread/Process Pools and Futures

The concurrent.futures module provides a high‑level interface for thread and process pools. Use it to easily parallelize CPU‑bound (ProcessPoolExecutor) or I/O‑bound (ThreadPoolExecutor) work.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import math
import time

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def check_primes_in_range(start: int, end: int):
    """Simulate a heavy CPU task."""
    time.sleep(0.1)  # Fake workload to demonstrate parallelism
    return [n for n in range(start, end) if is_prime(n)]

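if __name__ == "__main__":  # Required where workers are spawned (Windows, macOS)
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(check_primes_in_range, i * 100, (i + 1) * 100)
            for i in range(4)
        ]
        # as_completed yields futures as they finish, not in submission order
        for future in as_completed(futures):
            primes = future.result()
            print(f"Found {len(primes)} primes: {primes[:3]}...")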
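For the I/O-bound side, a thread pool is usually the lighter option. Here is a rough sketch with ThreadPoolExecutor and executor.map, where fake_download simulates a blocking call with time.sleep. The function name and the file names are invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_download(name: str) -> str:
    """Stand-in for a blocking I/O call such as an HTTP request."""
    time.sleep(0.1)
    return f"{name}: done"

names = ["a.txt", "b.txt", "c.txt", "d.txt"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() returns results in input order, unlike as_completed()
    results = list(pool.map(fake_download, names))
elapsed = time.perf_counter() - start

print(results)
print(f"Took {elapsed:.2f}s")
```

Threads work here because time.sleep, like most blocking I/O, releases the GIL while waiting. For pure-Python number crunching, the process pool above remains the better fit.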

Advanced Type Hints: Protocols and Generics

Python’s typing system supports structural subtyping via Protocol, and generic types let you write reusable, type‑safe containers.

from typing import Protocol, TypeVar, Generic

# Structural subtyping: any object with a 'close' method is a Closable
class Closable(Protocol):
    def close(self) -> None: ...

def safe_close(resource: Closable) -> None:
    print("Closing resource")
    resource.close()

class Database:
    def close(self) -> None:
        print("Database connection closed")

safe_close(Database())  # OK, because Database has close()

# Generic stack parameterized by an (unbound) type variable
T = TypeVar('T')

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        return self._items.pop()

int_stack = Stack[int]()
int_stack.push(10)
num: int = int_stack.pop()  # Type safe
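Protocols and type variables also combine: a TypeVar can be bound to a Protocol, so a function accepts anything structurally compatible and still returns the caller's concrete type. The sketch below also uses typing.runtime_checkable to allow isinstance checks. close_all and FileHandle are hypothetical names, and the Closable protocol is redefined so the snippet runs on its own.

```python
from typing import Protocol, TypeVar, runtime_checkable

@runtime_checkable
class Closable(Protocol):
    def close(self) -> None: ...

# Bound type variable: C may be any type that satisfies Closable
C = TypeVar("C", bound=Closable)

def close_all(resources: list[C]) -> list[C]:
    """Close every resource and return the same list, preserving its type."""
    for resource in resources:
        resource.close()
    return resources

class FileHandle:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

handles = close_all([FileHandle(), FileHandle()])
print(all(h.closed for h in handles))      # True
print(isinstance(FileHandle(), Closable))  # True
print(isinstance(object(), Closable))      # False
```

Keep in mind that the runtime isinstance check only verifies that a close attribute exists. It does not validate the method's signature, which stays the job of a static type checker.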

functools Power Tools: lru_cache and singledispatch

functools.lru_cache memoizes function results, while singledispatch allows you to define generic functions that behave differently based on the type of the first argument.

import functools

@functools.lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(100))  # Virtually instant

# singledispatch for type‑based function overloading
@functools.singledispatch
def format_value(arg) -> str:
    return f"Unsupported type: {type(arg).__name__}"

@format_value.register
def _(arg: int) -> str:
    return f"Integer: {arg}"

@format_value.register
def _(arg: list) -> str:
    items = ", ".join(str(i) for i in arg)
    return f"List: [{items}]"

@format_value.register(str)
def _(arg: str) -> str:
    return f"String: '{arg}'"

print(format_value(42))        # Integer: 42
print(format_value([1,2,3]))   # List: [1, 2, 3]
print(format_value("hello"))   # String: 'hello'
print(format_value(3.14))      # Unsupported type: float
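Two details are worth knowing before you lean on these in real code: lru_cache exposes its statistics, and singledispatch follows the class hierarchy. The sketch below illustrates both with throwaway functions, square and describe, that are not part of the examples above.

```python
import functools

@functools.lru_cache(maxsize=None)
def square(n: int) -> int:
    return n * n

square(3)
square(3)  # Served from the cache
square(4)

info = square.cache_info()
print(info)  # CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)

square.cache_clear()  # Drop all cached results
print(square.cache_info().currsize)  # 0

@functools.singledispatch
def describe(arg) -> str:
    return "other"

@describe.register
def _(arg: int) -> str:
    return "int"

# bool is a subclass of int, so it picks up the int implementation
print(describe(True))  # int
print(describe(3.5))   # other
```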
