worker file split, sending new questions to peers instantly

This commit is contained in:
mrfry 2023-04-24 20:39:15 +02:00
parent 8c4e184741
commit 252826a081
25 changed files with 1016 additions and 705 deletions

View file

@ -23,7 +23,7 @@ databases the pi might not be enough.
The server utilizes multiple CPU cores, long running operations are ran in separate threads. Because The server utilizes multiple CPU cores, long running operations are ran in separate threads. Because
of the implementation, the more cores a CPU has, the server uses more memory, but able to run more of the implementation, the more cores a CPU has, the server uses more memory, but able to run more
threads, and serve more requests at once. The used cores can be limited with environment variables threads, and serve more requests at once. The used cores can be limited with environment variables
(detailed below). (detailed below, `NS_THREAD_COUNT`).
## Terminology ## Terminology
@ -60,7 +60,7 @@ read them very carefully, you should know about what was created!**
* By default the server redirects all HTTP traffic to HTTPS. To disable this use * By default the server redirects all HTTP traffic to HTTPS. To disable this use
`NS_NO_HTTPS_FORCE` `NS_NO_HTTPS_FORCE`
* The server launches a thread for each CPU core. This could be an overkill on 4+ cored CPU-s. Use * The server launches a thread for each CPU core. This could be an overkill on 4+ cored CPU-s. Use
`NS_THREAD_COUNT` to restrict the number of threads `NS_THREAD_COUNT` to restrict the number of threads, and potentially reduce memory usage of the server.
* The setup script can be also used to update and rebuild all git modules if ran after the initial * The setup script can be also used to update and rebuild all git modules if ran after the initial
setup setup

View file

@ -39,8 +39,8 @@ import {
Submodule, Submodule,
} from '../../types/basicTypes' } from '../../types/basicTypes'
import { loadJSON } from '../../utils/actions' import { loadJSON } from '../../utils/actions'
import { initWorkerPool } from '../../utils/workerPool'
import { paths } from '../../utils/files' import { paths } from '../../utils/files'
import { initWorkerPool } from '../../worker/workerPool'
// other paths // other paths
const moduleName = 'API' const moduleName = 'API'

View file

@ -33,7 +33,6 @@ import {
} from '../../../types/basicTypes' } from '../../../types/basicTypes'
import utils from '../../../utils/utils' import utils from '../../../utils/utils'
import { backupData, writeData } from '../../../utils/actions' import { backupData, writeData } from '../../../utils/actions'
import { WorkerResult } from '../../../utils/classes'
import dbtools from '../../../utils/dbtools' import dbtools from '../../../utils/dbtools'
import { import {
createKeyPair, createKeyPair,
@ -41,11 +40,6 @@ import {
encrypt, encrypt,
isKeypairValid, isKeypairValid,
} from '../../../utils/encryption' } from '../../../utils/encryption'
import {
doALongTask,
msgAllWorker,
setPendingJobsAlertCount,
} from '../../../utils/workerPool'
import { import {
countOfQdb, countOfQdb,
countOfQdbs, countOfQdbs,
@ -54,7 +48,18 @@ import {
removeCacheFromQuestion, removeCacheFromQuestion,
} from '../../../utils/qdbUtils' } from '../../../utils/qdbUtils'
import { files, paths, readAndValidateFile } from '../../../utils/files' import { files, paths, readAndValidateFile } from '../../../utils/files'
import { GetResult, get, post } from '../../../utils/networkUtils' import { GetResult, get } from '../../../utils/networkUtils'
import {
msgAllWorker,
queueWork,
setPendingJobsAlertCount,
} from '../../../worker/workerPool'
import { WorkerResult } from '../../../worker/worker'
import {
loginToPeer,
peerToString,
updatePeersFile,
} from '../../../utils/p2putils'
interface MergeResult { interface MergeResult {
newData: Subject[] newData: Subject[]
@ -87,6 +92,7 @@ interface RemotePeerInfo {
} }
interface SyncDataRes { interface SyncDataRes {
result?: string
questionDbs?: QuestionDb[] questionDbs?: QuestionDb[]
remoteInfo?: RemotePeerInfo remoteInfo?: RemotePeerInfo
encryptedUsers?: string encryptedUsers?: string
@ -120,14 +126,6 @@ function updateThirdPartyPeers(
) )
} }
function peerToString(peer: { host: string; port: string | number }) {
return `${peer.host}:${peer.port}`
}
function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo) {
return peer1.host === peer2.host && peer1.port === peer2.port
}
export function getNewDataSince(subjects: Subject[], date: number): Subject[] { export function getNewDataSince(subjects: Subject[], date: number): Subject[] {
return subjects return subjects
.map((subject) => { .map((subject) => {
@ -323,40 +321,40 @@ function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) {
} }
async function authAndGetNewData({ async function authAndGetNewData({
peers,
peer, peer,
selfInfo, selfInfo,
lastSyncWithPeer, lastSyncWithPeer,
lastSync, lastSync,
}: { }: {
peers: PeerInfo[]
peer: PeerInfo peer: PeerInfo
selfInfo: PeerInfo selfInfo: PeerInfo
lastSyncWithPeer: number lastSyncWithPeer: number
lastSync: number lastSync: number
}): Promise<GetResult<SyncDataRes & { peer: PeerInfo }>> { }): Promise<GetResult<SyncDataRes & { peer: PeerInfo }>> {
const { data, error, cookies } = await post<{ let sessionCookie = peer.sessionCookie
result: string
msg: string
}>({
hostname: peer.host,
path: '/api/login',
port: peer.port,
bodyObject: { pw: peer.pw },
http: peer.http,
})
if (error || !data || data.result !== 'success') { if (!sessionCookie) {
const loginResult = await loginToPeer(peer)
if (typeof loginResult === 'string') {
sessionCookie = loginResult
updatePeersFile(peers, { ...peer, sessionCookie: loginResult })
} else {
return { return {
error: data ? new Error(data.msg) : error, error: loginResult,
data: { data: {
peer: peer, peer: peer,
}, },
} }
} }
}
const getRes = await get<SyncDataRes>( const getSyncData = () => {
return get<SyncDataRes>(
{ {
headers: { headers: {
cookie: cookies.join(), cookies: `sessionID=${sessionCookie}`,
}, },
host: peer.host, host: peer.host,
port: peer.port, port: peer.port,
@ -366,6 +364,26 @@ async function authAndGetNewData({
}, },
peer.http peer.http
) )
}
let getRes = await getSyncData()
if (getRes.data?.result === 'nouser') {
// FIXME: make this more pretty? (duplicate code, see above)
const loginResult = await loginToPeer(peer)
if (typeof loginResult === 'string') {
sessionCookie = loginResult
updatePeersFile(peers, { ...peer, sessionCookie: loginResult })
} else {
return {
error: loginResult,
data: {
peer: peer,
},
}
}
getRes = await getSyncData()
}
return { ...getRes, data: { ...getRes.data, peer: peer } } return { ...getRes, data: { ...getRes.data, peer: peer } }
} }
@ -385,21 +403,6 @@ function setup(data: SubmoduleData): Submodule {
// SETUP // SETUP
// --------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------
if (!utils.FileExists(paths.peersFile)) {
logger.Log(
`Warning: peers file was missing, so it was created`,
'yellowbg'
)
utils.CreatePath(paths.peersPath)
utils.WriteFile('[]', paths.peersFile)
}
if (!utils.FileExists(paths.selfInfoFile)) {
const msg = `Self info file for p2p does not exist! (${paths.selfInfoFile}) P2P functionality will not be loaded`
logger.Log(msg, 'redbg')
return {}
}
let publicKey: string let publicKey: string
let privateKey: string let privateKey: string
@ -483,7 +486,7 @@ function setup(data: SubmoduleData): Submodule {
selfInfo: { ...selfInfo, publicKey: publicKey }, selfInfo: { ...selfInfo, publicKey: publicKey },
myPeers: peers.map((peer) => { myPeers: peers.map((peer) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars // eslint-disable-next-line @typescript-eslint/no-unused-vars
const { pw, ...restOfPeer } = peer const { pw, sessionCookie, ...restOfPeer } = peer
return restOfPeer return restOfPeer
}), }),
} }
@ -571,7 +574,7 @@ function setup(data: SubmoduleData): Submodule {
rawNewQuestionDbs.push(remoteQdb) rawNewQuestionDbs.push(remoteQdb)
} else { } else {
mergeJobs.push( mergeJobs.push(
doALongTask({ queueWork({
type: 'merge', type: 'merge',
data: { data: {
localQdbIndex: localQdb.index, localQdbIndex: localQdb.index,
@ -630,6 +633,7 @@ function setup(data: SubmoduleData): Submodule {
const lastSyncWithPeer = peer.lastSync || 0 const lastSyncWithPeer = peer.lastSync || 0
return authAndGetNewData({ return authAndGetNewData({
peers: peers,
peer: peer, peer: peer,
selfInfo: selfInfo, selfInfo: selfInfo,
lastSyncWithPeer: lastSyncWithPeer, lastSyncWithPeer: lastSyncWithPeer,
@ -844,21 +848,10 @@ function setup(data: SubmoduleData): Submodule {
newQuestions: newQuestionCount, newQuestions: newQuestionCount,
} }
// Processing result data is successful
const updatedPeersFile = peers.map((x) => { updatePeersFile(peers, {
if (isPeerSameAs(peer, x)) { ...peer,
return {
...x,
lastSync: syncStart, lastSync: syncStart,
}
} else {
return x
}
}) })
utils.WriteFile(
JSON.stringify(updatedPeersFile, null, 2),
paths.peersFile
)
} }
// ------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------
@ -1014,7 +1007,6 @@ function setup(data: SubmoduleData): Submodule {
remotePublicKey, remotePublicKey,
JSON.stringify(newUsers) JSON.stringify(newUsers)
) )
// TODO: count sent user count
logger.Log( logger.Log(
`\tSending new users to "${remoteHost}" (encrypted)`, `\tSending new users to "${remoteHost}" (encrypted)`,
'green' 'green'
@ -1050,7 +1042,6 @@ function setup(data: SubmoduleData): Submodule {
? 'all time' ? 'all time'
: new Date(since).toLocaleString() : new Date(since).toLocaleString()
// TODO: count sent data
logger.Log( logger.Log(
`\tSending new data to ${logger.C( `\tSending new data to ${logger.C(
'blue' 'blue'

View file

@ -48,11 +48,6 @@ import {
editDb, editDb,
RecievedData, RecievedData,
} from '../../../utils/actions' } from '../../../utils/actions'
import {
WorkerResult,
// compareQuestionObj,
} from '../../../utils/classes'
import { doALongTask, msgAllWorker } from '../../../utils/workerPool'
import dbtools from '../../../utils/dbtools' import dbtools from '../../../utils/dbtools'
import { import {
dataToString, dataToString,
@ -65,6 +60,8 @@ import {
isJsonValidAndLogError, isJsonValidAndLogError,
TestUsersSchema, TestUsersSchema,
} from '../../../types/typeSchemas' } from '../../../types/typeSchemas'
import { msgAllWorker, queueWork } from '../../../worker/workerPool'
import { WorkerResult } from '../../../worker/worker'
interface SavedQuestionData { interface SavedQuestionData {
fname: string fname: string
@ -74,13 +71,12 @@ interface SavedQuestionData {
date: string | Date date: string | Date
} }
// interface SavedQuestion { export interface QuestionAddResponse {
// Questions: Question[] success: boolean
// subj: string newQuestions: number
// userid: number totalNewQuestions: number
// testUrl: string result?: string
// date: string }
// }
const line = '====================================================' // lol const line = '====================================================' // lol
@ -163,8 +159,8 @@ function searchInDbs(
// searchIn could be [0], [1], ... to search every db in different thread. Put this into a // searchIn could be [0], [1], ... to search every db in different thread. Put this into a
// forEach(qdbs) to achieve this // forEach(qdbs) to achieve this
return new Promise((resolve) => { return new Promise((resolve) => {
doALongTask({ queueWork({
type: 'work', type: 'search',
data: { data: {
searchIn: searchIn, searchIn: searchIn,
testUrl: testUrl, testUrl: testUrl,
@ -553,7 +549,9 @@ function setup(data: SubmoduleData): Submodule {
writeIsAddingData(req.body) writeIsAddingData(req.body)
const location = req.body.location.split('/')[2] const location = req.body.location.includes('/')
? req.body.location.split('/')[2]
: req.body.location
try { try {
let maxIndex = -1 let maxIndex = -1
@ -603,17 +601,27 @@ function setup(data: SubmoduleData): Submodule {
res.json({ res.json({
success: resultArray.length > 0, success: resultArray.length > 0,
newQuestions: resultArray, newQuestions: resultArray, // FIXME: this is useless?
totalNewQuestions: totalNewQuestions, totalNewQuestions: totalNewQuestions,
}) })
if (totalNewQuestions > 0) { if (totalNewQuestions > 0) {
resultArray.forEach((result) => { resultArray.forEach((result) => {
if (result.newQuestions.length > 0) {
msgAllWorker({ msgAllWorker({
// TODO: recognize base64 image
type: 'newQuestions', type: 'newQuestions',
data: result, data: result,
}) })
if (req.body.fromPeer) return
queueWork({
type: 'sendQuestionsToPeers',
data: {
newQuestions: result.newQuestions,
subj: result.subjName,
location: location, // TODO: location undefined?
},
})
}
}) })
} }
}) })

View file

@ -1,4 +1,4 @@
import { setNoPossibleAnswersPenalties } from '../utils/classes' import { setNoPossibleAnswersPenalties } from '../worker/handlers/handleSearch'
import { Question } from '../types/basicTypes' import { Question } from '../types/basicTypes'
import { import {
noPossibleAnswerMatchPenalty, noPossibleAnswerMatchPenalty,

View file

@ -175,6 +175,7 @@ export interface PeerInfo {
publicKey: string publicKey: string
contact: string contact: string
pw?: string pw?: string
sessionCookie?: string
lastSync?: number lastSync?: number
note?: string note?: string
http?: boolean http?: boolean

View file

@ -61,6 +61,7 @@ export const PeerInfoSchema: Schema = {
...PeerInfoSchemaBase.properties, ...PeerInfoSchemaBase.properties,
publicKey: { type: 'string' }, publicKey: { type: 'string' },
pw: { type: 'string' }, pw: { type: 'string' },
sessionCookie: { type: 'string' },
}, },
required: ['name', 'host', 'port', 'contact', 'pw'], required: ['name', 'host', 'port', 'contact', 'pw'],
} }

View file

@ -22,8 +22,6 @@ const recDataFile = './stats/recdata'
const dataLockFile = './data/lockData' const dataLockFile = './data/lockData'
import logger from '../utils/logger' import logger from '../utils/logger'
import { WorkerResult } from '../utils/classes'
import { doALongTask, msgAllWorker } from './workerPool'
import idStats from '../utils/ids' import idStats from '../utils/ids'
import utils from '../utils/utils' import utils from '../utils/utils'
import { import {
@ -42,6 +40,8 @@ import {
DataFile, DataFile,
} from '../types/basicTypes' } from '../types/basicTypes'
import { countOfQdbs } from './qdbUtils' import { countOfQdbs } from './qdbUtils'
import { WorkerResult } from '../worker/worker'
import { queueWork, msgAllWorker } from '../worker/workerPool'
// if a received question doesn't match at least this % to any other question in the db it gets
// added to db // added to db
@ -56,6 +56,7 @@ export interface RecievedData {
id: string id: string
version: string version: string
location: string location: string
fromPeer?: boolean
} }
export interface Result { export interface Result {
@ -183,8 +184,8 @@ function processIncomingRequestUsingDb(
recievedQuestions.push(currentQuestion) recievedQuestions.push(currentQuestion)
// This here searches only in relevant subjects, and not all subjects // This here searches only in relevant subjects, and not all subjects
questionSearchPromises.push( questionSearchPromises.push(
doALongTask({ queueWork({
type: 'work', type: 'search',
data: { data: {
searchIn: [qdb.index], searchIn: [qdb.index],
question: currentQuestion, question: currentQuestion,
@ -201,7 +202,7 @@ function processIncomingRequestUsingDb(
Promise.all(questionSearchPromises) Promise.all(questionSearchPromises)
.then((results: Array<WorkerResult>) => { .then((results: Array<WorkerResult>) => {
const allQuestions: Question[] = [] // all new questions here that do not have result const newQuestions: Question[] = [] // all new questions here that do not have result
results.forEach((result: WorkerResult, i) => { results.forEach((result: WorkerResult, i) => {
const add = ( const add = (
result.result as SearchResultQuestion[] result.result as SearchResultQuestion[]
@ -209,7 +210,7 @@ function processIncomingRequestUsingDb(
return res.match < minMatchAmmountToAdd return res.match < minMatchAmmountToAdd
}) })
if (add && !result.error) { if (add && !result.error) {
allQuestions.push(recievedQuestions[i]) newQuestions.push(recievedQuestions[i])
} }
}) })
@ -223,8 +224,8 @@ function processIncomingRequestUsingDb(
logger.GetColor('redbg') logger.GetColor('redbg')
) )
} }
if (allQuestions.length > 0 && subjName) { if (newQuestions.length > 0 && subjName) {
addQuestionsToDb(allQuestions, subjName, qdb) addQuestionsToDb(newQuestions, subjName, qdb)
currWrites++ currWrites++
logger.DebugLog( logger.DebugLog(
@ -246,12 +247,12 @@ function processIncomingRequestUsingDb(
idStats.LogId( idStats.LogId(
user.id, user.id,
recievedData.subj, recievedData.subj,
allQuestions.length, newQuestions.length,
allQLength allQLength
) )
logger.DebugLog('New Questions:', 'isadding', 2) logger.DebugLog('New Questions:', 'isadding', 2)
logger.DebugLog(allQuestions, 'isadding', 2) logger.DebugLog(newQuestions, 'isadding', 2)
logger.DebugLog( logger.DebugLog(
'ProcessIncomingRequest done', 'ProcessIncomingRequest done',
@ -259,7 +260,7 @@ function processIncomingRequestUsingDb(
1 1
) )
resolve({ resolve({
newQuestions: allQuestions, newQuestions: newQuestions,
subjName: recievedData.subj, subjName: recievedData.subj,
qdbIndex: qdb.index, qdbIndex: qdb.index,
qdbName: qdb.name, qdbName: qdb.name,
@ -343,7 +344,7 @@ function runCleanWorker(
// }${logger.C('')}")` // }${logger.C('')}")`
// ) // )
// pass recieved questions to a worker // pass recieved questions to a worker
doALongTask({ queueWork({
type: 'dbClean', type: 'dbClean',
data: { data: {
questions: recievedQuesitons, questions: recievedQuesitons,

View file

@ -1,539 +0,0 @@
/* ----------------------------------------------------------------------------
Question Server
GitLab: <https://gitlab.com/MrFry/mrfrys-node-server>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
------------------------------------------------------------------------- */
// FIXME: this should be renamed to worker.ts or something
import { isMainThread, parentPort, workerData } from 'worker_threads'
import { recognizeTextFromBase64, tesseractLoaded } from './tesseract'
import logger from './logger'
import {
Question,
QuestionData,
QuestionDb,
Subject,
} from '../types/basicTypes'
import { editDb, Edits, updateQuestionsInArray } from './actions'
import {
cleanDb,
countOfQdbs,
createQuestion,
getSubjectDifference,
getSubjNameWithoutYear,
minMatchToNotSearchOtherSubjects,
noPossibleAnswerMatchPenalty,
prepareQuestion,
SearchResultQuestion,
searchSubject,
} from './qdbUtils'
// import { TaskObject } from './workerPool'
// Result envelope a worker thread posts back to the pool on the main thread.
export interface WorkerResult {
    // human-readable status, e.g. `From thread #N: job #i done`
    msg: string
    // index of the worker that produced this result
    workerIndex: number
    // 'work' (search) jobs post SearchResultQuestion[]; 'dbClean' posts the
    // removed question index lists (number[][]) — see handleWorkerData
    result?: SearchResultQuestion[] | number[][]
    // true when the job body threw; details are logged on the worker side
    error?: boolean
}
// ---------------------------------------------------------------------------------------------------------
// String Utils
// ---------------------------------------------------------------------------------------------------------
// Exported
// ---------------------------------------------------------------------------------------------------------
// Not exported
// ---------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------
// Question
// ---------------------------------------------------------------------------------------------------------
/**
 * Runs OCR on a question's base64 images (if any) and, when text was
 * recognized, returns a copy of the question with `Q` replaced by the joined
 * recognized text and its data type switched to 'simple'. Returns the
 * original question untouched when there are no images or no text was found.
 */
async function recognizeQuestionImage(question: Question): Promise<Question> {
    const images = question.data.base64
    if (!Array.isArray(images) || images.length === 0) {
        return question
    }

    // recognize sequentially; collect only non-empty results
    const recognizedTexts: string[] = []
    for (const image of images) {
        const text = await recognizeTextFromBase64(image)
        if (text && text.trim()) {
            recognizedTexts.push(text)
        }
    }

    if (recognizedTexts.length === 0) {
        return question
    }

    return {
        ...question,
        Q: recognizedTexts.join(' '),
        data: {
            ...question.data,
            type: 'simple',
        },
    }
}
// ---------------------------------------------------------------------------------------------------------
// Subject
// ---------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------
// QuestionDB
// ---------------------------------------------------------------------------------------------------------
/**
 * Searches the subjects of one question db for `question`.
 *
 * First pass only looks at subjects whose (year-stripped) name is contained
 * in `subjName`. When `searchInAllIfNoResult` is set and that pass produced
 * nothing good enough, a second pass searches every subject. Results whose
 * possible answers don't cover the question's get a penalty applied, and the
 * final list is sorted by descending match percentage.
 *
 * @param data subjects of one question db
 * @param subjName name of the subject the question belongs to
 * @param question the question to look up
 * @param searchTillMatchPercent stop searching further subjects once a
 *        result reaches this match percentage
 * @param searchInAllIfNoResult fall back to searching all subjects when the
 *        name-matched pass found nothing good enough
 * @returns matching questions, best match first
 */
function doSearch(
    data: Array<Subject>,
    subjName: string,
    question: Question,
    searchTillMatchPercent?: number,
    searchInAllIfNoResult?: boolean // fixed: was boxed `Boolean`
): SearchResultQuestion[] {
    let result: SearchResultQuestion[] = []
    const questionToSearch = prepareQuestion(question)

    data.every((subj) => {
        if (
            subjName
                .toLowerCase()
                .includes(getSubjNameWithoutYear(subj.Name).toLowerCase())
        ) {
            logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2)
            const subjRes = searchSubject(
                subj,
                questionToSearch,
                subjName,
                searchTillMatchPercent
            )
            result = result.concat(subjRes)
            if (searchTillMatchPercent) {
                // stop iterating subjects once a good-enough match was found
                return !subjRes.some((sr) => {
                    return sr.match >= searchTillMatchPercent
                })
            }
            return true
        }
        return true
    })

    if (searchInAllIfNoResult) {
        // FIXME: dont research subject searched above
        // NOTE(review): `result` is not yet sorted here, so result[0] is the
        // first-found match, not necessarily the best one — confirm intended
        if (
            result.length === 0 ||
            result[0].match < minMatchToNotSearchOtherSubjects
        ) {
            logger.DebugLog(
                'Results length is zero when comparing names, trying all subjects',
                'searchworker',
                1
            )
            data.every((subj) => {
                const subjRes = searchSubject(
                    subj,
                    questionToSearch,
                    subjName,
                    searchTillMatchPercent
                )
                result = result.concat(subjRes)
                if (searchTillMatchPercent) {
                    const continueSearching = !subjRes.some((sr) => {
                        return sr.match >= searchTillMatchPercent
                    })
                    return continueSearching
                }
                return true
            })
        }
    }

    // penalize results whose possible answers don't cover the question's
    result = setNoPossibleAnswersPenalties(
        questionToSearch.data.possibleAnswers,
        result
    )

    // sort by descending match percentage
    result = result.sort((q1, q2) => {
        if (q1.match < q2.match) {
            return 1
        } else if (q1.match > q2.match) {
            return -1
        } else {
            return 0
        }
    })

    return result
}
function setNoPossibleAnswersPenalties(
questionPossibleAnswers: QuestionData['possibleAnswers'],
results: SearchResultQuestion[]
): SearchResultQuestion[] {
if (!Array.isArray(questionPossibleAnswers)) {
return results
}
const noneHasPossibleAnswers = results.every((x) => {
return !Array.isArray(x.q.data.possibleAnswers)
})
if (noneHasPossibleAnswers) return results
let possibleAnswerMatch = false
const updated = results.map((result) => {
const matchCount = Array.isArray(result.q.data.possibleAnswers)
? result.q.data.possibleAnswers.filter((resultPossibleAnswer) => {
return questionPossibleAnswers.some(
(questionPossibleAnswer) => {
if (
questionPossibleAnswer.val &&
resultPossibleAnswer.val
) {
return questionPossibleAnswer.val.includes(
resultPossibleAnswer.val
)
} else {
return false
}
}
)
}).length
: 0
if (matchCount === questionPossibleAnswers.length) {
possibleAnswerMatch = true
return result
} else {
return {
...result,
match: result.match - noPossibleAnswerMatchPenalty,
detailedMatch: {
...result.detailedMatch,
qMatch:
result.detailedMatch.qMatch -
noPossibleAnswerMatchPenalty,
},
}
}
})
if (possibleAnswerMatch) {
return updated
} else {
return results
}
}
// ---------------------------------------------------------------------------------------------------------
// Multi threaded stuff
// ---------------------------------------------------------------------------------------------------------
// Payload of a 'work' (search) message — see the 'work' branch in
// handleWorkerData and doSearch above.
interface WorkData {
    // subject name used to pick which subjects to search first
    subjName: string
    // the question to look up
    question: Question
    // stop searching further subjects once a result reaches this match %
    searchTillMatchPercent: number
    // fall back to searching every subject when no good name-matched result
    searchInAllIfNoResult: boolean
    // indexes of the question dbs to search in
    searchIn: number[]
    // job index; only used in the log/result message text
    index: number
}
// Worker entry point: when this module is loaded on a worker thread (not the
// main thread), start listening for jobs immediately.
if (!isMainThread) {
    handleWorkerData()
}
/**
 * Runs on the worker thread: registers the message handler that executes the
 * jobs sent from the pool on the main thread.
 *
 * Keeps a worker-local copy of every question db (`qdbs`), seeded from
 * `workerData.initData`. The update-style messages ('dbEdit', 'newQuestions',
 * 'newdb', 'rmQuestions') mutate this copy so all workers stay in sync with
 * the main thread's data; the job-style messages ('work', 'merge', 'dbClean')
 * compute a result and post it back via `parentPort`.
 */
function handleWorkerData() {
    const {
        workerIndex,
        initData,
    }: { workerIndex: number; initData: Array<QuestionDb> } = workerData
    // worker-local question dbs; reassigned/mutated by update messages below
    let qdbs: Array<QuestionDb> = initData
    const qdbCount = initData.length
    const { subjCount, questionCount } = countOfQdbs(initData)

    logger.Log(
        `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! qdbs: ${qdbCount}, subjects: ${subjCount.toLocaleString()}, questions: ${questionCount.toLocaleString()}`
    )

    parentPort.on('message', async (msg /*: TaskObject */) => {
        try {
            // make sure OCR is initialized before any job that may use images
            await tesseractLoaded
            if (msg.type === 'work') {
                // Search job: look the question up in the dbs listed in searchIn
                const {
                    subjName,
                    question: originalQuestion,
                    searchTillMatchPercent,
                    searchInAllIfNoResult,
                    searchIn,
                    index,
                }: WorkData = msg.data

                let searchResult: SearchResultQuestion[] = []
                let error = false
                // OCR any attached images first (no-op for plain questions)
                const question = await recognizeQuestionImage(originalQuestion)
                try {
                    qdbs.forEach((qdb) => {
                        if (searchIn.includes(qdb.index)) {
                            const res = doSearch(
                                qdb.data,
                                subjName,
                                question,
                                searchTillMatchPercent,
                                searchInAllIfNoResult
                            )
                            // tag each result with the db it came from
                            searchResult = [
                                ...searchResult,
                                ...res.map((x) => {
                                    return {
                                        ...x,
                                        detailedMatch: {
                                            ...x.detailedMatch,
                                            qdb: qdb.name,
                                        },
                                    }
                                }),
                            ]
                        }
                    })
                } catch (err) {
                    // log the failing input, then still post a (flagged) result
                    logger.Log(
                        'Error in worker thread!',
                        logger.GetColor('redbg')
                    )
                    console.error(err)
                    console.error(
                        JSON.stringify(
                            {
                                subjName: subjName,
                                question: question,
                                searchTillMatchPercent: searchTillMatchPercent,
                                searchInAllIfNoResult: searchInAllIfNoResult,
                                searchIn: searchIn,
                                index: index,
                            },
                            null,
                            2
                        )
                    )
                    error = true
                }

                // sorting: best match first
                const sortedResult: SearchResultQuestion[] = searchResult.sort(
                    (q1, q2) => {
                        if (q1.match < q2.match) {
                            return 1
                        } else if (q1.match > q2.match) {
                            return -1
                        } else {
                            return 0
                        }
                    }
                )

                const workerResult: WorkerResult = {
                    msg: `From thread #${workerIndex}: job ${
                        !isNaN(index) ? `#${index}` : ''
                    }done`,
                    workerIndex: workerIndex,
                    result: sortedResult,
                    error: error,
                }
                // ONDONE:
                parentPort.postMessage(workerResult)
                // console.log(
                //   `[THREAD #${workerIndex}]: Work ${
                //     !isNaN(index) ? `#${index}` : ''
                //   }done!`
                // )
            } else if (msg.type === 'merge') {
                // Merge job: diff a remote qdb against the matching local one
                // and report the subjects/questions missing locally
                const {
                    localQdbIndex,
                    remoteQdb,
                }: { localQdbIndex: number; remoteQdb: QuestionDb } = msg.data
                const localQdb = qdbs.find((qdb) => qdb.index === localQdbIndex)
                const { newData, newSubjects } = getSubjectDifference(
                    localQdb.data,
                    remoteQdb.data
                )
                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: merge done`,
                    workerIndex: workerIndex,
                    newData: newData,
                    newSubjects: newSubjects,
                    localQdbIndex: localQdbIndex,
                })
            } else if (msg.type === 'dbEdit') {
                // Update: apply edits to one db in this worker's copy
                const { dbIndex, edits }: { dbIndex: number; edits: Edits } =
                    msg.data
                const { resultDb } = editDb(qdbs[dbIndex], edits)
                qdbs[dbIndex] = resultDb
                logger.DebugLog(
                    `Worker db edit ${workerIndex}`,
                    'worker update',
                    1
                )
                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: db edit`,
                    workerIndex: workerIndex,
                })
            } else if (msg.type === 'newQuestions') {
                // Update: append new questions to a subject of one db,
                // creating the subject if it doesn't exist yet
                const {
                    subjName,
                    qdbIndex,
                    newQuestions,
                }: {
                    subjName: string
                    qdbIndex: number
                    newQuestions: Question[]
                } = msg.data

                // make sure every incoming question has its search cache built
                const newQuestionsWithCache = newQuestions.map((question) => {
                    if (!question.cache) {
                        return createQuestion(question)
                    } else {
                        return question
                    }
                })

                let added = false
                qdbs = qdbs.map((qdb) => {
                    if (qdb.index === qdbIndex) {
                        return {
                            ...qdb,
                            data: qdb.data.map((subj) => {
                                if (subj.Name === subjName) {
                                    added = true
                                    return {
                                        Name: subj.Name,
                                        Questions: [
                                            ...subj.Questions,
                                            ...newQuestionsWithCache,
                                        ],
                                    }
                                } else {
                                    return subj
                                }
                            }),
                        }
                    } else {
                        return qdb
                    }
                })
                // subject not found: add it as a new subject to the target db
                if (!added) {
                    qdbs = qdbs.map((qdb) => {
                        if (qdb.index === qdbIndex) {
                            return {
                                ...qdb,
                                data: [
                                    ...qdb.data,
                                    {
                                        Name: subjName,
                                        Questions: [...newQuestionsWithCache],
                                    },
                                ],
                            }
                        } else {
                            return qdb
                        }
                    })
                }

                logger.DebugLog(
                    `Worker new question ${workerIndex}`,
                    'worker update',
                    1
                )

                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: update done`,
                    workerIndex: workerIndex,
                })
                // console.log(`[THREAD #${workerIndex}]: update`)
            } else if (msg.type === 'newdb') {
                // Update: register a brand new question db in this worker
                const { data }: { data: QuestionDb } = msg
                qdbs.push(data)
                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: new db add done`,
                    workerIndex: workerIndex,
                })
                // console.log(`[THREAD #${workerIndex}]: newdb`)
            } else if (msg.type === 'dbClean') {
                // Clean job: run cleanDb and report the removed indexes
                const removedIndexes = cleanDb(msg.data, qdbs)
                const workerResult: WorkerResult = {
                    msg: `From thread #${workerIndex}: db clean done`,
                    workerIndex: workerIndex,
                    result: removedIndexes,
                }
                parentPort.postMessage(workerResult)
            } else if (msg.type === 'rmQuestions') {
                // Update: remove/replace questions of one subject in one db
                const {
                    questionIndexesToRemove,
                    subjIndex,
                    qdbIndex,
                    recievedQuestions,
                } = msg.data

                qdbs[qdbIndex].data[subjIndex].Questions =
                    updateQuestionsInArray(
                        questionIndexesToRemove,
                        qdbs[qdbIndex].data[subjIndex].Questions,
                        recievedQuestions
                    )

                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: rm question done`,
                    workerIndex: workerIndex,
                })
            } else {
                // unknown message type: report back instead of hanging the job
                logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
                console.error(msg)
                parentPort.postMessage({
                    msg: `From thread #${workerIndex}: Invalid message type (${msg.type})!`,
                    workerIndex: workerIndex,
                })
            }
        } catch (e) {
            // never die silently: always answer, so the pool can resolve the job
            console.error(e)
            parentPort.postMessage({
                msg: `From thread #${workerIndex}: unhandled error occured!`,
                workerIndex: workerIndex,
                e: e,
            })
        }
    })
}
// ------------------------------------------------------------------------
export { doSearch, setNoPossibleAnswersPenalties }

View file

@ -23,7 +23,6 @@ export function get<T = any>(
try { try {
resolve({ data: JSON.parse(body) }) resolve({ data: JSON.parse(body) })
} catch (e) { } catch (e) {
console.log(body)
resolve({ error: e, options: options }) resolve({ error: e, options: options })
} }
}) })
@ -46,6 +45,7 @@ interface PostParams {
port: number port: number
bodyObject: any bodyObject: any
http?: boolean http?: boolean
cookies?: string
} }
// https://nodejs.org/api/http.html#httprequesturl-options-callback // https://nodejs.org/api/http.html#httprequesturl-options-callback
@ -55,6 +55,7 @@ export function post<T = any>({
port, port,
bodyObject, bodyObject,
http, http,
cookies,
}: PostParams): Promise<PostResult<T>> { }: PostParams): Promise<PostResult<T>> {
const provider = http ? httpRequest : httpsRequest const provider = http ? httpRequest : httpsRequest
const body = JSON.stringify(bodyObject) const body = JSON.stringify(bodyObject)
@ -69,6 +70,11 @@ export function post<T = any>({
headers: { headers: {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body), 'Content-Length': Buffer.byteLength(body),
...(cookies
? {
cookie: cookies,
}
: {}),
}, },
}, },
(res) => { (res) => {
@ -85,7 +91,6 @@ export function post<T = any>({
cookies: res.headers['set-cookie'], cookies: res.headers['set-cookie'],
}) })
} catch (e) { } catch (e) {
console.log(body)
resolve({ error: e }) resolve({ error: e })
} }
}) })
@ -100,3 +105,20 @@ export function post<T = any>({
req.end() req.end()
}) })
} }
/**
 * Parses an array of `Set-Cookie` header strings into a flat key/value map.
 * Attribute-only entries (e.g. `HttpOnly`) get the value `true`, matching the
 * previous behavior (callers only read known string-valued keys).
 *
 * Fix: split each `key=value` pair on the FIRST '=' only — cookie values
 * (e.g. base64 session ids with '=' padding) may themselves contain '='
 * characters, and the old `split('=')` truncated them.
 */
export function parseCookies(responseCookies: string[]): {
    [key: string]: string
} {
    const cookiesArray = responseCookies.join('; ').split('; ')
    const parsedCookies: { [key: string]: string } = cookiesArray.reduce(
        (acc, cookieString) => {
            const separatorIndex = cookieString.indexOf('=')
            const key =
                separatorIndex === -1
                    ? cookieString
                    : cookieString.slice(0, separatorIndex)
            const val =
                separatorIndex === -1
                    ? ''
                    : cookieString.slice(separatorIndex + 1)
            return {
                ...acc,
                // empty/missing value keeps the legacy `true` flag behavior
                [key]: val || true,
            }
        },
        {}
    )
    return parsedCookies
}

53
src/utils/p2putils.ts Normal file
View file

@ -0,0 +1,53 @@
import { PeerInfo } from '../types/basicTypes'
import { paths } from './files'
import { parseCookies, post } from './networkUtils'
import utils from './utils'
/** Formats a peer as `host:port`, used for logging and identification. */
export function peerToString(peer: {
    host: string
    port: string | number
}): string {
    const { host, port } = peer
    return [host, port].join(':')
}
/**
 * Two peers are considered identical when both host and port match;
 * other PeerInfo fields (pw, sessionCookie, …) are ignored.
 */
export function isPeerSameAs(peer1: PeerInfo, peer2: PeerInfo): boolean {
    const sameHost = peer1.host === peer2.host
    const samePort = peer1.port === peer2.port
    return sameHost && samePort
}
/**
 * Persists an updated peer entry: merges `updatedPeer`'s fields into the
 * matching entry (same host+port) and writes the whole list back to the
 * peers file. Entries that do not match are written back unchanged.
 */
export function updatePeersFile(
    peers: PeerInfo[],
    updatedPeer: PeerInfo
): void {
    const mergedPeers = peers.map((existingPeer) =>
        isPeerSameAs(updatedPeer, existingPeer)
            ? { ...existingPeer, ...updatedPeer }
            : existingPeer
    )
    utils.WriteFile(JSON.stringify(mergedPeers, null, 2), paths.peersFile)
}
export async function loginToPeer(peer: PeerInfo): Promise<string | Error> {
const { data, error, cookies } = await post<{
result: string
msg: string
}>({
hostname: peer.host,
path: '/api/login',
port: peer.port,
bodyObject: { pw: peer.pw },
http: peer.http,
})
if (error || !data || data.result !== 'success') {
return data ? new Error(data.msg) : error
}
const parsedCookies = parseCookies(cookies)
return parsedCookies.sessionID
}

View file

@ -0,0 +1,30 @@
import { parentPort } from 'node:worker_threads'
import { cleanDb } from '../../utils/qdbUtils'
import { Question, QuestionDb } from '../../types/basicTypes'
import { WorkerResult } from '../worker'
// Task message asking a worker to run a db clean pass (see cleanDb for the
// exact semantics of the fields below).
export type DbCleanTaskObject = {
    type: 'dbClean'
    data: {
        questions: Question[] // questions the db contents are checked against
        subjToClean: string // subject name whose questions are cleaned
        overwriteBeforeDate: number // timestamp cutoff — TODO confirm exact meaning in cleanDb
        qdbIndex: number // which question db to clean
    }
}
/**
 * Worker handler for 'dbClean' tasks: delegates to cleanDb and posts the
 * removed indexes back to the main thread.
 */
export const handleDbClean = async (
    qdbs: QuestionDb[],
    msg: DbCleanTaskObject,
    workerIndex: number
): Promise<void> => {
    // cleanDb does the heavy lifting and reports which entries it removed.
    const removedIndexes = cleanDb(msg.data, qdbs)

    const result: WorkerResult = {
        msg: `From thread #${workerIndex}: db clean done`,
        workerIndex: workerIndex,
        result: removedIndexes,
    }
    parentPort.postMessage(result)
}

View file

@ -0,0 +1,34 @@
import { parentPort } from 'node:worker_threads'
import { QuestionDb } from '../../types/basicTypes'
import { Edits, editDb } from '../../utils/actions'
import logger from '../../utils/logger'
// Task message asking a worker to apply a set of edits to one question db.
export type DbEditTaskObject = {
    type: 'dbEdit'
    data: { dbIndex: number; edits: Edits } // dbIndex: array position in the worker's qdbs
}
/**
 * Worker handler for 'dbEdit' tasks: applies the edits to the targeted db,
 * swaps the edited db into the worker-local state, and acks the main thread.
 */
export const handleDbEdit = async (
    qdbs: QuestionDb[],
    msg: DbEditTaskObject,
    workerIndex: number,
    setQdbs: (newVal: Array<QuestionDb>) => void
): Promise<void> => {
    const { dbIndex, edits } = msg.data

    // editDb returns the edited copy of the db; replace it by position.
    const { resultDb } = editDb(qdbs[dbIndex], edits)
    const updatedQdbs = qdbs.map((qdb, position) =>
        position === dbIndex ? resultDb : qdb
    )
    setQdbs(updatedQdbs)

    logger.DebugLog(`Worker db edit ${workerIndex}`, 'worker update', 1)
    parentPort.postMessage({
        msg: `From thread #${workerIndex}: db edit`,
        workerIndex: workerIndex,
    })
}

View file

@ -0,0 +1,36 @@
import { parentPort } from 'node:worker_threads'
import { QuestionDb } from '../../types/basicTypes'
import { getSubjectDifference } from '../../utils/qdbUtils'
// Task message asking a worker to diff a remote qdb against a local one.
export type MergeTaskObject = {
    type: 'merge'
    data: {
        localQdbIndex: number // matched against qdb.index, not array position
        remoteQdb: QuestionDb
    }
}
// Worker handler for 'merge' tasks: computes what the remote qdb has that the
// matching local qdb lacks, and posts the difference back to the main thread.
export const handleMerge = async (
    qdbs: QuestionDb[],
    msg: MergeTaskObject,
    workerIndex: number
): Promise<void> => {
    const {
        localQdbIndex,
        remoteQdb,
    }: { localQdbIndex: number; remoteQdb: QuestionDb } = msg.data
    // NOTE(review): find() yields undefined when no qdb carries this index;
    // the .data access below would then throw (the worker's top-level catch
    // reports it) — consider an explicit guard.
    const localQdb = qdbs.find((qdb) => qdb.index === localQdbIndex)

    const { newData, newSubjects } = getSubjectDifference(
        localQdb.data,
        remoteQdb.data
    )

    parentPort.postMessage({
        msg: `From thread #${workerIndex}: merge done`,
        workerIndex: workerIndex,
        newData: newData,
        newSubjects: newSubjects,
        localQdbIndex: localQdbIndex,
    })
}

View file

@ -0,0 +1,22 @@
import { parentPort } from 'node:worker_threads'
import { QuestionDb } from '../../types/basicTypes'
// Task message carrying a newly created question db for workers to append.
export type NewDbTaskObject = {
    type: 'newdb'
    data: QuestionDb
}
/**
 * Worker handler for 'newdb' tasks: appends the received question db to the
 * worker-local list and acks the main thread.
 */
export const handleNewDb = async (
    qdbs: QuestionDb[],
    msg: NewDbTaskObject,
    workerIndex: number,
    setQdbs: (newVal: Array<QuestionDb>) => void
): Promise<void> => {
    const newDb: QuestionDb = msg.data
    setQdbs(qdbs.concat(newDb))
    parentPort.postMessage({
        msg: `From thread #${workerIndex}: new db add done`,
        workerIndex: workerIndex,
    })
}

View file

@ -0,0 +1,81 @@
import { parentPort } from 'node:worker_threads'
import { QuestionDb } from '../../types/basicTypes'
import logger from '../../utils/logger'
import { createQuestion } from '../../utils/qdbUtils'
import { Result } from '../../utils/actions'
// Task message carrying freshly received questions to append to one subject
// of one qdb (the target is identified inside the Result payload).
export type NewQuestionTaskObject = {
    type: 'newQuestions'
    data: Omit<Result, 'qdbName'>
}
/**
 * Worker handler for 'newQuestions' tasks: appends the received questions to
 * the target subject of the target qdb, creating the subject if it does not
 * exist yet, then acks the main thread.
 *
 * Improvement over the previous version: the original called setQdbs twice
 * (one pass to append to an existing subject, and — when the subject was
 * missing — a second pass over the original array to add it). This version
 * decides per qdb whether the subject exists and builds the final state in a
 * single pass / single setQdbs call; the resulting state is identical.
 */
export const handleNewQuestions = async (
    qdbs: QuestionDb[],
    msg: NewQuestionTaskObject,
    workerIndex: number,
    setQdbs: (newVal: Array<QuestionDb>) => void
): Promise<void> => {
    const { subjName, qdbIndex, newQuestions } = msg.data

    // Make sure every incoming question carries its search cache.
    const newQuestionsWithCache = newQuestions.map((question) => {
        if (!question.cache) {
            return createQuestion(question)
        } else {
            return question
        }
    })

    setQdbs(
        qdbs.map((qdb) => {
            if (qdb.index !== qdbIndex) {
                return qdb
            }
            const subjExists = qdb.data.some((subj) => subj.Name === subjName)
            const data = subjExists
                ? // Append to the existing subject's question list.
                  qdb.data.map((subj) => {
                      if (subj.Name === subjName) {
                          return {
                              Name: subj.Name,
                              Questions: [
                                  ...subj.Questions,
                                  ...newQuestionsWithCache,
                              ],
                          }
                      } else {
                          return subj
                      }
                  })
                : // Subject not present yet: add it with the new questions.
                  [
                      ...qdb.data,
                      {
                          Name: subjName,
                          Questions: [...newQuestionsWithCache],
                      },
                  ]
            return { ...qdb, data: data }
        })
    )

    logger.DebugLog(`Worker new question ${workerIndex}`, 'worker update', 1)
    parentPort.postMessage({
        msg: `From thread #${workerIndex}: update done`,
        workerIndex: workerIndex,
    })
}

View file

@ -0,0 +1,153 @@
import { parentPort } from 'node:worker_threads'
import { PeerInfo, Question, QuestionDb } from '../../types/basicTypes'
import { files, paths, readAndValidateFile } from '../../utils/files'
import utils from '../../utils/utils'
import { RecievedData } from '../../utils/actions'
import { removeCacheFromQuestion } from '../../utils/qdbUtils'
import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi'
import logger from '../../utils/logger'
import {
loginToPeer,
peerToString,
updatePeersFile,
} from '../../utils/p2putils'
import { post } from '../../utils/networkUtils'
// Thin wrapper around loginToPeer: collapses its `string | Error` result to
// the session id string on success, or null on any failure.
const login = async (peer: PeerInfo): Promise<string> => {
    const loginResult = await loginToPeer(peer)
    return typeof loginResult === 'string' ? loginResult : null
}
// Task message asking a worker to push newly received questions to all peers.
export type QuestionsToPeersTaskObject = {
    type: 'sendQuestionsToPeers'
    data: {
        newQuestions: Question[]
        location: string // forwarded as RecievedData.location to peers
        subj: string // subject name the questions belong to
    }
}
/**
 * Worker handler for 'sendQuestionsToPeers' tasks: POSTs the new questions to
 * every configured peer, logging in (and persisting refreshed session
 * cookies) as needed, then logs a per-peer summary and acks the main thread.
 */
export const handleQuestionsToPeers = async (
    _qdbs: QuestionDb[],
    msg: QuestionsToPeersTaskObject,
    workerIndex: number
): Promise<void> => {
    const { newQuestions, location, subj } = msg.data
    const domain = utils.ReadFile(paths.domainFile).trim()
    const peers = readAndValidateFile<PeerInfo[]>(files.peersFile)

    // Nothing to do when there are no peers or no questions; still ack so
    // the main thread's job resolves.
    if (!peers || peers.length === 0 || newQuestions.length === 0) {
        parentPort.postMessage({
            msg: `From thread #${workerIndex}: sendQuestionsToPeers done`,
            workerIndex: workerIndex,
        })
        return
    }

    // Payload shaped like a regular client submission: caches stripped and
    // this server's domain stamped on each question as its source.
    const dataToSend: RecievedData = {
        fromPeer: true,
        subj: subj,
        location: location,
        id: domain, // client ID
        version: 'P2P',
        quiz: newQuestions.map((question) => {
            return removeCacheFromQuestion({
                ...question,
                data: {
                    ...question.data,
                    source: domain,
                },
            })
        }),
    }

    // Outcome buckets, used only for the summary log below.
    const results: {
        errors: PeerInfo[]
        hasNew: PeerInfo[]
        sent: PeerInfo[]
        loginErrors: PeerInfo[]
    } = {
        errors: [],
        hasNew: [],
        sent: [],
        loginErrors: [],
    }

    // POST the prepared payload to one peer with the given session cookie.
    const postData = (peer: PeerInfo, sessionCookie: string) => {
        return post<QuestionAddResponse>({
            hostname: peer.host,
            port: peer.port,
            http: peer.http,
            path: '/api/isAdding',
            bodyObject: dataToSend,
            cookies: `sessionID=${sessionCookie}`,
        })
    }

    for (const peer of peers) {
        let sessionCookie = peer.sessionCookie

        // No stored session: log in first and persist the fresh cookie.
        if (!sessionCookie) {
            sessionCookie = await login(peer)
            if (!sessionCookie) {
                results.loginErrors.push(peer)
                continue
            }
            updatePeersFile(peers, { ...peer, sessionCookie: sessionCookie })
        }

        let res = await postData(peer, sessionCookie)

        // 'nouser' presumably means the stored session expired; retry once
        // with a fresh login. TODO confirm against the peer API.
        if (res.data?.result === 'nouser' && sessionCookie) {
            sessionCookie = await login(peer)
            if (!sessionCookie) {
                results.loginErrors.push(peer)
                continue
            }
            updatePeersFile(peers, { ...peer, sessionCookie: sessionCookie })
            res = await postData(peer, sessionCookie)
        }

        if (res.error || !res.data?.success) {
            results.errors.push(peer)
        } else if (res.data?.totalNewQuestions > 0) {
            results.hasNew.push(peer)
        } else {
            results.sent.push(peer)
        }
    }

    // Build a colored one-line summary of what happened per peer.
    const logMsg: string[] = []
    const addToLogMsg = (
        peerResult: PeerInfo[],
        prefix: string,
        color: string
    ) => {
        if (peerResult.length > 0) {
            logMsg.push(
                `${logger.C(color)}${prefix}:${logger.C()} ` +
                    peerResult.map((x) => peerToString(x)).join(', ')
            )
        }
    }

    addToLogMsg(results.loginErrors, 'Login error', 'red')
    addToLogMsg(results.errors, 'Error', 'red')
    addToLogMsg(results.hasNew, 'Had new questions', 'blue')
    addToLogMsg(results.sent, 'Sent', 'green')

    logger.Log(
        `\t${logger.C(
            'green'
        )}Sent new questions to peers${logger.C()}; ${logMsg.join(', ')}`
    )

    parentPort.postMessage({
        msg: `From thread #${workerIndex}: sendQuestionsToPeers done`,
        workerIndex: workerIndex,
    })
}

View file

@ -0,0 +1,53 @@
import { parentPort } from 'node:worker_threads'
import { Question, QuestionDb } from '../../types/basicTypes'
import { updateQuestionsInArray } from '../../utils/actions'
// Task message asking a worker to update/remove questions of one subject.
export type RmQuestionsTaskObject = {
    type: 'rmQuestions'
    data: {
        questionIndexesToRemove: number[][] // consumed by updateQuestionsInArray
        subjIndex: number // array position within qdb.data
        qdbIndex: number // array position within the worker's qdbs
        recievedQuestions: Question[]
    }
}
/**
 * Worker handler for 'rmQuestions' tasks: rebuilds the targeted subject's
 * question list via updateQuestionsInArray, stores the updated state, and
 * acks the main thread.
 */
export const handleRmQuestions = async (
    qdbs: QuestionDb[],
    msg: RmQuestionsTaskObject,
    workerIndex: number,
    setQdbs: (newVal: QuestionDb[]) => void
): Promise<void> => {
    const { questionIndexesToRemove, subjIndex, qdbIndex, recievedQuestions } =
        msg.data

    setQdbs(
        qdbs.map((qdb, qdbPosition) => {
            if (qdbPosition !== qdbIndex) {
                return qdb
            }
            const data = qdb.data.map((subj, subjPosition) => {
                if (subjPosition !== subjIndex) {
                    return subj
                }
                const Questions = updateQuestionsInArray(
                    questionIndexesToRemove,
                    subj.Questions,
                    recievedQuestions
                )
                return { ...subj, Questions: Questions }
            })
            return { ...qdb, data: data }
        })
    )

    parentPort.postMessage({
        msg: `From thread #${workerIndex}: rm question done`,
        workerIndex: workerIndex,
    })
}

View file

@ -0,0 +1,286 @@
import { parentPort } from 'worker_threads'
import {
Question,
QuestionData,
QuestionDb,
Subject,
} from '../../types/basicTypes'
import logger from '../../utils/logger'
import {
SearchResultQuestion,
getSubjNameWithoutYear,
minMatchToNotSearchOtherSubjects,
noPossibleAnswerMatchPenalty,
prepareQuestion,
searchSubject,
} from '../../utils/qdbUtils'
import { recognizeTextFromBase64 } from '../../utils/tesseract'
import { WorkerResult } from '../worker'
// Task message asking a worker to run a question search.
export type SearchTaskObject = {
    type: 'search'
    data: {
        searchIn: number[] // qdb.index values of the dbs to search
        question: Question
        subjName: string
        testUrl?: string
        questionData?: QuestionData
        searchInAllIfNoResult?: boolean // fall back to searching every subject
        searchTillMatchPercent?: number // stop early once this match percent is reached
        [key: string]: any // extra fields (e.g. job `index`) pass through
    }
}
/**
 * Searches subjects for questions similar to `question`.
 *
 * The first pass only visits subjects whose year-stripped name occurs in
 * `subjName`; when that yields no strong match and `searchInAllIfNoResult`
 * is set, a second pass visits every subject.
 *
 * Fixes: parameter type `Boolean` (boxed wrapper) corrected to the primitive
 * `boolean`; "Reqults" typo in the debug log message corrected.
 *
 * @param data subjects to search
 * @param subjName subject name used to pre-filter the searched subjects
 * @param question the question to look for
 * @param searchTillMatchPercent when set, stop visiting further subjects as
 *        soon as a result reaches this match percent
 * @param searchInAllIfNoResult when set, fall back to searching all subjects
 *        if the name-filtered pass found nothing good enough
 * @returns matching questions sorted by descending match percent
 */
export function doSearch(
    data: Array<Subject>,
    subjName: string,
    question: Question,
    searchTillMatchPercent?: number,
    searchInAllIfNoResult?: boolean
): SearchResultQuestion[] {
    let result: SearchResultQuestion[] = []
    const questionToSearch = prepareQuestion(question)

    data.every((subj) => {
        if (
            subjName
                .toLowerCase()
                .includes(getSubjNameWithoutYear(subj.Name).toLowerCase())
        ) {
            logger.DebugLog(`Searching in ${subj.Name} `, 'searchworker', 2)
            const subjRes = searchSubject(
                subj,
                questionToSearch,
                subjName,
                searchTillMatchPercent
            )
            result = result.concat(subjRes)
            if (searchTillMatchPercent) {
                // .every() stops iterating on the first false return value
                return !subjRes.some((sr) => {
                    return sr.match >= searchTillMatchPercent
                })
            }
            return true
        }
        return true
    })

    if (searchInAllIfNoResult) {
        // FIXME: dont research subject searched above
        // NOTE(review): `result` is unsorted here, so result[0] is the first
        // hit found, not necessarily the best one — confirm this is intended.
        if (
            result.length === 0 ||
            result[0].match < minMatchToNotSearchOtherSubjects
        ) {
            logger.DebugLog(
                'Results length is zero when comparing names, trying all subjects',
                'searchworker',
                1
            )
            data.every((subj) => {
                const subjRes = searchSubject(
                    subj,
                    questionToSearch,
                    subjName,
                    searchTillMatchPercent
                )
                result = result.concat(subjRes)
                if (searchTillMatchPercent) {
                    const continueSearching = !subjRes.some((sr) => {
                        return sr.match >= searchTillMatchPercent
                    })
                    return continueSearching
                }
                return true
            })
        }
    }

    // Penalize results whose possible answers don't cover the question's.
    result = setNoPossibleAnswersPenalties(
        questionToSearch.data.possibleAnswers,
        result
    )

    // Sort by match percent, best first.
    result = result.sort((q1, q2) => {
        if (q1.match < q2.match) {
            return 1
        } else if (q1.match > q2.match) {
            return -1
        } else {
            return 0
        }
    })

    return result
}
/**
 * Applies a score penalty to search results whose possible answers do not
 * fully cover the searched question's possible answers.
 *
 * The penalized copy is only returned when at least one result *does* fully
 * match — otherwise the originals are returned unchanged, so that a generally
 * answerless situation does not punish every candidate.
 */
export function setNoPossibleAnswersPenalties(
    questionPossibleAnswers: QuestionData['possibleAnswers'],
    results: SearchResultQuestion[]
): SearchResultQuestion[] {
    // No possible answers on the searched question: nothing to compare.
    if (!Array.isArray(questionPossibleAnswers)) {
        return results
    }

    // No result carries possible answers either: nothing to penalize.
    const noneHasPossibleAnswers = results.every((x) => {
        return !Array.isArray(x.q.data.possibleAnswers)
    })
    if (noneHasPossibleAnswers) return results

    let possibleAnswerMatch = false
    const updated = results.map((result) => {
        // Count the result's possible answers whose .val is contained in one
        // of the searched question's possible-answer .val strings.
        const matchCount = Array.isArray(result.q.data.possibleAnswers)
            ? result.q.data.possibleAnswers.filter((resultPossibleAnswer) => {
                  return questionPossibleAnswers.some(
                      (questionPossibleAnswer) => {
                          if (
                              questionPossibleAnswer.val &&
                              resultPossibleAnswer.val
                          ) {
                              return questionPossibleAnswer.val.includes(
                                  resultPossibleAnswer.val
                              )
                          } else {
                              return false
                          }
                      }
                  )
              }).length
            : 0

        if (matchCount === questionPossibleAnswers.length) {
            // Full coverage: keep the score, remember a full match exists.
            possibleAnswerMatch = true
            return result
        } else {
            // Partial/no coverage: subtract the penalty from the aggregate
            // match and from the question-text component.
            return {
                ...result,
                match: result.match - noPossibleAnswerMatchPenalty,
                detailedMatch: {
                    ...result.detailedMatch,
                    qMatch:
                        result.detailedMatch.qMatch -
                        noPossibleAnswerMatchPenalty,
                },
            }
        }
    })

    if (possibleAnswerMatch) {
        return updated
    } else {
        return results
    }
}
/**
 * OCRs any base64 images attached to a question. When at least one image
 * yields non-empty text, returns a copy of the question whose Q is the joined
 * recognized text and whose type becomes 'simple'; otherwise returns the
 * question unchanged.
 */
async function recognizeQuestionImage(question: Question): Promise<Question> {
    const images = question.data.base64
    if (!Array.isArray(images) || images.length === 0) {
        return question
    }

    const recognizedParts: string[] = []
    for (const image of images) {
        const text = await recognizeTextFromBase64(image)
        if (text && text.trim()) {
            recognizedParts.push(text)
        }
    }

    if (recognizedParts.length === 0) {
        return question
    }

    return {
        ...question,
        Q: recognizedParts.join(' '),
        data: {
            ...question.data,
            type: 'simple',
        },
    }
}
// Worker handler for 'search' tasks: OCRs attached images, searches the
// requested qdbs, and posts the sorted results back to the main thread.
export const handleSearch = async (
    qdbs: QuestionDb[],
    msg: SearchTaskObject,
    workerIndex: number
): Promise<void> => {
    const {
        subjName,
        question: originalQuestion,
        searchTillMatchPercent,
        searchInAllIfNoResult,
        searchIn,
        index,
    } = msg.data
    let searchResult: SearchResultQuestion[] = []
    let error = false

    // Replace image-only content with recognized text before searching.
    const question = await recognizeQuestionImage(originalQuestion)

    try {
        qdbs.forEach((qdb) => {
            if (searchIn.includes(qdb.index)) {
                const res = doSearch(
                    qdb.data,
                    subjName,
                    question,
                    searchTillMatchPercent,
                    searchInAllIfNoResult
                )

                // Tag each hit with the name of the qdb it came from.
                searchResult = [
                    ...searchResult,
                    ...res.map((x) => {
                        return {
                            ...x,
                            detailedMatch: {
                                ...x.detailedMatch,
                                qdb: qdb.name,
                            },
                        }
                    }),
                ]
            }
        })
    } catch (err) {
        logger.Log('Error in worker thread!', logger.GetColor('redbg'))
        console.error(err)
        // Dump the job parameters so the failure can be reproduced.
        console.error(
            JSON.stringify(
                {
                    subjName: subjName,
                    question: question,
                    searchTillMatchPercent: searchTillMatchPercent,
                    searchInAllIfNoResult: searchInAllIfNoResult,
                    searchIn: searchIn,
                    index: index,
                },
                null,
                2
            )
        )
        error = true
    }

    // sorting: hits from multiple qdbs must be re-ordered by match (desc)
    const sortedResult: SearchResultQuestion[] = searchResult.sort((q1, q2) => {
        if (q1.match < q2.match) {
            return 1
        } else if (q1.match > q2.match) {
            return -1
        } else {
            return 0
        }
    })

    const workerResult: WorkerResult = {
        msg: `From thread #${workerIndex}: job ${
            !isNaN(index) ? `#${index}` : ''
        }done`,
        workerIndex: workerIndex,
        result: sortedResult,
        error: error,
    }

    parentPort.postMessage(workerResult)
}

95
src/worker/worker.ts Normal file
View file

@ -0,0 +1,95 @@
import { isMainThread, parentPort, workerData } from 'worker_threads'
import { QuestionDb } from '../types/basicTypes'
import { SearchResultQuestion, countOfQdbs } from '../utils/qdbUtils'
import logger from '../utils/logger'
import { TaskObject } from './workerPool'
import { tesseractLoaded } from '../utils/tesseract'
import { handleSearch } from './handlers/handleSearch'
import { handleMerge } from './handlers/handleMerge'
import { handleDbEdit } from './handlers/handleDbEdit'
import { handleNewQuestions } from './handlers/handleNewQuestion'
import { handleNewDb } from './handlers/handleNewDb'
import { handleDbClean } from './handlers/handleDbClean'
import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers'
import { handleRmQuestions } from './handlers/handleRmQuestions'
// Shape of the messages a worker posts back when a job finishes.
export interface WorkerResult {
    msg: string // human-readable status, includes the worker index
    workerIndex: number
    result?: SearchResultQuestion[] | number[][] // search hits, or removed indexes for dbClean
    error?: boolean // set by handleSearch when the search threw
}
// This module runs inside worker threads; when loaded there, start the
// message loop immediately. (Inert if imported on the main thread.)
if (!isMainThread) {
    handleWorkerData()
}

// Reads the worker's init payload, keeps a thread-local copy of the question
// dbs, and dispatches every incoming task message to its handler.
async function handleWorkerData() {
    const {
        workerIndex,
        initData,
    }: { workerIndex: number; initData: Array<QuestionDb> } = workerData

    // Thread-local db state; handlers replace it via setQdbs.
    let qdbs: Array<QuestionDb> = initData
    const setQdbs = (newVal: Array<QuestionDb>) => {
        qdbs = newVal
    }

    const qdbCount = initData.length
    const { subjCount, questionCount } = countOfQdbs(initData)

    logger.Log(
        `[THREAD #${workerIndex}]: Worker ${workerIndex} reporting for duty! qdbs: ${qdbCount}, subjects: ${subjCount.toLocaleString()}, questions: ${questionCount.toLocaleString()}`
    )

    parentPort.on('message', async (msg: TaskObject) => {
        try {
            // OCR may still be initializing; wait so search tasks can use it.
            await tesseractLoaded
            await handleMessage(qdbs, msg, workerIndex, setQdbs)
        } catch (e) {
            // Last-resort guard: report the failure instead of letting the
            // thread die silently.
            console.error(e)
            parentPort.postMessage({
                msg: `From thread #${workerIndex}: unhandled error occured! (${
                    (msg as any)?.type
                })`,
                workerIndex: workerIndex,
                e: e,
            })
        }
    })
}
// Routes a task message to its handler based on the discriminant `type`.
// Unknown types are logged and answered so the caller's job still resolves.
async function handleMessage(
    qdbs: QuestionDb[],
    msg: TaskObject,
    workerIndex: number,
    setQdbs: (newVal: QuestionDb[]) => void
) {
    switch (msg.type) {
        case 'search':
            await handleSearch(qdbs, msg, workerIndex)
            break
        case 'merge':
            await handleMerge(qdbs, msg, workerIndex)
            break
        case 'dbEdit':
            await handleDbEdit(qdbs, msg, workerIndex, setQdbs)
            break
        case 'newQuestions':
            await handleNewQuestions(qdbs, msg, workerIndex, setQdbs)
            break
        case 'newdb':
            await handleNewDb(qdbs, msg, workerIndex, setQdbs)
            break
        case 'dbClean':
            await handleDbClean(qdbs, msg, workerIndex)
            break
        case 'rmQuestions':
            await handleRmQuestions(qdbs, msg, workerIndex, setQdbs)
            break
        case 'sendQuestionsToPeers':
            await handleQuestionsToPeers(qdbs, msg, workerIndex)
            break
        default:
            logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
            console.error(msg)
            parentPort.postMessage({
                msg: `From thread #${workerIndex}: Invalid message type (${
                    (msg as any)?.type
                })!`,
                workerIndex: workerIndex,
            })
    }
}

View file

@ -23,10 +23,17 @@ import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import os from 'os' import os from 'os'
import logger from './logger' import type { QuestionDb } from '../types/basicTypes'
import { Result, Edits } from './actions' import { SearchTaskObject } from './handlers/handleSearch'
import type { Question, QuestionDb, QuestionData } from '../types/basicTypes' import { DbEditTaskObject } from './handlers/handleDbEdit'
import type { WorkerResult } from './classes' import { NewQuestionTaskObject } from './handlers/handleNewQuestion'
import { NewDbTaskObject } from './handlers/handleNewDb'
import { DbCleanTaskObject } from './handlers/handleDbClean'
import { RmQuestionsTaskObject } from './handlers/handleRmQuestions'
import { MergeTaskObject } from './handlers/handleMerge'
import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers'
import { WorkerResult } from './worker'
import logger from '../utils/logger'
const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length
@ -36,47 +43,15 @@ interface WorkerObj {
free: Boolean free: Boolean
} }
// FIXME: type depending on type export type TaskObject =
export interface TaskObject { | SearchTaskObject
type: | DbEditTaskObject
| 'work' | NewQuestionTaskObject
| 'dbEdit' | NewDbTaskObject
| 'newQuestions' | DbCleanTaskObject
| 'newdb' | RmQuestionsTaskObject
| 'dbClean' | MergeTaskObject
| 'rmQuestions' | QuestionsToPeersTaskObject
| 'merge'
data:
| {
searchIn: number[]
question: Question
subjName: string
testUrl?: string
questionData?: QuestionData
searchInAllIfNoResult?: boolean
searchTillMatchPercent?: number
[key: string]: any
}
| { dbIndex: number; edits: Edits }
| QuestionDb
| Omit<Result, 'qdbName'>
| {
questions: Question[]
subjToClean: string
overwriteBeforeDate: number
qdbIndex: number
}
| {
questionIndexesToRemove: number[][]
subjIndex: number
qdbIndex: number
recievedQuestions: Question[]
}
| {
localQdbIndex: number
remoteQdb: QuestionDb
}
}
interface PendingJob { interface PendingJob {
workData: TaskObject workData: TaskObject
@ -98,7 +73,7 @@ interface DoneEvent extends EventEmitter {
export const defaultAlertOnPendingCount = 100 export const defaultAlertOnPendingCount = 100
let alertOnPendingCount = defaultAlertOnPendingCount let alertOnPendingCount = defaultAlertOnPendingCount
const workerFile = './src/utils/classes.ts' const workerFile = './src/worker/worker.ts'
let workers: Array<WorkerObj> let workers: Array<WorkerObj>
let getInitData: () => Array<QuestionDb> = null let getInitData: () => Array<QuestionDb> = null
const pendingJobs: { const pendingJobs: {
@ -129,7 +104,7 @@ export function msgAllWorker(data: TaskObject): Promise<WorkerResult[]> {
return new Promise((resolve) => { return new Promise((resolve) => {
const promises: Promise<WorkerResult>[] = [] const promises: Promise<WorkerResult>[] = []
workers.forEach((worker) => { workers.forEach((worker) => {
promises.push(doALongTask(data, worker.index)) promises.push(queueWork(data, worker.index))
}) })
Promise.all(promises).then((res) => { Promise.all(promises).then((res) => {
logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1) logger.DebugLog('MSGING ALL WORKER DONE', 'job', 1)
@ -144,7 +119,7 @@ export function setPendingJobsAlertCount(newVal?: number): void {
alertOnPendingCount = count alertOnPendingCount = count
} }
export function doALongTask( export function queueWork(
obj: TaskObject, obj: TaskObject,
targetWorkerIndex?: number targetWorkerIndex?: number
): Promise<WorkerResult> { ): Promise<WorkerResult> {
@ -335,11 +310,16 @@ const workerTs = (
wkOpts.workerData.__filename = file wkOpts.workerData.__filename = file
return new Worker( return new Worker(
` `
try {
const wk = require('worker_threads'); const wk = require('worker_threads');
require('ts-node').register(); require('ts-node').register();
let file = wk.workerData.__filename; let file = wk.workerData.__filename;
delete wk.workerData.__filename; delete wk.workerData.__filename;
require(file); require(file);
} catch (e) {
console.error('Error while creating new Worker:')
console.error(e)
}
`, `,
wkOpts wkOpts
) )

@ -1 +1 @@
Subproject commit 9453aa8b978f944df1ae28eeaf28120b91965c1f Subproject commit 8e192d3b23736851423cd469b950a2e97139f357

@ -1 +1 @@
Subproject commit 6eeb83b2e7a48aa9743e53c651629fd500c8e6d5 Subproject commit 075271ca01786c849dd8604934bb2de6b5a0ee82

View file

@ -16,5 +16,5 @@ else
--cookie "sessionID=e0ac328d-86cc-4dbf-a00b-213bec6011e7" \ --cookie "sessionID=e0ac328d-86cc-4dbf-a00b-213bec6011e7" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
-X POST --data "$data" \ -X POST --data "$data" \
"$url/isAdding" "$url/api/isAdding"
fi fi

View file

@ -19,6 +19,9 @@
"lib": ["dom", "ES2020"], "lib": ["dom", "ES2020"],
"resolveJsonModule": true "resolveJsonModule": true
}, },
"ts-node": {
"files": true
},
"files": ["src/server.ts"], "files": ["src/server.ts"],
"include": ["src/**/*"], "include": ["src/**/*"],
"exclude": [ "exclude": [