forked from Global-Witness/uk-companies-house-parsers-public
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit b051891
Showing
3 changed files
with
303 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
output_data | ||
original_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
import csv | ||
import os | ||
import sys | ||
|
||
# Output file name templates; %s is filled with the input file's base name. | ||
COMPANIES_OUTPUT_FILENAME_TEMPLATE = "companies_data_%s.csv" | ||
PERSONS_OUTPUT_FILENAME_TEMPLATE = "persons_data_%s.csv" | ||
# Expected first 8 characters of the header (first) row of the input file. | ||
SNAPSHOT_HEADER_IDENTIFIER = "DDDDSNAP" | ||
# First 8 characters of the trailer row, which ends processing. | ||
TRAILER_RECORD_IDENTIFIER = "99999999" | ||
# Value of the record-type character (index 8) that selects the row parser. | ||
COMPANY_RECORD_TYPE = '1' | ||
PERSON_RECORD_TYPE = '2' | ||
|
||
def process_header_row(row):
    """Validate the snapshot header row and report which run is being processed.

    The header is read as fixed-width fields: identifier (characters 0-7),
    run number (8-11) and production date (12-19).

    Args:
        row: The first line of the input file.

    Exits the process with status 1 when the identifier does not match
    SNAPSHOT_HEADER_IDENTIFIER.
    """
    header_identifier = row[0:8]
    run_number = row[8:12]
    production_date = row[12:20]
    if header_identifier != SNAPSHOT_HEADER_IDENTIFIER:
        print("Unsupported file type from header: '%s'. Expecting a snapshot header: '%s'"
              % (header_identifier, SNAPSHOT_HEADER_IDENTIFIER))
        sys.exit(1)
    print("Processing snapshot file with run number %s from date %s" %
          (run_number, production_date))
|
||
def process_company_row(row, output_writer):
    """Parse one fixed-width company record and write it as a CSV row.

    Fields read: company number (characters 0-7), record type (8, read but
    not written), company status (9), number of officers (32-35) and the
    length of the name (36-39). The name starts at character 40; its last
    counted character is dropped (presumably the trailing '<' delimiter --
    confirm against the data specification).

    Args:
        row: One company line from the snapshot file.
        output_writer: Object with a ``writerow`` method (e.g. ``csv.writer``).
    """
    company_number, _record_type, company_status = row[0:8], row[8], row[9]
    officer_count = int(row[32:36])
    name_end = 40 + int(row[36:40]) - 1
    output_writer.writerow(
        [company_number, company_status, officer_count, row[40:name_end]])
|
||
def process_person_row(row, output_writer):
    """Parse one fixed-width person (officer) record and write it as a CSV row.

    The fixed part of the record is sliced by position. The variable part,
    whose length is given in characters 72-75, is a '<'-separated list read
    in this order: title, forenames, surname, honours, care of, PO box,
    address line 1, address line 2, post town, county, country, occupation,
    nationality, resident country.

    Records whose variable part holds fewer than 14 segments are padded with
    empty strings instead of raising IndexError; extra segments are ignored.

    Args:
        row: One person line from the snapshot file.
        output_writer: Object with a ``writerow`` method (e.g. ``csv.writer``).
    """
    variable_field_count = 14

    fixed_fields = [
        row[0:8],    # company number
        row[9],      # appointment date origin
        row[10:12],  # appointment type
        row[12:24],  # person number
        row[24],     # corporate indicator
        row[32:40],  # appointment date
        row[40:48],  # resignation date
        row[48:56],  # postcode
        row[56:64],  # partial date of birth
        row[64:72],  # full date of birth
    ]

    variable_data_length = int(row[72:76])
    variable_fields = row[76:76 + variable_data_length].split('<')
    # Normalise to exactly 14 columns so short records do not crash the run
    # and a trailing delimiter does not add a spurious column.
    variable_fields = (variable_fields + [''] * variable_field_count)[:variable_field_count]

    output_writer.writerow(fixed_fields + variable_fields)
|
||
def init_company_output_file(filename):
    """Create the companies CSV file and write its header row.

    Args:
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        A ``(file_object, csv_writer)`` tuple. The caller is responsible for
        closing the file object.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank lines appear on platforms that translate '\n' to '\r\n'.
    output_companies_file = open(filename, 'w', newline='')
    companies_writer = csv.writer(output_companies_file, delimiter=",")
    companies_writer.writerow(["Company Number", "Company Status", "Number of Officers", "Company Name"])
    return output_companies_file, companies_writer
|
||
def init_person_output_file(filename):
    """Create the persons CSV file and write its header row.

    Args:
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        A ``(file_object, csv_writer)`` tuple. The caller is responsible for
        closing the file object.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank lines appear on platforms that translate '\n' to '\r\n'.
    output_persons_file = open(filename, 'w', newline='')
    persons_writer = csv.writer(output_persons_file, delimiter=",")
    persons_writer.writerow([
        "Company Number", "App Date Origin", "Appointment Type", "Person number", "Corporate indicator",
        "Appointment Date", "Resignation Date", "Person Postcode", "Partial Date of Birth", "Full Date of Birth",
        "Title", "Forenames", "Surname", "Honours", "Care_of", "PO_box", "Address line 1", "Address line 2",
        "Post_town", "County", "Country", "Occupation", "Nationality", "Resident Country",
    ])
    return output_persons_file, persons_writer
|
||
def init_input_files(output_folder, base_input_name):
    """Create the companies and persons output CSV files for one input file.

    Despite its name, this function sets up the *output* files: it builds
    both paths from the filename templates, announces them on stdout and
    opens each file with its header row already written.

    Args:
        output_folder: Directory in which the CSV files are created.
        base_input_name: Input file name without directory or extension.

    Returns:
        ``(companies_file, companies_writer, persons_file, persons_writer)``.
    """
    companies_output_filename = os.path.join(
        output_folder, COMPANIES_OUTPUT_FILENAME_TEMPLATE % (base_input_name))
    persons_output_filename = os.path.join(
        output_folder, PERSONS_OUTPUT_FILENAME_TEMPLATE % (base_input_name))
    print("Saving companies data to %s" % companies_output_filename)
    print("Saving persons data to %s" % persons_output_filename)
    output_companies_file, output_companies_writer = init_company_output_file(companies_output_filename)
    output_persons_file, output_persons_writer = init_person_output_file(persons_output_filename)
    return output_companies_file, output_companies_writer, output_persons_file, output_persons_writer
|
||
def process_company_appointments_data(input_file, output_folder, base_input_name):
    """Split a company-appointments snapshot into companies and persons CSVs.

    The first line is validated as the header. Every following line is
    dispatched on its record-type character (index 8) until the trailer
    record is reached, at which point a summary is printed and the process
    exits with status 0. Lines of any other type, including lines too short
    to carry a record type, are skipped.

    Args:
        input_file: Iterable of text lines (e.g. an open file object).
        output_folder: Directory in which the CSV files are created.
        base_input_name: Input file name without directory or extension.
    """
    companies_processed = 0
    persons_processed = 0
    (output_companies_file, output_companies_writer,
     output_persons_file, output_persons_writer) = init_input_files(output_folder, base_input_name)
    try:
        for row_num, row in enumerate(input_file):
            if row_num == 0:
                process_header_row(row)
            elif row[0:8] == TRAILER_RECORD_IDENTIFIER:
                # End of file
                record_count = int(row[8:16])
                print("Reached end of file. Processed %s == %s records: %s companies, %s persons." % (
                    record_count, companies_processed + persons_processed,
                    companies_processed, persons_processed))
                sys.exit(0)
            # Slicing (rather than indexing) yields '' for short lines, so they
            # are skipped like any other unknown record instead of raising.
            elif row[8:9] == COMPANY_RECORD_TYPE:
                process_company_row(row, output_companies_writer)
                companies_processed += 1
            elif row[8:9] == PERSON_RECORD_TYPE:
                process_person_row(row, output_persons_writer)
                persons_processed += 1
    finally:
        # Runs on the trailer's sys.exit(0), on a missing trailer and on any
        # parsing error, so buffered CSV rows are always flushed to disk.
        output_companies_file.close()
        output_persons_file.close()
|
||
if __name__ == '__main__':
    # Command-line entry point: <input_file> <output_folder>.
    if len(sys.argv) < 3:
        # Adjacent literals are concatenated into a single argument so the
        # second line of the usage text does not start with a stray space.
        print(
            'Usage: python process_company_appointments_data.py input_file output_folder\n'
            'E.g. python process_company_appointments_data.py Prod195_1111_ni_sample.dat ./output/'
        )
        sys.exit(1)
    input_filename = sys.argv[1]
    output_folder = sys.argv[2]
    # Do not include the extension in the base input name
    base_input_name = os.path.splitext(os.path.basename(input_filename))[0]
    # The context manager closes the input file even though processing ends
    # via sys.exit() when the trailer record is reached.
    with open(input_filename, 'r') as input_file:
        process_company_appointments_data(input_file, output_folder, base_input_name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
import csv | ||
import os | ||
import sys | ||
|
||
# Output file name templates; %s is filled with the input file's base name. | ||
PERSONS_OUTPUT_FILENAME_TEMPLATE = "persons_data_%s.csv" | ||
DISQUALIFICATIONS_FILENAME_TEMPLATE = "disqualifications_data_%s.csv" | ||
EXEMPTIONS_FILENAME_TEMPLATE = 'exemptions_data_%s.csv' | ||
# Expected first 8 characters of the header (first) row of the input file. | ||
SNAPSHOT_HEADER_IDENTIFIER = "DISQUALS" | ||
# First 8 characters of the trailer row; same text as the header, which is | ||
# only told apart by being the first row. | ||
TRAILER_RECORD_IDENTIFIER = "DISQUALS" | ||
# Value of the first character of a data row that selects the row parser. | ||
PERSON_RECORD_TYPE = '1' | ||
DISQUALIFICATION_RECORD_TYPE = '2' | ||
EXEMPTION_RECORD_TYPE = '3' | ||
|
||
|
||
def process_header_row(row):
    """Validate the snapshot header row and report which run is being processed.

    The header is read as fixed-width fields: identifier (characters 0-7),
    run number (8-11) and production date (12-19).

    Args:
        row: The first line of the input file.

    Exits the process with status 1 when the identifier does not match
    SNAPSHOT_HEADER_IDENTIFIER.
    """
    header_identifier = row[0:8]
    run_number = row[8:12]
    production_date = row[12:20]
    if header_identifier != SNAPSHOT_HEADER_IDENTIFIER:
        print(
            "Unsupported file type from header: '%s'. Expecting a snapshot header: '%s'"
            % (header_identifier, SNAPSHOT_HEADER_IDENTIFIER))
        sys.exit(1)
    print("Processing snapshot file with run number %s from date %s" %
          (run_number, production_date))
|
||
|
||
def process_person_row(row, output_writer):
    """Parse one fixed-width disqualified-person record and write it as CSV.

    Layout used (0-based, end-exclusive): record type [0], person number
    [1:13], date of birth [13:21], postcode [21:29], length of the variable
    details [29:33], details [33:33 + length]. The 12-character person
    number matches the disqualification record parser, and the two
    8-character fields exactly fill the gap up to the length indicator.

    Args:
        row: One person line from the snapshot file.
        output_writer: Object with a ``writerow`` method (e.g. ``csv.writer``).
    """
    record_type = row[0]
    person_number = row[1:13]
    person_dob = row[13:21]
    person_postcode = row[21:29]
    person_variable_ind = int(row[29:33])
    person_details = row[33:33 + person_variable_ind]
    output_writer.writerow([
        record_type, person_number, person_dob, person_postcode, person_details
    ])
|
||
|
||
def process_disqualification_row(row, output_writer):
    """Parse one fixed-width disqualification record and write it as CSV.

    Layout used (0-based, end-exclusive): record type [0], person number
    [1:13], start date [13:21], end date [21:29], section of act [29:49],
    disqualification type [49:79], order date [79:87], case number [87:117],
    company name [117:277], court name length [277:281], court name
    [281:281 + length]. Fixed-width values are written as found, including
    any padding.

    Args:
        row: One disqualification line from the snapshot file.
        output_writer: Object with a ``writerow`` method (e.g. ``csv.writer``).
    """
    record_type = row[0]
    person_number = row[1:13]
    disqual_start_date = row[13:21]
    disqual_end_date = row[21:29]
    section_of_act = row[29:49]
    disqual_type = row[49:79]
    disqual_order_date = row[79:87]
    case_number = row[87:117]
    company_name = row[117:277]
    # The length indicator fills the whole gap up to where the name starts.
    court_name_variable_ind = int(row[277:281])
    court_name = row[281:281 + court_name_variable_ind]
    # The last column is the court name itself, matching the CSV header.
    output_writer.writerow([
        record_type, person_number, disqual_start_date, disqual_end_date,
        section_of_act, disqual_type, disqual_order_date, case_number,
        company_name, court_name
    ])
|
||
|
||
def process_exemption_row(row, output_writer):
    """Parse one fixed-width exemption record and write it as CSV.

    Layout used (0-based, end-exclusive): record type [0], person number
    [1:13], start date [13:21], end date [21:29], purpose [29:39], company
    name length [39:43], company name [43:43 + length]. The person number
    and start date widths match the other record parsers and leave no
    overlap with the end date.

    Args:
        row: One exemption line from the snapshot file.
        output_writer: Object with a ``writerow`` method (e.g. ``csv.writer``).
    """
    record_type = row[0]
    person_number = row[1:13]
    exemption_start_date = row[13:21]
    exemption_end_date = row[21:29]
    exemption_purpose = row[29:39]
    exemption_company_name_ind = int(row[39:43])
    exemption_company_name = row[43:43 + exemption_company_name_ind]
    output_writer.writerow([
        record_type, person_number, exemption_start_date, exemption_end_date,
        exemption_purpose, exemption_company_name
    ])
|
||
|
||
def init_person_output_file(filename):
    """Create the persons CSV file and write its header row.

    Args:
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        A ``(file_object, csv_writer)`` tuple. The caller is responsible for
        closing the file object.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank lines appear on platforms that translate '\n' to '\r\n'.
    output_persons_file = open(filename, 'w', newline='')
    persons_writer = csv.writer(output_persons_file, delimiter=",")
    persons_writer.writerow([
        "record_type", "person_number", "person_dob", "person_postcode",
        "person_details"
    ])
    return output_persons_file, persons_writer
|
||
|
||
def init_disquals_output_file(filename):
    """Create the disqualifications CSV file and write its header row.

    Args:
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        A ``(file_object, csv_writer)`` tuple. The caller is responsible for
        closing the file object.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank lines appear on platforms that translate '\n' to '\r\n'.
    output_disquals_file = open(filename, 'w', newline='')
    disquals_writer = csv.writer(output_disquals_file, delimiter=",")
    disquals_writer.writerow([
        "record_type", "person_number", "disqual_start_date",
        "disqual_end_date", "section_of_act", "disqual_type",
        "disqual_order_date", "case_number", "company_name", "court_name"
    ])
    return output_disquals_file, disquals_writer
|
||
|
||
def init_exemptions_output_file(filename):
    """Create the exemptions CSV file and write its header row.

    Args:
        filename: Path of the CSV file to create (overwritten if it exists).

    Returns:
        A ``(file_object, csv_writer)`` tuple. The caller is responsible for
        closing the file object.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank lines appear on platforms that translate '\n' to '\r\n'.
    output_exemptions_file = open(filename, 'w', newline='')
    exemptions_writer = csv.writer(output_exemptions_file, delimiter=",")
    exemptions_writer.writerow([
        "record_type", "person_number", "exemption_start_date",
        "exemption_end_date", "exemption_purpose", "exemption_company_name"
    ])
    return output_exemptions_file, exemptions_writer
|
||
|
||
def init_input_files(output_folder, base_input_name):
    """Create the persons, disqualifications and exemptions output CSV files.

    Despite its name, this function sets up the *output* files: it builds
    the three paths from the filename templates, announces them on stdout
    and opens each file with its header row already written.

    Args:
        output_folder: Directory in which the CSV files are created.
        base_input_name: Input file name without directory or extension.

    Returns:
        ``(persons_file, persons_writer, disquals_file, disquals_writer,
        exemptions_file, exemptions_writer)``.
    """
    persons_output_filename = os.path.join(
        output_folder, PERSONS_OUTPUT_FILENAME_TEMPLATE % (base_input_name))
    disquals_output_filename = os.path.join(
        output_folder, DISQUALIFICATIONS_FILENAME_TEMPLATE % (base_input_name))
    exemptions_output_filename = os.path.join(
        output_folder, EXEMPTIONS_FILENAME_TEMPLATE % (base_input_name))
    # Each message names the data set that is actually written to the file.
    print("Saving persons data to %s" % persons_output_filename)
    print("Saving disqualifications data to %s" % disquals_output_filename)
    print("Saving exemptions data to %s" % exemptions_output_filename)
    output_persons_file, output_persons_writer = init_person_output_file(
        persons_output_filename)
    output_disquals_file, output_disquals_writer = init_disquals_output_file(
        disquals_output_filename)
    output_exemptions_file, output_exemptions_writer = init_exemptions_output_file(
        exemptions_output_filename)
    return (output_persons_file, output_persons_writer,
            output_disquals_file, output_disquals_writer,
            output_exemptions_file, output_exemptions_writer)
|
||
|
||
def process_company_appointments_data(input_file, output_folder,
                                      base_input_name):
    """Split a disqualified-directors snapshot into three CSV files.

    The first line is validated as the header. Every following line is
    dispatched on its first character (person, disqualification or
    exemption) until a line starting with the trailer identifier is reached,
    at which point a summary is printed and the process exits with status 0.
    Lines of any other type are skipped.

    Args:
        input_file: Iterable of text lines (e.g. an open file object).
        output_folder: Directory in which the CSV files are created.
        base_input_name: Input file name without directory or extension.
    """
    persons_processed = 0
    disquals_processed = 0
    exemptions_processed = 0
    (output_persons_file, output_persons_writer,
     output_disquals_file, output_disquals_writer,
     output_exemptions_file, output_exemptions_writer) = init_input_files(
         output_folder, base_input_name)
    try:
        for row_num, row in enumerate(input_file):
            if row_num == 0:
                process_header_row(row)
            elif row[0:8] == TRAILER_RECORD_IDENTIFIER:
                # End of file
                record_count = int(row[45:53])
                print(
                    "Reached end of file. Processed %s == %s records: %s persons, %s disquals, %s exemptions."
                    % (record_count, persons_processed + disquals_processed +
                       exemptions_processed, persons_processed, disquals_processed,
                       exemptions_processed))
                sys.exit(0)
            elif row[0] == PERSON_RECORD_TYPE:
                process_person_row(row, output_persons_writer)
                persons_processed += 1
            elif row[0] == DISQUALIFICATION_RECORD_TYPE:
                process_disqualification_row(row, output_disquals_writer)
                disquals_processed += 1
            elif row[0] == EXEMPTION_RECORD_TYPE:
                process_exemption_row(row, output_exemptions_writer)
                exemptions_processed += 1
    finally:
        # Runs on the trailer's sys.exit(0), on a missing trailer and on any
        # parsing error, so buffered CSV rows are always flushed to disk.
        output_persons_file.close()
        output_disquals_file.close()
        output_exemptions_file.close()
|
||
|
||
if __name__ == '__main__':
    # Command-line entry point: <input_file> <output_folder>.
    if len(sys.argv) < 3:
        # Adjacent literals are concatenated into a single argument so the
        # second line of the usage text does not start with a stray space.
        print(
            'Usage: python process_disqualified_directors_data.py input_file output_folder\n'
            'E.g. python process_disqualified_directors_data.py Prod195_1111_ni_sample.dat ./output/'
        )
        sys.exit(1)
    input_filename = sys.argv[1]
    output_folder = sys.argv[2]
    # Do not include the extension in the base input name
    base_input_name = os.path.splitext(os.path.basename(input_filename))[0]
    # The context manager closes the input file even though processing ends
    # via sys.exit() when the trailer record is reached.
    with open(input_filename, 'r') as input_file:
        process_company_appointments_data(input_file, output_folder,
                                          base_input_name)