Context Managers Python para Automações: Como Criar Cleanup que Nunca Falha

Context Managers Python para Automações: Como Criar Cleanup que Nunca Falha

Você já terminou de rodar um script de automação e encontrou arquivos temporários espalhados pelo disco? Ou peor — travou no meio de um processo e ficou com lockfiles orphaned? Eu já. Duas vezes. Em produção. Na segunda vez, decidi parar de aceitar isso como “parte do trabalho” e criar context managers que fizessem o cleanup por mim.

Context managers são mais que o with open(). São contratos de entrada e saída para qualquer recurso que sua automação tocar — conexões de banco, arquivos, sessões de API, threads. Se você não está usando eles, está confiando na sorte de que seu código vai terminar limpo. Ele não vai.

Scripts Python sem context managers deixando recursos abertos
Sem context managers, cada recurso aberto é uma potential leak waiting to happen

Por que Seus Scripts Deixam Bagunça

O problema não é falta de cuidado. É falta de estrutura. Vejas as formas mais comuns de falha:

# Errado: se algo lançar exceção, conn fica aberta
def processar_ordens(ids):
    conn = abrir_conexao()
    for id in ids:
        conn.enviar(id)
    conn.fechar()

# Errado: retorno temprano = sem cleanup
def processar_ordens(ids):
    conn = abrir_conexao()
    for id in ids:
        if id in cache:
            return "cache hit"  # leak!
    conn.fechar()
    return "ok"

Cada função queabre um recurso e não garante seu fechamento é uma bomba-relógio. context managers transformam isso em uma equação resolvível.

__enter__ e __exit__: O Contrato Mínimo

A forma mais direta de criar um context manager é uma classe com esses dois métodos:

class ConexaoDB:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.conn = None

    def __enter__(self):
        self.conn = conectar(self.host, self.port)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()
        return False  # não suprime exceções

# Uso
with ConexaoDB("db.internal", 5432) as conn:
    conn.query("SELECT * FROM ordens")

O __exit__ é chamado sempre — exceção ou não, return ou não, crash ou não. Esse é o ponto. Você não precisa lembrar de fechar nada; o Python faz isso por você.

Usando contextlib: O Atalho Que Não é Gambiarra

Para casos simples, criar uma classe inteira é overkill. O módulo contextlib oferece formas mais limpas.

from contextlib import contextmanager

@contextmanager
def conexao_db(host, port):
    conn = None
    try:
        conn = conectar(host, port)
        yield conn
    finally:
        if conn:
            conn.close()

# Uso
with conexao_db("db.internal", 5432) as conn:
    conn.query("SELECT * FROM ordens")

O yield divide o context manager em duas partes: antes do yield é o setup, depois é o teardown. O finally garante que o cleanup rode independentemente do que acontecer entre o with.

Criando um Lockfile Context Manager Para Automações

Uma necessidade comum em automações é garantir que só uma instância do script rode por vez. Lockfiles são a solução, mas precisam de cleanup garantido:

import os
import time
from contextlib import contextmanager

@contextmanager
def lockfile(caminho, max_tentativas=3, intervalo=2):
    lock = open(caminho, "w")
    pid = os.getpid()
    lock.write(str(pid))
    lock.flush()
    
    try:
        yield pid
    finally:
        lock.close()
        if os.path.exists(caminho):
            os.remove(caminho)

# Uso
with lockfile("/tmp/automacao_ordens.lock") as pid:
    print(f"Processo {pid} ativo")
    processar_ordens()

Esse é simples. Mas e se o processo morrer? O lock fica. Você precisa de um context manager que detecte locks órfãos e reconquiste o arquivo.

Lockfile Resiliente: Detectando e Recuperando de Crashes

import os
import signal
from contextlib import contextmanager

@contextmanager
def lockfile_resiliente(caminho, ttl=300):
    """
    Lock com TTL em segundos. Se o processo morrer,
    outro processo pode reclamar o lock após o TTL expirar.
    """
    pid = os.getpid()
    
    # Tenta adquirir lock
    for tentativa in range(3):
        if os.path.exists(caminho):
            # Verifica se o processo ainda está vivo
            with open(caminho) as f:
                old_pid = int(f.read().strip())
            try:
                os.kill(old_pid, 0)  # sinal 0 = checa se existe
                # Lock válido e processo vivo — aguarda
                time.sleep(2)
            except OSError:
                # Processo morto, lock órfão — reclamar
                os.remove(caminho)
        
        # Tenta criar lock
        try:
            with open(caminho, "x") as f:
                f.write(str(pid))
            break
        except FileExistsError:
            continue
    
    yield pid
    
    # Cleanup
    if os.path.exists(caminho):
        with open(caminho) as f:
            if f.read().strip() == str(pid):
                os.remove(caminho)

Esse context manager faz três coisas: verifica se o lock existe, checa se o processo dono ainda está vivo, e limpa o lock quando termina ou quando o processo morre. Isso é o mínimo para automações que rodam em background via cron.

Context Manager Para Retry com Backoff Exponencial

Automações que chamam APIs externas precisam de retry. Mas retry cego é perigoso — você pode derrubar um serviço que já está em difficulté. O context manager abaixo encapsula retry com backoff:

import time
import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def retry_api(max_tentativas=5, base_delay=1, max_delay=60, fator=2):
    """
    Context manager que/envolve uma chamada de API e faz retry
    automático com backoff exponencial em caso de falha.
    """
    tentativas = 0
    
    while tentativas < max_tentativas:
        try:
            yield  # usuário faz a chamada aqui dentro
            break  # sucesso sai do loop
        
        except Exception as e:
            tentativas += 1
            if tentativas >= max_tentativas:
                logger.error(f"Todas as {max_tentativas} tentativas falharam: {e}")
                raise
            
            delay = min(base_delay * (fator ** (tentativas - 1)), max_delay)
            logger.warning(f"Tentativa {tentativas} falhou: {e}. Aguardando {delay}s...")
            time.sleep(delay)

# Uso
with retry_api(max_tentativas=3) as retry:
    response = fazer_requisicao_api()
    if response.status_code >= 500:
        raise APIError(response.status_code)
    retry()  # marcador para retry

Na prática, você vai querer integrar isso com seu cliente HTTP específico — mas a estrutura de retry como context manager é reutilizável para qualquer chamada que possa falhar.

⚠️ O Perrengue: Uma vez implementei um lockfile que parecia perfeito. Rodava no cron a cada minuto. Até que o servidor travou no meio da execução, o lock ficou orphaned, e o cron passou 40 minutos sem rodar enquanto eu tentava resolver manualmente. O TTL de 300 segundos teria ajudado — mas eu não tinha implementado. Aprendi a nunca confiar em lockfile sem TTL.

Session Manager Para Chamadas HTTP Stateless

Se sua automação faz várias chamadas HTTP para o mesmo servidor, criar uma session do requests mantém cookies, conexões TCP reutilizadas e retry integrado:

import requests
from contextlib import contextmanager

@contextmanager
def http_session(base_url=None, timeout=30, max_retries=3):
    """
    Context manager para sessões HTTP com configuração padrão.
    """
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(
        max_retries=max_retries,
        pool_connections=10,
        pool_maxsize=20
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    if base_url:
        session.params["base_url"] = base_url
    
    try:
        yield session
    finally:
        session.close()

# Uso
with http_session(base_url="https://api.servico.com") as session:
    resp = session.get("/recurso/123")
    data = resp.json()

O cleanup automático da session garante que conexões TCP não fiquem hanging. Em automações de longa duração, isso faz diferença de segundos para minutos.

Gerenciando Múltiplos Context Managers com ExitStack

Quando você precisa de vários context managers whose lifetimes precisam ser sincronizados, ExitStack é a solução:

from contextlib import ExitStack

def processar_ordem_completa(ordem_id):
    with ExitStack() as stack:
        db = stack.enter_context(conexao_db("db.internal", 5432))
        session = stack.enter_context(http_session())
        lock = stack.enter_context(lockfile_resiliente(f"/tmp/ordem_{ordem_id}.lock"))
        
        # Todos os recursos são criados aqui
        # O cleanup é garantido no fim do bloco
        ordem = db.query(f"SELECT * FROM ordens WHERE id = {ordem_id}")
        resp = session.post(f"/api/ordens/{ordem_id}/processar", json=ordem)
        
    # Aqui todos os recursos já foram fechados
    return resp.json()

ExitStack gerencia a pilha de context managers e garante que cada __exit__ seja chamado na ordem inversa. Se qualquer um deles lançar exceção, os outros ainda são fechados.

Async Context Managers: Quando Automações São Concorretes

Se sua automação usa asyncio, context managers também funcionam — mas no estilo async:

import asyncio

class AsyncConexaoDB:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.conn = None

    async def __aenter__(self):
        self.conn = await async_conectar(self.host, self.port)
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            await self.conn.close()
        return False

# Uso com async with
async def processar_tudo():
    async with AsyncConexaoDB("db.internal", 5432) as conn:
        await conn.query("SELECT * FROM ordens")

Para automações modernas que fazem chamadas HTTP assíncronas, ler de filas (Redis, SQS), ou conversar com WebSockets, async context managers são indispensáveis.

Quando Não Usar Context Managers

context managers não são universais. Evite quando:

  • O recurso precisa sobreviver entre funções diferentes (guarde em variável global ou passe como parâmetro)
  • O tempo de vida do recurso é controlado por outro framework (FastAPI, Django, etc.)
  • Você precisa de lazy initialization que context managers não conseguem sem complicação

Para esses casos, o padrão RAII de Dependency Injection resolve melhor. Mas para 90% das automações de/scripts de processing, context managers são exatamente o que você precisa.

Qual Cleanup Você Precisa Resolver Agora?

Sei que esse post cobriu bastante coisa — e provavelmente pelo menos uma das situações aqui te fez lembrar de um script problemático que você tem. Se lockfiles órfãos são seu problema, comece pelo lockfile_resiliente. Se suas APIs estão falhando silenciosamente, o retry_api é o caminho.

Me diz nos comentários: qual desses context managers resolve um problema real que você tá enfrentando agora? Se precisar de ajuda para adaptar algum deles pro seu caso específico, manda o código — ajudo a montar.

E se quiser uma parte 2 com context managers para coisas mais específicas — monitoramento de recursos, rate limiting, ou integração com filas — é só pedir. O Lab da Garra existe pra isso.

Posts Similares