import { Worker } from 'worker_threads' import genericPool from 'generic-pool' import os from 'os' import logger from './logger' // import { QuestionDb } from '../types/basicTypes' // --------------------------------------------------------------------------- const workerFile = './src/utils/classes.ts' let pool: any = null let workers: any = null // --------------------------------------------------------------------------- // doALongTask(i, taskObj).then((res) => { // msgAll(res) // console.log('[MSGFROMCLIENT]', res) // }) export function doALongTask(obj: any): Promise { return new Promise((resolve) => { pool .acquire() .then(function(client) { doSomething(client, obj).then((res) => { resolve(res) // TODO: check if result is really a result, and want to release port pool.release(client) // console.log('[RELEASE]: #' + client.index) }) }) .catch(function(err) { logger.Log('resourcePromise error', logger.GetColor('redbg')) console.error(err) // handle error - this is generally a timeout or maxWaitingClients // error }) }) } export function initWorkerPool(initData: any): void { if (workers && pool) { logger.Log('WORKER AND POOL ALREADY EXISTS', logger.GetColor('redbg')) return } workers = [] const factory = { create: function() { const currInd = workers.length const worker = getAWorker(currInd, initData) workers.push(worker) return { worker: worker, index: currInd, } }, destroy: function(client) { // console.log('[DESTROY]') client.worker.terminate() // console.log('[DESTROYED] #' + client.index) }, } const opts = { min: os.cpus().length - 1, // minimum size of the pool max: os.cpus().length - 1, // maximum size of the pool maxWaitingClients: 999, } pool = genericPool.createPool(factory, opts) } export function msgAllWorker(data: any): void { workers.forEach((worker) => { worker.postMessage(data) }) } // --------------------------------------------------------------------------- function getAWorker(i, initData) { const worker = workerTs(workerFile, { workerData: { workerIndex: i, initData: initData, }, }) worker.setMaxListeners(50) // worker.on('message', (msg) => { // logger.Log(`[MAIN]: Msg from worker #${i}`, msg) // }) worker.on('online', () => { logger.Log(`[THREAD #${i}]: Worker ${i} online`) }) worker.on('error', (err) => { logger.Log('Worker error!', logger.GetColor('redbg')) console.error(err) }) worker.on('exit', (code) => { // TODO: this is critical, whole server should stop, or child threads should be restarted logger.Log( `[MAIN]: worker #${i} exit code: ${code}`, code === 0 ? logger.GetColor('redbg') : logger.GetColor('green') ) }) return worker } // --------------------------------------------------------------------------- function doSomething(client, obj) { const { /* index, */ worker } = client return new Promise((resolve) => { // console.log('[ACCUIRE]: #' + index) worker.postMessage(obj) worker.once('message', (msg) => { resolve(msg) }) }) } const workerTs = (file: string, wkOpts: any) => { wkOpts.eval = true if (!wkOpts.workerData) { wkOpts.workerData = {} } wkOpts.workerData.__filename = file return new Worker( ` const wk = require('worker_threads'); require('ts-node').register(); let file = wk.workerData.__filename; delete wk.workerData.__filename; require(file); `, wkOpts ) }