Skip to content

Commit

Permalink
fix(STAR-1493): support for multiple productions on one machine
Browse files Browse the repository at this point in the history
Added a new field to fw_config objects, an index counter: last_used_port_index
  • Loading branch information
permobergedge committed Mar 7, 2025
1 parent 2c0101a commit 8b99241
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 115 deletions.
13 changes: 3 additions & 10 deletions src/api/ateliereLive/pipelines/streams/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { getSourceIdFromSourceName, getUuidFromIngestName } from '../../ingest';
import { connectIngestToPipeline } from '../../streams';
import { getAuthorizationHeader } from '../../utils/authheader';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from '../../utils/fwConfigPorts';
import {
Expand Down Expand Up @@ -90,23 +90,16 @@ export async function createStream(
await initDedicatedPorts();

for (const pipeline of production_settings.pipelines) {
const availablePorts = getAvailablePortsForIngest(
const availablePort = await getNextAvailablePortForIngest(
source.ingest_name,
usedPorts
);

if (availablePorts.size === 0) {
if (availablePort == -1) {
Log().error(`No available ports for ingest '${source.ingest_name}'`);
throw `No available ports for ingest '${source.ingest_name}'`;
}

const availablePort = availablePorts.values().next().value;
if (!availablePort)
throw `Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name} cannot be undefined`;
Log().info(
`Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}`
);

const pipelineSource = pipeline.sources?.find(
(s) =>
s.ingest_source_name === source.ingest_source_name &&
Expand Down
17 changes: 10 additions & 7 deletions src/api/ateliereLive/utils/fwConfigPorts.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getPipelines } from '../pipelines/pipelines';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from './fwConfigPorts';

Expand Down Expand Up @@ -35,17 +35,20 @@ describe.skip('fwConfigPorts tests', () => {

describe('getAvailableTypePorts', () => {
test('should return available ingest ports', async () => {
const ingestPorts = getAvailablePortsForIngest('cloud_ingest', usedPorts);
const ingestPort = getNextAvailablePortForIngest(
'cloud_ingest',
usedPorts
);

expect(ingestPorts).not.toBeUndefined();
expect(ingestPorts.size).toBeGreaterThan(0);
expect(ingestPort).not.toBeUndefined();
expect(ingestPort).toBeGreaterThan(-1);
});

test('should return default ingest ports when ingest doesnt exist', async () => {
const ingestPorts = getAvailablePortsForIngest('wrong_name', usedPorts);
const ingestPort = getNextAvailablePortForIngest('wrong_name', usedPorts);

expect(ingestPorts).not.toBeUndefined();
expect(ingestPorts.size).toBeGreaterThan(0);
expect(ingestPort).not.toBeUndefined();
expect(ingestPort).toBeGreaterThan(-1);
});
});
});
66 changes: 28 additions & 38 deletions src/api/ateliereLive/utils/fwConfigPorts.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
import { FwConfigTypeEnum } from '../../../interfaces/firewallConfig';
import { getFwConfigs } from '../../manager/firewallConfig';
import {
FwConfigTypeEnum,
FwConfigWithId
} from '../../../interfaces/firewallConfig';
import {
getFwConfigs,
putFwConfigLastUsedPortIndex
} from '../../manager/firewallConfig';
import { getPipeline } from '../pipelines/pipelines';

const dedicatedPorts = new Map<string, Set<number>>();
const dedicatedPorts = new Map<string, FwConfigWithId>();

export async function initDedicatedPorts() {
dedicatedPorts.clear();
(await getFwConfigs()).map((conf) => {
const temp = dedicatedPorts.get(`${conf.type}-${conf.name}`);
if (temp) {
conf.port_range_allow.forEach((port) => {
temp.add(port);
});
dedicatedPorts.set(`${conf.type}-${conf.name}`, temp);
} else {
dedicatedPorts.set(
`${conf.type}-${conf.name}`,
new Set<number>(conf.port_range_allow)
);
}
dedicatedPorts.set(`${conf.type}-${conf.name}`, conf);
});
const temp = dedicatedPorts;
return temp;
}

export async function getCurrentlyUsedPorts(
Expand Down Expand Up @@ -65,7 +58,7 @@ export async function getCurrentlyUsedPorts(
return usedPorts;
}

export function getAvailablePortsForIngest(
export async function getNextAvailablePortForIngest(
name: string,
usedPorts: Set<number>
) {
Expand All @@ -77,26 +70,23 @@ export function getAvailablePortsForIngest(
`${FwConfigTypeEnum.Ingest}-${'default'}`
)!;
}
const availablePorts = new Set<number>();
dedicatedPortsForName.forEach((dedPort) => {
if (usedPorts && !usedPorts.has(dedPort)) {
availablePorts.add(dedPort);
}
});
return availablePorts;
}

export function getAvailablePortsForNameAndType(
name: string,
type: string,
usedPorts: Set<number>
) {
const dedicatedTypePorts = dedicatedPorts.get(`${type}-${name}`)!;
const availablePorts = new Set<number>();
dedicatedTypePorts.forEach((dedPort) => {
if (usedPorts && !usedPorts.has(dedPort)) {
availablePorts.add(dedPort);
const port_range = dedicatedPortsForName.port_range_allow;
const numberOfPorts = port_range.length;
let availablePort = -1;
for (let i = 0; i < numberOfPorts; i++) {
const currentPort =
port_range[dedicatedPortsForName.last_used_port_index++ % numberOfPorts];
if (usedPorts && !usedPorts.has(currentPort)) {
availablePort = currentPort;
break;
}
});
return availablePorts;
}

if (availablePort != -1) {
dedicatedPortsForName.last_used_port_index %= numberOfPorts;
await putFwConfigLastUsedPortIndex(dedicatedPortsForName);
}

return availablePort;
}
19 changes: 18 additions & 1 deletion src/api/manager/firewallConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,24 @@ export async function putFwConfig(
{
name: fwConfig.name,
type: fwConfig.type,
port_range_allow: fwConfig.port_range_allow
port_range_allow: fwConfig.port_range_allow,
last_used_port_index: fwConfig.last_used_port_index
}
);
}

export async function putFwConfigLastUsedPortIndex(
fwConfig: FwConfigWithId
): Promise<void> {
const db = await getDatabase();

const result = await db
.collection('fw_config')
.findOneAndUpdate(
{ _id: fwConfig._id },
{ $set: { last_used_port_index: fwConfig.last_used_port_index } }
);
if (!result.value) {
console.log('Failed to update firewall rules with last used port:', result);
}
}
88 changes: 31 additions & 57 deletions src/api/manager/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import {
import { getSourcesByIds } from './sources';
import { SourceToPipelineStream } from '../../interfaces/Source';
import {
getAvailablePortsForIngest,
getCurrentlyUsedPorts,
getNextAvailablePortForIngest,
initDedicatedPorts
} from '../ateliereLive/utils/fwConfigPorts';
import { getAudioMapping } from './inventory';
Expand Down Expand Up @@ -111,21 +111,15 @@ async function connectIngestSources(
const audioMapping = newAudioMapping?.length ? newAudioMapping : [[0, 1]];

for (const pipeline of productionSettings.pipelines) {
const availablePorts = getAvailablePortsForIngest(
const nextAvailablePort = await getNextAvailablePortForIngest(
source.ingest_name,
usedPorts
);

if (availablePorts.size === 0) {
Log().error(`No available ports for ingest '${source.ingest_name}'`);
throw `No available ports for ingest '${source.ingest_name}'`;
if (nextAvailablePort == -1) {
throw `Failed to find an available port to '${source.ingest_name}'-${source.ingest_source_name}`;
}

const availablePort = availablePorts.values().next().value || 0;
Log().info(
`Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}`
);

const pipelineSource = pipeline.sources?.find(
(s) =>
s.ingest_source_name === source.ingest_source_name &&
Expand Down Expand Up @@ -161,7 +155,7 @@ async function connectIngestSources(
interfaces: [
{
...pipeline.interfaces[0],
port: availablePort
port: nextAvailablePort
}
]
};
Expand All @@ -179,7 +173,7 @@ async function connectIngestSources(
throw `Source '${source.ingest_name}/${ingestUuid}:${source.ingest_source_name}' failed to connect to '${pipeline.pipeline_name}/${pipeline.pipeline_id}': ${error.message}`;
});

usedPorts.add(availablePort);
usedPorts.add(nextAvailablePort);
sourceToPipelineStreams.push({
source_id: source._id.toString(),
stream_uuid: result.stream_uuid,
Expand Down Expand Up @@ -719,25 +713,18 @@ export async function startProduction(
} catch (error) {
Log().error('Could not setup control panels');
Log().error(error);
if (typeof error !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: false }
],
error: 'Unknown error occured'
};
let errorMessage = 'Unknown error occured';
if (typeof error === 'string') {
errorMessage = error;
}
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: false, message: error }
{ step: 'control_panels', success: false, message: errorMessage }
],
error: error
error: errorMessage
};
} // Try to connect control panels and pipeline-to-pipeline connections end

Expand All @@ -746,36 +733,30 @@ export async function startProduction(
for (const pipeline of production_settings.pipelines) {
await createPipelineOutputs(pipeline);
}
} catch (e) {
} catch (error) {
Log().error('Could not setup pipeline outputs');
Log().error(e);
Log().error(error);
Log().error('Stopping pipelines');
await stopPipelines(
production_settings.pipelines.map((pipeline) => pipeline.pipeline_id!)
).catch((error) => {
throw `Failed to stop pipelines after production start failure: ${error}`;
).catch((stropError) => {
throw `Failed to stop pipelines after production start failure: ${stropError}`;
});
if (typeof e !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false }
],
error: 'Unknown error occured'
};

let errorMessage = 'Unknown error occured';
if (typeof error === 'string') {
errorMessage = error;
}

return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false, message: e }
{ step: 'pipeline_outputs', success: false, message: errorMessage }
],
error: e
error: errorMessage
};
}

Expand Down Expand Up @@ -816,32 +797,25 @@ export async function startProduction(
Log().info(
`Production '${production.name}' with preset '${production_settings.name}' started`
);
} catch (e) {
} catch (error) {
Log().error('Could not start multiviews');
Log().error(e);
if (typeof e !== 'string') {
return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: false },
{ step: 'multiviews', success: false }
],
error: 'Could not start multiviews'
};
Log().error(error);

let errorMessage = 'Could not start multiviews';
if (typeof error === 'string') {
errorMessage = error;
}

return {
ok: false,
value: [
{ step: 'start', success: true },
{ step: 'streams', success: true },
{ step: 'control_panels', success: true },
{ step: 'pipeline_outputs', success: true },
{ step: 'multiviews', success: false, message: e }
{ step: 'multiviews', success: false, message: errorMessage }
],
error: e
error: errorMessage
};
} // Try to setup multiviews end

Expand Down
3 changes: 2 additions & 1 deletion src/api/mongoClient/defaults/fwConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export const defaultFwConfig = [
type: 'ingest',
port_range_allow: [...Array(200).keys()].map(
(increment) => 9000 + increment
)
),
last_used_port_index: 0
}
];
2 changes: 1 addition & 1 deletion src/hooks/useGetFirstEmptySlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { CallbackHook } from './types';
export function useGetFirstEmptySlot(): CallbackHook<
(productionSetup?: Production | undefined) => number
> {
const [loading, setLoading] = useState(true);
const [loading] = useState(true);

const findFirstEmptySlot = (productionSetup: Production | undefined) => {
if (!productionSetup) throw 'no_production';
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/firewallConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface FwConfig {
name: string;
type: FwConfigType;
port_range_allow: number[];
last_used_port_index: number;
}

export type FwConfigWithId = WithId<FwConfig>;

0 comments on commit 8b99241

Please sign in to comment.