Pipeline de Decisão para IA Autônoma: Grafos Que Salvam Seu Deploy (Sem Framework Caro)
Você já sentiu aquela sensação estranha de que seus scripts de automação são só reativos — respondem a eventos, mas nunca pensam sobre o que fazer a seguir? Eu senti. Até o dia em que percebi que um grafo direcionado simples transformou meu caos de bash scripts em algo que, pela primeira vez, eu chamaria de pipeline de decisão. E o melhor: sem framework caro, sem serviço gerenciado, só Python puro e um pouco de lógica grafo aplicada.

O Problema Que Ninguém Conta em Tutorial de Automação
Todos os tutoriais de automação ensinam o caminho feliz: se X acontecer, faça Y. Lindo. Funciona no slide. Na vida real, as automações enfrentam cenários ramificados, dependências cíclicas e aquele momento em que o serviço X falha justo quando o serviço Z precisa dele. O que você faz? Um if-elif-elif-else de 200 linhas? Eu fiz. E me arrependi profundamente.
A verdade é que automação sem estrutura de decisão é como dirigir sem mapa: você até chega, mas gasta mais gasolina, tempo e sanidade. Um pipeline de decisão para IA autônoma resolve exatamente isso — ele modela caminhos possíveis como um grafo, avalia condições em cada nó e segue a rota mais eficiente sem intervenção humana.
Neste artigo, vou te mostrar como construí um sistema de pipeline de decisão em Python que roda 24/7 no meu servidor, toma decisões autônomas e — o mais importante — me notifica quando algo sai do planejado. Sem drama. Sem Kubernetes.
O Que É um Pipeline de Decisão (Sem o Papo de Enterprise Architect)
Um pipeline de decisão é, na essência, um grafo direcionado onde cada nó representa uma etapa ou decisão e cada aresta representa uma transição condicional. Diferente de uma sequência linear de comandos, o pipeline:
- Avalia o contexto antes de cada transição (condições em tempo real)
- Branch dinâmico — escolhe caminhos diferentes dependendo do resultado de cada etapa
- Backtracking — pode voltar a nós anteriores se uma condição falhar
- Paralelização controlada — ramos independentes rodam simultaneamente
- Estado persistente — sabe exatamente onde parou se o processo morrer no meio
Pense como um fluxograma que roda de verdade e se adapta ao invés de seguir cegamente um roteiro. A IA entra como o avaliador de condições: ela decide qual aresta seguir com base em dados do ambiente, não em regras hard-coded.

Arquitetura: O Grafo de Decisão na Prática
Vamos ao concreto. A estrutura básica que eu uso tem três componentes:
- Definição do Grafo — nós (estados/decisões) e arestas (transições com condições)
- Motor de Execução — percorre o grafo, avalia condições, dispara ações
- Monitor de Estado — persiste o progresso, permite retomar após falhas
A implementação usa um dicionário Python como grafo simples. Sem bibliotecas pesadas. Aqui está o esqueleto:
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json, logging
@dataclass
class Node:
name: str
action: Callable[[], Any]
condition: Optional[Callable[[], bool]] = None
description: str = ""
@dataclass
class DecisionPipeline:
nodes: Dict[str, Node] = field(default_factory=dict)
edges: Dict[str, Dict[str, str]] = field(default_factory=dict)
state: Dict[str, Any] = field(default_factory=dict)
def add_node(self, name: str, action: Callable,
condition: Callable = None, desc: str = ""):
self.nodes[name] = Node(name, action, condition, desc)
def add_edge(self, source: str, outcome: str, target: str):
"""outcome: 'ok', 'fail', 'retry', etc."""
self.edges.setdefault(source, {})[outcome] = target
def evaluate_condition(self, node_name: str) -> str:
"""Retorna a aresta a seguir baseado na condição do nó."""
node = self.nodes[node_name]
if node.condition:
result = node.condition()
return "ok" if result else "fail"
return "ok"
def run(self, start: str = "init", max_steps: int = 50):
current = start
steps = 0
path = []
while current and steps < max_steps:
node = self.nodes[current]
self.state["current_node"] = current
self.state["step"] = steps
path.append(current)
logging.info(f"Executando nó: {current} — {node.description}")
result = node.action()
self.state[f"{current}_result"] = result
if node.condition:
outcome = self.evaluate_condition(current)
else:
outcome = "ok"
current = self.edges.get(current, {}).get(outcome)
steps += 1
self.state["completed"] = not current is None
self.state["path"] = path
self.save_state()
return path
Simples. Mas poderoso. A mágica não está na complexidade do código — está na modelagem do problema como grafo.
Exemplo Real: Pipeline de Validação de Deploy
Esse é o cenário que me fez criar o pipeline. Eu precisava automatizar o deploy de serviços no servidor, mas com validações inteligentes em cada etapa. O pipeline decide se continua, rollback ou notifica baseado em métricas reais.
import subprocess, psutil, requests
pipeline = DecisionPipeline()
# Nó 1: Verificar saúde do servidor
pipeline.add_node(
"check_server_health",
action=lambda: {
"cpu": psutil.cpu_percent(interval=1),
"memory": psutil.virtual_memory().percent,
"disk": psutil.disk_usage("/").percent
},
condition=lambda: (
pipeline.state.get("check_server_health_result", {}).get("cpu", 100) < 85
and pipeline.state.get("check_server_health_result", {}).get("memory", 100) < 90
),
desc="Verifica se o servidor tem recursos para o deploy"
)
pipeline.add_edge("check_server_health", "ok", "pull_code")
pipeline.add_edge("check_server_health", "fail", "notify_resource_alert")
# Nó 2: Pull do código
pipeline.add_node(
"pull_code",
action=lambda: subprocess.run(
["git", "-C", "/opt/app", "pull", "--rebase"],
capture_output=True, text=True, timeout=120
).returncode == 0,
desc="Atualiza o código via git pull --rebase"
)
pipeline.add_edge("pull_code", "ok", "run_migrations")
pipeline.add_edge("pull_code", "fail", "notify_git_error")
# Nó 3: Rodar migrações se necessário
pipeline.add_node(
"run_migrations",
action=lambda: subprocess.run(
["python", "/opt/app/manage.py", "migrate", "--check"],
capture_output=True, text=True, timeout=60
).returncode == 0,
desc="Executa migrações do banco de dados"
)
pipeline.add_edge("run_migrations", "ok", "run_tests")
pipeline.add_edge("run_migrations", "fail", "rollback_migrations")
# Nó 4: Testes rápidos de sanity check
pipeline.add_node(
"run_tests",
action=lambda: subprocess.run(
["python", "/opt/app/manage.py", "test", "--keepdb"],
capture_output=True, text=True, timeout=300
).returncode == 0,
desc="Executa bateria rápida de testes"
)
pipeline.add_edge("run_tests", "ok", "deploy_service")
pipeline.add_edge("run_tests", "fail", "rollback_migrations")
# Nó 5: Deploy efetivo
pipeline.add_node(
"deploy_service",
action=lambda: subprocess.run(
["systemctl", "restart", "app.service"],
capture_output=True, text=True, timeout=30
).returncode == 0,
desc="Reinicia o serviço com o novo código"
)
pipeline.add_edge("deploy_service", "ok", "verify_health")
pipeline.add_edge("deploy_service", "fail", "notify_deploy_error")
# Nó 6: Verificar saúde pós-deploy
pipeline.add_node(
"verify_health",
action=lambda: requests.get(
"http://localhost:8000/healthz", timeout=10
).status_code == 200,
desc="Verifica se o serviço responde após deploy"
)
pipeline.add_edge("verify_health", "ok", "notify_success")
pipeline.add_edge("verify_health", "fail", "notify_health_check_fail")
# Nós de notificação e rollback
for notify_node in ["notify_resource_alert", "notify_git_error",
"notify_deploy_error", "notify_health_check_fail"]:
pipeline.add_node(
notify_node,
action=lambda n=notify_node: send_alert(f"Deploy falhou em: {n}"),
desc=f"Notifica falha: {notify_node}"
)
pipeline.add_edge(notify_node, "ok", None) # terminal
pipeline.add_node(
"rollback_migrations",
action=lambda: subprocess.run(
["python", "/opt/app/manage.py", "migrate", "--rollback"],
capture_output=True, text=True, timeout=120
).returncode == 0,
desc="Faz rollback das migrações em caso de falha"
)
pipeline.add_edge("rollback_migrations", "ok", "notify_rollback")
pipeline.add_edge("rollback_migrations", "fail", "notify_rollback")
pipeline.add_node(
"notify_rollback",
action=lambda: send_alert("Deploy falhou e rollback foi executado"),
desc="Notifica que rollback ocorreu"
)
pipeline.add_edge("notify_rollback", "ok", None)
pipeline.add_node(
"notify_success",
action=lambda: send_alert("Deploy concluído com sucesso!"),
desc="Notifica deploy bem-sucedido"
)
pipeline.add_edge("notify_success", "ok", None)
# Executar
path = pipeline.run("check_server_health")
print(f"Caminho executado: {' -> '.join(path)}")
print(f"Estado final: {json.dumps(pipeline.state, indent=2)}")
Repara na estrutura: cada nó tem uma ação, uma condição opcional e arestas que mapeiam resultados para próximos passos. Se o servidor estiver sobrecarregado, ele nem tenta o deploy — notifica e para. Se o teste falhar, rollback automático. É decisão autônoma de verdade, não um try/except glorificado.
Como Funciona o Backtracking
O backtracking é implícito na estrutura do grafo. Quando um nó falha, a aresta "fail" aponta para tratamento — que pode ser um retry, um rollback ou simplesmente notificar e parar. O monitor de estado persiste cada decisão:
def save_state(self):
"""Persiste estado para retomar após falha."""
self.state["timestamp"] = datetime.now().isoformat()
with open("/opt/app/pipeline_state.json", "w") as f:
json.dump(self.state, f, indent=2)
def load_state(self) -> Optional[str]:
"""Retorna último nó executado para retomar."""
try:
with open("/opt/app/pipeline_state.json") as f:
self.state = json.load(f)
return self.state.get("current_node")
except FileNotFoundError:
return None
Com isso, se o servidor reiniciar no meio do deploy, o pipeline sabe exatamente onde parou. Não recomeça do zero — retoma do último nó executado. Isso salvou meu deploy mais de uma vez.
Integrando IA como Avaliador de Condições
Aqui é onde a coisa fica interessante. Em vez de condições hard-coded (CPU < 85%), você pode usar um modelo de linguagem para avaliar condições complexas. Quer um exemplo?
import openai
def ai_condition_evaluator(context: dict, prompt_template: str) -> bool:
"""Usa IA para avaliar condições complexas baseado em contexto."""
context_str = json.dumps(context, indent=2)
prompt = prompt_template.format(context=context_str)
response = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Você é um avaliador de condições de deploy. Responda APENAS 'true' ou 'false'."},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=10
)
result = response.choices[0].message.content.strip().lower()
return result == "true"
# Exemplo de uso como condição no pipeline
pipeline.add_node(
"ai_risk_assessment",
action=lambda: collect_metrics(),
condition=lambda: ai_condition_evaluator(
context=pipeline.state,
prompt_template="""
Analise o contexto do deploy e decida se é seguro continuar:
{context}
Considere: padrões de tráfego, hora do dia, complexidade das mudanças,
histórico de falhas recentes. É seguro prosseguir com o deploy?
"""
),
desc="IA avalia risco do deploy com base em métricas históricas"
)
pipeline.add_edge("ai_risk_assessment", "ok", "deploy_service")
pipeline.add_edge("ai_risk_assessment", "fail", "notify_risk_too_high")
Isso transforma condições binárias simples em decisões contextuais. A IA pode considerar hora do pico, padrão de tráfego, histórico de falhas e outros fatores que uma regra if cpu > 85 jamais capturaria.
Paralelização Controlada: Ramos que Rodam ao Mesmo Tempo
Nem tudo é sequencial. Às vezes você quer validar múltiplas coisas em paralelo antes de seguir adiante. A extensão do pipeline para suportar forks é simples:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
class ParallelDecisionPipeline(DecisionPipeline):
def run_parallel_branches(self, branch_nodes: List[str], timeout: int = 60):
"""Executa múltiplos ramos em paralelo e agrega resultados."""
results = {}
lock = threading.Lock()
def run_node(node_name):
node = self.nodes[node_name]
result = node.action()
with lock:
results[node_name] = result
return node_name, result
with ThreadPoolExecutor(max_workers=len(branch_nodes)) as executor:
futures = {
executor.submit(run_node, n): n for n in branch_nodes
}
for future in as_completed(futures, timeout=timeout):
node_name, result = future.result()
self.state[f"parallel_{node_name}"] = result
return results
def evaluate_parallel_gate(self, branch_nodes: List[str],
strategy: str = "all") -> bool:
"""Avalia se todos (ou qualquer) ramos passaram."""
for node_name in branch_nodes:
result = self.state.get(f"parallel_{node_name}")
if strategy == "all" and not result:
return False
if strategy == "any" and result:
return True
return strategy == "all"
Com isso, você pode validar saúde do banco, disponibilidade do cache e integridade do sistema de arquivos simultaneamente. O pipeline só avança quando todos os checks passam (estratégia "all") ou quando pelo menos um passa (estratégia "any").

Monitoramento e Observabilidade do Pipeline
Um pipeline de decisão sem monitoramento é um avião sem painel: você não sabe se está voando ou caindo. Adicionei logs estruturados e métricas para cada execução:
import json
from datetime import datetime
class PipelineMetrics:
def __init__(self, log_file="/var/log/pipeline_metrics.jsonl"):
self.log_file = log_file
def record(self, pipeline_name: str, path: List[str],
state: dict, duration: float):
entry = {
"timestamp": datetime.now().isoformat(),
"pipeline": pipeline_name,
"path": path,
"final_state": state.get("completed", False),
"duration_seconds": round(duration, 2),
"nodes_executed": len(path),
"last_node": path[-1] if path else None
}
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
# Uso no pipeline
import time
start = time.time()
path = pipeline.run("check_server_health")
duration = time.time() - start
metrics = PipelineMetrics()
metrics.record("deploy_pipeline", path, pipeline.state, duration)
Cada execução fica registrada em JSONL — fácil de consultar com jq ou alimentar um Grafana. Eu uso tail -f no servidor pra ver os pipelines rodando em tempo real. É satisfatório demais.
O Perrengue Que Me Feu Criar Isso
Quando Usar (e Quando NÃO Usar) Pipeline de Decisão
Vamos ser honestos: nem tudo precisa de um grafo de decisão. Eis quando vale a pena:
- Use quando: seu processo tem 5+ etapas com ramificações, precisa de rollback, depende de condições externas, ou roda sem supervisão
- Não use quando: é um script de uma linha, um cron simples, ou quando cada etapa sempre faz a mesma coisa
A regra que eu sigo: se você precisa de mais de 3 níveis de if-else aninhados no seu script, é hora de modelar como grafo. A complexidade do código cai drasticamente porque você separa a lógica de decisão da lógica de execução.
Automatizando com Systemd para Rodar 24/7
O pipeline sozinho não é automação — é só código. Para virar automação real, precisa rodar de forma resiliente no servidor. Minha abordagem:
[Unit]
Description=AutoMente Decision Pipeline - Deploy Automation
After=network.target postgresql.service
[Service]
Type=oneshot
User=deploy
WorkingDirectory=/opt/app
ExecStart=/opt/app/venv/bin/python /opt/app/pipeline_runner.py
Restart=no
StandardOutput=journal
StandardError=journal
# Timeout generoso para pipelines longos
TimeoutStartSec=600
# Ambiente
Environment=PYTHONUNBUFFERED=1
Environment=PIPELINE_LOG_LEVEL=INFO
[Install]
WantedBy=multi-user.target
E o agendamento? Timer do systemd, claro:
[Unit]
Description=Executar Pipeline de Deploy a cada 6h
[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
RandomizedDelaySec=300
[Install]
WantedBy=timers.target
Com RandomizedDelaySec, evita que todos os pipelines dispararem no mesmo segundo se você tiver múltiplos serviços. Detalhe que faz diferença.
Conclusão: Estrutura É Liberdade
Parece contraditório, mas é verdade: dar estrutura às suas automações te dá mais liberdade, não menos. Quando o pipeline decide por você, você pode dormir. Quando cada etapa valida antes de seguir, você não acorda com produção caída. Quando o estado persiste entre execuções, você não perde horas tentando entender onde o deploy parou.
O pipeline de decisão para IA autônoma não é sobre construir sistemas complexos — é sobre modelar problemas reais de forma que o computador resolva sozinho. Comece simples: um grafo com 5 nós, condições básicas, logs. Depois adicione IA, paralelização, observabilidade. O importante é começar.
E se você chegou até aqui, me diz nos comentários: qual automação você gostaria de ver transformada em pipeline de decisão? O próximo artigo pode ser exatamente sobre o seu caso. Porque no fim, automação boa é aquela que resolve o problema de alguém de verdade — não a que fica bonita no README.
Esse post faz parte da série Mente Binária, onde exploramos a lógica por trás de sistemas que pensam — ou pelo menos fingem que pensam bem.
