diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index 49fc119..ab51f14 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -7,34 +7,46 @@ import logger from './logger' // --------------------------------------------------------------------------- +const alertOnPendingCount = 10 const workerFile = './src/utils/classes.ts' let pool: any = null let workers: any = null // --------------------------------------------------------------------------- -// doALongTask(i, taskObj).then((res) => { -// msgAll(res) -// console.log('[MSGFROMCLIENT]', res) -// }) +function handleWorkerError(client, err) { + logger.Log('resourcePromise error', logger.GetColor('redbg')) + console.error(err) + pool.destroy(client) +} export function doALongTask(obj: any): Promise { + if (pool.pending > alertOnPendingCount) { + logger.Log( + `More than ${alertOnPendingCount} callers waiting for free resource! (${pool.pending})`, + logger.GetColor('redbg') + ) + } + return new Promise((resolve) => { + let currClient pool .acquire() .then(function(client) { - doSomething(client, obj).then((res) => { - resolve(res) - // TODO: check if result is really a result, and want to release port - pool.release(client) - // console.log('[RELEASE]: #' + client.index) - }) + currClient = client + doSomething(client, obj) + .then((res) => { + resolve(res) + // TODO: check if result is really a result, and want to release port + pool.release(client) + // console.log('[RELEASE]: #' + client.index) + }) + .catch(function(err) { + handleWorkerError(currClient, err) + }) }) .catch(function(err) { - logger.Log('resourcePromise error', logger.GetColor('redbg')) - console.error(err) - // handle error - this is generally a timeout or maxWaitingClients - // error + handleWorkerError(currClient, err) }) }) } @@ -62,15 +74,24 @@ export function initWorkerPool(initData: any): void { }, } + const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length - 1 + if (process.env.NS_THREAD_COUNT) { + logger.Log( + `Setting thread count from enviroment variable NS_THREAD_COUNT: '${threadCount}'`, + logger.GetColor('red') + ) + } + const opts = { - min: os.cpus().length - 1, // minimum size of the pool - max: os.cpus().length - 1, // maximum size of the pool + min: threadCount, // minimum size of the pool + max: threadCount, // maximum size of the pool maxWaitingClients: 999, } pool = genericPool.createPool(factory, opts) } +// TODO: accuire all workers here, and handle errors so they can be removed if threads exit export function msgAllWorker(data: any): void { workers.forEach((worker) => { worker.postMessage(data) @@ -90,7 +111,8 @@ function getAWorker(i, initData) { worker.setMaxListeners(50) // worker.on('message', (msg) => { - // logger.Log(`[MAIN]: Msg from worker #${i}`, msg) + // logger.Log(`[MAIN]: Msg from worker #${i}`) + // logger.Log(msg) // }) // worker.on('online', () => {