ADD: Placement

This commit is contained in:
2025-12-21 15:37:47 +01:00
parent 7320798c29
commit 0a8342b041
5 changed files with 595 additions and 3 deletions

View File

@@ -7,6 +7,7 @@ from dotenv import load_dotenv
from core.model_loader import ModelManager
from core.predictor import PredictionHandler
from core.model_trainer import ModelTrainer
from core.placement import build_prediction_grid, place_gruas, optimize_gruas_placement
from config.models_config import ModelConfig
app = Flask(__name__)
@@ -155,6 +156,247 @@ def list_models():
"total": len(models_info)
})
@app.route("/recommend", methods=["GET"])
def recommend_gruas():
"""
Recomendar ubicaciones para grúas.
Parámetros:
- week: semana del año (requerido)
- dow: día de la semana (0-6, requerido)
- hour: hora del día (0-23, requerido)
- k: número de grúas (opcional, default=2)
- optimize: si es True, encuentra k óptimo (opcional, default=False)
- min_gruas: mínimo de grúas para optimización (opcional, default=1)
- max_gruas: máximo de grúas para optimización (opcional, default=10)
- target_coverage: cobertura objetivo % (opcional, default=80.0)
"""
try:
# Obtener parámetros
week = int(request.args["week"])
dow = int(request.args["dow"])
hour = int(request.args["hour"])
k = request.args.get("k", default=2, type=int)
optimize = request.args.get("optimize", default="false").lower() == "true"
min_gruas = request.args.get("min_gruas", default=1, type=int)
max_gruas = request.args.get("max_gruas", default=10, type=int)
target_coverage = request.args.get("target_coverage", default=80.0, type=float)
# Validar parámetros
if not (0 <= dow <= 6):
return jsonify({"error": "dow must be between 0 and 6"}), 400
if not (0 <= hour <= 23):
return jsonify({"error": "hour must be between 0 and 23"}), 400
if k < 1:
return jsonify({"error": "k must be at least 1"}), 400
# Obtener modelos
demand_model = model_manager.get_model("demand")
nulo_model = model_manager.get_model("nulo")
if demand_model is None or nulo_model is None:
return jsonify({
"error": "Required models not loaded. Please train demand and nulo models first."
}), 500
if DEBUG_MODE:
print(f"DEBUG: Building prediction grid for week={week}, dow={dow}, hour={hour}")
# Construir grilla de predicciones
df = build_prediction_grid(week, dow, hour, demand_model, nulo_model)
if df.empty:
return jsonify({
"error": "No prediction data available for the specified parameters"
}), 404
if DEBUG_MODE:
print(f"DEBUG: Prediction grid built with {len(df)} cells")
print(f"DEBUG: Data sample:\n{df.head()}")
# Ejecutar algoritmo de colocación
if optimize:
if DEBUG_MODE:
print(f"DEBUG: Running optimization with min={min_gruas}, max={max_gruas}, target={target_coverage}%")
result = optimize_gruas_placement(
df,
min_gruas=min_gruas,
max_gruas=max_gruas,
target_coverage=target_coverage
)
if "error" in result:
return jsonify(result), 500
response = {
"week": week,
"dow": dow,
"hour": hour,
"optimization": {
"optimal_k": result['optimal_k'],
"coverage_percentage": result['coverage_percentage'],
"risk_coverage_percentage": result['risk_coverage_percentage'],
"target_coverage": target_coverage
},
"recommended_h3": result['selected_cells'],
"statistics": result.get('placement', {}).get('statistics', []),
"coverage": result.get('placement', {}).get('coverage', {})
}
# Incluir todos los resultados si está en modo debug
if DEBUG_MODE:
response['debug'] = {
'all_results': result.get('all_results', [])
}
else:
if DEBUG_MODE:
print(f"DEBUG: Running placement with k={k}")
result = place_gruas(df, k=k)
response = {
"week": week,
"dow": dow,
"hour": hour,
"parameters": {
"k": k,
"optimization": optimize
},
"recommended_h3": result['selected'],
"statistics": result.get('statistics', []),
"coverage": result.get('coverage', {})
}
return jsonify(response)
except KeyError as e:
return jsonify({"error": f"Missing required parameter: {str(e)}"}), 400
except ValueError as e:
return jsonify({"error": f"Invalid parameter value: {str(e)}"}), 400
except Exception as e:
if DEBUG_MODE:
import traceback
print(f"ERROR in recommend_gruas: {str(e)}")
traceback.print_exc()
return jsonify({
"error": f"Internal server error: {str(e)}"
}), 500
@app.route("/coverage/radius", methods=["GET"])
def get_coverage_radius():
"""
Calcular radio de cobertura para una demanda específica.
Parámetros:
- demand: demanda esperada (requerido)
"""
try:
demand = float(request.args["demand"])
from algorithms.placement import coverage_radius, get_coverage_weight
radius = coverage_radius(demand)
weight = get_coverage_weight(demand)
# Determinar nivel de demanda
if demand < 2:
level = "baja"
elif demand < 5:
level = "media"
else:
level = "alta"
return jsonify({
"demand": demand,
"demand_level": level,
"coverage_radius": radius,
"coverage_weight": weight,
"explanation": f"Para demanda {level} ({demand:.2f}), el radio de cobertura es {radius}"
})
except (KeyError, ValueError) as e:
return jsonify({"error": f"Invalid parameter: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"Internal error: {str(e)}"}), 500
@app.route("/coverage/cells", methods=["GET"])
def get_coverage_cells():
"""
Obtener celdas cubiertas por una celda H3 con un radio específico.
Parámetros:
- h3: celda H3 (requerido)
- radius: radio de cobertura (opcional, default calculado según demanda)
- demand: demanda para calcular radio automático (opcional)
"""
try:
h3_cell = request.args["h3"]
from algorithms.placement import covered_cells, coverage_radius
# Determinar radio
if "radius" in request.args:
radius = int(request.args["radius"])
elif "demand" in request.args:
demand = float(request.args["demand"])
radius = coverage_radius(demand)
else:
return jsonify({
"error": "Either radius or demand parameter is required"
}), 400
# Calcular celdas cubiertas
cells = covered_cells(h3_cell, radius)
return jsonify({
"h3": h3_cell,
"radius": radius,
"cells_covered": len(cells),
"covered_cells": list(cells)[:100], # Limitar para no sobrecargar respuesta
"truncated": len(cells) > 100
})
except (KeyError, ValueError) as e:
return jsonify({"error": f"Invalid parameter: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"Internal error: {str(e)}"}), 500
@app.route("/risk/calculate", methods=["GET"])
def calculate_risk_endpoint():
"""
Calcular riesgo para una combinación de demanda y probabilidad de nulo.
Parámetros:
- demand: demanda esperada (requerido)
- nulo_prob: probabilidad de nulo (requerido, 0-1)
"""
try:
demand = float(request.args["demand"])
nulo_prob = float(request.args["nulo_prob"])
from algorithms.placement import calculate_risk
if not 0 <= nulo_prob <= 1:
return jsonify({"error": "nulo_prob must be between 0 and 1"}), 400
risk = calculate_risk(demand, nulo_prob)
return jsonify({
"demand": demand,
"nulo_probability": nulo_prob,
"risk": risk,
"explanation": f"Riesgo = demanda × probabilidad_nulo = {demand} × {nulo_prob} = {risk:.4f}"
})
except (KeyError, ValueError) as e:
return jsonify({"error": f"Invalid parameter: {str(e)}"}), 400
except Exception as e:
return jsonify({"error": f"Internal error: {str(e)}"}), 500
@app.route("/health", methods=["GET"])
def health():
"""Endpoint de salud"""
@@ -253,6 +495,30 @@ def index():
"description": "Entrenar modelos",
"parameters": "model_type (opcional)"
},
{
"path": "/recommend",
"method": "GET",
"description": "Recomendar ubicaciones para grúas",
"parameters": "week, dow, hour (requeridos), k, optimize, min_gruas, max_gruas, target_coverage (opcionales)"
},
{
"path": "/coverage/radius",
"method": "GET",
"description": "Calcular radio de cobertura según demanda",
"parameters": "demand (requerido)"
},
{
"path": "/coverage/cells",
"method": "GET",
"description": "Obtener celdas cubiertas por una celda H3",
"parameters": "h3 (requerido), radius o demand (opcional)"
},
{
"path": "/risk/calculate",
"method": "GET",
"description": "Calcular riesgo (demanda × probabilidad nulo)",
"parameters": "demand, nulo_prob (requeridos)"
},
{
"path": "/models",
"method": "GET",

267
src/core/placement.py Normal file
View File

@@ -0,0 +1,267 @@
import h3
import pandas as pd
from typing import List, Set, Dict, Tuple
import numpy as np
def coverage_radius(expected_demand: float) -> int:
"""
Determina el radio de cobertura según la demanda esperada, como medimos anteriormente
Nivel demanda | Radio H3
---------------|----------
baja (<2) | 18
media (2-5) | 12
alta (≥5) | 6
"""
if expected_demand < 2:
return 18
elif expected_demand < 5:
return 12
else:
return 6
def get_coverage_weight(expected_demand: float) -> float:
"""
Calcula un peso para la cobertura basado en la demanda.
Demanda más alta = mayor peso.
"""
if expected_demand < 2:
return 1.0
elif expected_demand < 5:
return 2.0
else:
return 3.0
def covered_cells(h3_cell: str, radius: int) -> Set[str]:
"""Obtiene todas las celdas H3 cubiertas por un radio dado."""
try:
return set(h3.k_ring(h3_cell, radius))
except Exception as e:
print(f"Error calculando cobertura para celda {h3_cell}: {e}")
return {h3_cell}
def calculate_risk(expected_demand: float, nulo_probability: float) -> float:
"""
Calcula el riesgo como producto de demanda y probabilidad de nulo.
Esto prioriza áreas con alta demanda y alta probabilidad de nulos.
"""
return expected_demand * nulo_probability
def calculate_coverage_score(cell: str, radius: int, df: pd.DataFrame) -> float:
"""
Calcula el puntaje de cobertura para una celda específica.
Considera tanto el riesgo como el peso de la demanda.
"""
covered = covered_cells(cell, radius)
covered_df = df[df['h3'].isin(covered)]
if covered_df.empty:
return 0
# Suma ponderada del riesgo
total_risk = covered_df['risk'].sum()
# Peso adicional por demanda alta
demand_weight = covered_df['coverage_weight'].sum()
return total_risk * (1 + 0.1 * demand_weight)
def build_prediction_grid(week: int, dow: int, hour: int,
demand_model, nulo_model) -> pd.DataFrame:
"""
Construye una cuadrícula de predicciones para todas las celdas H3.
"""
# Obtener todas las celdas H3 únicas de la base de datos
from db import get_all_h3_cells
h3_cells = get_all_h3_cells()
predictions = []
for h3_cell in h3_cells:
try:
# Convertir H3 a entero para el modelo
h3_int = int(h3_cell, 16)
# Predecir demanda
X_demand = [[h3_int, week, dow, hour]]
expected_demand = float(demand_model.predict(X_demand)[0])
# Predecir probabilidad de nulo
X_nulo = [[h3_int, week, dow, hour]]
nulo_prob = float(nulo_model.predict_proba(X_nulo)[0][1])
# Calcular riesgo
risk = calculate_risk(expected_demand, nulo_prob)
# Determinar radio de cobertura y peso
radius = coverage_radius(expected_demand)
coverage_weight = get_coverage_weight(expected_demand)
predictions.append({
'h3': h3_cell,
'h3_int': h3_int,
'week': week,
'dow': dow,
'hour': hour,
'expected_demand': expected_demand,
'nulo_probability': nulo_prob,
'risk': risk,
'coverage_radius': radius,
'coverage_weight': coverage_weight
})
except Exception as e:
print(f"Error procesando celda {h3_cell}: {e}")
continue
return pd.DataFrame(predictions)
def place_gruas(df: pd.DataFrame, k: int = 2,
max_iterations: int = 100) -> Dict:
"""
Algoritmo de colocación de grúas greedy.
Args:
df: DataFrame con predicciones por celda H3
k: número de grúas a colocar
max_iterations: máximo de iteraciones para evitar loops infinitos
Returns:
Diccionario con las grúas seleccionadas y estadísticas
"""
if df.empty:
return {"selected": [], "coverage": 0, "message": "No data available"}
# Asegurar que tenemos columnas necesarias
required_cols = ['h3', 'expected_demand', 'risk', 'coverage_radius']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"DataFrame missing required column: {col}")
uncovered = df.copy()
selected_cells = []
coverage_stats = []
total_initial_risk = df['risk'].sum()
iteration = 0
while len(selected_cells) < k and iteration < max_iterations and not uncovered.empty:
iteration += 1
best_cell = None
best_score = -1
best_coverage = set()
# Evaluar cada celda no cubierta como candidata
for _, row in uncovered.iterrows():
current_cell = row['h3']
radius = row['coverage_radius']
# Calcular celdas cubiertas
covered = covered_cells(current_cell, radius)
# Calcular puntaje de cobertura
score = calculate_coverage_score(current_cell, radius, uncovered)
if score > best_score:
best_score = score
best_cell = current_cell
best_coverage = covered
if best_cell is None:
break
# Añadir a seleccionadas
selected_cells.append(best_cell)
# Calcular estadísticas para esta grúa
grua_stats = {
'h3': best_cell,
'expected_demand': float(uncovered.loc[uncovered['h3'] == best_cell, 'expected_demand'].iloc[0]),
'coverage_radius': int(uncovered.loc[uncovered['h3'] == best_cell, 'coverage_radius'].iloc[0]),
'nulo_probability': float(uncovered.loc[uncovered['h3'] == best_cell, 'nulo_probability'].iloc[0]),
'cells_covered': len(best_coverage),
'risk_covered': float(uncovered[uncovered['h3'].isin(best_coverage)]['risk'].sum())
}
coverage_stats.append(grua_stats)
# Eliminar celdas cubiertas
uncovered = uncovered[~uncovered['h3'].isin(best_coverage)]
# Calcular cobertura total
covered_by_all = set()
for cell in selected_cells:
cell_data = df[df['h3'] == cell].iloc[0]
covered = covered_cells(cell, cell_data['coverage_radius'])
covered_by_all.update(covered)
total_cells_covered = len(covered_by_all)
total_cells = len(df)
coverage_percentage = (total_cells_covered / total_cells * 100) if total_cells > 0 else 0
total_risk_covered = df[df['h3'].isin(covered_by_all)]['risk'].sum()
risk_coverage_percentage = (total_risk_covered / total_initial_risk * 100) if total_initial_risk > 0 else 0
return {
'selected': selected_cells,
'statistics': coverage_stats,
'coverage': {
'cells_covered': total_cells_covered,
'total_cells': total_cells,
'coverage_percentage': round(coverage_percentage, 2),
'risk_covered': round(float(total_risk_covered), 4),
'total_risk': round(float(total_initial_risk), 4),
'risk_coverage_percentage': round(risk_coverage_percentage, 2)
},
'parameters': {
'k': k,
'iterations': iteration
}
}
def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1,
max_gruas: int = 10,
target_coverage: float = 80.0) -> Dict:
"""
Encuentra el número óptimo de grúas para alcanzar una cobertura objetivo.
Args:
df: DataFrame con predicciones
min_gruas: mínimo número de grúas
max_gruas: máximo número de grúas
target_coverage: porcentaje de cobertura objetivo
Returns:
Resultado con número óptimo de grúas
"""
results = []
for k in range(min_gruas, max_gruas + 1):
placement = place_gruas(df, k=k)
results.append({
'k': k,
'coverage_percentage': placement['coverage']['coverage_percentage'],
'risk_coverage_percentage': placement['coverage']['risk_coverage_percentage'],
'selected_cells': placement['selected'],
'placement': placement
})
# Si alcanzamos la cobertura objetivo, podemos detenernos
if placement['coverage']['coverage_percentage'] >= target_coverage:
break
# Encontrar el mejor balance
if not results:
return {"error": "No results generated"}
# Ordenar por cobertura de riesgo (prioridad) y luego por número de grúas (menos es mejor)
results.sort(key=lambda x: (-x['risk_coverage_percentage'], x['k']))
best = results[0]
return {
'optimal_k': best['k'],
'coverage_percentage': best['coverage_percentage'],
'risk_coverage_percentage': best['risk_coverage_percentage'],
'selected_cells': best['selected_cells'],
'all_results': results
}

View File

@@ -48,3 +48,60 @@ def fetch_data_legacy():
except SQLAlchemyError as e:
print(f"Error al ejecutar la consulta: {e}")
return pd.DataFrame()
def get_all_h3_cells() -> list:
"""Obtener todas las celdas H3 únicas de la base de datos"""
query = """
SELECT DISTINCT h3
FROM demanda_h3_hour_ml
"""
try:
with engine.connect() as conn:
df = pd.read_sql(text(query), conn)
return df['h3'].tolist() if not df.empty else []
except SQLAlchemyError as e:
print(f"Error obteniendo celdas H3: {e}")
return []
def get_h3_cells_by_area(min_lat: float = None, max_lat: float = None,
min_lon: float = None, max_lon: float = None) -> list:
"""
Obtener celdas H3 dentro de un área geográfica específica.
Nota: Esta función requiere que la tabla tenga columnas lat/lon
o que se pueda calcular H3 a partir de coordenadas.
"""
# Implementación básica - ajustar según tu esquema de datos
query = """
SELECT DISTINCT h3
FROM demanda_h3_hour_ml
"""
conditions = []
params = {}
if min_lat:
conditions.append("latitude >= :min_lat")
params['min_lat'] = min_lat
if max_lat:
conditions.append("latitude <= :max_lat")
params['max_lat'] = max_lat
if min_lon:
conditions.append("longitude >= :min_lon")
params['min_lon'] = min_lon
if max_lon:
conditions.append("longitude <= :max_lon")
params['max_lon'] = max_lon
if conditions:
query += " AND " + " AND ".join(conditions)
try:
with engine.connect() as conn:
df = pd.read_sql(text(query), conn, params=params)
return df['h3'].tolist() if not df.empty else []
except SQLAlchemyError as e:
print(f"Error obteniendo celdas H3 por área: {e}")
return []

View File

@@ -21,7 +21,8 @@ dependencies = [
'xgboost',
'scikit-learn',
'joblib',
'python-dotenv'
'python-dotenv',
'h3'
]
[tool.setuptools]

View File

@@ -6,3 +6,4 @@ joblib
SQLAlchemy
psycopg2-binary
python-dotenv
h3