Thread error handling, enviroment variable for thread count

This commit is contained in:
mrfry 2021-04-09 11:39:42 +02:00
parent f08079ba84
commit 575befb491

View file

@ -7,34 +7,46 @@ import logger from './logger'
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const alertOnPendingCount = 10
const workerFile = './src/utils/classes.ts' const workerFile = './src/utils/classes.ts'
let pool: any = null let pool: any = null
let workers: any = null let workers: any = null
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// doALongTask(i, taskObj).then((res) => { function handleWorkerError(client, err) {
// msgAll(res) logger.Log('resourcePromise error', logger.GetColor('redbg'))
// console.log('[MSGFROMCLIENT]', res) console.error(err)
// }) pool.destroy(client)
}
export function doALongTask(obj: any): Promise<any> { export function doALongTask(obj: any): Promise<any> {
if (pool.pending > alertOnPendingCount) {
logger.Log(
`More than ${alertOnPendingCount} callers waiting for free resource! (${pool.pending})`,
logger.GetColor('redbg')
)
}
return new Promise((resolve) => { return new Promise((resolve) => {
let currClient
pool pool
.acquire() .acquire()
.then(function(client) { .then(function(client) {
doSomething(client, obj).then((res) => { currClient = client
doSomething(client, obj)
.then((res) => {
resolve(res) resolve(res)
// TODO: check if result is really a result, and want to release port // TODO: check if result is really a result, and want to release port
pool.release(client) pool.release(client)
// console.log('[RELEASE]: #' + client.index) // console.log('[RELEASE]: #' + client.index)
}) })
.catch(function(err) {
handleWorkerError(currClient, err)
})
}) })
.catch(function(err) { .catch(function(err) {
logger.Log('resourcePromise error', logger.GetColor('redbg')) handleWorkerError(currClient, err)
console.error(err)
// handle error - this is generally a timeout or maxWaitingClients
// error
}) })
}) })
} }
@ -62,15 +74,24 @@ export function initWorkerPool(initData: any): void {
}, },
} }
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length - 1
if (process.env.NS_THREAD_COUNT) {
logger.Log(
`Setting thread count from enviroment variable NS_THREAD_COUNT: '${threadCount}'`,
logger.GetColor('red')
)
}
const opts = { const opts = {
min: os.cpus().length - 1, // minimum size of the pool min: threadCount, // minimum size of the pool
max: os.cpus().length - 1, // maximum size of the pool max: threadCount, // maximum size of the pool
maxWaitingClients: 999, maxWaitingClients: 999,
} }
pool = genericPool.createPool(factory, opts) pool = genericPool.createPool(factory, opts)
} }
// TODO: accuire all workers here, and handle errors so they can be removed if threads exit
export function msgAllWorker(data: any): void { export function msgAllWorker(data: any): void {
workers.forEach((worker) => { workers.forEach((worker) => {
worker.postMessage(data) worker.postMessage(data)
@ -90,7 +111,8 @@ function getAWorker(i, initData) {
worker.setMaxListeners(50) worker.setMaxListeners(50)
// worker.on('message', (msg) => { // worker.on('message', (msg) => {
// logger.Log(`[MAIN]: Msg from worker #${i}`, msg) // logger.Log(`[MAIN]: Msg from worker #${i}`)
// logger.Log(msg)
// }) // })
// worker.on('online', () => { // worker.on('online', () => {