import { Worker } from 'worker_threads'
import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events'
import os from 'os'

import logger from './logger'

/** A pooled worker thread together with its scheduling state. */
interface WorkerObj {
  worker: Worker
  index: number
  /** True while no job is assigned to this worker. */
  free: boolean
}

/** A submitted job that has not been handed to a worker yet. */
interface PendingJob {
  workData: any
  /** Emits 'done' (with the result) or 'failed' (with the error) once. */
  doneEvent: EventEmitter
  /** When set, only the worker with this index may run the job. */
  targetWorkerIndex?: number
}

const alertOnPendingCount = 10
const workerFile = './src/utils/classes.ts'
let workers: Array<WorkerObj>
// Keys are UUID strings, so Object.keys() yields them in insertion order.
const pendingJobs: { [id: string]: PendingJob } = {}

const jobEvents = new EventEmitter()

// Both a finished job and a newly submitted job are a reason to try to
// dispatch one pending job.
jobEvents.on('jobDone', () => {
  processJob()
})

jobEvents.on('newJob', () => {
  processJob()
})

// ---------------------------------------------------------------------------

/**
 * Logs a failure that happened while a job was being handed to a worker.
 *
 * @param worker the worker the job was assigned to (currently unused)
 * @param err the error that was raised
 */
function handleWorkerError(worker: WorkerObj, err: unknown) {
  // TODO: restart worker if exited or things like that
  logger.Log('resourcePromise error', logger.GetColor('redbg'))
  console.error(err)
}

// TODO: acquire all workers here, and handle errors so they can be removed if threads exit
/**
 * Sends the same message to every worker in the pool, one targeted job per
 * worker.
 *
 * @param data payload posted to each worker
 * @returns a promise resolving with the workers' replies, in worker order;
 *   it rejects if the pool is not initialized or any of the jobs fails
 */
export function msgAllWorker(data: any): Promise<any> {
  logger.DebugLog('MSGING ALL WORKER', 'job', 1)
  if (!workers) {
    return Promise.reject(new Error('Worker pool is not initialized'))
  }

  const promises = workers.map((worker) => doALongTask(data, worker.index))
  return Promise.all(promises).then((res) => {
    logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1)
    return res
  })
}

/**
 * Queues a job for the worker pool and resolves with the worker's reply.
 *
 * @param obj payload posted to the worker
 * @param targetWorkerIndex optional index of the worker that must run the
 *   job; when omitted (or NaN) the first free worker is used
 * @returns a promise resolving with the first message the worker sends back.
 *   It rejects when the pool is not initialized, when `targetWorkerIndex`
 *   does not name an existing worker, or when the payload could not be
 *   posted to the worker.
 */
export function doALongTask(
  obj: any,
  targetWorkerIndex?: number
): Promise<any> {
  if (!workers) {
    return Promise.reject(
      new Error('Worker pool is not initialized, call initWorkerPool() first')
    )
  }

  // Validate the target up front: an unknown index left in the queue would
  // make every later scheduling pass fail on the same entry.
  let target: number | undefined
  if (targetWorkerIndex != null && !isNaN(targetWorkerIndex)) {
    if (!workers[targetWorkerIndex]) {
      return Promise.reject(
        new RangeError(
          `No worker with index ${targetWorkerIndex} (pool size: ${workers.length})`
        )
      )
    }
    target = targetWorkerIndex
  }

  const pendingCount = Object.keys(pendingJobs).length
  if (pendingCount > alertOnPendingCount) {
    logger.Log(
      `More than ${alertOnPendingCount} callers waiting for free resource! (${pendingCount})`,
      logger.GetColor('redbg')
    )
  }

  const jobId = uuidv4()
  // The emitter is referenced only by the pending entry and the closures
  // below, so nothing has to be removed explicitly once the job settles.
  const doneEvent = new EventEmitter()

  const result = new Promise<any>((resolve, reject) => {
    doneEvent.once('done', (res) => {
      jobEvents.emit('jobDone')
      resolve(res)
    })
    // 'failed' rather than 'error': emitting 'error' on an EventEmitter
    // without a listener throws.
    doneEvent.once('failed', (err) => {
      jobEvents.emit('jobDone')
      reject(err)
    })
  })

  pendingJobs[jobId] = {
    workData: obj,
    targetWorkerIndex: target,
    doneEvent: doneEvent,
  }
  jobEvents.emit('newJob')

  return result
}

/**
 * Parses a thread-count setting.
 *
 * @param raw the raw setting, e.g. the value of an environment variable
 * @returns the count when `raw` is a positive integer, otherwise undefined
 */
function parseThreadCount(raw: string | undefined): number | undefined {
  if (!raw) {
    return undefined
  }
  const parsed = Number(raw)
  return Number.isInteger(parsed) && parsed > 0 ? parsed : undefined
}

/**
 * Creates the worker pool. Calling it again once the pool exists only logs a
 * warning.
 *
 * The pool size is taken from the NS_THREAD_COUNT environment variable when
 * it holds a positive integer, otherwise from the number of CPUs (at least
 * one).
 *
 * @param initData data passed to every worker through `workerData.initData`
 */
export function initWorkerPool(initData: any): void {
  if (workers) {
    logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
    return
  }
  workers = []

  const requested = parseThreadCount(process.env.NS_THREAD_COUNT)
  const threadCount =
    requested !== undefined ? requested : Math.max(1, os.cpus().length)

  if (requested !== undefined) {
    logger.Log(
      `Setting thread count from enviroment variable NS_THREAD_COUNT: '${threadCount}'`,
      logger.GetColor('red')
    )
  } else if (process.env.NS_THREAD_COUNT) {
    logger.Log(
      `Ignoring invalid NS_THREAD_COUNT '${process.env.NS_THREAD_COUNT}', using ${threadCount} threads`,
      logger.GetColor('redbg')
    )
  }

  for (let i = 0; i < threadCount; i++) {
    workers.push({
      worker: getAWorker(i, initData),
      index: i,
      free: true,
    })
  }
}

// ---------------------------------------------------------------------------

/**
 * Dispatches at most one pending job: the oldest one for which a suitable
 * worker is free. Targeted jobs wait for their own worker, other jobs take
 * the first free worker.
 */
function processJob() {
  const keys = Object.keys(pendingJobs)
  if (keys.length === 0) {
    return
  }

  // FIXME: FIFO OR ANYTHING ELSE (JOB PROCESSING ORDER)
  let jobKey: string | undefined
  let freeWorker: WorkerObj | undefined

  for (const key of keys) {
    const target = pendingJobs[key].targetWorkerIndex
    if (target !== undefined) {
      const candidate = workers[target]
      if (candidate && candidate.free) {
        freeWorker = candidate
        logger.DebugLog(`RESERVING WORKER ${target}`, 'job', 1)
      }
    } else {
      freeWorker = workers.find((worker) => worker.free)
      if (freeWorker) {
        logger.DebugLog(
          `RESERVING FIRST AVAILABLE WORKER ${freeWorker.index}`,
          'job',
          1
        )
      }
    }

    if (freeWorker) {
      jobKey = key
      break
    }
  }

  if (!freeWorker || jobKey === undefined) {
    logger.DebugLog('NO FREE WORKER', 'job', 1)
    return
  }

  const reservedWorker = freeWorker
  reservedWorker.free = false

  const job = pendingJobs[jobKey]
  delete pendingJobs[jobKey]

  doSomething(reservedWorker, job.workData)
    .then((res) => {
      reservedWorker.free = true
      job.doneEvent.emit('done', res)
    })
    .catch((err) => {
      handleWorkerError(reservedWorker, err)
      // doSomething() only rejects when posting the payload throws, so the
      // worker never received the job and can be reused. Release it and fail
      // the caller instead of leaving both stuck forever.
      reservedWorker.free = true
      job.doneEvent.emit('failed', err)
    })
}

/**
 * Starts one worker thread and attaches logging for its 'error' and 'exit'
 * events.
 *
 * @param i index of the worker, passed to it as `workerData.workerIndex`
 * @param initData data passed to the worker as `workerData.initData`
 * @returns the started worker
 */
function getAWorker(i: number, initData: any): Worker {
  const worker = workerTs(workerFile, {
    workerData: {
      workerIndex: i,
      initData: initData,
    },
  })

  worker.setMaxListeners(50)

  worker.on('error', (err) => {
    logger.Log('Worker error!', logger.GetColor('redbg'))
    console.error(err)
  })

  worker.on('exit', (code) => {
    // TODO: this is critical, whole server should stop, or child threads should be restarted
    logger.Log(
      `[MAIN]: worker #${i} exit code: ${code}`,
      // A zero exit code is a clean shutdown, anything else is a failure.
      code === 0 ? logger.GetColor('green') : logger.GetColor('redbg')
    )
  })

  return worker
}

// ---------------------------------------------------------------------------

/**
 * Posts a payload to a worker and resolves with the next message it sends.
 *
 * The promise rejects only if `postMessage` throws; worker 'error' and
 * 'exit' events are not observed here.
 *
 * @param currWorker pool entry whose worker should run the job
 * @param obj payload posted to the worker
 */
function doSomething(currWorker: WorkerObj, obj: any): Promise<any> {
  const { /* index, */ worker } = currWorker
  return new Promise((resolve) => {
    // Post first: if posting throws, no stale 'message' listener is left on
    // the worker. The reply arrives asynchronously, so it cannot be missed.
    worker.postMessage(obj)
    worker.once('message', (msg) => {
      resolve(msg)
    })
  })
}

/**
 * Starts a worker that runs a TypeScript file by registering ts-node inside
 * the worker before requiring the file.
 *
 * @param file path of the TypeScript file the worker should load
 * @param wkOpts worker options; `eval` and `workerData.__filename` are set
 *   on the passed object
 * @returns the started worker
 */
const workerTs = (file: string, wkOpts: any): Worker => {
  wkOpts.eval = true
  if (!wkOpts.workerData) {
    wkOpts.workerData = {}
  }
  wkOpts.workerData.__filename = file
  return new Worker(
    `
          const wk = require('worker_threads');
          require('ts-node').register();
          let file = wk.workerData.__filename;
          delete wk.workerData.__filename;
          require(file);
      `,
    wkOpts
  )
}