FIX: new versión H3

This commit is contained in:
2025-12-22 09:46:11 +01:00
parent aa4e540f89
commit 541f08cebd

View File

@@ -35,7 +35,22 @@ def get_coverage_weight(expected_demand: float) -> float:
def covered_cells(h3_cell: str, radius: int) -> Set[str]:
"""Obtiene todas las celdas H3 cubiertas por un radio dado."""
try:
return set(h3.k_ring(h3_cell, radius))
# H3 >= 4.x (API strings)
try:
import h3.api.basic_str as h3_api
return set(h3_api.grid_disk(h3_cell, radius))
except ImportError:
pass
# H3 3.x
try:
import h3
return set(h3.k_ring(h3_cell, radius))
except Exception:
pass
raise RuntimeError("API H3 no compatible")
except Exception as e:
print(f"Error calculando cobertura para celda {h3_cell}: {e}")
return {h3_cell}
@@ -67,9 +82,12 @@ def calculate_coverage_score(cell: str, radius: int, df: pd.DataFrame) -> float:
return total_risk * (1 + 0.1 * demand_weight)
def build_prediction_grid(week: int, dow: int, hour: int,
demand_model, nulo_model) -> pd.DataFrame:
demand_model, nulo_model, h3_resolution: int = 8) -> pd.DataFrame:
"""
Construye una cuadrícula de predicciones para todas las celdas H3.
Construye una cuadricula de predicciones para todas las celdas H3.
Args:
h3_resolution: Resolución H3 (default=8 para áreas urbanas)
"""
# Obtener todas las celdas H3 únicas de la base de datos
from db import get_all_h3_cells
@@ -80,7 +98,13 @@ def build_prediction_grid(week: int, dow: int, hour: int,
for h3_cell in h3_cells:
try:
# Convertir H3 a entero para el modelo
h3_int = int(h3_cell, 16)
# Asegurar que sea un string hexadecimal válido
if isinstance(h3_cell, str):
h3_int = int(h3_cell, 16)
else:
# Si ya es numérico o otro tipo
h3_str = str(h3_cell)
h3_int = int(h3_str, 16) if h3_str.startswith('8') else int(h3_str)
# Predecir demanda
X_demand = [[h3_int, week, dow, hour]]
@@ -98,7 +122,7 @@ def build_prediction_grid(week: int, dow: int, hour: int,
coverage_weight = get_coverage_weight(expected_demand)
predictions.append({
'h3': h3_cell,
'h3': str(h3_cell), # Asegurar que es string
'h3_int': h3_int,
'week': week,
'dow': dow,
@@ -112,12 +136,20 @@ def build_prediction_grid(week: int, dow: int, hour: int,
except Exception as e:
print(f"Error procesando celda {h3_cell}: {e}")
if DEBUG_MODE:
import traceback
traceback.print_exc()
continue
return pd.DataFrame(predictions)
df_result = pd.DataFrame(predictions)
if df_result.empty:
print(f"Advertencia: No se generaron predicciones para week={week}, dow={dow}, hour={hour}")
return df_result
def place_gruas(df: pd.DataFrame, k: int = 2,
max_iterations: int = 100) -> Dict:
max_iterations: int = 100, debug: bool = False) -> Dict:
"""
Algoritmo de colocación de grúas greedy.
@@ -125,6 +157,7 @@ def place_gruas(df: pd.DataFrame, k: int = 2,
df: DataFrame con predicciones por celda H3
k: número de grúas a colocar
max_iterations: máximo de iteraciones para evitar loops infinitos
debug: modo depuración para imprimir información
Returns:
Diccionario con las grúas seleccionadas y estadísticas
@@ -138,60 +171,87 @@ def place_gruas(df: pd.DataFrame, k: int = 2,
if col not in df.columns:
raise ValueError(f"DataFrame missing required column: {col}")
uncovered = df.copy()
# Hacer copia para trabajar
working_df = df.copy()
selected_cells = []
coverage_stats = []
total_initial_risk = df['risk'].sum()
if debug:
print(f"Iniciando colocación de {k} grúas")
print(f"Total de celdas: {len(working_df)}")
print(f"Riesgo total inicial: {total_initial_risk:.4f}")
iteration = 0
while len(selected_cells) < k and iteration < max_iterations and not uncovered.empty:
while len(selected_cells) < k and iteration < max_iterations and not working_df.empty:
iteration += 1
best_cell = None
best_score = -1
best_coverage = set()
best_radius = 0
if debug:
print(f"\nIteración {iteration}: Evaluando {len(working_df)} celdas candidatas")
# Evaluar cada celda no cubierta como candidata
for _, row in uncovered.iterrows():
for idx, row in working_df.iterrows():
current_cell = row['h3']
radius = row['coverage_radius']
radius = int(row['coverage_radius'])
# Calcular celdas cubiertas
covered = covered_cells(current_cell, radius)
# Calcular puntaje de cobertura
score = calculate_coverage_score(current_cell, radius, uncovered)
score = calculate_coverage_score(current_cell, radius, working_df)
if score > best_score:
best_score = score
best_cell = current_cell
best_coverage = covered
best_radius = radius
if best_cell is None:
if debug:
print("No se encontró una mejor celda, terminando...")
break
# Añadir a seleccionadas
selected_cells.append(best_cell)
if debug:
print(f"Seleccionada celda: {best_cell} con radio {best_radius}")
print(f"Puntaje: {best_score:.4f}, Celdas cubiertas: {len(best_coverage)}")
# Obtener datos de la celda seleccionada
cell_data = df[df['h3'] == best_cell].iloc[0]
# Calcular estadísticas para esta grúa
grua_stats = {
'h3': best_cell,
'expected_demand': float(uncovered.loc[uncovered['h3'] == best_cell, 'expected_demand'].iloc[0]),
'coverage_radius': int(uncovered.loc[uncovered['h3'] == best_cell, 'coverage_radius'].iloc[0]),
'nulo_probability': float(uncovered.loc[uncovered['h3'] == best_cell, 'nulo_probability'].iloc[0]),
'expected_demand': float(cell_data['expected_demand']),
'coverage_radius': int(cell_data['coverage_radius']),
'nulo_probability': float(cell_data['nulo_probability']),
'risk': float(cell_data['risk']),
'cells_covered': len(best_coverage),
'risk_covered': float(uncovered[uncovered['h3'].isin(best_coverage)]['risk'].sum())
'risk_covered': float(working_df[working_df['h3'].isin(best_coverage)]['risk'].sum())
}
coverage_stats.append(grua_stats)
# Eliminar celdas cubiertas
uncovered = uncovered[~uncovered['h3'].isin(best_coverage)]
rows_before = len(working_df)
working_df = working_df[~working_df['h3'].isin(best_coverage)]
rows_after = len(working_df)
if debug:
print(f"Celdas eliminadas: {rows_before - rows_after}, Quedan: {rows_after}")
# Calcular cobertura total
# Calcular cobertura total usando el DataFrame original
covered_by_all = set()
for cell in selected_cells:
cell_data = df[df['h3'] == cell].iloc[0]
covered = covered_cells(cell, cell_data['coverage_radius'])
covered_by_all.update(covered)
if cell in df['h3'].values:
cell_data = df[df['h3'] == cell].iloc[0]
covered = covered_cells(cell, int(cell_data['coverage_radius']))
covered_by_all.update(covered)
total_cells_covered = len(covered_by_all)
total_cells = len(df)
@@ -200,7 +260,7 @@ def place_gruas(df: pd.DataFrame, k: int = 2,
total_risk_covered = df[df['h3'].isin(covered_by_all)]['risk'].sum()
risk_coverage_percentage = (total_risk_covered / total_initial_risk * 100) if total_initial_risk > 0 else 0
return {
result = {
'selected': selected_cells,
'statistics': coverage_stats,
'coverage': {
@@ -213,13 +273,24 @@ def place_gruas(df: pd.DataFrame, k: int = 2,
},
'parameters': {
'k': k,
'iterations': iteration
'iterations': iteration,
'requested_k': k,
'actual_k': len(selected_cells)
}
}
if debug:
print(f"\nResultado final:")
print(f"Grúas colocadas: {len(selected_cells)} de {k} solicitadas")
print(f"Cobertura: {coverage_percentage:.2f}% de celdas")
print(f"Cobertura de riesgo: {risk_coverage_percentage:.2f}%")
return result
def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1,
max_gruas: int = 10,
target_coverage: float = 80.0) -> Dict:
target_coverage: float = 80.0,
debug: bool = False) -> Dict:
"""
Encuentra el número óptimo de grúas para alcanzar una cobertura objetivo.
@@ -228,25 +299,45 @@ def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1,
min_gruas: mínimo número de grúas
max_gruas: máximo número de grúas
target_coverage: porcentaje de cobertura objetivo
debug: modo depuración
Returns:
Resultado con número óptimo de grúas
"""
if df.empty:
return {"error": "No data available for optimization"}
results = []
if debug:
print(f"\nIniciando optimización:")
print(f"Rango de grúas: {min_gruas} a {max_gruas}")
print(f"Cobertura objetivo: {target_coverage}%")
for k in range(min_gruas, max_gruas + 1):
placement = place_gruas(df, k=k)
if debug:
print(f"\nProbando k = {k}...")
results.append({
placement = place_gruas(df, k=k, debug=debug)
result_entry = {
'k': k,
'coverage_percentage': placement['coverage']['coverage_percentage'],
'risk_coverage_percentage': placement['coverage']['risk_coverage_percentage'],
'selected_cells': placement['selected'],
'placement': placement
})
}
results.append(result_entry)
if debug:
print(f" Cobertura: {result_entry['coverage_percentage']:.2f}%")
print(f" Cobertura riesgo: {result_entry['risk_coverage_percentage']:.2f}%")
# Si alcanzamos la cobertura objetivo, podemos detenernos
if placement['coverage']['coverage_percentage'] >= target_coverage:
if debug:
print(f"¡Objetivo alcanzado! Cobertura: {placement['coverage']['coverage_percentage']:.2f}%")
break
# Encontrar el mejor balance
@@ -258,10 +349,58 @@ def optimize_gruas_placement(df: pd.DataFrame, min_gruas: int = 1,
best = results[0]
# Verificar si el mejor resultado tiene suficiente cobertura
if best['coverage_percentage'] < 50 and debug:
print(f"Advertencia: La mejor solución solo cubre {best['coverage_percentage']:.2f}%")
return {
'optimal_k': best['k'],
'coverage_percentage': best['coverage_percentage'],
'risk_coverage_percentage': best['risk_coverage_percentage'],
'selected_cells': best['selected_cells'],
'all_results': results
}
'all_results': results[:10] # Limitar a 10 resultados para no hacer la respuesta muy grande
}
# Función auxiliar para detectar versión de H3
def get_h3_version_info() -> Dict:
"""Obtener información sobre la versión de H3 instalada."""
try:
import h3
version_info = {
'version': h3.__version__ if hasattr(h3, '__version__') else 'unknown',
'available_functions': [f for f in dir(h3) if not f.startswith('_')],
'api_type': 'standard'
}
# Verificar funciones específicas
test_cell = "88390cb29dfffff"
test_radius = 1
# Probar diferentes APIs
try:
# Probar API nueva
import h3.api.numpy_int as h3_new
cells = h3_new.grid_disk(test_cell, test_radius)
version_info['api_detected'] = 'numpy_int'
version_info['working_function'] = 'grid_disk'
except:
try:
# Probar k_ring directo
cells = h3.k_ring(test_cell, test_radius)
version_info['api_detected'] = 'standard_kring'
version_info['working_function'] = 'k_ring'
except AttributeError:
try:
# Probar API antigua
from h3 import h3 as h3_old
cells = h3_old.k_ring(test_cell, test_radius)
version_info['api_detected'] = 'old_api'
version_info['working_function'] = 'h3.k_ring'
except:
version_info['api_detected'] = 'unknown'
version_info['working_function'] = 'none'
return version_info
except Exception as e:
return {'error': str(e), 'version': 'unknown'}