-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
--------- Co-authored-by: Andrei Ioniță <[email protected]>
- Loading branch information
1 parent
890274e
commit 478f6c2
Showing
2 changed files
with
197 additions
and
0 deletions.
There are no files selected for viewing
91 changes: 91 additions & 0 deletions
91
app/Jobs/ReferendumBucuresti241124/Turnouts/FetchTurnoutsJob.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Jobs\ReferendumBucuresti241124\Turnouts; | ||
|
||
use App\Jobs\DeleteTemporaryTableData; | ||
use App\Jobs\PersistTemporaryTableData; | ||
use App\Jobs\SchedulableJob; | ||
use App\Models\County; | ||
use App\Models\Turnout; | ||
use Illuminate\Support\Facades\Bus; | ||
use Illuminate\Support\Facades\Process; | ||
use Illuminate\Support\Facades\Storage; | ||
use Spatie\TemporaryDirectory\TemporaryDirectory; | ||
|
||
class FetchTurnoutsJob extends SchedulableJob | ||
{ | ||
public static function name(): string | ||
{ | ||
return 'Referendum Bucuresti 24.11.2024 / Prezență'; | ||
} | ||
|
||
public function execute(): void | ||
{ | ||
$temporaryDirectory = TemporaryDirectory::make() | ||
->deleteWhenDestroyed(); | ||
|
||
$cwd = $temporaryDirectory->path(); | ||
|
||
$tmpDisk = Storage::build([ | ||
'driver' => 'local', | ||
'root' => $cwd, | ||
]); | ||
|
||
$tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource()); | ||
|
||
// Split the CSV by county | ||
Process::path($cwd) | ||
->run([ | ||
config('import.awk_path'), | ||
'-F,', | ||
'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}', | ||
'turnout.csv', | ||
]); | ||
|
||
$tmpDisk->delete('turnout.csv'); | ||
|
||
collect($tmpDisk->allFiles()) | ||
->each(function (string $file) use ($tmpDisk) { | ||
$this->scheduledJob->disk() | ||
->writeStream( | ||
$this->scheduledJob->getSourcePath($file), | ||
$tmpDisk->readStream($file) | ||
); | ||
}); | ||
|
||
$electionName = $this->scheduledJob->election->getFilamentName(); | ||
$electionId = $this->scheduledJob->election_id; | ||
|
||
$time = now()->toDateTimeString(); | ||
|
||
$persistAndClean = fn () => Bus::chain([ | ||
new PersistTemporaryTableData(Turnout::class, $electionId), | ||
new DeleteTemporaryTableData(Turnout::class, $electionId), | ||
])->dispatch(); | ||
|
||
Bus::batch([new ImportTurnoutsJob($this->scheduledJob, County::find(403))]) | ||
->catch($persistAndClean) | ||
->then($persistAndClean) | ||
->name("$electionName / Prezență / $time") | ||
->allowFailures() | ||
->dispatch(); | ||
} | ||
|
||
/** | ||
* Get the tags that should be assigned to the job. | ||
* | ||
* @return array<int, string> | ||
*/ | ||
public function tags(): array | ||
{ | ||
return [ | ||
'import', | ||
'turnout', | ||
'scheduled_job:' . $this->scheduledJob->id, | ||
'election:' . $this->scheduledJob->election_id, | ||
static::name(), | ||
]; | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
app/Jobs/ReferendumBucuresti241124/Turnouts/ImportTurnoutsJob.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Jobs\ReferendumBucuresti241124\Turnouts; | ||
|
||
use App\Exceptions\MissingSourceFileException; | ||
use App\Models\County; | ||
use App\Models\ScheduledJob; | ||
use App\Models\Turnout; | ||
use Illuminate\Bus\Batchable; | ||
use Illuminate\Bus\Queueable; | ||
use Illuminate\Contracts\Queue\ShouldQueue; | ||
use Illuminate\Foundation\Bus\Dispatchable; | ||
use Illuminate\Queue\InteractsWithQueue; | ||
use Illuminate\Queue\SerializesModels; | ||
use League\Csv\Reader; | ||
|
||
class ImportTurnoutsJob implements ShouldQueue | ||
{ | ||
use Batchable; | ||
use Dispatchable; | ||
use InteractsWithQueue; | ||
use Queueable; | ||
use SerializesModels; | ||
|
||
public ScheduledJob $scheduledJob; | ||
|
||
public County $county; | ||
|
||
public function __construct(ScheduledJob $scheduledJob, County $county) | ||
{ | ||
$this->scheduledJob = $scheduledJob; | ||
$this->county = $county; | ||
} | ||
|
||
public function handle(): void | ||
{ | ||
$disk = $this->scheduledJob->disk(); | ||
$path = $this->scheduledJob->getSourcePath("{$this->county->code}.csv"); | ||
|
||
if (! $disk->exists($path)) { | ||
throw new MissingSourceFileException($path); | ||
} | ||
|
||
$reader = Reader::createFromStream($disk->readStream($path)); | ||
$reader->setHeaderOffset(0); | ||
|
||
$values = collect(); | ||
|
||
$segments = Turnout::segmentsMap(); | ||
|
||
$records = $reader->getRecords(); | ||
foreach ($records as $record) { | ||
$values->push([ | ||
'election_id' => $this->scheduledJob->election_id, | ||
'county_id' => $this->county->id, | ||
'locality_id' => $record['Siruta'], | ||
'section' => $record['Nr sectie de votare'], | ||
|
||
'initial_permanent' => $record['Înscriși pe liste permanente'], | ||
'initial_complement' => 0, | ||
'permanent' => $record['LP'], | ||
'complement' => 0, // no complementary lists for this referendum | ||
'supplement' => $record['LS'], | ||
'mobile' => $record['UM'], | ||
|
||
'area' => $record['Mediu'], | ||
'has_issues' => $this->determineIfHasIssues($record), | ||
|
||
...$segments->map(fn (string $segment) => $record[$segment]), | ||
]); | ||
} | ||
|
||
Turnout::saveToTemporaryTable($values->all()); | ||
} | ||
|
||
protected function determineIfHasIssues(array $record): bool | ||
{ | ||
$computedTotal = collect(['LP', 'LS', 'UM']) | ||
->map(fn (string $key) => $record[$key]) | ||
->sum(); | ||
|
||
if ($computedTotal !== $record['LT']) { | ||
return true; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
/** | ||
* Get the tags that should be assigned to the job. | ||
* | ||
* @return array<int, string> | ||
*/ | ||
public function tags(): array | ||
{ | ||
return [ | ||
'import', | ||
'turnout', | ||
'scheduled_job:' . $this->scheduledJob->id, | ||
'election:' . $this->scheduledJob->election_id, | ||
'county:' . $this->county->code, | ||
]; | ||
} | ||
} |