REFACTOR: Split code and dynamic models
All checks were successful
Build docker container / Build image (push) Successful in 11m15s

This commit is contained in:
2025-12-21 13:06:10 +01:00
parent ff51af276d
commit c25ec806a5
15 changed files with 827 additions and 108 deletions

View File

@@ -1,94 +1,182 @@
from flask import Flask, jsonify, request from flask import Flask, jsonify, request
import joblib
import os
from datetime import date
from db import fetch_data
import threading import threading
from model_registry import register, load_meta from datetime import datetime
from models_train import train_demand_model, train_nulo_model
from core.model_loader import ModelManager
from core.predictor import PredictionHandler
from core.model_trainer import ModelTrainer
from config.models_config import ModelConfig
app = Flask(__name__) app = Flask(__name__)
MODEL_DIR = "models"
def load_model(model_type): # Inicializar componentes
meta = load_meta() model_config = ModelConfig()
file = meta["current"].get(model_type) model_manager = ModelManager(model_config)
if not file: prediction_handler = PredictionHandler(model_manager, model_config)
return None model_trainer = ModelTrainer(model_manager, model_config)
return joblib.load(os.path.join(MODEL_DIR, file))
def background_train(): # Comprobar que las funciones estan disponibles en el código
print("Fetching data...") @app.before_first_request
df = fetch_data() def check_functions():
print(f"Data fetched: {len(df)} rows") """Verificar que todas las funciones existen"""
from utils.dynamic_loader import execute_function
today = date.today().isoformat() for model_type in model_config.get_all_model_types():
os.makedirs(MODEL_DIR, exist_ok=True) config = model_config.get_model_config(model_type)
print(f"Verificando modelo: {model_type}")
print(f" Módulo: {config.get('module')}")
print(f" Función entrenamiento: {config.get('train_function')}")
print(f" Función datos: {config.get('data_function')}")
print("Training demand model...") # Verificar que la función de entrenamiento existe
demand_model = train_demand_model(df) try:
demand_file = f"demand_xgb_{today}.joblib" module = importlib.import_module(config.get('module'))
joblib.dump(demand_model, f"{MODEL_DIR}/{demand_file}") if hasattr(module, config.get('train_function')):
register("demand", demand_file, len(df)) print(f" ✓ Función de entrenamiento encontrada")
print("Demand model trained and saved.") else:
print(f" ✗ Función de entrenamiento NO encontrada")
except Exception as e:
print(f" ✗ Error: {str(e)}")
print("Training nulo model...") # Inicializar modelos al arrancar
nulo_model = train_nulo_model(df) model_manager.init_models()
nulo_file = f"nulo_xgb_{today}.joblib"
joblib.dump(nulo_model, f"{MODEL_DIR}/{nulo_file}")
register("nulo", nulo_file, len(df))
print("Nulo model trained and saved.")
@app.route("/train", methods=["POST"]) @app.route("/train", methods=["POST"])
def train(): def train():
threading.Thread(target=background_train).start() """Entrenar modelos"""
return jsonify({"status": "training started"}) model_type = request.args.get('model_type')
@app.route("/predict_demand", methods=["GET"]) # Validar que el tipo de modelo existe
def predict_demand(): if model_type and not model_config.model_exists(model_type):
h3 = int(request.args["h3"]) return jsonify({
week = int(request.args["week"]) "error": f"Model type '{model_type}' not found",
dow = int(request.args["dow"]) "available_models": model_config.get_all_model_types()
hour = int(request.args["hour"]) }), 400
model = load_model("demand") threading.Thread(target=model_trainer.background_train, args=(model_type,)).start()
X = [[h3, week, dow, hour]]
pred = model.predict(X)[0]
return jsonify({"expected_demand": float(pred)})
@app.route("/predict_nulo", methods=["GET"])
def predict_nulo():
h3 = int(request.args["h3"])
week = int(request.args["week"])
dow = int(request.args["dow"])
hour = int(request.args["hour"])
model = load_model("nulo")
X = [[h3, week, dow, hour]]
prob = model.predict_proba(X)[0][1]
return jsonify({"nulo_probability": float(prob)})
@app.route("/predict", methods=["GET"])
def predict_legacy():
h3 = int(request.args["h3"])
week = int(request.args["week"])
dow = int(request.args["dow"])
hour = int(request.args["hour"])
model = load_model("nulo")
X = [[h3, week, dow, hour]]
prob = model.predict_proba(X)[0][1]
return jsonify({ return jsonify({
"predicted_nulo_prob": float(prob) "status": "training started",
"model_type": model_type or "all",
"timestamp": datetime.now().isoformat()
}) })
@app.route("/predict", methods=["GET"])
def predict():
"""Endpoint único para predicciones"""
return prediction_handler.handle_predict_request(request.args)
@app.route("/demand", methods=["GET"])
def demand():
"""Endpoint para obtener todas las predicciones"""
return prediction_handler.handle_demand_request(request.args)
@app.route("/models/register", methods=["POST"])
def register_new_model():
"""Registrar un nuevo tipo de modelo dinámicamente"""
data = request.get_json()
try:
new_config = model_config.register_model_type(data)
return jsonify({
"status": "model type registered",
"model_type": data["type"],
"config": new_config
})
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": f"Registration failed: {str(e)}"}), 500
@app.route("/models", methods=["GET"]) @app.route("/models", methods=["GET"])
def models(): def list_models():
return jsonify(load_meta()) """Listar todos los modelos disponibles"""
models_info = []
for model_type in model_config.get_all_model_types():
model = model_manager.get_model(model_type)
model_config_info = model_config.get_model_config(model_type)
model_info = {
"type": model_type,
"description": model_config_info.get("description", ""),
"loaded": model is not None,
"required_params": model_config_info.get("required_params", []),
"module": model_config_info.get("module"),
"train_function": model_config_info.get("train_function")
}
# Añadir información del archivo si existe
from model_registry import load_meta
meta = load_meta()
if model_type in meta.get("current", {}):
model_info["file"] = meta["current"][model_type]
models_info.append(model_info)
return jsonify({
"available_models": models_info,
"total": len(models_info)
})
@app.route("/health", methods=["GET"])
def health():
"""Endpoint de salud"""
loaded_models = model_manager.get_loaded_models_status()
all_models = model_config.get_all_model_types()
return jsonify({
"status": "healthy" if all(loaded_models.values()) else "partial",
"models_loaded": loaded_models,
"total_configured": len(all_models),
"loaded_count": sum(1 for v in loaded_models.values() if v)
})
@app.route("/", methods=["GET"])
def index():
"""Página principal con documentación"""
endpoints = [
{
"path": "/predict",
"method": "GET",
"description": "Realizar predicción",
"parameters": "model_type (required), otros según modelo"
},
{
"path": "/demand",
"method": "GET",
"description": "Obtener predicciones masivas",
"parameters": "model_type (required), limit (opcional)"
},
{
"path": "/train",
"method": "POST",
"description": "Entrenar modelos",
"parameters": "model_type (opcional)"
},
{
"path": "/models",
"method": "GET",
"description": "Listar modelos disponibles"
},
{
"path": "/models/register",
"method": "POST",
"description": "Registrar nuevo tipo de modelo"
},
{
"path": "/health",
"method": "GET",
"description": "Estado del sistema"
}
]
return jsonify({
"service": "Model Prediction API",
"version": "1.0.0",
"endpoints": endpoints,
"available_models": model_config.get_all_model_types()
})
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000) app.run(host="0.0.0.0", port=5000, debug=True)

0
src/config/__init__.py Normal file
View File

108
src/config/models_config.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import json
from typing import Dict, Any, List
class ModelConfig:
"""Gestión de configuración de modelos"""
def __init__(self, config_dir: str = "models"):
self.config_dir = config_dir
self.config_file = os.path.join(config_dir, "models_config.json")
self._config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Cargar configuración desde archivo JSON"""
if not os.path.exists(self.config_file):
self._create_default_config()
with open(self.config_file) as f:
return json.load(f)
def _create_default_config(self):
"""Crear configuración por defecto si no existe"""
default_config = {
"model_types": {
"demand": {
"module": "models_train",
"train_function": "train_demand_model",
"data_function": "fetch_data",
"description": "Modelo de predicción de demanda",
"required_params": ["h3", "week", "dow", "hour"]
},
"nulo": {
"module": "models_train",
"train_function": "train_nulo_model",
"data_function": "fetch_data",
"description": "Modelo de predicción de nulos",
"required_params": ["h3", "week", "dow", "hour"]
},
"legacy": {
"module": "models_train",
"train_function": "train_legacy_model",
"data_function": "fetch_data_legacy",
"description": "Modelo legacy compatible",
"required_params": ["hour", "dow", "total_events"]
}
}
}
os.makedirs(self.config_dir, exist_ok=True)
with open(self.config_file, 'w') as f:
json.dump(default_config, f, indent=2)
def save_config(self):
"""Guardar configuración actual a disco"""
with open(self.config_file, 'w') as f:
json.dump(self._config, f, indent=2)
def get_all_model_types(self) -> List[str]:
"""Obtener todos los tipos de modelos disponibles"""
return list(self._config["model_types"].keys())
def model_exists(self, model_type: str) -> bool:
"""Verificar si un tipo de modelo existe"""
return model_type in self._config["model_types"]
def get_model_config(self, model_type: str) -> Dict[str, Any]:
"""Obtener configuración de un modelo específico"""
if not self.model_exists(model_type):
raise ValueError(f"Model type '{model_type}' not found")
return self._config["model_types"][model_type]
def register_model_type(self, model_data: Dict[str, Any]) -> Dict[str, Any]:
"""Registrar un nuevo tipo de modelo"""
required_fields = ["type", "module", "train_function", "data_function", "required_params"]
for field in required_fields:
if field not in model_data:
raise ValueError(f"Missing required field: {field}")
model_type = model_data["type"]
# Verificar si ya existe
if self.model_exists(model_type):
raise ValueError(f"Model type '{model_type}' already exists")
# Añadir nuevo modelo
self._config["model_types"][model_type] = {
"module": model_data["module"],
"train_function": model_data["train_function"],
"data_function": model_data["data_function"],
"description": model_data.get("description", ""),
"required_params": model_data["required_params"]
}
# Guardar configuración
self.save_config()
return self._config["model_types"][model_type]
def update_model_config(self, model_type: str, updates: Dict[str, Any]) -> Dict[str, Any]:
"""Actualizar configuración de un modelo existente"""
if not self.model_exists(model_type):
raise ValueError(f"Model type '{model_type}' not found")
self._config["model_types"][model_type].update(updates)
self.save_config()
return self._config["model_types"][model_type]

0
src/core/__init__.py Normal file
View File

114
src/core/model_loader.py Normal file
View File

@@ -0,0 +1,114 @@
import joblib
import os
from datetime import date
from typing import Dict, Any, Optional
from config.models_config import ModelConfig
from utils.dynamic_loader import execute_function
from model_registry import register, load_meta
class ModelManager:
"""Gestión de carga y cache de modelos"""
def __init__(self, model_config: ModelConfig):
self.model_config = model_config
self.models_cache: Dict[str, Any] = {}
self.model_dir = "models"
def init_models(self):
"""Inicializar todos los modelos configurados"""
for model_type in self.model_config.get_all_model_types():
try:
self.models_cache[model_type] = self._load_or_train_model(model_type)
print(f"✓ Modelo '{model_type}' cargado correctamente")
except Exception as e:
print(f"✗ Error cargando modelo '{model_type}': {str(e)}")
self.models_cache[model_type] = None
def _load_or_train_model(self, model_type: str):
"""Cargar o entrenar un modelo dinámicamente"""
config = self.model_config.get_model_config(model_type)
# Intentar cargar el modelo existente
model = self._load_model_from_disk(model_type)
if model is not None:
return model
# Si no existe, entrenar nuevo modelo
print(f"Modelo {model_type} no se encontro, creación del modelo...")
# Obtener datos
df = self._load_data_for_model(model_type, config)
# Entrenar modelo
model = self._train_model(model_type, config, df)
# Guardar modelo
filename = self._save_model(model_type, model, len(df))
return model
def _load_model_from_disk(self, model_type: str):
"""Cargar un modelo desde disco"""
meta = load_meta()
file = meta.get("current", {}).get(model_type)
if not file:
return None
model_path = os.path.join(self.model_dir, file)
if not os.path.exists(model_path):
return None
return joblib.load(model_path)
def _load_data_for_model(self, model_type: str, config: Dict[str, Any]):
"""Cargar datos para un modelo específico"""
from db import fetch_data, fetch_data_legacy
data_function = config.get("data_function")
if data_function == "fetch_data":
return fetch_data()
elif data_function == "fetch_data_legacy":
return fetch_data_legacy()
else:
# Ejecutar función personalizada
module_name = config.get("module", "models_train")
return execute_function(module_name, data_function)
def _train_model(self, model_type: str, config: Dict[str, Any], df):
"""Entrenar un modelo"""
module_name = config.get("module", "models_train")
train_function = config.get("train_function")
return execute_function(module_name, train_function, df)
def _save_model(self, model_type: str, model, rows: int) -> str:
"""Guardar modelo en disco"""
today = date.today().isoformat()
filename = f"{model_type}_xgb_{today}.joblib"
os.makedirs(self.model_dir, exist_ok=True)
joblib.dump(model, os.path.join(self.model_dir, filename))
# Registrar en metadata
register(model_type, filename, rows)
return filename
def get_model(self, model_type: str):
"""Obtener modelo del cache"""
return self.models_cache.get(model_type)
def reload_model(self, model_type: str):
"""Recargar un modelo específico"""
try:
self.models_cache[model_type] = self._load_or_train_model(model_type)
return True
except Exception as e:
print(f"Error recargando modelo '{model_type}': {str(e)}")
return False
def get_loaded_models_status(self) -> Dict[str, bool]:
"""Obtener estado de carga de todos los modelos"""
return {mtype: model is not None for mtype, model in self.models_cache.items()}

74
src/core/model_trainer.py Normal file
View File

@@ -0,0 +1,74 @@
import os
from datetime import date
import joblib
from config.models_config import ModelConfig
from core.model_loader import ModelManager
from utils.dynamic_loader import execute_function
from model_registry import register
from db import fetch_data, fetch_data_legacy
class ModelTrainer:
"""Gestión del entrenamiento de modelos"""
def __init__(self, model_manager: ModelManager, model_config: ModelConfig):
self.model_manager = model_manager
self.model_config = model_config
self.model_dir = "models"
def background_train(self, model_type=None):
"""Entrenar modelos en segundo plano"""
os.makedirs(self.model_dir, exist_ok=True)
today = date.today().isoformat()
# Determinar qué modelos entrenar
if model_type:
model_types = [model_type]
else:
model_types = self.model_config.get_all_model_types()
for mtype in model_types:
try:
self._train_single_model(mtype, today)
except Exception as e:
print(f"Error entrenando el modelo {mtype}: {str(e)}")
def _train_single_model(self, model_type: str, date_str: str):
"""Entrenar un solo modelo"""
print(f"Entrenando modelo {model_type} ...")
config = self.model_config.get_model_config(model_type)
# Obtener datos
df = self._get_training_data(config)
# Entrenar modelo
model = self._execute_training(config, df)
# Guardar modelo
filename = f"{model_type}_xgb_{date_str}.joblib"
joblib.dump(model, os.path.join(self.model_dir, filename))
# Registrar
register(model_type, filename, len(df))
# Actualizar cache
self.model_manager.models_cache[model_type] = model
print(f"{model_type} modelo entrenado y guardado.")
def _get_training_data(self, config: dict):
"""Obtener datos de entrenamiento"""
data_function = config.get("data_function")
if data_function == "fetch_data":
return fetch_data()
elif data_function == "fetch_data_legacy":
return fetch_data_legacy()
else:
module_name = config.get("module", "models_train")
return execute_function(module_name, data_function)
def _execute_training(self, config: dict, df):
"""Ejecutar función de entrenamiento"""
module_name = config.get("module", "models_train")
train_function = config.get("train_function")
return execute_function(module_name, train_function, df)

225
src/core/predictor.py Normal file
View File

@@ -0,0 +1,225 @@
import pandas as pd
from typing import Dict, Any
from flask import jsonify
from config.models_config import ModelConfig
from core.model_loader import ModelManager
from utils.validators import validate_predict_params
class PredictionHandler:
"""Manejador de predicciones"""
def __init__(self, model_manager: ModelManager, model_config: ModelConfig):
self.model_manager = model_manager
self.model_config = model_config
def handle_predict_request(self, request_args: Dict[str, Any]):
"""Manejar solicitud de predicción"""
model_type = request_args.get('model_type')
# Validaciones básicas
if not model_type:
return jsonify({
"error": "Es necesario indicar el modelo para usar",
"available_models": self.model_config.get_all_model_types()
}), 400
if not self.model_config.model_exists(model_type):
return jsonify({
"error": f"El modelo '{model_type}' no esta disponible",
"available_models": self.model_config.get_all_model_types()
}), 400
# Validar parámetros
params = {k: v for k, v in request_args.items() if k != 'model_type'}
validation_result = validate_predict_params(model_type, params, self.model_config)
if not validation_result["valid"]:
return jsonify({
"error": validation_result["message"],
"required_params": validation_result.get("required_params", [])
}), 400
# Obtener y validar modelo
model = self._get_valid_model(model_type)
if not model:
return jsonify({"error": f"El modelo '{model_type}' no se ha podido cargar"}), 500
try:
# Realizar predicción
result = self._make_prediction(model_type, model, params)
return jsonify(result)
except Exception as e:
return jsonify({
"error": f"Fallo en la predicción: {str(e)}",
"model_type": model_type,
"parameters": params
}), 500
def handle_demand_request(self, request_args: Dict[str, Any]):
"""Manejar solicitud de predicciones masivas"""
model_type = request_args.get('model_type')
limit = request_args.get('limit', type=int)
# Validaciones básicas
if not model_type:
return jsonify({
"error": "Es necesario indicar el modelo para usar",
"available_models": self.model_config.get_all_model_types()
}), 400
if not self.model_config.model_exists(model_type):
return jsonify({
"error": f"El modelo '{model_type}' no esta disponible",
"available_models": self.model_config.get_all_model_types()
}), 400
# Obtener y validar modelo
model = self._get_valid_model(model_type)
if not model:
return jsonify({"error": f"El modelo '{model_type}' no se ha podido cargar"}), 500
try:
# Obtener predicciones masivas
result = self._make_batch_predictions(model_type, model, limit)
return jsonify(result)
except Exception as e:
return jsonify({
"error": f"Fallo en la predicción masiva: {str(e)}",
"model_type": model_type
}), 500
def _get_valid_model(self, model_type: str):
"""Obtener y validar modelo"""
model = self.model_manager.get_model(model_type)
if model is None:
try:
model = self.model_manager._load_or_train_model(model_type)
self.model_manager.models_cache[model_type] = model
except Exception as e:
print(f"Error al cargar el modelo '{model_type}': {str(e)}")
return None
return model
def _make_prediction(self, model_type: str, model, params: Dict[str, Any]):
"""Realizar una predicción individual"""
config = self.model_config.get_model_config(model_type)
# Preparar características según el modelo
if model_type == "demand":
X = self._prepare_demand_features(params)
prediction = float(model.predict(X)[0])
result_key = "expected_demand"
elif model_type == "nulo":
X = self._prepare_nulo_features(params)
prediction = float(model.predict_proba(X)[0][1])
result_key = "nulo_probability"
elif model_type == "legacy":
X = self._prepare_legacy_features(params)
prediction = float(model.predict_proba(X)[0][1])
result_key = "predicted_nulo_prob"
else:
# Modelos personalizados añadido para futuros desarrollos
X = self._prepare_custom_features(params, config)
prediction = self._predict_custom_model(model, X)
result_key = "prediction"
return {
"model_type": model_type,
result_key: prediction,
"parameters": params,
"timestamp": pd.Timestamp.now().isoformat()
}
def _make_batch_predictions(self, model_type: str, model, limit: int = None):
"""Realizar predicciones masivas"""
from db import fetch_data, fetch_data_legacy
config = self.model_config.get_model_config(model_type)
data_function = config.get("data_function")
# Obtener datos
if data_function == "fetch_data":
df = fetch_data()
elif data_function == "fetch_data_legacy":
df = fetch_data_legacy()
else:
# Para modelos personalizados, necesitaríamos implementación específica "futuros desarrollos"
raise ValueError(f"No se pueden hacer predicciones masivas para modelos personalizados: {data_function}")
if limit:
df = df.head(limit)
# Preparar y realizar predicciones
if model_type == "demand":
df["h3_int"] = df["h3"].apply(lambda x: int(x, 16))
X = df[["h3_int", "week", "dow", "hour"]]
df['prediction'] = model.predict(X).astype(float)
elif model_type == "nulo":
df["h3_int"] = df["h3"].apply(lambda x: int(x, 16))
X = df[["h3_int", "week", "dow", "hour"]]
df['prediction'] = model.predict_proba(X)[:,1].astype(float)
elif model_type == "legacy":
X = df[['hour', 'dow', 'total_events']]
df['prediction'] = model.predict_proba(X)[:,1].astype(float)
# Convertir a JSON
result = df.to_dict(orient='records')
return {
"model_type": model_type,
"count": len(result),
"data": result
}
def _prepare_demand_features(self, params: Dict[str, Any]):
"""Preparar características para modelo de demanda"""
h3 = int(params['h3'])
week = int(params['week'])
dow = int(params['dow'])
hour = int(params['hour'])
return [[h3, week, dow, hour]]
def _prepare_nulo_features(self, params: Dict[str, Any]):
"""Preparar características para modelo de nulo"""
h3 = int(params['h3'])
week = int(params['week'])
dow = int(params['dow'])
hour = int(params['hour'])
return [[h3, week, dow, hour]]
def _prepare_legacy_features(self, params: Dict[str, Any]):
"""Preparar características para modelo legacy"""
hour = int(params['hour'])
dow = int(params['dow'])
total_events = int(params.get('total_events', 1))
return pd.DataFrame([[hour, dow, total_events]],
columns=['hour', 'dow', 'total_events'])
def _prepare_custom_features(self, params: Dict[str, Any], config: Dict[str, Any]):
"""Preparar características para modelos personalizados"""
X = []
for param in config.get("required_params", []):
if param in params:
X.append(float(params[param]))
return [X]
def _predict_custom_model(self, model, X):
"""Realizar predicción para modelo personalizado"""
try:
return float(model.predict(X)[0])
except:
# Intentar con predict_proba para clasificadores (Incluido para futuros desarrollos)
try:
return float(model.predict_proba(X)[0][1])
except:
return float(model.predict(X)[0])

View File

@@ -35,3 +35,16 @@ def fetch_data():
except SQLAlchemyError as e: except SQLAlchemyError as e:
print(f"Error al ejecutar la consulta: {e}") print(f"Error al ejecutar la consulta: {e}")
return pd.DataFrame() return pd.DataFrame()
def fetch_data_legacy():
query = """
SELECT h3, hour, dow, total_events, total_nulos, nulo_rate
FROM demanda_h3_hour
"""
try:
with engine.connect() as conn:
df = pd.read_sql(text(query), conn)
return df
except SQLAlchemyError as e:
print(f"Error al ejecutar la consulta: {e}")
return pd.DataFrame()

View File

@@ -1,28 +0,0 @@
import psycopg2
import pandas as pd
import os
DB_CONFIG = {
'dbname': os.getenv('DB_NAME', 'postgres'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', 'tfmuocdfcarvajal'),
'host': os.getenv('DB_HOST', '10.10.5.32'),
'port': os.getenv('DB_PORT', '5432')
}
def fetch_data():
conn = psycopg2.connect(**DB_CONFIG)
query = """
SELECT
h3,
week,
dow,
hour,
total_events,
total_nulos,
nulo_rate
FROM demanda_h3_hour_ml
"""
df = pd.read_sql(query, conn)
conn.close()
return df

View File

@@ -31,3 +31,67 @@ def train_nulo_model(df):
model.fit(X, y) model.fit(X, y)
return model return model
def train_legacy_model(df):
"""Entrenar modelo legacy"""
# Variables predictoras
X = df[['hour', 'dow', 'total_events']]
# Variable objetivo: nulo_rate > 0 -> 1, else 0
y = (df['nulo_rate'] > 0).astype(int)
model = xgb.XGBClassifier(
n_estimators=100,
max_depth=4,
learning_rate=0.1,
use_label_encoder=False,
eval_metric='logloss'
)
model.fit(X, y)
return model
def train_custom_demand_model(df):
"""Versión modelo personalizado de demanda"""
if df['h3'].dtype == 'object':
df["h3_int"] = df["h3"].apply(lambda x: int(x, 16))
else:
df["h3_int"] = df["h3"]
X = df[["h3_int", "week", "dow", "hour"]]
y = df["total_events"]
model = xgb.XGBRegressor(
n_estimators=500,
max_depth=10,
learning_rate=0.01,
subsample=0.8,
colsample_bytree=0.8,
objective="reg:squarederror"
)
model.fit(X, y)
return model
def train_custom_nulo_model(df):
"""Versión modelo personalizado de nulos"""
if df['h3'].dtype == 'object':
df["h3_int"] = df["h3"].apply(lambda x: int(x, 16))
else:
df["h3_int"] = df["h3"]
X = df[["h3_int", "week", "dow", "hour"]]
y = (df["nulo_rate"] > 0).astype(int)
model = xgb.XGBClassifier(
n_estimators=300,
max_depth=7,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
eval_metric="logloss",
use_label_encoder=False
)
model.fit(X, y)
return model

View File

@@ -25,10 +25,19 @@ dependencies = [
[tool.setuptools] [tool.setuptools]
include-package-data = true include-package-data = true
py-modules = ["app", "db", "model_registry", "models_train"]
[tool.setuptools.packages] packages = [
find = {} "config",
"core",
"utils"
]
py-modules = [
"app",
"db",
"model_registry",
"models_train"
]
[project.urls] [project.urls]
Documentation='https://https://gitea.tfmuocdfcarvajal.duckdns.org/TFM/microservicios_python' Documentation='https://https://gitea.tfmuocdfcarvajal.duckdns.org/TFM/microservicios_python'

7
src/requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
Flask
pandas
xgboost
scikit-learn
joblib
SQLAlchemy
psycopg2-binary

0
src/utils/__init__.py Normal file
View File

View File

@@ -0,0 +1,15 @@
import importlib
from typing import Any
def execute_function(module_name: str, function_name: str, *args, **kwargs) -> Any:
"""Ejecutar una función dinámicamente"""
try:
module = importlib.import_module(module_name)
function = getattr(module, function_name)
return function(*args, **kwargs)
except ImportError as e:
raise ValueError(f"Modulo '{module_name}' no esta disponible: {str(e)}")
except AttributeError as e:
raise ValueError(f"Función '{function_name}' no esta disponible en el módulo '{module_name}': {str(e)}")
except Exception as e:
raise ValueError(f"Error ejecutando la función '{function_name}': {str(e)}")

30
src/utils/validators.py Normal file
View File

@@ -0,0 +1,30 @@
from typing import Dict, Any, Tuple
def validate_predict_params(model_type: str, params: Dict[str, Any], model_config) -> Dict[str, Any]:
"""Validar parámetros para predicción"""
config = model_config.get_model_config(model_type)
required_params = config.get("required_params", [])
# Verificar parámetros requeridos
missing_params = [p for p in required_params if p not in params]
if missing_params:
return {
"valid": False,
"message": f"Faltan los siguientes parámetros: {', '.join(missing_params)}",
"required_params": required_params
}
# Validar tipos de datos básicos
for param in required_params:
try:
# Intentar convertir a número
float(params[param])
except ValueError:
return {
"valid": False,
"message": f"El parámetro '{param}' tiene que ser un número",
"invalid_param": param
}
return {"valid": True}