diff --git a/README.md b/README.md index 5a0b697..6b9e258 100755 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ databases the pi might not be enough. The server utilizes multiple CPU cores, long running operations are ran in separate threads. Because of the implementation, the more cores a CPU has, the server uses more memory, but able to run more threads, and serve more requests at once. The used cores can be limited with environment variables -(detailed below). +(detailed below, `NS_THREAD_COUNT`). ## Terminology @@ -60,7 +60,7 @@ read them very carefully, you should know about what was created!** * By default the server redirects all HTTP traffic to HTTPS. To disable this use `NS_NO_HTTPS_FORCE` * The server launches a thread for each CPU core. This could be an overkill on 4+ cored CPU-s. Use - `NS_THREAD_COUNT` to restrict the number of threads + `NS_THREAD_COUNT` to restrict the number of threads, and potentially reduce memory usage of the server. * The setup script can be also used to update and rebuild all git modules if ran after the initial setup diff --git a/src/modules/api/api.ts b/src/modules/api/api.ts index f17bd60..7b2ea48 100644 --- a/src/modules/api/api.ts +++ b/src/modules/api/api.ts @@ -39,8 +39,8 @@ import { Submodule, } from '../../types/basicTypes' import { loadJSON } from '../../utils/actions' -import { initWorkerPool } from '../../utils/workerPool' import { paths } from '../../utils/files' +import { initWorkerPool } from '../../worker/workerPool' // other paths const moduleName = 'API' diff --git a/src/modules/api/submodules/p2p.ts b/src/modules/api/submodules/p2p.ts index 9fc54f5..c1cfaf9 100644 --- a/src/modules/api/submodules/p2p.ts +++ b/src/modules/api/submodules/p2p.ts @@ -33,7 +33,6 @@ import { } from '../../../types/basicTypes' import utils from '../../../utils/utils' import { backupData, writeData } from '../../../utils/actions' -import { WorkerResult } from '../../../utils/classes' import dbtools from '../../../utils/dbtools' import { createKeyPair, @@ -41,11 +40,6 @@ import { encrypt, isKeypairValid, } from '../../../utils/encryption' -import { - doALongTask, - msgAllWorker, - setPendingJobsAlertCount, -} from '../../../utils/workerPool' import { countOfQdb, countOfQdbs, @@ -54,7 +48,18 @@ import { removeCacheFromQuestion, } from '../../../utils/qdbUtils' import { files, paths, readAndValidateFile } from '../../../utils/files' -import { GetResult, get, post } from '../../../utils/networkUtils' +import { GetResult, get } from '../../../utils/networkUtils' +import { + msgAllWorker, + queueWork, + setPendingJobsAlertCount, +} from '../../../worker/workerPool' +import { WorkerResult } from '../../../worker/worker' +import { + loginToPeer, + peerToString, + updatePeersFile, +} from '../../../utils/p2putils' interface MergeResult { newData: Subject[] @@ -87,6 +92,7 @@ interface RemotePeerInfo { } interface SyncDataRes { + result?: string questionDbs?: QuestionDb[] remoteInfo?: RemotePeerInfo encryptedUsers?: string @@ -120,14 +126,6 @@ function updateThirdPartyPeers( ) } -function peerToString(peer: { host: string; port: string | number }) { - return `${peer.host}:${peer.port}` -} - -function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo) { - return peer1.host === peer2.host && peer1.port === peer2.port -} - export function getNewDataSince(subjects: Subject[], date: number): Subject[] { return subjects .map((subject) => { @@ -323,49 +321,69 @@ function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) { } async function authAndGetNewData({ + peers, peer, selfInfo, lastSyncWithPeer, lastSync, }: { + peers: PeerInfo[] peer: PeerInfo selfInfo: PeerInfo lastSyncWithPeer: number lastSync: number }): Promise> { - const { data, error, cookies } = await post<{ - result: string - msg: string - }>({ - hostname: peer.host, - path: '/api/login', - port: peer.port, - bodyObject: { pw: peer.pw }, - http: peer.http, - }) + let sessionCookie = peer.sessionCookie - if (error || !data || data.result !== 'success') { - return { - error: data ? new Error(data.msg) : error, - data: { - peer: peer, - }, + if (!sessionCookie) { + const loginResult = await loginToPeer(peer) + if (typeof loginResult === 'string') { + sessionCookie = loginResult + updatePeersFile(peers, { ...peer, sessionCookie: loginResult }) + } else { + return { + error: loginResult, + data: { + peer: peer, + }, + } } } - const getRes = await get( - { - headers: { - cookie: cookies.join(), + const getSyncData = () => { + return get( + { + headers: { + cookies: `sessionID=${sessionCookie}`, + }, + host: peer.host, + port: peer.port, + path: `/api/getnewdatasince?host=${encodeURIComponent( + peerToString(selfInfo) + )}${lastSync ? `&since=${lastSyncWithPeer}` : ''}`, }, - host: peer.host, - port: peer.port, - path: `/api/getnewdatasince?host=${encodeURIComponent( - peerToString(selfInfo) - )}${lastSync ? `&since=${lastSyncWithPeer}` : ''}`, - }, - peer.http - ) + peer.http + ) + } + + let getRes = await getSyncData() + + if (getRes.data?.result === 'nouser') { + // FIXME: make this more pretty? (duplicate code, see above) + const loginResult = await loginToPeer(peer) + if (typeof loginResult === 'string') { + sessionCookie = loginResult + updatePeersFile(peers, { ...peer, sessionCookie: loginResult }) + } else { + return { + error: loginResult, + data: { + peer: peer, + }, + } + } + getRes = await getSyncData() + } return { ...getRes, data: { ...getRes.data, peer: peer } } } @@ -385,21 +403,6 @@ function setup(data: SubmoduleData): Submodule { // SETUP // --------------------------------------------------------------------------------------- - if (!utils.FileExists(paths.peersFile)) { - logger.Log( - `Warning: peers file was missing, so it was created`, - 'yellowbg' - ) - utils.CreatePath(paths.peersPath) - utils.WriteFile('[]', paths.peersFile) - } - - if (!utils.FileExists(paths.selfInfoFile)) { - const msg = `Self info file for p2p does not exist! (${paths.selfInfoFile}) P2P functionality will not be loaded` - logger.Log(msg, 'redbg') - return {} - } - let publicKey: string let privateKey: string @@ -483,7 +486,7 @@ function setup(data: SubmoduleData): Submodule { selfInfo: { ...selfInfo, publicKey: publicKey }, myPeers: peers.map((peer) => { // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { pw, ...restOfPeer } = peer + const { pw, sessionCookie, ...restOfPeer } = peer return restOfPeer }), } @@ -571,7 +574,7 @@ function setup(data: SubmoduleData): Submodule { rawNewQuestionDbs.push(remoteQdb) } else { mergeJobs.push( - doALongTask({ + queueWork({ type: 'merge', data: { localQdbIndex: localQdb.index, @@ -630,6 +633,7 @@ function setup(data: SubmoduleData): Submodule { const lastSyncWithPeer = peer.lastSync || 0 return authAndGetNewData({ + peers: peers, peer: peer, selfInfo: selfInfo, lastSyncWithPeer: lastSyncWithPeer, @@ -844,21 +848,10 @@ function setup(data: SubmoduleData): Submodule { newQuestions: newQuestionCount, } // Processing result data is successfull - const updatedPeersFile = peers.map((x) => { - if (isPeerSameAs(peer, x)) { - return { - ...x, - lastSync: syncStart, - } - } else { - return x - } + updatePeersFile(peers, { + ...peer, + lastSync: syncStart, }) - - utils.WriteFile( - JSON.stringify(updatedPeersFile, null, 2), - paths.peersFile - ) } // ------------------------------------------------------------------------------------------------------- @@ -1014,7 +1007,6 @@ function setup(data: SubmoduleData): Submodule { remotePublicKey, JSON.stringify(newUsers) ) - // TODO: count sent user count logger.Log( `\tSending new users to "${remoteHost}" (encrypted)`, 'green' @@ -1050,7 +1042,6 @@ function setup(data: SubmoduleData): Submodule { ? 'all time' : new Date(since).toLocaleString() - // TODO: count sent data logger.Log( `\tSending new data to ${logger.C( 'blue' diff --git a/src/modules/api/submodules/qminingapi.ts b/src/modules/api/submodules/qminingapi.ts index d46c293..4b83f8f 100644 --- a/src/modules/api/submodules/qminingapi.ts +++ b/src/modules/api/submodules/qminingapi.ts @@ -48,11 +48,6 @@ import { editDb, RecievedData, } from '../../../utils/actions' -import { - WorkerResult, - // compareQuestionObj, -} from '../../../utils/classes' -import { doALongTask, msgAllWorker } from '../../../utils/workerPool' import dbtools from '../../../utils/dbtools' import { dataToString, @@ -65,6 +60,8 @@ import { isJsonValidAndLogError, TestUsersSchema, } from '../../../types/typeSchemas' +import { msgAllWorker, queueWork } from '../../../worker/workerPool' +import { WorkerResult } from '../../../worker/worker' interface SavedQuestionData { fname: string @@ -74,13 +71,12 @@ interface SavedQuestionData { date: string | Date } -// interface SavedQuestion { -// Questions: Question[] -// subj: string -// userid: number -// testUrl: string -// date: string -// } +export interface QuestionAddResponse { + success: boolean + newQuestions: number + totalNewQuestions: number + result?: string +} const line = '====================================================' // lol @@ -163,8 +159,8 @@ function searchInDbs( // searchIn could be [0], [1], ... to search every db in different thread. Put this into a // forEach(qdbs) to achieve this return new Promise((resolve) => { - doALongTask({ - type: 'work', + queueWork({ + type: 'search', data: { searchIn: searchIn, testUrl: testUrl, @@ -553,7 +549,9 @@ function setup(data: SubmoduleData): Submodule { writeIsAddingData(req.body) - const location = req.body.location.split('/')[2] + const location = req.body.location.includes('/') + ? req.body.location.split('/')[2] + : req.body.location try { let maxIndex = -1 @@ -603,17 +601,27 @@ function setup(data: SubmoduleData): Submodule { res.json({ success: resultArray.length > 0, - newQuestions: resultArray, + newQuestions: resultArray, // FIXME: this is useless? totalNewQuestions: totalNewQuestions, }) if (totalNewQuestions > 0) { resultArray.forEach((result) => { - msgAllWorker({ - // TODO: recognize base64 image - type: 'newQuestions', - data: result, - }) + if (result.newQuestions.length > 0) { + msgAllWorker({ + type: 'newQuestions', + data: result, + }) + if (req.body.fromPeer) return + queueWork({ + type: 'sendQuestionsToPeers', + data: { + newQuestions: result.newQuestions, + subj: result.subjName, + location: location, // TODO: location undefined? + }, + }) + } }) } }) diff --git a/src/tests/possibleAnswerPenalty.test.ts b/src/tests/possibleAnswerPenalty.test.ts index 0df02b9..fb06dec 100644 --- a/src/tests/possibleAnswerPenalty.test.ts +++ b/src/tests/possibleAnswerPenalty.test.ts @@ -1,4 +1,4 @@ -import { setNoPossibleAnswersPenalties } from '../utils/classes' +import { setNoPossibleAnswersPenalties } from '../worker/handlers/handleSearch' import { Question } from '../types/basicTypes' import { noPossibleAnswerMatchPenalty, diff --git a/src/types/basicTypes.ts b/src/types/basicTypes.ts index 85452f3..d82ddbe 100644 --- a/src/types/basicTypes.ts +++ b/src/types/basicTypes.ts @@ -175,6 +175,7 @@ export interface PeerInfo { publicKey: string contact: string pw?: string + sessionCookie?: string lastSync?: number note?: string http?: boolean diff --git a/src/types/typeSchemas.ts b/src/types/typeSchemas.ts index b17e240..191fa62 100644 --- a/src/types/typeSchemas.ts +++ b/src/types/typeSchemas.ts @@ -61,6 +61,7 @@ export const PeerInfoSchema: Schema = { ...PeerInfoSchemaBase.properties, publicKey: { type: 'string' }, pw: { type: 'string' }, + sessionCookie: { type: 'string' }, }, required: ['name', 'host', 'port', 'contact', 'pw'], } diff --git a/src/utils/actions.ts b/src/utils/actions.ts index 4890641..fd6a535 100755 --- a/src/utils/actions.ts +++ b/src/utils/actions.ts @@ -22,8 +22,6 @@ const recDataFile = './stats/recdata' const dataLockFile = './data/lockData' import logger from '../utils/logger' -import { WorkerResult } from '../utils/classes' -import { doALongTask, msgAllWorker } from './workerPool' import idStats from '../utils/ids' import utils from '../utils/utils' import { @@ -42,6 +40,8 @@ import { DataFile, } from '../types/basicTypes' import { countOfQdbs } from './qdbUtils' +import { WorkerResult } from '../worker/worker' +import { queueWork, msgAllWorker } from '../worker/workerPool' // if a recievend question doesnt match at least this % to any other question in the db it gets // added to db @@ -56,6 +56,7 @@ export interface RecievedData { id: string version: string location: string + fromPeer?: boolean } export interface Result { @@ -183,8 +184,8 @@ function processIncomingRequestUsingDb( recievedQuestions.push(currentQuestion) // This here searches only in relevant subjects, and not all subjects questionSearchPromises.push( - doALongTask({ - type: 'work', + queueWork({ + type: 'search', data: { searchIn: [qdb.index], question: currentQuestion, @@ -201,7 +202,7 @@ function processIncomingRequestUsingDb( Promise.all(questionSearchPromises) .then((results: Array) => { - const allQuestions: Question[] = [] // all new questions here that do not have result + const newQuestions: Question[] = [] // all new questions here that do not have result results.forEach((result: WorkerResult, i) => { const add = ( result.result as SearchResultQuestion[] @@ -209,7 +210,7 @@ function processIncomingRequestUsingDb( return res.match < minMatchAmmountToAdd }) if (add && !result.error) { - allQuestions.push(recievedQuestions[i]) + newQuestions.push(recievedQuestions[i]) } }) @@ -223,8 +224,8 @@ function processIncomingRequestUsingDb( logger.GetColor('redbg') ) } - if (allQuestions.length > 0 && subjName) { - addQuestionsToDb(allQuestions, subjName, qdb) + if (newQuestions.length > 0 && subjName) { + addQuestionsToDb(newQuestions, subjName, qdb) currWrites++ logger.DebugLog( @@ -246,12 +247,12 @@ function processIncomingRequestUsingDb( idStats.LogId( user.id, recievedData.subj, - allQuestions.length, + newQuestions.length, allQLength ) logger.DebugLog('New Questions:', 'isadding', 2) - logger.DebugLog(allQuestions, 'isadding', 2) + logger.DebugLog(newQuestions, 'isadding', 2) logger.DebugLog( 'ProcessIncomingRequest done', @@ -259,7 +260,7 @@ function processIncomingRequestUsingDb( 1 ) resolve({ - newQuestions: allQuestions, + newQuestions: newQuestions, subjName: recievedData.subj, qdbIndex: qdb.index, qdbName: qdb.name, @@ -343,7 +344,7 @@ function runCleanWorker( // }${logger.C('')}")` // ) // pass recieved questions to a worker - doALongTask({ + queueWork({ type: 'dbClean', data: { questions: recievedQuesitons, diff --git a/src/utils/classes.ts b/src/utils/classes.ts deleted file mode 100755 index 7c04232..0000000 --- a/src/utils/classes.ts +++ /dev/null @@ -1,539 +0,0 @@ -/* ---------------------------------------------------------------------------- - - Question Server - GitLab: - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . - - ------------------------------------------------------------------------- */ -// FIXME: this should be renamed to worker.ts or something - -import { isMainThread, parentPort, workerData } from 'worker_threads' - -import { recognizeTextFromBase64, tesseractLoaded } from './tesseract' -import logger from './logger' -import { - Question, - QuestionData, - QuestionDb, - Subject, -} from '../types/basicTypes' -import { editDb, Edits, updateQuestionsInArray } from './actions' -import { - cleanDb, - countOfQdbs, - createQuestion, - getSubjectDifference, - getSubjNameWithoutYear, - minMatchToNotSearchOtherSubjects, - noPossibleAnswerMatchPenalty, - prepareQuestion, - SearchResultQuestion, - searchSubject, -} from './qdbUtils' -// import { TaskObject } from './workerPool' - -export interface WorkerResult { - msg: string - workerIndex: number - result?: SearchResultQuestion[] | number[][] - error?: boolean -} - -// --------------------------------------------------------------------------------------------------------- -// String Utils -// --------------------------------------------------------------------------------------------------------- - -// Exported -// --------------------------------------------------------------------------------------------------------- - -// Not exported -// --------------------------------------------------------------------------------------------------------- - -// --------------------------------------------------------------------------------------------------------- -// Question -// --------------------------------------------------------------------------------------------------------- - -async function recognizeQuestionImage(question: Question): Promise { - const base64Data = question.data.base64 - if (Array.isArray(base64Data) && base64Data.length) { - const res: string[] = [] - for (let i = 0; i < base64Data.length; i++) { - const base64 = base64Data[i] - const text = await recognizeTextFromBase64(base64) - if (text && text.trim()) { - res.push(text) - } - } - - if (res.length) { - return { - ...question, - Q: res.join(' '), - data: { - ...question.data, - type: 'simple', - }, - } - } - } - - return question -} - -// --------------------------------------------------------------------------------------------------------- -// Subject -// --------------------------------------------------------------------------------------------------------- - -// --------------------------------------------------------------------------------------------------------- -// QuestionDB -// --------------------------------------------------------------------------------------------------------- - -function doSearch( - data: Array, - subjName: string, - question: Question, - searchTillMatchPercent?: number, - searchInAllIfNoResult?: Boolean -): SearchResultQuestion[] { - let result: SearchResultQuestion[] = [] - - const questionToSearch = prepareQuestion(question) - - data.every((subj) => { - if ( - subjName - .toLowerCase() - .includes(getSubjNameWithoutYear(subj.Name).toLowerCase()) - ) { - logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2) - const subjRes = searchSubject( - subj, - questionToSearch, - subjName, - searchTillMatchPercent - ) - result = result.concat(subjRes) - if (searchTillMatchPercent) { - return !subjRes.some((sr) => { - return sr.match >= searchTillMatchPercent - }) - } - return true - } - return true - }) - - if (searchInAllIfNoResult) { - // FIXME: dont research subject searched above - if ( - result.length === 0 || - result[0].match < minMatchToNotSearchOtherSubjects - ) { - logger.DebugLog( - 'Reqults length is zero when comparing names, trying all subjects', - 'searchworker', - 1 - ) - data.every((subj) => { - const subjRes = searchSubject( - subj, - questionToSearch, - subjName, - searchTillMatchPercent - ) - result = result.concat(subjRes) - - if (searchTillMatchPercent) { - const continueSearching = !subjRes.some((sr) => { - return sr.match >= searchTillMatchPercent - }) - return continueSearching - } - return true - }) - } - } - - result = setNoPossibleAnswersPenalties( - questionToSearch.data.possibleAnswers, - result - ) - - result = result.sort((q1, q2) => { - if (q1.match < q2.match) { - return 1 - } else if (q1.match > q2.match) { - return -1 - } else { - return 0 - } - }) - - return result -} - -function setNoPossibleAnswersPenalties( - questionPossibleAnswers: QuestionData['possibleAnswers'], - results: SearchResultQuestion[] -): SearchResultQuestion[] { - if (!Array.isArray(questionPossibleAnswers)) { - return results - } - const noneHasPossibleAnswers = results.every((x) => { - return !Array.isArray(x.q.data.possibleAnswers) - }) - if (noneHasPossibleAnswers) return results - - let possibleAnswerMatch = false - const updated = results.map((result) => { - const matchCount = Array.isArray(result.q.data.possibleAnswers) - ? result.q.data.possibleAnswers.filter((resultPossibleAnswer) => { - return questionPossibleAnswers.some( - (questionPossibleAnswer) => { - if ( - questionPossibleAnswer.val && - resultPossibleAnswer.val - ) { - return questionPossibleAnswer.val.includes( - resultPossibleAnswer.val - ) - } else { - return false - } - } - ) - }).length - : 0 - - if (matchCount === questionPossibleAnswers.length) { - possibleAnswerMatch = true - return result - } else { - return { - ...result, - match: result.match - noPossibleAnswerMatchPenalty, - detailedMatch: { - ...result.detailedMatch, - qMatch: - result.detailedMatch.qMatch - - noPossibleAnswerMatchPenalty, - }, - } - } - }) - - if (possibleAnswerMatch) { - return updated - } else { - return results - } -} - -// --------------------------------------------------------------------------------------------------------- -// Multi threaded stuff -// --------------------------------------------------------------------------------------------------------- - -interface WorkData { - subjName: string - question: Question - searchTillMatchPercent: number - searchInAllIfNoResult: boolean - searchIn: number[] - index: number -} - -if (!isMainThread) { - handleWorkerData() -} - -function handleWorkerData() { - const { - workerIndex, - initData, - }: { workerIndex: number; initData: Array } = workerData - let qdbs: Array = initData - - const qdbCount = initData.length - const { subjCount, questionCount } = countOfQdbs(initData) - - logger.Log( - `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! qdbs: ${qdbCount}, subjects: ${subjCount.toLocaleString()}, questions: ${questionCount.toLocaleString()}` - ) - - parentPort.on('message', async (msg /*: TaskObject */) => { - try { - await tesseractLoaded - - if (msg.type === 'work') { - const { - subjName, - question: originalQuestion, - searchTillMatchPercent, - searchInAllIfNoResult, - searchIn, - index, - }: WorkData = msg.data - - let searchResult: SearchResultQuestion[] = [] - let error = false - - const question = await recognizeQuestionImage(originalQuestion) - - try { - qdbs.forEach((qdb) => { - if (searchIn.includes(qdb.index)) { - const res = doSearch( - qdb.data, - subjName, - question, - searchTillMatchPercent, - searchInAllIfNoResult - ) - searchResult = [ - ...searchResult, - ...res.map((x) => { - return { - ...x, - detailedMatch: { - ...x.detailedMatch, - qdb: qdb.name, - }, - } - }), - ] - } - }) - } catch (err) { - logger.Log( - 'Error in worker thread!', - logger.GetColor('redbg') - ) - console.error(err) - console.error( - JSON.stringify( - { - subjName: subjName, - question: question, - searchTillMatchPercent: searchTillMatchPercent, - searchInAllIfNoResult: searchInAllIfNoResult, - searchIn: searchIn, - index: index, - }, - null, - 2 - ) - ) - error = true - } - - // sorting - const sortedResult: SearchResultQuestion[] = searchResult.sort( - (q1, q2) => { - if (q1.match < q2.match) { - return 1 - } else if (q1.match > q2.match) { - return -1 - } else { - return 0 - } - } - ) - - const workerResult: WorkerResult = { - msg: `From thread #${workerIndex}: job ${ - !isNaN(index) ? `#${index}` : '' - }done`, - workerIndex: workerIndex, - result: sortedResult, - error: error, - } - - // ONDONE: - parentPort.postMessage(workerResult) - - // console.log( - // `[THREAD #${workerIndex}]: Work ${ - // !isNaN(index) ? `#${index}` : '' - // }done!` - // ) - } else if (msg.type === 'merge') { - const { - localQdbIndex, - remoteQdb, - }: { localQdbIndex: number; remoteQdb: QuestionDb } = msg.data - const localQdb = qdbs.find((qdb) => qdb.index === localQdbIndex) - - const { newData, newSubjects } = getSubjectDifference( - localQdb.data, - remoteQdb.data - ) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: merge done`, - workerIndex: workerIndex, - newData: newData, - newSubjects: newSubjects, - localQdbIndex: localQdbIndex, - }) - } else if (msg.type === 'dbEdit') { - const { dbIndex, edits }: { dbIndex: number; edits: Edits } = - msg.data - const { resultDb } = editDb(qdbs[dbIndex], edits) - qdbs[dbIndex] = resultDb - logger.DebugLog( - `Worker db edit ${workerIndex}`, - 'worker update', - 1 - ) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: db edit`, - workerIndex: workerIndex, - }) - } else if (msg.type === 'newQuestions') { - const { - subjName, - qdbIndex, - newQuestions, - }: { - subjName: string - qdbIndex: number - newQuestions: Question[] - } = msg.data - - const newQuestionsWithCache = newQuestions.map((question) => { - if (!question.cache) { - return createQuestion(question) - } else { - return question - } - }) - - let added = false - qdbs = qdbs.map((qdb) => { - if (qdb.index === qdbIndex) { - return { - ...qdb, - data: qdb.data.map((subj) => { - if (subj.Name === subjName) { - added = true - return { - Name: subj.Name, - Questions: [ - ...subj.Questions, - ...newQuestionsWithCache, - ], - } - } else { - return subj - } - }), - } - } else { - return qdb - } - }) - - if (!added) { - qdbs = qdbs.map((qdb) => { - if (qdb.index === qdbIndex) { - return { - ...qdb, - data: [ - ...qdb.data, - { - Name: subjName, - Questions: [...newQuestionsWithCache], - }, - ], - } - } else { - return qdb - } - }) - } - logger.DebugLog( - `Worker new question ${workerIndex}`, - 'worker update', - 1 - ) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: update done`, - workerIndex: workerIndex, - }) - - // console.log(`[THREAD #${workerIndex}]: update`) - } else if (msg.type === 'newdb') { - const { data }: { data: QuestionDb } = msg - qdbs.push(data) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: new db add done`, - workerIndex: workerIndex, - }) - // console.log(`[THREAD #${workerIndex}]: newdb`) - } else if (msg.type === 'dbClean') { - const removedIndexes = cleanDb(msg.data, qdbs) - - const workerResult: WorkerResult = { - msg: `From thread #${workerIndex}: db clean done`, - workerIndex: workerIndex, - result: removedIndexes, - } - - parentPort.postMessage(workerResult) - } else if (msg.type === 'rmQuestions') { - const { - questionIndexesToRemove, - subjIndex, - qdbIndex, - recievedQuestions, - } = msg.data - - qdbs[qdbIndex].data[subjIndex].Questions = - updateQuestionsInArray( - questionIndexesToRemove, - qdbs[qdbIndex].data[subjIndex].Questions, - recievedQuestions - ) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: rm question done`, - workerIndex: workerIndex, - }) - } else { - logger.Log(`Invalid msg type!`, logger.GetColor('redbg')) - console.error(msg) - - parentPort.postMessage({ - msg: `From thread #${workerIndex}: Invalid message type (${msg.type})!`, - workerIndex: workerIndex, - }) - } - } catch (e) { - console.error(e) - parentPort.postMessage({ - msg: `From thread #${workerIndex}: unhandled error occured!`, - workerIndex: workerIndex, - e: e, - }) - } - }) -} - -// ------------------------------------------------------------------------ - -export { doSearch, setNoPossibleAnswersPenalties } diff --git a/src/utils/networkUtils.ts b/src/utils/networkUtils.ts index f21e28e..9fa2a82 100644 --- a/src/utils/networkUtils.ts +++ b/src/utils/networkUtils.ts @@ -23,7 +23,6 @@ export function get( try { resolve({ data: JSON.parse(body) }) } catch (e) { - console.log(body) resolve({ error: e, options: options }) } }) @@ -46,6 +45,7 @@ interface PostParams { port: number bodyObject: any http?: boolean + cookies?: string } // https://nodejs.org/api/http.html#httprequesturl-options-callback @@ -55,6 +55,7 @@ export function post({ port, bodyObject, http, + cookies, }: PostParams): Promise> { const provider = http ? httpRequest : httpsRequest const body = JSON.stringify(bodyObject) @@ -69,6 +70,11 @@ export function post({ headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body), + ...(cookies + ? { + cookie: cookies, + } + : {}), }, }, (res) => { @@ -85,7 +91,6 @@ export function post({ cookies: res.headers['set-cookie'], }) } catch (e) { - console.log(body) resolve({ error: e }) } }) @@ -100,3 +105,20 @@ export function post({ req.end() }) } + +export function parseCookies(responseCookies: string[]): { + [key: string]: string +} { + const cookiesArray = responseCookies.join('; ').split('; ') + const parsedCookies: { [key: string]: string } = cookiesArray.reduce( + (acc, cookieString) => { + const [key, val] = cookieString.split('=') + return { + ...acc, + [key]: val || true, + } + }, + {} + ) + return parsedCookies +} diff --git a/src/utils/p2putils.ts b/src/utils/p2putils.ts new file mode 100644 index 0000000..46216ab --- /dev/null +++ b/src/utils/p2putils.ts @@ -0,0 +1,53 @@ +import { PeerInfo } from '../types/basicTypes' +import { paths } from './files' +import { parseCookies, post } from './networkUtils' +import utils from './utils' + +export function peerToString(peer: { + host: string + port: string | number +}): string { + return `${peer.host}:${peer.port}` +} + +export function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo): boolean { + return peer1.host === peer2.host && peer1.port === peer2.port +} + +export function updatePeersFile( + peers: PeerInfo[], + updatedPeer: PeerInfo +): void { + const updatedPeers = peers.map((x) => { + if (isPeerSameAs(updatedPeer, x)) { + return { + ...x, + ...updatedPeer, + } + } else { + return x + } + }) + + utils.WriteFile(JSON.stringify(updatedPeers, null, 2), paths.peersFile) +} + +export async function loginToPeer(peer: PeerInfo): Promise { + const { data, error, cookies } = await post<{ + result: string + msg: string + }>({ + hostname: peer.host, + path: '/api/login', + port: peer.port, + bodyObject: { pw: peer.pw }, + http: peer.http, + }) + + if (error || !data || data.result !== 'success') { + return data ? new Error(data.msg) : error + } + + const parsedCookies = parseCookies(cookies) + return parsedCookies.sessionID +} diff --git a/src/worker/handlers/handleDbClean.ts b/src/worker/handlers/handleDbClean.ts new file mode 100644 index 0000000..ece4e81 --- /dev/null +++ b/src/worker/handlers/handleDbClean.ts @@ -0,0 +1,30 @@ +import { parentPort } from 'node:worker_threads' +import { cleanDb } from '../../utils/qdbUtils' +import { Question, QuestionDb } from '../../types/basicTypes' +import { WorkerResult } from '../worker' + +export type DbCleanTaskObject = { + type: 'dbClean' + data: { + questions: Question[] + subjToClean: string + overwriteBeforeDate: number + qdbIndex: number + } +} + +export const handleDbClean = async ( + qdbs: QuestionDb[], + msg: DbCleanTaskObject, + workerIndex: number +): Promise => { + const removedIndexes = cleanDb(msg.data, qdbs) + + const workerResult: WorkerResult = { + msg: `From thread #${workerIndex}: db clean done`, + workerIndex: workerIndex, + result: removedIndexes, + } + + parentPort.postMessage(workerResult) +} diff --git a/src/worker/handlers/handleDbEdit.ts b/src/worker/handlers/handleDbEdit.ts new file mode 100644 index 0000000..45b3693 --- /dev/null +++ b/src/worker/handlers/handleDbEdit.ts @@ -0,0 +1,34 @@ +import { parentPort } from 'node:worker_threads' +import { QuestionDb } from '../../types/basicTypes' +import { Edits, editDb } from '../../utils/actions' +import logger from '../../utils/logger' + +export type DbEditTaskObject = { + type: 'dbEdit' + data: { dbIndex: number; edits: Edits } +} + +export const handleDbEdit = async ( + qdbs: QuestionDb[], + msg: DbEditTaskObject, + workerIndex: number, + setQdbs: (newVal: Array) => void +): Promise => { + const { dbIndex, edits }: { dbIndex: number; edits: Edits } = msg.data + const { resultDb } = editDb(qdbs[dbIndex], edits) + setQdbs( + qdbs.map((qdb, i) => { + if (i === dbIndex) { + return resultDb + } else { + return qdb + } + }) + ) + logger.DebugLog(`Worker db edit ${workerIndex}`, 'worker update', 1) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: db edit`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/handlers/handleMerge.ts b/src/worker/handlers/handleMerge.ts new file mode 100644 index 0000000..17f1140 --- /dev/null +++ b/src/worker/handlers/handleMerge.ts @@ -0,0 +1,36 @@ +import { parentPort } from 'node:worker_threads' +import { QuestionDb } from '../../types/basicTypes' +import { getSubjectDifference } from '../../utils/qdbUtils' + +export type MergeTaskObject = { + type: 'merge' + data: { + localQdbIndex: number + remoteQdb: QuestionDb + } +} + +export const handleMerge = async ( + qdbs: QuestionDb[], + msg: MergeTaskObject, + workerIndex: number +): Promise => { + const { + localQdbIndex, + remoteQdb, + }: { localQdbIndex: number; remoteQdb: QuestionDb } = msg.data + const localQdb = qdbs.find((qdb) => qdb.index === localQdbIndex) + + const { newData, newSubjects } = getSubjectDifference( + localQdb.data, + remoteQdb.data + ) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: merge done`, + workerIndex: workerIndex, + newData: newData, + newSubjects: newSubjects, + localQdbIndex: localQdbIndex, + }) +} diff --git a/src/worker/handlers/handleNewDb.ts b/src/worker/handlers/handleNewDb.ts new file mode 100644 index 0000000..0984fc6 --- /dev/null +++ b/src/worker/handlers/handleNewDb.ts @@ -0,0 +1,22 @@ +import { parentPort } from 'node:worker_threads' +import { QuestionDb } from '../../types/basicTypes' + +export type NewDbTaskObject = { + type: 'newdb' + data: QuestionDb +} + +export const handleNewDb = async ( + qdbs: QuestionDb[], + msg: NewDbTaskObject, + workerIndex: number, + setQdbs: (newVal: Array) => void +): Promise => { + const { data }: { data: QuestionDb } = msg + setQdbs([...qdbs, data]) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: new db add done`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/handlers/handleNewQuestion.ts b/src/worker/handlers/handleNewQuestion.ts new file mode 100644 index 0000000..758dfd8 --- /dev/null +++ b/src/worker/handlers/handleNewQuestion.ts @@ -0,0 +1,81 @@ +import { parentPort } from 'node:worker_threads' +import { QuestionDb } from '../../types/basicTypes' +import logger from '../../utils/logger' +import { createQuestion } from '../../utils/qdbUtils' +import { Result } from '../../utils/actions' + +export type NewQuestionTaskObject = { + type: 'newQuestions' + data: Omit +} + +export const handleNewQuestions = async ( + qdbs: QuestionDb[], + msg: NewQuestionTaskObject, + workerIndex: number, + setQdbs: (newVal: Array) => void +): Promise => { + const { subjName, qdbIndex, newQuestions } = msg.data + + const newQuestionsWithCache = newQuestions.map((question) => { + if (!question.cache) { + return createQuestion(question) + } else { + return question + } + }) + + let added = false + setQdbs( + qdbs.map((qdb) => { + if (qdb.index === qdbIndex) { + return { + ...qdb, + data: qdb.data.map((subj) => { + if (subj.Name === subjName) { + added = true + return { + Name: subj.Name, + Questions: [ + ...subj.Questions, + ...newQuestionsWithCache, + ], + } + } else { + return subj + } + }), + } + } else { + return qdb + } + }) + ) + + if (!added) { + setQdbs( + qdbs.map((qdb) => { + if (qdb.index === qdbIndex) { + return { + ...qdb, + data: [ + ...qdb.data, + { + Name: subjName, + Questions: [...newQuestionsWithCache], + }, + ], + } + } else { + return qdb + } + }) + ) + } + logger.DebugLog(`Worker new question ${workerIndex}`, 'worker update', 1) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: update done`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/handlers/handleQuestionsToPeers.ts b/src/worker/handlers/handleQuestionsToPeers.ts new file mode 100644 index 0000000..057161a --- /dev/null +++ b/src/worker/handlers/handleQuestionsToPeers.ts @@ -0,0 +1,153 @@ +import { parentPort } from 'node:worker_threads' +import { PeerInfo, Question, QuestionDb } from '../../types/basicTypes' +import { files, paths, readAndValidateFile } from '../../utils/files' +import utils from '../../utils/utils' +import { RecievedData } from '../../utils/actions' +import { removeCacheFromQuestion } from '../../utils/qdbUtils' +import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi' +import logger from '../../utils/logger' +import { + loginToPeer, + peerToString, + updatePeersFile, +} from '../../utils/p2putils' +import { post } from '../../utils/networkUtils' + +const login = async (peer: PeerInfo): Promise => { + const loginResult = await loginToPeer(peer) + if (typeof loginResult === 'string') { + return loginResult + } else { + return null + } +} + +export type QuestionsToPeersTaskObject = { + type: 'sendQuestionsToPeers' + data: { + newQuestions: Question[] + location: string + subj: string + } +} + +export const handleQuestionsToPeers = async ( + _qdbs: QuestionDb[], + msg: QuestionsToPeersTaskObject, + workerIndex: number +): Promise => { + const { newQuestions, location, subj } = msg.data + + const domain = utils.ReadFile(paths.domainFile).trim() + const peers = readAndValidateFile(files.peersFile) + + if (!peers || peers.length === 0 || newQuestions.length === 0) { + parentPort.postMessage({ + msg: `From thread #${workerIndex}: sendQuestionsToPeers done`, + workerIndex: workerIndex, + }) + return + } + + const dataToSend: RecievedData = { + fromPeer: true, + subj: subj, + location: location, + id: domain, // client ID + version: 'P2P', + quiz: newQuestions.map((question) => { + return removeCacheFromQuestion({ + ...question, + data: { + ...question.data, + source: domain, + }, + }) + }), + } + + const results: { + errors: PeerInfo[] + hasNew: PeerInfo[] + sent: PeerInfo[] + loginErrors: PeerInfo[] + } = { + errors: [], + hasNew: [], + sent: [], + loginErrors: [], + } + + const postData = (peer: PeerInfo, sessionCookie: string) => { + return post({ + hostname: peer.host, + port: peer.port, + http: peer.http, + path: '/api/isAdding', + bodyObject: dataToSend, + cookies: `sessionID=${sessionCookie}`, + }) + } + + for (const peer of peers) { + let sessionCookie = peer.sessionCookie + + if (!sessionCookie) { + sessionCookie = await login(peer) + if (!sessionCookie) { + results.loginErrors.push(peer) + continue + } + updatePeersFile(peers, { ...peer, sessionCookie: sessionCookie }) + } + + let res = await postData(peer, sessionCookie) + + if (res.data?.result === 'nouser' && sessionCookie) { + sessionCookie = await login(peer) + if (!sessionCookie) { + results.loginErrors.push(peer) + continue + } + updatePeersFile(peers, { ...peer, sessionCookie: sessionCookie }) + res = await postData(peer, sessionCookie) + } + + if (res.error || !res.data?.success) { + results.errors.push(peer) + } else if (res.data?.totalNewQuestions > 0) { + results.hasNew.push(peer) + } else { + results.sent.push(peer) + } + } + + const logMsg: string[] = [] + const addToLogMsg = ( + peerResult: PeerInfo[], + prefix: string, + color: string + ) => { + if (peerResult.length > 0) { + logMsg.push( + `${logger.C(color)}${prefix}:${logger.C()} ` + + peerResult.map((x) => peerToString(x)).join(', ') + ) + } + } + addToLogMsg(results.loginErrors, 'Login error', 'red') + addToLogMsg(results.errors, 'Error', 'red') + addToLogMsg(results.hasNew, 'Had new questions', 'blue') + addToLogMsg(results.sent, 'Sent', 'green') + + logger.Log( + `\t${logger.C( + 'green' + )}Sent new questions to peers${logger.C()}; ${logMsg.join(', ')}` + ) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: sendQuestionsToPeers done`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/handlers/handleRmQuestions.ts b/src/worker/handlers/handleRmQuestions.ts new file mode 100644 index 0000000..03ebd81 --- /dev/null +++ b/src/worker/handlers/handleRmQuestions.ts @@ -0,0 +1,53 @@ +import { parentPort } from 'node:worker_threads' +import { Question, QuestionDb } from '../../types/basicTypes' +import { updateQuestionsInArray } from '../../utils/actions' + +export type RmQuestionsTaskObject = { + type: 'rmQuestions' + data: { + questionIndexesToRemove: number[][] + subjIndex: number + qdbIndex: number + recievedQuestions: Question[] + } +} + +export const handleRmQuestions = async ( + qdbs: QuestionDb[], + msg: RmQuestionsTaskObject, + workerIndex: number, + setQdbs: (newVal: QuestionDb[]) => void +): Promise => { + const { questionIndexesToRemove, subjIndex, qdbIndex, recievedQuestions } = + msg.data + + const newQdbs = qdbs.map((qdb, i) => { + if (i === qdbIndex) { + return { + ...qdb, + data: qdb.data.map((subj, j) => { + if (j === subjIndex) { + return { + ...subj, + Questions: updateQuestionsInArray( + questionIndexesToRemove, + qdbs[qdbIndex].data[subjIndex].Questions, + recievedQuestions + ), + } + } else { + return subj + } + }), + } + } else { + return qdb + } + }) + setQdbs(newQdbs) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: rm question done`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/handlers/handleSearch.ts b/src/worker/handlers/handleSearch.ts new file mode 100644 index 0000000..7449601 --- /dev/null +++ b/src/worker/handlers/handleSearch.ts @@ -0,0 +1,286 @@ +import { parentPort } from 'worker_threads' +import { + Question, + QuestionData, + QuestionDb, + Subject, +} from '../../types/basicTypes' +import logger from '../../utils/logger' +import { + SearchResultQuestion, + getSubjNameWithoutYear, + minMatchToNotSearchOtherSubjects, + noPossibleAnswerMatchPenalty, + prepareQuestion, + searchSubject, +} from '../../utils/qdbUtils' +import { recognizeTextFromBase64 } from '../../utils/tesseract' +import { WorkerResult } from '../worker' + +export type SearchTaskObject = { + type: 'search' + data: { + searchIn: number[] + question: Question + subjName: string + testUrl?: string + questionData?: QuestionData + searchInAllIfNoResult?: boolean + searchTillMatchPercent?: number + [key: string]: any + } +} + +export function doSearch( + data: Array, + subjName: string, + question: Question, + searchTillMatchPercent?: number, + searchInAllIfNoResult?: Boolean +): SearchResultQuestion[] { + let result: SearchResultQuestion[] = [] + + const questionToSearch = prepareQuestion(question) + + data.every((subj) => { + if ( + subjName + .toLowerCase() + .includes(getSubjNameWithoutYear(subj.Name).toLowerCase()) + ) { + logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2) + const subjRes = searchSubject( + subj, + questionToSearch, + subjName, + searchTillMatchPercent + ) + result = result.concat(subjRes) + if (searchTillMatchPercent) { + return !subjRes.some((sr) => { + return sr.match >= searchTillMatchPercent + }) + } + return true + } + return true + }) + + if (searchInAllIfNoResult) { + // FIXME: dont research subject searched above + if ( + result.length === 0 || + result[0].match < minMatchToNotSearchOtherSubjects + ) { + logger.DebugLog( + 'Reqults length is zero when comparing names, trying all subjects', + 'searchworker', + 1 + ) + data.every((subj) => { + const subjRes = searchSubject( + subj, + questionToSearch, + subjName, + searchTillMatchPercent + ) + result = result.concat(subjRes) + + if (searchTillMatchPercent) { + const continueSearching = !subjRes.some((sr) => { + return sr.match >= searchTillMatchPercent + }) + return continueSearching + } + return true + }) + } + } + + result = setNoPossibleAnswersPenalties( + questionToSearch.data.possibleAnswers, + result + ) + + result = result.sort((q1, q2) => { + if (q1.match < q2.match) { + return 1 + } else if (q1.match > q2.match) { + return -1 + } else { + return 0 + } + }) + + return result +} + +export function setNoPossibleAnswersPenalties( + questionPossibleAnswers: QuestionData['possibleAnswers'], + results: SearchResultQuestion[] +): SearchResultQuestion[] { + if (!Array.isArray(questionPossibleAnswers)) { + return results + } + const noneHasPossibleAnswers = results.every((x) => { + return !Array.isArray(x.q.data.possibleAnswers) + }) + if (noneHasPossibleAnswers) return results + + let possibleAnswerMatch = false + const updated = results.map((result) => { + const matchCount = Array.isArray(result.q.data.possibleAnswers) + ? result.q.data.possibleAnswers.filter((resultPossibleAnswer) => { + return questionPossibleAnswers.some( + (questionPossibleAnswer) => { + if ( + questionPossibleAnswer.val && + resultPossibleAnswer.val + ) { + return questionPossibleAnswer.val.includes( + resultPossibleAnswer.val + ) + } else { + return false + } + } + ) + }).length + : 0 + + if (matchCount === questionPossibleAnswers.length) { + possibleAnswerMatch = true + return result + } else { + return { + ...result, + match: result.match - noPossibleAnswerMatchPenalty, + detailedMatch: { + ...result.detailedMatch, + qMatch: + result.detailedMatch.qMatch - + noPossibleAnswerMatchPenalty, + }, + } + } + }) + + if (possibleAnswerMatch) { + return updated + } else { + return results + } +} + +async function recognizeQuestionImage(question: Question): Promise { + const base64Data = question.data.base64 + if (Array.isArray(base64Data) && base64Data.length) { + const res: string[] = [] + for (let i = 0; i < base64Data.length; i++) { + const base64 = base64Data[i] + const text = await recognizeTextFromBase64(base64) + if (text && text.trim()) { + res.push(text) + } + } + + if (res.length) { + return { + ...question, + Q: res.join(' '), + data: { + ...question.data, + type: 'simple', + }, + } + } + } + + return question +} + +export const handleSearch = async ( + qdbs: QuestionDb[], + msg: SearchTaskObject, + workerIndex: number +): Promise => { + const { + subjName, + question: originalQuestion, + searchTillMatchPercent, + searchInAllIfNoResult, + searchIn, + index, + } = msg.data + + let searchResult: SearchResultQuestion[] = [] + let error = false + + const question = await recognizeQuestionImage(originalQuestion) + + try { + qdbs.forEach((qdb) => { + if (searchIn.includes(qdb.index)) { + const res = doSearch( + qdb.data, + subjName, + question, + searchTillMatchPercent, + searchInAllIfNoResult + ) + searchResult = [ + ...searchResult, + ...res.map((x) => { + return { + ...x, + detailedMatch: { + ...x.detailedMatch, + qdb: qdb.name, + }, + } + }), + ] + } + }) + } catch (err) { + logger.Log('Error in worker thread!', logger.GetColor('redbg')) + console.error(err) + console.error( + JSON.stringify( + { + subjName: subjName, + question: question, + searchTillMatchPercent: searchTillMatchPercent, + searchInAllIfNoResult: searchInAllIfNoResult, + searchIn: searchIn, + index: index, + }, + null, + 2 + ) + ) + error = true + } + + // sorting + const sortedResult: SearchResultQuestion[] = searchResult.sort((q1, q2) => { + if (q1.match < q2.match) { + return 1 + } else if (q1.match > q2.match) { + return -1 + } else { + return 0 + } + }) + + const workerResult: WorkerResult = { + msg: `From thread #${workerIndex}: job ${ + !isNaN(index) ? `#${index}` : '' + }done`, + workerIndex: workerIndex, + result: sortedResult, + error: error, + } + + parentPort.postMessage(workerResult) +} diff --git a/src/worker/worker.ts b/src/worker/worker.ts new file mode 100644 index 0000000..b3e4181 --- /dev/null +++ b/src/worker/worker.ts @@ -0,0 +1,95 @@ +import { isMainThread, parentPort, workerData } from 'worker_threads' +import { QuestionDb } from '../types/basicTypes' +import { SearchResultQuestion, countOfQdbs } from '../utils/qdbUtils' +import logger from '../utils/logger' +import { TaskObject } from './workerPool' +import { tesseractLoaded } from '../utils/tesseract' +import { handleSearch } from './handlers/handleSearch' +import { handleMerge } from './handlers/handleMerge' +import { handleDbEdit } from './handlers/handleDbEdit' +import { handleNewQuestions } from './handlers/handleNewQuestion' +import { handleNewDb } from './handlers/handleNewDb' +import { handleDbClean } from './handlers/handleDbClean' +import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers' +import { handleRmQuestions } from './handlers/handleRmQuestions' + +export interface WorkerResult { + msg: string + workerIndex: number + result?: SearchResultQuestion[] | number[][] + error?: boolean +} + +if (!isMainThread) { + handleWorkerData() +} + +async function handleWorkerData() { + const { + workerIndex, + initData, + }: { workerIndex: number; initData: Array } = workerData + let qdbs: Array = initData + const setQdbs = (newVal: Array) => { + qdbs = newVal + } + + const qdbCount = initData.length + const { subjCount, questionCount } = countOfQdbs(initData) + + logger.Log( + `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! qdbs: ${qdbCount}, subjects: ${subjCount.toLocaleString()}, questions: ${questionCount.toLocaleString()}` + ) + + parentPort.on('message', async (msg: TaskObject) => { + try { + await tesseractLoaded + + await handleMessage(qdbs, msg, workerIndex, setQdbs) + } catch (e) { + console.error(e) + parentPort.postMessage({ + msg: `From thread #${workerIndex}: unhandled error occured! (${ + (msg as any)?.type + })`, + workerIndex: workerIndex, + e: e, + }) + } + }) +} + +async function handleMessage( + qdbs: QuestionDb[], + msg: TaskObject, + workerIndex: number, + setQdbs: (newVal: QuestionDb[]) => void +) { + if (msg.type === 'search') { + await handleSearch(qdbs, msg, workerIndex) + } else if (msg.type === 'merge') { + await handleMerge(qdbs, msg, workerIndex) + } else if (msg.type === 'dbEdit') { + await handleDbEdit(qdbs, msg, workerIndex, setQdbs) + } else if (msg.type === 'newQuestions') { + await handleNewQuestions(qdbs, msg, workerIndex, setQdbs) + } else if (msg.type === 'newdb') { + await handleNewDb(qdbs, msg, workerIndex, setQdbs) + } else if (msg.type === 'dbClean') { + await handleDbClean(qdbs, msg, workerIndex) + } else if (msg.type === 'rmQuestions') { + await handleRmQuestions(qdbs, msg, workerIndex, setQdbs) + } else if (msg.type === 'sendQuestionsToPeers') { + await handleQuestionsToPeers(qdbs, msg, workerIndex) + } else { + logger.Log(`Invalid msg type!`, logger.GetColor('redbg')) + console.error(msg) + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: Invalid message type (${ + (msg as any)?.type + })!`, + workerIndex: workerIndex, + }) + } +} diff --git a/src/utils/workerPool.ts b/src/worker/workerPool.ts similarity index 85% rename from src/utils/workerPool.ts rename to src/worker/workerPool.ts index f2e0dea..196c739 100644 --- a/src/utils/workerPool.ts +++ b/src/worker/workerPool.ts @@ -23,10 +23,17 @@ import { v4 as uuidv4 } from 'uuid' import { EventEmitter } from 'events' import os from 'os' -import logger from './logger' -import { Result, Edits } from './actions' -import type { Question, QuestionDb, QuestionData } from '../types/basicTypes' -import type { WorkerResult } from './classes' +import type { QuestionDb } from '../types/basicTypes' +import { SearchTaskObject } from './handlers/handleSearch' +import { DbEditTaskObject } from './handlers/handleDbEdit' +import { NewQuestionTaskObject } from './handlers/handleNewQuestion' +import { NewDbTaskObject } from './handlers/handleNewDb' +import { DbCleanTaskObject } from './handlers/handleDbClean' +import { RmQuestionsTaskObject } from './handlers/handleRmQuestions' +import { MergeTaskObject } from './handlers/handleMerge' +import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers' +import { WorkerResult } from './worker' +import logger from '../utils/logger' const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length @@ -36,47 +43,15 @@ interface WorkerObj { free: Boolean } -// FIXME: type depending on type -export interface TaskObject { - type: - | 'work' - | 'dbEdit' - | 'newQuestions' - | 'newdb' - | 'dbClean' - | 'rmQuestions' - | 'merge' - data: - | { - searchIn: number[] - question: Question - subjName: string - testUrl?: string - questionData?: QuestionData - searchInAllIfNoResult?: boolean - searchTillMatchPercent?: number - [key: string]: any - } - | { dbIndex: number; edits: Edits } - | QuestionDb - | Omit - | { - questions: Question[] - subjToClean: string - overwriteBeforeDate: number - qdbIndex: number - } - | { - questionIndexesToRemove: number[][] - subjIndex: number - qdbIndex: number - recievedQuestions: Question[] - } - | { - localQdbIndex: number - remoteQdb: QuestionDb - } -} +export type TaskObject = + | SearchTaskObject + | DbEditTaskObject + | NewQuestionTaskObject + | NewDbTaskObject + | DbCleanTaskObject + | RmQuestionsTaskObject + | MergeTaskObject + | QuestionsToPeersTaskObject interface PendingJob { workData: TaskObject @@ -98,7 +73,7 @@ interface DoneEvent extends EventEmitter { export const defaultAlertOnPendingCount = 100 let alertOnPendingCount = defaultAlertOnPendingCount -const workerFile = './src/utils/classes.ts' +const workerFile = './src/worker/worker.ts' let workers: Array let getInitData: () => Array = null const pendingJobs: { @@ -129,7 +104,7 @@ export function msgAllWorker(data: TaskObject): Promise { return new Promise((resolve) => { const promises: Promise[] = [] workers.forEach((worker) => { - promises.push(doALongTask(data, worker.index)) + promises.push(queueWork(data, worker.index)) }) Promise.all(promises).then((res) => { logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1) @@ -144,7 +119,7 @@ export function setPendingJobsAlertCount(newVal?: number): void { alertOnPendingCount = count } -export function doALongTask( +export function queueWork( obj: TaskObject, targetWorkerIndex?: number ): Promise { @@ -335,11 +310,16 @@ const workerTs = ( wkOpts.workerData.__filename = file return new Worker( ` + try { const wk = require('worker_threads'); require('ts-node').register(); let file = wk.workerData.__filename; delete wk.workerData.__filename; require(file); + } catch (e) { + console.error('Error while creating new Worker:') + console.error(e) + } `, wkOpts ) diff --git a/submodules/moodle-test-userscript b/submodules/moodle-test-userscript index 9453aa8..8e192d3 160000 --- a/submodules/moodle-test-userscript +++ b/submodules/moodle-test-userscript @@ -1 +1 @@ -Subproject commit 9453aa8b978f944df1ae28eeaf28120b91965c1f +Subproject commit 8e192d3b23736851423cd469b950a2e97139f357 diff --git a/submodules/qmining-page b/submodules/qmining-page index 6eeb83b..075271c 160000 --- a/submodules/qmining-page +++ b/submodules/qmining-page @@ -1 +1 @@ -Subproject commit 6eeb83b2e7a48aa9743e53c651629fd500c8e6d5 +Subproject commit 075271ca01786c849dd8604934bb2de6b5a0ee82 diff --git a/testingTools/tests/testScripts/postTestData.sh b/testingTools/tests/testScripts/postTestData.sh index 3d13228..9b21224 100755 --- a/testingTools/tests/testScripts/postTestData.sh +++ b/testingTools/tests/testScripts/postTestData.sh @@ -16,5 +16,5 @@ else --cookie "sessionID=e0ac328d-86cc-4dbf-a00b-213bec6011e7" \ -H "Content-Type: application/json" \ -X POST --data "$data" \ - "$url/isAdding" + "$url/api/isAdding" fi diff --git a/tsconfig.json b/tsconfig.json index 57cd03a..c2d85bd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -19,6 +19,9 @@ "lib": ["dom", "ES2020"], "resolveJsonModule": true }, + "ts-node": { + "files": true + }, "files": ["src/server.ts"], "include": ["src/**/*"], "exclude": [