mirror of
https://gitlab.com/MrFry/mrfrys-node-server
synced 2025-04-01 20:24:18 +02:00
prettier 4 tabwidth
This commit is contained in:
parent
00ec614f1d
commit
96b413a365
42 changed files with 7034 additions and 6905 deletions
|
@ -29,268 +29,274 @@ import type { Question, QuestionDb, QuestionData } from '../types/basicTypes'
|
|||
import type { WorkerResult } from './classes'
|
||||
|
||||
interface WorkerObj {
|
||||
worker: Worker
|
||||
index: number
|
||||
free: Boolean
|
||||
worker: Worker
|
||||
index: number
|
||||
free: Boolean
|
||||
}
|
||||
|
||||
export interface TaskObject {
|
||||
type: 'work' | 'dbEdit' | 'newQuestions' | 'newdb' | 'dbClean' | 'rmQuestions'
|
||||
data:
|
||||
| {
|
||||
searchIn: number[]
|
||||
question: Question
|
||||
subjName: string
|
||||
testUrl?: string
|
||||
questionData?: QuestionData
|
||||
searchInAllIfNoResult?: boolean
|
||||
searchTillMatchPercent?: number
|
||||
[key: string]: any
|
||||
}
|
||||
| { dbIndex: number; edits: Edits }
|
||||
| QuestionDb
|
||||
| Result
|
||||
| {
|
||||
questions: Question[]
|
||||
subjToClean: string
|
||||
overwriteFromDate: number
|
||||
qdbIndex: number
|
||||
}
|
||||
| {
|
||||
questionIndexesToRemove: number[][]
|
||||
subjIndex: number
|
||||
qdbIndex: number
|
||||
recievedQuestions: Question[]
|
||||
}
|
||||
type:
|
||||
| 'work'
|
||||
| 'dbEdit'
|
||||
| 'newQuestions'
|
||||
| 'newdb'
|
||||
| 'dbClean'
|
||||
| 'rmQuestions'
|
||||
data:
|
||||
| {
|
||||
searchIn: number[]
|
||||
question: Question
|
||||
subjName: string
|
||||
testUrl?: string
|
||||
questionData?: QuestionData
|
||||
searchInAllIfNoResult?: boolean
|
||||
searchTillMatchPercent?: number
|
||||
[key: string]: any
|
||||
}
|
||||
| { dbIndex: number; edits: Edits }
|
||||
| QuestionDb
|
||||
| Result
|
||||
| {
|
||||
questions: Question[]
|
||||
subjToClean: string
|
||||
overwriteFromDate: number
|
||||
qdbIndex: number
|
||||
}
|
||||
| {
|
||||
questionIndexesToRemove: number[][]
|
||||
subjIndex: number
|
||||
qdbIndex: number
|
||||
recievedQuestions: Question[]
|
||||
}
|
||||
}
|
||||
|
||||
interface PendingJob {
|
||||
workData: TaskObject
|
||||
doneEvent: DoneEvent
|
||||
targetWorkerIndex?: number
|
||||
workData: TaskObject
|
||||
doneEvent: DoneEvent
|
||||
targetWorkerIndex?: number
|
||||
}
|
||||
|
||||
interface JobEvent extends EventEmitter {
|
||||
on(event: 'jobDone', listener: () => void): this
|
||||
on(event: 'newJob', listener: () => void): this
|
||||
emit(event: 'newJob'): boolean
|
||||
emit(event: 'jobDone'): boolean
|
||||
on(event: 'jobDone', listener: () => void): this
|
||||
on(event: 'newJob', listener: () => void): this
|
||||
emit(event: 'newJob'): boolean
|
||||
emit(event: 'jobDone'): boolean
|
||||
}
|
||||
|
||||
interface DoneEvent extends EventEmitter {
|
||||
once(event: 'done', listener: (result: WorkerResult) => void): this
|
||||
emit(event: 'done', res: WorkerResult): boolean
|
||||
once(event: 'done', listener: (result: WorkerResult) => void): this
|
||||
emit(event: 'done', res: WorkerResult): boolean
|
||||
}
|
||||
|
||||
const alertOnPendingCount = 50
|
||||
const workerFile = './src/utils/classes.ts'
|
||||
let workers: Array<WorkerObj>
|
||||
const pendingJobs: {
|
||||
[id: string]: PendingJob
|
||||
[id: string]: PendingJob
|
||||
} = {}
|
||||
|
||||
const jobEvents: JobEvent = new EventEmitter()
|
||||
|
||||
jobEvents.on('jobDone', () => {
|
||||
processJob()
|
||||
processJob()
|
||||
})
|
||||
|
||||
jobEvents.on('newJob', () => {
|
||||
processJob()
|
||||
processJob()
|
||||
})
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function handleWorkerError(worker: WorkerObj, err: Error) {
|
||||
// TODO: restart worker if exited or things like that
|
||||
logger.Log('resourcePromise error', logger.GetColor('redbg'))
|
||||
console.error(err, worker)
|
||||
// TODO: restart worker if exited or things like that
|
||||
logger.Log('resourcePromise error', logger.GetColor('redbg'))
|
||||
console.error(err, worker)
|
||||
}
|
||||
|
||||
// TODO: accuire all workers here, and handle errors so they can be removed if threads exit
|
||||
export function msgAllWorker(data: TaskObject): Promise<WorkerResult[]> {
|
||||
logger.DebugLog('MSGING ALL WORKER', 'job', 1)
|
||||
return new Promise((resolve) => {
|
||||
const promises: Promise<WorkerResult>[] = []
|
||||
workers.forEach((worker) => {
|
||||
promises.push(doALongTask(data, worker.index))
|
||||
logger.DebugLog('MSGING ALL WORKER', 'job', 1)
|
||||
return new Promise((resolve) => {
|
||||
const promises: Promise<WorkerResult>[] = []
|
||||
workers.forEach((worker) => {
|
||||
promises.push(doALongTask(data, worker.index))
|
||||
})
|
||||
Promise.all(promises).then((res) => {
|
||||
logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1)
|
||||
resolve(res)
|
||||
})
|
||||
})
|
||||
Promise.all(promises).then((res) => {
|
||||
logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1)
|
||||
resolve(res)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export function doALongTask(
|
||||
obj: TaskObject,
|
||||
targetWorkerIndex?: number
|
||||
obj: TaskObject,
|
||||
targetWorkerIndex?: number
|
||||
): Promise<WorkerResult> {
|
||||
if (Object.keys(pendingJobs).length > alertOnPendingCount) {
|
||||
logger.Log(
|
||||
`More than ${alertOnPendingCount} callers waiting for free resource! (${
|
||||
Object.keys(pendingJobs).length
|
||||
})`,
|
||||
logger.GetColor('redbg')
|
||||
)
|
||||
}
|
||||
if (Object.keys(pendingJobs).length > alertOnPendingCount) {
|
||||
logger.Log(
|
||||
`More than ${alertOnPendingCount} callers waiting for free resource! (${
|
||||
Object.keys(pendingJobs).length
|
||||
})`,
|
||||
logger.GetColor('redbg')
|
||||
)
|
||||
}
|
||||
|
||||
const jobId = uuidv4()
|
||||
// FIXME: delete doneEvent?
|
||||
const doneEvent: DoneEvent = new EventEmitter()
|
||||
pendingJobs[jobId] = {
|
||||
workData: obj,
|
||||
targetWorkerIndex: targetWorkerIndex,
|
||||
doneEvent: doneEvent,
|
||||
}
|
||||
jobEvents.emit('newJob')
|
||||
return new Promise((resolve) => {
|
||||
doneEvent.once('done', (result: WorkerResult) => {
|
||||
jobEvents.emit('jobDone')
|
||||
resolve(result)
|
||||
const jobId = uuidv4()
|
||||
// FIXME: delete doneEvent?
|
||||
const doneEvent: DoneEvent = new EventEmitter()
|
||||
pendingJobs[jobId] = {
|
||||
workData: obj,
|
||||
targetWorkerIndex: targetWorkerIndex,
|
||||
doneEvent: doneEvent,
|
||||
}
|
||||
jobEvents.emit('newJob')
|
||||
return new Promise((resolve) => {
|
||||
doneEvent.once('done', (result: WorkerResult) => {
|
||||
jobEvents.emit('jobDone')
|
||||
resolve(result)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export function initWorkerPool(initData: Array<QuestionDb>): Array<WorkerObj> {
|
||||
if (workers) {
|
||||
logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
|
||||
return null
|
||||
}
|
||||
workers = []
|
||||
if (workers) {
|
||||
logger.Log('WORKERS ALREADY EXISTS', logger.GetColor('redbg'))
|
||||
return null
|
||||
}
|
||||
workers = []
|
||||
|
||||
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length
|
||||
if (process.env.NS_THREAD_COUNT) {
|
||||
logger.Log(
|
||||
`Setting thread count from enviroment variable NS_WORKER_COUNT: '${threadCount}'`,
|
||||
logger.GetColor('red')
|
||||
)
|
||||
}
|
||||
const threadCount = process.env.NS_THREAD_COUNT || os.cpus().length
|
||||
if (process.env.NS_THREAD_COUNT) {
|
||||
logger.Log(
|
||||
`Setting thread count from enviroment variable NS_WORKER_COUNT: '${threadCount}'`,
|
||||
logger.GetColor('red')
|
||||
)
|
||||
}
|
||||
|
||||
for (let i = 0; i < threadCount; i++) {
|
||||
workers.push({
|
||||
worker: getAWorker(i, initData),
|
||||
index: i,
|
||||
free: true,
|
||||
})
|
||||
}
|
||||
for (let i = 0; i < threadCount; i++) {
|
||||
workers.push({
|
||||
worker: getAWorker(i, initData),
|
||||
index: i,
|
||||
free: true,
|
||||
})
|
||||
}
|
||||
|
||||
return workers
|
||||
return workers
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function processJob() {
|
||||
if (Object.keys(pendingJobs).length > 0) {
|
||||
const keys = Object.keys(pendingJobs)
|
||||
let jobKey: string, freeWorker: WorkerObj
|
||||
let i = 0
|
||||
while (!freeWorker && i < keys.length) {
|
||||
jobKey = keys[i]
|
||||
if (!isNaN(pendingJobs[jobKey].targetWorkerIndex)) {
|
||||
if (workers[pendingJobs[jobKey].targetWorkerIndex].free) {
|
||||
freeWorker = workers[pendingJobs[jobKey].targetWorkerIndex]
|
||||
logger.DebugLog(
|
||||
`RESERVING WORKER ${pendingJobs[jobKey].targetWorkerIndex}`,
|
||||
'job',
|
||||
1
|
||||
)
|
||||
if (Object.keys(pendingJobs).length > 0) {
|
||||
const keys = Object.keys(pendingJobs)
|
||||
let jobKey: string, freeWorker: WorkerObj
|
||||
let i = 0
|
||||
while (!freeWorker && i < keys.length) {
|
||||
jobKey = keys[i]
|
||||
if (!isNaN(pendingJobs[jobKey].targetWorkerIndex)) {
|
||||
if (workers[pendingJobs[jobKey].targetWorkerIndex].free) {
|
||||
freeWorker = workers[pendingJobs[jobKey].targetWorkerIndex]
|
||||
logger.DebugLog(
|
||||
`RESERVING WORKER ${pendingJobs[jobKey].targetWorkerIndex}`,
|
||||
'job',
|
||||
1
|
||||
)
|
||||
}
|
||||
} else {
|
||||
freeWorker = workers.find((worker) => {
|
||||
return worker.free
|
||||
})
|
||||
if (freeWorker) {
|
||||
logger.DebugLog(
|
||||
`RESERVING FIRST AVAILABLE WORKER ${freeWorker.index}`,
|
||||
'job',
|
||||
1
|
||||
)
|
||||
}
|
||||
}
|
||||
i++
|
||||
}
|
||||
} else {
|
||||
freeWorker = workers.find((worker) => {
|
||||
return worker.free
|
||||
})
|
||||
if (freeWorker) {
|
||||
logger.DebugLog(
|
||||
`RESERVING FIRST AVAILABLE WORKER ${freeWorker.index}`,
|
||||
'job',
|
||||
1
|
||||
)
|
||||
|
||||
if (!freeWorker) {
|
||||
logger.DebugLog('NO FREE WORKER', 'job', 1)
|
||||
return
|
||||
}
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if (!freeWorker) {
|
||||
logger.DebugLog('NO FREE WORKER', 'job', 1)
|
||||
return
|
||||
}
|
||||
if (freeWorker.free) {
|
||||
freeWorker.free = false
|
||||
}
|
||||
const job = pendingJobs[jobKey]
|
||||
delete pendingJobs[jobKey]
|
||||
|
||||
if (freeWorker.free) {
|
||||
freeWorker.free = false
|
||||
doSomething(freeWorker, job.workData)
|
||||
.then((res: WorkerResult) => {
|
||||
freeWorker.free = true
|
||||
job.doneEvent.emit('done', res)
|
||||
})
|
||||
.catch(function (err) {
|
||||
handleWorkerError(freeWorker, err)
|
||||
})
|
||||
}
|
||||
const job = pendingJobs[jobKey]
|
||||
delete pendingJobs[jobKey]
|
||||
|
||||
doSomething(freeWorker, job.workData)
|
||||
.then((res: WorkerResult) => {
|
||||
freeWorker.free = true
|
||||
job.doneEvent.emit('done', res)
|
||||
})
|
||||
.catch(function (err) {
|
||||
handleWorkerError(freeWorker, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function getAWorker(i: number, initData: Array<QuestionDb>) {
|
||||
const worker = workerTs(workerFile, {
|
||||
workerData: {
|
||||
workerIndex: i,
|
||||
initData: initData,
|
||||
},
|
||||
})
|
||||
const worker = workerTs(workerFile, {
|
||||
workerData: {
|
||||
workerIndex: i,
|
||||
initData: initData,
|
||||
},
|
||||
})
|
||||
|
||||
worker.setMaxListeners(50)
|
||||
worker.setMaxListeners(50)
|
||||
|
||||
worker.on('error', (err) => {
|
||||
logger.Log('Worker error!', logger.GetColor('redbg'))
|
||||
console.error(err)
|
||||
})
|
||||
worker.on('error', (err) => {
|
||||
logger.Log('Worker error!', logger.GetColor('redbg'))
|
||||
console.error(err)
|
||||
})
|
||||
|
||||
worker.on('exit', (code) => {
|
||||
// TODO: this is critical, whole server should stop, or child threads should be restarted
|
||||
logger.Log(
|
||||
`[MAIN]: worker #${i} exit code: ${code}`,
|
||||
code === 0 ? logger.GetColor('redbg') : logger.GetColor('green')
|
||||
)
|
||||
})
|
||||
return worker
|
||||
worker.on('exit', (code) => {
|
||||
// TODO: this is critical, whole server should stop, or child threads should be restarted
|
||||
logger.Log(
|
||||
`[MAIN]: worker #${i} exit code: ${code}`,
|
||||
code === 0 ? logger.GetColor('redbg') : logger.GetColor('green')
|
||||
)
|
||||
})
|
||||
return worker
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function doSomething(currWorker: WorkerObj, obj: TaskObject) {
|
||||
const { /* index, */ worker } = currWorker
|
||||
return new Promise((resolve) => {
|
||||
worker.postMessage(obj)
|
||||
worker.once('message', (msg: WorkerResult) => {
|
||||
resolve(msg)
|
||||
const { /* index, */ worker } = currWorker
|
||||
return new Promise((resolve) => {
|
||||
worker.postMessage(obj)
|
||||
worker.once('message', (msg: WorkerResult) => {
|
||||
resolve(msg)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const workerTs = (
|
||||
file: string,
|
||||
wkOpts: {
|
||||
workerData: {
|
||||
workerIndex: number
|
||||
initData: QuestionDb[]
|
||||
__filename?: string
|
||||
file: string,
|
||||
wkOpts: {
|
||||
workerData: {
|
||||
workerIndex: number
|
||||
initData: QuestionDb[]
|
||||
__filename?: string
|
||||
}
|
||||
eval?: boolean
|
||||
}
|
||||
eval?: boolean
|
||||
}
|
||||
) => {
|
||||
wkOpts.eval = true
|
||||
wkOpts.workerData.__filename = file
|
||||
return new Worker(
|
||||
`
|
||||
wkOpts.eval = true
|
||||
wkOpts.workerData.__filename = file
|
||||
return new Worker(
|
||||
`
|
||||
const wk = require('worker_threads');
|
||||
require('ts-node').register();
|
||||
let file = wk.workerData.__filename;
|
||||
delete wk.workerData.__filename;
|
||||
require(file);
|
||||
`,
|
||||
wkOpts
|
||||
)
|
||||
wkOpts
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue