diff --git a/app/Jobs/ReferendumBucuresti241124/Turnouts/FetchTurnoutsJob.php b/app/Jobs/ReferendumBucuresti241124/Turnouts/FetchTurnoutsJob.php new file mode 100644 index 0000000..c846e25 --- /dev/null +++ b/app/Jobs/ReferendumBucuresti241124/Turnouts/FetchTurnoutsJob.php @@ -0,0 +1,91 @@ +deleteWhenDestroyed(); + + $cwd = $temporaryDirectory->path(); + + $tmpDisk = Storage::build([ + 'driver' => 'local', + 'root' => $cwd, + ]); + + $tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource()); + + // Split the CSV by county + Process::path($cwd) + ->run([ + config('import.awk_path'), + '-F,', + 'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}', + 'turnout.csv', + ]); + + $tmpDisk->delete('turnout.csv'); + + collect($tmpDisk->allFiles()) + ->each(function (string $file) use ($tmpDisk) { + $this->scheduledJob->disk() + ->writeStream( + $this->scheduledJob->getSourcePath($file), + $tmpDisk->readStream($file) + ); + }); + + $electionName = $this->scheduledJob->election->getFilamentName(); + $electionId = $this->scheduledJob->election_id; + + $time = now()->toDateTimeString(); + + $persistAndClean = fn () => Bus::chain([ + new PersistTemporaryTableData(Turnout::class, $electionId), + new DeleteTemporaryTableData(Turnout::class, $electionId), + ])->dispatch(); + + Bus::batch([new ImportTurnoutsJob($this->scheduledJob, County::find(403))]) + ->catch($persistAndClean) + ->then($persistAndClean) + ->name("$electionName / Prezență / $time") + ->allowFailures() + ->dispatch(); + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'turnout', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + static::name(), + ]; + } +} diff --git a/app/Jobs/ReferendumBucuresti241124/Turnouts/ImportTurnoutsJob.php b/app/Jobs/ReferendumBucuresti241124/Turnouts/ImportTurnoutsJob.php new file mode 100644 index 0000000..af023dc --- /dev/null +++ b/app/Jobs/ReferendumBucuresti241124/Turnouts/ImportTurnoutsJob.php @@ -0,0 +1,106 @@ +scheduledJob = $scheduledJob; + $this->county = $county; + } + + public function handle(): void + { + $disk = $this->scheduledJob->disk(); + $path = $this->scheduledJob->getSourcePath("{$this->county->code}.csv"); + + if (! $disk->exists($path)) { + throw new MissingSourceFileException($path); + } + + $reader = Reader::createFromStream($disk->readStream($path)); + $reader->setHeaderOffset(0); + + $values = collect(); + + $segments = Turnout::segmentsMap(); + + $records = $reader->getRecords(); + foreach ($records as $record) { + $values->push([ + 'election_id' => $this->scheduledJob->election_id, + 'county_id' => $this->county->id, + 'locality_id' => $record['Siruta'], + 'section' => $record['Nr sectie de votare'], + + 'initial_permanent' => $record['Înscriși pe liste permanente'], + 'initial_complement' => 0, + 'permanent' => $record['LP'], + 'complement' => 0, // no complementary lists for this referendum + 'supplement' => $record['LS'], + 'mobile' => $record['UM'], + + 'area' => $record['Mediu'], + 'has_issues' => $this->determineIfHasIssues($record), + + ...$segments->map(fn (string $segment) => $record[$segment]), + ]); + } + + Turnout::saveToTemporaryTable($values->all()); + } + + protected function determineIfHasIssues(array $record): bool + { + $computedTotal = collect(['LP', 'LS', 'UM']) + ->map(fn (string $key) => $record[$key]) + ->sum(); + + if ($computedTotal !== $record['LT']) { + return true; + } + + return false; + } + + /** + * Get the tags that should be assigned to the job. + * + * @return array + */ + public function tags(): array + { + return [ + 'import', + 'turnout', + 'scheduled_job:' . $this->scheduledJob->id, + 'election:' . $this->scheduledJob->election_id, + 'county:' . $this->county->code, + ]; + } +}