mrfrys-node-server/src/worker/worker.ts
2023-09-16 09:22:37 +02:00

104 lines
3.6 KiB
TypeScript

import { isMainThread, parentPort, workerData } from 'worker_threads'
import { QuestionDb } from '../types/basicTypes'
import { SearchResultQuestion, countOfQdbs } from '../utils/qdbUtils'
import logger from '../utils/logger'
import { TaskObject } from './workerPool'
import { tesseractLoaded } from '../utils/tesseract'
import { handleSearch } from './handlers/handleSearch'
import { handleMerge } from './handlers/handleMerge'
import { handleDbEdit } from './handlers/handleDbEdit'
import { handleNewQuestions } from './handlers/handleNewQuestion'
import { handleNewDb } from './handlers/handleNewDb'
import { handleDbClean } from './handlers/handleDbClean'
import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers'
import { handleRmQuestions } from './handlers/handleRmQuestions'
import { handleUsersToPeers } from './handlers/handleUsersToPeers'
import { handleUserFilesToPeers } from './handlers/handleUserFilesToPeers'
export interface WorkerResult {
msg: string
workerIndex: number
result?: SearchResultQuestion[] | number[][]
error?: boolean
}
if (!isMainThread) {
handleWorkerData()
}
async function handleWorkerData() {
const {
workerIndex,
initData,
}: { workerIndex: number; initData: Array<QuestionDb> } = workerData
let qdbs: Array<QuestionDb> = initData
const setQdbs = (newVal: Array<QuestionDb>) => {
qdbs = newVal
}
const qdbCount = initData.length
const { subjCount, questionCount } = countOfQdbs(initData)
logger.Log(
`[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! qdbs: ${qdbCount}, subjects: ${subjCount.toLocaleString()}, questions: ${questionCount.toLocaleString()}`
)
parentPort.on('message', async (msg: TaskObject) => {
try {
await tesseractLoaded
await handleMessage(qdbs, msg, workerIndex, setQdbs)
} catch (e) {
console.error(e)
parentPort.postMessage({
msg: `From thread #${workerIndex}: unhandled error occured! (${
(msg as any)?.type
})`,
workerIndex: workerIndex,
e: e,
})
}
})
}
function handleMessage(
qdbs: QuestionDb[],
msg: TaskObject,
workerIndex: number,
setQdbs: (newVal: QuestionDb[]) => void
) {
switch (msg.type) {
case 'search':
return handleSearch(qdbs, msg, workerIndex)
case 'merge':
return handleMerge(qdbs, msg, workerIndex)
case 'dbEdit':
return handleDbEdit(qdbs, msg, workerIndex, setQdbs)
case 'newQuestions':
return handleNewQuestions(qdbs, msg, workerIndex, setQdbs)
case 'newdb':
return handleNewDb(qdbs, msg, workerIndex, setQdbs)
case 'dbClean':
return handleDbClean(qdbs, msg, workerIndex)
case 'rmQuestions':
return handleRmQuestions(qdbs, msg, workerIndex, setQdbs)
case 'sendQuestionsToPeers':
return handleQuestionsToPeers(qdbs, msg, workerIndex)
case 'sendUsersToPeers':
return handleUsersToPeers(qdbs, msg, workerIndex)
case 'sendUserFilesToPeers':
return handleUserFilesToPeers(qdbs, msg, workerIndex)
default:
logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
console.error(msg)
parentPort.postMessage({
msg: `From thread #${workerIndex}: Invalid message type (${
(msg as any)?.type
})!`,
workerIndex: workerIndex,
})
return null
break
}
}