forked from DEIB-GECO/Metadata-Manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDbContainer.scala
1347 lines (1156 loc) · 51.8 KB
/
DbContainer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package it.polimi.genomics.metadata.database
import it.polimi.genomics.metadata.step.utils.ParameterUtil
import org.joda.time.DateTime
import org.slf4j.{Logger, LoggerFactory}
import slick.dbio.Effect.Write
import slick.driver.PostgresDriver
import slick.driver.PostgresDriver.api._
import slick.jdbc.meta.MTable
import slick.sql.FixedSqlAction
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
 * File table case class: row type for the FILES table, one tracked file per dataset and stage.
 *
 * Fields (as used by the queries in this file):
 *  - id: primary key, None before insertion.
 *  - datasetId: owning dataset.
 *  - url: origin url of the file.
 *  - name / copyNumber: unique pair within (datasetId, stage); copyNumber 0 means "not yet assigned".
 *  - stage: "download" or "transform" stage string.
 *  - status: FILE_STATUS value as string (UPDATED/FAILED/COMPARE/OUTDATED).
 *  - size, lastUpdate, hash: local file metadata after download/transform.
 *  - originSize, originLastUpdate: metadata as reported by the source, used for change detection.
 *  - dateProcessed: timestamp of processing.
 */
case class File(id: Option[Int], datasetId: Int, url: String, name: String, stage: String, status: String, size: String, lastUpdate: String, hash: String,
                originSize: String, originLastUpdate: String, dateProcessed: String, copyNumber: Int)
/**
* Created by Nacho on 11/11/16.
* dbContainer is the interface between the h2 database and the file logger.
* for the development of GMQLImporter all the asynchronous operations in the database are done
* synchronous waiting them to finish.
*
* How to use it properly:
* 1- Instantiate the class and set it up by giving the database path in the setDatabase method.
* This has to be done at the beginning of the usage of the database, same instantiation should be used
* for all the sources and datasets contained in the same xml configuration file.
*
* 2- For getting the source/dataset/file IDs sourceId/datasetId/fileId are provided.
*
* 3- For providing feedback about which files are outdated, before starting checking if the files of a dataset
* have to be updated (checkIfUpdate), mark the dataset to be compared with the markToCompare method.
*
* 4- For each file, the checkIfUpdate(...) method will indicate if that file has to be updated.
* 4.1- After you update the file (if you have to) use markAsUpdated(...) method to indicate that was updated.
* 4.2- If the update process was not correct, use markAsFailed(...) method to indicate that was an error.
*
* 5- After modifications have been done and comparisons for the dataset have finished, use method markAsOutdated()
* to identify and mark the outdated files, not using this method can lead to inconsistency.
*
* 6- You can get lists of files to be processed getFilesToProcess() method. Returns files with "UPDATE" status.
*
* 7- After you use the information in the log (as example, when the transformer uses info generated by downloader)
* log it by using the method markAsProcessed().
*/
case class DbContainer() {
  // Logger shared by every operation of this container.
  val logger: Logger = LoggerFactory.getLogger(this.getClass)
  //------------------------------------------------testing space-------------------------------------------------------
  //here print of run parameters and all the xmlConfigurationFile could be done.
  //-------------------------------------DATABASE DEFINITION------------------------------------------------------------
  // Slick handle to the Postgres database; assigned by the database-opening routine
  // (outside this view) and read by every query below — NOTE(review): null until then.
  var database: PostgresDriver.backend.DatabaseDef = _
//------------------------------------------LOG PRINTING--------------------------------------------------------------
/**
 * Logs the download/transform statistics of a runDataset: total files and successful files.
 *
 * @param runDatasetId identifier of the runDataset row.
 * @param stage        indicates if the log corresponds to download or transformation.
 */
def printRunDatasetLog(runDatasetId: Int, stage: Stage.Value): Unit = {
  val logRows = Await.result(
    database.run(
      runDatasetLogs
        .filter(l => l.runDatasetId === runDatasetId && l.stage === stage.toString)
        .map(l => (l.totalFiles, l.downloadedFiles))
        .result),
    Duration.Inf)
  // accumulate the counters over every log row of this runDataset/stage
  val (totalFiles, downloadedFiles) =
    logRows.foldLeft((0, 0)) { case ((total, done), (t, d)) => (total + t, done + d) }
  val runDatasetRows = Await.result(
    database.run(runDatasets.filter(_.id === runDatasetId).result), Duration.Inf)
  if (runDatasetRows.nonEmpty) {
    // third column of the runDatasets row is the dataset id
    val datasetId = runDatasetRows.head._3
    val datasetName = Await.result(
      database.run(datasets.filter(_.id === datasetId).map(_.name).result), Duration.Inf).head
    logger.info(s"\tDataset $datasetName ${stage.toString} statistics:")
    logger.info(s"\t\tTotal files to ${stage.toString}: $totalFiles")
    logger.info(s"\t\t${stage.toString} successful files: $downloadedFiles")
  }
}
//-------------------------------BASIC INSERTIONS SOURCE/DATASET/FILE/RUN---------------------------------------------
/**
 * Looks a source up by name and returns its id, inserting it when absent;
 * an existing source is never replaced.
 *
 * @param name name of the source, should be unique.
 * @return id of the (possibly newly created) source.
 */
def sourceId(name: String): Int = {
  val existing = Await.result(
    database.run(sources.filter(_.name === name).map(_.id).result), Duration.Inf)
  existing.headOption.getOrElse {
    // no such source yet: insert it and return the generated id
    Await.result(
      database.run((sources returning sources.map(_.id)) += (None, name)), Duration.Inf)
  }
}
/**
 * Looks a dataset up by owner and name and returns its id, inserting it when absent;
 * an existing dataset is never replaced.
 *
 * @param sourceId dataset owner's id.
 * @param name     name of the dataset, should be unique for each source.
 * @return id of the (possibly newly created) dataset.
 */
def datasetId(sourceId: Int, name: String): Int = {
  val existing = Await.result(
    database.run(
      datasets.filter(d => d.sourceId === sourceId && d.name === name).map(_.id).result),
    Duration.Inf)
  existing.headOption.getOrElse {
    // no such dataset yet: insert it and return the generated id
    Await.result(
      database.run((datasets returning datasets.map(_.id)) += (None, sourceId, name)),
      Duration.Inf)
  }
}
/**
 * Looks a file up and returns its id, inserting it when absent; an existing file
 * is never replaced. Newly created files start in FAILED status, meaning
 * "not downloaded yet".
 *
 * @param datasetId     file owner's id.
 * @param url           origin url for the file.
 * @param stage         stage of the process the file is used in, Download/Transform.
 * @param candidateName the name the file should have.
 * @param useUrl        when true the url also takes part in the lookup.
 * @return id of the (possibly newly created) file.
 */
def fileId(datasetId: Int, url: String, stage: Stage.Value, candidateName: String, useUrl: Boolean): Int = {
  val baseMatch = (f: Files) =>
    f.datasetId === datasetId && f.stage === stage.toString && f.name === candidateName
  // the url is only part of the identity when useUrl is set
  val matches =
    if (useUrl) (f: Files) => baseMatch(f) && f.url === url
    else baseMatch
  val existing = Await.result(
    database.run(files.filter(matches).map(_.id).result), Duration.Inf)
  existing.headOption.getOrElse {
    // FAILED is the default status of a fresh file: nothing has been downloaded yet
    val insert = (files returning files.map(_.id)) +=
      File(None, datasetId, url, candidateName, stage.toString, "FAILED", "", "", "", "", "", "", 0)
    Await.result(database.run(insert), Duration.Inf)
  }
}
/**
 * Registers a new run with its general settings and the current datetime.
 * The previous run, when still open (empty datetimeEnd), is closed with this
 * run's start time.
 *
 * @param downloadEnabled  indicates if downloading was enabled during the run.
 * @param transformEnabled indicates if transforming was enabled during the run.
 * @param loadEnabled      indicates if loading was enabled during the run.
 * @param outputFolder     indicates the outputFolder defined as working directory.
 * @return the run's id.
 */
def runId(downloadEnabled: String, transformEnabled: String, loadEnabled: String, outputFolder: String): Int = {
  val startedAt = DateTime.now().toString
  // runs are always inserted, never reused
  val newRunId = Await.result(
    database.run((runs returning runs.map(_.id)) +=
      (None, startedAt, "", downloadEnabled, transformEnabled, loadEnabled, outputFolder)),
    Duration.Inf)
  if (newRunId > 1) {
    // NOTE(review): assumes run ids are consecutive — the previous run is id - 1; verify with schema
    val previousRunId = newRunId - 1
    val closePrevious = runs
      .filter(r => r.id === previousRunId && r.datetimeEnd === "")
      .map(_.datetimeEnd)
      .update(startedAt)
    Await.result(database.run(closePrevious), Duration.Inf)
  }
  newRunId
}
//--------------------------SECONDARY INSERTIONS RUNSOURCE/RUNDATASET/RUNFILE/PARAMETERS------------------------------
/**
 * Generates (or retrieves) the representation of the source in the latest run.
 *
 * @param sourceId         id for the source.
 * @param url              url for the source.
 * @param outputFolder     working directory for the source.
 * @param downloadEnabled  indicates if the source is being downloaded.
 * @param downloader       indicates the downloader used for the source.
 * @param transformEnabled indicates if the source is being transformed.
 * @param transformer      indicates the transformer used by the source.
 * @param loadEnabled      indicates if the source is being loaded.
 * @param loader           indicates the loader used by the source.
 * @return the runSource id.
 */
def runSourceId(sourceId: Int, url: String, outputFolder: String,
                downloadEnabled: String, downloader: String, transformEnabled: String,
                transformer: String, loadEnabled: String, loader: String
               ): Int = {
  val runId = getMaxRunNumber
  val existing = Await.result(
    database.run(
      runSources.filter(rs => rs.runId === runId && rs.sourceId === sourceId).map(_.id).result),
    Duration.Inf)
  existing.headOption.getOrElse {
    // no runSource for (run, source) yet: insert it and return the generated id
    Await.result(
      database.run((runSources returning runSources.map(_.id)) += (
        None, runId, sourceId, url, outputFolder, downloadEnabled, downloader,
        transformEnabled, transformer, loadEnabled, loader)),
      Duration.Inf)
  }
}
/**
 * Inserts one parameter used by a runSource.
 *
 * @param runSourceId runSource that uses the parameter.
 * @param description explains what the parameter is used for.
 * @param key         name of the parameter.
 * @param value       value of the parameter.
 * @return id of the inserted parameter row.
 */
def runSourceParameterId(runSourceId: Int, description: String, key: String, value: String): Int = {
  // parameter rows are always inserted; there is no uniqueness to check
  val insert = (runSourceParameters returning runSourceParameters.map(_.id)) +=
    (None, runSourceId, description, key, value)
  Await.result(database.run(insert), Duration.Inf)
}
/**
 * Generates (or retrieves) the representation of the dataset in the selected run.
 *
 * @param datasetId        id for the dataset.
 * @param outputFolder     working directory for the dataset.
 * @param downloadEnabled  indicates if the dataset is being downloaded.
 * @param transformEnabled indicates if the dataset is being transformed.
 * @param loadEnabled      indicates if the dataset is being loaded.
 * @param schemaUrl        url of the schema.
 * @param schemaLocation   whether the schema is local or remote.
 * @param runId2           explicit run to use; defaults to the latest run when None.
 * @return the runDataset id, 0 when the run does not exist.
 */
def runDatasetId(datasetId: Int, outputFolder: String, downloadEnabled: String, transformEnabled: String,
                 loadEnabled: String, schemaUrl: String, schemaLocation: String, runId2: Option[Int]
                ): Int = {
  val runId = runId2.getOrElse(getMaxRunNumber)
  if (!existsRun(runId))
    0 // unknown run: signal the error with 0
  else {
    val existing = Await.result(
      database.run(
        runDatasets.filter(rd => rd.runId === runId && rd.datasetId === datasetId).map(_.id).result),
      Duration.Inf)
    existing.headOption.getOrElse {
      // no runDataset for (run, dataset) yet: insert it and return the generated id
      Await.result(
        database.run((runDatasets returning runDatasets.map(_.id)) += (
          None, runId, datasetId, outputFolder, downloadEnabled, transformEnabled, loadEnabled,
          schemaUrl, schemaLocation)),
        Duration.Inf)
    }
  }
}
/**
 * Indicates whether a run with the given id exists.
 *
 * @param runId identifier of the run to check.
 * @return true when the run is present in the RUNS table.
 */
def existsRun(runId: Int): Boolean = {
  val ids = Await.result(
    database.run(runs.filter(_.id === runId).map(_.id).result), Duration.Inf)
  // idiom fix: `if (nonEmpty) true else false` collapsed to the boolean itself
  ids.nonEmpty
}
/**
 * Inserts one parameter used by a runDataset.
 *
 * @param runDatasetId runDataset that uses the parameter.
 * @param description  explains what the parameter is used for.
 * @param key          name of the parameter.
 * @param value        value of the parameter.
 * @return id of the inserted parameter row.
 */
def runDatasetParameterId(runDatasetId: Int, description: String, key: String, value: String): Int = {
  // parameter rows are always inserted; there is no uniqueness to check
  val insert = (runDatasetParameters returning runDatasetParameters.map(_.id)) +=
    (None, runDatasetId, description, key, value)
  Await.result(database.run(insert), Duration.Inf)
}
/**
 * Inserts a log row with the file counters of a runDataset for one stage.
 * (Doc fix: the previous scaladoc was copy-pasted from the parameter-insertion methods.)
 *
 * @param runDatasetId    runDataset the log refers to.
 * @param stage           whether the counters refer to download or transformation.
 * @param totalFiles      number of files to process in this stage.
 * @param downloadedFiles number of files successfully processed in this stage.
 * @return id of the inserted log row.
 */
def runDatasetLogId(runDatasetId: Int, stage: Stage.Value, totalFiles: Int, downloadedFiles: Int): Int = {
  // log rows are always inserted: there is no key to detect changes or differences
  val insert = (runDatasetLogs returning runDatasetLogs.map(_.id)) +=
    (None, runDatasetId, stage.toString, totalFiles, downloadedFiles)
  Await.result(database.run(insert), Duration.Inf)
}
/**
 * Snapshots the current state of a file into the RUNFILES versioning table for
 * the latest run: creates the runFile row when missing, otherwise refreshes it
 * with the file's current fields.
 *
 * @param fileId file whose version is recorded.
 * @return id of the runFile row.
 */
def runFileId(fileId: Int): Int = {
  val runId = getMaxRunNumber
  val existingIds = Await.result(
    database.run(
      runFiles.filter(rf => rf.runId === runId && rf.fileId === fileId).map(_.id).result),
    Duration.Inf)
  // current state of the file, read from the FILES table
  // NOTE(review): assumes the file row exists — .head throws otherwise; confirm callers guarantee it
  val snapshot = Await.result(
    database.run(
      files.filter(_.id === fileId)
        .map(f => (f.status, f.size, f.lastUpdate, f.hash, f.originSize, f.originLastUpdate, f.dateProcessed))
        .result),
    Duration.Inf).head
  if (existingIds.nonEmpty) {
    // a version row already exists for (run, file): refresh it with the current state
    val refresh = runFiles
      .filter(rf => rf.runId === runId && rf.fileId === fileId)
      .map(rf => (rf.status, rf.size, rf.lastUpdate, rf.hash, rf.originSize, rf.originLastUpdate, rf.dateProcessed))
      .update(snapshot._1, snapshot._2, snapshot._3, snapshot._4, snapshot._5, snapshot._6, snapshot._7)
    Await.result(database.run(refresh), Duration.Inf)
    existingIds.head
  }
  else {
    // first version of this file in the current run: insert it
    val insert = (runFiles returning runFiles.map(_.id)) += (
      None, runId, fileId, snapshot._1, snapshot._2, snapshot._3,
      snapshot._4, snapshot._5, snapshot._6, snapshot._7)
    Await.result(database.run(insert), Duration.Inf)
  }
}
//-------------------------------Run closing--------------------------------------------------------------------------
/**
 * Stamps the end datetime of the given run with the current time.
 *
 * @param runId id of the run to close.
 */
def endRun(runId: Int): Unit = {
  val stampEnd = runs.filter(_.id === runId).map(_.datetimeEnd).update(DateTime.now().toString)
  Await.result(database.run(stampEnd), Duration.Inf)
}
//------------------------------FILE OPERATIONS SECTION FILENAME/CHECKIFUPDATE/PROCESS--------------------------------
/**
 * Returns the unique (name, copyNumber) pair of a file inside its dataset,
 * assigning the copy number first when it has not been computed yet.
 *
 * @param fileId id for the file.
 * @return (name, copyNumber); ("FileDoesNotExist", 0) when the id is unknown.
 */
def getFileNameAndCopyNumber(fileId: Int): (String, Int) = {
  val rows = Await.result(
    database.run(
      files.filter(_.id === fileId).map(f => (f.name, f.datasetId, f.copyNumber, f.stage)).result),
    Duration.Inf)
  rows.headOption match {
    case Some((name, datasetId, copyNumber, stage)) =>
      if (copyNumber != 0)
        (name, copyNumber) // copy number already assigned
      else
        // copy number 0 means "not yet assigned": compute and persist one
        getFileName(fileId, name, datasetId, 1, Stage.withName(stage))
    case None =>
      ("FileDoesNotExist", 0)
  }
}
/**
 * Returns hash, size and last update of a file.
 *
 * @param fileId identifier of the file.
 * @return (hash, size, lastUpdate); the sentinel ("file", "does not", "exist")
 *         when the id is unknown.
 */
def getFileDetails(fileId: Int): (String, String, String) = {
  val rows = Await.result(
    database.run(files.filter(_.id === fileId).map(f => (f.hash, f.size, f.lastUpdate)).result),
    Duration.Inf)
  rows.headOption.getOrElse(("file", "does not", "exist"))
}
/**
 * Fetches the full FILES row for the given id.
 *
 * @param fileId identifier of the file.
 * @return the File row, or None when no such file exists.
 */
def getFileAllDetail(fileId: Int): Option[File] = {
  Await.result(database.run(files.filter(_.id === fileId).result.headOption), Duration.Inf)
}
/**
 * Finds the smallest copy number that makes (name, copyNumber) unique within the
 * dataset/stage, persists it on the file row and returns the pair.
 *
 * @param fileId     id for the file.
 * @param name       candidate name.
 * @param datasetId  dataset the file belongs to.
 * @param copyNumber current guess for the copy number.
 * @param stage      whether it is download or transform.
 * @return unique (name, copyNumber) among the dataset's files.
 */
@scala.annotation.tailrec // idiom fix: the recursion is in tail position; make the compiler enforce it
private def getFileName(fileId: Int, name: String, datasetId: Int, copyNumber: Int, stage: Stage.Value): (String, Int) = {
  val clashQuery = files.filter(f => f.datasetId === datasetId && f.stage === stage.toString &&
    f.name === name && f.copyNumber === copyNumber && f.id =!= fileId).map(_.id).result
  val clashes = Await.result(database.run(clashQuery), Duration.Inf)
  if (clashes.nonEmpty)
    // (name, copyNumber) already taken by another file: try the next copy number
    getFileName(fileId, name, datasetId, copyNumber + 1, stage)
  else {
    // free slot found: persist it on the file row
    val persist = files.filter(_.id === fileId).map(f => (f.name, f.copyNumber)).update(name, copyNumber)
    Await.result(database.run(persist), Duration.Inf)
    (name, copyNumber)
  }
}
/**
 * Maximum copy number among files with the same name inside a dataset/stage.
 *
 * @param datasetId dataset where the file belongs.
 * @param fileName  original file name.
 * @param stage     indicates whether download/transform.
 * @return max copy number, 0 when no matching file exists.
 */
def getMaxCopyNumber(datasetId: Int, fileName: String, stage: Stage.Value): Int = {
  val copyNumbers = Await.result(
    database.run(
      files.filter(f => f.datasetId === datasetId && f.name === fileName && f.stage === stage.toString)
        .map(_.copyNumber).result),
    Duration.Inf)
  // robustness fix: Seq.max throws UnsupportedOperationException on an empty result; default to 0
  if (copyNumbers.nonEmpty) copyNumbers.max else 0
}
/**
 * Highest run id ever created. Being a lazy val, it is computed once per
 * DbContainer instance and cached afterwards.
 *
 * @return max id of runs, 0 when the RUNS table is empty.
 */
lazy val getMaxRunNumber: Int = {
  // `max.result` yields an Option[Int] (None on an empty table)
  val maxId = Await.result(database.run(runs.map(_.id).max.result), Duration.Inf)
  // idiom fix: getOrElse replaces the nonEmpty check plus `.max` on an Option
  maxId.getOrElse(0)
}
/**
 * Finds the closest existing run before the given one.
 *
 * @param runNumber queried run.
 * @return previous run id, or 0 when no previous run exists.
 */
def getPreviousRunNumber(runNumber: Int): Int = {
  // walk downwards from runNumber - 1 until an existing run is found; 0 when none is
  ((runNumber - 1) to 1 by -1).find(existsRun).getOrElse(0)
}
/**
 * Logs the number of files downloaded and ready to process, the failed files,
 * and the new files indexed in the selected run (relative to the previous run).
 *
 * @param datasetId dataset the files belong to.
 * @param runId     corresponding run to be consulted.
 */
def printNewReadyFailedFiles(datasetId: Int, runId: Int): Unit = {
  // count this dataset's download files in a given run with a given status
  def countIn(run: Int, status: FILE_STATUS.Value): Int =
    getFilesForStatistics(datasetId, run, Stage.DOWNLOAD, status)
  val updatedFiles = countIn(runId, FILE_STATUS.UPDATED)
  val failedFiles = countIn(runId, FILE_STATUS.FAILED)
  val totalFiles =
    updatedFiles + failedFiles + countIn(runId, FILE_STATUS.COMPARE) + countIn(runId, FILE_STATUS.OUTDATED)
  logger.info(s"\t\t$updatedFiles ready to process files")
  logger.info(s"\t\t$failedFiles failed files")
  val previousRunId = getPreviousRunNumber(runId)
  if (previousRunId > 0) {
    // new files = files of this run not already counted in the previous run
    val previousTotal =
      countIn(previousRunId, FILE_STATUS.UPDATED) + countIn(previousRunId, FILE_STATUS.FAILED) +
        countIn(previousRunId, FILE_STATUS.COMPARE) + countIn(previousRunId, FILE_STATUS.OUTDATED)
    logger.info(s"\t\t${totalFiles - previousTotal} new files indexed")
  }
  else {
    // no previous run: every file of this run is new
    logger.info(s"\t\t$totalFiles new files indexed")
  }
}
/**
 * Counts the runFiles of a run with the requested status, restricted to the files
 * of a dataset and stage. (Doc fix: the previous scaladoc said it returned file
 * tuples; it returns a count.)
 *
 * @param datasetId dataset the files belong to.
 * @param runId     run to check.
 * @param stage     indicates if it is download or transform.
 * @param status    UPDATED, FAILED, COMPARE or OUTDATED.
 * @return number of matching runFiles.
 */
def getFilesForStatistics(datasetId: Int, runId: Int, stage: Stage.Value, status: FILE_STATUS.Value): Int = {
  val countQuery = runFiles.filter(runFile =>
    runFile.File.filter(file => file.datasetId === datasetId && file.stage === stage.toString).exists &&
      runFile.runId === runId && runFile.status === status.toString).length
  Await.result(database.run(countQuery.result), Duration.Inf)
}
/**
 * Checks whether the file has to be updated based on its hash, size and last update.
 * When it does not, its current state is versioned into runFiles.
 *
 * @param fileId           id for the file.
 * @param hash             hash of the file.
 * @param originSize       original size in the source.
 * @param originLastUpdate original last update in the source.
 * @return true = has to be updated.
 */
def checkIfUpdateFile(fileId: Int, hash: String, originSize: String, originLastUpdate: String): Boolean = {
  val mustUpdate = checkIfUpdateFileAux(fileId, hash, originSize, originLastUpdate)
  if (!mustUpdate)
    runFileId(fileId) // unchanged file: record its version for this run
  mustUpdate
}
/**
 * Checks if the given file has to be updated based on its hash, size and last update.
 * Side effects: files needing an update get hash/originSize/originLastUpdate refreshed;
 * files still in COMPARE status that turn out unchanged are flipped back to UPDATED.
 *
 * @param fileId id for the file.
 * @param hash hash of the file.
 * @param originSize original size in the source.
 * @param originLastUpdate original last updated in the source.
 * @return true = has to be updated.
 */
def checkIfUpdateFileAux(fileId: Int, hash: String, originSize: String, originLastUpdate: String): Boolean = {
  val query = for (f <- files.filter(f => f.id === fileId)) yield (f.status, f.hash, f.originSize, f.originLastUpdate)
  val queryResult = query.result
  val execution = database.run(queryResult)
  val result = Await.result(execution, Duration.Inf)
  if (result.nonEmpty) {
    val oldStatus = result.head._1
    val oldHash = result.head._2
    val oldOriginSize = result.head._3
    val oldOriginLastUpdate = result.head._4
    //UPDATE indicates that file was put on the download folder and marked as update
    if (oldStatus.equalsIgnoreCase(FILE_STATUS.UPDATED.toString))
      false
    //FAILED indicates that the file was not put in the folder, OUTDATED means the file was deleted from the server
    // a changed origin size/date, or a differing non-empty hash, also forces a re-download
    else if (oldStatus.equalsIgnoreCase(FILE_STATUS.FAILED.toString) ||
      oldStatus.equalsIgnoreCase(FILE_STATUS.OUTDATED.toString) ||
      oldOriginSize != originSize || oldOriginLastUpdate != originLastUpdate ||
      (oldHash != hash && hash != "")
    ) {
      // persist the new origin metadata before reporting that an update is needed
      Await.result(database
        .run((for (f <- files.filter(f => f.id === fileId)) yield (f.hash, f.originSize, f.originLastUpdate))
          .update(hash, originSize, originLastUpdate))
        , Duration.Inf
      )
      true
    }
    //any other case whilst COMPARE, shouldn't be downloaded
    else if (oldStatus.equalsIgnoreCase(FILE_STATUS.COMPARE.toString)) {
      // file still present at the source and unchanged: restore UPDATED status
      Await.result(database
        .run((for (f <- files.filter(f => f.id === fileId)) yield f.status)
          .update(FILE_STATUS.UPDATED.toString))
        , Duration.Inf
      )
      false
    }
    else false
  }
  // unknown file id: nothing to update
  else false
}
/**
 * Returns every ready-to-process file of the dataset: status UPDATED with a
 * non-empty size.
 *
 * @param datasetId dataset from where files are required.
 * @param stage     whether it is download or transform.
 * @return sequence of (fileId, filename, copyNumber).
 */
def getFilesToProcess(datasetId: Int, stage: Stage.Value): Seq[(Int, String, Int)] = {
  val readyFiles = files
    .filter(f => f.datasetId === datasetId && f.stage === stage.toString &&
      (f.status === FILE_STATUS.UPDATED.toString && !(f.size === "")))
    .map(f => (f.id, f.name, f.copyNumber))
  Await.result(database.run(readyFiles.result), Duration.Inf)
}
/**
 * Returns every failed file of the dataset with its copy number; files marked
 * UPDATED but without a size are considered FAILED as well.
 *
 * @param datasetId dataset from where files are required.
 * @param stage     whether it is download or transform.
 * @return sequence of (fileId, filename, copyNumber, url, hash).
 */
def getFailedFiles(datasetId: Int, stage: Stage.Value): Seq[(Int, String, Int, String, String)] = {
  val failedFiles = files
    .filter(f => f.datasetId === datasetId && f.stage === stage.toString &&
      (f.status === FILE_STATUS.FAILED.toString || (f.status === FILE_STATUS.UPDATED.toString && f.size === "")))
    .map(f => (f.id, f.name, f.copyNumber, f.url, f.hash))
  Await.result(database.run(failedFiles.result), Duration.Inf)
}
/**
 * Marks the file as UPDATED, storing its size and stamping the update time,
 * then versions its state into runFiles.
 *
 * @param fileId identifier for the file.
 * @param size   downloaded or transformed file's size.
 */
def markAsUpdated(fileId: Int, size: String): Unit = {
  val update = files.filter(_.id === fileId)
    .map(f => (f.status, f.size, f.lastUpdate))
    .update(FILE_STATUS.UPDATED.toString, size, DateTime.now().toString)
  Await.result(database.run(update), Duration.Inf)
  runFileId(fileId)
}
/**
 * Marks the file as UPDATED, storing its size and hash and stamping the update
 * time, then versions its state into runFiles.
 *
 * @param fileId identifier for the file.
 * @param size   downloaded or transformed file's size.
 * @param hash   downloaded or transformed file's hash.
 */
def markAsUpdated(fileId: Int, size: String, hash: String): Unit = {
  val update = files.filter(_.id === fileId)
    .map(f => (f.status, f.size, f.lastUpdate, f.hash))
    .update(FILE_STATUS.UPDATED.toString, size, DateTime.now().toString, hash)
  Await.result(database.run(update), Duration.Inf)
  runFileId(fileId)
}
/**
 * Puts the file status into FAILED when its download or transformation fails,
 * then versions its state into runFiles.
 *
 * @param fileId identifier for the file.
 */
def markAsFailed(fileId: Int): Unit = {
  val update = files.filter(_.id === fileId)
    .map(f => (f.status, f.lastUpdate))
    .update(FILE_STATUS.FAILED.toString, DateTime.now().toString)
  Await.result(database.run(update), Duration.Inf)
  runFileId(fileId)
}
/**
 * Marks every file of the dataset/stage still in COMPARE status as OUTDATED and
 * versions each of them into runFiles. Meant to be used at the end of all
 * comparisons (checkIfUpdate); skipping it can lead to inconsistency.
 *
 * @param datasetId identifier for the dataset.
 * @param stage     indicates whether it refers to download or transformed files.
 */
def markAsOutdated(datasetId: Int, stage: Stage.Value): Unit = {
  val compareFilter = files.filter(f =>
    f.datasetId === datasetId && f.status === FILE_STATUS.COMPARE.toString && f.stage === stage.toString)
  // capture the affected ids before their status flips to OUTDATED
  val outdatedIds = Await.result(database.run(compareFilter.map(_.id).result), Duration.Inf)
  Await.result(
    database.run(compareFilter.map(_.status).update(FILE_STATUS.OUTDATED.toString)),
    Duration.Inf)
  // idiom fix: the original used `map` purely for side effects and discarded the result
  outdatedIds.foreach(runFileId)
}
/**
 * Puts every file of the dataset/stage into COMPARE status; files not
 * re-confirmed afterwards can then be detected as deleted from the source.
 *
 * @param datasetId identifier for the dataset.
 * @param stage     indicates whether it refers to download or transformed files.
 */
def markToCompare(datasetId: Int, stage: Stage.Value): Unit = {
  val update = files
    .filter(f => f.datasetId === datasetId && f.stage === stage.toString)
    .map(_.status)
    .update(FILE_STATUS.COMPARE.toString)
  Await.result(database.run(update), Duration.Inf)
}
/**
 * Deletes every file row belonging to the given dataset and stage.
 *
 * @param datasetId identifier for the dataset.
 * @param stage indicates whether refers to download or transformed files.
 */
def delete(datasetId: Int, stage: Stage.Value): Unit = {
  val deletion = files
    .filter(f => f.datasetId === datasetId && f.stage === stage.toString)
    .delete
  Await.result(database.run(deletion), Duration.Inf)
}
/**
 * Gives the current status of a file, or None when no matching row exists.
 *
 * @param datasetId dataset where the file belongs to
 * @param url origin url for the file
 * @param stage whether is download or transform.
 */
def fileStatus(datasetId: Int, url: String, stage: Stage.Value): Option[FILE_STATUS.Value] = {
  val statusQuery = files
    .filter(f => f.datasetId === datasetId && f.url === url && f.stage === stage.toString)
    .map(_.status)
  val rows = Await.result(database.run(statusQuery.result), Duration.Inf)
  // FILE_STATUS.withName throws if the stored string is not a valid status name.
  rows.headOption.map(FILE_STATUS.withName)
}
/**
 * Returns the url of a specified file.
 *
 * NOTE(review): throws NoSuchElementException when no matching row exists —
 * callers are expected to ask only for files already recorded; confirm.
 *
 * @param fileName name of the file.
 * @param datasetID dataset from where files are required.
 * @param stage whether is download or transform.
 * @return the url of the file.
 */
def getFileUrl(fileName: String, datasetID: Int, stage: Stage.Value): String = {
  val urlQuery = files
    .filter(f => f.name === fileName && f.datasetId === datasetID && f.stage === stage.toString)
    .map(_.url)
  Await.result(database.run(urlQuery.result), Duration.Inf).head
}
//------------------------------DATABASE BASIC OPERATIONS OPEN/CLOSE--------------------------------------------------
/**
 * Opens or creates the database, checks the existence of its tables and
 * creates the ones that do not exist yet.
 *
 * NOTE(review): the `path` parameter is only used by the (commented-out) H2
 * configuration; the active PostgreSQL connection ignores it. Kept for
 * interface compatibility.
 * SECURITY(review): connection credentials are hardcoded below — move them to
 * external configuration.
 *
 * @param path directory where the database file is (H2 configuration only).
 */
def setDatabase(path: String): Unit = {
  //;DB_CLOSE_ON_EXIT=FALSE
  //jdbc:h2:file:/home/nachon/Downloads/tesis/db/db
  //database = Database.forURL("jdbc:h2:file:" + path + "/db;AUTO_SERVER=TRUE", driver = "org.h2.Driver", keepAliveConnection = true)
  //database = Database.forURL("jdbc:postgresql://localhost/gmql_importer?user=geco&password=geco78", driver = "org.postgresql.Driver", keepAliveConnection = true)
  database = Database.forURL(
    "jdbc:postgresql://localhost/gmql_importer",
    "geco",
    "geco78",
    driver = "org.postgresql.Driver")
  // Snapshot of the tables currently present in the database.
  val tables = Await.result(database.run(MTable.getTables), Duration.Inf).toList

  // Runs `createAction` unless a table named `tableName` already exists.
  // Replaces ten byte-identical copies of the same create-if-absent logic.
  def createTableIfAbsent(tableName: String, createAction: DBIO[Unit]): Unit =
    if (!tables.exists(_.name.name == tableName.toLowerCase())) {
      Await.result(database.run(createAction), Duration.Inf)
      logger.info("Table " + tableName + " created")
    }

  createTableIfAbsent("SOURCES", DBIO.seq(sources.schema.create))
  createTableIfAbsent("DATASETS", DBIO.seq(datasets.schema.create))
  createTableIfAbsent("FILES", DBIO.seq(files.schema.create))
  createTableIfAbsent("RUNS", DBIO.seq(runs.schema.create))
  createTableIfAbsent("RUNSOURCES", DBIO.seq(runSources.schema.create))
  createTableIfAbsent("RUNSOURCEPARAMETERS", DBIO.seq(runSourceParameters.schema.create))
  createTableIfAbsent("RUNDATASETS", DBIO.seq(runDatasets.schema.create))
  createTableIfAbsent("RUNDATASETPARAMETERS", DBIO.seq(runDatasetParameters.schema.create))
  createTableIfAbsent("RUNDATASETLOGS", DBIO.seq(runDatasetLogs.schema.create))
  createTableIfAbsent("RUNFILES", DBIO.seq(runFiles.schema.create))
}
/**
 * Returns the most recent lastUpdate value among the files of the dataset.
 *
 * NOTE(review): throws UnsupportedOperationException when the dataset has no
 * files; dates are compared lexicographically as strings — confirm the stored
 * format sorts chronologically.
 *
 * @param datasetID identifier of the dataset.
 * @return the date of the last download.
 */
def getLastDownloadDate(datasetID: Int): String = {
  val lastUpdates = files.filter(_.datasetId === datasetID).map(_.lastUpdate)
  Await.result(database.run(lastUpdates.result), Duration.Inf).max
}
/**
 * Closes the database connection, blocking until shutdown completes.
 */
def closeDatabase(): Unit =
  Await.result(database.shutdown, Duration.Inf)
//-------------------------------------DATABASE SCHEMAS---------------------------------------------------------------
//---------------------------------- Definition of the SOURCES table--------------------------------------------------
/**
 * SOURCES TABLE:
 * SOURCE_ID: INT PK AUTOINC
 * NAME: STRING (unique)
 *
 * @param tag SOURCES
 */
class Sources(tag: Tag) extends
  Table[(Option[Int], String)](tag, "SOURCES".toLowerCase()) {
  // Auto-incrementing surrogate key.
  def id: Rep[Int] = column[Int]("SOURCE_ID".toLowerCase(), O.PrimaryKey, O.AutoInc)
  // Human-readable source name, kept unique by sourceNameIndex below.
  def name: Rep[String] = column[String]("NAME".toLowerCase())
  // Unique index guaranteeing one row per source name.
  def sourceNameIndex = index("sourceNameIndex", name, unique = true)
  // Default projection: id wrapped in Option so inserts can omit it.
  def * = (id.?, name)
}
val sources = TableQuery[Sources]
//------------------------------------- Definition of the DATASETS table----------------------------------------------
/**
 * DATASETS TABLE:
 * DATASET_ID: INT PK AUTOINC
 * SOURCE_ID: INT FK(SOURCES)
 * NAME: STRING
 *
 * @param tag DATASETS
 */
class Datasets(tag: Tag) extends
  Table[(Option[Int], Int, String)](tag, "DATASETS".toLowerCase()) {
  // Auto-incrementing surrogate key.
  def id: Rep[Int] = column[Int]("DATASET_ID".toLowerCase(), O.PrimaryKey, O.AutoInc)
  // Owning source; constrained by the foreign key below.
  def sourceId: Rep[Int] = column[Int]("SOURCE_ID".toLowerCase())
  def name: Rep[String] = column[String]("NAME".toLowerCase())
  // Deletes cascade from SOURCES; updates of the referenced id are restricted.
  def Source = foreignKey("DATASETS_SOURCE_FK", sourceId, sources)(
    _.id,
    onUpdate = ForeignKeyAction.Restrict,
    onDelete = ForeignKeyAction.Cascade
  )
  // Non-unique lookup indexes on name and on the owning source.
  def datasetNameIndex = index("DatasetNameIndex", name, unique = false)
  def datasetSourceIDIndex = index("datasetSourceIdIndex", sourceId, unique = false)
  // Default projection: id wrapped in Option so inserts can omit it.
  def * = (id.?, sourceId, name)
}
val datasets = TableQuery[Datasets]
//----------------------------------------- Definition of the FILES table---------------------------------------------
/**
* FILES TABLE:
* ID: INT PK AUTOINC
* DATASET_ID: INT FK(DATASET)
* URL: STRING
* NAME: STRING
* STAGE: STRING
* STATUS: STRING
* SIZE: STRING
* LAST_UPDATE: STRING
* HASH: STRING
* SIZE_IN_ORIGIN: STRING
* LAST_UPDATE_IN_ORIGIN: STRING
* DATE_PROCESSED: STRING
* COPY_NUMBER: INT
*
* @param tag FILES
*/
class Files(tag: Tag) extends
Table[File](tag, "FILES".toLowerCase()) {
def id = column[Int]("FILE_ID".toLowerCase(), O.PrimaryKey, O.AutoInc)
def datasetId = column[Int]("DATASET_ID".toLowerCase())
def url = column[String]("URL".toLowerCase())
def name = column[String]("NAME".toLowerCase())
def stage = column[String]("STAGE".toLowerCase())
//THIS IS DOWNLOAD/TRANSFORM
def status = column[String]("STATUS".toLowerCase())
//UPDATED/FAILED/OUTDATED
def size = column[String]("FILE_SIZE".toLowerCase())
def lastUpdate = column[String]("LAST_UPDATE".toLowerCase())
def hash = column[String]("HASH".toLowerCase())