From ad3e13c8f8c6616bb879a83c1e1565e5808b0745 Mon Sep 17 00:00:00 2001
From: rafapereirabr
Date: Wed, 22 Jan 2025 20:44:26 -0300
Subject: [PATCH] cleaning code

---
 R/lookup_cases.R                          | 192 -----------
 tests/tests_rafa/geocode_rafa_outuputdb.R | 379 ----------------------
 2 files changed, 571 deletions(-)
 delete mode 100644 R/lookup_cases.R
 delete mode 100644 tests/tests_rafa/geocode_rafa_outuputdb.R

diff --git a/R/lookup_cases.R b/R/lookup_cases.R
deleted file mode 100644
index 67af0b7..0000000
--- a/R/lookup_cases.R
+++ /dev/null
@@ -1,192 +0,0 @@
-
-lookup_cases <- function(con,
-                         relevant_cols,
-                         case,
-                         full_results,
-                         lookup_vector,
-                         input_states,
-                         input_municipio){
-
-  # read corresponding parquet file
-  key_cols <- gsub('_padr', '', relevant_cols)
-  table_name <- paste(key_cols, collapse = "_")
-  table_name <- gsub('bairro', 'localidade', table_name)
-  y <- table_name <- gsub('estado_municipio', 'municipio', table_name)
-
-  # build path to local file
-  path_to_parquet <- paste0(geocodebr::get_cache_dir(), "/", table_name, ".parquet")
-
-  # filter cnefe to include only states and municipalities
-  # present in the input table, reducing the search scope and consequently
-  # reducing processing time and memory usage
-
-  # Load CNEFE data and write to DuckDB
-  filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
-    dplyr::filter(estado %in% input_states) |>
-    dplyr::filter(municipio %in% input_municipio) |>
-    dplyr::compute()
-
-  # register filtered_cnefe to db
-  duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)
-
-  join_condition <- paste(
-    glue::glue("standard_locations.{relevant_cols} = filtered_cnefe.{lookup_vector[relevant_cols]}"),
-    collapse = " AND "
-  )
-
-  cols_not_null <- paste(
-    glue::glue("standard_locations.{relevant_cols} IS NOT NULL"),
-    collapse = ' AND '
-  )
-
-  query_lookup <- glue::glue(
-    "UPDATE standard_locations ",
-    "SET lat = filtered_cnefe.lat,
-         lon = filtered_cnefe.lon,
-         match_type = '{case}',
-         matched_address = filtered_cnefe.endereco_completo ",
-    "FROM ",
-    "filtered_cnefe ",
-    " WHERE {cols_not_null} AND ",
-    "standard_locations.match_type IS NULL AND {join_condition}"
-  )
-
-
-  # cases with no number
-  if (case %in% possible_match_types_no_number) {
-    query_lookup <- gsub("standard_locations.numero_padr IS NOT NULL AND ", "", query_lookup)
-  }
-
-  # cases with no logradouro
-  if (case %in% possible_match_types_no_logradouro) {
-    query_lookup <- gsub("standard_locations.logradouro_padr IS NOT NULL AND ", "", query_lookup)
-  }
-
-  # whether to keep all columns in the result
-  if (isFALSE(full_results)) {
-    query_lookup <- gsub("matched_address = filtered_cnefe.endereco_completo", "", query_lookup)
-  }
-
-
-  n_rows_affected <- DBI::dbExecute(con, query_lookup)
-
-  duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")
-
-  return(n_rows_affected)
-}
-
-
-lookup_weighted_cases <- function(con,
-                                  relevant_cols,
-                                  case,
-                                  full_results,
-                                  lookup_vector,
-                                  input_states,
-                                  input_municipio){
-
-
-  # read corresponding parquet file
-  key_cols <- gsub('_padr', '', relevant_cols)
-  table_name <- paste(key_cols, collapse = "_")
-  table_name <- gsub('bairro', 'localidade', table_name)
-  table_name <- gsub('logradouro', 'logradouro_numero', table_name)
-  y <- table_name <- gsub('estado_municipio', 'municipio', table_name)
-
-  # build path to local file
-  path_to_parquet <- paste0(listar_pasta_cache(), "/", table_name, ".parquet")
-
-  # filter cnefe to include only states and municipalities
-  # present in the input table, reducing the search scope and consequently
-  # reducing processing time and memory usage
-
-  # Load CNEFE data and write to DuckDB
-  filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
-    dplyr::filter(estado %in% input_states) |>
-    dplyr::filter(municipio %in% input_municipio) |>
-    dplyr::compute()
-
-  # register filtered_cnefe to db
-  duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)
-
-
-  # first left join
-  join_condition <- paste(
-    glue::glue("standard_locations.{relevant_cols} = filtered_cnefe.{lookup_vector[relevant_cols]}"),
-    collapse = " AND "
-  )
-
-  cols_not_null <- paste(
-    glue::glue("standard_locations.{relevant_cols} IS NOT NULL"),
-    collapse = ' AND '
-  )
-
-  # Construct the SQL match query
-  query_match <- glue::glue(
-    "CREATE OR REPLACE TEMPORARY TABLE temp_join AS
-     SELECT standard_locations.tempidgeocodebr, standard_locations.numero_padr,
-            filtered_cnefe.numero AS numero_db, filtered_cnefe.lat,
-            filtered_cnefe.lon, filtered_cnefe.endereco_completo
-     FROM standard_locations
-     LEFT JOIN filtered_cnefe
-     ON {join_condition}
-     WHERE {cols_not_null} AND ",
-    "standard_locations.match_type IS NULL AND ",
-    "filtered_cnefe.numero IS NOT NULL;"
-  )
-
-
-  # whether to keep all columns in the result
-  if (isFALSE(full_results)) {
-    query_match <- gsub(", filtered_cnefe.endereco_completo", "", query_match)
-  }
-
-  DBI::dbExecute(con, query_match)
-
-
-  # summarize
-  query_aggregate <- glue::glue(
-    "CREATE OR REPLACE TEMPORARY TABLE tempdb AS
-     SELECT tempidgeocodebr,
-            SUM((1/ABS(numero_padr - numero_db) * lat)) / SUM(1/ABS(numero_padr - numero_db)) AS lat,
-            SUM((1/ABS(numero_padr - numero_db) * lon)) / SUM(1/ABS(numero_padr - numero_db)) AS lon,
-            REGEXP_REPLACE(FIRST(endereco_completo), ', \\d+ -', CONCAT(', ', FIRST(numero_padr), ' (aprox) -')) AS matched_address
-     FROM temp_join
-     GROUP BY tempidgeocodebr;"
-  )
-
-  # update output
-  query_lookup <- glue::glue(
-    "UPDATE standard_locations ",
-    "SET lat = tempdb.lat,
-         lon = tempdb.lon,
-         match_type = '{case}',
-         matched_address = tempdb.matched_address",
-    " FROM ",
-    "tempdb ",
-    "WHERE standard_locations.match_type IS NULL AND standard_locations.tempidgeocodebr = tempdb.tempidgeocodebr"
-  )
-
-  # whether to keep all columns in the result
-  if (isFALSE(full_results)) {
-
-    query_aggregate <- glue::glue(
-      "CREATE OR REPLACE TEMPORARY TABLE tempdb AS
-       SELECT tempidgeocodebr,
-              SUM((1/ABS(numero_padr - numero_db) * lat)) / SUM(1/ABS(numero_padr - numero_db)) AS lat,
-              SUM((1/ABS(numero_padr - numero_db) * lon)) / SUM(1/ABS(numero_padr - numero_db)) AS lon
-       FROM temp_join
-       GROUP BY tempidgeocodebr;"
-    )
-
-    query_lookup <- gsub("matched_address = tempdb.matched_address", "", query_lookup)
-  }
-
-
-  temp_n <- DBI::dbExecute(con, query_aggregate)
-  n_rows_affected <- DBI::dbExecute(con, query_lookup)
-
-  duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")
-
-  return(n_rows_affected)
-
-  }
diff --git a/tests/tests_rafa/geocode_rafa_outuputdb.R b/tests/tests_rafa/geocode_rafa_outuputdb.R
deleted file mode 100644
index 8fe0e60..0000000
--- a/tests/tests_rafa/geocode_rafa_outuputdb.R
+++ /dev/null
@@ -1,379 +0,0 @@
-geocode_db <- function(addresses_table,
-                       address_fields = setup_address_fields(),
-                       n_cores = 1,
-                       progress = TRUE,
-                       keep_matched_address = FALSE,
-                       cache = TRUE
-                       ){
-  # check input
-  assert_address_fields(address_fields, addresses_table)
-  checkmate::assert_data_frame(addresses_table)
-  checkmate::assert_number(n_cores, lower = 1, finite = TRUE)
-  checkmate::assert_logical(progress, any.missing = FALSE, len = 1)
-  checkmate::assert_logical(cache, any.missing = FALSE, len = 1)
-  checkmate::assert_logical(keep_matched_address, any.missing = FALSE, len = 1)
-
-  # normalize input data -------------------------------------------------------
-
-  # standardizing the addresses table to increase the chances of finding a match
-  # in the CNEFE data
-
-  if (progress) message_standardizing_addresses()
-
-  input_padrao <- enderecobr::padronizar_enderecos(
-    addresses_table,
-    campos_do_endereco = enderecobr::correspondencia_campos(
-      logradouro = address_fields[["logradouro"]],
-      numero = address_fields[["numero"]],
-      cep = address_fields[["cep"]],
-      bairro = address_fields[["bairro"]],
-      municipio = address_fields[["municipio"]],
-      estado = address_fields[["estado"]]
-    ),
-    formato_estados = "sigla",
-    formato_numeros = 'integer'
-  )
-
-
-  # keep and rename colunms of input_padrao to use the
-  # same column names used in cnefe data set
-  data.table::setDT(input_padrao)
-  cols_to_keep <- names(input_padrao)[! names(input_padrao) %in% address_fields]
-  input_padrao <- input_padrao[, .SD, .SDcols = c(cols_to_keep)]
-  names(input_padrao) <- c(gsub("_padr", "", names(input_padrao)))
-
-  data.table::setnames(
-    x = input_padrao,
-    old = c('logradouro', 'bairro'),
-    new = c('logradouro_sem_numero', 'localidade'))
-
-  # create temp id
-  input_padrao[, tempidgeocodebr := 1:nrow(input_padrao) ]
-  data.table::setDT(addresses_table)[, tempidgeocodebr := 1:nrow(input_padrao) ]
-
-  # # sort input data
-  # input_padrao <- input_padrao[order(estado, municipio, logradouro_sem_numero, numero, cep, localidade)]
-
-
-  # downloading cnefe
-  cnefe_dir <- download_cnefe(
-    progress = progress,
-    cache = cache
-  )
-
-
-  # creating a temporary db and register the input table data
-  con <- create_geocodebr_db(n_cores = n_cores)
-
-  # Convert input data frame to DuckDB table
-  duckdb::dbWriteTable(con, "input_padrao_db", input_padrao,
-                       overwrite = TRUE, temporary = TRUE)
-
-
-  # create an empty output table that will be populated -----------------------------------------------
-
-  query_create_empty_output_db <- glue::glue(
-    "CREATE OR REPLACE TABLE output_db (
-      tempidgeocodebr INTEGER,
-      lon NUMERIC,
-      lat NUMERIC,
-      match_type VARCHAR, matched_address VARCHAR);"
-  )
-
-  if (isFALSE(keep_matched_address)) {
-    query_create_empty_output_db <- gsub(", matched_address VARCHAR);", ");",
-                                         query_create_empty_output_db)
-  }
-
-  DBI::dbExecute(con, query_create_empty_output_db)
-
-  # START MATCHING -----------------------------------------------
-
-  # determine geographical scope of the search
-  input_states <- unique(input_padrao$estado)
-  input_municipio <- unique(input_padrao$municipio)
-
-  input_municipio <- input_municipio[!is.na(input_municipio)]
-  if(is.null(input_municipio)){ input_municipio <- "*"}
-
-  # start progress bar
-  if (progress) {
-    prog <- create_progress_bar(input_padrao)
-
-    message_looking_for_matches()
-  }
-
-  n_rows <- nrow(input_padrao)
-  matched_rows <- 0
-
-  # start matching
-  for (case in all_possible_match_types ) {
-    relevant_cols <- get_relevant_cols_arrow(case)
-
-    if (progress) update_progress_bar(matched_rows, case)
-
-
-    if (all(relevant_cols %in% names(input_padrao))) {
-
-      # select match function
-      match_fun <- ifelse(case %in% number_interpolation_types, match_weighted_cases2, match_cases2)
-
-      n_rows_affected <- match_fun(
-        con,
-        x = 'input_padrao_db',
-        y = 'filtered_cnefe', # keep this for now
-        output_tb = "output_db",
-        key_cols = relevant_cols,
-        match_type = case,
-        keep_matched_address = keep_matched_address,
-        input_states = input_states,
-        input_municipio = input_municipio
-      )
-
-      matched_rows <- matched_rows + n_rows_affected
-
-      # leave the loop early if we find all addresses before covering all cases
-      if (matched_rows == n_rows) break
-    }
-
-  }
-
-  if (progress) finish_progress_bar(matched_rows)
-
-
-  # prepare output -----------------------------------------------
-  # # THIS could BE IMPROVED / optimized
-  #
-  # # list all table outputs
-  # all_possible_tables <- glue::glue("output_{all_possible_match_types}")
-  #
-  # # check which tables have been created
-  # output_tables <- lapply(
-  #   X= all_possible_tables,
-  #   FUN = function(i){ ifelse( DBI::dbExistsTable(con, i), i, 'empty') }) |>
-  #   unlist()
-  #
-  # all_output_tbs <- output_tables[!grepl('empty', output_tables)]
-  #
-  # # save output to db
-  # output_query <- paste("CREATE TEMPORARY TABLE output_db AS",
-  #                       paste0("SELECT ", paste0('*', " FROM ", all_output_tbs),
-  #                              collapse = " UNION ALL ")
-  # )
-  #
-  # DBI::dbExecute(con, output_query)
-
-  # add precision column
-  add_precision_col(con, update_tb = 'output_db')
-
-  # output with all original columns
-  duckdb::dbWriteTable(con, "input_db", addresses_table,
-                       temporary = TRUE, overwrite=TRUE)
-
-  x_columns <- names(addresses_table)
-
-  output_deterministic <- merge_results(
-    con,
-    x='input_db',
-    y='output_db',
-    key_column='tempidgeocodebr',
-    select_columns = x_columns,
-    keep_matched_address = keep_matched_address
-  )
-
-  # Disconnect from DuckDB when done
-  duckdb::dbDisconnect(con)
-
-  # Return the result
-  return(output_deterministic)
-}
-
-
-match_cases2 <- function(con,
-                         x,
-                         y,
-                         output_tb,
-                         key_cols,
-                         match_type,
-                         keep_matched_address,
-                         input_states,
-                         input_municipio
-){
-
-  # read correspondind parquet file
-  table_name <- paste(key_cols, collapse = "_")
-  table_name <- gsub('estado_municipio', 'municipio', table_name)
-  table_name <- gsub('logradouro_sem_numero', 'logradouro', table_name)
-  y <- table_name
-
-  # build path to local file
-  path_to_parquet <- paste0(geocodebr::get_cache_dir(), "/", table_name, ".parquet")
-
-  # filter cnefe to include only states and municipalities
-  # present in the input table, reducing the search scope and consequently
-  # reducing processing time and memory usage
-
-  # Load CNEFE data and write to DuckDB
-  filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
-    dplyr::filter(estado %in% input_states) |>
-    dplyr::filter(municipio %in% input_municipio) |>
-    dplyr::compute()
-
-
-  # register filtered_cnefe to db
-  duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)
-
-
-  # Create the JOIN condition by concatenating the key columns
-  join_condition <- paste(
-    glue::glue("filtered_cnefe.{key_cols} = {x}.{key_cols}"),
-    collapse = ' AND '
-  )
-
-  # # TO DO: match probabilistico
-  # # isso eh um teste provisorio
-  # if( match_type %in% probabilistic_logradouro_match_types) {
-  #   join_condition <- gsub("= input_padrao_db.logradouro_sem_numero", "LIKE '%' || input_padrao_db.logradouro_sem_numero || '%'", join_condition)
-  # }
-
-  # query for left join
-  query_match <- glue::glue(
-    "INSERT INTO output_db (tempidgeocodebr, lon, lat, match_type, matched_address)
-     SELECT {x}.tempidgeocodebr, filtered_cnefe.lon, filtered_cnefe.lat, '{match_type}' AS match_type, filtered_cnefe.endereco_completo AS matched_address
-     FROM {x}
-     LEFT JOIN filtered_cnefe
-     ON {join_condition}
-     WHERE {x}.numero IS NOT NULL AND filtered_cnefe.lon IS NOT NULL;"
-  )
-
-  if (match_type %in% possible_match_types_no_number) {
-    query_match <- gsub("input_padrao_db.numero IS NOT NULL AND", "", query_match)
-  }
-
-  if (isFALSE(keep_matched_address)) {
-    query_match <- gsub("lat, match_type, matched_address)", "lat, match_type)", query_match)
-  }
-
-  if (isFALSE(keep_matched_address)) {
-    query_match <- gsub(", filtered_cnefe.endereco_completo AS matched_address", "", query_match)
-  }
-
-  temp_n <- DBI::dbExecute(con, query_match)
-  duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")
-
-  # UPDATE input_padrao_db: Remove observations found in previous step
-  update_input_db(
-    con,
-    update_tb = x,
-    reference_tb = output_tb
-  )
-
-  return(temp_n)
-}
-
-
-
-match_weighted_cases2 <- function(con,
-                                  x,
-                                  y,
-                                  output_tb,
-                                  key_cols,
-                                  match_type,
-                                  keep_matched_address,
-                                  input_states,
-                                  input_municipio){
-
-
-  # read correspondind parquet file
-  table_name <- paste(key_cols, collapse = "_")
-  table_name <- gsub('estado_municipio', 'municipio', table_name)
-  table_name <- gsub('logradouro_sem_numero', 'logradouro_numero', table_name)
-  y <- table_name
-
-  # build path to local file
-  path_to_parquet <- paste0(geocodebr::get_cache_dir(), "/", table_name, ".parquet")
-
-  # filter cnefe to include only states and municipalities
-  # present in the input table, reducing the search scope and consequently
-  # reducing processing time and memory usage
-
-  # Load CNEFE data and write to DuckDB
-  filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
-    dplyr::filter(estado %in% input_states) |>
-    dplyr::filter(municipio %in% input_municipio) |>
-    dplyr::compute()
-
-  # register filtered_cnefe to db
-  duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)
-
-  y <- 'filtered_cnefe'
-
-  # Create the JOIN condition by concatenating the key columns
-  join_condition <- paste(
-    glue::glue("{y}.{key_cols} = {x}.{key_cols}"),
-    collapse = ' AND '
-  )
-
-  # # TO DO: match probabilistico
-  # # isso eh um teste provisorio
-  # if( match_type %in% probabilistic_logradouro_match_types) {
-  #   join_condition <- gsub("= input_padrao_db.logradouro_sem_numero", "LIKE '%' || input_padrao_db.logradouro_sem_numero || '%'", join_condition)
-  # }
-
-  # Construct the SQL match query
-  query_match <- glue::glue(
-    "CREATE OR REPLACE TEMPORARY VIEW temp_db AS
-     SELECT {x}.tempidgeocodebr, {x}.numero, {y}.numero as numero_db, {y}.lat, {y}.lon, {y}.endereco_completo
-     FROM {x}
-     LEFT JOIN {y}
-     ON {join_condition}
-     WHERE {x}.numero IS NOT NULL AND {y}.numero IS NOT NULL;"
-  )
-
-  if (isFALSE(keep_matched_address)) {
-    query_match <- gsub(", filtered_cnefe.endereco_completo", "", query_match)
-  }
-
-  DBI::dbExecute(con, query_match)
-
-
-  # summarize
-  query_aggregate <- glue::glue(
-    "INSERT INTO output_db (tempidgeocodebr, lon, lat, match_type, matched_address)
-     SELECT tempidgeocodebr,
-            SUM((1/ABS(numero - numero_db) * lon)) / SUM(1/ABS(numero - numero_db)) AS lon,
-            SUM((1/ABS(numero - numero_db) * lat)) / SUM(1/ABS(numero - numero_db)) AS lat,
-            '{match_type}' AS match_type,
-            REGEXP_REPLACE(FIRST(endereco_completo), ', \\d+ -', CONCAT(', ', FIRST(numero), ' (aprox) -')) AS matched_address
-     FROM temp_db
-     GROUP BY tempidgeocodebr;"
-  )
-
-  if (isFALSE(keep_matched_address)) {
-    query_aggregate <- glue::glue(
-      "INSERT INTO output_db (tempidgeocodebr, lon, lat, match_type)
-       SELECT tempidgeocodebr,
-              SUM((1/ABS(numero - numero_db) * lon)) / SUM(1/ABS(numero - numero_db)) AS lon,
-              SUM((1/ABS(numero - numero_db) * lat)) / SUM(1/ABS(numero - numero_db)) AS lat,
-              '{match_type}' AS match_type
-       FROM temp_db
-       GROUP BY tempidgeocodebr;"
    )
-  }
-
-  temp_n <- DBI::dbExecute(con, query_aggregate)
-  duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")
-
-
-  # UPDATE input_padrao_db: Remove observations found in previous step
-  update_input_db(
-    con,
-    update_tb = x,
-    reference_tb = output_tb
-  )
-
-
-
-  return(temp_n)
-}
-
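
For reference, the number-interpolation step in the deleted match_weighted_cases2() / lookup_weighted_cases() averages candidate CNEFE coordinates with weights of 1/ABS(numero - numero_db), i.e. inverse distance between the input street number and each candidate number on the same street. A minimal, self-contained R sketch of that aggregation, kept outside the patch; the names interpolate_coords and candidates are illustrative and not part of the package:

interpolate_coords <- function(numero, candidates) {
  # candidates: assumed data.frame with columns numero_db, lat, lon
  # (the deleted SQL uses the same 1/ABS(...) form and does not special-case
  #  an exact tie, where the weight would become infinite)
  w <- 1 / abs(numero - candidates$numero_db)
  data.frame(
    lat = sum(w * candidates$lat) / sum(w),
    lon = sum(w * candidates$lon) / sum(w)
  )
}

# example: street number 110 bracketed by known numbers 100 and 120
interpolate_coords(
  numero = 110,
  candidates = data.frame(
    numero_db = c(100, 120),
    lat = c(-23.5610, -23.5620),
    lon = c(-46.6550, -46.6565)
  )
)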