From 13aa9378a35544bd2a2b008fc763c4acc310b15f Mon Sep 17 00:00:00 2001 From: Lupu Gheorghe <46172059+gheorghelupu17@users.noreply.github.com> Date: Fri, 29 Nov 2024 21:50:40 +0200 Subject: [PATCH] add import for tournaout (#112) --- .../Records/FetchRecordsJob.php | 152 ++++++++++++++++++ .../Records/ImportAbroadRecordsJob.php | 136 ++++++++++++++++ .../ImportCorrespondenceRecordsJob.php | 121 ++++++++++++++ .../Records/ImportCountyRecordsJob.php | 126 +++++++++++++++ .../Turnouts/FetchTurnoutsJob.php | 99 ++++++++++++ .../Turnouts/ImportAbroadTurnoutsJob.php | 118 ++++++++++++++ .../Turnouts/ImportCountyTurnoutsJob.php | 106 ++++++++++++ 7 files changed, 858 insertions(+) create mode 100644 app/Jobs/Parlamentare01122024/Records/FetchRecordsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Records/ImportAbroadRecordsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Records/ImportCorrespondenceRecordsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Records/ImportCountyRecordsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Turnouts/FetchTurnoutsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Turnouts/ImportAbroadTurnoutsJob.php create mode 100644 app/Jobs/Parlamentare01122024/Turnouts/ImportCountyTurnoutsJob.php diff --git a/app/Jobs/Parlamentare01122024/Records/FetchRecordsJob.php b/app/Jobs/Parlamentare01122024/Records/FetchRecordsJob.php new file mode 100644 index 0000000..2983b60 --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Records/FetchRecordsJob.php @@ -0,0 +1,152 @@ +deleteWhenDestroyed(); + + $cwd = $temporaryDirectory->path(); + + $tmpDisk = Storage::build([ + 'driver' => 'local', + 'root' => $cwd, + ]); + + $tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource()); + + // Split the CSV by county + Process::path($cwd) + ->run([ + config('import.awk_path'), + '-F,', + 'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}', + 'turnout.csv', + ]); + + $tmpDisk->delete('turnout.csv'); + + collect($tmpDisk->allFiles()) + ->each(function (string $file) use ($tmpDisk) { + $this->scheduledJob->disk() + ->writeStream( + $this->scheduledJob->getSourcePath($file), + $tmpDisk->readStream($file) + ); + }); + + $electionName = $this->scheduledJob->election->getFilamentName(); + $electionId = $this->scheduledJob->election_id; + + $time = now()->toDateTimeString(); + + $sourceFiles = collect([ + '1' => 'AB', + '2' => 'AR', + '3' => 'AG', + '4' => 'BC', + '5' => 'BH', + '6' => 'BN', + '7' => 'BT', + '8' => 'BV', + '9' => 'BR', + '10' => 'BZ', + '11' => 'CS', + '12' => 'CL', + '13' => 'CJ', + '14' => 'CT', + '15' => 'CV', + '16' => 'DB', + '17' => 'DJ', + '18' => 'GL', + '19' => 'GR', + '20' => 'GJ', + '21' => 'HR', + '22' => 'HD', + '23' => 'IL', + '24' => 'IS', + '25' => 'IF', + '26' => 'MM', + '27' => 'MH', + '28' => 'MS', + '29' => 'NT', + '30' => 'OT', + '31' => 'PH', + '32' => 'SM', + '33' => 'SJ', + '34' => 'SB', + '35' => 'SV', + '36' => 'TR', + '37' => 'TM', + '38' => 'TL', + '39' => 'VS', + '40' => 'VL', + '41' => 'VN', + + '44' => 'B', + '45' => 'B', + '46' => 'B', + '47' => 'B', + '48' => 'B', + '49' => 'B', + ]); + + $jobs = $sourceFiles + ->map(fn (string $countyCode, string $filename) => new ImportCountyRecordsJob($this->scheduledJob, $countyCode, $filename)) + ->push(new ImportAbroadRecordsJob($this->scheduledJob)); + + $persistAndClean = fn () => Bus::chain([ + new PersistTemporaryTableData(Record::class, $electionId), + new DeleteTemporaryTableData(Record::class, $electionId), + + new PersistTemporaryTableData(Vote::class, $electionId), + new DeleteTemporaryTableData(Vote::class, $electionId), + ])->dispatch(); + + Bus::batch($jobs) + ->catch($persistAndClean) + ->then($persistAndClean) + ->then(fn () => UpdateElectionRecordsTimestamp::dispatch($electionId)) + ->name("$electionName / Rezultate / $time") + ->allowFailures() + ->dispatch(); + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'records', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + static::name(), + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Records/ImportAbroadRecordsJob.php b/app/Jobs/Parlamentare01122024/Records/ImportAbroadRecordsJob.php new file mode 100644 index 0000000..087e13c --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Records/ImportAbroadRecordsJob.php @@ -0,0 +1,136 @@ +scheduledJob = $scheduledJob; + } + + public function handle(): void + { + $disk = $this->scheduledJob->disk(); + $path = $this->scheduledJob->getSourcePath('43.csv'); + + if (! $disk->exists($path)) { + throw new MissingSourceFileException($path); + } + + $reader = Reader::createFromStream($disk->readStream($path)); + $reader->setHeaderOffset(0); + + $records = collect(); + + $votables = RecordService::generateVotables( + $reader->getHeader(), + $this->scheduledJob->election_id + ); + + foreach ($reader->getRecords() as $row) { + try { + $countryId = $this->getCountryId($row['uat_name']); + + $part = RecordService::getPart($row['report_stage_code']); + + $records->push([ + 'election_id' => $this->scheduledJob->election_id, + 'country_id' => $countryId, + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'eligible_voters_permanent' => $row['a'], + 'eligible_voters_special' => 0, + + 'present_voters_permanent' => $row['b1'], + 'present_voters_special' => $row['b2'], + 'present_voters_supliment' => $row['b3'], + 'present_voters_mail' => 0, //$row['b4'], + + 'votes_valid' => $row['c'], + 'votes_null' => $row['d'], + + 'papers_received' => $row['e'], + 'papers_unused' => $row['f'], + + 'has_issues' => false, + ]); + + $votes = collect(); + foreach ($votables as $column => $votable) { + $votes->push([ + 'election_id' => $this->scheduledJob->election_id, + 'country_id' => $countryId, + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'votable_type' => $votable['votable_type'], + 'votable_id' => $votable['votable_id'], + + 'votes' => $row[$column], + ]); + } + + Vote::saveToTemporaryTable($votes->all()); + } catch (CountryCodeNotFoundException $th) { + CountryCodeNotFound::dispatch($row['uat_name'], $this->scheduledJob->election); + } + } + + Record::saveToTemporaryTable($records->all()); + } + + protected function getCountryId(string $name): string + { + $country = Country::search($name)->first(); + + if (! $country) { + throw new CountryCodeNotFoundException($name); + } + + return $country->id; + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'records', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'abroad', + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Records/ImportCorrespondenceRecordsJob.php b/app/Jobs/Parlamentare01122024/Records/ImportCorrespondenceRecordsJob.php new file mode 100644 index 0000000..a5eb40e --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Records/ImportCorrespondenceRecordsJob.php @@ -0,0 +1,121 @@ +scheduledJob->disk(); + + $disk->put('correspondence.csv', $this->scheduledJob->fetchSource()->resource()); + + $reader = Reader::createFromStream($disk->readStream('correspondence.csv')); + $reader->setHeaderOffset(0); + + $records = collect(); + + $votables = RecordService::generateVotables( + $reader->getHeader(), + $this->scheduledJob->election_id + ); + + foreach ($reader->getRecords() as $row) { + try { + $countryId = $this->getCountryId($row['uat_name']); + + $part = RecordService::getPart($row['report_stage_code']); + + $records->push([ + 'election_id' => $this->scheduledJob->election_id, + 'country_id' => $countryId, + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'eligible_voters_permanent' => $row['a'], + 'eligible_voters_special' => 0, + + 'present_voters_permanent' => 0, + 'present_voters_special' => 0, + 'present_voters_supliment' => 0, + 'present_voters_mail' => $row['c'], + + 'votes_valid' => $row['d1'], + 'votes_null' => $row['d2'], + + 'papers_received' => 0, + 'papers_unused' => 0, + + 'has_issues' => false, + ]); + + $votes = collect(); + foreach ($votables as $column => $votable) { + $votes->push([ + 'election_id' => $this->scheduledJob->election_id, + 'country_id' => $countryId, + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'votable_type' => $votable['votable_type'], + 'votable_id' => $votable['votable_id'], + + 'votes' => $row[$column], + ]); + } + + Vote::saveToTemporaryTable($votes->all()); + } catch (CountryCodeNotFoundException $th) { + CountryCodeNotFound::dispatch($row['uat_name'], $this->scheduledJob->election); + } + } + + Record::saveToTemporaryTable($records->all()); + } + + protected function getCountryId(string $name): string + { + $country = Country::search($name)->first(); + + if (! $country) { + throw new CountryCodeNotFoundException($name); + } + + return $country->id; + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'records', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'correspondence', + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Records/ImportCountyRecordsJob.php b/app/Jobs/Parlamentare01122024/Records/ImportCountyRecordsJob.php new file mode 100644 index 0000000..3a9a0e8 --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Records/ImportCountyRecordsJob.php @@ -0,0 +1,126 @@ +scheduledJob = $scheduledJob; + $this->county = County::where('code', $countyCode)->first(); + $this->filename = $filename; + } + + public function handle(): void + { + $disk = $this->scheduledJob->disk(); + $path = $this->scheduledJob->getSourcePath("{$this->filename}.csv"); + + if (! $disk->exists($path)) { + throw new MissingSourceFileException($path); + } + + $reader = Reader::createFromStream($disk->readStream($path)); + $reader->setHeaderOffset(0); + + $records = collect(); + + $votables = RecordService::generateVotables( + $reader->getHeader(), + $this->scheduledJob->election_id + ); + + foreach ($reader->getRecords() as $row) { + $part = RecordService::getPart($row['report_stage_code']); + + $records->push([ + 'election_id' => $this->scheduledJob->election_id, + 'county_id' => $this->county->id, + 'locality_id' => $row['uat_siruta'], + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'eligible_voters_permanent' => $row['a'], + 'eligible_voters_special' => 0, + + 'present_voters_permanent' => $row['b1'], + 'present_voters_special' => $row['b2'], + 'present_voters_supliment' => $row['b3'], + 'present_voters_mail' => 0, //$row['b4'], + + 'votes_valid' => $row['c'], + 'votes_null' => $row['d'], + + 'papers_received' => $row['e'], + 'papers_unused' => $row['f'], + + 'has_issues' => false, + ]); + + $votes = collect(); + + foreach ($votables as $column => $votable) { + $votes->push([ + 'election_id' => $this->scheduledJob->election_id, + 'county_id' => $this->county->id, + 'locality_id' => $row['uat_siruta'], + 'section' => $row['precinct_nr'], + 'part' => $part, + + 'votable_type' => $votable['votable_type'], + 'votable_id' => $votable['votable_id'], + + 'votes' => $row[$column], + ]); + } + + Vote::saveToTemporaryTable($votes->all()); + } + + Record::saveToTemporaryTable($records->all()); + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'records', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'county:' . $this->county->code, + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Turnouts/FetchTurnoutsJob.php b/app/Jobs/Parlamentare01122024/Turnouts/FetchTurnoutsJob.php new file mode 100644 index 0000000..3b20df7 --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Turnouts/FetchTurnoutsJob.php @@ -0,0 +1,99 @@ +deleteWhenDestroyed(); + + $cwd = $temporaryDirectory->path(); + + $tmpDisk = Storage::build([ + 'driver' => 'local', + 'root' => $cwd, + ]); + + $tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource()); + + // Split the CSV by county + Process::path($cwd) + ->run([ + config('import.awk_path'), + '-F,', + 'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}', + 'turnout.csv', + ]); + + $tmpDisk->delete('turnout.csv'); + + collect($tmpDisk->allFiles()) + ->each(function (string $file) use ($tmpDisk) { + $this->scheduledJob->disk() + ->writeStream( + $this->scheduledJob->getSourcePath($file), + $tmpDisk->readStream($file) + ); + }); + + $counties = County::all(); + + $electionName = $this->scheduledJob->election->getFilamentName(); + $electionId = $this->scheduledJob->election_id; + + $time = now()->toDateTimeString(); + + $jobs = $counties + ->map(fn (County $county) => new ImportCountyTurnoutsJob($this->scheduledJob, $county)) + ->push(new ImportAbroadTurnoutsJob($this->scheduledJob)); + + $persistAndClean = fn () => Bus::chain([ + new PersistTemporaryTableData(Turnout::class, $electionId), + new DeleteTemporaryTableData(Turnout::class, $electionId), + ])->dispatch(); + + Bus::batch($jobs) + ->catch($persistAndClean) + ->then($persistAndClean) + ->then(fn () => UpdateElectionTurnoutsTimestamp::dispatch($electionId)) + ->name("$electionName / Prezență / $time") + ->allowFailures() + ->dispatch(); + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'turnout', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + static::name(), + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Turnouts/ImportAbroadTurnoutsJob.php b/app/Jobs/Parlamentare01122024/Turnouts/ImportAbroadTurnoutsJob.php new file mode 100644 index 0000000..681e476 --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Turnouts/ImportAbroadTurnoutsJob.php @@ -0,0 +1,118 @@ +scheduledJob = $scheduledJob; + } + + public function handle(): void + { + $disk = $this->scheduledJob->disk(); + $path = $this->scheduledJob->getSourcePath('SR.csv'); + + if (! $disk->exists($path)) { + throw new MissingSourceFileException($path); + } + + $reader = Reader::createFromStream($disk->readStream($path)); + $reader->setHeaderOffset(0); + + $values = collect(); + + $segments = Turnout::segmentsMap(); + + foreach ($reader->getRecords() as $record) { + try { + $values->push([ + 'election_id' => $this->scheduledJob->election_id, + 'country_id' => $this->getCountryId($record['UAT']), + 'section' => $record['Nr sectie de votare'], + + 'initial_permanent' => $record['Înscriși pe liste permanente'], + 'initial_complement' => 0, + 'permanent' => $record['LP'], + 'complement' => $record['LSC'], + 'supplement' => $record['LS'], + 'mobile' => $record['UM'], + + 'area' => $record['Mediu'], + 'has_issues' => $this->determineIfHasIssues($record), + + ...$segments->map(fn (string $segment) => $record[$segment]), + ]); + } catch (CountryCodeNotFoundException $th) { + CountryCodeNotFound::dispatch($record['UAT'], $this->scheduledJob->election); + } + } + + Turnout::saveToTemporaryTable($values->all()); + } + + protected function determineIfHasIssues(array $record): bool + { + $computedTotal = collect(['LP', 'LSC', 'LS', 'UM']) + ->map(fn (string $key) => $record[$key]) + ->sum(); + + if ($computedTotal !== $record['LT']) { + return true; + } + + return false; + } + + protected function getCountryId(string $name): string + { + $country = Country::search($name)->first(); + + if (! $country) { + throw new CountryCodeNotFoundException($name); + } + + return $country->id; + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'turnout', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'abroad', + ]; + } +} diff --git a/app/Jobs/Parlamentare01122024/Turnouts/ImportCountyTurnoutsJob.php b/app/Jobs/Parlamentare01122024/Turnouts/ImportCountyTurnoutsJob.php new file mode 100644 index 0000000..5f2800e --- /dev/null +++ b/app/Jobs/Parlamentare01122024/Turnouts/ImportCountyTurnoutsJob.php @@ -0,0 +1,106 @@ +scheduledJob = $scheduledJob; + $this->county = $county; + } + + public function handle(): void + { + $disk = $this->scheduledJob->disk(); + $path = $this->scheduledJob->getSourcePath("{$this->county->code}.csv"); + + if (! $disk->exists($path)) { + throw new MissingSourceFileException($path); + } + + $reader = Reader::createFromStream($disk->readStream($path)); + $reader->setHeaderOffset(0); + + $values = collect(); + + $segments = Turnout::segmentsMap(); + + $records = $reader->getRecords(); + foreach ($records as $record) { + $values->push([ + 'election_id' => $this->scheduledJob->election_id, + 'county_id' => $this->county->id, + 'locality_id' => $record['Siruta'], + 'section' => $record['Nr sectie de votare'], + + 'initial_permanent' => $record['Înscriși pe liste permanente'], + 'initial_complement' => 0, + 'permanent' => $record['LP'], + 'complement' => $record['LSC'], + 'supplement' => $record['LS'], + 'mobile' => $record['UM'], + + 'area' => $record['Mediu'], + 'has_issues' => $this->determineIfHasIssues($record), + + ...$segments->map(fn (string $segment) => $record[$segment]), + ]); + } + + Turnout::saveToTemporaryTable($values->all()); + } + + protected function determineIfHasIssues(array $record): bool + { + $computedTotal = collect(['LP', 'LSC', 'LS', 'UM']) + ->map(fn (string $key) => $record[$key]) + ->sum(); + + if ($computedTotal !== $record['LT']) { + return true; + } + + return false; + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'turnout', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'county:' . $this->county->code, + ]; + } +}