package it.polimi.genomics.metadata.step

import java.io._

import it.polimi.genomics.metadata.cleaner.RuleBase
import it.polimi.genomics.metadata.database.FileDatabase
import it.polimi.genomics.metadata.downloader_transformer.default.SchemaFinder
import it.polimi.genomics.metadata.mapper.RemoteDatabase.DbHandler
import it.polimi.genomics.metadata.step.CleanerStep.{createSymbolicLink, logger}
import it.polimi.genomics.metadata.step.utils.DirectoryNamingUtil
import it.polimi.genomics.metadata.step.xml.{Dataset, Source}
import slick.jdbc.PostgresProfile.api._
import slick.jdbc.{GetResult, PositionedResult}

import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
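
/**
 * Flattener step: for each cleaned ".meta" file of a dataset it queries the GCM
 * relational database for the curated attributes of the matching item, prefixes
 * them with [[GCM_CURATE_PREFIX]] (producing lines such as
 * "gcm_curated__<attribute>\t<value>") and merges them into the metadata file;
 * region data files are only symbolically linked, never modified.
 */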
object FlattenerStep extends Step {

  val GCM_CURATE_PREFIX = "gcm_curated__"

  /** Converts each row of a raw SQL result into a column-name -> value map. */
  object ResultMap extends GetResult[Map[String, Any]] {
    def apply(pr: PositionedResult): Map[String, Any] = {
      val rs = pr.rs // <- jdbc result set
      val md = rs.getMetaData
      (1 to pr.numColumns).map { i => md.getColumnName(i) -> rs.getObject(i) }.toMap
    }
  }
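
  // Internal surrogate keys (and any "*_tid" column) that must not leak into
  // the flattened metadata.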
  val excludeList = List(
    "item_id",
    "experiment_type_id",
    "dataset_id",
    "case_study_id", "project_id",
    "replicate_id", "biosample_id", "donor_id",
    "technical_replicate_number",
    ".*_tid")
  /* val columnNamesMap = Map("program_name" -> "source",
       "name" -> "dataset_name",
       "format" -> "file_format",
       "is_ann" -> "is_annotation",
       "type" -> "biosample_type",
       "bio_replicate_num" -> "biological_replicate_number",
       "tech_replicate_num" -> "technical_replicate_number", // no need, it is excluded...
       "temp_column" -> "technical_replicate_number",
       "external_ref" -> "external_reference")
   */
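
  // temp_column is synthesized by the SQL query below as
  // "<biological_replicate_number>_<technical_replicate_number>" and is renamed
  // here to technical_replicate_number for the flattened output (the raw
  // column of the same name is excluded above).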
  val columnNamesMap = Map("temp_column" -> "technical_replicate_number")
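
  /**
   * Runs the flattening of every dataset of the source, one thread per dataset;
   * the threads run concurrently only when parallelExecution is true.
   */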
  override def execute(source: Source, parallelExecution: Boolean): Unit = {
    if (source.flattenerEnabled) {
      logger.info("Starting flattener for: " + source.outputFolder)
      val sourceId = FileDatabase.sourceId(source.name)
      // counters
      var modifiedRegionFilesSource = 0
      var modifiedMetadataFilesSource = 0
      var wrongSchemaFilesSource = 0
      // flattening process for each dataset contained in the source.
      val integrateThreads = source.datasets.map((dataset: Dataset) => {
        new Thread {
          override def run(): Unit = {
            val rulePath = source.flattenerRulePath
            // if (dataset.transformEnabled) {
            // flattener works at application level
            if (true) {
              val ruleBasePath = new RuleBase(rulePath)
              val t0Dataset: Long = System.nanoTime()
              var modifiedRegionFilesDataset = 0
              var modifiedMetadataFilesDataset = 0
              var wrongSchemaFilesDataset = 0
              var totalTransformedFiles = 0
              val datasetOutputFolder = dataset.fullDatasetOutputFolder
              val cleanerFolder = datasetOutputFolder + File.separator + DirectoryNamingUtil.cleanFolderName
              val flattenerFolder = datasetOutputFolder + File.separator + DirectoryNamingUtil.flattenFolderName
              val inputFolder = new File(cleanerFolder)
              if (!inputFolder.exists()) {
                throw new Exception("No input folder: " + cleanerFolder)
              }
              val folder = new File(flattenerFolder)
              if (folder.exists()) {
                TransformerStep.deleteFolder(folder)
              }
              logger.info("Starting flattener for: " + dataset.name)
              if (SchemaFinder.downloadSchema(source.rootOutputFolder, dataset, flattenerFolder, source))
                logger.debug("Schema flattener for: " + dataset.name)
              else
                logger.warn("Schema not found for: " + dataset.name)
              if (!folder.exists()) {
                folder.mkdirs()
                logger.debug("Folder created: " + folder)
              }
              logger.info("Flattener for dataset: " + dataset.name)
              val fileList = inputFolder.listFiles
                .filter(_.isFile)
                .filterNot(_.getName.endsWith(".schema")).toList
              // read the full dataset name from dataset_name.txt
              val datasetFileName = datasetOutputFolder + File.separator + "dataset_name.txt"
              val datasetNameSource = scala.io.Source.fromFile(datasetFileName)
              val datasetFullName = try datasetNameSource.mkString.trim finally datasetNameSource.close()
              fileList.foreach { file =>
                val fileName = file.getName
                val fullOutPath = flattenerFolder + File.separator + fileName
                if (fileName.endsWith(".meta")) {
                  val fileNameFirstPart = fileName.split("\\.").head
                  val regionFileName = fileName.replace(".meta", "")
                  // first apply the rule base to the cleaned metadata file
                  ruleBasePath.applyRBToFile(file.getAbsolutePath, fullOutPath)
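                  // The query walks the GCM schema from dataset and item down to
                  // replicate, biosample and donor, adding a synthetic temp_column
                  // ("<bio replicate>_<tech replicate>"). Note that #$ splices the
                  // two names into the SQL as raw, unescaped literals.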
                  val databaseLinesTuplesTry = Try {
                    val query =
                      sql"""SELECT *, r.biological_replicate_number || '_' || r.technical_replicate_number as temp_column
                            FROM dataset d
                            JOIN item i on d.dataset_id = i.dataset_id
                            LEFT JOIN experiment_type et on i.experiment_type_id = et.experiment_type_id
                            LEFT JOIN case2item c2i on i.item_id = c2i.item_id
                            LEFT JOIN case_study cs on c2i.case_study_id = cs.case_study_id
                            LEFT JOIN project p on cs.project_id = p.project_id
                            LEFT JOIN replicate2item r2i on i.item_id = r2i.item_id
                            LEFT JOIN replicate r on r2i.replicate_id = r.replicate_id
                            LEFT JOIN biosample b on r.biosample_id = b.biosample_id
                            LEFT JOIN donor d2 on b.donor_id = d2.donor_id
                            WHERE d.dataset_name = '#${datasetFullName}'
                            AND i.file_name = '#${regionFileName}'""".as(ResultMap)
                    val result = DbHandler.database.run(query)
                    val res = Await.result(result, Duration.Inf)
                    val innerResult = res
                      .flatten
                      .filterNot { case (col, _) => excludeList.exists(col.matches) }
                      .filterNot { case (_, value) => value == null }
                      .map { case (col, value) => (columnNamesMap.getOrElse(col, col), value) }
                    if (innerResult.isEmpty)
                      throw new Exception("Sql query result is empty")
                    innerResult
                  }
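                  // On success the curated tuples become "gcm_curated__<key>\t<value>"
                  // lines; on failure the file keeps only its cleaned metadata.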
                  val databaseLines = databaseLinesTuplesTry match {
                    case Success(v) =>
                      val databaseLinesTuples = v.to[mutable.Set]
                      // appends countKey -> number of distinct values seen for key
                      def addCount(key: String, countKey: String, keepZero: Boolean = true): Unit = {
                        val count = databaseLinesTuples.count(_._1 == key)
                        if (keepZero || count > 0)
                          databaseLinesTuples += countKey -> count
                      }
                      addCount("biological_replicate_number", "biological_replicate_count")
                      addCount("technical_replicate_number", "technical_replicate_count")
                      databaseLinesTuples
                        .map(t => GCM_CURATE_PREFIX + t.productIterator.mkString("\t"))
                    case Failure(e) =>
                      logger.warn(s"GCM export error => dataset: $datasetFullName , file: $fileNameFirstPart , ${e.getMessage}" /*, e */)
                      Seq.empty
                  }
                  // read the cleaned metadata lines back, skipping blank lines
                  val metaSource = scala.io.Source.fromFile(fullOutPath)
                  val fileLines = try metaSource.getLines().filter(_.trim.nonEmpty).toList finally metaSource.close()
                  // merge curated database lines with the file's own lines
                  val allLines = databaseLines ++ fileLines
                  val allLinesUniqSorted = allLines
                    // remove duplicates
                    .toSet
                    // to sort
                    .toList.sorted
                  val pw = new PrintWriter(new File(fullOutPath))
                  pw.write(allLinesUniqSorted.mkString(scala.compat.Platform.EOL))
                  pw.close()
                  modifiedMetadataFilesDataset += 1
                } else {
                  // region data files are left untouched: just link them into the flattened folder
                  createSymbolicLink(file.getAbsolutePath, fullOutPath)
                }
              }
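              // fold the per-dataset counters into the source-level totals; the
              // update is guarded because dataset threads may run concurrently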
              FlattenerStep.synchronized {
                modifiedMetadataFilesSource += modifiedMetadataFilesDataset
                modifiedRegionFilesSource += modifiedRegionFilesDataset
                wrongSchemaFilesSource += wrongSchemaFilesDataset
              }
            }
          }
        }
      })
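      // with parallelExecution all dataset threads are started at once and then
      // joined; otherwise each thread is started and awaited in turn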
      if (parallelExecution) {
        integrateThreads.foreach(_.start())
        integrateThreads.foreach(_.join())
      }
      else {
        for (thread <- integrateThreads) {
          thread.start()
          thread.join()
        }
      }
      logger.info(modifiedRegionFilesSource + " region data files modified in source: " + source.name)
      logger.info(modifiedMetadataFilesSource + " metadata files modified in source: " + source.name)
      logger.info(wrongSchemaFilesSource + " region data files do not respect the schema in source: " + source.name)
      logger.info(s"Source ${source.name} flattening finished")
    }
  }
}