From 16cd4d9d4b2ddce0b1f70cbde7a5949dac626399 Mon Sep 17 00:00:00 2001 From: mrfry Date: Mon, 12 Dec 2022 20:41:51 +0100 Subject: [PATCH] restart workers on error --- src/modules/api/submodules/qminingapi.ts | 2 +- src/utils/classes.ts | 13 +++++- src/utils/tesseract.ts | 2 +- src/utils/workerPool.ts | 54 +++++++++++++++++++----- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/modules/api/submodules/qminingapi.ts b/src/modules/api/submodules/qminingapi.ts index c3a07c2..b15f3a8 100644 --- a/src/modules/api/submodules/qminingapi.ts +++ b/src/modules/api/submodules/qminingapi.ts @@ -483,7 +483,7 @@ function setup(data: SubmoduleData): Submodule { const dataFiles: Array = utils.ReadJSON(dbsFile) const questionDbs: Array = loadJSON(dataFiles, publicDir) - initWorkerPool(questionDbs) + initWorkerPool(() => questionDbs) const filesToWatch = [ { diff --git a/src/utils/classes.ts b/src/utils/classes.ts index a9919b7..8fd9484 100755 --- a/src/utils/classes.ts +++ b/src/utils/classes.ts @@ -693,8 +693,18 @@ function handleWorkerData() { }: { workerIndex: number; initData: Array } = workerData let qdbs: Array = initData + const subjCount = initData.length + const questionCount = initData.reduce((qCount, qdb) => { + return ( + qCount + + qdb.data.reduce((sCount, subject) => { + return sCount + subject.Questions.length + }, 0) + ) + }, 0) + logger.Log( - `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` + `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! subjects: ${subjCount}, questions: ${questionCount}` ) parentPort.on('message', async (msg /*: TaskObject */) => { @@ -936,6 +946,7 @@ export function cleanDb( return recievedQuestions.map(() => []) } + // FIXME: compare images & data too! const questionIndexesToRemove = recievedQuestions.map((recievedQuestion) => qdbs[qdbIndex].data[subjIndex].Questions.reduce( (acc, question, i) => { diff --git a/src/utils/tesseract.ts b/src/utils/tesseract.ts index 316f8d0..117d922 100644 --- a/src/utils/tesseract.ts +++ b/src/utils/tesseract.ts @@ -9,7 +9,7 @@ import utils from './utils' import { isMainThread, workerData } from 'worker_threads' let recognizeCount = 0 -const MAX_ALLOWED_RECOGNIZE_COUNT = 100 +const MAX_ALLOWED_RECOGNIZE_COUNT = 3000 // ~ 500 MB // https://github.com/naptha/tesseract.js/blob/master/docs/api.md let tesseractWorker: TesseractWorker = null diff --git a/src/utils/workerPool.ts b/src/utils/workerPool.ts index cdd81f3..12417bc 100644 --- a/src/utils/workerPool.ts +++ b/src/utils/workerPool.ts @@ -28,6 +28,8 @@ import { Result, Edits } from './actions' import type { Question, QuestionDb, QuestionData } from '../types/basicTypes' import type { WorkerResult } from './classes' +const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length + interface WorkerObj { worker: Worker index: number @@ -91,6 +93,7 @@ interface DoneEvent extends EventEmitter { const alertOnPendingCount = 50 const workerFile = './src/utils/classes.ts' let workers: Array +let getInitData: () => Array = null const pendingJobs: { [id: string]: PendingJob } = {} @@ -158,14 +161,16 @@ export function doALongTask( }) } -export function initWorkerPool(initData: Array): Array { +export function initWorkerPool( + initDataGetter: () => Array +): Array { + getInitData = initDataGetter if (workers) { logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg')) return null } workers = [] - const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length if (process.env.NS_THREAD_COUNT) { logger.Log( `Setting thread count from enviroment variable NS_WORKER_COUNT: '${threadCount}'`, @@ -175,7 +180,7 @@ export function initWorkerPool(initData: Array): Array { for (let i = 0; i < threadCount; i++) { workers.push({ - worker: getAWorker(i, initData), + worker: getAWorker(i, getInitData()), index: i, free: true, }) @@ -239,10 +244,10 @@ function processJob() { } } -function getAWorker(i: number, initData: Array) { +function getAWorker(workerIndex: number, initData: Array) { const worker = workerTs(workerFile, { workerData: { - workerIndex: i, + workerIndex: workerIndex, initData: initData, }, }) @@ -254,16 +259,43 @@ function getAWorker(i: number, initData: Array) { console.error(err) }) - worker.on('exit', (code) => { - // TODO: this is critical, whole server should stop, or child threads should be restarted - logger.Log( - `[MAIN]: worker #${i} exit code: ${code}`, - code === 0 ? logger.GetColor('redbg') : logger.GetColor('green') - ) + worker.on('exit', (exitCode) => { + handleWorkerExit(workerIndex, exitCode) }) return worker } +function handleWorkerExit(exitedWorkerIndex: number, exitCode: number) { + logger.Log( + `[THREAD #${exitedWorkerIndex}]: exit code: ${exitCode}`, + logger.GetColor('redbg') + ) + + const exitedWorker = workers.find((worker) => { + return worker.index === exitedWorkerIndex + }) + try { + exitedWorker.worker.removeAllListeners() + exitedWorker.worker.terminate() + } catch (e) { + console.log(e) + } + + workers = workers.filter((worker) => { + return worker.index !== exitedWorkerIndex + }) + + if (workers.length < threadCount) { + logger.Log(`[THREAD #${exitedWorkerIndex}]: Restarting ... `) + + workers.push({ + worker: getAWorker(exitedWorkerIndex, getInitData()), + index: exitedWorkerIndex, + free: true, + }) + } +} + // --------------------------------------------------------------------------- function doSomething(currWorker: WorkerObj, obj: TaskObject) {