from flask import Flask, jsonify, request
import joblib
import os
import threading
from datetime import date

from db import fetch_data
from model_registry import register, load_meta
from models_train import train_demand_model, train_nulo_model

app = Flask(__name__)
MODEL_DIR = "models"

# Serializes training runs: two concurrent POST /train requests would
# otherwise fetch and train simultaneously and clobber each other's
# model files in MODEL_DIR.
_train_lock = threading.Lock()


def load_model(model_type):
    """Load the currently registered model for *model_type*.

    :param model_type: registry key, ``"demand"`` or ``"nulo"``.
    :return: the unpickled model, or ``None`` when no model of that type
             has been trained/registered yet.
    """
    meta = load_meta()
    file_name = meta["current"].get(model_type)
    if not file_name:
        return None
    return joblib.load(os.path.join(MODEL_DIR, file_name))


def _parse_features():
    """Read the common ``h3``/``week``/``dow``/``hour`` query params.

    ``h3`` is a hexadecimal H3 cell index: training encodes it with
    ``int(x, 16)`` (see models_train), so serving must decode it the
    same way — a base-10 ``int()`` would reject or misinterpret it.

    :return: a single-row feature matrix ``[[h3_int, week, dow, hour]]``.
    :raises KeyError: when a parameter is missing (Flask turns this
        into HTTP 400 for ``request.args``).
    :raises ValueError: when a parameter is not a valid integer.
    """
    return [[
        int(request.args["h3"], 16),
        int(request.args["week"]),
        int(request.args["dow"]),
        int(request.args["hour"]),
    ]]


def _nulo_probability():
    """Predict P(nulo) for the request's features.

    :return: probability as ``float``, or ``None`` when the nulo model
             has not been trained yet.
    """
    model = load_model("nulo")
    if model is None:
        return None
    return float(model.predict_proba(_parse_features())[0][1])


def background_train():
    """Fetch data, train both models, and register them.

    Runs in a worker thread started by the /train endpoint; holds
    ``_train_lock`` for the whole run.
    """
    with _train_lock:
        print("Fetching data...")
        df = fetch_data()
        print(f"Data fetched: {len(df)} rows")

        today = date.today().isoformat()
        os.makedirs(MODEL_DIR, exist_ok=True)

        print("Training demand model...")
        demand_model = train_demand_model(df)
        demand_file = f"demand_xgb_{today}.joblib"
        joblib.dump(demand_model, os.path.join(MODEL_DIR, demand_file))
        register("demand", demand_file, len(df))
        print("Demand model trained and saved.")

        print("Training nulo model...")
        nulo_model = train_nulo_model(df)
        nulo_file = f"nulo_xgb_{today}.joblib"
        joblib.dump(nulo_model, os.path.join(MODEL_DIR, nulo_file))
        register("nulo", nulo_file, len(df))
        print("Nulo model trained and saved.")


@app.route("/train", methods=["POST"])
def train():
    """Kick off a background training run (non-blocking)."""
    if _train_lock.locked():
        return jsonify({"status": "training already in progress"}), 409
    # daemon=True so a pending training run cannot block process shutdown.
    threading.Thread(target=background_train, daemon=True).start()
    return jsonify({"status": "training started"})


@app.route("/predict_demand", methods=["GET"])
def predict_demand():
    """Return the expected demand for one (h3, week, dow, hour) cell."""
    model = load_model("demand")
    if model is None:
        # Before the first /train there is no model; a clean 503 beats
        # the unhandled AttributeError (HTTP 500) this used to raise.
        return jsonify({"error": "demand model not trained yet"}), 503
    pred = model.predict(_parse_features())[0]
    return jsonify({"expected_demand": float(pred)})


@app.route("/predict_nulo", methods=["GET"])
def predict_nulo():
    """Return the probability of a 'nulo' event for one cell."""
    prob = _nulo_probability()
    if prob is None:
        return jsonify({"error": "nulo model not trained yet"}), 503
    return jsonify({"nulo_probability": prob})


@app.route("/predict", methods=["GET"])
def predict_legacy():
    """Legacy alias of /predict_nulo kept for old clients.

    Same computation, but the response key is the historical
    ``predicted_nulo_prob``.
    """
    prob = _nulo_probability()
    if prob is None:
        return jsonify({"error": "nulo model not trained yet"}), 503
    return jsonify({
        "predicted_nulo_prob": prob
    })


@app.route("/models", methods=["GET"])
def models():
    """Expose the model registry metadata (current files + history)."""
    return jsonify(load_meta())


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
import psycopg2
import pandas as pd
import os

# Connection settings, overridable via environment variables.
# NOTE(review): hardcoded password/host defaults are a credential leak —
# these should come from a secret store and have no fallback value.
DB_CONFIG = {
    'dbname': os.getenv('DB_NAME', 'postgres'),
    'user': os.getenv('DB_USER', 'postgres'),
    'password': os.getenv('DB_PASSWORD', 'tfmuocdfcarvajal'),
    'host': os.getenv('DB_HOST', '10.10.5.32'),
    'port': os.getenv('DB_PORT', '5432')
}


def fetch_data():
    """Load the hourly H3 demand training set into a DataFrame.

    :return: DataFrame with columns h3, week, dow, hour, total_events,
             total_nulos, nulo_rate.
    :raises psycopg2.Error: on connection or query failure.
    """
    query = """
        SELECT
            h3,
            week,
            dow,
            hour,
            total_events,
            total_nulos,
            nulo_rate
        FROM demanda_h3_hour_ml
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # finally guarantees the connection is released even when
        # read_sql raises — the old version leaked it on error.
        return pd.read_sql(query, conn)
    finally:
        conn.close()
import json
import os
from datetime import datetime, timezone

MODEL_DIR = "models"
META_FILE = os.path.join(MODEL_DIR, "metadata.json")


def load_meta():
    """Return the model-registry metadata.

    :return: dict with keys ``current`` (type -> current model file)
             and ``history`` (list of registration entries). Both keys
             are guaranteed to exist, even for a missing or partial
             metadata file.
    """
    if not os.path.exists(META_FILE):
        return {"current": {}, "history": []}
    with open(META_FILE) as f:
        meta = json.load(f)
    # Defensive: a hand-edited metadata file missing either key would
    # otherwise make register() raise KeyError.
    meta.setdefault("current", {})
    meta.setdefault("history", [])
    return meta


def save_meta(meta):
    """Persist the registry metadata as pretty-printed JSON."""
    os.makedirs(MODEL_DIR, exist_ok=True)
    with open(META_FILE, "w") as f:
        json.dump(meta, f, indent=2)


def register(model_type, file_name, rows):
    """Record a freshly trained model and make it current.

    :param model_type: registry key, e.g. ``"demand"`` or ``"nulo"``.
    :param file_name: model file name inside MODEL_DIR.
    :param rows: number of training rows, kept for auditing.
    :return: the history entry that was appended.
    """
    meta = load_meta()
    entry = {
        "type": model_type,
        "file": file_name,
        # Timezone-aware UTC (datetime.utcnow() is deprecated and naive);
        # the replace() keeps the historical "...Z" string format.
        "trained_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "rows": rows,
    }
    meta["history"].append(entry)
    meta["current"][model_type] = file_name
    save_meta(meta)
    return entry
import xgboost as xgb


def _with_h3_int(df):
    """Return a copy of *df* with the hex H3 index decoded to an int.

    H3 cell indexes are hexadecimal strings; ``int(x, 16)`` turns them
    into a numeric feature. Working on a copy keeps the callers'
    DataFrame untouched — the old code mutated it in place, a hidden
    side effect on shared data.
    """
    df = df.copy()
    df["h3_int"] = df["h3"].apply(lambda x: int(x, 16))
    return df


def train_demand_model(df):
    """Train the demand regressor on (h3_int, week, dow, hour).

    :param df: training set with columns h3, week, dow, hour, total_events.
    :return: fitted ``xgb.XGBRegressor`` predicting total_events.
    """
    df = _with_h3_int(df)

    X = df[["h3_int", "week", "dow", "hour"]]
    y = df["total_events"]

    model = xgb.XGBRegressor(
        n_estimators=300,
        max_depth=8,
        learning_rate=0.05,
        objective="reg:squarederror"
    )

    model.fit(X, y)
    return model


def train_nulo_model(df):
    """Train the 'nulo' classifier on (h3_int, week, dow, hour).

    :param df: training set with columns h3, week, dow, hour, nulo_rate.
    :return: fitted ``xgb.XGBClassifier`` predicting whether nulo_rate > 0.
    """
    df = _with_h3_int(df)

    X = df[["h3_int", "week", "dow", "hour"]]
    # Binary target: any non-zero nulo rate counts as positive.
    y = (df["nulo_rate"] > 0).astype(int)

    model = xgb.XGBClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.1,
        eval_metric="logloss"
    )

    model.fit(X, y)
    return model