Skip to content

Commit

Permalink
feat: restructure of determining geo and non geo types
Browse files Browse the repository at this point in the history
  • Loading branch information
jjcfrancisco committed Aug 19, 2024
1 parent d1fb9f8 commit 0c92f15
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 130 deletions.
51 changes: 36 additions & 15 deletions src/file_types/geojson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,24 @@ use serde_json;
use std::collections::HashMap;
use wkb::geom_to_wkb;

use crate::pg::binary_copy::{infer_geometry_type, insert_row};
use crate::file_types::common::{AcceptedTypes, NameAndType};
use crate::pg::binary_copy::{infer_geometry_type, insert_row, prepare_query};
use crate::pg::ops::prepare_postgis;
use crate::utils::cli::Cli;
use crate::file_types::common::{AcceptedTypes, NameAndType};

pub fn insert_data(args: Cli) -> Result<()> {
// Determine data types of the input file
let file_data_types = determine_file_data_types(&args.input)?;
let mut names_and_types = determine_non_geometry_types(&args)?;
// Prepare database
prepare_postgis(&args, &file_data_types)?;
prepare_postgis(&args, &names_and_types)?;
// Determine geometry type
let names_and_types = determine_geometry_types(&args, &mut names_and_types)?;

// Get data types
let mut types: Vec<Type> = Vec::new();
for column in file_data_types.iter() {
for column in names_and_types.iter() {
types.push(column.data_type.clone());
}
// Get geometry type
let geom_type = infer_geometry_type(&args.table, &args.schema, &args.uri)?;
// Add geometry type to types
types.push(geom_type);

// Read geojson file
let geojson = read_geojson(&args.input)?;
Expand Down Expand Up @@ -60,7 +58,8 @@ pub fn insert_data(args: Cli) -> Result<()> {
.expect("Failed to convert geojson::Geometry to geo::Geometry ✘");
let wkb = geom_to_wkb(&geom).expect("Could not convert geometry to WKB ✘");
row.push(AcceptedTypes::Geometry(Some(Wkb { geometry: wkb })));
insert_row(row, &file_data_types, &types, &args)?;
let query = prepare_query(&args, names_and_types);
insert_row(row, query, &types, &args)?;
}
println!("Data sucessfully inserted into database ✓");
}
Expand All @@ -70,9 +69,24 @@ pub fn insert_data(args: Cli) -> Result<()> {
Ok(())
}

pub fn determine_file_data_types(file_path: &str) -> Result<Vec<NameAndType>> {
/// Appends the geometry column descriptor to `names_and_types`.
///
/// The geometry type is obtained from `infer_geometry_type`, which is given the
/// table, schema and URI held in `args`. A `NameAndType` entry named
/// `"geometry"` carrying that type is pushed onto the end of the list, and the
/// same mutable reference that was passed in is handed back to the caller.
///
/// # Errors
///
/// Returns early with the error from `infer_geometry_type` (via `?`); in that
/// case nothing is pushed onto `names_and_types`.
fn determine_geometry_types<'a>(
    args: &Cli,
    names_and_types: &'a mut Vec<NameAndType>,
) -> Result<&'a mut Vec<NameAndType>> {
    // Resolve the type first so a failure leaves the list untouched.
    let geometry_column = NameAndType {
        data_type: infer_geometry_type(&args.table, &args.schema, &args.uri)?,
        name: String::from("geometry"),
    };
    names_and_types.push(geometry_column);

    Ok(names_and_types)
}

fn determine_non_geometry_types(args: &Cli) -> Result<Vec<NameAndType>> {
let mut table_config: HashMap<String, Type> = HashMap::new();
let geojson_str = std::fs::read_to_string(file_path)?;
let geojson_str = std::fs::read_to_string(&args.input)?;
let geojson = geojson_str.parse::<GeoJson>().unwrap();

match geojson {
Expand Down Expand Up @@ -165,9 +179,16 @@ mod tests {
use super::*;

#[test]
fn test_determine_file_data_types() {
let file_path = "examples/geojson/spain.geojson";
let data_types = determine_file_data_types(file_path).unwrap();
fn test_determine_non_geometry_types() {
let args = Cli {
input: "examples/geojson/spain.geojson".to_string(),
uri: "postgresql://postgres:password@localhost:5432/postgres".to_string(),
table: "spain".to_string(),
schema: None,
srid: None,
mode: None,
};
let data_types = determine_non_geometry_types(&args).unwrap();
assert_eq!(data_types.len(), 3);
for data_type in data_types {
match data_type.name.as_str() {
Expand Down
218 changes: 130 additions & 88 deletions src/file_types/geoparquet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::any::Any;

use crate::{Error, Result};

use arrow_schema::DataType;
Expand All @@ -8,70 +6,148 @@ use geoarrow::io::parquet::read_geoparquet_async;
use geoarrow::io::parquet::GeoParquetReaderOptions;
use geoarrow::table::GeoTable;
use geoarrow::trait_::GeometryArrayAccessor;
use tokio::fs::File;
use postgres::types::Type;
use std::collections::HashMap;
use std::any::TypeId;
use tokio::fs::File;

use crate::utils::cli::Cli;
use crate::file_types::common::NameAndType;
use crate::pg::ops::prepare_postgis;
use crate::pg::binary_copy::infer_geometry_type;
use crate::pg::ops::prepare_postgis;
use crate::utils::cli::Cli;

pub fn insert_data(args: Cli) -> Result<()> {
// Currently static batch size. In time, this should be dynamic
let batch_size = 500;
// Read geoparquet file using tokio runtime
let runtime = tokio::runtime::Runtime::new()?;
let geotable = runtime.block_on(read_geoparquet(&args.input, batch_size))?;
let file_data_types = determine_file_data_types(&geotable)?;
let mut non_geometry_types = determine_non_geometry_types(&geotable)?;

// Determine data types of the input file
// Prepare database
prepare_postgis(&args, &file_data_types)?;
prepare_postgis(&args, &non_geometry_types)?;

// Get data types
let mut types: Vec<Type> = Vec::new();
for column in file_data_types.iter() {
types.push(column.data_type.clone());
}
// Get geometry type
let geom_type = infer_geometry_type(&args.table, &args.schema, &args.uri)?;
// Add geometry type to types
types.push(geom_type);
// Determine geometry type
let names_and_types = determine_geometry_type(&args, &mut non_geometry_types)?;

// Process geotable
process_geotable(geotable)?;
Ok(())
}

fn determine_file_data_types(geotable: &GeoTable) -> Result<Vec<NameAndType>> {
/// Extends the list of non-geometry columns with the geometry column.
///
/// Calls `infer_geometry_type` with `args.table`, `args.schema` and `args.uri`,
/// then pushes a `NameAndType` whose name is `"geometry"` and whose
/// `data_type` is the inferred type. The returned reference is the very same
/// `names_and_types` borrow that was supplied, now one element longer.
///
/// # Errors
///
/// Any error produced by `infer_geometry_type` is propagated with `?`, and the
/// list is not modified in that case.
fn determine_geometry_type<'a>(
    args: &Cli,
    names_and_types: &'a mut Vec<NameAndType>,
) -> Result<&'a mut Vec<NameAndType>> {
    let data_type = infer_geometry_type(&args.table, &args.schema, &args.uri)?;
    let name = String::from("geometry");

    // The geometry column always goes last, after the attribute columns.
    names_and_types.push(NameAndType { name, data_type });

    Ok(names_and_types)
}

fn determine_non_geometry_types(geotable: &GeoTable) -> Result<Vec<NameAndType>> {
let schema = geotable.schema();
let mut table_config: HashMap<String, Type> = HashMap::new();
for field in schema.fields() {
let fields = schema.fields();

let mut names_and_types: Vec<NameAndType> = Vec::new();
for field in fields {
let name = field.name();
let data_type = field.data_type();
if data_type.type_id() == TypeId::of::<f64>() {
table_config.insert(name.to_string(), Type::FLOAT8);
} else if data_type.type_id() == TypeId::of::<i64>() {
table_config.insert(name.to_string(), Type::INT8);
} else if data_type.type_id() == TypeId::of::<String>() {
table_config.insert(name.to_string(), Type::VARCHAR);
} else if data_type.type_id() == TypeId::of::<bool>() {
table_config.insert(name.to_string(), Type::BOOL);
match data_type {
DataType::Utf8 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::VARCHAR,
});
}
DataType::Float16 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::FLOAT8,
});
}
DataType::Float32 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::FLOAT8,
});
}
DataType::Float64 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::FLOAT8,
});
}
DataType::Int8 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::Int16 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::Int32 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::Int64 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::UInt8 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::UInt16 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::UInt32 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::INT8,
});
}
DataType::Null => {}
DataType::Binary => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::BYTEA,
});
}
DataType::Boolean => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::BOOL,
});
}
DataType::Date32 => {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: Type::DATE,
});
}
DataType::List(_) => {}
_ => println!("Data type '{:?}' not supported ✘", data_type),
}
}

let mut names_and_types: Vec<NameAndType> = Vec::new();
for (name, data_type) in table_config.iter() {
names_and_types.push(NameAndType {
name: name.to_string(),
data_type: data_type.clone(),
});
}

Ok(names_and_types)

}

async fn read_geoparquet(file: &str, batch_size: usize) -> Result<GeoTable> {
Expand All @@ -84,34 +160,8 @@ async fn read_geoparquet(file: &str, batch_size: usize) -> Result<GeoTable> {

pub fn process_geotable(geotable: GeoTable) -> Result<()> {
let geometry_column = geotable.geometry()?;
let schema = geotable.schema();
let fields = schema.fields();
let geometry_type = geotable.geometry_data_type()?;

for field in fields {
// let name = field.name();
let data_type = field.data_type();
match data_type {
DataType::Utf8 => {}
DataType::Float16 => {}
DataType::Float32 => {}
DataType::Float64 => {}
DataType::Int8 => {}
DataType::Int16 => {}
DataType::Int32 => {}
DataType::Int64 => {}
DataType::UInt8 => {}
DataType::UInt16 => {}
DataType::UInt32 => {}
DataType::Null => {}
DataType::Binary => {}
DataType::Boolean => {}
DataType::Date32 => {}
DataType::List(_) => {}
_ => println!("Data type '{:?}' not supported ✘", data_type),
}
}

geotable.batches().into_iter().for_each(|batch| {
let address = batch.column_by_name("address");
if address.is_some() {
Expand All @@ -124,51 +174,43 @@ pub fn process_geotable(geotable: GeoTable) -> Result<()> {
match geometry_type {
geoarrow::datatypes::GeoDataType::Point(_) => {
let geoarrow_point = geom.as_point();
for point in geoarrow_point.iter_geo() {
}
for point in geoarrow_point.iter_geo() {}
}
geoarrow::datatypes::GeoDataType::MultiPoint(_) => {
let geoarrow_multipoint = geom.as_multi_point();
for multipoint in geoarrow_multipoint.iter_geo() {
}
for multipoint in geoarrow_multipoint.iter_geo() {}
}
geoarrow::datatypes::GeoDataType::LineString(_) => {
let geoarrow_line = geom.as_line_string();
for line in geoarrow_line.iter_geo() {
}
for line in geoarrow_line.iter_geo() {}
}
geoarrow::datatypes::GeoDataType::MultiLineString(_) => {
let geoarrow_multiline = geom.as_multi_line_string();
for multiline in geoarrow_multiline.iter_geo() {
}
for multiline in geoarrow_multiline.iter_geo() {}
}
geoarrow::datatypes::GeoDataType::Polygon(_) => {
let geoarrow_poly = geom.as_polygon();
for poly in geoarrow_poly.iter_geo() {
}
for poly in geoarrow_poly.iter_geo() {}
}
geoarrow::datatypes::GeoDataType::MultiPolygon(_) => {
let geoarrow_multipoly = geom.as_multi_polygon();
for multipoly in geoarrow_multipoly.iter_geo() {
}
for multipoly in geoarrow_multipoly.iter_geo() {}
}
_ => println!("Geometry type not supported ✘"),
}
let polygon = geom.as_polygon();
for poly in polygon.iter_geo() {
}

for poly in polygon.iter_geo() {}
}

// for chunk in chunks {
// Iterate over rows

// match geometry_type {
// geoarrow::datatypes::GeoDataType::Polygon(_) => {
// let polys = chunk.as_polygon();
// }
// _ => println!("Geometry type not supported ✘"),
// }
// Iterate over rows

// match geometry_type {
// geoarrow::datatypes::GeoDataType::Polygon(_) => {
// let polys = chunk.as_polygon();
// }
// _ => println!("Geometry type not supported ✘"),
// }
// }

// To polygons
Expand Down
Loading

0 comments on commit 0c92f15

Please sign in to comment.