From 57be51315fa90c3a1af1fee0d3974100944c3ca8 Mon Sep 17 00:00:00 2001 From: baljeetsethi3 Date: Wed, 7 Feb 2024 14:32:43 +0530 Subject: [PATCH] Updated ADF template to align with Healthcare solution in MS Fabric --- ...opy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json b/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json index 60326522..92844f06 100644 --- a/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json +++ b/templates/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format/Copy DICOM Metadata Changes to ADLS Gen2 in Delta Format.json @@ -468,7 +468,7 @@ "description": "Filter out the instances that have since been deleted or updated outside of the window." } ], - "scriptLines": [ "parameters{", " Offset as integer (0),", " Limit as integer (200),", " ApiVersion as integer (2),", " StartTime as string ('0001-01-01T00:00:00Z'),", " EndTime as string ('9999-12-31T23:59:59Z'),", " ContainerName as string ('dicom'),", " InstanceTablePath as string ('instance'),", " SeriesTablePath as string ('series'),", " StudyTablePath as string ('study'),", " RetentionHours as integer (720)", "}", "source(output(", " body as (action as string, metadata as (undefined as string), partitionName as string, sequence as short, seriesInstanceUid as string, sopInstanceUid as string, state as string, studyInstanceUid as string, timestamp as string, filePath as string),", " headers as [string,string]", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " format: 'rest',", " timeout: 30,", " requestInterval: 0,", " entity: (concat('/v', toString($ApiVersion), '/changefeed')),", " queryParameters: ['includeMetadata' -> 'true', 'offset' -> ($Offset), 'limit' -> ($Limit), 'startTime' -> ($StartTime), 'endTime' -> ($EndTime)],", " httpMethod: 'GET',", " responseFormat: ['type' -> 'json', 'documentForm' -> 'arrayOfDocuments']) ~> changeFeed", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " sopInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " sopClassUid as string,", " filePath as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath)) ~> existingInstances", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " instanceCount as long", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath)) ~> existingSeries", "flattened derive(timestamp = toTimestamp(substring(timestamp, 1, 23), 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS', 'UTC'),", " studyDate = toDate(byPath('metadata.{00080020}.Value[1]'), 'yyyyMMdd', 'UTC'),", " studyDescription = toString(byPath('metadata.{00081030}.Value[1]')),", " issuerOfPatientId = toString(byPath('metadata.{00100021}.Value[1]')),", " patientId = toString(byPath('metadata.{00100020}.Value[1]')),", " patientName = toString(byPath('metadata.{00100010}.Value[1].Alphabetic')),", " modality = toString(byPath('metadata.{00080060}.Value[1]')),", " sopClassUid = toString(byPath('metadata.{00080016}.Value[1]')),", " filePath = toString(filePath)) ~> extracted", "changeFeed select(mapColumn(", " action = body.action,", " timestamp = body.timestamp,", " partitionName = body.partitionName,", " studyInstanceUid = body.studyInstanceUid,", " seriesInstanceUid = body.seriesInstanceUid,", " sopInstanceUid = body.sopInstanceUid,", " metadata = body.metadata,", " filePath = body.filePath", " ),", " skipDuplicateMapInputs: false,", " skipDuplicateMapOutputs: false) ~> flattened", "upToDate alterRow(upsertIf(or(equals(action,'Create'),equals(action,'Update'))),", " deleteIf(equals(action,'Delete'))) ~> instanceSinkUpdates", "extracted aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid),", " action = last(action),", " lastModifiedTimestamp = last(timestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " sopClassUid = last(sopClassUid),", " filePath = last(filePath)) ~> aggregatedChanges", "existingInstances aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " modality = last(modality),", " instanceCount = count()) ~> allSeries", "existingSeries aggregate(groupBy(partitionName,", " studyInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " instanceCount = sum(instanceCount),", " seriesCount = count()) ~> allStudies", "modifiedSeries alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> seriesSinkUpdates", "modifiedStudies alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> studySinkUpdate", "upToDate aggregate(groupBy(partitionName,", " studyInstanceUid,", " seriesInstanceUid),", " instanceDifference = sum(iif(equals(action, 'Create'), 1, iif(equals(action, 'Delete'), -1, 0)))) ~> seriesChanges", "seriesChanges aggregate(groupBy(partitionName,", " studyInstanceUid),", " instanceDifference = sum(instanceDifference)) ~> studyChanges", "annotatedSeries filter(hasChange) ~> modifiedSeries", "allSeries derive(hasChange = not(isNull(seriesCache#lookup(partitionName, studyInstanceUid, seriesInstanceUid)))) ~> annotatedSeries", "allStudies derive(hasChange = not(isNull(studyCache#lookup(partitionName, studyInstanceUid)))) ~> annotatedStudies", "annotatedStudies filter(hasChange) ~> modifiedStudies", "aggregatedChanges filter(or(not(isNull(filePath)), equals(action, 'Delete'))) ~> upToDate", "instanceSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid','sopInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 1,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " sopInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " sopClassUid,", " filePath", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> instanceTable", "seriesSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 2,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> seriesTable", "studySinkUpdate sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($StudyTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 3,", " mapColumn(", " partitionName,", " studyInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " seriesCount,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> studyTable", "seriesChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> seriesCache", "studyChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> studyCache" ] + "scriptLines": [ "parameters{", " Offset as integer (0),", " Limit as integer (200),", " ApiVersion as integer (2),", " StartTime as string ('0001-01-01T00:00:00Z'),", " EndTime as string ('9999-12-31T23:59:59Z'),", " ContainerName as string ('dicom'),", " InstanceTablePath as string ('instance'),", " SeriesTablePath as string ('series'),", " StudyTablePath as string ('study'),", " RetentionHours as integer (720)", "}", "source(output(", " body as (action as string, metadata as [string,(vr as string, Value as string[])], partitionName as string, sequence as short, seriesInstanceUid as string, sopInstanceUid as string, state as string, studyInstanceUid as string, timestamp as string, filePath as string),", " headers as [string,string]", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " format: 'rest',", " timeout: 30,", " requestInterval: 0,", " entity: (concat('/v', toString($ApiVersion), '/changefeed')),", " queryParameters: ['includeMetadata' -> 'true', 'offset' -> ($Offset), 'limit' -> ($Limit), 'startTime' -> ($StartTime), 'endTime' -> ($EndTime)],", " httpMethod: 'GET',", " responseFormat: ['type' -> 'json', 'documentForm' -> 'arrayOfDocuments']) ~> changeFeed", "source(output(", " partitionName as string,", " studyinstanceuid as string,", " seriesinstanceuid as string,", " sopinstanceuid as string,", " lastModifiedTimestamp as timestamp,", " studydate as string,", " studydescription as string,", " issuerofpatientid as string,", " patientid as string,", " patientname as string,", " modality as string,", " sopclassuid as string,", " filepath as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath)) ~> existingInstances", "source(output(", " partitionName as string,", " studyInstanceUid as string,", " seriesInstanceUid as string,", " lastModifiedTimestamp as timestamp,", " studyDate as date,", " studyDescription as string,", " issuerOfPatientId as string,", " patientId as string,", " patientName as string,", " modality as string,", " instanceCount as long", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: true,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath)) ~> existingSeries", "flattened derive(timestamp = toTimestamp(substring(timestamp, 1, 23), 'yyyy-MM-dd\\'T\\'HH:mm:ss.SSS', 'UTC'),", " studydate = toString(byPath('metadata[\"00080020\"].Value[1]')),", " studydate_formatted = toDate(byPath('metadata[\"00080020\"].Value[1]'), 'yyyyMMdd', 'UTC'),", " studydescription = toString(byPath('metadata[\"00081030\"].Value[1]')),", " issuerofpatientid = toString(byPath('metadata[\"00100021\"].Value[1]')),", " patientid = toString(byPath('metadata[\"00100020\"].Value[1]')),", " patientname = toString(byPath('metadata[\"00100010\"].Value[1].Alphabetic')),", " modality = toString(byPath('metadata[\"00080060\"].Value[1]')),", " sopclassuid = toString(byPath('metadata[\"00080016\"].Value[1]')),", " filepath = toString(filepath),", " metadata_string = toString(metadata),", " created_date = currentTimestamp(),", " patientbirthdate = toString(byPath('metadata[\"00100030\"].Value[1]')),", " accessionnumber = toString(byPath('metadata[\"00080050\"].Value[1]')),", " referringphysicianname = toString(byPath('metadata[\"00080090\"].Value[1]')),", " modalitiesinstudy = toString(byPath('metadata[\"00080061\"].Value[1]')),", " performedprocedurestepstartdate = toString(byPath('metadata[\"00400244\"].Value[1]')),", " manufacturermodelname = toString(byPath('metadata[\"00081090\"].Value[1]')),", " studytime = toString(byPath('metadata[\"00080030\"].Value[1]')),", " timezoneoffsetfromutc = toString(byPath('metadata[\"00080201\"].Value[1]')),", " numberofstudyrelatedseries = toString(byPath('metadata[\"00201206\"].Value[1]')),", " numberofstudyrelatedinstances = toString(byPath('metadata[\"00201208\"].Value[1]')),", " seriesnumber = toString(byPath('metadata[\"00200011\"].Value[1]')),", " seriesdescription = toString(byPath('metadata[\"0008103E\"].Value[1]')),", " numberofseriesrelatedinstances = toString(byPath('metadata[\"00201209\"].Value[1]')),", " bodypartexamined = toString(byPath('metadata[\"00180015\"].Value[1]')),", " laterality = toString(byPath('metadata[\"00200060\"].Value[1]')),", " seriesdate = toString(byPath('metadata[\"00080021\"].Value[1]')),", " seriestime = toString(byPath('metadata[\"00080031\"].Value[1]')),", " instancenumber = toString(byPath('metadata[\"00200013\"].Value[1]')),", " documenttitle = toString(byPath('metadata[\"00420010\"].Value[1]'))) ~> extracted", "changeFeed select(mapColumn(", " action = body.action,", " timestamp = body.timestamp,", " partitionName = body.partitionName,", " studyinstanceuid = body.studyInstanceUid,", " seriesinstanceuid = body.seriesInstanceUid,", " sopinstanceuid = body.sopInstanceUid,", " metadata = body.metadata,", " filepath = body.filePath", " ),", " skipDuplicateMapInputs: false,", " skipDuplicateMapOutputs: false) ~> flattened", "upToDate alterRow(upsertIf(or(equals(action,'Create'),equals(action,'Update'))),", " deleteIf(equals(action,'Delete'))) ~> instanceSinkUpdates", "extracted aggregate(groupBy(partitionName,", " studyinstanceuid,", " seriesinstanceuid,", " sopinstanceuid),", " action = last(action),", " lastModifiedTimestamp = last(timestamp),", " studydate = last(studydate),", " studydate_formatted = last(studydate_formatted),", " studydescription = last(studydescription),", " issuerofpatientid = last(issuerofpatientid),", " patientid = last(patientid),", " patientname = last(patientname),", " modality = last(modality),", " sopclassuid = last(sopclassuid),", " filepath = last(filepath),", " metadata = last(metadata),", " metadata_string = last(metadata_string),", " created_date = last(created_date),", " patientbirthdate = last(patientbirthdate),", " accessionnumber = last(accessionnumber),", " referringphysicianname = last(referringphysicianname),", " modalitiesinstudy = last(modalitiesinstudy),", " performedprocedurestepstartdate = last(performedprocedurestepstartdate),", " manufacturermodelname = last(manufacturermodelname),", " studytime = last(studytime),", " timezoneoffsetfromutc = last(timezoneoffsetfromutc),", " numberofstudyrelatedseries = last(numberofstudyrelatedseries),", " numberofstudyrelatedinstances = last(numberofstudyrelatedinstances),", " seriesnumber = last(seriesnumber),", " seriesdescription = last(seriesdescription),", " numberofseriesrelatedinstances = last(numberofseriesrelatedinstances),", " bodypartexamined = last(bodypartexamined),", " laterality = last(laterality),", " seriesdate = last(seriesdate),", " seriestime = last(seriestime),", " instancenumber = last(instancenumber),", " documenttitle = last(documenttitle)) ~> aggregatedChanges", "existingInstances aggregate(groupBy(partitionName,", " studyInstanceUid = studyinstanceuid,", " seriesInstanceUid = seriesinstanceuid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(toDate(studydate, 'yyyyMMdd', 'UTC')),", " studyDescription = last(studydescription),", " issuerOfPatientId = last(issuerofpatientid),", " patientId = last(patientid),", " patientName = last(patientname),", " modality = last(modality),", " instanceCount = count()) ~> allSeries", "existingSeries aggregate(groupBy(partitionName,", " studyInstanceUid),", " lastModifiedTimestamp = last(lastModifiedTimestamp),", " studyDate = last(studyDate),", " studyDescription = last(studyDescription),", " issuerOfPatientId = last(issuerOfPatientId),", " patientId = last(patientId),", " patientName = last(patientName),", " instanceCount = sum(instanceCount),", " seriesCount = count()) ~> allStudies", "modifiedSeries alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> seriesSinkUpdates", "modifiedStudies alterRow(upsertIf(instanceCount>0),", " deleteIf(instanceCount<=0)) ~> studySinkUpdate", "upToDate aggregate(groupBy(partitionName,", " studyInstanceUid = studyinstanceuid,", " seriesInstanceUid = seriesinstanceuid),", " instanceDifference = sum(iif(equals(action, 'Create'), 1, iif(equals(action, 'Delete'), -1, 0)))) ~> seriesChanges", "seriesChanges aggregate(groupBy(partitionName,", " studyInstanceUid),", " instanceDifference = sum(instanceDifference)) ~> studyChanges", "annotatedSeries filter(hasChange) ~> modifiedSeries", "allSeries derive(hasChange = not(isNull(seriesCache#lookup(partitionName, studyInstanceUid, seriesInstanceUid)))) ~> annotatedSeries", "allStudies derive(hasChange = not(isNull(studyCache#lookup(partitionName, studyInstanceUid)))) ~> annotatedStudies", "annotatedStudies filter(hasChange) ~> modifiedStudies", "aggregatedChanges filter(or(not(isNull(filepath)), equals(action, 'Delete'))) ~> upToDate", "instanceSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($InstanceTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyinstanceuid','seriesinstanceuid','sopinstanceuid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 1,", " mapColumn(", " partitionName,", " studyinstanceuid,", " seriesinstanceuid,", " sopinstanceuid,", " lastModifiedTimestamp,", " studydate,", " studydate_formatted,", " studydescription,", " issuerofpatientid,", " patientid,", " patientname,", " modality,", " sopclassuid,", " filepath,", " metadata,", " metadata_string,", " created_date,", " patientbirthdate,", " accessionnumber,", " referringphysicianname,", " modalitiesinstudy,", " performedprocedurestepstartdate,", " manufacturermodelname,", " studytime,", " timezoneoffsetfromutc,", " numberofstudyrelatedseries,", " numberofstudyrelatedinstances,", " seriesnumber,", " seriesdescription,", " numberofseriesrelatedinstances,", " bodypartexamined,", " laterality,", " seriesdate,", " seriestime,", " instancenumber,", " documenttitle", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> instanceTable", "seriesSinkUpdates sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($SeriesTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 2,", " mapColumn(", " partitionName,", " studyInstanceUid,", " seriesInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " modality,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> seriesTable", "studySinkUpdate sink(allowSchemaDrift: true,", " validateSchema: false,", " format: 'delta',", " fileSystem: ($ContainerName),", " folderPath: ($StudyTablePath),", " mergeSchema: true,", " autoCompact: true,", " optimizedWrite: false,", " vacuum: ($RetentionHours),", " deletable: true,", " insertable: false,", " updateable: false,", " upsertable: true,", " keys:['partitionName','studyInstanceUid'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " saveOrder: 3,", " mapColumn(", " partitionName,", " studyInstanceUid,", " lastModifiedTimestamp,", " studyDate,", " studyDescription,", " issuerOfPatientId,", " patientId,", " patientName,", " seriesCount,", " instanceCount", " ),", " partitionBy('key',", " 0,", " partitionName", " )) ~> studyTable", "seriesChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid','seriesInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> seriesCache", "studyChanges sink(validateSchema: false,", " keys:['partitionName','studyInstanceUid'],", " store: 'cache',", " format: 'inline',", " output: false,", " saveOrder: 1) ~> studyCache" ] } }, "dependsOn": []