restart workers on error

This commit is contained in:
mrfry 2022-12-12 20:41:51 +01:00
parent 0259cfe1a7
commit 16cd4d9d4b
4 changed files with 57 additions and 14 deletions

View file

@ -483,7 +483,7 @@ function setup(data: SubmoduleData): Submodule {
const dataFiles: Array<DataFile> = utils.ReadJSON(dbsFile) const dataFiles: Array<DataFile> = utils.ReadJSON(dbsFile)
const questionDbs: Array<QuestionDb> = loadJSON(dataFiles, publicDir) const questionDbs: Array<QuestionDb> = loadJSON(dataFiles, publicDir)
initWorkerPool(questionDbs) initWorkerPool(() => questionDbs)
const filesToWatch = [ const filesToWatch = [
{ {

View file

@ -693,8 +693,18 @@ function handleWorkerData() {
}: { workerIndex: number; initData: Array<QuestionDb> } = workerData }: { workerIndex: number; initData: Array<QuestionDb> } = workerData
let qdbs: Array<QuestionDb> = initData let qdbs: Array<QuestionDb> = initData
const subjCount = initData.length
const questionCount = initData.reduce((qCount, qdb) => {
return (
qCount +
qdb.data.reduce((sCount, subject) => {
return sCount + subject.Questions.length
}, 0)
)
}, 0)
logger.Log( logger.Log(
`[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty` `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! subjects: ${subjCount}, questions: ${questionCount}`
) )
parentPort.on('message', async (msg /*: TaskObject */) => { parentPort.on('message', async (msg /*: TaskObject */) => {
@ -936,6 +946,7 @@ export function cleanDb(
return recievedQuestions.map(() => []) return recievedQuestions.map(() => [])
} }
// FIXME: compare images & data too!
const questionIndexesToRemove = recievedQuestions.map((recievedQuestion) => const questionIndexesToRemove = recievedQuestions.map((recievedQuestion) =>
qdbs[qdbIndex].data[subjIndex].Questions.reduce<number[]>( qdbs[qdbIndex].data[subjIndex].Questions.reduce<number[]>(
(acc, question, i) => { (acc, question, i) => {

View file

@ -9,7 +9,7 @@ import utils from './utils'
import { isMainThread, workerData } from 'worker_threads' import { isMainThread, workerData } from 'worker_threads'
let recognizeCount = 0 let recognizeCount = 0
const MAX_ALLOWED_RECOGNIZE_COUNT = 100 const MAX_ALLOWED_RECOGNIZE_COUNT = 3000 // ~ 500 MB
// https://github.com/naptha/tesseract.js/blob/master/docs/api.md // https://github.com/naptha/tesseract.js/blob/master/docs/api.md
let tesseractWorker: TesseractWorker = null let tesseractWorker: TesseractWorker = null

View file

@ -28,6 +28,8 @@ import { Result, Edits } from './actions'
import type { Question, QuestionDb, QuestionData } from '../types/basicTypes' import type { Question, QuestionDb, QuestionData } from '../types/basicTypes'
import type { WorkerResult } from './classes' import type { WorkerResult } from './classes'
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length
interface WorkerObj { interface WorkerObj {
worker: Worker worker: Worker
index: number index: number
@ -91,6 +93,7 @@ interface DoneEvent extends EventEmitter {
const alertOnPendingCount = 50 const alertOnPendingCount = 50
const workerFile = './src/utils/classes.ts' const workerFile = './src/utils/classes.ts'
let workers: Array<WorkerObj> let workers: Array<WorkerObj>
let getInitData: () => Array<QuestionDb> = null
const pendingJobs: { const pendingJobs: {
[id: string]: PendingJob [id: string]: PendingJob
} = {} } = {}
@ -158,14 +161,16 @@ export function doALongTask(
}) })
} }
export function initWorkerPool(initData: Array<QuestionDb>): Array<WorkerObj> { export function initWorkerPool(
initDataGetter: () => Array<QuestionDb>
): Array<WorkerObj> {
getInitData = initDataGetter
if (workers) { if (workers) {
logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg')) logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
return null return null
} }
workers = [] workers = []
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length
if (process.env.NS_THREAD_COUNT) { if (process.env.NS_THREAD_COUNT) {
logger.Log( logger.Log(
`Setting thread count from enviroment variable NS_WORKER_COUNT: '${threadCount}'`, `Setting thread count from enviroment variable NS_WORKER_COUNT: '${threadCount}'`,
@ -175,7 +180,7 @@ export function initWorkerPool(initData: Array<QuestionDb>): Array<WorkerObj> {
for (let i = 0; i < threadCount; i++) { for (let i = 0; i < threadCount; i++) {
workers.push({ workers.push({
worker: getAWorker(i, initData), worker: getAWorker(i, getInitData()),
index: i, index: i,
free: true, free: true,
}) })
@ -239,10 +244,10 @@ function processJob() {
} }
} }
function getAWorker(i: number, initData: Array<QuestionDb>) { function getAWorker(workerIndex: number, initData: Array<QuestionDb>) {
const worker = workerTs(workerFile, { const worker = workerTs(workerFile, {
workerData: { workerData: {
workerIndex: i, workerIndex: workerIndex,
initData: initData, initData: initData,
}, },
}) })
@ -254,16 +259,43 @@ function getAWorker(i: number, initData: Array<QuestionDb>) {
console.error(err) console.error(err)
}) })
worker.on('exit', (code) => { worker.on('exit', (exitCode) => {
// TODO: this is critical, whole server should stop, or child threads should be restarted handleWorkerExit(workerIndex, exitCode)
logger.Log(
`[MAIN]: worker #${i} exit code: ${code}`,
code === 0 ? logger.GetColor('redbg') : logger.GetColor('green')
)
}) })
return worker return worker
} }
function handleWorkerExit(exitedWorkerIndex: number, exitCode: number) {
logger.Log(
`[THREAD #${exitedWorkerIndex}]: exit code: ${exitCode}`,
logger.GetColor('redbg')
)
const exitedWorker = workers.find((worker) => {
return worker.index === exitedWorkerIndex
})
try {
exitedWorker.worker.removeAllListeners()
exitedWorker.worker.terminate()
} catch (e) {
console.log(e)
}
workers = workers.filter((worker) => {
return worker.index !== exitedWorkerIndex
})
if (workers.length < threadCount) {
logger.Log(`[THREAD #${exitedWorkerIndex}]: Restarting ... `)
workers.push({
worker: getAWorker(exitedWorkerIndex, getInitData()),
index: exitedWorkerIndex,
free: true,
})
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function doSomething(currWorker: WorkerObj, obj: TaskObject) { function doSomething(currWorker: WorkerObj, obj: TaskObject) {