From c25ec806a57ac431000b27f5c86596210c4cd416 Mon Sep 17 00:00:00 2001 From: dfcarvajal Date: Sun, 21 Dec 2025 13:06:10 +0100 Subject: [PATCH] REFACTOR: Split code and dynamic models --- src/app.py | 242 ++++++++++++++++++++++++------------ src/config/__init__.py | 0 src/config/models_config.py | 108 ++++++++++++++++ src/core/__init__.py | 0 src/core/model_loader.py | 114 +++++++++++++++++ src/core/model_trainer.py | 74 +++++++++++ src/core/predictor.py | 225 +++++++++++++++++++++++++++++++++ src/db.py | 13 ++ src/db_warning.py | 28 ----- src/models_train.py | 64 ++++++++++ src/pyproject.toml | 15 ++- src/requirements.txt | 7 ++ src/utils/__init__.py | 0 src/utils/dynamic_loader.py | 15 +++ src/utils/validators.py | 30 +++++ 15 files changed, 827 insertions(+), 108 deletions(-) create mode 100644 src/config/__init__.py create mode 100644 src/config/models_config.py create mode 100644 src/core/__init__.py create mode 100644 src/core/model_loader.py create mode 100644 src/core/model_trainer.py create mode 100644 src/core/predictor.py delete mode 100644 src/db_warning.py create mode 100644 src/requirements.txt create mode 100644 src/utils/__init__.py create mode 100644 src/utils/dynamic_loader.py create mode 100644 src/utils/validators.py diff --git a/src/app.py b/src/app.py index 18a0d5d..d05d5f3 100644 --- a/src/app.py +++ b/src/app.py @@ -1,94 +1,182 @@ from flask import Flask, jsonify, request -import joblib -import os -from datetime import date - -from db import fetch_data import threading -from model_registry import register, load_meta -from models_train import train_demand_model, train_nulo_model +from datetime import datetime + +from core.model_loader import ModelManager +from core.predictor import PredictionHandler +from core.model_trainer import ModelTrainer +from config.models_config import ModelConfig app = Flask(__name__) -MODEL_DIR = "models" -def load_model(model_type): - meta = load_meta() - file = meta["current"].get(model_type) - if not file: - return None - return joblib.load(os.path.join(MODEL_DIR, file)) +# Inicializar componentes +model_config = ModelConfig() +model_manager = ModelManager(model_config) +prediction_handler = PredictionHandler(model_manager, model_config) +model_trainer = ModelTrainer(model_manager, model_config) -def background_train(): - print("Fetching data...") - df = fetch_data() - print(f"Data fetched: {len(df)} rows") +# Comprobar que las funciones estan disponibles en el código +@app.before_first_request +def check_functions(): + """Verificar que todas las funciones existen""" + from utils.dynamic_loader import execute_function - today = date.today().isoformat() - os.makedirs(MODEL_DIR, exist_ok=True) + for model_type in model_config.get_all_model_types(): + config = model_config.get_model_config(model_type) + print(f"Verificando modelo: {model_type}") + print(f" Módulo: {config.get('module')}") + print(f" Función entrenamiento: {config.get('train_function')}") + print(f" Función datos: {config.get('data_function')}") + + # Verificar que la función de entrenamiento existe + try: + module = importlib.import_module(config.get('module')) + if hasattr(module, config.get('train_function')): + print(f" ✓ Función de entrenamiento encontrada") + else: + print(f" ✗ Función de entrenamiento NO encontrada") + except Exception as e: + print(f" ✗ Error: {str(e)}") - print("Training demand model...") - demand_model = train_demand_model(df) - demand_file = f"demand_xgb_{today}.joblib" - joblib.dump(demand_model, f"{MODEL_DIR}/{demand_file}") - register("demand", demand_file, len(df)) - print("Demand model trained and saved.") - - print("Training nulo model...") - nulo_model = train_nulo_model(df) - nulo_file = f"nulo_xgb_{today}.joblib" - joblib.dump(nulo_model, f"{MODEL_DIR}/{nulo_file}") - register("nulo", nulo_file, len(df)) - print("Nulo model trained and saved.") +# Inicializar modelos al arrancar +model_manager.init_models() @app.route("/train", methods=["POST"]) def train(): - threading.Thread(target=background_train).start() - return jsonify({"status": "training started"}) - -@app.route("/predict_demand", methods=["GET"]) -def predict_demand(): - h3 = int(request.args["h3"]) - week = int(request.args["week"]) - dow = int(request.args["dow"]) - hour = int(request.args["hour"]) - - model = load_model("demand") - X = [[h3, week, dow, hour]] - pred = model.predict(X)[0] - - return jsonify({"expected_demand": float(pred)}) - -@app.route("/predict_nulo", methods=["GET"]) -def predict_nulo(): - h3 = int(request.args["h3"]) - week = int(request.args["week"]) - dow = int(request.args["dow"]) - hour = int(request.args["hour"]) - - model = load_model("nulo") - X = [[h3, week, dow, hour]] - prob = model.predict_proba(X)[0][1] - - return jsonify({"nulo_probability": float(prob)}) - -@app.route("/predict", methods=["GET"]) -def predict_legacy(): - h3 = int(request.args["h3"]) - week = int(request.args["week"]) - dow = int(request.args["dow"]) - hour = int(request.args["hour"]) - - model = load_model("nulo") - X = [[h3, week, dow, hour]] - prob = model.predict_proba(X)[0][1] - + """Entrenar modelos""" + model_type = request.args.get('model_type') + + # Validar que el tipo de modelo existe + if model_type and not model_config.model_exists(model_type): + return jsonify({ + "error": f"Model type '{model_type}' not found", + "available_models": model_config.get_all_model_types() + }), 400 + + threading.Thread(target=model_trainer.background_train, args=(model_type,)).start() + return jsonify({ - "predicted_nulo_prob": float(prob) + "status": "training started", + "model_type": model_type or "all", + "timestamp": datetime.now().isoformat() }) +@app.route("/predict", methods=["GET"]) +def predict(): + """Endpoint único para predicciones""" + return prediction_handler.handle_predict_request(request.args) + +@app.route("/demand", methods=["GET"]) +def demand(): + """Endpoint para obtener todas las predicciones""" + return prediction_handler.handle_demand_request(request.args) + +@app.route("/models/register", methods=["POST"]) +def register_new_model(): + """Registrar un nuevo tipo de modelo dinámicamente""" + data = request.get_json() + + try: + new_config = model_config.register_model_type(data) + return jsonify({ + "status": "model type registered", + "model_type": data["type"], + "config": new_config + }) + except ValueError as e: + return jsonify({"error": str(e)}), 400 + except Exception as e: + return jsonify({"error": f"Registration failed: {str(e)}"}), 500 + @app.route("/models", methods=["GET"]) -def models(): - return jsonify(load_meta()) +def list_models(): + """Listar todos los modelos disponibles""" + models_info = [] + + for model_type in model_config.get_all_model_types(): + model = model_manager.get_model(model_type) + model_config_info = model_config.get_model_config(model_type) + + model_info = { + "type": model_type, + "description": model_config_info.get("description", ""), + "loaded": model is not None, + "required_params": model_config_info.get("required_params", []), + "module": model_config_info.get("module"), + "train_function": model_config_info.get("train_function") + } + + # Añadir información del archivo si existe + from model_registry import load_meta + meta = load_meta() + if model_type in meta.get("current", {}): + model_info["file"] = meta["current"][model_type] + + models_info.append(model_info) + + return jsonify({ + "available_models": models_info, + "total": len(models_info) + }) + +@app.route("/health", methods=["GET"]) +def health(): + """Endpoint de salud""" + loaded_models = model_manager.get_loaded_models_status() + all_models = model_config.get_all_model_types() + + return jsonify({ + "status": "healthy" if all(loaded_models.values()) else "partial", + "models_loaded": loaded_models, + "total_configured": len(all_models), + "loaded_count": sum(1 for v in loaded_models.values() if v) + }) + +@app.route("/", methods=["GET"]) +def index(): + """Página principal con documentación""" + endpoints = [ + { + "path": "/predict", + "method": "GET", + "description": "Realizar predicción", + "parameters": "model_type (required), otros según modelo" + }, + { + "path": "/demand", + "method": "GET", + "description": "Obtener predicciones masivas", + "parameters": "model_type (required), limit (opcional)" + }, + { + "path": "/train", + "method": "POST", + "description": "Entrenar modelos", + "parameters": "model_type (opcional)" + }, + { + "path": "/models", + "method": "GET", + "description": "Listar modelos disponibles" + }, + { + "path": "/models/register", + "method": "POST", + "description": "Registrar nuevo tipo de modelo" + }, + { + "path": "/health", + "method": "GET", + "description": "Estado del sistema" + } + ] + + return jsonify({ + "service": "Model Prediction API", + "version": "1.0.0", + "endpoints": endpoints, + "available_models": model_config.get_all_model_types() + }) if __name__ == "__main__": - app.run(host="0.0.0.0", port=5000) + app.run(host="0.0.0.0", port=5000, debug=True) \ No newline at end of file diff --git a/src/config/__init__.py b/src/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config/models_config.py b/src/config/models_config.py new file mode 100644 index 0000000..80bda9a --- /dev/null +++ b/src/config/models_config.py @@ -0,0 +1,108 @@ +import os +import json +from typing import Dict, Any, List + +class ModelConfig: + """Gestión de configuración de modelos""" + + def __init__(self, config_dir: str = "models"): + self.config_dir = config_dir + self.config_file = os.path.join(config_dir, "models_config.json") + self._config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """Cargar configuración desde archivo JSON""" + if not os.path.exists(self.config_file): + self._create_default_config() + + with open(self.config_file) as f: + return json.load(f) + + def _create_default_config(self): + """Crear configuración por defecto si no existe""" + default_config = { + "model_types": { + "demand": { + "module": "models_train", + "train_function": "train_demand_model", + "data_function": "fetch_data", + "description": "Modelo de predicción de demanda", + "required_params": ["h3", "week", "dow", "hour"] + }, + "nulo": { + "module": "models_train", + "train_function": "train_nulo_model", + "data_function": "fetch_data", + "description": "Modelo de predicción de nulos", + "required_params": ["h3", "week", "dow", "hour"] + }, + "legacy": { + "module": "models_train", + "train_function": "train_legacy_model", + "data_function": "fetch_data_legacy", + "description": "Modelo legacy compatible", + "required_params": ["hour", "dow", "total_events"] + } + } + } + + os.makedirs(self.config_dir, exist_ok=True) + with open(self.config_file, 'w') as f: + json.dump(default_config, f, indent=2) + + def save_config(self): + """Guardar configuración actual a disco""" + with open(self.config_file, 'w') as f: + json.dump(self._config, f, indent=2) + + def get_all_model_types(self) -> List[str]: + """Obtener todos los tipos de modelos disponibles""" + return list(self._config["model_types"].keys()) + + def model_exists(self, model_type: str) -> bool: + """Verificar si un tipo de modelo existe""" + return model_type in self._config["model_types"] + + def get_model_config(self, model_type: str) -> Dict[str, Any]: + """Obtener configuración de un modelo específico""" + if not self.model_exists(model_type): + raise ValueError(f"Model type '{model_type}' not found") + return self._config["model_types"][model_type] + + def register_model_type(self, model_data: Dict[str, Any]) -> Dict[str, Any]: + """Registrar un nuevo tipo de modelo""" + required_fields = ["type", "module", "train_function", "data_function", "required_params"] + + for field in required_fields: + if field not in model_data: + raise ValueError(f"Missing required field: {field}") + + model_type = model_data["type"] + + # Verificar si ya existe + if self.model_exists(model_type): + raise ValueError(f"Model type '{model_type}' already exists") + + # Añadir nuevo modelo + self._config["model_types"][model_type] = { + "module": model_data["module"], + "train_function": model_data["train_function"], + "data_function": model_data["data_function"], + "description": model_data.get("description", ""), + "required_params": model_data["required_params"] + } + + # Guardar configuración + self.save_config() + + return self._config["model_types"][model_type] + + def update_model_config(self, model_type: str, updates: Dict[str, Any]) -> Dict[str, Any]: + """Actualizar configuración de un modelo existente""" + if not self.model_exists(model_type): + raise ValueError(f"Model type '{model_type}' not found") + + self._config["model_types"][model_type].update(updates) + self.save_config() + + return self._config["model_types"][model_type] \ No newline at end of file diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/core/model_loader.py b/src/core/model_loader.py new file mode 100644 index 0000000..5cbb3b5 --- /dev/null +++ b/src/core/model_loader.py @@ -0,0 +1,114 @@ +import joblib +import os +from datetime import date +from typing import Dict, Any, Optional + +from config.models_config import ModelConfig +from utils.dynamic_loader import execute_function +from model_registry import register, load_meta + +class ModelManager: + """Gestión de carga y cache de modelos""" + + def __init__(self, model_config: ModelConfig): + self.model_config = model_config + self.models_cache: Dict[str, Any] = {} + self.model_dir = "models" + + def init_models(self): + """Inicializar todos los modelos configurados""" + for model_type in self.model_config.get_all_model_types(): + try: + self.models_cache[model_type] = self._load_or_train_model(model_type) + print(f"✓ Modelo '{model_type}' cargado correctamente") + except Exception as e: + print(f"✗ Error cargando modelo '{model_type}': {str(e)}") + self.models_cache[model_type] = None + + def _load_or_train_model(self, model_type: str): + """Cargar o entrenar un modelo dinámicamente""" + config = self.model_config.get_model_config(model_type) + + # Intentar cargar el modelo existente + model = self._load_model_from_disk(model_type) + if model is not None: + return model + + # Si no existe, entrenar nuevo modelo + print(f"Modelo {model_type} no se encontro, creación del modelo...") + + # Obtener datos + df = self._load_data_for_model(model_type, config) + + # Entrenar modelo + model = self._train_model(model_type, config, df) + + # Guardar modelo + filename = self._save_model(model_type, model, len(df)) + + return model + + def _load_model_from_disk(self, model_type: str): + """Cargar un modelo desde disco""" + meta = load_meta() + file = meta.get("current", {}).get(model_type) + + if not file: + return None + + model_path = os.path.join(self.model_dir, file) + if not os.path.exists(model_path): + return None + + return joblib.load(model_path) + + def _load_data_for_model(self, model_type: str, config: Dict[str, Any]): + """Cargar datos para un modelo específico""" + from db import fetch_data, fetch_data_legacy + + data_function = config.get("data_function") + + if data_function == "fetch_data": + return fetch_data() + elif data_function == "fetch_data_legacy": + return fetch_data_legacy() + else: + # Ejecutar función personalizada + module_name = config.get("module", "models_train") + return execute_function(module_name, data_function) + + def _train_model(self, model_type: str, config: Dict[str, Any], df): + """Entrenar un modelo""" + module_name = config.get("module", "models_train") + train_function = config.get("train_function") + return execute_function(module_name, train_function, df) + + def _save_model(self, model_type: str, model, rows: int) -> str: + """Guardar modelo en disco""" + today = date.today().isoformat() + filename = f"{model_type}_xgb_{today}.joblib" + + os.makedirs(self.model_dir, exist_ok=True) + joblib.dump(model, os.path.join(self.model_dir, filename)) + + # Registrar en metadata + register(model_type, filename, rows) + + return filename + + def get_model(self, model_type: str): + """Obtener modelo del cache""" + return self.models_cache.get(model_type) + + def reload_model(self, model_type: str): + """Recargar un modelo específico""" + try: + self.models_cache[model_type] = self._load_or_train_model(model_type) + return True + except Exception as e: + print(f"Error recargando modelo '{model_type}': {str(e)}") + return False + + def get_loaded_models_status(self) -> Dict[str, bool]: + """Obtener estado de carga de todos los modelos""" + return {mtype: model is not None for mtype, model in self.models_cache.items()} \ No newline at end of file diff --git a/src/core/model_trainer.py b/src/core/model_trainer.py new file mode 100644 index 0000000..c301c9a --- /dev/null +++ b/src/core/model_trainer.py @@ -0,0 +1,74 @@ +import os +from datetime import date +import joblib + +from config.models_config import ModelConfig +from core.model_loader import ModelManager +from utils.dynamic_loader import execute_function +from model_registry import register +from db import fetch_data, fetch_data_legacy + +class ModelTrainer: + """Gestión del entrenamiento de modelos""" + + def __init__(self, model_manager: ModelManager, model_config: ModelConfig): + self.model_manager = model_manager + self.model_config = model_config + self.model_dir = "models" + + def background_train(self, model_type=None): + """Entrenar modelos en segundo plano""" + os.makedirs(self.model_dir, exist_ok=True) + today = date.today().isoformat() + + # Determinar qué modelos entrenar + if model_type: + model_types = [model_type] + else: + model_types = self.model_config.get_all_model_types() + + for mtype in model_types: + try: + self._train_single_model(mtype, today) + except Exception as e: + print(f"Error entrenando el modelo {mtype}: {str(e)}") + + def _train_single_model(self, model_type: str, date_str: str): + """Entrenar un solo modelo""" + print(f"Entrenando modelo {model_type} ...") + config = self.model_config.get_model_config(model_type) + + # Obtener datos + df = self._get_training_data(config) + + # Entrenar modelo + model = self._execute_training(config, df) + + # Guardar modelo + filename = f"{model_type}_xgb_{date_str}.joblib" + joblib.dump(model, os.path.join(self.model_dir, filename)) + + # Registrar + register(model_type, filename, len(df)) + + # Actualizar cache + self.model_manager.models_cache[model_type] = model + print(f"✓ {model_type} modelo entrenado y guardado.") + + def _get_training_data(self, config: dict): + """Obtener datos de entrenamiento""" + data_function = config.get("data_function") + + if data_function == "fetch_data": + return fetch_data() + elif data_function == "fetch_data_legacy": + return fetch_data_legacy() + else: + module_name = config.get("module", "models_train") + return execute_function(module_name, data_function) + + def _execute_training(self, config: dict, df): + """Ejecutar función de entrenamiento""" + module_name = config.get("module", "models_train") + train_function = config.get("train_function") + return execute_function(module_name, train_function, df) \ No newline at end of file diff --git a/src/core/predictor.py b/src/core/predictor.py new file mode 100644 index 0000000..52471d2 --- /dev/null +++ b/src/core/predictor.py @@ -0,0 +1,225 @@ +import pandas as pd +from typing import Dict, Any +from flask import jsonify + +from config.models_config import ModelConfig +from core.model_loader import ModelManager +from utils.validators import validate_predict_params + +class PredictionHandler: + """Manejador de predicciones""" + + def __init__(self, model_manager: ModelManager, model_config: ModelConfig): + self.model_manager = model_manager + self.model_config = model_config + + def handle_predict_request(self, request_args: Dict[str, Any]): + """Manejar solicitud de predicción""" + model_type = request_args.get('model_type') + + # Validaciones básicas + if not model_type: + return jsonify({ + "error": "Es necesario indicar el modelo para usar", + "available_models": self.model_config.get_all_model_types() + }), 400 + + if not self.model_config.model_exists(model_type): + return jsonify({ + "error": f"El modelo '{model_type}' no esta disponible", + "available_models": self.model_config.get_all_model_types() + }), 400 + + # Validar parámetros + params = {k: v for k, v in request_args.items() if k != 'model_type'} + validation_result = validate_predict_params(model_type, params, self.model_config) + + if not validation_result["valid"]: + return jsonify({ + "error": validation_result["message"], + "required_params": validation_result.get("required_params", []) + }), 400 + + # Obtener y validar modelo + model = self._get_valid_model(model_type) + if not model: + return jsonify({"error": f"El modelo '{model_type}' no se ha podido cargar"}), 500 + + try: + # Realizar predicción + result = self._make_prediction(model_type, model, params) + return jsonify(result) + + except Exception as e: + return jsonify({ + "error": f"Fallo en la predicción: {str(e)}", + "model_type": model_type, + "parameters": params + }), 500 + + def handle_demand_request(self, request_args: Dict[str, Any]): + """Manejar solicitud de predicciones masivas""" + model_type = request_args.get('model_type') + limit = request_args.get('limit', type=int) + + # Validaciones básicas + if not model_type: + return jsonify({ + "error": "Es necesario indicar el modelo para usar", + "available_models": self.model_config.get_all_model_types() + }), 400 + + if not self.model_config.model_exists(model_type): + return jsonify({ + "error": f"El modelo '{model_type}' no esta disponible", + "available_models": self.model_config.get_all_model_types() + }), 400 + + # Obtener y validar modelo + model = self._get_valid_model(model_type) + if not model: + return jsonify({"error": f"El modelo '{model_type}' no se ha podido cargar"}), 500 + + try: + # Obtener predicciones masivas + result = self._make_batch_predictions(model_type, model, limit) + return jsonify(result) + + except Exception as e: + return jsonify({ + "error": f"Fallo en la predicción masiva: {str(e)}", + "model_type": model_type + }), 500 + + def _get_valid_model(self, model_type: str): + """Obtener y validar modelo""" + model = self.model_manager.get_model(model_type) + + if model is None: + try: + model = self.model_manager._load_or_train_model(model_type) + self.model_manager.models_cache[model_type] = model + except Exception as e: + print(f"Error al cargar el modelo '{model_type}': {str(e)}") + return None + + return model + + def _make_prediction(self, model_type: str, model, params: Dict[str, Any]): + """Realizar una predicción individual""" + config = self.model_config.get_model_config(model_type) + + # Preparar características según el modelo + if model_type == "demand": + X = self._prepare_demand_features(params) + prediction = float(model.predict(X)[0]) + result_key = "expected_demand" + + elif model_type == "nulo": + X = self._prepare_nulo_features(params) + prediction = float(model.predict_proba(X)[0][1]) + result_key = "nulo_probability" + + elif model_type == "legacy": + X = self._prepare_legacy_features(params) + prediction = float(model.predict_proba(X)[0][1]) + result_key = "predicted_nulo_prob" + + else: + # Modelos personalizados añadido para futuros desarrollos + X = self._prepare_custom_features(params, config) + prediction = self._predict_custom_model(model, X) + result_key = "prediction" + + return { + "model_type": model_type, + result_key: prediction, + "parameters": params, + "timestamp": pd.Timestamp.now().isoformat() + } + + def _make_batch_predictions(self, model_type: str, model, limit: int = None): + """Realizar predicciones masivas""" + from db import fetch_data, fetch_data_legacy + + config = self.model_config.get_model_config(model_type) + data_function = config.get("data_function") + + # Obtener datos + if data_function == "fetch_data": + df = fetch_data() + elif data_function == "fetch_data_legacy": + df = fetch_data_legacy() + else: + # Para modelos personalizados, necesitaríamos implementación específica "futuros desarrollos" + raise ValueError(f"No se pueden hacer predicciones masivas para modelos personalizados: {data_function}") + + if limit: + df = df.head(limit) + + # Preparar y realizar predicciones + if model_type == "demand": + df["h3_int"] = df["h3"].apply(lambda x: int(x, 16)) + X = df[["h3_int", "week", "dow", "hour"]] + df['prediction'] = model.predict(X).astype(float) + + elif model_type == "nulo": + df["h3_int"] = df["h3"].apply(lambda x: int(x, 16)) + X = df[["h3_int", "week", "dow", "hour"]] + df['prediction'] = model.predict_proba(X)[:,1].astype(float) + + elif model_type == "legacy": + X = df[['hour', 'dow', 'total_events']] + df['prediction'] = model.predict_proba(X)[:,1].astype(float) + + # Convertir a JSON + result = df.to_dict(orient='records') + + return { + "model_type": model_type, + "count": len(result), + "data": result + } + + def _prepare_demand_features(self, params: Dict[str, Any]): + """Preparar características para modelo de demanda""" + h3 = int(params['h3']) + week = int(params['week']) + dow = int(params['dow']) + hour = int(params['hour']) + return [[h3, week, dow, hour]] + + def _prepare_nulo_features(self, params: Dict[str, Any]): + """Preparar características para modelo de nulo""" + h3 = int(params['h3']) + week = int(params['week']) + dow = int(params['dow']) + hour = int(params['hour']) + return [[h3, week, dow, hour]] + + def _prepare_legacy_features(self, params: Dict[str, Any]): + """Preparar características para modelo legacy""" + hour = int(params['hour']) + dow = int(params['dow']) + total_events = int(params.get('total_events', 1)) + return pd.DataFrame([[hour, dow, total_events]], + columns=['hour', 'dow', 'total_events']) + + def _prepare_custom_features(self, params: Dict[str, Any], config: Dict[str, Any]): + """Preparar características para modelos personalizados""" + X = [] + for param in config.get("required_params", []): + if param in params: + X.append(float(params[param])) + return [X] + + def _predict_custom_model(self, model, X): + """Realizar predicción para modelo personalizado""" + try: + return float(model.predict(X)[0]) + except: + # Intentar con predict_proba para clasificadores (Incluido para futuros desarrollos) + try: + return float(model.predict_proba(X)[0][1]) + except: + return float(model.predict(X)[0]) \ No newline at end of file diff --git a/src/db.py b/src/db.py index 3c42442..d7d36fe 100644 --- a/src/db.py +++ b/src/db.py @@ -28,6 +28,19 @@ def fetch_data(): nulo_rate FROM demanda_h3_hour_ml """ + try: + with engine.connect() as conn: + df = pd.read_sql(text(query), conn) + return df + except SQLAlchemyError as e: + print(f"Error al ejecutar la consulta: {e}") + return pd.DataFrame() + +def fetch_data_legacy(): + query = """ + SELECT h3, hour, dow, total_events, total_nulos, nulo_rate + FROM demanda_h3_hour + """ try: with engine.connect() as conn: df = pd.read_sql(text(query), conn) diff --git a/src/db_warning.py b/src/db_warning.py deleted file mode 100644 index 663cd67..0000000 --- a/src/db_warning.py +++ /dev/null @@ -1,28 +0,0 @@ -import psycopg2 -import pandas as pd -import os - -DB_CONFIG = { - 'dbname': os.getenv('DB_NAME', 'postgres'), - 'user': os.getenv('DB_USER', 'postgres'), - 'password': os.getenv('DB_PASSWORD', 'tfmuocdfcarvajal'), - 'host': os.getenv('DB_HOST', '10.10.5.32'), - 'port': os.getenv('DB_PORT', '5432') -} - -def fetch_data(): - conn = psycopg2.connect(**DB_CONFIG) - query = """ - SELECT - h3, - week, - dow, - hour, - total_events, - total_nulos, - nulo_rate - FROM demanda_h3_hour_ml - """ - df = pd.read_sql(query, conn) - conn.close() - return df \ No newline at end of file diff --git a/src/models_train.py b/src/models_train.py index 509ed19..3a6bf84 100644 --- a/src/models_train.py +++ b/src/models_train.py @@ -31,3 +31,67 @@ def train_nulo_model(df): model.fit(X, y) return model + +def train_legacy_model(df): + """Entrenar modelo legacy""" + # Variables predictoras + X = df[['hour', 'dow', 'total_events']] + + # Variable objetivo: nulo_rate > 0 -> 1, else 0 + y = (df['nulo_rate'] > 0).astype(int) + + model = xgb.XGBClassifier( + n_estimators=100, + max_depth=4, + learning_rate=0.1, + use_label_encoder=False, + eval_metric='logloss' + ) + + model.fit(X, y) + return model + +def train_custom_demand_model(df): + """Versión modelo personalizado de demanda""" + if df['h3'].dtype == 'object': + df["h3_int"] = df["h3"].apply(lambda x: int(x, 16)) + else: + df["h3_int"] = df["h3"] + + X = df[["h3_int", "week", "dow", "hour"]] + y = df["total_events"] + + model = xgb.XGBRegressor( + n_estimators=500, + max_depth=10, + learning_rate=0.01, + subsample=0.8, + colsample_bytree=0.8, + objective="reg:squarederror" + ) + + model.fit(X, y) + return model + +def train_custom_nulo_model(df): + """Versión modelo personalizado de nulos""" + if df['h3'].dtype == 'object': + df["h3_int"] = df["h3"].apply(lambda x: int(x, 16)) + else: + df["h3_int"] = df["h3"] + + X = df[["h3_int", "week", "dow", "hour"]] + y = (df["nulo_rate"] > 0).astype(int) + + model = xgb.XGBClassifier( + n_estimators=300, + max_depth=7, + learning_rate=0.05, + subsample=0.8, + colsample_bytree=0.8, + eval_metric="logloss", + use_label_encoder=False + ) + + model.fit(X, y) + return model \ No newline at end of file diff --git a/src/pyproject.toml b/src/pyproject.toml index 3287afb..c5e2e85 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -25,10 +25,19 @@ dependencies = [ [tool.setuptools] include-package-data = true -py-modules = ["app", "db", "model_registry", "models_train"] -[tool.setuptools.packages] -find = {} +packages = [ + "config", + "core", + "utils" +] + +py-modules = [ + "app", + "db", + "model_registry", + "models_train" +] [project.urls] Documentation='https://https://gitea.tfmuocdfcarvajal.duckdns.org/TFM/microservicios_python' \ No newline at end of file diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..ce16ca4 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1,7 @@ +Flask +pandas +xgboost +scikit-learn +joblib +SQLAlchemy +psycopg2-binary \ No newline at end of file diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/dynamic_loader.py b/src/utils/dynamic_loader.py new file mode 100644 index 0000000..e3897cc --- /dev/null +++ b/src/utils/dynamic_loader.py @@ -0,0 +1,15 @@ +import importlib +from typing import Any + +def execute_function(module_name: str, function_name: str, *args, **kwargs) -> Any: + """Ejecutar una función dinámicamente""" + try: + module = importlib.import_module(module_name) + function = getattr(module, function_name) + return function(*args, **kwargs) + except ImportError as e: + raise ValueError(f"Modulo '{module_name}' no esta disponible: {str(e)}") + except AttributeError as e: + raise ValueError(f"Función '{function_name}' no esta disponible en el módulo '{module_name}': {str(e)}") + except Exception as e: + raise ValueError(f"Error ejecutando la función '{function_name}': {str(e)}") \ No newline at end of file diff --git a/src/utils/validators.py b/src/utils/validators.py new file mode 100644 index 0000000..acf56dc --- /dev/null +++ b/src/utils/validators.py @@ -0,0 +1,30 @@ +from typing import Dict, Any, Tuple + +def validate_predict_params(model_type: str, params: Dict[str, Any], model_config) -> Dict[str, Any]: + """Validar parámetros para predicción""" + config = model_config.get_model_config(model_type) + required_params = config.get("required_params", []) + + # Verificar parámetros requeridos + missing_params = [p for p in required_params if p not in params] + + if missing_params: + return { + "valid": False, + "message": f"Faltan los siguientes parámetros: {', '.join(missing_params)}", + "required_params": required_params + } + + # Validar tipos de datos básicos + for param in required_params: + try: + # Intentar convertir a número + float(params[param]) + except ValueError: + return { + "valid": False, + "message": f"El parámetro '{param}' tiene que ser un número", + "invalid_param": param + } + + return {"valid": True} \ No newline at end of file