diff --git a/src/api/ateliereLive/pipelines/streams/streams.ts b/src/api/ateliereLive/pipelines/streams/streams.ts index 879c1760..a576461f 100644 --- a/src/api/ateliereLive/pipelines/streams/streams.ts +++ b/src/api/ateliereLive/pipelines/streams/streams.ts @@ -14,8 +14,8 @@ import { getSourceIdFromSourceName, getUuidFromIngestName } from '../../ingest'; import { connectIngestToPipeline } from '../../streams'; import { getAuthorizationHeader } from '../../utils/authheader'; import { - getAvailablePortsForIngest, getCurrentlyUsedPorts, + getNextAvailablePortForIngest, initDedicatedPorts } from '../../utils/fwConfigPorts'; import { @@ -90,23 +90,16 @@ export async function createStream( await initDedicatedPorts(); for (const pipeline of production_settings.pipelines) { - const availablePorts = getAvailablePortsForIngest( + const availablePort = await getNextAvailablePortForIngest( source.ingest_name, usedPorts ); - if (availablePorts.size === 0) { + if (availablePort == -1) { Log().error(`No available ports for ingest '${source.ingest_name}'`); throw `No available ports for ingest '${source.ingest_name}'`; } - const availablePort = availablePorts.values().next().value; - if (!availablePort) - throw `Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name} cannot be undefined`; - Log().info( - `Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}` - ); - const pipelineSource = pipeline.sources?.find( (s) => s.ingest_source_name === source.ingest_source_name && diff --git a/src/api/ateliereLive/utils/fwConfigPorts.test.ts b/src/api/ateliereLive/utils/fwConfigPorts.test.ts index 71e5eb93..248af37c 100644 --- a/src/api/ateliereLive/utils/fwConfigPorts.test.ts +++ b/src/api/ateliereLive/utils/fwConfigPorts.test.ts @@ -1,7 +1,7 @@ import { getPipelines } from '../pipelines/pipelines'; import { - getAvailablePortsForIngest, getCurrentlyUsedPorts, + getNextAvailablePortForIngest, initDedicatedPorts } from './fwConfigPorts'; @@ -35,17 +35,20 @@ describe.skip('fwConfigPorts tests', () => { describe('getAvailableTypePorts', () => { test('should return available ingest ports', async () => { - const ingestPorts = getAvailablePortsForIngest('cloud_ingest', usedPorts); + const ingestPort = getNextAvailablePortForIngest( + 'cloud_ingest', + usedPorts + ); - expect(ingestPorts).not.toBeUndefined(); - expect(ingestPorts.size).toBeGreaterThan(0); + expect(ingestPort).not.toBeUndefined(); + expect(ingestPort).toBeGreaterThan(-1); }); test('should return default ingest ports when ingest doesnt exist', async () => { - const ingestPorts = getAvailablePortsForIngest('wrong_name', usedPorts); + const ingestPort = getNextAvailablePortForIngest('wrong_name', usedPorts); - expect(ingestPorts).not.toBeUndefined(); - expect(ingestPorts.size).toBeGreaterThan(0); + expect(ingestPort).not.toBeUndefined(); + expect(ingestPort).toBeGreaterThan(-1); }); }); }); diff --git a/src/api/ateliereLive/utils/fwConfigPorts.ts b/src/api/ateliereLive/utils/fwConfigPorts.ts index cfc29187..b0d172cf 100644 --- a/src/api/ateliereLive/utils/fwConfigPorts.ts +++ b/src/api/ateliereLive/utils/fwConfigPorts.ts @@ -1,27 +1,20 @@ -import { FwConfigTypeEnum } from '../../../interfaces/firewallConfig'; -import { getFwConfigs } from '../../manager/firewallConfig'; +import { + FwConfigTypeEnum, + FwConfigWithId +} from '../../../interfaces/firewallConfig'; +import { + getFwConfigs, + putFwConfigLastUsedPortIndex +} from '../../manager/firewallConfig'; import { getPipeline } from '../pipelines/pipelines'; -const dedicatedPorts = new Map>(); +const dedicatedPorts = new Map(); export async function initDedicatedPorts() { dedicatedPorts.clear(); (await getFwConfigs()).map((conf) => { - const temp = dedicatedPorts.get(`${conf.type}-${conf.name}`); - if (temp) { - conf.port_range_allow.forEach((port) => { - temp.add(port); - }); - dedicatedPorts.set(`${conf.type}-${conf.name}`, temp); - } else { - dedicatedPorts.set( - `${conf.type}-${conf.name}`, - new Set(conf.port_range_allow) - ); - } + dedicatedPorts.set(`${conf.type}-${conf.name}`, conf); }); - const temp = dedicatedPorts; - return temp; } export async function getCurrentlyUsedPorts( @@ -65,7 +58,7 @@ export async function getCurrentlyUsedPorts( return usedPorts; } -export function getAvailablePortsForIngest( +export async function getNextAvailablePortForIngest( name: string, usedPorts: Set ) { @@ -77,26 +70,23 @@ export function getAvailablePortsForIngest( `${FwConfigTypeEnum.Ingest}-${'default'}` )!; } - const availablePorts = new Set(); - dedicatedPortsForName.forEach((dedPort) => { - if (usedPorts && !usedPorts.has(dedPort)) { - availablePorts.add(dedPort); - } - }); - return availablePorts; -} -export function getAvailablePortsForNameAndType( - name: string, - type: string, - usedPorts: Set -) { - const dedicatedTypePorts = dedicatedPorts.get(`${type}-${name}`)!; - const availablePorts = new Set(); - dedicatedTypePorts.forEach((dedPort) => { - if (usedPorts && !usedPorts.has(dedPort)) { - availablePorts.add(dedPort); + const port_range = dedicatedPortsForName.port_range_allow; + const numberOfPorts = port_range.length; + let availablePort = -1; + for (let i = 0; i < numberOfPorts; i++) { + const currentPort = + port_range[dedicatedPortsForName.last_used_port_index++ % numberOfPorts]; + if (usedPorts && !usedPorts.has(currentPort)) { + availablePort = currentPort; + break; } - }); - return availablePorts; + } + + if (availablePort != -1) { + dedicatedPortsForName.last_used_port_index %= numberOfPorts; + await putFwConfigLastUsedPortIndex(dedicatedPortsForName); + } + + return availablePort; } diff --git a/src/api/manager/firewallConfig.ts b/src/api/manager/firewallConfig.ts index b14118a3..0eca3343 100644 --- a/src/api/manager/firewallConfig.ts +++ b/src/api/manager/firewallConfig.ts @@ -48,7 +48,24 @@ export async function putFwConfig( { name: fwConfig.name, type: fwConfig.type, - port_range_allow: fwConfig.port_range_allow + port_range_allow: fwConfig.port_range_allow, + last_used_port_index: fwConfig.last_used_port_index } ); } + +export async function putFwConfigLastUsedPortIndex( + fwConfig: FwConfigWithId +): Promise { + const db = await getDatabase(); + + const result = await db + .collection('fw_config') + .findOneAndUpdate( + { _id: fwConfig._id }, + { $set: { last_used_port_index: fwConfig.last_used_port_index } } + ); + if (!result.value) { + console.log('Failed to update firewall rules with last used port:', result); + } +} diff --git a/src/api/manager/workflow.ts b/src/api/manager/workflow.ts index 05decb42..e88e5df2 100644 --- a/src/api/manager/workflow.ts +++ b/src/api/manager/workflow.ts @@ -40,8 +40,8 @@ import { import { getSourcesByIds } from './sources'; import { SourceToPipelineStream } from '../../interfaces/Source'; import { - getAvailablePortsForIngest, getCurrentlyUsedPorts, + getNextAvailablePortForIngest, initDedicatedPorts } from '../ateliereLive/utils/fwConfigPorts'; import { getAudioMapping } from './inventory'; @@ -111,21 +111,15 @@ async function connectIngestSources( const audioMapping = newAudioMapping?.length ? newAudioMapping : [[0, 1]]; for (const pipeline of productionSettings.pipelines) { - const availablePorts = getAvailablePortsForIngest( + const nextAvailablePort = await getNextAvailablePortForIngest( source.ingest_name, usedPorts ); - if (availablePorts.size === 0) { - Log().error(`No available ports for ingest '${source.ingest_name}'`); - throw `No available ports for ingest '${source.ingest_name}'`; + if (nextAvailablePort == -1) { + throw `Failed to find an available port to '${source.ingest_name}'-${source.ingest_source_name}`; } - const availablePort = availablePorts.values().next().value || 0; - Log().info( - `Allocated port ${availablePort} on '${source.ingest_name}' for ${source.ingest_source_name}` - ); - const pipelineSource = pipeline.sources?.find( (s) => s.ingest_source_name === source.ingest_source_name && @@ -161,7 +155,7 @@ async function connectIngestSources( interfaces: [ { ...pipeline.interfaces[0], - port: availablePort + port: nextAvailablePort } ] }; @@ -179,7 +173,7 @@ async function connectIngestSources( throw `Source '${source.ingest_name}/${ingestUuid}:${source.ingest_source_name}' failed to connect to '${pipeline.pipeline_name}/${pipeline.pipeline_id}': ${error.message}`; }); - usedPorts.add(availablePort); + usedPorts.add(nextAvailablePort); sourceToPipelineStreams.push({ source_id: source._id.toString(), stream_uuid: result.stream_uuid, @@ -719,25 +713,18 @@ export async function startProduction( } catch (error) { Log().error('Could not setup control panels'); Log().error(error); - if (typeof error !== 'string') { - return { - ok: false, - value: [ - { step: 'start', success: true }, - { step: 'streams', success: true }, - { step: 'control_panels', success: false } - ], - error: 'Unknown error occured' - }; + let errorMessage = 'Unknown error occured'; + if (typeof error === 'string') { + errorMessage = error; } return { ok: false, value: [ { step: 'start', success: true }, { step: 'streams', success: true }, - { step: 'control_panels', success: false, message: error } + { step: 'control_panels', success: false, message: errorMessage } ], - error: error + error: errorMessage }; } // Try to connect control panels and pipeline-to-pipeline connections end @@ -746,36 +733,30 @@ export async function startProduction( for (const pipeline of production_settings.pipelines) { await createPipelineOutputs(pipeline); } - } catch (e) { + } catch (error) { Log().error('Could not setup pipeline outputs'); - Log().error(e); + Log().error(error); Log().error('Stopping pipelines'); await stopPipelines( production_settings.pipelines.map((pipeline) => pipeline.pipeline_id!) - ).catch((error) => { - throw `Failed to stop pipelines after production start failure: ${error}`; + ).catch((stropError) => { + throw `Failed to stop pipelines after production start failure: ${stropError}`; }); - if (typeof e !== 'string') { - return { - ok: false, - value: [ - { step: 'start', success: true }, - { step: 'streams', success: true }, - { step: 'control_panels', success: true }, - { step: 'pipeline_outputs', success: false } - ], - error: 'Unknown error occured' - }; + + let errorMessage = 'Unknown error occured'; + if (typeof error === 'string') { + errorMessage = error; } + return { ok: false, value: [ { step: 'start', success: true }, { step: 'streams', success: true }, { step: 'control_panels', success: true }, - { step: 'pipeline_outputs', success: false, message: e } + { step: 'pipeline_outputs', success: false, message: errorMessage } ], - error: e + error: errorMessage }; } @@ -816,22 +797,15 @@ export async function startProduction( Log().info( `Production '${production.name}' with preset '${production_settings.name}' started` ); - } catch (e) { + } catch (error) { Log().error('Could not start multiviews'); - Log().error(e); - if (typeof e !== 'string') { - return { - ok: false, - value: [ - { step: 'start', success: true }, - { step: 'streams', success: true }, - { step: 'control_panels', success: true }, - { step: 'pipeline_outputs', success: false }, - { step: 'multiviews', success: false } - ], - error: 'Could not start multiviews' - }; + Log().error(error); + + let errorMessage = 'Could not start multiviews'; + if (typeof error === 'string') { + errorMessage = error; } + return { ok: false, value: [ @@ -839,9 +813,9 @@ export async function startProduction( { step: 'streams', success: true }, { step: 'control_panels', success: true }, { step: 'pipeline_outputs', success: true }, - { step: 'multiviews', success: false, message: e } + { step: 'multiviews', success: false, message: errorMessage } ], - error: e + error: errorMessage }; } // Try to setup multiviews end diff --git a/src/api/mongoClient/defaults/fwConfig.ts b/src/api/mongoClient/defaults/fwConfig.ts index 68f4f195..32dc770d 100644 --- a/src/api/mongoClient/defaults/fwConfig.ts +++ b/src/api/mongoClient/defaults/fwConfig.ts @@ -4,6 +4,7 @@ export const defaultFwConfig = [ type: 'ingest', port_range_allow: [...Array(200).keys()].map( (increment) => 9000 + increment - ) + ), + last_used_port_index: 0 } ]; diff --git a/src/hooks/useGetFirstEmptySlot.ts b/src/hooks/useGetFirstEmptySlot.ts index 8cda1821..5805ddfa 100644 --- a/src/hooks/useGetFirstEmptySlot.ts +++ b/src/hooks/useGetFirstEmptySlot.ts @@ -5,7 +5,7 @@ import { CallbackHook } from './types'; export function useGetFirstEmptySlot(): CallbackHook< (productionSetup?: Production | undefined) => number > { - const [loading, setLoading] = useState(true); + const [loading] = useState(true); const findFirstEmptySlot = (productionSetup: Production | undefined) => { if (!productionSetup) throw 'no_production'; diff --git a/src/interfaces/firewallConfig.ts b/src/interfaces/firewallConfig.ts index 8f0193a1..823a623d 100644 --- a/src/interfaces/firewallConfig.ts +++ b/src/interfaces/firewallConfig.ts @@ -10,6 +10,7 @@ export interface FwConfig { name: string; type: FwConfigType; port_range_allow: number[]; + last_used_port_index: number; } export type FwConfigWithId = WithId;