
Commit

RunOnFluxBot committed Feb 7, 2025
1 parent 1f11f8e commit 26fca5b
Showing 4 changed files with 127 additions and 137 deletions.
56 changes: 0 additions & 56 deletions services/appsService.js
@@ -11975,61 +11975,6 @@ async function masterSlaveApps() {
}
}

// function responsable for monitoring apps using sharedDB project
async function monitorSharedDBApps() {
  try {
    // do not run if installationInProgress or removalInProgress
    if (installationInProgress || removalInProgress) {
      return;
    }
    // get list of all installed apps
    const appsInstalled = await installedApps();

    // eslint-disable-next-line no-restricted-syntax
    for (const installedApp of appsInstalled.data.filter((app) => app.version > 3)) {
      const componentUsingSharedDB = installedApp.compose.find((comp) => comp.repotag.includes('runonflux/shared-db'));
      if (componentUsingSharedDB) {
        log.info(`monitorSharedDBApps: Found app ${installedApp.name} using sharedDB`);
        if (componentUsingSharedDB.ports && componentUsingSharedDB.ports.length > 0) {
          const apiPort = componentUsingSharedDB.ports[componentUsingSharedDB.ports.length - 1]; // it's the last port from the shareddb that is the api port
          // eslint-disable-next-line no-await-in-loop
          const url = `http://localhost:${apiPort}/status`;
          log.info(`monitorSharedDBApps: ${installedApp.name} going to check operator status on url ${url}`);
          // eslint-disable-next-line no-await-in-loop
          const operatorStatus = await serviceHelper.axiosGet(url).catch((error) => log.error(`monitorSharedDBApps: ${installedApp.name} operatorStatus error: ${error}`));
          if (operatorStatus.data && operatorStatus.data.status !== 'OK' && operatorStatus.data.clusterStatus.length > 1) {
            const sequence = operatorStatus.data.sequenceNumber || 0;
            log.info(`monitorSharedDBApps: ${installedApp.name} status is not OK and sequenceNumber is ${sequence}`);
            // eslint-disable-next-line no-await-in-loop
            await serviceHelper.delay(2.5 * 60 * 1000); // operator status api cache is updated every 2 minutes
            // eslint-disable-next-line no-await-in-loop
            const operatorStatusDoubleCheck = await serviceHelper.axiosGet(url).catch((error) => log.error(`monitorSharedDBApps: ${installedApp.name} operatorStatus double check error: ${error}`));
            if (operatorStatusDoubleCheck.data && operatorStatusDoubleCheck.data.status !== 'OK' && operatorStatusDoubleCheck.data.clusterStatus.length > 1) {
              const auxSequence = operatorStatusDoubleCheck.data.sequenceNumber || 0;
              if (sequence === auxSequence) {
                log.info(`monitorSharedDBApps: ${installedApp.name} operatorStatusDoubleCheck is not OK and sequence number is not syncing, going to uninstall the app`);
                // eslint-disable-next-line no-await-in-loop
                await removeAppLocally(installedApp.name, null, true, false, true);
              } else {
                log.info(`monitorSharedDBApps: ${installedApp.name} operatorStatusDoubleCheck node is syncing`);
              }
            } else {
              log.info(`monitorSharedDBApps: ${installedApp.name} operatorStatusDoubleCheck is OK or there are no peers to connect to`);
            }
          } else {
            log.info(`monitorSharedDBApps: ${installedApp.name} operatorStatus is OK`);
          }
        }
      }
    }
  } catch (error) {
    log.error(`monitorSharedDBApps: ${error}`);
  } finally {
    await serviceHelper.delay(5 * 60 * 1000);
    monitorSharedDBApps();
  }
}
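
The deleted monitor amounted to a two-pass staleness check against the shared-db operator's local /status endpoint: if the reported status was not OK, the cluster had peers, and the sequenceNumber had not advanced when polled again roughly 2.5 minutes later, the app was removed locally. A condensed sketch of that check, under the same assumptions about the /status response shape (status, clusterStatus, sequenceNumber); isSharedDBStalled is a hypothetical helper, not part of the codebase:

async function isSharedDBStalled(apiPort) {
  // Hypothetical condensation of the deleted two-pass check; assumes the operator
  // returns { status, clusterStatus, sequenceNumber } on its /status endpoint.
  const url = `http://localhost:${apiPort}/status`;
  const first = await serviceHelper.axiosGet(url).catch(() => null);
  if (!first || !first.data || first.data.status === 'OK' || first.data.clusterStatus.length <= 1) {
    return false; // healthy, unreachable, or no peers to sync from
  }
  const firstSequence = first.data.sequenceNumber || 0;
  await serviceHelper.delay(2.5 * 60 * 1000); // operator status cache refreshes roughly every 2 minutes
  const second = await serviceHelper.axiosGet(url).catch(() => null);
  if (!second || !second.data || second.data.status === 'OK' || second.data.clusterStatus.length <= 1) {
    return false;
  }
  return firstSequence === (second.data.sequenceNumber || 0); // stalled if the sequence never moved
}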

let dosState = 0; // we can start at bigger number later
let dosMessage = null;
let dosMountMessage = '';
@@ -13654,5 +13599,4 @@ module.exports = {
getAppSpecsUSDPrice,
checkApplicationsCpuUSage,
monitorNodeStatus,
monitorSharedDBApps,
};
131 changes: 125 additions & 6 deletions services/dockerService.js
@@ -1,6 +1,7 @@
const config = require('config');
const stream = require('stream');
const Docker = require('dockerode');
const ipLib = require('ip');
const path = require('path');
const serviceHelper = require('./serviceHelper');
const fluxCommunicationMessagesSender = require('./fluxCommunicationMessagesSender');
@@ -526,6 +527,83 @@ async function obtainPayloadFromStorage(url, appName) {
}
}

/**
 * Inspects a Docker network and returns the next available IP address
 * from its defined subnet.
 *
 * @param {string} appName - The name of app.
 * @returns {Promise<string>} - The next available IP address.
 */
async function getNextAvailableIPForApp(appName) {
  const networkName = `fluxDockerNetwork_${appName}`;
  const network = docker.getNetwork(networkName);
  const data = await network.inspect();

  // Use the first IPAM configuration from the network.
  const { Subnet, Gateway } = data.IPAM.Config[0];

  // Parse the subnet using the ip library.
  const subnetInfo = ipLib.cidrSubnet(Subnet);

  // Calculate usable range:
  // Typically, we skip the network address (firstAddress) and the broadcast address (lastAddress).
  const firstIpLong = ipLib.toLong(subnetInfo.firstAddress) + 1;
  const lastIpLong = ipLib.toLong(subnetInfo.lastAddress) - 1;
  const gatewayLong = ipLib.toLong(Gateway);

  // Collect allocated IP addresses in the network.
  const allocatedIPs = new Set();
  if (data.Containers) {
    Object.values(data.Containers).forEach((containerInfo) => {
      // Container IPs come as "172.23.143.2/24"; extract just the IP.
      const containerIP = containerInfo.IPv4Address.split('/')[0];
      allocatedIPs.add(containerIP);
    });
  }

  // Iterate through the IP range to find the first available IP.
  // eslint-disable-next-line no-plusplus
  for (let candidateLong = firstIpLong; candidateLong <= lastIpLong; candidateLong++) {
    // eslint-disable-next-line no-continue
    if (candidateLong === gatewayLong) continue; // Skip the gateway.
    const candidateIP = ipLib.fromLong(candidateLong);
    if (!allocatedIPs.has(candidateIP)) {
      return candidateIP;
    }
  }
  throw new Error(`No available IP addresses found in the subnet ${Subnet}.`);
}
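
A minimal usage sketch, assuming an app named myapp and the fluxDockerNetwork_<app> naming used above — the returned address is what gets pinned through EndpointsConfig/IPAMConfig further down in appDockerCreate (the image and container name here are illustrative only):

const nextIP = await getNextAvailableIPForApp('myapp');
const container = await docker.createContainer({
  Image: 'nginx:alpine', // illustrative image
  name: 'fluxexample_myapp', // illustrative container name
  NetworkingConfig: {
    EndpointsConfig: {
      'fluxDockerNetwork_myapp': {
        IPAMConfig: { IPv4Address: nextIP },
      },
    },
  },
});
await container.start();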

/**
 * Retrieves the IP address of a running Docker container.
 *
 * @param {string} containerName - The name of the container.
 * @returns {Promise<string|null>} - The container's IP address, or null if not found.
 * @throws {Error} - If the container has no network or IP address.
 */
const getContainerIP = async (containerName) => {
  try {
    const container = await docker.getContainer(containerName).inspect();
    const networks = Object.keys(container.NetworkSettings.Networks);

    if (!Array.isArray(networks) || networks.length === 0) {
      throw new Error('No networks found for container');
    }

    const networkName = networks[0]; // Automatically selects the first network
    const ipAddressOfContainer = container.NetworkSettings.Networks[networkName].IPAddress ?? null;

    if (!ipAddressOfContainer) {
      throw new Error('No IPAddress found for container');
    }

    return ipAddressOfContainer;
  } catch (error) {
    log.error(`Failed to retrieve IP for ${containerName}: ${error.message}`);
    return null;
  }
};
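
A hedged sketch of how the null contract is consumed: look up the log collector component's container (names follow the flux<component>_<app> pattern used below, so the names here are hypothetical) and fall back to local json-file logging whenever no address can be resolved — essentially the selection appDockerCreate performs next:

const collectorIP = await getContainerIP('fluxcollector_myapp'); // hypothetical container name
const logConfig = collectorIP
  ? { Type: 'syslog', Config: { 'syslog-address': `udp://${collectorIP}:514`, 'syslog-facility': 'local0', tag: 'myapp' } }
  : { Type: 'json-file', Config: { 'max-file': '1', 'max-size': '20m' } };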

/**
* Creates an app container.
*
@@ -647,6 +725,44 @@ async function appDockerCreate(appSpecifications, appName, isComponent, fullAppS
      adjustedCommands.push(command);
    }
  });

  const isSender = envParams?.some((env) => env.startsWith('LOG=SEND'));
  const isCollector = envParams?.some((env) => env.startsWith('LOG=COLLECT'));

  let syslogTarget = null;
  let syslogIP = null;

  if (fullAppSpecs && fullAppSpecs?.compose) {
    syslogTarget = fullAppSpecs.compose.find((app) => app.environmentParameters?.some((env) => env.startsWith('LOG=COLLECT')))?.name;
  }

  if (syslogTarget && !isCollector) {
    syslogIP = await getContainerIP(`flux${syslogTarget}_${appName}`);
  }

  log.info(`isSender=${isSender}, syslogTarget=${syslogTarget}, syslogCollectorIP=${syslogIP}`);

  const logConfig = isSender && syslogTarget && syslogIP
    ? {
      Type: 'syslog',
      Config: {
        'syslog-address': `udp://${syslogIP}:514`,
        'syslog-facility': 'local0',
        tag: `${appSpecifications.name}`,
      },
    }
    : {
      Type: 'json-file',
      Config: {
        'max-file': '1',
        'max-size': '20m',
      },
    };

  // *** Automatic IP Assignment for the Container ***
  const autoAssignedIP = await getNextAvailableIPForApp(appName);
  log.info(`Auto-assigned IP for ${appName}: ${autoAssignedIP}`);

const options = {
Image: appSpecifications.repotag,
name: getAppIdentifier(identifier),
@@ -676,14 +792,17 @@ async function appDockerCreate(appSpecifications, appName, isComponent, fullAppS
Name: restartPolicy,
},
NetworkMode: `fluxDockerNetwork_${appName}`,
LogConfig: {
Type: 'json-file',
Config: {
'max-file': '1',
'max-size': '20m',
LogConfig: logConfig,
ExtraHosts: [`fluxnode.service:${config.server.fluxNodeServiceAddress}`],
},
NetworkingConfig: {
EndpointsConfig: {
[`fluxDockerNetwork_${appName}`]: {
IPAMConfig: {
IPv4Address: autoAssignedIP,
},
},
},
ExtraHosts: [`fluxnode.service:${config.server.fluxNodeServiceAddress}`],
},
};
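
A small follow-up sketch for sanity-checking the assignment: after the container is created and started, dockerode's inspect output should report the pinned address on the app network (container and network names here are illustrative, matching the patterns used above):

const inspected = await docker.getContainer('fluxexample_myapp').inspect();
const reportedIP = inspected.NetworkSettings.Networks['fluxDockerNetwork_myapp'].IPAddress;
log.info(`Container reports IP ${reportedIP} on fluxDockerNetwork_myapp`);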

74 changes: 2 additions & 72 deletions services/fluxService.js
@@ -49,12 +49,6 @@ let prepLock = false;
*/
let daemonStartRequired = false;

/**
* Only disabled if a stream fails to meet minimum
* throughput criteria. I.e. 200Mbps.
*/
let streamChainDisabled = false;

/**
* For testing
*/
@@ -76,20 +70,6 @@ function lockStreamLock() {
lock = true;
}

/**
 * For testing
 */
function disableStreaming() {
  streamChainDisabled = true;
}

/**
 * For testing
 */
function enableStreaming() {
  streamChainDisabled = false;
}

/**
* To show the directory on the node machine where FluxOS files are stored.
* @param {object} req Request.
@@ -1598,12 +1578,6 @@ async function restartFluxOS(req, res) {
* @returns {Promise<void>}
*/
async function streamChainPreparation(req, res) {
if (streamChainDisabled) {
res.statusMessage = 'Failed minimium throughput criteria. Disabled.';
res.status(422).end();
return;
}

if (lock || prepLock) {
res.statusMessage = 'Streaming of chain already in progress, server busy.';
res.status(503).end();
@@ -1735,7 +1709,7 @@ async function streamChainPreparation(req, res) {
log.info('Stream chain prep timeout hit: services already restarted or stream in progress');
}
prepLock = false;
}, 30 * 1_000);
}, 30 * 1000);
}
}

@@ -1790,20 +1764,11 @@ async function streamChain(req, res) {
* @returns {Promise<void>}
*/
async function streamChain(req, res) {
if (streamChainDisabled) {
res.statusMessage = 'Failed minimium throughput criteria. Disabled.';
res.status(422).end();
return;
}

if (lock) {
res.statusMessage = 'Streaming of chain already in progress, server busy.';
res.status(503).end();
return;
}

let monitorTimer = null;

try {
lock = true;

@@ -1911,42 +1876,10 @@ async function streamChain(req, res) {
res.setHeader('Approx-Content-Length', totalSize.toString());

const workflow = [];
const passThrough = new stream.PassThrough();
let bytesTransferred = 0;

const monitorStreamWorker = () => {
// this may not trigger after exactly 35s, but close enough. We use 35 seconds,
// so that it can be guaranteed that the timeout set in streamChainPreparation has
// already triggered. Also gives us a better approximation of throughput as TCP can
// take quite a bit of time to wind up sometimes.
const timeoutMs = 35_000;
const thresholdMbps = 200;

// data transfer rates are usually Megabytes per second (not Mebibytes)
return setTimeout(() => {
const mbps = ((bytesTransferred / 1000 ** 2) / (timeoutMs / 1_000)) * 8;

if (mbps < thresholdMbps) {
log.info(`Stream chain transfer rate too slow: ${mbps.toFixed(2)} Mbps, cancelling stream and disabling further streams`);
streamChainDisabled = true;
passThrough.destroy();
} else {
log.info(`Stream chain transfer rate: ${mbps.toFixed(2)} Mbps, proceeding`);
}
}, timeoutMs);
};

passThrough.once('data', () => {
monitorTimer = monitorStreamWorker();
});

passThrough.on('data', (chunk) => {
bytesTransferred += chunk.byteLength;
});

const readStream = tar.create({ cwd: base }, folders);

workflow.push(readStream, passThrough);
workflow.push(readStream);

if (compress) {
log.info('Compression requested... adding gzip. This can be 10-20x slower than sending uncompressed');
@@ -1963,7 +1896,6 @@
} catch (error) {
log.error(error);
} finally {
clearTimeout(monitorTimer);
// start services
if (daemonStartRequired) {
daemonStartRequired = false;
@@ -2037,8 +1969,6 @@ module.exports = {
updateDaemon,
updateFlux,
// Exports for testing purposes
disableStreaming,
enableStreaming,
fluxLog,
getStreamLock,
lockStreamLock,
3 changes: 0 additions & 3 deletions services/serviceManager.js
@@ -240,9 +240,6 @@ async function startFluxFunctions() {
setTimeout(() => {
appsService.masterSlaveApps(); // stop and starts apps using syncthing g: when a new master is required or was changed.
}, 30 * 1000);
setTimeout(() => {
appsService.monitorSharedDBApps(); // Monitor SharedDB Apps.
}, 60 * 1000);
}, 3 * 60 * 1000);
setTimeout(() => {
setInterval(() => { // every 30 mins (15 blocks)
