From dfc465f358a851df5d5672230fac477e4554b47d Mon Sep 17 00:00:00 2001 From: rafapereirabr Date: Sat, 1 Feb 2025 16:18:57 -0300 Subject: [PATCH] probabilistic matching --- tests/tests_rafa/fts_extension.R | 12 +- tests/tests_rafa/match_cases_probabilistic.R | 106 +++++------ .../match_weighted_cases_probabilistic.R | 174 ++++++++++++++++++ 3 files changed, 236 insertions(+), 56 deletions(-) create mode 100644 tests/tests_rafa/match_weighted_cases_probabilistic.R diff --git a/tests/tests_rafa/fts_extension.R b/tests/tests_rafa/fts_extension.R index d6ddd11..ae54d6c 100644 --- a/tests/tests_rafa/fts_extension.R +++ b/tests/tests_rafa/fts_extension.R @@ -205,11 +205,17 @@ query_max_similarity <- glue::glue( DBI::dbGetQuery(con, query_join) DBI::dbGetQuery(con, query_max_similarity) -w1 <- 'teste 1' -w2 <- 'teste 2' +w1 <- 'PROFESSOR ALFREDO GONCALVES FILGUEIRA' +w2 <- 'RUA PROFESSOR ALFREDO GONCALVES FIGUEIRA' -q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}');") +# q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}');") +q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}') AS similarity;") q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}') AS similarity;") DBI::dbGetQuery(con, q) + +#' damerau_levenshtein +#' jaccard +#' jaro_similarity +#' jaro_winkler_similarity diff --git a/tests/tests_rafa/match_cases_probabilistic.R b/tests/tests_rafa/match_cases_probabilistic.R index 72db2f4..1144b0c 100644 --- a/tests/tests_rafa/match_cases_probabilistic.R +++ b/tests/tests_rafa/match_cases_probabilistic.R @@ -17,7 +17,6 @@ match_cases_probabilistic <- function( ){ - # read correspondind parquet file table_name <- paste(key_cols, collapse = "_") table_name <- gsub('estado_municipio', 'municipio', table_name) @@ -51,6 +50,7 @@ match_cases_probabilistic <- function( # remove logradouro key_cols <- key_cols[key_cols != 'logradouro_sem_numero'] + # Create the JOIN condition by concatenating the key columns join_condition <- paste( glue::glue("{y}.{key_cols} = {x}.{key_cols}"), @@ -58,29 +58,33 @@ match_cases_probabilistic <- function( ) - # # whether to keep all columns in the result - # colunas_encontradas <- "" - # additional_cols <- "" - # - # if (isTRUE(resultado_completo)) { - # - # colunas_encontradas <- paste0( - # glue::glue("{key_cols}_encontrado"), - # collapse = ', ') - # - # colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas) - # colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas) - # colunas_encontradas <- paste0(", endereco_encontrado, ", colunas_encontradas) - # - # additional_cols <- paste0( - # glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"), - # collapse = ', ') - # - # additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) - # additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) - # additional_cols <- paste0(", filtered_cnefe.endereco_completo AS endereco_encontrado, ", additional_cols) - # - # } + # whether to keep all columns in the result + colunas_encontradas <- "" + additional_cols <- "" + + if (isTRUE(resultado_completo)) { + + colunas_encontradas <- paste0( + glue::glue("{key_cols}_encontrado"), + collapse = ', ') + + colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas) + colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas) + colunas_encontradas <- paste0(", ", colunas_encontradas) + + additional_cols <- paste0( + glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"), + collapse = ', ') + + additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) + additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) + additional_cols <- paste0(", ", additional_cols) + } + + # min cutoff for string match + min_cutoff <- ifelse(match_type == 'pn04', 0.9, 0.7) + + # 1st step: match -------------------------------------------------------- # match query query_match <- glue::glue( @@ -90,6 +94,7 @@ match_cases_probabilistic <- function( {x}.tempidgeocodebr, {y}.lat, {y}.lon, {x}.logradouro_sem_numero AS logradouro_sem_numero, {y}.logradouro_sem_numero AS logradouro_sem_numero_cnefe, + {y}.endereco_completo AS endereco_encontrado, jaro_winkler_similarity({x}.logradouro_sem_numero, {y}.logradouro_sem_numero) AS similarity, RANK() OVER (PARTITION BY {x}.tempidgeocodebr ORDER BY similarity DESC) AS rank {additional_cols} @@ -100,50 +105,45 @@ match_cases_probabilistic <- function( ) SELECT * FROM ranked_data - WHERE similarity > 0.8 + WHERE similarity > {min_cutoff} AND rank = 1;" ) DBI::dbExecute(con, query_match) + # a <- DBI::dbReadTable(con, 'temp_db') + + + + # 2nd step: update output table -------------------------------------------------------- + + if (isTRUE(resultado_completo)) { + additional_cols <- paste0( + glue::glue("temp_db.{key_cols}_encontrado"), + collapse = ', ') + + additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) + additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) + additional_cols <- paste0(", ", additional_cols) + } -# -# if (isTRUE(resultado_completo)) { -# -# colunas_encontradas <- paste0( -# glue::glue("{key_cols}_encontrado"), -# collapse = ', ') -# -# colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas) -# colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas) -# colunas_encontradas <- paste0(", endereco_encontrado, ", colunas_encontradas) -# -# additional_cols <- paste0( -# glue::glue("temp_db.{key_cols} AS {key_cols}_encontrado"), -# collapse = ', ') -# -# additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) -# additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) -# additional_cols <- paste0(", temp_db.endereco_completo AS endereco_encontrado, ", additional_cols) -# -# } - - - # summarize query query_update_db <- glue::glue( - "INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado {colunas_encontradas}) - SELECT temp_db.tempidgeocodebr, temp_db.lat, temp_db.lon, - '{match_type}' AS tipo_resultado {additional_cols} + "INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado {colunas_encontradas}) + SELECT temp_db.tempidgeocodebr, + temp_db.lat, + temp_db.lon, + '{match_type}' AS tipo_resultado, + temp_db.endereco_encontrado {additional_cols} FROM temp_db WHERE temp_db.lon IS NOT NULL;" ) - temp_n <- DBI::dbExecute(con, query_update_db) + DBI::dbExecute(con, query_update_db) duckdb::duckdb_unregister_arrow(con, "filtered_cnefe") # UPDATE input_padrao_db: Remove observations found in previous step - update_input_db( + temp_n <- update_input_db( con, update_tb = x, reference_tb = output_tb diff --git a/tests/tests_rafa/match_weighted_cases_probabilistic.R b/tests/tests_rafa/match_weighted_cases_probabilistic.R new file mode 100644 index 0000000..a965f93 --- /dev/null +++ b/tests/tests_rafa/match_weighted_cases_probabilistic.R @@ -0,0 +1,174 @@ +match_type = case = "pi01" +x = 'input_padrao_db' +y = 'filtered_cnefe' +output_tb = "output_db" +key_cols <- c("estado", "municipio", "logradouro_sem_numero", "numero", "cep", "localidade") +match_type = case +resultado_completo = F # isso funciona. Falta funcionar quando TRUE + + +match_weighted_cases_probabilistic <- function(con, + x, + y, + output_tb, + key_cols, + match_type, + resultado_completo){ + + # read corresponding parquet file + table_name <- paste(key_cols, collapse = "_") + table_name <- gsub('estado_municipio', 'municipio', table_name) + table_name <- gsub('logradouro_sem_numero', 'logradouro', table_name) + + # master table + if (match_type %like% 'pn02|pi02|pn03|pi03|pn04|pi04') { + table_name <- "municipio_logradouro_numero_cep_localidade" + } + + # build path to local file + path_to_parquet <- paste0(listar_pasta_cache(), "/", table_name, ".parquet") + + + # determine geographical scope of the search + input_states <- DBI::dbGetQuery(con, "SELECT DISTINCT estado FROM input_padrao_db;")$estado + input_municipio <- DBI::dbGetQuery(con, "SELECT DISTINCT municipio FROM input_padrao_db;")$municipio + + # Load CNEFE data and write to DuckDB + # filter cnefe to include only states and municipalities + # present in the input table, reducing the search scope + filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |> + dplyr::filter(estado %in% input_states) |> + dplyr::filter(municipio %in% input_municipio) |> + dplyr::compute() + + # register filtered_cnefe to db + duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe) + + # cols that cannot be null + cols_not_null <- paste( + glue::glue("{x}.{key_cols} IS NOT NULL"), + collapse = ' AND ' + ) + + # remove numero and logradourot from key cols to allow for the matching + key_cols <- key_cols[key_cols != 'numero'] + key_cols <- key_cols[key_cols != 'logradouro_sem_numero'] + + # Create the JOIN condition by concatenating the key columns + join_condition <- paste( + glue::glue("{y}.{key_cols} = {x}.{key_cols}"), + collapse = ' AND ' + ) + + + # whether to keep all columns in the result + colunas_encontradas <- "" + additional_cols <- "" + + if (isTRUE(resultado_completo)) { + + colunas_encontradas <- paste0( + glue::glue("{key_cols}_encontrado"), + collapse = ', ') + + colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas) + colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas) + colunas_encontradas <- paste0(", ", colunas_encontradas) + + additional_cols <- paste0( + glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"), + collapse = ', ') + + additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) + additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) + additional_cols <- paste0(", ", additional_cols) + + } + + # min cutoff for string match + min_cutoff <- ifelse(match_type == 'pi04', 0.9, 0.7) + + # 1st step: match -------------------------------------------------------- + + query_match <- glue::glue( + "CREATE OR REPLACE TEMPORARY VIEW temp_db AS + WITH ranked_data AS ( + SELECT + {x}.tempidgeocodebr, {x}.numero, {y}.numero AS numero_cnefe, + {y}.lat, {y}.lon, + {x}.logradouro_sem_numero AS logradouro_sem_numero, + {y}.logradouro_sem_numero AS logradouro_sem_numero_cnefe, + REGEXP_REPLACE( {y}.endereco_completo, ', \\d+ -', CONCAT(', ', {x}.numero, ' (aprox) -')) AS endereco_encontrado, + jaro_winkler_similarity({x}.logradouro_sem_numero, {y}.logradouro_sem_numero) AS similarity, + RANK() OVER (PARTITION BY {x}.tempidgeocodebr ORDER BY similarity DESC) AS rank + {additional_cols} + FROM {x} + LEFT JOIN {y} + ON {join_condition} + WHERE {cols_not_null} AND {y}.numero IS NOT NULL + ) + SELECT * + FROM ranked_data + WHERE similarity > {min_cutoff} + AND rank = 1;" + ) + + DBI::dbExecute(con, query_match) + # a <- DBI::dbReadTable(con, 'temp_db') + + + + # 2nd step: aggregate -------------------------------------------------------- + + # summarize query + query_aggregate <- glue::glue( + "INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado) + SELECT tempidgeocodebr, + SUM((1/ABS(numero - numero_cnefe) * lat)) / SUM(1/ABS(numero - numero_cnefe)) AS lat, + SUM((1/ABS(numero - numero_cnefe) * lon)) / SUM(1/ABS(numero - numero_cnefe)) AS lon, + '{match_type}' AS tipo_resultado, + FIRST(endereco_encontrado) AS endereco_encontrado + FROM temp_db + GROUP BY tempidgeocodebr, endereco_encontrado;" + ) + + + + if (isTRUE(resultado_completo)) { + + additional_cols <- paste0( + glue::glue("FIRST({key_cols}_encontrado)"), + collapse = ', ') + + additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols) + additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols) + additional_cols <- paste0(", ", additional_cols) + + query_aggregate <- glue::glue( + "INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado {colunas_encontradas}) + SELECT tempidgeocodebr, + SUM((1/ABS(numero - numero_cnefe) * lat)) / SUM(1/ABS(numero - numero_cnefe)) AS lat, + SUM((1/ABS(numero - numero_cnefe) * lon)) / SUM(1/ABS(numero - numero_cnefe)) AS lon, + '{match_type}' AS tipo_resultado, + FIRST(endereco_encontrado) AS endereco_encontrado + {additional_cols} + FROM temp_db + GROUP BY tempidgeocodebr, endereco_encontrado;" + ) + } + + DBI::dbExecute(con, query_aggregate) + # b <- DBI::dbReadTable(con, 'output_db') + + + duckdb::duckdb_unregister_arrow(con, "filtered_cnefe") + + # UPDATE input_padrao_db: Remove observations found in previous step + temp_n <- update_input_db( + con, + update_tb = x, + reference_tb = output_tb + ) + + return(temp_n) +}