Skip to content
This repository has been archived by the owner on Mar 10, 2024. It is now read-only.

Commit

Permalink
chore: remove unnessessary heartbeating readable code
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasmarshall committed Dec 12, 2023
1 parent 7dd1471 commit c8b0607
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 52 deletions.
27 changes: 2 additions & 25 deletions packages/sync-workflows/activities/sync_entity_records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export function createSyncEntityRecords(
return await writer.writeEntityRecords(
connection,
entity.name,
toHeartbeatingReadable(toMappedPropertiesReadable(stream, fieldMappingConfig)),
toMappedPropertiesReadable(stream, fieldMappingConfig),
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName)
);
Expand All @@ -81,7 +81,7 @@ export function createSyncEntityRecords(
return await writer.writeEntityRecords(
connection,
entity.name,
toHeartbeatingReadable(toMappedPropertiesReadable(stream, fieldMappingConfig)),
toMappedPropertiesReadable(stream, fieldMappingConfig),
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName)
);
Expand Down Expand Up @@ -116,29 +116,6 @@ export function createSyncEntityRecords(
};
}

function toHeartbeatingReadable(readable: Readable): Readable {
// TODO: While this ensures rescheduling of this activity if the process dies,
// it does not ensure that we stop the stream processing.
// We need to include a timeout here to clean up the pipeline when we
// exceed the heartbeat timeout.
return pipeline(
readable,
new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
Context.current().heartbeat();
try {
callback(null, chunk);
} catch (e: any) {
return callback(e);
}
},
}),
// eslint-disable-next-line @typescript-eslint/no-empty-function
() => {}
);
}

function toMappedPropertiesReadable(readable: Readable, fieldMappingConfig: FieldMappingConfig): Readable {
return pipeline(
readable,
Expand Down
31 changes: 4 additions & 27 deletions packages/sync-workflows/activities/sync_object_records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export function createSyncObjectRecords(
return await writer.writeCommonObjectRecords(
connection,
object as CRMCommonObjectType,
toHeartbeatingReadable(readable),
readable,
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName)
);
Expand All @@ -120,7 +120,7 @@ export function createSyncObjectRecords(
return await writer.writeCommonObjectRecords(
connection,
object as EngagementCommonObjectType,
toHeartbeatingReadable(readable),
readable,
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName)
);
Expand Down Expand Up @@ -150,7 +150,7 @@ export function createSyncObjectRecords(
return await writer.writeObjectRecords(
connection,
object,
toHeartbeatingReadable(toMappedPropertiesReadable(stream, fieldMappingConfig)),
toMappedPropertiesReadable(stream, fieldMappingConfig),
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName),
'standard'
Expand All @@ -174,7 +174,7 @@ export function createSyncObjectRecords(
return await writer.writeObjectRecords(
connection,
object,
toHeartbeatingReadable(toMappedPropertiesReadable(stream, fieldMappingConfig)),
toMappedPropertiesReadable(stream, fieldMappingConfig),
heartbeat,
/* diffAndDeleteRecords */ shouldDeleteRecords(!updatedAfterMs, connection.providerName),
'custom'
Expand Down Expand Up @@ -215,29 +215,6 @@ export function createSyncObjectRecords(
};
}

function toHeartbeatingReadable(readable: Readable): Readable {
// TODO: While this ensures rescheduling of this activity if the process dies,
// it does not ensure that we stop the stream processing.
// We need to include a timeout here to clean up the pipeline when we
// exceed the heartbeat timeout.
return pipeline(
readable,
new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
Context.current().heartbeat();
try {
callback(null, chunk);
} catch (e: any) {
return callback(e);
}
},
}),
// eslint-disable-next-line @typescript-eslint/no-empty-function
() => {}
);
}

function toMappedPropertiesReadable(readable: Readable, fieldMappingConfig: FieldMappingConfig): Readable {
return pipeline(
readable,
Expand Down

0 comments on commit c8b0607

Please sign in to comment.