mrfrys-node-server/src/worker/workerPool.ts
2023-05-01 10:56:58 +02:00

312 lines
8.9 KiB
TypeScript

/* ----------------------------------------------------------------------------
Question Server
GitLab: <https://gitlab.com/MrFry/mrfrys-node-server>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
------------------------------------------------------------------------- */
import { Worker } from 'worker_threads'
import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events'
import os from 'os'
import type { QuestionDb } from '../types/basicTypes'
import { SearchTaskObject } from './handlers/handleSearch'
import { DbEditTaskObject } from './handlers/handleDbEdit'
import { NewQuestionTaskObject } from './handlers/handleNewQuestion'
import { NewDbTaskObject } from './handlers/handleNewDb'
import { DbCleanTaskObject } from './handlers/handleDbClean'
import { RmQuestionsTaskObject } from './handlers/handleRmQuestions'
import { MergeTaskObject } from './handlers/handleMerge'
import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers'
import { WorkerResult } from './worker'
import logger from '../utils/logger'
import { UsersToPeersTaskObject } from './handlers/handleUsersToPeers'
/**
 * Number of worker threads kept in the pool.
 *
 * Taken from the NS_THREAD_COUNT environment variable when it parses to a
 * positive integer. Anything else (unset, empty, non-numeric, zero, negative
 * or fractional) falls back to the number of CPUs reported by the OS, with a
 * floor of 1 because `os.cpus()` may return an empty array.
 */
const threadCount = ((): number => {
  const requested = Number(process.env.NS_THREAD_COUNT)
  if (Number.isInteger(requested) && requested > 0) {
    return requested
  }
  return os.cpus().length || 1
})()
/** One slot of the worker pool. */
interface WorkerObj {
  /** The underlying worker thread. */
  worker: Worker
  /** Index the worker was created with; targeted jobs refer to this value. */
  index: number
  /** True while no job is dispatched to this worker. */
  free: boolean
}
/**
 * Union of every message shape that can be queued for a worker.
 * Each member is declared by the handler module it is imported from.
 */
export type TaskObject =
  | SearchTaskObject
  | DbEditTaskObject
  | NewQuestionTaskObject
  | NewDbTaskObject
  | DbCleanTaskObject
  | RmQuestionsTaskObject
  | MergeTaskObject
  | QuestionsToPeersTaskObject
  | UsersToPeersTaskObject
/** A queued job that has not been handed to a worker yet. */
interface PendingJob {
  /** When set, the job may only run on the worker with this index. */
  targetWorkerIndex?: number
  /** Payload posted to the worker. */
  workData: TaskObject
  /** Emits 'done' with the worker's result once the job has finished. */
  doneEvent: DoneEvent
}
/**
 * Typed view of the emitter that drives the queue: 'newJob' is emitted when a
 * job is queued, 'jobDone' when a job has finished. Neither event carries a
 * payload.
 */
interface JobEvent extends EventEmitter {
  on(event: 'jobDone' | 'newJob', listener: () => void): this
  emit(event: 'newJob' | 'jobDone'): boolean
}
/**
 * Typed view of the per-job emitter: a single 'done' event delivers the
 * worker's result to whoever queued the job.
 */
interface DoneEvent extends EventEmitter {
  emit(event: 'done', res: WorkerResult): boolean
  once(event: 'done', listener: (result: WorkerResult) => void): this
}
/** Default queue length above which queueWork logs a warning. */
export const defaultAlertOnPendingCount = 100
// Currently active warning threshold; changed via setPendingJobsAlertCount
let alertOnPendingCount = defaultAlertOnPendingCount

// Script every worker thread loads (relative path)
const workerFile = './src/worker/worker.ts'

// The pool itself; stays undefined until initWorkerPool has run
let workers: WorkerObj[]
// Supplies the init data handed to each newly created worker
let getInitData: () => QuestionDb[] = null

// Jobs waiting for a worker, keyed by a generated job id
const pendingJobs: Record<string, PendingJob> = {}
// Queue driver: both a finished job and a newly queued job trigger an attempt
// to hand a pending job to a free worker.
const jobEvents: JobEvent = new EventEmitter()
jobEvents.on('jobDone', processJob)
jobEvents.on('newJob', processJob)
// ---------------------------------------------------------------------------
/**
 * Reports a failure that happened while running a job on a worker.
 * Only logs; the worker is not restarted or removed here.
 *
 * @param worker pool entry the job was dispatched to
 * @param err the error that was raised
 */
function handleWorkerError(worker: WorkerObj, err: Error) {
  // TODO: restart worker if exited or things like that
  const highlight = logger.GetColor('redbg')
  logger.Log('resourcePromise error', highlight)
  console.error(err, worker)
}
// TODO: acquire all workers here, and handle errors so they can be removed if threads exit
/**
 * Queues the same task once for every worker currently in the pool, each copy
 * targeted at one specific worker.
 *
 * @param data task to send to every worker
 * @returns the workers' results, in the order of the pool array
 *
 * Declared `async` so that any failure, including a synchronous one while
 * queueing, surfaces as a rejection of the returned promise instead of being
 * lost or leaving the caller waiting forever.
 */
export async function msgAllWorker(data: TaskObject): Promise<WorkerResult[]> {
  return Promise.all(workers.map((worker) => queueWork(data, worker.index)))
}
/**
 * Sets the queue length above which queueWork logs a warning.
 *
 * @param newVal new threshold; when omitted (or null) the default threshold
 *   is restored
 */
export function setPendingJobsAlertCount(newVal?: number): void {
  alertOnPendingCount = newVal ?? defaultAlertOnPendingCount
}
/**
 * Puts a task on the pending queue and signals the queue driver.
 *
 * @param obj task to hand to a worker
 * @param targetWorkerIndex when given, only the worker with this index may
 *   run the task; otherwise any free worker may take it
 * @returns a promise resolved with the worker's result once the job is done
 */
export function queueWork(
  obj: TaskObject,
  targetWorkerIndex?: number
): Promise<WorkerResult> {
  // Warn (but still accept the job) when the backlog is above the threshold
  const backlog = Object.keys(pendingJobs).length
  if (backlog > alertOnPendingCount) {
    console.error(
      `More than ${alertOnPendingCount} callers waiting for free resource! (${backlog})`
    )
  }

  const id = uuidv4()
  // FIXME: delete doneEvent?
  const doneEvent: DoneEvent = new EventEmitter()
  pendingJobs[id] = { workData: obj, targetWorkerIndex, doneEvent }
  jobEvents.emit('newJob')

  return new Promise((resolve) => {
    doneEvent.once('done', (result: WorkerResult) => {
      // Let the queue driver pick up the next job before handing back the result
      jobEvents.emit('jobDone')
      resolve(result)
    })
  })
}
/**
 * Creates the worker pool: `threadCount` workers, each started with the data
 * returned by `initDataGetter`.
 *
 * @param initDataGetter called once per created worker to obtain its init
 *   data; it is stored and reused when a worker has to be restarted
 * @returns the array of created pool entries, or null when a pool already
 *   exists (no new workers are created in that case)
 *
 * Note: the stored getter is replaced even when the pool already exists,
 * because the assignment happens before the existence check.
 */
export function initWorkerPool(
  initDataGetter: () => Array<QuestionDb>
): Array<WorkerObj> {
  getInitData = initDataGetter
  if (workers) {
    logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
    return null
  }
  workers = []

  if (process.env.NS_THREAD_COUNT) {
    // The message names the variable that is actually read (NS_THREAD_COUNT)
    logger.Log(
      `Setting thread count from enviroment variable NS_THREAD_COUNT: '${threadCount}'`,
      'yellowbg'
    )
  }

  for (let i = 0; i < threadCount; i++) {
    workers.push({
      worker: getAWorker(i, getInitData()),
      index: i,
      free: true,
    })
  }
  return workers
}
// ---------------------------------------------------------------------------
/**
 * Finds the first pending job (in queue order) that can run right now,
 * together with the worker that should run it.
 *
 * A job with a numeric `targetWorkerIndex` only matches the free worker whose
 * `index` field equals it; the lookup goes by that field, not by array
 * position, so it stays correct if the pool array is ever reordered. A target
 * that matches no worker simply leaves the job pending. Jobs without a target
 * match any free worker.
 */
function pickRunnableJob(): { jobKey: string; worker: WorkerObj } | undefined {
  for (const jobKey of Object.keys(pendingJobs)) {
    const target = pendingJobs[jobKey].targetWorkerIndex
    const hasTarget = typeof target === 'number' && !isNaN(target)
    const worker = workers.find((candidate) => {
      return candidate.free && (!hasTarget || candidate.index === target)
    })
    if (worker) {
      return { jobKey, worker }
    }
  }
  return undefined
}

/**
 * Dispatches at most one pending job to a free worker. Invoked on every
 * 'newJob' and 'jobDone' event; does nothing when no job is runnable.
 */
function processJob() {
  const next = pickRunnableJob()
  if (!next) {
    return
  }

  const { jobKey, worker } = next
  const job = pendingJobs[jobKey]
  worker.free = false
  delete pendingJobs[jobKey]

  doSomething(worker, job.workData)
    .then(
      (res: WorkerResult) => {
        worker.free = true
        job.doneEvent.emit('done', res)
      },
      (err) => {
        // Dispatching itself failed, so the worker is not running this job.
        // Release it and re-run the queue; otherwise the worker would stay
        // marked busy forever. The failed job's caller cannot be notified
        // here because 'done' only carries a result.
        handleWorkerError(worker, err)
        worker.free = true
        jobEvents.emit('jobDone')
      }
    )
    .catch(function (err) {
      // Errors thrown while delivering the result or re-running the queue
      handleWorkerError(worker, err)
    })
}
/**
 * Starts one worker thread and wires up its lifecycle handlers.
 *
 * @param workerIndex index passed to the worker and used to identify it when
 *   it exits
 * @param initData data handed to the worker at start-up
 * @returns the started worker
 */
function getAWorker(workerIndex: number, initData: Array<QuestionDb>) {
  const workerData = { workerIndex, initData }
  const spawned = workerTs(workerFile, { workerData })
  spawned.setMaxListeners(50)

  // Errors are only logged; the 'exit' handler deals with the worker going away
  spawned.on('error', (err) => {
    logger.Log('Worker error!', logger.GetColor('redbg'))
    console.error(err)
  })
  spawned.on('exit', (exitCode) => handleWorkerExit(workerIndex, exitCode))

  return spawned
}
/**
 * Handles a worker thread's 'exit' event: logs it, drops the exited worker
 * from the pool and, while the pool is below `threadCount`, starts a
 * replacement with the same index.
 *
 * @param exitedWorkerIndex index of the worker that exited
 * @param exitCode exit code reported by the thread
 */
function handleWorkerExit(exitedWorkerIndex: number, exitCode: number) {
  logger.Log(
    `[THREAD #${exitedWorkerIndex}]: exit code: ${exitCode}`,
    logger.GetColor('redbg')
  )

  const position = workers.findIndex((worker) => {
    return worker.index === exitedWorkerIndex
  })
  if (position !== -1) {
    const exitedWorker = workers[position]
    try {
      exitedWorker.worker.removeAllListeners()
      // terminate() is asynchronous; log a failure instead of leaving the
      // returned promise unhandled
      exitedWorker.worker.terminate().catch((e) => {
        console.log(e)
      })
    } catch (e) {
      console.log(e)
    }
    workers.splice(position, 1)
  }

  if (workers.length < threadCount) {
    logger.Log(`[THREAD #${exitedWorkerIndex}]: Restarting ... `)
    const replacement: WorkerObj = {
      worker: getAWorker(exitedWorkerIndex, getInitData()),
      index: exitedWorkerIndex,
      free: true,
    }
    // Put the replacement back into the slot the old worker occupied, so that
    // array position keeps matching `index` as it did right after pool creation
    const insertAt = position !== -1 ? position : workers.length
    workers.splice(insertAt, 0, replacement)
    // A free worker just appeared: re-run the queue so pending jobs do not
    // have to wait for an unrelated job event
    processJob()
  }
}
// ---------------------------------------------------------------------------
/**
 * Posts a task to a worker and waits for its next message.
 *
 * @param currWorker pool entry whose worker receives the task
 * @param obj task to post
 * @returns a promise resolved with the first message the worker emits after
 *   the task has been posted
 */
function doSomething(currWorker: WorkerObj, obj: TaskObject) {
  const thread = currWorker.worker
  return new Promise((resolve) => {
    thread.postMessage(obj)
    thread.once('message', (reply: WorkerResult) => resolve(reply))
  })
}
/**
 * Starts a worker thread that runs a TypeScript file.
 *
 * The worker is created from an inline bootstrap script (`eval: true`) which
 * registers ts-node and then requires `file`. The path travels to the worker
 * as `workerData.__filename`; the bootstrap removes that key again before
 * loading the file.
 *
 * The caller's `wkOpts` object is left untouched: the options actually passed
 * to `Worker` are a fresh copy with `eval` and `__filename` added.
 *
 * @param file path of the TypeScript file the worker should load
 * @param wkOpts worker options; `workerData` is forwarded to the worker
 * @returns the created worker
 */
const workerTs = (
  file: string,
  wkOpts: {
    workerData: {
      workerIndex: number
      initData: QuestionDb[]
      __filename?: string
    }
    eval?: boolean
  }
) => {
  return new Worker(
    `
try {
const wk = require('worker_threads');
require('ts-node').register();
let file = wk.workerData.__filename;
delete wk.workerData.__filename;
require(file);
} catch (e) {
console.error('Error while creating new Worker:')
console.error(e)
}
`,
    {
      ...wkOpts,
      eval: true,
      workerData: { ...wkOpts.workerData, __filename: file },
    }
  )
}