Skip to content

Commit

Permalink
Merge pull request #1358 from nrkno/fix/sofie-3626/delete-next-partIn…
Browse files Browse the repository at this point in the history
…stance

fix: Next PartInstance can become invalid if it's Part gets deleted, resulting in a critical crash in worker
  • Loading branch information
jstarpl authored Jan 14, 2025
2 parents 255c27c + 872c528 commit a6cbf80
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 40 deletions.
94 changes: 67 additions & 27 deletions packages/job-worker/src/ingest/commit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,13 @@ export async function CommitIngestOperation(
// Ensure any adlibbed parts are updated to follow the segmentId of the previous part
await updateSegmentIdsForAdlibbedPartInstances(context, ingestModel, beforePartMap)

// TODO: This whole section can probably be removed later, it's really unneccessary in the grand scheme of
// things, it's here only to debug some problems
if (data.renamedSegments && data.renamedSegments.size > 0) {
logger.debug(`Renamed segments: ${JSON.stringify(Array.from(data.renamedSegments.entries()))}`)
logger.verbose(`Renamed segments: ${JSON.stringify(Array.from(data.renamedSegments.entries()))}`)
}
// End of temporary section

// ensure instances have matching segmentIds with the parts
await updatePartInstancesSegmentIds(context, ingestModel, data.renamedSegments, beforePartMap)

Expand Down Expand Up @@ -220,6 +224,8 @@ export async function CommitIngestOperation(
const pSaveIngest = ingestModel.saveAllToDatabase()
pSaveIngest.catch(() => null) // Ensure promise isn't reported as unhandled

ensureNextPartInstanceIsNotDeleted(playoutModel)

await validateAdlibTestingSegment(context, playoutModel)

try {
Expand Down Expand Up @@ -273,14 +279,18 @@ function canRemoveSegment(
logger.warn(`Not allowing removal of previous playing segment "${segmentId}", making segment unsynced instead`)
return false
}
if (
currentPartInstance?.segmentId === segmentId ||
(nextPartInstance?.segmentId === segmentId && isTooCloseToAutonext(currentPartInstance, false))
) {
if (currentPartInstance?.segmentId === segmentId) {
// Don't allow removing an active rundown
logger.warn(`Not allowing removal of current playing segment "${segmentId}", making segment unsynced instead`)
return false
}
if (nextPartInstance?.segmentId === segmentId && isTooCloseToAutonext(currentPartInstance, false)) {
// Don't allow removing an active rundown
logger.warn(
`Not allowing removal of nexted segment "${segmentId}", because it's too close to an auto-next, making segment unsynced instead`
)
return false
}

if (nextPartInstance?.segmentId === segmentId && nextPartInstance.orphaned === 'adlib-part') {
// Don't allow removing an active rundown
Expand Down Expand Up @@ -365,6 +375,8 @@ async function updatePartInstancesSegmentIds(

const writeOps: AnyBulkWriteOperation<DBPartInstance>[] = []

logger.debug(`updatePartInstancesSegmentIds: renameRules: ${JSON.stringify(renameRules)}`)

for (const [newSegmentId, rule] of rulesInOrder) {
if (rule.fromSegmentIds.length) {
writeOps.push({
Expand Down Expand Up @@ -402,32 +414,24 @@ async function updatePartInstancesSegmentIds(
if (writeOps.length) await context.directCollections.PartInstances.bulkWrite(writeOps)

// Double check that there are no parts using the old segment ids:
const oldSegmentIds = Array.from(renameRules.keys())
const [badPartInstances, badParts] = await Promise.all([
await context.directCollections.PartInstances.findFetch({
rundownId: ingestModel.rundownId,
segmentId: { $in: oldSegmentIds },
}),
await context.directCollections.Parts.findFetch({
rundownId: ingestModel.rundownId,
segmentId: { $in: oldSegmentIds },
}),
])
// TODO: This whole section can probably be removed later, it's really unneccessary in the grand scheme of
// things, it's here only to debug some problems
const oldSegmentIds: SegmentId[] = []
for (const renameRule of renameRules.values()) {
oldSegmentIds.push(...renameRule.fromSegmentIds)
}
const badPartInstances = await context.directCollections.PartInstances.findFetch({
rundownId: ingestModel.rundownId,
segmentId: { $in: oldSegmentIds },
})
if (badPartInstances.length > 0) {
logger.error(
`updatePartInstancesSegmentIds: Failed to update all PartInstances using old SegmentIds "${JSON.stringify(
oldSegmentIds
)}": ${JSON.stringify(badPartInstances)}, writeOps: ${JSON.stringify(writeOps)}`
)
}

if (badParts.length > 0) {
logger.error(
`updatePartInstancesSegmentIds: Failed to update all Parts using old SegmentIds "${JSON.stringify(
oldSegmentIds
)}": ${JSON.stringify(badParts)}, writeOps: ${JSON.stringify(writeOps)}`
)
}
// End of the temporary section
}
}

Expand Down Expand Up @@ -662,10 +666,27 @@ async function getSelectedPartInstances(
})
: []

const currentPartInstance = instances.find((inst) => inst._id === playlist.currentPartInfo?.partInstanceId)
const nextPartInstance = instances.find((inst) => inst._id === playlist.nextPartInfo?.partInstanceId)
const previousPartInstance = instances.find((inst) => inst._id === playlist.previousPartInfo?.partInstanceId)

if (playlist.currentPartInfo?.partInstanceId && !currentPartInstance)
logger.error(
`playlist.currentPartInfo is set, but PartInstance "${playlist.currentPartInfo?.partInstanceId}" was not found!`
)
if (playlist.nextPartInfo?.partInstanceId && !nextPartInstance)
logger.error(
`playlist.nextPartInfo is set, but PartInstance "${playlist.nextPartInfo?.partInstanceId}" was not found!`
)
if (playlist.previousPartInfo?.partInstanceId && !previousPartInstance)
logger.error(
`playlist.previousPartInfo is set, but PartInstance "${playlist.previousPartInfo?.partInstanceId}" was not found!`
)

return {
currentPartInstance: instances.find((inst) => inst._id === playlist.currentPartInfo?.partInstanceId),
nextPartInstance: instances.find((inst) => inst._id === playlist.nextPartInfo?.partInstanceId),
previousPartInstance: instances.find((inst) => inst._id === playlist.previousPartInfo?.partInstanceId),
currentPartInstance,
nextPartInstance,
previousPartInstance,
}
}

Expand Down Expand Up @@ -815,6 +836,16 @@ async function removeSegments(
})
}
for (const segmentId of purgeSegmentIds) {
logger.debug(
`IngestModel: Removing segment "${segmentId}" (` +
`previousPartInfo?.partInstanceId: ${newPlaylist.previousPartInfo?.partInstanceId},` +
`currentPartInfo?.partInstanceId: ${newPlaylist.currentPartInfo?.partInstanceId},` +
`nextPartInfo?.partInstanceId: ${newPlaylist.nextPartInfo?.partInstanceId},` +
`previousPartInstance.segmentId: ${!previousPartInstance ? 'N/A' : previousPartInstance.segmentId},` +
`currentPartInstance.segmentId: ${!currentPartInstance ? 'N/A' : currentPartInstance.segmentId},` +
`nextPartInstance.segmentId: ${!nextPartInstance ? 'N/A' : nextPartInstance.segmentId}` +
`)`
)
ingestModel.removeSegment(segmentId)
}
}
Expand All @@ -824,3 +855,12 @@ async function validateAdlibTestingSegment(_context: JobContext, playoutModel: P
rundown.updateAdlibTestingSegmentRank()
}
}
function ensureNextPartInstanceIsNotDeleted(playoutModel: PlayoutModel) {
if (playoutModel.nextPartInstance) {
// Check if the segment of the nextPartInstance exists
if (!playoutModel.findSegment(playoutModel.nextPartInstance.partInstance.segmentId)) {
// The segment doesn't exist, set nextPartInstance to null, it'll be set by ensureNextPartIsValid() later.
playoutModel.setPartInstanceAsNext(null, false, false)
}
}
}
7 changes: 5 additions & 2 deletions packages/job-worker/src/ingest/lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ export interface CommitIngestData {
removedSegmentIds: SegmentId[]
/**
* Segments that had their ids changed. This helps then be orphaned in the correct place
* eg, whole segment is renamed and middle part deleted
* Note: Only supported for MOS, not 'normal' ingest operations
* eg, whole segment is renamed and middle part deleted.
*
* Maps fromSegmentId to toSegmentId.
*
* _Note: Only supported for MOS, not 'normal' ingest operations_
*/
renamedSegments: Map<SegmentId, SegmentId> | null

Expand Down
3 changes: 3 additions & 0 deletions packages/job-worker/src/ingest/mosDevice/diff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { IngestSegment } from '@sofie-automation/blueprints-integration'
import { SegmentOrphanedReason } from '@sofie-automation/corelib/dist/dataModel/Segment'
import { CommitIngestData } from '../lock'
import { IngestSegmentModel } from '../model/IngestSegmentModel'
import { logger } from '../../logging'

/**
* Update the Ids of Segments based on new Ingest data
Expand Down Expand Up @@ -158,9 +159,11 @@ function applyExternalIdDiff(
}

// Remove the old Segment and it's contents, the new one will be generated shortly
logger.debug(`applyExternalIdDiff: Marking Segment for removing "${oldSegmentId}"`)
ingestModel.removeSegment(oldSegmentId)
} else {
// Perform the rename
logger.debug(`applyExternalIdDiff: Marking Segment for renaming "${oldSegmentId}" -> "${newSegmentId}"`)
ingestModel.changeSegmentId(oldSegmentId, newSegmentId)
}
}
Expand Down
33 changes: 24 additions & 9 deletions packages/job-worker/src/ingest/syncChangesToPartInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { validateAdlibTestingPartInstanceProperties } from '../playout/adlibTest
import { ReadonlyDeep } from 'type-fest'
import { convertIngestModelToPlayoutRundownWithSegments } from './commit'
import { PlayoutRundownModel } from '../playout/model/PlayoutRundownModel'
import { PieceInstance } from '@sofie-automation/corelib/dist/dataModel/PieceInstance'

type PlayStatus = 'previous' | 'current' | 'next'
type SyncedInstance = {
Expand Down Expand Up @@ -143,15 +144,29 @@ export async function syncChangesToPartInstances(
if (!playoutRundownModelForPart)
throw new Error(`Internal Error: playoutRundownModelForPart is undefined (it should never be)`)

const proposedPieceInstances = getPieceInstancesForPart(
context,
playoutModel,
previousPartInstance,
playoutRundownModelForPart,
part,
await piecesThatMayBeActive,
existingPartInstance.partInstance._id
)
// TMP: wrap in try/catch for troubleshooting:
let proposedPieceInstances: PieceInstance[] = []
try {
proposedPieceInstances = getPieceInstancesForPart(
context,
playoutModel,
previousPartInstance,
playoutRundownModelForPart,
part,
await piecesThatMayBeActive,
existingPartInstance.partInstance._id
)
} catch (e) {
logger.error(
`TROUBLESHOOTING: currentPartInstance: ${JSON.stringify(playoutModel.currentPartInstance)}`
)
logger.error(`TROUBLESHOOTING: nextPartInstance: ${JSON.stringify(playoutModel.nextPartInstance)}`)
logger.error(
`TROUBLESHOOTING: previousPartInstance: ${JSON.stringify(playoutModel.previousPartInstance)}`
)

throw e
}

logger.info(`Syncing ingest changes for part: ${partId} (orphaned: ${!!newPart})`)

Expand Down
1 change: 1 addition & 0 deletions packages/job-worker/src/playout/adlibJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ export async function handleDisableNextPiece(context: JobContext, data: DisableN

return sortedPieces.find((piece) => {
return (
piece.pieceInstance.piece.enable.start !== 'now' &&
piece.pieceInstance.piece.enable.start >= nowInPart &&
((!data.undo && !piece.pieceInstance.disabled) || (data.undo && piece.pieceInstance.disabled))
)
Expand Down
8 changes: 7 additions & 1 deletion packages/job-worker/src/playout/infinites.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ export function getPieceInstancesForPart(

playingSegment = playingRundown.getSegment(playingPartInstance.partInstance.segmentId)
if (!playingSegment) {
// Double check that there are no parts using the old segment ids:
// TODO: This whole section can probably be removed later, it's really unneccessary in the grand scheme of
// things, it's here only to debug some problems
const rundownId = playingRundown.rundown._id
context.directCollections.Segments.findFetch({
rundownId: rundownId,
Expand All @@ -344,11 +347,14 @@ export function getPieceInstancesForPart(
)
})
.catch((e) => logger.error(e))
// End of temporary section

throw new Error(
`Segment "${playingPartInstance.partInstance.segmentId}" in Rundown "${
playingRundown.rundown._id
}" not found! (other segments: ${JSON.stringify(playingRundown.segments.map((s) => s.segment._id))})`
}" not found! (partInstanceId: "${
playingPartInstance.partInstance._id
}", other segments: ${JSON.stringify(playingRundown.segments.map((s) => s.segment._id))})`
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ export class PlayoutModelImpl extends PlayoutModelReadonlyImpl implements Playou
this.#baselineHelper.saveAllToDatabase(),
])

// Clean up deleted partInstances, since they have now been deleted by writePartInstancesAndPieceInstances
for (const [partInstanceId, partInstance] of this.allPartInstances) {
if (partInstance !== null) continue
this.allPartInstances.delete(partInstanceId)
}

this.#playlistHasChanged = false

// Execute deferAfterSave()'s
Expand Down
4 changes: 3 additions & 1 deletion packages/job-worker/src/playout/setNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ async function cleanupOrphanedItems(context: JobContext, playoutModel: PlayoutMo

const selectedPartInstancesSegmentIds = new Set<SegmentId>()

const previousPartInstance = playoutModel.previousPartInstance?.partInstance
const currentPartInstance = playoutModel.currentPartInstance?.partInstance
const nextPartInstance = playoutModel.nextPartInstance?.partInstance

if (previousPartInstance) selectedPartInstancesSegmentIds.add(previousPartInstance.segmentId)
if (currentPartInstance) selectedPartInstancesSegmentIds.add(currentPartInstance.segmentId)
if (nextPartInstance) selectedPartInstancesSegmentIds.add(nextPartInstance.segmentId)

Expand All @@ -291,7 +293,7 @@ async function cleanupOrphanedItems(context: JobContext, playoutModel: PlayoutMo

const alterSegmentsFromRundowns = new Map<RundownId, { deleted: SegmentId[]; hidden: SegmentId[] }>()
for (const segment of segments) {
// If the segment is orphaned and not the segment for the next or current partinstance
// If the segment is orphaned and not the segment for the previous, current or next partInstance
if (!selectedPartInstancesSegmentIds.has(segment.segment._id)) {
let rundownSegments = alterSegmentsFromRundowns.get(segment.segment.rundownId)
if (!rundownSegments) {
Expand Down

0 comments on commit a6cbf80

Please sign in to comment.