diff --git a/CHANGES.md b/CHANGES.md index 9dfd306..c0507c2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,7 @@ 本版本无功能更新。特性如下: -- 重构源码至 ESModule +- 重构源码,拆分各个子模块任务区域 - 代码添加了 JSDoc 注释,在 VSCode 开发时有较好的类型提示 - 优化控制台进度提示文字的样式 - 禁用 package 依赖锁文件的生成 diff --git a/bin/dem2terrain.js b/bin/dem2terrain.js index 53154f4..ac579ea 100644 --- a/bin/dem2terrain.js +++ b/bin/dem2terrain.js @@ -1,25 +1,21 @@ -#!/usr/bin/env node --experimental-modules -import { resolve, isAbsolute, relative } from 'node:path' -import { Command } from 'commander'; -import { - main -} from '../index.js'; - -const program = new Command(); -const version = '1.0.4' +#!/usr/bin/env node +const program = require('commander'); +const main = require('../index'); +const path = require('path'); +const version = '1.0.4'; program.name('dem2terrain') .description('使用 GDAL 制作地形瓦片,支持 raster-dem(mapboxgl) 和 terrarium 两种编码输出格式,当前仅输出 PNG 容器格式。') .argument('', '输入 tif 格式的 DEM 文件路径,支持相对路径') .argument('', '输出目录,支持相对路径') .version(version, '-v, --version', '当前版本') - .helpOption('-h, --help', '帮助') + .helpOption('-h, --help', '帮助'); // --- 配置可选参数 program .option('-s, --size ', '指定生成瓦片的尺寸(256 或 512)| 默认 512 像素', '512') .option('-z, --zoom ', '指定瓦片的等级生成范围。例如,想生成 7 ~ 12 级的瓦片,则输入 -z 7-12 | 默认值是 -z 5-14', '5-14') - .option('-e, --encoding ', '指定瓦片的数据编码规则(mapbox 或 terrarium)| 默认 -e mapbox', 'mapbox') + .option('-e, --encoding ', '指定瓦片的数据编码规则(mapbox 或 terrarium)| 默认 -e mapbox', 'mapbox'); // --- 解析参数 program.parse(); @@ -37,7 +33,7 @@ const outputDir = args[1]; const options = program.opts(); const tileSize = Number(options['size']); -const encoding = options['encoding'] +const encoding = options['encoding']; let zoom = options['zoom']; zoom = zoom.split('-'); const minZoom = Number(zoom[0]); @@ -51,22 +47,21 @@ if (minZoom >= maxZoom) { process.exit(); } -const inputAbsolutePath = isAbsolute(inputDem) ? inputDem : resolve(process.cwd(), inputDem) -const outFileAbsolutePath = isAbsolute(outputDir) ? outputDir : resolve(process.cwd(), outputDir) +const inputAbsolutePath = path.isAbsolute(inputDem) ? inputDem : path.resolve(process.cwd(), inputDem); +const outFileAbsolutePath = path.isAbsolute(outputDir) ? outputDir : path.resolve(process.cwd(), outputDir); const logMsg = `\n>> 开始转换... - - 输入文件: ${inputAbsolutePath} - 输出路径: ${outFileAbsolutePath} - 瓦片编码: ${encoding === 'mapbox' ? 'mapbox(raster-dem)' : encoding} - 瓦片尺寸: ${tileSize} px - 瓦片等级: ${minZoom} 至 ${maxZoom} 级 -` -console.log(logMsg) +`; +console.log(logMsg); main(inputDem, outputDir, { minZoom, maxZoom, tileSize, encoding -}); +}); \ No newline at end of file diff --git a/index.js b/index.js index 0b4c9cc..d300808 100644 --- a/index.js +++ b/index.js @@ -1 +1,4 @@ -export { default as main } from './src/index.js' \ No newline at end of file +const main = require('./src/index'); + +module.exports = main; + diff --git a/package.json b/package.json index a5a31f2..83be9f4 100644 --- a/package.json +++ b/package.json @@ -7,11 +7,12 @@ "engines": { "node": ">= 14" }, - "type": "module", "bin": { "dem2terrain": "./bin/dem2terrain.js" }, - "scripts": {}, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, "repository": { "type": "git", "url": "git+https://github.com/FreeGIS/dem2terrain.git" @@ -34,4 +35,4 @@ "gdal": "^0.11.1", "single-line-log": "^1.1.2" } -} \ No newline at end of file +} diff --git a/src/createtile.js b/src/createtile.js index 00cad9f..3fecaba 100644 --- a/src/createtile.js +++ b/src/createtile.js @@ -1,77 +1,59 @@ -import { join } from 'path'; -import gdal from 'gdal'; -import { createDirs } from './util.js'; -import { getDriverByName } from './gdal-util.js'; +const gdal = require('gdal'); +const { createDirs } = require('./util'); +const { getDriverByName } = require('./gdal-util'); +const path = require('path'); -const { - GDT_Byte, - open, -} = gdal - -function write_terrain_tile(overviewInfo, readinfo, writeinfo, bandnums) { - bandnums.forEach(i => { - let readband; - if (overviewInfo.index === undefined) - readband = readinfo.ds.bands.get(i); - else // 从影像金字塔里读取band信息 - readband = readinfo.ds.bands.get(i).overviews.get(overviewInfo.index); - let writeband = writeinfo.ds.bands.get(i); - let bandBuffData = new Uint8Array(writeinfo.wxsize * writeinfo.wysize); - // 从数据集band读取对应的像素出来写入bandBuffData - readband.pixels.read(readinfo.rx, readinfo.ry, readinfo.rxsize, readinfo.rysize, bandBuffData, { - buffer_width: writeinfo.wxsize, - buffer_height: writeinfo.wysize, - data_type: GDT_Byte - }); - // 写入 - writeband.pixels.write(writeinfo.wx, writeinfo.wy, writeinfo.wxsize, writeinfo.wysize, bandBuffData); - }) +function writeTerrainTile(overviewInfo, readinfo, writeinfo, bandnums) { + bandnums.forEach(i => { + let readband; + if (overviewInfo.index === undefined) + readband = readinfo.ds.bands.get(i); + else // 从影像金字塔里读取band信息 + readband = readinfo.ds.bands.get(i).overviews.get(overviewInfo.index); + let writeband = writeinfo.ds.bands.get(i); + let bandBuffData = new Uint8Array(writeinfo.wxsize * writeinfo.wysize); + // 从数据集band读取对应的像素出来写入bandBuffData + readband.pixels.read(readinfo.rx, readinfo.ry, readinfo.rxsize, readinfo.rysize, bandBuffData, { + buffer_width: writeinfo.wxsize, + buffer_height: writeinfo.wysize, + data_type: gdal.GDT_Byte + }); + // 写入 + writeband.pixels.write(writeinfo.wx, writeinfo.wy, writeinfo.wxsize, writeinfo.wysize, bandBuffData); + }) } -let dataset = null; -let memDriver = null; - -export function createTile(createInfo, callback) { - const { - outTileSize, - overviewInfo, - rb, - wb, - dsPath, - x, - y, - z, - outputTile, - } = createInfo; - if (dataset === null) { - dataset = open(dsPath, 'r'); - } - // 创建一个 mem 驱动,将读取的像素暂存至内存 - if (memDriver === null) { - memDriver = getDriverByName('mem'); - } - const msmDS = memDriver.create("", outTileSize, outTileSize, 3); - rb.ds = dataset; - wb.ds = msmDS; - write_terrain_tile(overviewInfo, rb, wb, [1, 2, 3]); - const pngPath = join(outputTile, '/' + z + '/' + x + '/' + y + '.png'); - //递归创建文件目录 - createDirs(pngPath); - const pngDriver = getDriverByName('png'); - const pngDs = pngDriver.createCopy(pngPath, msmDS); +let dataset = null, memDriver = null; +function createTile(createInfo, callback) { + const { outTileSize, overviewInfo, rb, wb, dsPath, x, y, z, outputTile } = createInfo; + if (dataset === null) + dataset = gdal.open(dsPath, 'r'); + // 创建一个mem内存,将读取的像素写入mem + if (memDriver === null) + memDriver = getDriverByName('mem'); + const msmDS = memDriver.create("", outTileSize, outTileSize, 3); + rb.ds = dataset; + wb.ds = msmDS; + writeTerrainTile(overviewInfo, rb, wb, [1, 2, 3]); + const pngPath = path.join(outputTile, '/' + z + '/' + x + '/' + y + '.png'); + //递归创建文件目录 + createDirs(pngPath); + const pngDriver = getDriverByName('png'); + const pngDs = pngDriver.createCopy(pngPath, msmDS); - // 释放内存 - msmDS.flush(); - msmDS.close(); - pngDs.close(); - callback(null, process.pid); + // 释放内存 + msmDS.flush(); + msmDS.close(); + pngDs.close(); + callback(null, process.pid); } +function closeDataset(callback) { + if (dataset) { + dataset.close(); + dataset = null; + callback(null, process.pid); + } + callback(null, null); -export function closeDataset(callback) { - if (dataset) { - dataset.close(); - dataset = null; - callback(null, process.pid); - } - callback(null, null); } +module.exports = { createTile, closeDataset } \ No newline at end of file diff --git a/src/dem-encode.js b/src/dem-encode.js index 9ff67b9..7f86612 100644 --- a/src/dem-encode.js +++ b/src/dem-encode.js @@ -3,12 +3,12 @@ * @param {number} height 高程值 * @returns {[number, number, number]} */ -export function mapboxEncode(height) { - const value = Math.floor((height + 10000) * 10); - const r = value >> 16; - const g = value >> 8 & 0x0000FF; - const b = value & 0x0000FF; - return [r, g, b]; +function mapboxEncode(height) { + const value = Math.floor((height + 10000) * 10); + const r = value >> 16; + const g = value >> 8 & 0x0000FF; + const b = value & 0x0000FF; + return [r, g, b]; } /** @@ -16,8 +16,8 @@ export function mapboxEncode(height) { * @param {[number, number, number]} color * @returns {number} 高程值 */ -export function mapboxDecode(color) { - return -10000 + ((color[0] * 256 * 256 + color[1] * 256 + color[2]) * 0.1); +function mapboxDecode(color) { + return -10000 + ((color[0] * 256 * 256 + color[1] * 256 + color[2]) * 0.1); } @@ -26,12 +26,12 @@ export function mapboxDecode(color) { * @param {number} height 高程值 * @returns {[number, number, number]} */ -export function terrariumEncode(height) { - height += 32768; - const r = Math.floor(height / 256.0); - const g = Math.floor(height % 256); - const b = Math.floor((height - Math.floor(height)) * 256.0); - return [r, g, b]; +function terrariumEncode(height) { + height += 32768; + const r = Math.floor(height / 256.0); + const g = Math.floor(height % 256); + const b = Math.floor((height - Math.floor(height)) * 256.0); + return [r, g, b]; } /** @@ -39,6 +39,10 @@ export function terrariumEncode(height) { * @param {[number, number, number]} color * @returns {number} 高程值 */ -export function terrariumDecode(color) { - return (color[0] * 256 + color[1] + color[2] / 256.0) - 32768; +function terrariumDecode(color) { + return (color[0] * 256 + color[1] + color[2] / 256.0) - 32768; } + +module.exports = { + mapboxEncode, terrariumEncode, mapboxDecode, terrariumDecode +} \ No newline at end of file diff --git a/src/gdal-util.js b/src/gdal-util.js index 5ee4049..60629f1 100644 --- a/src/gdal-util.js +++ b/src/gdal-util.js @@ -1,35 +1,19 @@ -import gdal from 'gdal'; -const { - drivers, - open, - SpatialReference, - suggestedWarpOutput, - GRA_Average, - GRA_Bilinear, - GRA_Cubic, - GRA_CubicSpline, - GRA_Lanczos, - GRA_Mode, - GRA_NearestNeighbor, - reprojectImage: _reprojectImage -} = gdal +const gdal = require('gdal'); + /** * 根据驱动名称(支持任意大小写)获取 GDAL 驱动 * @param {string} driverName 驱动名称 * @returns {import('gdal').Driver} */ -export function getDriverByName(driverName) { - const length = drivers.count(); +function getDriverByName(driverName) { + const length = gdal.drivers.count(); let nameNormal = driverName.toUpperCase(); for (let i = 0; i < length; i++) { - const driver = drivers.get(i); - if (driver.description === nameNormal) { - return driver; - } + const driver = gdal.drivers.get(i); + if (driver.description === nameNormal) { return driver; } } - throw new Error(`当前操作系统的 GDAL 不存在输入的驱动名称:${nameNormal}`); + throw new Error(`当前gdal种不存在输入的驱动名称${nameNormal}`); } - /** * @function 栅格重投影 * @description 输入一个源数据,设置投影输出数据文件路径和投影坐标系的epsg编码,设置采样参数,输出栅格重投影文件 @@ -41,18 +25,18 @@ export function getDriverByName(driverName) { * * @author freegis */ -export function reprojectImage(src_ds, reproject_path, t_epsg, resampling = 0) { +function reprojectImage(src_ds, reproject_path, t_epsg, resampling = 0) { let s_ds; if (typeof (src_ds) === 'string') - s_ds = open(src_ds); + s_ds = gdal.open(src_ds); else s_ds = src_ds; // 获取源数据集的 坐标系 const s_srs = s_ds.srs; // 投影的目标坐标系 - const t_srs = SpatialReference.fromEPSGA(t_epsg); + const t_srs = gdal.SpatialReference.fromEPSGA(t_epsg); // 输入源数据,源坐标系,目标坐标系,智能计算出输出的栅格像元分辨率和仿射变换参数 - const { rasterSize, geoTransform } = suggestedWarpOutput({ + const { rasterSize, geoTransform } = gdal.suggestedWarpOutput({ src: s_ds, s_srs: s_srs, t_srs: t_srs @@ -71,33 +55,39 @@ export function reprojectImage(src_ds, reproject_path, t_epsg, resampling = 0) { let gdal_resampling; switch (resampling) { case 0: - gdal_resampling = GRA_Average; + gdal_resampling = gdal.GRA_Average; break; case 1: - gdal_resampling = GRA_Bilinear; + gdal_resampling = gdal.GRA_Bilinear; break; case 2: - gdal_resampling = GRA_Cubic; + gdal_resampling = gdal.GRA_Cubic; break; case 3: - gdal_resampling = GRA_CubicSpline; + gdal_resampling = gdal.GRA_CubicSpline; break; case 4: - gdal_resampling = GRA_Lanczos; + gdal_resampling = gdal.GRA_Lanczos; break; case 5: - gdal_resampling = GRA_Mode; + gdal_resampling = gdal.GRA_Mode; break; case 6: - gdal_resampling = GRA_NearestNeighbor; + gdal_resampling = gdal.GRA_NearestNeighbor; break; default: - gdal_resampling = GRA_Average; + gdal_resampling = gdal.GRA_Average; break; } - _reprojectImage({ src: s_ds, dst: t_ds, s_srs, t_srs, resampling: gdal_resampling }); + gdal.reprojectImage({ src: s_ds, dst: t_ds, s_srs, t_srs, resampling: gdal_resampling }); // 关闭退出 t_ds.close(); if (typeof (src_ds) === 'string') s_ds.close(); } + + + +module.exports = { + getDriverByName, reprojectImage +} \ No newline at end of file diff --git a/src/globalMercator.js b/src/global-mercator.js similarity index 99% rename from src/globalMercator.js rename to src/global-mercator.js index 0ac90d8..05d2675 100644 --- a/src/globalMercator.js +++ b/src/global-mercator.js @@ -296,5 +296,5 @@ class GlobalMercator { } } -export default GlobalMercator; +module.exports = GlobalMercator; diff --git a/src/index.js b/src/index.js index a6d53b1..ca3d6b4 100644 --- a/src/index.js +++ b/src/index.js @@ -1,37 +1,18 @@ -import { tmpdir } from 'os'; -import { unlinkSync } from 'fs'; -import { join } from 'path'; - -import gdal from 'gdal'; - -import { prettyTime, uuid } from './util.js'; -import { reprojectImage } from './gdal-util.js'; -import { - mapboxEncode, - terrariumEncode, -} from './dem-encode.js'; -// 创建一个进程池 -import initProcessPool, { - end, -} from './process-pool/index.js'; - -import GlobalMercator from './GlobalMercator.js'; -import ProgressBar from './progressbar/index.js'; -const worker = await import('./createtile.js'); - -const { - open, - GDT_Int16 -} = gdal; -const dataType = GDT_Int16; -const processPool = initProcessPool( - worker, - [ - 'createTile', - 'closeDataset', - ] -); -const childPids = new Set(); +const gdal = require('gdal'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); +const { prettyTime, uuid } = require('./util'); +const { reprojectImage } = require('./gdal-util'); +const { mapboxEncode, terrariumEncode } = require('./dem-encode'); +const GlobalMercator = require('./global-mercator'); +const ProgressBar = require('./progressbar/index'); + +// 创建一个线程池 +const workerFarm = require('./workfarm/index'); +const workers = workerFarm(require.resolve('./createtile'), ['createTile', 'closeDataset']); + +let childPids = new Set(); const progressBar = new ProgressBar(60, '进度'); const mercator = new GlobalMercator(); @@ -98,6 +79,7 @@ const encodeDataset = ( const bandOneHeight = sourceDataset.bands.get(1); const heightBuffer = new Int16Array(sourceWidth * sourceHeight); // 地形是GDT_Int16 读取所有像素 + const dataType = gdal.GDT_Int16; bandOneHeight.pixels.read(0, 0, sourceWidth, sourceHeight, heightBuffer, { buffer_width: sourceWidth, buffer_height: sourceHeight, @@ -106,7 +88,7 @@ const encodeDataset = ( // 创建编码转换的栅格文件 const sourceDataDriver = sourceDataset.driver; - const encodedDatasetPath = join(tmpdir(), `${uuid()}.tif`); + const encodedDatasetPath = path.join(os.tmpdir(), `${uuid()}.tif`); const encodedDataset = sourceDataDriver.create( encodedDatasetPath, sourceWidth, @@ -163,10 +145,10 @@ const project = (encodedDataset) => { let dataset; if (sourceEPSG !== 3857) { // 如果不是墨卡托投影,需要预先投影 - webMercatorDatasetPath = join(tmpdir(), `${uuid()}.tif`); + webMercatorDatasetPath = path.join(os.tmpdir(), `${uuid()}.tif`); // 地形编码,非普通影像,采用最近邻采样重投影,避免出现尖尖问题 reprojectImage(encodedDataset, webMercatorDatasetPath, 3857, 6); - dataset = open(webMercatorDatasetPath, 'r'); + dataset = gdal.open(webMercatorDatasetPath, 'r'); } else { dataset = encodedDataset; } @@ -224,7 +206,6 @@ const buildPyramid = ( return adjustZoom } - /** * * @param {string} tifFilePath TIF 文件路径 @@ -236,19 +217,12 @@ const buildPyramid = ( * encoding: 'mapbox' | 'terrarium'; * }} options 可选配置 */ -export default function main(tifFilePath, outputDir, options) { +function main(tifFilePath, outputDir, options) { // 计时开始 const startTime = globalThis.performance.now() - - // 解构可选参数 - const { - minZoom, - maxZoom, - tileSize, - encoding, - } = options; - const sourceDataset = open(tifFilePath, 'r'); - + // 结构可选参数 + const { minZoom, maxZoom, tileSize, encoding } = options; + const sourceDataset = gdal.open(tifFilePath, 'r'); //#region 步骤 1 - 高程值转 RGB,重新编码 const { encodedDataset, @@ -270,7 +244,6 @@ export default function main(tifFilePath, outputDir, options) { console.log('>> 步骤3: 构建影像金字塔索引 - 完成'); //#endregion - //#region 步骤4 - 切片 const ominx = dataset.geoTransform[0]; const omaxx = dataset.geoTransform[0] + dataset.rasterSize.x * dataset.geoTransform[1]; @@ -294,10 +267,8 @@ export default function main(tifFilePath, outputDir, options) { tmaxy }; } - // 设置进度条任务总数 progressBar.setTaskTotal(statistics.tileCount) - // 实际裙边有1像素 256+1+1 上下左右各1像素 // 裙边所需的缩放 let offset = 0 @@ -306,9 +277,10 @@ export default function main(tifFilePath, outputDir, options) { offset = 256.0 / tileSize; outTileSize = tileSize + 2; } - for (let tz = minZoom; tz <= maxZoom; tz++) { const { tminx, tminy, tmaxx, tmaxy } = statistics.levelInfo[tz]; + // 生成z级别的目录 + /** * @type {OverviewInfo} */ @@ -352,16 +324,15 @@ export default function main(tifFilePath, outputDir, options) { z: tz, outputTile: outputDir }; - processPool.process.options.createTile(createInfo, function (err, pid) { + workers.createTile(createInfo, function (err, pid) { if (err) { console.log(err); } childPids.add(pid); statistics.completeCount++; - + // 更新进度条 progressBar.render(statistics.completeCount); - if (statistics.completeCount === statistics.tileCount) { const endTime = globalThis.performance.now() const { @@ -378,19 +349,20 @@ export default function main(tifFilePath, outputDir, options) { childPids.delete(closePid); if (childPids.size === 0) { // 关闭子进程任务 - end(processPool); + workerFarm.end(workers); // 删除临时文件 - unlinkSync(encodedDatasetPath); + fs.unlinkSync(encodedDatasetPath); if (webMercatorDatasetPath !== undefined) - unlinkSync(webMercatorDatasetPath); + fs.unlinkSync(webMercatorDatasetPath); + resetStats(); } }, args: [], retries: 0 } // 循环调用,关闭子进程资源 - for (let childId in processPool.process.children) { - processPool.process.send(childId, call); + for (let childId in workers.farm.children) { + workers.farm.send(childId, call); } } }) @@ -398,8 +370,6 @@ export default function main(tifFilePath, outputDir, options) { } } //#endregion - - resetStats(); } const resetStats = () => { @@ -477,3 +447,5 @@ function geoQuery(overviewInfo, ulx, uly, lrx, lry, querysize = 0) { wb: { wx, wy, wxsize, wysize } } } + +module.exports = main; \ No newline at end of file diff --git a/src/process-pool/child/index.js b/src/process-pool/child/index.js deleted file mode 100644 index 9409303..0000000 --- a/src/process-pool/child/index.js +++ /dev/null @@ -1,65 +0,0 @@ -let $module - -/* - let contextProto = this.context; - while (contextProto = Object.getPrototypeOf(contextProto)) { - completionGroups.push(Object.getOwnPropertyNames(contextProto)); - } -*/ -function handle(data) { - let idx = data.idx; - let child = data.child - let method = data.method - let args = data.args - - let callback = function () { - let _args = Array.prototype.slice.call(arguments) - if (_args[0] instanceof Error) { - let e = _args[0] - _args[0] = { - '$error': '$error', - 'type': e.constructor.name, - 'message': e.message, - 'stack': e.stack - } - Object.keys(e).forEach(function (key) { - _args[0][key] = e[key] - }) - } - - process.send({ - owner: 'farm', - idx: idx, - child: child, - args: _args - }) - } - - let exec - - if (method == null && typeof $module == 'function') { - exec = $module - } else if (typeof $module[method] == 'function') { - exec = $module[method] - } - - if (!exec) { - return console.error('NO SUCH METHOD:', method) - } - - exec.apply(null, args.concat([callback])) -} - -process.on('message', function (data) { - if (data.owner !== 'farm') { - return; - } - - if (!$module) { - return $module = require(data.module) - } - if (data.event == 'die') { - return process.exit(0) - } - handle(data) -}) diff --git a/src/process-pool/fork.js b/src/process-pool/fork.js deleted file mode 100644 index a54ee8d..0000000 --- a/src/process-pool/fork.js +++ /dev/null @@ -1,29 +0,0 @@ -import { fork as sysFork } from 'node:child_process' - -export default function fork(forkModule, workerOptions) { - // suppress --debug / --inspect flags while preserving others (like --harmony) - const filteredArgs = process.execArgv.filter((v) => { - return !(/^--(debug|inspect)/).test(v) - }) - const options = Object.assign({ - execArgv: filteredArgs, - env: process.env, - cwd: process.cwd() - }, workerOptions) - const child = sysFork('./child.js', process.argv, options) - - child.on('error', () => { - // this *should* be picked up by onExit and the operation requeued - }) - - child.send({ - owner: 'farm', - module: forkModule - }) - - // return a send() function for this child - return { - send: child.send.bind(child), - child: child - } -} diff --git a/src/process-pool/index.js b/src/process-pool/index.js deleted file mode 100644 index 1eed20f..0000000 --- a/src/process-pool/index.js +++ /dev/null @@ -1,47 +0,0 @@ -'use strict' - -import TaskProcess from './pool.js' - -const _processPools = [] // keep record of farms so we can end() them if required - -/** - * 创建进程池 - * @param {*} options - * @param {*} path - * @param {*} methods - * @returns - */ -export default function initProcessPool(options, path, methods) { - if (typeof options === 'string') { - methods = path - path = options - options = {} - } - - const taskProcess = new TaskProcess(options, path); - const api = taskProcess.setup(methods); - - _processPools.push({ - process: taskProcess, - api: api - }); - api.process = taskProcess; - // return the public API - return api; -} - -/** - * - * @param {*} api - * @param {Function} callback - * @returns - */ -export function end(api, callback) { - for (let i = 0; i < _processPools.length; i++) { - if (_processPools[i] && _processPools[i].api === api) { - return _processPools[i].process.end(callback) - } - } - - process.nextTick(callback.bind(null, new Error('Worker process not found!'))) -} diff --git a/src/process-pool/pool.js b/src/process-pool/pool.js deleted file mode 100644 index ef8b2ac..0000000 --- a/src/process-pool/pool.js +++ /dev/null @@ -1,313 +0,0 @@ -'use strict' -import os from 'node:os' -import errno from 'errno' -import fork from './fork.js' - -export const TimeoutError = errno.create('TimeoutError') -const ProcessTerminatedError = errno.create('ProcessTerminatedError') -const MaxConcurrentCallsError = errno.create('MaxConcurrentCallsError') - -const DEFAULT_OPTIONS = { - workerOptions: {}, - maxCallsPerWorker: Infinity, - maxConcurrentWorkers: (os.cpus() || { length: 1 }).length, - maxConcurrentCallsPerWorker: 10, - maxConcurrentCalls: Infinity, - maxCallTime: Infinity, // exceed this and the whole worker is terminated - maxRetries: Infinity, - forcedKillTime: 100, - autoStart: false, - onChild: function () { } -} - -export default class TaskProcess { - constructor(options, path) { - this.options = Object.assign({}, DEFAULT_OPTIONS, options) - this.path = path - this.activeCalls = 0 - } - // make a handle to pass back in the form of an external API - mkhandle(method) { - return function () { - let args = Array.prototype.slice.call(arguments) - if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) { - let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')') - if (typeof args[args.length - 1] == 'function') - return process.nextTick(args[args.length - 1].bind(null, err)) - throw err - } - this.addCall({ - method: method, - callback: args.pop(), - args: args, - retries: 0 - }) - }.bind(this) - } - // a constructor of sorts - setup(methods) { - let iface - if (!methods) { // single-function export - iface = this.mkhandle() - } else { // multiple functions on the export - iface = {} - methods.forEach(function (m) { - iface[m] = this.mkhandle(m) - }.bind(this)) - } - - this.searchStart = -1 - this.childId = -1 - this.children = {} - this.activeChildren = 0 - this.callQueue = [] - - if (this.options.autoStart) { - while (this.activeChildren < this.options.maxConcurrentWorkers) - this.startChild() - } - - return iface - } - // when a child exits, check if there are any outstanding jobs and requeue them - onExit(childId) { - // delay this to give any sends a chance to finish - setTimeout(function () { - let doQueue = false - if (this.children[childId] && this.children[childId].activeCalls) { - this.children[childId].calls.forEach(function (call, i) { - if (!call) - return - else if (call.retries >= this.options.maxRetries) { - this.receive({ - idx: i, - child: childId, - args: [new ProcessTerminatedError('cancel after ' + call.retries + ' retries!')] - }) - } else { - call.retries++ - this.callQueue.unshift(call) - doQueue = true - } - }.bind(this)) - } - this.stopChild(childId) - doQueue && this.processQueue() - }.bind(this), 10) - } - // start a new worker - startChild() { - this.childId++ - - let forked = fork(this.path, this.options.workerOptions), id = this.childId, c = { - send: forked.send, - child: forked.child, - calls: [], - activeCalls: 0, - exitCode: null - } - - this.options.onChild(forked.child) - - forked.child.on('message', function (data) { - if (data.owner !== 'farm') { - return - } - this.receive(data) - }.bind(this)) - forked.child.once('exit', function (code) { - c.exitCode = code - this.onExit(id) - }.bind(this)) - - this.activeChildren++ - this.children[id] = c - } - // stop a worker, identified by id - stopChild(childId) { - let child = this.children[childId] - if (child) { - child.send({ owner: 'farm', event: 'die' }) - setTimeout(function () { - if (child.exitCode === null) - child.child.kill('SIGKILL') - }, this.options.forcedKillTime).unref(); delete this.children[childId] - this.activeChildren-- - } - } - // called from a child process, the data contains information needed to - // look up the child and the original call so we can invoke the callback - receive(data) { - let idx = data.idx, childId = data.child, args = data.args, child = this.children[childId], call - - if (!child) { - return - } - - call = child.calls[idx] - if (!call) { - - return - } - - if (this.options.maxCallTime !== Infinity) - clearTimeout(call.timer) - - if (args[0] && args[0].$error == '$error') { - let e = args[0] - switch (e.type) { - case 'TypeError': args[0] = new TypeError(e.message); break - case 'RangeError': args[0] = new RangeError(e.message); break - case 'EvalError': args[0] = new EvalError(e.message); break - case 'ReferenceError': args[0] = new ReferenceError(e.message); break - case 'SyntaxError': args[0] = new SyntaxError(e.message); break - case 'URIError': args[0] = new URIError(e.message); break - default: args[0] = new Error(e.message) - } - args[0].type = e.type - args[0].stack = e.stack - - // Copy any custom properties to pass it on. - Object.keys(e).forEach(function (key) { - args[0][key] = e[key] - }) - } - - process.nextTick(function () { - call.callback.apply(null, args) - }); delete child.calls[idx] - child.activeCalls-- - this.activeCalls-- - - if (child.calls.length >= this.options.maxCallsPerWorker - && !Object.keys(child.calls).length) { - // this child has finished its run, kill it - this.stopChild(childId) - } - - // allow any outstanding calls to be processed - this.processQueue() - } - childTimeout(childId) { - let child = this.children[childId], i - - if (!child) - return - - for (i in child.calls) { - this.receive({ - idx: i, - child: childId, - args: [new TimeoutError('worker call timed out!')] - }) - } - this.stopChild(childId) - } - // send a call to a worker, identified by id - send(childId, call) { - let child = this.children[childId], idx = child.calls.length - - child.calls.push(call) - child.activeCalls++ - this.activeCalls++ - - child.send({ - owner: 'farm', - idx: idx, - child: childId, - method: call.method, - args: call.args - }) - - if (this.options.maxCallTime !== Infinity) { - call.timer = - setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) - } - } - // a list of active worker ids, in order, but the starting offset is - // shifted each time this method is called, so we work our way through - // all workers when handing out jobs - childKeys() { - let cka = Object.keys(this.children), cks - - if (this.searchStart >= cka.length - 1) - this.searchStart = 0 - - else - this.searchStart++ - - cks = cka.splice(0, this.searchStart) - - return cka.concat(cks) - } - // Calls are added to a queue, this processes the queue and is called - // whenever there might be a chance to send more calls to the workers. - // The various options all impact on when we're able to send calls, - // they may need to be kept in a queue until a worker is ready. - processQueue() { - let cka, i = 0, childId - - if (!this.callQueue.length) - return this.ending && this.end() - - if (this.activeChildren < this.options.maxConcurrentWorkers) - this.startChild() - - for (cka = this.childKeys(); i < cka.length; i++) { - childId = +cka[i] - if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker - && this.children[childId].calls.length < this.options.maxCallsPerWorker) { - - this.send(childId, this.callQueue.shift()) - if (!this.callQueue.length) - return this.ending && this.end() - } /*else { - console.log( - , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker - , this.children[childId].calls.length < this.options.maxCallsPerWorker - , this.children[childId].calls.length , this.options.maxCallsPerWorker) - }*/ - - - - - - } - - if (this.ending) - this.end() - } - // add a new call to the call queue, then trigger a process of the queue - addCall(call) { - if (this.ending) - return this.end() // don't add anything new to the queue - this.callQueue.push(call) - this.processQueue() - } - // kills child workers when they're all done - end(callback) { - let complete = true - if (this.ending === false) - return - if (callback) - this.ending = callback - else if (this.ending == null) - this.ending = true - Object.keys(this.children).forEach(function (child) { - if (!this.children[child]) - return - if (!this.children[child].activeCalls) - this.stopChild(child) - - else - complete = false - }.bind(this)) - - if (complete && typeof this.ending == 'function') { - process.nextTick(function () { - this.ending() - this.ending = false - }.bind(this)) - } - } -} diff --git a/src/progressbar/format.js b/src/progressbar/format.js index e2d56ad..7dac25e 100644 --- a/src/progressbar/format.js +++ b/src/progressbar/format.js @@ -4,27 +4,28 @@ * @param {Array} params * @returns */ -export function format(str, params = []) { - const pattern = /{([\s\S])*?}/gim; - let index = 0; - let params_index = 0; - return str.replace(pattern, (match, tuple, offset) => { - index = offset + match.length; - params_index += 1; +function format(str, params = []) { + const pattern = /{([\s\S])*?}/gim; + let index = 0; + let params_index = 0; + return str.replace(pattern, (match, tuple, offset) => { + index = offset + match.length; + params_index += 1; - // 异常格式处理,对于列表和对象类型的param,对外抛出异常 - if ( - Array.isArray(params[params_index - 1]) || - typeof params[params_index - 1] === "object" - ) { - throw TypeError(params[params_index - 1] + "不能为对象类型"); - } + // 异常格式处理,对于列表和对象类型的param,对外抛出异常 + if ( + Array.isArray(params[params_index - 1]) || + typeof params[params_index - 1] === "object" + ) { + throw TypeError(params[params_index - 1] + "不能为对象类型"); + } - if (match.length > 2) { - match = match.slice(1, match.length - 1); - return eval('params[params_index-1].' + match);; - } else { - return params[params_index - 1]; - } - }); + if (match.length > 2) { + match = match.slice(1, match.length - 1); + return eval('params[params_index-1].' + match);; + } else { + return params[params_index - 1]; + } + }); } +module.exports = format; \ No newline at end of file diff --git a/src/progressbar/index.js b/src/progressbar/index.js index 5778b98..8cfedab 100644 --- a/src/progressbar/index.js +++ b/src/progressbar/index.js @@ -1,81 +1,81 @@ /** * 进度条实现。 */ -import { stdout as log } from 'single-line-log'; -import cliColor from 'cli-color'; -import { format } from './format.js'; - -const { blue, green, yellow, red } = cliColor - +const log = require('single-line-log').stdout; +const format = require('./format'); +const cliColor = require('cli-color'); +/** + * 进度条实现。 + */ +const { blue, green, yellow, red } = cliColor; /** * 进度条 */ class ProgressBar { - constructor( - barLength = 28, - description = 'PROGRESS' - ) { - this.length = barLength; - this.taskTotal = 0; - this.descriptionStyle = blue.bold(description); - - //this.completed = 0; - //this.tickStep = tickStep; - } - - /** - * 设置一共有多少个任务 - * @param {number} value - */ - setTaskTotal(value) { - this.taskTotal = value - } + constructor( + barLength = 28, + description = 'PROGRESS' + ) { + this.length = barLength; + this.taskTotal = 0; + this.descriptionStyle = blue.bold(description); - /** - * 在控制台中绘制当前进度条 - * @param {number} completed 完成了多少个任务 - */ - render(completed) { - //this.completed++; - //const completed = this.completed * this.tickStep; - const finishedRate = Number((completed / this.taskTotal).toFixed(4)); - const finishedCellCount = Math.floor(finishedRate * this.length); - let i = 0 - // 拼接黑色条 - let cell = ''; - for (i = 0; i < finishedCellCount; ++i) { - cell += '█'; - } - // 拼接灰色条 - let empty = ''; - for (i = 0; i < this.length - finishedCellCount; ++i) { - empty += '░'; + //this.completed = 0; + //this.tickStep = tickStep; } - const percentStr = (100 * finishedRate).toFixed(2); + /** + * 设置一共有多少个任务 + * @param {number} value + */ + setTaskTotal(value) { + this.taskTotal = value + } /** - * 使用cli-color进行包装美化。 + * 在控制台中绘制当前进度条 + * @param {number} completed 完成了多少个任务 */ - const cellStyle = green.bgBlack.bold(cell); - const completedStyle = green.bold(completed); - const statusStyle = Number(finishedRate) === 1 ? green.bold('完成') : yellow.bold('转换中⏳') + render(completed) { + //this.completed++; + //const completed = this.completed * this.tickStep; + const finishedRate = Number((completed / this.taskTotal).toFixed(4)); + const finishedCellCount = Math.floor(finishedRate * this.length); + let i = 0 + // 拼接黑色条 + let cell = ''; + for (i = 0; i < finishedCellCount; ++i) { + cell += '█'; + } + // 拼接灰色条 + let empty = ''; + for (i = 0; i < this.length - finishedCellCount; ++i) { + empty += '░'; + } - // 拼接最终文本 - const cmdtext = format( - ">> 步骤4: {} - {}% {}{} {}/{}", - [ - statusStyle, - percentStr, - cellStyle, - empty, - completedStyle, - String(this.taskTotal), - ] - ); + const percentStr = (100 * finishedRate).toFixed(2); - log(cmdtext); - } -} + /** + * 使用cli-color进行包装美化。 + */ + const cellStyle = green.bgBlack.bold(cell); + const completedStyle = green.bold(completed); + const statusStyle = Number(finishedRate) === 1 ? green.bold('完成') : yellow.bold('转换中⏳') -export default ProgressBar; \ No newline at end of file + // 拼接最终文本 + const cmdtext = format( + ">> 步骤4: {} - {}% {}{} {}/{}", + [ + statusStyle, + percentStr, + cellStyle, + empty, + completedStyle, + String(this.taskTotal), + ] + ); + + log(cmdtext); + } +} +module.exports = ProgressBar; \ No newline at end of file diff --git a/src/util.js b/src/util.js index a55e551..c5afa6e 100644 --- a/src/util.js +++ b/src/util.js @@ -1,5 +1,5 @@ -import { existsSync, mkdirSync } from 'node:fs'; -import { dirname } from 'node:path'; +const fs = require('fs'); +const path = require('path'); const s4 = () => { return (((1 + Math.random()) * 0x10000) | 0).toString(16).substring(1); @@ -9,7 +9,7 @@ const s4 = () => { * 获取随机 uuid * @returns {string} */ -export const uuid = () => { +const uuid = () => { return (s4() + s4() + '-' + s4() + '-' + s4() + '-' + s4() + '-' + s4() + s4() + s4()); } @@ -18,17 +18,17 @@ export const uuid = () => { * @param {string} file * @returns {boolean} */ -export const createDirs = (file) => { +const createDirs = (file) => { // 获取文件根目录 - const dirpath = dirname(file); + const dirpath = path.dirname(file); // 有路径直接回调走 - if (existsSync(dirpath)) { + if (fs.existsSync(dirpath)) { return true; } else { if (createDirs(dirpath)) { try { // 并发时有问题,查询时无,创建时别的子进程已经创建 - mkdirSync(dirpath); + fs.mkdirSync(dirpath); } catch { } @@ -45,7 +45,7 @@ export const createDirs = (file) => { * unit: 'ms' | 'sec' | 'min' | 'hour'; * }} */ -export const prettyTime = (timeInMs) => { +const prettyTime = (timeInMs) => { let result = 0 let unit = 'ms' if (timeInMs < 1000) { @@ -65,3 +65,8 @@ export const prettyTime = (timeInMs) => { unit } } + + +module.exports = { + uuid, createDirs, prettyTime +} \ No newline at end of file diff --git a/src/workfarm/child/index.js b/src/workfarm/child/index.js new file mode 100644 index 0000000..78f6337 --- /dev/null +++ b/src/workfarm/child/index.js @@ -0,0 +1,56 @@ +'use strict' + +let $module + +/* + let contextProto = this.context; + while (contextProto = Object.getPrototypeOf(contextProto)) { + completionGroups.push(Object.getOwnPropertyNames(contextProto)); + } +*/ + + +function handle (data) { + let idx = data.idx + , child = data.child + , method = data.method + , args = data.args + , callback = function () { + let _args = Array.prototype.slice.call(arguments) + if (_args[0] instanceof Error) { + let e = _args[0] + _args[0] = { + '$error' : '$error' + , 'type' : e.constructor.name + , 'message' : e.message + , 'stack' : e.stack + } + Object.keys(e).forEach(function(key) { + _args[0][key] = e[key] + }) + } + process.send({ owner: 'farm', idx: idx, child: child, args: _args }) + } + , exec + + if (method == null && typeof $module == 'function') + exec = $module + else if (typeof $module[method] == 'function') + exec = $module[method] + + if (!exec) + return console.error('NO SUCH METHOD:', method) + + exec.apply(null, args.concat([ callback ])) +} + + +process.on('message', function (data) { + if (data.owner !== 'farm') { + return; + } + + if (!$module) return $module = require(data.module) + if (data.event == 'die') return process.exit(0) + handle(data) +}) diff --git a/src/workfarm/farm.js b/src/workfarm/farm.js new file mode 100644 index 0000000..2fe3d84 --- /dev/null +++ b/src/workfarm/farm.js @@ -0,0 +1,342 @@ +'use strict' + +const DEFAULT_OPTIONS = { + workerOptions : {} + , maxCallsPerWorker : Infinity + , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length + , maxConcurrentCallsPerWorker : 10 + , maxConcurrentCalls : Infinity + , maxCallTime : Infinity // exceed this and the whole worker is terminated + , maxRetries : Infinity + , forcedKillTime : 100 + , autoStart : false + , onChild : function() {} + } + +const fork = require('./fork') + , TimeoutError = require('errno').create('TimeoutError') + , ProcessTerminatedError = require('errno').create('ProcessTerminatedError') + , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') + + +function Farm (options, path) { + this.options = Object.assign({}, DEFAULT_OPTIONS, options) + this.path = path + this.activeCalls = 0 +} + + +// make a handle to pass back in the form of an external API +Farm.prototype.mkhandle = function (method) { + return function () { + let args = Array.prototype.slice.call(arguments) + if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) { + let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')') + if (typeof args[args.length - 1] == 'function') + return process.nextTick(args[args.length - 1].bind(null, err)) + throw err + } + this.addCall({ + method : method + , callback : args.pop() + , args : args + , retries : 0 + }) + }.bind(this) +} + + +// a constructor of sorts +Farm.prototype.setup = function (methods) { + let iface + if (!methods) { // single-function export + iface = this.mkhandle() + } else { // multiple functions on the export + iface = {} + methods.forEach(function (m) { + iface[m] = this.mkhandle(m) + }.bind(this)) + } + + this.searchStart = -1 + this.childId = -1 + this.children = {} + this.activeChildren = 0 + this.callQueue = [] + + if (this.options.autoStart) { + while (this.activeChildren < this.options.maxConcurrentWorkers) + this.startChild() + } + + return iface +} + + +// when a child exits, check if there are any outstanding jobs and requeue them +Farm.prototype.onExit = function (childId) { + // delay this to give any sends a chance to finish + setTimeout(function () { + let doQueue = false + if (this.children[childId] && this.children[childId].activeCalls) { + this.children[childId].calls.forEach(function (call, i) { + if (!call) return + else if (call.retries >= this.options.maxRetries) { + this.receive({ + idx : i + , child : childId + , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] + }) + } else { + call.retries++ + this.callQueue.unshift(call) + doQueue = true + } + }.bind(this)) + } + this.stopChild(childId) + doQueue && this.processQueue() + }.bind(this), 10) +} + + +// start a new worker +Farm.prototype.startChild = function () { + this.childId++ + + let forked = fork(this.path, this.options.workerOptions) + , id = this.childId + , c = { + send : forked.send + , child : forked.child + , calls : [] + , activeCalls : 0 + , exitCode : null + } + + this.options.onChild(forked.child); + + forked.child.on('message', function(data) { + if (data.owner !== 'farm') { + return; + } + this.receive(data); + }.bind(this)) + forked.child.once('exit', function (code) { + c.exitCode = code + this.onExit(id) + }.bind(this)) + + this.activeChildren++ + this.children[id] = c +} + + +// stop a worker, identified by id +Farm.prototype.stopChild = function (childId) { + let child = this.children[childId] + if (child) { + child.send({owner: 'farm', event: 'die'}) + setTimeout(function () { + if (child.exitCode === null) + child.child.kill('SIGKILL') + }, this.options.forcedKillTime).unref() + ;delete this.children[childId] + this.activeChildren-- + } +} + + +// called from a child process, the data contains information needed to +// look up the child and the original call so we can invoke the callback +Farm.prototype.receive = function (data) { + let idx = data.idx + , childId = data.child + , args = data.args + , child = this.children[childId] + , call + + if (!child) { + return; + } + + call = child.calls[idx] + if (!call) { + + return; + } + + if (this.options.maxCallTime !== Infinity) + clearTimeout(call.timer) + + if (args[0] && args[0].$error == '$error') { + let e = args[0] + switch (e.type) { + case 'TypeError': args[0] = new TypeError(e.message); break + case 'RangeError': args[0] = new RangeError(e.message); break + case 'EvalError': args[0] = new EvalError(e.message); break + case 'ReferenceError': args[0] = new ReferenceError(e.message); break + case 'SyntaxError': args[0] = new SyntaxError(e.message); break + case 'URIError': args[0] = new URIError(e.message); break + default: args[0] = new Error(e.message) + } + args[0].type = e.type + args[0].stack = e.stack + + // Copy any custom properties to pass it on. + Object.keys(e).forEach(function(key) { + args[0][key] = e[key]; + }); + } + + process.nextTick(function () { + call.callback.apply(null, args) + }) + + ;delete child.calls[idx] + child.activeCalls-- + this.activeCalls-- + + if (child.calls.length >= this.options.maxCallsPerWorker + && !Object.keys(child.calls).length) { + // this child has finished its run, kill it + this.stopChild(childId) + } + + // allow any outstanding calls to be processed + this.processQueue() +} + + +Farm.prototype.childTimeout = function (childId) { + let child = this.children[childId] + , i + + if (!child) + return + + for (i in child.calls) { + this.receive({ + idx : i + , child : childId + , args : [ new TimeoutError('worker call timed out!') ] + }) + } + this.stopChild(childId) +} + + +// send a call to a worker, identified by id +Farm.prototype.send = function (childId, call) { + let child = this.children[childId] + , idx = child.calls.length + + child.calls.push(call) + child.activeCalls++ + this.activeCalls++ + + child.send({ + owner : 'farm' + , idx : idx + , child : childId + , method : call.method + , args : call.args + }) + + if (this.options.maxCallTime !== Infinity) { + call.timer = + setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) + } +} + + +// a list of active worker ids, in order, but the starting offset is +// shifted each time this method is called, so we work our way through +// all workers when handing out jobs +Farm.prototype.childKeys = function () { + let cka = Object.keys(this.children) + , cks + + if (this.searchStart >= cka.length - 1) + this.searchStart = 0 + else + this.searchStart++ + + cks = cka.splice(0, this.searchStart) + + return cka.concat(cks) +} + + +// Calls are added to a queue, this processes the queue and is called +// whenever there might be a chance to send more calls to the workers. +// The various options all impact on when we're able to send calls, +// they may need to be kept in a queue until a worker is ready. +Farm.prototype.processQueue = function () { + let cka, i = 0, childId + + if (!this.callQueue.length) + return this.ending && this.end() + + if (this.activeChildren < this.options.maxConcurrentWorkers) + this.startChild() + + for (cka = this.childKeys(); i < cka.length; i++) { + childId = +cka[i] + if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker + && this.children[childId].calls.length < this.options.maxCallsPerWorker) { + + this.send(childId, this.callQueue.shift()) + if (!this.callQueue.length) + return this.ending && this.end() + } /*else { + console.log( + , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker + , this.children[childId].calls.length < this.options.maxCallsPerWorker + , this.children[childId].calls.length , this.options.maxCallsPerWorker) + }*/ + } + + if (this.ending) + this.end() +} + + +// add a new call to the call queue, then trigger a process of the queue +Farm.prototype.addCall = function (call) { + if (this.ending) + return this.end() // don't add anything new to the queue + this.callQueue.push(call) + this.processQueue() +} + + +// kills child workers when they're all done +Farm.prototype.end = function (callback) { + let complete = true + if (this.ending === false) + return + if (callback) + this.ending = callback + else if (this.ending == null) + this.ending = true + Object.keys(this.children).forEach(function (child) { + if (!this.children[child]) + return + if (!this.children[child].activeCalls) + this.stopChild(child) + else + complete = false + }.bind(this)) + + if (complete && typeof this.ending == 'function') { + process.nextTick(function () { + this.ending() + this.ending = false + }.bind(this)) + } +} + + +module.exports = Farm +module.exports.TimeoutError = TimeoutError diff --git a/src/workfarm/fork.js b/src/workfarm/fork.js new file mode 100644 index 0000000..5a035d9 --- /dev/null +++ b/src/workfarm/fork.js @@ -0,0 +1,33 @@ +'use strict' + +const childProcess = require('child_process') + , childModule = require.resolve('./child/index') + + +function fork (forkModule, workerOptions) { + // suppress --debug / --inspect flags while preserving others (like --harmony) + let filteredArgs = process.execArgv.filter(function (v) { + return !(/^--(debug|inspect)/).test(v) + }) + , options = Object.assign({ + execArgv : filteredArgs + , env : process.env + , cwd : process.cwd() + }, workerOptions) + , child = childProcess.fork(childModule, process.argv, options) + + child.on('error', function() { + // this *should* be picked up by onExit and the operation requeued + }) + + child.send({ owner: 'farm', module: forkModule }) + + // return a send() function for this child + return { + send : child.send.bind(child) + , child : child + } +} + + +module.exports = fork diff --git a/src/workfarm/index.js b/src/workfarm/index.js new file mode 100644 index 0000000..a6387f4 --- /dev/null +++ b/src/workfarm/index.js @@ -0,0 +1,34 @@ +'use strict' + +const Farm = require('./farm') + +let farms = [] // keep record of farms so we can end() them if required + + +function farm (options, path, methods) { + if (typeof options == 'string') { + methods = path + path = options + options = {} + } + + let f = new Farm(options, path) + , api = f.setup(methods) + + farms.push({ farm: f, api: api }); + api.farm = f; + // return the public API + return api; +} + + +function end (api, callback) { + for (let i = 0; i < farms.length; i++) + if (farms[i] && farms[i].api === api) + return farms[i].farm.end(callback) + process.nextTick(callback.bind(null, new Error('Worker farm not found!'))) +} + + +module.exports = farm +module.exports.end = end diff --git a/third-party/gdal-data.zip b/third-party/gdal-data.zip index c3c7f3d..1bd6bc9 100644 Binary files a/third-party/gdal-data.zip and b/third-party/gdal-data.zip differ