diff --git a/src/core/placement.py b/src/core/placement.py index bc9b0d5..7d84850 100644 --- a/src/core/placement.py +++ b/src/core/placement.py @@ -35,7 +35,22 @@ def get_coverage_weight(expected_demand: float) -> float: def covered_cells(h3_cell: str, radius: int) -> Set[str]: """Obtiene todas las celdas H3 cubiertas por un radio dado.""" try: - return set(h3.k_ring(h3_cell, radius)) + # H3 >= 4.x (API strings) + try: + import h3.api.basic_str as h3_api + return set(h3_api.grid_disk(h3_cell, radius)) + except ImportError: + pass + + # H3 3.x + try: + import h3 + return set(h3.k_ring(h3_cell, radius)) + except Exception: + pass + + raise RuntimeError("API H3 no compatible") + except Exception as e: print(f"Error calculando cobertura para celda {h3_cell}: {e}") return {h3_cell} @@ -67,9 +82,12 @@ def calculate_coverage_score(cell: str, radius: int, df: pd.DataFrame) -> float: return total_risk * (1 + 0.1 * demand_weight) def build_prediction_grid(week: int, dow: int, hour: int, - demand_model, nulo_model) -> pd.DataFrame: + demand_model, nulo_model, h3_resolution: int = 8) -> pd.DataFrame: """ - Construye una cuadrícula de predicciones para todas las celdas H3. + Construye una cuadricula de predicciones para todas las celdas H3. + + Args: + h3_resolution: Resolución H3 (default=8 para áreas urbanas) """ # Obtener todas las celdas H3 únicas de la base de datos from db import get_all_h3_cells @@ -80,7 +98,13 @@ def build_prediction_grid(week: int, dow: int, hour: int, for h3_cell in h3_cells: try: # Convertir H3 a entero para el modelo - h3_int = int(h3_cell, 16) + # Asegurar que sea un string hexadecimal válido + if isinstance(h3_cell, str): + h3_int = int(h3_cell, 16) + else: + # Si ya es numérico o otro tipo + h3_str = str(h3_cell) + h3_int = int(h3_str, 16) if h3_str.startswith('8') else int(h3_str) # Predecir demanda X_demand = [[h3_int, week, dow, hour]] @@ -98,7 +122,7 @@ def build_prediction_grid(week: int, dow: int, hour: int, coverage_weight = get_coverage_weight(expected_demand) predictions.append({ - 'h3': h3_cell, + 'h3': str(h3_cell), # Asegurar que es string 'h3_int': h3_int, 'week': week, 'dow': dow, @@ -112,12 +136,20 @@ def build_prediction_grid(week: int, dow: int, hour: int, except Exception as e: print(f"Error procesando celda {h3_cell}: {e}") + if DEBUG_MODE: + import traceback + traceback.print_exc() continue - return pd.DataFrame(predictions) + df_result = pd.DataFrame(predictions) + + if df_result.empty: + print(f"Advertencia: No se generaron predicciones para week={week}, dow={dow}, hour={hour}") + + return df_result def place_gruas(df: pd.DataFrame, k: int = 2, - max_iterations: int = 100) -> Dict: + max_iterations: int = 100, debug: bool = False) -> Dict: """ Algoritmo de colocación de grúas greedy. @@ -125,6 +157,7 @@ def place_gruas(df: pd.DataFrame, k: int = 2, df: DataFrame con predicciones por celda H3 k: número de grúas a colocar max_iterations: máximo de iteraciones para evitar loops infinitos + debug: modo depuración para imprimir información Returns: Diccionario con las grúas seleccionadas y estadísticas @@ -138,60 +171,87 @@ def place_gruas(df: pd.DataFrame, k: int = 2, if col not in df.columns: raise ValueError(f"DataFrame missing required column: {col}") - uncovered = df.copy() + # Hacer copia para trabajar + working_df = df.copy() selected_cells = [] coverage_stats = [] total_initial_risk = df['risk'].sum() + if debug: + print(f"Iniciando colocación de {k} grúas") + print(f"Total de celdas: {len(working_df)}") + print(f"Riesgo total inicial: {total_initial_risk:.4f}") + iteration = 0 - while len(selected_cells) < k and iteration < max_iterations and not uncovered.empty: + while len(selected_cells) < k and iteration < max_iterations and not working_df.empty: iteration += 1 best_cell = None best_score = -1 best_coverage = set() + best_radius = 0 + + if debug: + print(f"\nIteración {iteration}: Evaluando {len(working_df)} celdas candidatas") # Evaluar cada celda no cubierta como candidata - for _, row in uncovered.iterrows(): + for idx, row in working_df.iterrows(): current_cell = row['h3'] - radius = row['coverage_radius'] + radius = int(row['coverage_radius']) # Calcular celdas cubiertas covered = covered_cells(current_cell, radius) # Calcular puntaje de cobertura - score = calculate_coverage_score(current_cell, radius, uncovered) + score = calculate_coverage_score(current_cell, radius, working_df) if score > best_score: best_score = score best_cell = current_cell best_coverage = covered + best_radius = radius if best_cell is None: + if debug: + print("No se encontró una mejor celda, terminando...") break # Añadir a seleccionadas selected_cells.append(best_cell) + if debug: + print(f"Seleccionada celda: {best_cell} con radio {best_radius}") + print(f"Puntaje: {best_score:.4f}, Celdas cubiertas: {len(best_coverage)}") + + # Obtener datos de la celda seleccionada + cell_data = df[df['h3'] == best_cell].iloc[0] + # Calcular estadísticas para esta grúa grua_stats = { 'h3': best_cell, - 'expected_demand': float(uncovered.loc[uncovered['h3'] == best_cell, 'expected_demand'].iloc[0]), - 'coverage_radius': int(uncovered.loc[uncovered['h3'] == best_cell, 'coverage_radius'].iloc[0]), - 'nulo_probability': float(uncovered.loc[uncovered['h3'] == best_cell, 'nulo_probability'].iloc[0]), + 'expected_demand': float(cell_data['expected_demand']), + 'coverage_radius': int(cell_data['coverage_radius']), + 'nulo_probability': float(cell_data['nulo_probability']), + 'risk': float(cell_data['risk']), 'cells_covered': len(best_coverage), - 'risk_covered': float(uncovered[uncovered['h3'].isin(best_coverage)]['risk'].sum()) + 'risk_covered': float(working_df[working_df['h3'].isin(best_coverage)]['risk'].sum()) } coverage_stats.append(grua_stats) # Eliminar celdas cubiertas - uncovered = uncovered[~uncovered['h3'].isin(best_coverage)] + rows_before = len(working_df) + working_df = working_df[~working_df['h3'].isin(best_coverage)] + rows_after = len(working_df) + + if debug: + print(f"Celdas eliminadas: {rows_before - rows_after}, Quedan: {rows_after}") - # Calcular cobertura total + # Calcular cobertura total usando el DataFrame original covered_by_all = set() for cell in selected_cells: - cell_data = df[df['h3'] == cell].iloc[0] - covered = covered_cells(cell, cell_data['coverage_radius']) - covered_by_all.update(covered) + if cell in df['h3'].values: + cell_data = df[df['h3'] == cell].iloc[0] + covered = covered_cells(cell, int(cell_data['coverage_radius'])) + covered_by_all.update(covered) total_cells_covered = len(covered_by_all) total_cells = len(df) @@ -200,7 +260,7 @@ def place_gruas(df: pd.DataFrame, k: int = 2, total_risk_covered = df[df['h3'].isin(covered_by_all)]['risk'].sum() risk_coverage_percentage = (total_risk_covered / total_initial_risk * 100) if total_initial_risk > 0 else 0 - return { + result = { 'selected': selected_cells, 'statistics': coverage_stats, 'coverage': { @@ -213,13 +273,24 @@ def place_gruas(df: pd.DataFrame, k: int = 2, }, 'parameters': { 'k': k, - 'iterations': iteration + 'iterations': iteration, + 'requested_k': k, + 'actual_k': len(selected_cells) } } + + if debug: + print(f"\nResultado final:") + print(f"Grúas colocadas: {len(selected_cells)} de {k} solicitadas") + print(f"Cobertura: {coverage_percentage:.2f}% de celdas") + print(f"Cobertura de riesgo: {risk_coverage_percentage:.2f}%") + + return result def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1, max_gruas: int = 10, - target_coverage: float = 80.0) -> Dict: + target_coverage: float = 80.0, + debug: bool = False) -> Dict: """ Encuentra el número óptimo de grúas para alcanzar una cobertura objetivo. @@ -228,25 +299,45 @@ def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1, min_gruas: mínimo número de grúas max_gruas: máximo número de grúas target_coverage: porcentaje de cobertura objetivo + debug: modo depuración Returns: Resultado con número óptimo de grúas """ + if df.empty: + return {"error": "No data available for optimization"} + results = [] + if debug: + print(f"\nIniciando optimización:") + print(f"Rango de grúas: {min_gruas} a {max_gruas}") + print(f"Cobertura objetivo: {target_coverage}%") + for k in range(min_gruas, max_gruas + 1): - placement = place_gruas(df, k=k) + if debug: + print(f"\nProbando k = {k}...") - results.append({ + placement = place_gruas(df, k=k, debug=debug) + + result_entry = { 'k': k, 'coverage_percentage': placement['coverage']['coverage_percentage'], 'risk_coverage_percentage': placement['coverage']['risk_coverage_percentage'], 'selected_cells': placement['selected'], 'placement': placement - }) + } + + results.append(result_entry) + + if debug: + print(f" Cobertura: {result_entry['coverage_percentage']:.2f}%") + print(f" Cobertura riesgo: {result_entry['risk_coverage_percentage']:.2f}%") # Si alcanzamos la cobertura objetivo, podemos detenernos if placement['coverage']['coverage_percentage'] >= target_coverage: + if debug: + print(f"¡Objetivo alcanzado! Cobertura: {placement['coverage']['coverage_percentage']:.2f}%") break # Encontrar el mejor balance @@ -258,10 +349,58 @@ def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1, best = results[0] + # Verificar si el mejor resultado tiene suficiente cobertura + if best['coverage_percentage'] < 50 and debug: + print(f"Advertencia: La mejor solución solo cubre {best['coverage_percentage']:.2f}%") + return { 'optimal_k': best['k'], 'coverage_percentage': best['coverage_percentage'], 'risk_coverage_percentage': best['risk_coverage_percentage'], 'selected_cells': best['selected_cells'], - 'all_results': results - } \ No newline at end of file + 'all_results': results[:10] # Limitar a 10 resultados para no hacer la respuesta muy grande + } + +# Función auxiliar para detectar versión de H3 +def get_h3_version_info() -> Dict: + """Obtener información sobre la versión de H3 instalada.""" + try: + import h3 + version_info = { + 'version': h3.__version__ if hasattr(h3, '__version__') else 'unknown', + 'available_functions': [f for f in dir(h3) if not f.startswith('_')], + 'api_type': 'standard' + } + + # Verificar funciones específicas + test_cell = "88390cb29dfffff" + test_radius = 1 + + # Probar diferentes APIs + try: + # Probar API nueva + import h3.api.numpy_int as h3_new + cells = h3_new.grid_disk(test_cell, test_radius) + version_info['api_detected'] = 'numpy_int' + version_info['working_function'] = 'grid_disk' + except: + try: + # Probar k_ring directo + cells = h3.k_ring(test_cell, test_radius) + version_info['api_detected'] = 'standard_kring' + version_info['working_function'] = 'k_ring' + except AttributeError: + try: + # Probar API antigua + from h3 import h3 as h3_old + cells = h3_old.k_ring(test_cell, test_radius) + version_info['api_detected'] = 'old_api' + version_info['working_function'] = 'h3.k_ring' + except: + version_info['api_detected'] = 'unknown' + version_info['working_function'] = 'none' + + return version_info + + except Exception as e: + return {'error': str(e), 'version': 'unknown'} \ No newline at end of file