-
Notifications
You must be signed in to change notification settings - Fork 1
/
catastro_to_mongodb.py
169 lines (139 loc) · 4.48 KB
/
catastro_to_mongodb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
import json
import warnings
import click
from pathlib import Path
from sys import exit
from jsonschema import validate, ValidationError, SchemaError
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError
# Version of this script; reported by the -v/--version command-line option.
__VERSION__ = "1.0.2"
# Value stored under the "version" key of every document built by cat_to_json.
__SCHEMA_VERSION__ = 1
# Suppress DeprecationWarning messages for the whole process.
warnings.filterwarnings("ignore", category=DeprecationWarning)
@click.command()
@click.help_option("-h", "--help")
@click.version_option(__VERSION__, "-v", "--version", message="Version %(version)s")
@click.option(
    "-d",
    "--database",
    metavar="name",
    default="catastro",
    help="Database name.",
    show_default=True,
)
@click.option(
    "-H",
    "--host",
    metavar="host",
    default="0.0.0.0",
    help="Host name.",
    show_default=True,
)
@click.option(
    "-p",
    "--port",
    metavar="port",
    default=27017,
    help="Port number.",
    show_default=True,
)
@click.option(
    "-t",
    "--timeout",
    metavar="sec",
    default=5,
    help="Connection timeout (seconds).",
    show_default=True,
)
@click.argument("file", metavar='CAT_FILE')
def cli(database, host, port, timeout, file):
    """Import a CAT FILE to MongoDB

    \b
    Migration Script from Catastro files ( Catalog CP Backup - dBASE IV ) to Mongodb
    More info: https://github.com/dcervantes/catastro-to-mongodb
    """
    # NOTE: click renders the docstring above as the --help text, so developer
    # documentation lives in comments. The "\b" marker sits on its own line at
    # the start of a paragraph so click does not re-wrap the lines after it.
    #
    # Parameters (all supplied by click from the command line):
    #   database -- name of the MongoDB database to write into.
    #   host     -- MongoDB host name.
    #   port     -- MongoDB port number.
    #   timeout  -- server-selection timeout, in seconds.
    #   file     -- path of the CAT file to import.
    #
    # Exits with status 1 (via check_connection) when the server cannot be
    # reached within the timeout.

    click.echo("....................................")
    click.echo(f"Connecting to {host}:{port}")
    click.echo("....................................")
    click.echo()

    # --timeout is expressed in seconds, while PyMongo's
    # serverSelectionTimeoutMS expects milliseconds.
    mongo_client = MongoClient(
        host, port, serverSelectionTimeoutMS=timeout * 1000)
    try:
        check_connection(mongo_client, host, port)

        db = mongo_client[database]
        fill_char = click.style('=', fg='yellow')
        cat_file_lines = line_count(file)

        # Per record type: its JSON schema, its field dictionary and the
        # target collection (named after the schema's "title").
        json_schemas = {}
        json_dict = {}
        colecciones = {}
        tipos = ["11", "13", "14", "15", "16", "17"]
        for tipo in tipos:
            # Paths are relative to the current working directory.
            with open('./schemas/tipo' + tipo + '.json', 'r') as schema:
                json_schemas[tipo] = json.load(schema)
            # Insert a shallow copy: insert_one() adds an "_id" key to the
            # dict it is given, and the schema is reused for validation below.
            db["Schemas"].insert_one(dict(json_schemas[tipo]))
            with open('./dict/tipo' + tipo + '.json', 'r') as dic:
                json_dict[tipo] = json.load(dic)
            colecciones[tipo] = db[json_schemas[tipo]["title"]]

        with open(file, "r") as cat_file:
            with click.progressbar(
                cat_file,
                label='Importing CAT file to Mongodb...',
                fill_char=fill_char,
                length=cat_file_lines,
                show_percent=True,
                show_pos=True,
            ) as lines:
                for line in lines:
                    # The first two characters of a record are its type;
                    # records of any other type are skipped.
                    tipo = line[:2]
                    if tipo in colecciones:
                        colecciones[tipo].insert_one(cat_to_json(
                            line, json_schemas[tipo], json_dict[tipo]))
    finally:
        # Release the connection pool on success, failure, or exit(1).
        mongo_client.close()

    click.echo(click.style("Import complete!", fg="green"))
def cat_to_json(line, json_schemas, json_dict):
    """Build the document for one CAT record.

    Args:
        line: Raw text of the record.
        json_schemas: JSON schema the resulting document is checked against
            (a single schema, despite the plural name).
        json_dict: Two-level mapping ``{section: {field: field_spec}}``; each
            ``field_spec`` is handed to ``get_field_value``.

    Returns:
        A dict with a ``"version"`` key plus one sub-dict per section. Fields
        whose extracted value is the empty string are left out; a section
        with no remaining fields is kept as an empty dict.
    """
    document = {"version": __SCHEMA_VERSION__}
    for section, field_specs in json_dict.items():
        extracted = (
            (field, get_field_value(line, spec))
            for field, spec in field_specs.items()
        )
        document[section] = {
            field: content for field, content in extracted if content != ""
        }
    # Validation problems are reported by validate_json; the document is
    # returned either way.
    validate_json(document, json_schemas)
    return document
def get_field_value(line, data):
    """Extract and convert one fixed-width field from a record.

    Args:
        line: Raw text of the record.
        data: Field spec with ``"val_format"`` (``'X'`` text or ``'N'``
            numeric), ``"ini_char"`` (1-based start position) and
            ``"length"``; optionally ``"dict"`` (code -> label lookup for
            text fields) or ``"decimal"`` (number of implied decimal places
            for numeric fields).

    Returns:
        ``""`` when the field is blank or the format is neither ``'X'`` nor
        ``'N'``; otherwise the stripped text (mapped through ``"dict"`` when
        present), an ``int``, or a ``float`` scaled by ``10**decimal``.

    Raises:
        KeyError: a text value is missing from the spec's ``"dict"``.
        ValueError: a numeric field does not parse as a number.
    """
    kind = data["val_format"]
    start = int(data["ini_char"]) - 1  # spec positions are 1-based
    stop = start + int(data["length"])
    raw = line[start:stop].strip()

    # Blank fields and unknown formats both collapse to the empty string.
    if raw == "" or kind not in ("X", "N"):
        return ""

    if kind == "X":
        return data["dict"][raw] if "dict" in data else raw

    # Numeric field: digits carry an implied decimal point when "decimal"
    # is given, otherwise they are a plain integer.
    if "decimal" in data:
        return float(raw) / 10**int(data["decimal"])
    return int(raw)
def validate_json(validJson, schema):
    """Check a document against a JSON schema, printing any problem found.

    Neither an invalid schema nor an invalid document raises: the error is
    printed to standard output and the function returns normally.

    Args:
        validJson: Document to validate.
        schema: JSON schema to validate against.
    """
    divider = "---------"
    try:
        validate(validJson, schema)
    except SchemaError as bad_schema:
        # The schema itself is malformed.
        print(bad_schema)
    except ValidationError as bad_document:
        # Show the error, then where it occurred in the document and in the
        # schema, one item per line.
        print(
            bad_document,
            divider,
            bad_document.absolute_path,
            divider,
            bad_document.absolute_schema_path,
            sep="\n",
        )
def check_connection(mongo_client, host, port):
    """Abort the program when the MongoDB server cannot be reached.

    Args:
        mongo_client: Client whose ``server_info()`` is called as a probe.
        host: Host name, used only in the error message.
        port: Port number, used only in the error message.

    On ``ServerSelectionTimeoutError`` an error message is echoed and the
    process exits with status 1; otherwise the function returns ``None``.
    """
    try:
        mongo_client.server_info()
    except ServerSelectionTimeoutError:
        failure = f"Connection error while attempting to connect to {host}:{port}"
        click.echo(failure)
        exit(1)
def line_count(filename):
    """Estimate the number of records in a CAT file from its size.

    The file is not read: its size in bytes is divided by 1000 (the original
    author's stated record width) and truncated to an integer.

    Args:
        filename: Path of the file to measure.

    Returns:
        The estimated record count as an ``int``.
    """
    size_in_bytes = Path(filename).stat().st_size
    return int(size_in_bytes / 1000)
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    cli()