diff --git a/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json b/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json index 60326522..f23411e7 100644 --- a/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json +++ b/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json @@ -468,7 +468,7 @@ "description": "Filter out the instances that have since been deleted or updated outside of the window." } ], - "scriptLines": [ "parameters{", " Offset as integer (0),", " Limit as integer (200),", " ApiVersion as integer (2),", " StartTime as string ('0001-01-01T00:00:00Z'),", " EndTime as string ('9999-12-31T23:59:59Z'),", " ContainerName as string ('dicom'),", " InstanceTablePath as string ('instance'),", " SeriesTablePath as string ('series'),", " StudyTablePath as string ('study'),", " RetentionHours as integer (720)", "}", "source(output(", " body as (action as string, metadata as (undefined as string), partitionName as string, sequence as short, seriesInstanceUid as string, sopInstanceUid as string, state as string, studyInstanceUid as string, timestamp as string, filePath as string),", " headers as [string,string]", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " format: 'rest',", " timeout: 30,", " requestInterval: 0,", " entity: (concat('/v', toString($ApiVersion), '/changefeed')),", " queryParameters: ['includeMetadata' -> 'true', 'offset' -> ($Offset), 'limit' -> ($Limit), 'startTime' -> ($StartTime), 'endTime' -> ($EndTime)],", " httpMethod: 'GET',", " responseFormat: ['type' -> 'json', 'documentForm' -> 'arrayOfDocuments']) ~> changeFeed", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " sopInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " sopClassUid as string,", " filePath as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath)) ~> existingInstances", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " instanceCount as long", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath)) ~> existingSeries", "flattened derive(timestamp = toTimestamp(substring(timestamp, 1, 23), 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS', 'UTC'),", " studyDate = toDate(byPath('metadata.{00080020}.Value[1]'), 'yyyyMMdd', 'UTC'),", " studyDescription = toString(byPath('metadata.{00081030}.Value[1]')),", " issuerOfPatientId = toString(byPath('metadata.{00100021}.Value[1]')),", " patientId = toString(byPath('metadata.{00100020}.Value[1]')),", " patientName = toString(byPath('metadata.{00100010}.Value[1].Alphabetic')),", " modality = toString(byPath('metadata.{00080060}.Value[1]')),", " sopClassUid = toString(byPath('metadata.{00080016}.Value[1]')),", " filePath = toString(filePath)) ~> extracted", "changeFeed select(mapColumn(", " action = body.action,", " timestamp = body.timestamp,", " partitionName = body.partitionName,", " studyInstanceUid = body.studyInstanceUid,", " seriesInstanceUid = body.seriesInstanceUid,", " sopInstanceUid = body.sopInstanceUid,", " metadata = body.metadata,", " filePath = body.filePath", " ),", " skipDuplicateMapInputs: false,", " skipDuplicateMapOutputs: false) ~> flattened", "upToDate alterRow(upsertIf(or(equals(action,'Create'),equals(action,'Update'))),", " deleteIf(equals(action,'Delete'))) ~> instanceSinkUpdates", "extracted aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid),", " action = last(action),", " lastModifiedTimestamp = last(timestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " sopClassUid = last(sopClassUid),", " filePath = last(filePath)) ~> aggregatedChanges", "existingInstances aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " instanceCount = count()) ~> allSeries", "existingSeries aggregate(groupBy(partitionName,", " studyInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " instanceCount = sum(instanceCount),", " seriesCount = count()) ~> allStudies", "modifiedSeries alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> seriesSinkUpdates", "modifiedStudies alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> studySinkUpdate", "upToDate aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " instanceDifference = sum(iif(equals(action, 'Create'), 1, iif(equals(action, 'Delete'), -1, 0)))) ~> seriesChanges", "seriesChanges aggregate(groupBy(partitionName,", " studyInstanceUid),", " instanceDifference = sum(instanceDifference)) ~> studyChanges", "annotatedSeries filter(hasChange) ~> modifiedSeries", "allSeries derive(hasChange = not(isNull(seriesCache#lookup(partitionName, studyInstanceUid, seriesInstanceUid)))) ~> annotatedSeries", "allStudies derive(hasChange = not(isNull(studyCache#lookup(partitionName, studyInstanceUid)))) ~> annotatedStudies", "annotatedStudies filter(hasChange) ~> modifiedStudies", "aggregatedChanges filter(or(not(isNull(filePath)), equals(action, 'Delete'))) ~> upToDate", "instanceSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid','sopInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 1,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " sopClassUid,", " filePath", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> instanceTable", "seriesSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 2,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> seriesTable", "studySinkUpdate sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($StudyTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 3,", " mapColumn(", " partitionName,", " studyInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " seriesCount,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> studyTable", "seriesChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> seriesCache", "studyChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> studyCache" ] + "scriptLines": [ "parameters{", " Offset as integer (0),", " Limit as integer (200),", " ApiVersion as integer (2),", " StartTime as string ('0001-01-01T00:00:00Z'),", " EndTime as string ('9999-12-31T23:59:59Z'),", " ContainerName as string ('dicom'),", " InstanceTablePath as string ('instance'),", " SeriesTablePath as string ('series'),", " StudyTablePath as string ('study'),", " RetentionHours as integer (720)", "}", "source(output(", " body as (action as string, metadata as [string,(vr as string, Value as string[])], partitionName as string, sequence as short, seriesInstanceUid as string, sopInstanceUid as string, state as string, studyInstanceUid as string, timestamp as string, filePath as string),", " headers as [string,string]", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " format: 'rest',", " timeout: 30,", " requestInterval: 0,", " entity: (concat('/v', toString($ApiVersion), '/changefeed')),", " queryParameters: ['includeMetadata' -> 'true', 'offset' -> ($Offset), 'limit' -> ($Limit), 'startTime' -> ($StartTime), 'endTime' -> ($EndTime)],", " httpMethod: 'GET',", " responseFormat: ['type' -> 'json', 'documentForm' -> 'arrayOfDocuments']) ~> changeFeed", "source(output(", " partitionName as string,", " studyinstanceuid as string,", " seriesinstanceuid as string,", " sopinstanceuid as string,", " lastModifiedTimestamp as timestamp,", " studydate as string,", " studydescription as string,", " issuerofpatientid as string,", " patientid as string,", " patientname as string,", " modality as string,", " sopclassuid as string,", " filepath as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath)) ~> existingInstances", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " instanceCount as long", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath)) ~> existingSeries", "flattened derive(timestamp = toTimestamp(substring(timestamp, 1, 23), 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS', 'UTC'),", " studydate = toString(byPath('metadata[\"00080020\"].Value[1]')),", " studydate_formatted = toDate(byPath('metadata[\"00080020\"].Value[1]'), 'yyyyMMdd', 'UTC'),", " studydescription = toString(byPath('metadata[\"00081030\"].Value[1]')),", " issuerofpatientid = toString(byPath('metadata[\"00100021\"].Value[1]')),", " patientid = toString(byPath('metadata[\"00100020\"].Value[1]')),", " patientname = regexExtract(toString(byPath('metadata[\"00100010\"].Value[1]')), '\"Alphabetic\"\\\\s*:\\\\s*\"([^\"]*)\"'),", " modality = toString(byPath('metadata[\"00080060\"].Value[1]')),", " sopclassuid = toString(byPath('metadata[\"00080016\"].Value[1]')),", " filepath = toString(filepath),", " metadata_string = toString(metadata),", " created_date = currentTimestamp(),", " patientbirthdate = toString(byPath('metadata[\"00100030\"].Value[1]')),", " accessionnumber = toString(byPath('metadata[\"00080050\"].Value[1]')),", " referringphysicianname = regexExtract(toString(byPath('metadata[\"00080090\"].Value[1]')), '\"Alphabetic\"\\\\s*:\\\\s*\"([^\"]*)\"'),", " modalitiesinstudy = toString(byPath('metadata[\"00080061\"].Value[1]')),", " performedprocedurestepstartdate = toString(byPath('metadata[\"00400244\"].Value[1]')),", " manufacturermodelname = toString(byPath('metadata[\"00081090\"].Value[1]')),", " studytime = toString(byPath('metadata[\"00080030\"].Value[1]')),", " timezoneoffsetfromutc = toString(byPath('metadata[\"00080201\"].Value[1]')),", " numberofstudyrelatedseries = toString(byPath('metadata[\"00201206\"].Value[1]')),", " numberofstudyrelatedinstances = toString(byPath('metadata[\"00201208\"].Value[1]')),", " seriesnumber = toString(byPath('metadata[\"00200011\"].Value[1]')),", " seriesdescription = toString(byPath('metadata[\"0008103E\"].Value[1]')),", " numberofseriesrelatedinstances = toString(byPath('metadata[\"00201209\"].Value[1]')),", " bodypartexamined = toString(byPath('metadata[\"00180015\"].Value[1]')),", " laterality = toString(byPath('metadata[\"00200060\"].Value[1]')),", " seriesdate = toString(byPath('metadata[\"00080021\"].Value[1]')),", " seriestime = toString(byPath('metadata[\"00080031\"].Value[1]')),", " instancenumber = toString(byPath('metadata[\"00200013\"].Value[1]')),", " documenttitle = toString(byPath('metadata[\"00420010\"].Value[1]'))) ~> extracted", "changeFeed select(mapColumn(", " action = body.action,", " timestamp = body.timestamp,", " partitionName = body.partitionName,", " studyinstanceuid = body.studyInstanceUid,", " seriesinstanceuid = body.seriesInstanceUid,", " sopinstanceuid = body.sopInstanceUid,", " metadata = body.metadata,", " filepath = body.filePath", " ),", " skipDuplicateMapInputs: false,", " skipDuplicateMapOutputs: false) ~> flattened", "upToDate alterRow(upsertIf(or(equals(action,'Create'),equals(action,'Update'))),", " deleteIf(equals(action,'Delete'))) ~> instanceSinkUpdates", "extracted aggregate(groupBy(partitionName,", " studyinstanceuid,", " seriesinstanceuid,", " sopinstanceuid),", " action = last(action),", " lastModifiedTimestamp = last(timestamp),", " studydate = last(studydate),", " studydate_formatted = last(studydate_formatted),", " studydescription = last(studydescription),", " issuerofpatientid = last(issuerofpatientid),", " patientid = last(patientid),", " patientname = last(patientname),", " modality = last(modality),", " sopclassuid = last(sopclassuid),", " filepath = last(filepath),", " metadata = last(metadata),", " metadata_string = last(metadata_string),", " created_date = last(created_date),", " patientbirthdate = last(patientbirthdate),", " accessionnumber = last(accessionnumber),", " referringphysicianname = last(referringphysicianname),", " modalitiesinstudy = last(modalitiesinstudy),", " performedprocedurestepstartdate = last(performedprocedurestepstartdate),", " manufacturermodelname = last(manufacturermodelname),", " studytime = last(studytime),", " timezoneoffsetfromutc = last(timezoneoffsetfromutc),", " numberofstudyrelatedseries = last(numberofstudyrelatedseries),", " numberofstudyrelatedinstances = last(numberofstudyrelatedinstances),", " seriesnumber = last(seriesnumber),", " seriesdescription = last(seriesdescription),", " numberofseriesrelatedinstances = last(numberofseriesrelatedinstances),", " bodypartexamined = last(bodypartexamined),", " laterality = last(laterality),", " seriesdate = last(seriesdate),", " seriestime = last(seriestime),", " instancenumber = last(instancenumber),", " documenttitle = last(documenttitle)) ~> aggregatedChanges", "existingInstances aggregate(groupBy(partitionName,", " studyInstanceUid = studyinstanceuid,", " seriesInstanceUid = seriesinstanceuid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(toDate(studydate, 'yyyyMMdd', 'UTC')),", " studyDescription = last(studydescription),", " issuerOfPatientId = last(issuerofpatientid),", " patientId = last(patientid),", " patientName = last(patientname),", " modality = last(modality),", " instanceCount = count()) ~> allSeries", "existingSeries aggregate(groupBy(partitionName,", " studyInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " instanceCount = sum(instanceCount),", " seriesCount = count()) ~> allStudies", "modifiedSeries alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> seriesSinkUpdates", "modifiedStudies alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> studySinkUpdate", "upToDate aggregate(groupBy(partitionName,", " studyInstanceUid = studyinstanceuid,", " seriesInstanceUid = seriesinstanceuid),", " instanceDifference = sum(iif(equals(action, 'Create'), 1, iif(equals(action, 'Delete'), -1, 0)))) ~> seriesChanges", "seriesChanges aggregate(groupBy(partitionName,", " studyInstanceUid),", " instanceDifference = sum(instanceDifference)) ~> studyChanges", "annotatedSeries filter(hasChange) ~> modifiedSeries", "allSeries derive(hasChange = not(isNull(seriesCache#lookup(partitionName, studyInstanceUid, seriesInstanceUid)))) ~> annotatedSeries", "allStudies derive(hasChange = not(isNull(studyCache#lookup(partitionName, studyInstanceUid)))) ~> annotatedStudies", "annotatedStudies filter(hasChange) ~> modifiedStudies", "aggregatedChanges filter(or(not(isNull(filepath)), equals(action, 'Delete'))) ~> upToDate", "instanceSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyinstanceuid','seriesinstanceuid','sopinstanceuid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 1,", " mapColumn(", " partitionName,", " studyinstanceuid,", " seriesinstanceuid,", " sopinstanceuid,", " lastModifiedTimestamp,", " studydate,", " studydate_formatted,", " studydescription,", " issuerofpatientid,", " patientid,", " patientname,", " modality,", " sopclassuid,", " filepath,", " metadata,", " metadata_string,", " created_date,", " patientbirthdate,", " accessionnumber,", " referringphysicianname,", " modalitiesinstudy,", " performedprocedurestepstartdate,", " manufacturermodelname,", " studytime,", " timezoneoffsetfromutc,", " numberofstudyrelatedseries,", " numberofstudyrelatedinstances,", " seriesnumber,", " seriesdescription,", " numberofseriesrelatedinstances,", " bodypartexamined,", " laterality,", " seriesdate,", " seriestime,", " instancenumber,", " documenttitle", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> instanceTable", "seriesSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 2,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> seriesTable", "studySinkUpdate sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($StudyTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 3,", " mapColumn(", " partitionName,", " studyInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " seriesCount,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> studyTable", "seriesChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> seriesCache", "studyChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> studyCache" ] } }, "dependsOn": []