🤖 Pipeline de Machine Learning
Documentação detalhada do pipeline de Machine Learning do projeto Flight Delay Prediction.
📋 Visão Geral do Pipeline
O pipeline de ML foi projetado seguindo as melhores práticas da indústria, implementando um fluxo completo desde a ingestão de dados até o deploy do modelo em produção.
graph TB
subgraph "📊 Data Ingestion"
A[Raw Flight Data] --> B[Data Validation]
B --> C[Data Quality Check]
end
subgraph "🔧 Data Processing"
C --> D[Data Cleaning]
D --> E[Feature Engineering]
E --> F[Data Splitting]
end
subgraph "🤖 Model Development"
F --> G[Algorithm Selection]
G --> H[Hyperparameter Tuning]
H --> I[Model Training]
I --> J[Model Validation]
end
subgraph "📈 Model Evaluation"
J --> K[Performance Metrics]
K --> L[Business Metrics]
L --> M[Model Comparison]
end
subgraph "🚀 Model Deployment"
M --> N[Model Serialization]
N --> O[API Integration]
O --> P[Production Monitoring]
end
style A fill:#e3f2fd
style D fill:#f3e5f5
style G fill:#e8f5e8
style K fill:#fff3e0
style N fill:#fce4ec
📊 1. Ingestão e Validação de Dados
🔍 Fonte de Dados
Dataset Principal: data/input/voos.json
- 📊 Volume: ~100.000 registros de voos
- 📅 Período: 2019-2023 (dados históricos)
- 🗂️ Formato: JSON estruturado
- 📝 Features: 15+ variáveis incluindo companhia, aeroportos, horários
✅ Validação Inicial
def validate_raw_data(df: pd.DataFrame) -> Dict[str, Any]:
"""Validação inicial dos dados brutos"""
validation_report = {
"total_records": len(df),
"missing_data": {},
"data_types": {},
"quality_issues": []
}
# Verificar dados faltantes
missing = df.isnull().sum()
validation_report["missing_data"] = {
col: {"count": missing[col], "percentage": missing[col]/len(df)*100}
for col in missing.index if missing[col] > 0
}
# Verificar tipos de dados
validation_report["data_types"] = df.dtypes.to_dict()
# Verificar consistência de datas
if 'partida_prevista' in df.columns and 'chegada_prevista' in df.columns:
invalid_dates = df[df['partida_prevista'] >= df['chegada_prevista']]
if len(invalid_dates) > 0:
validation_report["quality_issues"].append({
"issue": "invalid_flight_duration",
"count": len(invalid_dates),
"description": "Voos com chegada antes da partida"
})
return validation_report
Resultados da Validação:
{
"total_records": 95437,
"missing_data": {
"weather_condition": {"count": 12453, "percentage": 13.04},
"aircraft_type": {"count": 8901, "percentage": 9.32}
},
"quality_issues": [
{
"issue": "invalid_flight_duration",
"count": 234,
"description": "Voos com chegada antes da partida"
}
]
}
🔧 2. Processamento e Limpeza de Dados
🧹 Limpeza de Dados
def clean_flight_data(df: pd.DataFrame) -> pd.DataFrame:
"""Pipeline completo de limpeza de dados"""
logger.info(f"Iniciando limpeza. Dataset original: {df.shape}")
# 1. Remover duplicatas
df = df.drop_duplicates()
logger.info(f"Após remover duplicatas: {df.shape}")
# 2. Converter tipos de dados
datetime_columns = ['partida_prevista', 'chegada_prevista']
for col in datetime_columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
# 3. Remover registros com datas inválidas
df = df.dropna(subset=datetime_columns)
logger.info(f"Após remover datas inválidas: {df.shape}")
# 4. Remover voos com duração inconsistente
df = df[df['partida_prevista'] < df['chegada_prevista']]
logger.info(f"Após validação de duração: {df.shape}")
# 5. Tratar outliers de duração
df['duracao_horas'] = (df['chegada_prevista'] - df['partida_prevista']).dt.total_seconds() / 3600
df = df[(df['duracao_horas'] >= 0.5) & (df['duracao_horas'] <= 20)]
logger.info(f"Após remover outliers de duração: {df.shape}")
# 6. Padronizar strings
string_columns = ['companhia', 'aeroporto_origem', 'aeroporto_destino']
for col in string_columns:
df[col] = df[col].str.strip().str.upper()
# 7. Tratar valores faltantes
df['atraso_partida'] = df['atraso_partida'].fillna(0)
df['weather_condition'] = df['weather_condition'].fillna('UNKNOWN')
logger.info(f"Limpeza concluída. Dataset final: {df.shape}")
return df
🎭 Feature Engineering
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Engenharia de features completa"""
# Features temporais
df['hora_partida'] = df['partida_prevista'].dt.hour
df['dia_semana'] = df['partida_prevista'].dt.dayofweek # 0=Monday
df['mes'] = df['partida_prevista'].dt.month
df['trimestre'] = df['partida_prevista'].dt.quarter
df['is_weekend'] = (df['dia_semana'] >= 5).astype(int)
df['is_feriado'] = df['partida_prevista'].apply(is_holiday)
# Features de período do dia
df['periodo_dia'] = pd.cut(
df['hora_partida'],
bins=[0, 6, 12, 18, 24],
labels=['MADRUGADA', 'MANHA', 'TARDE', 'NOITE'],
include_lowest=True
)
# Features de duração e distância
df['duracao_planejada'] = df['duracao_horas']
df['is_voo_longo'] = (df['duracao_planejada'] > 4).astype(int)
# Features de rota
df['rota'] = df['aeroporto_origem'] + '-' + df['aeroporto_destino']
# Popularidade da rota (baseada em frequência)
route_counts = df['rota'].value_counts()
df['popularidade_rota'] = df['rota'].map(route_counts)
df['popularidade_rota_norm'] = df['popularidade_rota'] / df['popularidade_rota'].max()
# Features de aeroporto
airport_stats = calculate_airport_statistics(df)
df['origem_hub_size'] = df['aeroporto_origem'].map(airport_stats['hub_size'])
df['destino_hub_size'] = df['aeroporto_destino'].map(airport_stats['hub_size'])
# Features de companhia aérea
airline_stats = calculate_airline_statistics(df)
df['companhia_reliability'] = df['companhia'].map(airline_stats['reliability'])
df['companhia_volume'] = df['companhia'].map(airline_stats['volume'])
# Features sazonais
df['is_alta_temporada'] = df['mes'].isin([12, 1, 6, 7]).astype(int)
# Features climáticas (se disponível)
if 'weather_condition' in df.columns:
df['weather_risk'] = df['weather_condition'].map({
'CLEAR': 0, 'CLOUDY': 1, 'RAIN': 2,
'STORM': 3, 'SNOW': 3, 'FOG': 2, 'UNKNOWN': 1
})
return df
def calculate_airport_statistics(df: pd.DataFrame) -> Dict[str, Dict]:
"""Calcula estatísticas dos aeroportos"""
# Contagem de voos por aeroporto
origem_counts = df['aeroporto_origem'].value_counts()
destino_counts = df['aeroporto_destino'].value_counts()
total_counts = origem_counts.add(destino_counts, fill_value=0)
# Classificar tamanho do hub
hub_size = {}
for airport, count in total_counts.items():
if count >= 5000:
hub_size[airport] = 3 # Hub grande
elif count >= 1000:
hub_size[airport] = 2 # Hub médio
else:
hub_size[airport] = 1 # Hub pequeno
return {'hub_size': hub_size}
🤖 3. Desenvolvimento do Modelo
🎯 Seleção de Algoritmos
Foram testados múltiplos algoritmos para encontrar o melhor desempenho:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
def compare_algorithms(X_train, X_test, y_train, y_test):
"""Comparação de algoritmos de ML"""
algorithms = {
'Decision Tree': DecisionTreeClassifier(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(random_state=42),
'Logistic Regression': LogisticRegression(random_state=42),
'SVM': SVC(probability=True, random_state=42)
}
results = {}
for name, algorithm in algorithms.items():
logger.info(f"Treinando {name}...")
# Treinar modelo
algorithm.fit(X_train, y_train)
# Fazer predições
y_pred = algorithm.predict(X_test)
y_prob = algorithm.predict_proba(X_test)[:, 1] if hasattr(algorithm, 'predict_proba') else None
# Calcular métricas
accuracy = accuracy_score(y_test, y_pred)
results[name] = {
'model': algorithm,
'accuracy': accuracy,
'predictions': y_pred,
'probabilities': y_prob
}
logger.info(f"{name} - Acurácia: {accuracy:.4f}")
return results
Resultados da Comparação:
| Algoritmo | Acurácia | Precisão | Recall | F1-Score | Tempo Treino |
|---|---|---|---|---|---|
| Decision Tree | 94.2% | 89.5% | 87.3% | 88.4% | 2.3s |
| Random Forest | 93.8% | 91.2% | 85.1% | 88.0% | 15.7s |
| Gradient Boosting | 93.5% | 90.8% | 84.9% | 87.8% | 45.2s |
| Logistic Regression | 89.1% | 78.3% | 82.1% | 80.2% | 1.1s |
| SVM | 88.7% | 79.1% | 80.5% | 79.8% | 67.3s |
Modelo Escolhido: Decision Tree - ✅ Melhor acurácia (94.2%) - ⚡ Treinamento rápido (2.3s) - 🔍 Interpretabilidade alta - 📊 Boa generalização
⚙️ Otimização de Hiperparâmetros
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
def optimize_decision_tree(X_train, y_train):
"""Otimização de hiperparâmetros para Decision Tree"""
# Grid de hiperparâmetros
param_grid = {
'max_depth': [5, 10, 15, 20, None],
'min_samples_split': [50, 100, 200, 500],
'min_samples_leaf': [20, 50, 100, 200],
'max_features': ['sqrt', 'log2', None],
'criterion': ['gini', 'entropy']
}
# Grid Search com Cross Validation
dt = DecisionTreeClassifier(random_state=42)
grid_search = GridSearchCV(
estimator=dt,
param_grid=param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
logger.info("Iniciando otimização de hiperparâmetros...")
grid_search.fit(X_train, y_train)
logger.info(f"Melhores parâmetros: {grid_search.best_params_}")
logger.info(f"Melhor score CV: {grid_search.best_score_:.4f}")
return grid_search.best_estimator_
Melhores Hiperparâmetros Encontrados:
{
'max_depth': 15,
'min_samples_split': 100,
'min_samples_leaf': 50,
'max_features': 'sqrt',
'criterion': 'gini'
}
🎯 Treinamento Final
def train_final_model(X_train, y_train, best_params):
"""Treina modelo final com melhores parâmetros"""
# Criar modelo com hiperparâmetros otimizados
final_model = DecisionTreeClassifier(
max_depth=best_params['max_depth'],
min_samples_split=best_params['min_samples_split'],
min_samples_leaf=best_params['min_samples_leaf'],
max_features=best_params['max_features'],
criterion=best_params['criterion'],
random_state=42
)
# Treinar modelo
logger.info("Treinando modelo final...")
start_time = time.time()
final_model.fit(X_train, y_train)
training_time = time.time() - start_time
logger.info(f"Treinamento concluído em {training_time:.2f}s")
return final_model
📈 4. Avaliação do Modelo
📊 Métricas de Performance
def evaluate_model(model, X_test, y_test, X_train, y_train):
"""Avaliação completa do modelo"""
# Predições
y_pred_test = model.predict(X_test)
y_prob_test = model.predict_proba(X_test)[:, 1]
y_pred_train = model.predict(X_train)
# Métricas básicas
test_accuracy = accuracy_score(y_test, y_pred_test)
train_accuracy = accuracy_score(y_train, y_pred_train)
# Relatório de classificação
classification_rep = classification_report(y_test, y_pred_test, output_dict=True)
# Matriz de confusão
cm = confusion_matrix(y_test, y_pred_test)
# AUC-ROC
auc_roc = roc_auc_score(y_test, y_prob_test)
# Feature importance
if hasattr(model, 'feature_importances_'):
feature_names = X_test.columns if hasattr(X_test, 'columns') else [f'feature_{i}' for i in range(X_test.shape[1])]
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
results = {
'test_accuracy': test_accuracy,
'train_accuracy': train_accuracy,
'overfitting': train_accuracy - test_accuracy,
'classification_report': classification_rep,
'confusion_matrix': cm,
'auc_roc': auc_roc,
'feature_importance': feature_importance
}
return results
Resultados Finais:
{
"test_accuracy": 0.9421,
"train_accuracy": 0.9567,
"overfitting": 0.0146,
"auc_roc": 0.9153,
"precision": 0.895,
"recall": 0.873,
"f1_score": 0.884
}
🎯 Feature Importance
| Rank | Feature | Importância | Interpretação |
|---|---|---|---|
| 1 | companhia_encoded |
0.234 | Companhia aérea é fator crítico |
| 2 | hora_partida |
0.187 | Horário do voo muito relevante |
| 3 | popularidade_rota_norm |
0.156 | Rotas menos populares mais instáveis |
| 4 | duracao_planejada |
0.123 | Voos longos mais propensos a cancelamento |
| 5 | mes |
0.098 | Sazonalidade impacta cancelamentos |
| 6 | weather_risk |
0.087 | Condições climáticas são importantes |
| 7 | origem_hub_size |
0.065 | Tamanho do aeroporto de origem |
| 8 | dia_semana |
0.050 | Dia da semana tem influência |
📊 Análise de Erros
def analyze_prediction_errors(model, X_test, y_test):
"""Análise detalhada dos erros de predição"""
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]
# Identificar erros
errors = X_test[y_test != y_pred].copy()
errors['true_label'] = y_test[y_test != y_pred]
errors['predicted_label'] = y_pred[y_test != y_pred]
errors['probability'] = y_prob[y_test != y_pred]
# Análise por tipo de erro
false_positives = errors[errors['true_label'] == 0] # Predito cancelado, mas não cancelou
false_negatives = errors[errors['true_label'] == 1] # Predito não cancelado, mas cancelou
logger.info(f"Total de erros: {len(errors)}")
logger.info(f"Falsos positivos: {len(false_positives)} ({len(false_positives)/len(errors)*100:.1f}%)")
logger.info(f"Falsos negativos: {len(false_negatives)} ({len(false_negatives)/len(errors)*100:.1f}%)")
return {
'total_errors': len(errors),
'false_positives': len(false_positives),
'false_negatives': len(false_negatives),
'error_data': errors
}
💾 5. Persistência e Versionamento
💾 Salvamento do Modelo
import joblib
import json
from datetime import datetime
def save_model_with_metadata(model, feature_names, metrics, model_path="./model/"):
"""Salva modelo com metadados completos"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Salvar modelo
model_filename = f"modelo_arvore_decisao_{timestamp}.pkl"
joblib.dump(model, os.path.join(model_path, model_filename))
# Salvar link simbólico para modelo atual
current_model_path = os.path.join(model_path, "modelo_arvore_decisao.pkl")
if os.path.exists(current_model_path):
os.remove(current_model_path)
os.symlink(model_filename, current_model_path)
# Salvar metadados
metadata = {
"model_info": {
"filename": model_filename,
"algorithm": "DecisionTreeClassifier",
"version": "1.0.0",
"created_at": datetime.now().isoformat(),
"sklearn_version": sklearn.__version__
},
"features": {
"count": len(feature_names),
"names": feature_names.tolist()
},
"performance": metrics,
"hyperparameters": model.get_params()
}
metadata_filename = f"model_metadata_{timestamp}.json"
with open(os.path.join(model_path, metadata_filename), 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Modelo salvo: {model_filename}")
logger.info(f"Metadados salvos: {metadata_filename}")
return model_filename, metadata_filename
🔄 Carregamento para API
def load_model_for_api(model_path="./model/modelo_arvore_decisao.pkl"):
"""Carrega modelo para uso na API"""
try:
# Carregar modelo
model = joblib.load(model_path)
# Carregar metadados se disponível
metadata_path = model_path.replace('.pkl', '_metadata.json')
metadata = {}
if os.path.exists(metadata_path):
with open(metadata_path, 'r') as f:
metadata = json.load(f)
logger.info(f"Modelo carregado: {model_path}")
logger.info(f"Acurácia: {metadata.get('performance', {}).get('test_accuracy', 'N/A')}")
return {
'model': model,
'metadata': metadata,
'loaded_at': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Erro ao carregar modelo: {e}")
raise
🚀 6. Deploy e Monitoramento
📊 Monitoramento em Produção
class ModelMonitor:
"""Monitor de performance do modelo em produção"""
def __init__(self):
self.predictions_log = []
self.performance_metrics = {}
def log_prediction(self, input_features, prediction, probability, actual_result=None):
"""Registra predição para monitoramento"""
prediction_log = {
'timestamp': datetime.now().isoformat(),
'input_features': input_features,
'prediction': prediction,
'probability': probability,
'actual_result': actual_result
}
self.predictions_log.append(prediction_log)
# Calcular métricas se resultado real disponível
if actual_result is not None:
self._update_performance_metrics(prediction, actual_result)
def _update_performance_metrics(self, prediction, actual):
"""Atualiza métricas de performance"""
if 'total_predictions' not in self.performance_metrics:
self.performance_metrics = {
'total_predictions': 0,
'correct_predictions': 0,
'false_positives': 0,
'false_negatives': 0
}
self.performance_metrics['total_predictions'] += 1
if prediction == actual:
self.performance_metrics['correct_predictions'] += 1
elif prediction == 1 and actual == 0:
self.performance_metrics['false_positives'] += 1
elif prediction == 0 and actual == 1:
self.performance_metrics['false_negatives'] += 1
# Calcular acurácia atual
self.performance_metrics['current_accuracy'] = (
self.performance_metrics['correct_predictions'] /
self.performance_metrics['total_predictions']
)
def get_drift_analysis(self):
"""Análise de drift dos dados"""
if len(self.predictions_log) < 100:
return {"status": "insufficient_data"}
# Análise simples de drift baseada na distribuição de probabilidades
recent_probs = [p['probability'] for p in self.predictions_log[-100:]]
older_probs = [p['probability'] for p in self.predictions_log[-200:-100]]
if len(older_probs) == 0:
return {"status": "insufficient_historical_data"}
from scipy import stats
statistic, p_value = stats.ks_2samp(recent_probs, older_probs)
drift_detected = p_value < 0.05
return {
"drift_detected": drift_detected,
"p_value": p_value,
"statistic": statistic,
"recommendation": "retrain_model" if drift_detected else "continue_monitoring"
}
🔄 Pipeline de Retreinamento
def retrain_pipeline(new_data_path, current_model_path):
"""Pipeline automático de retreinamento"""
logger.info("Iniciando pipeline de retreinamento...")
# 1. Carregar novos dados
new_data = pd.read_json(new_data_path)
logger.info(f"Novos dados carregados: {new_data.shape}")
# 2. Validar qualidade dos dados
validation_report = validate_raw_data(new_data)
if validation_report['quality_score'] < 0.8:
logger.warning("Qualidade dos dados abaixo do threshold. Retreinamento cancelado.")
return False
# 3. Preprocessar dados
cleaned_data = clean_flight_data(new_data)
engineered_data = engineer_features(cleaned_data)
# 4. Dividir dados
X, y = prepare_features_target(engineered_data)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 5. Treinar novo modelo
new_model = train_final_model(X_train, y_train, best_params)
# 6. Avaliar novo modelo
new_metrics = evaluate_model(new_model, X_test, y_test, X_train, y_train)
# 7. Comparar com modelo atual
current_model = joblib.load(current_model_path)
current_metrics = evaluate_model(current_model, X_test, y_test, X_train, y_train)
# 8. Decidir se deve substituir
if new_metrics['test_accuracy'] > current_metrics['test_accuracy'] + 0.01:
# Salvar novo modelo
save_model_with_metadata(new_model, X.columns, new_metrics)
logger.info(f"Novo modelo salvo. Acurácia: {new_metrics['test_accuracy']:.4f}")
return True
else:
logger.info("Novo modelo não apresentou melhoria significativa. Mantendo modelo atual.")
return False
📊 Métricas de Negócio
💰 Impacto Financeiro
def calculate_business_impact(predictions, actual_results, cost_matrix):
"""Calcula impacto financeiro do modelo"""
# Matriz de custos
# cost_matrix = {
# 'true_positive': -50, # Evitou cancelamento (economia)
# 'false_positive': -200, # Cancelou voo desnecessariamente
# 'true_negative': 0, # Predição correta de não cancelamento
# 'false_negative': -1000 # Não previu cancelamento (prejuízo alto)
# }
impact = {
'total_cost': 0,
'predictions_analyzed': len(predictions),
'cost_breakdown': {
'true_positive': 0,
'false_positive': 0,
'true_negative': 0,
'false_negative': 0
}
}
for pred, actual in zip(predictions, actual_results):
if pred == 1 and actual == 1: # True Positive
impact['total_cost'] += cost_matrix['true_positive']
impact['cost_breakdown']['true_positive'] += 1
elif pred == 1 and actual == 0: # False Positive
impact['total_cost'] += cost_matrix['false_positive']
impact['cost_breakdown']['false_positive'] += 1
elif pred == 0 and actual == 0: # True Negative
impact['total_cost'] += cost_matrix['true_negative']
impact['cost_breakdown']['true_negative'] += 1
elif pred == 0 and actual == 1: # False Negative
impact['total_cost'] += cost_matrix['false_negative']
impact['cost_breakdown']['false_negative'] += 1
impact['average_cost_per_prediction'] = impact['total_cost'] / len(predictions)
return impact
📚 Próximos Passos
🔮 Melhorias Futuras
- 🤖 Modelos Ensemble: Combinar Decision Tree com Random Forest
- 🧠 Deep Learning: Testar redes neurais para capturar padrões complexos
- ⏱️ Time Series: Incorporar análise temporal mais sofisticada
- 🌦️ Dados Externos: Integrar APIs de clima em tempo real
- 🔄 Online Learning: Implementar aprendizado contínuo
- 🎯 Explicabilidade: Adicionar SHAP values para interpretação
📊 Monitoramento Avançado
- 📈 Drift Detection: Implementar detecção automática de drift
- ⚠️ Alertas: Sistema de alertas para degradação de performance
- 📊 Dashboards: Interface visual para métricas em tempo real
- 🔄 A/B Testing: Testes de modelos em produção
- 📝 Audit Trail: Rastreabilidade completa de mudanças
📞 Suporte
- 🏗️ Arquitetura ML - Visão geral da arquitetura
- 🧩 Componentes - Detalhes dos componentes
- 📊 Notebooks - Análise exploratória
- 🧪 Testes ML - Testes do pipeline
- 🐛 Issues