Pipeline de Decisão para IA Autônoma: Grafos Que Salvam Seu Deploy (Sem Framework Caro)

Você já sentiu aquela sensação estranha de que seus scripts de automação são só reativos — respondem a eventos, mas nunca pensam sobre o que fazer a seguir? Eu senti. Até o dia em que percebi que um grafo direcionado simples transformou meu caos de bash scripts em algo que, pela primeira vez, eu chamaria de pipeline de decisão. E o melhor: sem framework caro, sem serviço gerenciado, só Python puro e um pouco de lógica grafo aplicada.

Pipeline de decisão para IA autônoma mostrando fluxo de processamento de dados com monitoramento em tempo real no servidor
Um pipeline de decisão não é mágica — é estrutura aplicada ao caos.

O Problema Que Ninguém Conta em Tutorial de Automação

Todos os tutoriais de automação ensinam o caminho feliz: se X acontecer, faça Y. Lindo. Funciona no slide. Na vida real, as automações enfrentam cenários ramificados, dependências cíclicas e aquele momento em que o serviço X falha justo quando o serviço Z precisa dele. O que você faz? Um if-elif-elif-else de 200 linhas? Eu fiz. E me arrependi profundamente.

A verdade é que automação sem estrutura de decisão é como dirigir sem mapa: você até chega, mas gasta mais gasolina, tempo e sanidade. Um pipeline de decisão para IA autônoma resolve exatamente isso — ele modela caminhos possíveis como um grafo, avalia condições em cada nó e segue a rota mais eficiente sem intervenção humana.

Neste artigo, vou te mostrar como construí um sistema de pipeline de decisão em Python que roda 24/7 no meu servidor, toma decisões autônomas e — o mais importante — me notifica quando algo sai do planejado. Sem drama. Sem Kubernetes.

O Que É um Pipeline de Decisão (Sem o Papo de Enterprise Architect)

Um pipeline de decisão é, na essência, um grafo direcionado onde cada nó representa uma etapa ou decisão e cada aresta representa uma transição condicional. Diferente de uma sequência linear de comandos, o pipeline:

  • Avalia o contexto antes de cada transição (condições em tempo real)
  • Branch dinâmico — escolhe caminhos diferentes dependendo do resultado de cada etapa
  • Backtracking — pode voltar a nós anteriores se uma condição falhar
  • Paralelização controlada — ramos independentes rodam simultaneamente
  • Estado persistente — sabe exatamente onde parou se o processo morrer no meio

Pense como um fluxograma que roda de verdade e se adapta ao invés de seguir cegamente um roteiro. A IA entra como o avaliador de condições: ela decide qual aresta seguir com base em dados do ambiente, não em regras hard-coded.

Estrutura de grafo de decisão mostrando nós interconectados para automação inteligente de processos
Visualizando o grafo: cada nó é uma decisão, cada seta é um caminho possível.

Arquitetura: O Grafo de Decisão na Prática

Vamos ao concreto. A estrutura básica que eu uso tem três componentes:

  1. Definição do Grafo — nós (estados/decisões) e arestas (transições com condições)
  2. Motor de Execução — percorre o grafo, avalia condições, dispara ações
  3. Monitor de Estado — persiste o progresso, permite retomar após falhas

A implementação usa um dicionário Python como grafo simples. Sem bibliotecas pesadas. Aqui está o esqueleto:

from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json, logging

@dataclass
class Node:
    name: str
    action: Callable[[], Any]
    condition: Optional[Callable[[], bool]] = None
    description: str = ""

@dataclass
class DecisionPipeline:
    nodes: Dict[str, Node] = field(default_factory=dict)
    edges: Dict[str, Dict[str, str]] = field(default_factory=dict)
    state: Dict[str, Any] = field(default_factory=dict)
    
    def add_node(self, name: str, action: Callable, 
                 condition: Callable = None, desc: str = ""):
        self.nodes[name] = Node(name, action, condition, desc)
    
    def add_edge(self, source: str, outcome: str, target: str):
        """outcome: 'ok', 'fail', 'retry', etc."""
        self.edges.setdefault(source, {})[outcome] = target
    
    def evaluate_condition(self, node_name: str) -> str:
        """Retorna a aresta a seguir baseado na condição do nó."""
        node = self.nodes[node_name]
        if node.condition:
            result = node.condition()
            return "ok" if result else "fail"
        return "ok"
    
    def run(self, start: str = "init", max_steps: int = 50):
        current = start
        steps = 0
        path = []
        
        while current and steps < max_steps:
            node = self.nodes[current]
            self.state["current_node"] = current
            self.state["step"] = steps
            path.append(current)
            
            logging.info(f"Executando nó: {current} — {node.description}")
            result = node.action()
            self.state[f"{current}_result"] = result
            
            if node.condition:
                outcome = self.evaluate_condition(current)
            else:
                outcome = "ok"
            
            current = self.edges.get(current, {}).get(outcome)
            steps += 1
        
        self.state["completed"] = not current is None
        self.state["path"] = path
        self.save_state()
        return path

Simples. Mas poderoso. A mágica não está na complexidade do código — está na modelagem do problema como grafo.

Exemplo Real: Pipeline de Validação de Deploy

Esse é o cenário que me fez criar o pipeline. Eu precisava automatizar o deploy de serviços no servidor, mas com validações inteligentes em cada etapa. O pipeline decide se continua, rollback ou notifica baseado em métricas reais.

import subprocess, psutil, requests

pipeline = DecisionPipeline()

# Nó 1: Verificar saúde do servidor
pipeline.add_node(
    "check_server_health",
    action=lambda: {
        "cpu": psutil.cpu_percent(interval=1),
        "memory": psutil.virtual_memory().percent,
        "disk": psutil.disk_usage("/").percent
    },
    condition=lambda: (
        pipeline.state.get("check_server_health_result", {}).get("cpu", 100) < 85
        and pipeline.state.get("check_server_health_result", {}).get("memory", 100) < 90
    ),
    desc="Verifica se o servidor tem recursos para o deploy"
)
pipeline.add_edge("check_server_health", "ok", "pull_code")
pipeline.add_edge("check_server_health", "fail", "notify_resource_alert")

# Nó 2: Pull do código
pipeline.add_node(
    "pull_code",
    action=lambda: subprocess.run(
        ["git", "-C", "/opt/app", "pull", "--rebase"],
        capture_output=True, text=True, timeout=120
    ).returncode == 0,
    desc="Atualiza o código via git pull --rebase"
)
pipeline.add_edge("pull_code", "ok", "run_migrations")
pipeline.add_edge("pull_code", "fail", "notify_git_error")

# Nó 3: Rodar migrações se necessário
pipeline.add_node(
    "run_migrations",
    action=lambda: subprocess.run(
        ["python", "/opt/app/manage.py", "migrate", "--check"],
        capture_output=True, text=True, timeout=60
    ).returncode == 0,
    desc="Executa migrações do banco de dados"
)
pipeline.add_edge("run_migrations", "ok", "run_tests")
pipeline.add_edge("run_migrations", "fail", "rollback_migrations")

# Nó 4: Testes rápidos de sanity check
pipeline.add_node(
    "run_tests",
    action=lambda: subprocess.run(
        ["python", "/opt/app/manage.py", "test", "--keepdb"],
        capture_output=True, text=True, timeout=300
    ).returncode == 0,
    desc="Executa bateria rápida de testes"
)
pipeline.add_edge("run_tests", "ok", "deploy_service")
pipeline.add_edge("run_tests", "fail", "rollback_migrations")

# Nó 5: Deploy efetivo
pipeline.add_node(
    "deploy_service",
    action=lambda: subprocess.run(
        ["systemctl", "restart", "app.service"],
        capture_output=True, text=True, timeout=30
    ).returncode == 0,
    desc="Reinicia o serviço com o novo código"
)
pipeline.add_edge("deploy_service", "ok", "verify_health")
pipeline.add_edge("deploy_service", "fail", "notify_deploy_error")

# Nó 6: Verificar saúde pós-deploy
pipeline.add_node(
    "verify_health",
    action=lambda: requests.get(
        "http://localhost:8000/healthz", timeout=10
    ).status_code == 200,
    desc="Verifica se o serviço responde após deploy"
)
pipeline.add_edge("verify_health", "ok", "notify_success")
pipeline.add_edge("verify_health", "fail", "notify_health_check_fail")

# Nós de notificação e rollback
for notify_node in ["notify_resource_alert", "notify_git_error", 
                     "notify_deploy_error", "notify_health_check_fail"]:
    pipeline.add_node(
        notify_node,
        action=lambda n=notify_node: send_alert(f"Deploy falhou em: {n}"),
        desc=f"Notifica falha: {notify_node}"
    )
    pipeline.add_edge(notify_node, "ok", None)  # terminal

pipeline.add_node(
    "rollback_migrations",
    action=lambda: subprocess.run(
        ["python", "/opt/app/manage.py", "migrate", "--rollback"],
        capture_output=True, text=True, timeout=120
    ).returncode == 0,
    desc="Faz rollback das migrações em caso de falha"
)
pipeline.add_edge("rollback_migrations", "ok", "notify_rollback")
pipeline.add_edge("rollback_migrations", "fail", "notify_rollback")

pipeline.add_node(
    "notify_rollback",
    action=lambda: send_alert("Deploy falhou e rollback foi executado"),
    desc="Notifica que rollback ocorreu"
)
pipeline.add_edge("notify_rollback", "ok", None)

pipeline.add_node(
    "notify_success",
    action=lambda: send_alert("Deploy concluído com sucesso!"),
    desc="Notifica deploy bem-sucedido"
)
pipeline.add_edge("notify_success", "ok", None)

# Executar
path = pipeline.run("check_server_health")
print(f"Caminho executado: {' -> '.join(path)}")
print(f"Estado final: {json.dumps(pipeline.state, indent=2)}")

Repara na estrutura: cada nó tem uma ação, uma condição opcional e arestas que mapeiam resultados para próximos passos. Se o servidor estiver sobrecarregado, ele nem tenta o deploy — notifica e para. Se o teste falhar, rollback automático. É decisão autônoma de verdade, não um try/except glorificado.

Como Funciona o Backtracking

O backtracking é implícito na estrutura do grafo. Quando um nó falha, a aresta "fail" aponta para tratamento — que pode ser um retry, um rollback ou simplesmente notificar e parar. O monitor de estado persiste cada decisão:

    def save_state(self):
        """Persiste estado para retomar após falha."""
        self.state["timestamp"] = datetime.now().isoformat()
        with open("/opt/app/pipeline_state.json", "w") as f:
            json.dump(self.state, f, indent=2)
    
    def load_state(self) -> Optional[str]:
        """Retorna último nó executado para retomar."""
        try:
            with open("/opt/app/pipeline_state.json") as f:
                self.state = json.load(f)
            return self.state.get("current_node")
        except FileNotFoundError:
            return None

Com isso, se o servidor reiniciar no meio do deploy, o pipeline sabe exatamente onde parou. Não recomeça do zero — retoma do último nó executado. Isso salvou meu deploy mais de uma vez.

Integrando IA como Avaliador de Condições

Aqui é onde a coisa fica interessante. Em vez de condições hard-coded (CPU < 85%), você pode usar um modelo de linguagem para avaliar condições complexas. Quer um exemplo?

import openai

def ai_condition_evaluator(context: dict, prompt_template: str) -> bool:
    """Usa IA para avaliar condições complexas baseado em contexto."""
    context_str = json.dumps(context, indent=2)
    prompt = prompt_template.format(context=context_str)
    
    response = openai.ChatCompletion.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Você é um avaliador de condições de deploy. Responda APENAS 'true' ou 'false'."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.1,
        max_tokens=10
    )
    
    result = response.choices[0].message.content.strip().lower()
    return result == "true"

# Exemplo de uso como condição no pipeline
pipeline.add_node(
    "ai_risk_assessment",
    action=lambda: collect_metrics(),
    condition=lambda: ai_condition_evaluator(
        context=pipeline.state,
        prompt_template="""
Analise o contexto do deploy e decida se é seguro continuar:
{context}

Considere: padrões de tráfego, hora do dia, complexidade das mudanças,
histórico de falhas recentes. É seguro prosseguir com o deploy?
"""
    ),
    desc="IA avalia risco do deploy com base em métricas históricas"
)
pipeline.add_edge("ai_risk_assessment", "ok", "deploy_service")
pipeline.add_edge("ai_risk_assessment", "fail", "notify_risk_too_high")

Isso transforma condições binárias simples em decisões contextuais. A IA pode considerar hora do pico, padrão de tráfego, histórico de falhas e outros fatores que uma regra if cpu > 85 jamais capturaria.

Paralelização Controlada: Ramos que Rodam ao Mesmo Tempo

Nem tudo é sequencial. Às vezes você quer validar múltiplas coisas em paralelo antes de seguir adiante. A extensão do pipeline para suportar forks é simples:

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

class ParallelDecisionPipeline(DecisionPipeline):
    def run_parallel_branches(self, branch_nodes: List[str], timeout: int = 60):
        """Executa múltiplos ramos em paralelo e agrega resultados."""
        results = {}
        lock = threading.Lock()
        
        def run_node(node_name):
            node = self.nodes[node_name]
            result = node.action()
            with lock:
                results[node_name] = result
            return node_name, result
        
        with ThreadPoolExecutor(max_workers=len(branch_nodes)) as executor:
            futures = {
                executor.submit(run_node, n): n for n in branch_nodes
            }
            for future in as_completed(futures, timeout=timeout):
                node_name, result = future.result()
                self.state[f"parallel_{node_name}"] = result
        
        return results
    
    def evaluate_parallel_gate(self, branch_nodes: List[str], 
                                strategy: str = "all") -> bool:
        """Avalia se todos (ou qualquer) ramos passaram."""
        for node_name in branch_nodes:
            result = self.state.get(f"parallel_{node_name}")
            if strategy == "all" and not result:
                return False
            if strategy == "any" and result:
                return True
        return strategy == "all"

Com isso, você pode validar saúde do banco, disponibilidade do cache e integridade do sistema de arquivos simultaneamente. O pipeline só avança quando todos os checks passam (estratégia "all") ou quando pelo menos um passa (estratégia "any").

Execução de pipeline de automação com múltiplos ramos paralelos monitorando sistemas em tempo real
Ramos paralelos aceleram a validação sem sacrificar a confiabilidade.

Monitoramento e Observabilidade do Pipeline

Um pipeline de decisão sem monitoramento é um avião sem painel: você não sabe se está voando ou caindo. Adicionei logs estruturados e métricas para cada execução:

import json
from datetime import datetime

class PipelineMetrics:
    def __init__(self, log_file="/var/log/pipeline_metrics.jsonl"):
        self.log_file = log_file
    
    def record(self, pipeline_name: str, path: List[str], 
               state: dict, duration: float):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "pipeline": pipeline_name,
            "path": path,
            "final_state": state.get("completed", False),
            "duration_seconds": round(duration, 2),
            "nodes_executed": len(path),
            "last_node": path[-1] if path else None
        }
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

# Uso no pipeline
import time
start = time.time()
path = pipeline.run("check_server_health")
duration = time.time() - start
metrics = PipelineMetrics()
metrics.record("deploy_pipeline", path, pipeline.state, duration)

Cada execução fica registrada em JSONL — fácil de consultar com jq ou alimentar um Grafana. Eu uso tail -f no servidor pra ver os pipelines rodando em tempo real. É satisfatório demais.

O Perrengue Que Me Feu Criar Isso

Quando Usar (e Quando NÃO Usar) Pipeline de Decisão

Vamos ser honestos: nem tudo precisa de um grafo de decisão. Eis quando vale a pena:

  • Use quando: seu processo tem 5+ etapas com ramificações, precisa de rollback, depende de condições externas, ou roda sem supervisão
  • Não use quando: é um script de uma linha, um cron simples, ou quando cada etapa sempre faz a mesma coisa

A regra que eu sigo: se você precisa de mais de 3 níveis de if-else aninhados no seu script, é hora de modelar como grafo. A complexidade do código cai drasticamente porque você separa a lógica de decisão da lógica de execução.

Automatizando com Systemd para Rodar 24/7

O pipeline sozinho não é automação — é só código. Para virar automação real, precisa rodar de forma resiliente no servidor. Minha abordagem:

[Unit]
Description=AutoMente Decision Pipeline - Deploy Automation
After=network.target postgresql.service

[Service]
Type=oneshot
User=deploy
WorkingDirectory=/opt/app
ExecStart=/opt/app/venv/bin/python /opt/app/pipeline_runner.py
Restart=no
StandardOutput=journal
StandardError=journal

# Timeout generoso para pipelines longos
TimeoutStartSec=600

# Ambiente
Environment=PYTHONUNBUFFERED=1
Environment=PIPELINE_LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target

E o agendamento? Timer do systemd, claro:

[Unit]
Description=Executar Pipeline de Deploy a cada 6h

[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
RandomizedDelaySec=300

[Install]
WantedBy=timers.target

Com RandomizedDelaySec, evita que todos os pipelines dispararem no mesmo segundo se você tiver múltiplos serviços. Detalhe que faz diferença.

Conclusão: Estrutura É Liberdade

Parece contraditório, mas é verdade: dar estrutura às suas automações te dá mais liberdade, não menos. Quando o pipeline decide por você, você pode dormir. Quando cada etapa valida antes de seguir, você não acorda com produção caída. Quando o estado persiste entre execuções, você não perde horas tentando entender onde o deploy parou.

O pipeline de decisão para IA autônoma não é sobre construir sistemas complexos — é sobre modelar problemas reais de forma que o computador resolva sozinho. Comece simples: um grafo com 5 nós, condições básicas, logs. Depois adicione IA, paralelização, observabilidade. O importante é começar.

E se você chegou até aqui, me diz nos comentários: qual automação você gostaria de ver transformada em pipeline de decisão? O próximo artigo pode ser exatamente sobre o seu caso. Porque no fim, automação boa é aquela que resolve o problema de alguém de verdade — não a que fica bonita no README.

Esse post faz parte da série Mente Binária, onde exploramos a lógica por trás de sistemas que pensam — ou pelo menos fingem que pensam bem.

Posts Similares