diff --git a/src/utils/p2putils.ts b/src/modules/api/p2p/p2putils.ts similarity index 58% rename from src/utils/p2putils.ts rename to src/modules/api/p2p/p2putils.ts index ecd8d25..fb5687a 100644 --- a/src/utils/p2putils.ts +++ b/src/modules/api/p2p/p2putils.ts @@ -1,8 +1,67 @@ -import { PeerInfo } from '../types/basicTypes' -import { files, paths, readAndValidateFile } from './files' -import logger from './logger' -import { PostResult, parseCookie, post } from './networkUtils' -import utils from './utils' +import { PeerInfo, QuestionDb } from '../../../types/basicTypes' +import { files, paths, readAndValidateFile } from '../../../utils/files' +import logger from '../../../utils/logger' +import { PostResult, parseCookie, post } from '../../../utils/networkUtils' +import utils from '../../../utils/utils' +import { UserDirDataFile } from '../submodules/userFiles' + +export interface SyncResponseBase { + success: boolean + message?: string +} + +export interface RemotePeerInfo { + selfInfo: PeerInfo + myPeers: PeerInfo[] + serverRevision?: string + scriptRevision?: string + qminingPageRevision?: string + dataEditorRevision?: string + serverLastCommitDate?: number + scriptLastCommitDate?: number + qminingPageLastCommitDate?: number + dataEditorLastCommitDate?: number + serverBuildTime?: number + qminingPageBuildTime?: number + dataEditorBuildTime?: number + scriptVersion?: string + userCount?: number + qdbInfo?: { + questionDbCount: number + subjectCount: number + questionCount: number + } +} + +export interface SyncResult { + old?: { [key: string]: number } + added?: { [key: string]: number } + final?: { [key: string]: number } + msg?: string +} + +export interface SyncDataResult { + remoteInfo?: RemotePeerInfo + users?: SyncResponseBase & { + encryptedUsers?: string + sentUsers?: number + } + questions?: SyncResponseBase & { + questionDbs: QuestionDb[] + count: { + qdbs: number + subjects: number + questions: number + } + } + userFiles?: SyncResponseBase & { + newFiles: { + [key: string]: { + [key: string]: UserDirDataFile + } + } + } +} export function peerToString(peer: { host: string @@ -18,6 +77,36 @@ export function isPeerSameAs( return peer1.host === peer2.host && peer1.port === peer2.port } +export function updateLastSync(selfInfo: PeerInfo, newDate: number): void { + utils.WriteFile( + JSON.stringify({ ...selfInfo, lastSync: newDate }, null, 2), + paths.selfInfoFile + ) +} + +export function updateThirdPartyPeers( + newVal: Omit[] +): void { + const prevVal = utils.FileExists(paths.thirdPartyPeersFile) + ? 
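+          // read the previously saved third-party peers (if any) so new
+          // entries are merged into the existing list instead of replacing it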
utils.ReadJSON(paths.thirdPartyPeersFile) + : [] + + const dataToWrite = newVal.reduce((acc, peer) => { + const isIncluded = acc.find((x) => { + return peerToString(x) === peerToString(peer) + }) + if (!isIncluded) { + return [...acc, peer] + } + return acc + }, prevVal) + + utils.WriteFile( + JSON.stringify(dataToWrite, null, 2), + paths.thirdPartyPeersFile + ) +} + export function updatePeersFile( peerToUpdate: PeerInfo, updatedPeer: Partial @@ -112,7 +201,10 @@ export async function loginAndPostDataToAllPeers< if (res.error || !res.data?.success) { results.errors.push(peer) - console.error(res.error || JSON.stringify(res.data)) + console.error( + `Error: posting data to ${peerToString(peer)}`, + res.error || JSON.stringify(res.data) + ) } else { results.sent.push(peer) } diff --git a/src/modules/api/p2p/questions.ts b/src/modules/api/p2p/questions.ts new file mode 100644 index 0000000..8456286 --- /dev/null +++ b/src/modules/api/p2p/questions.ts @@ -0,0 +1,541 @@ +import { + DataFile, + PeerInfo, + QuestionDb, + Subject, +} from '../../../types/basicTypes' +import { backupData, writeData } from '../../../utils/actions' +import { publicDir } from '../../../utils/files' +import logger from '../../../utils/logger' +import { + countOfQdb, + countOfQdbs, + createQuestion, + getAvailableQdbIndexes, + removeCacheFromQuestion, +} from '../../../utils/qdbUtils' +import utils from '../../../utils/utils' +import { WorkerResult } from '../../../worker/worker' +import { msgAllWorker, queueWork } from '../../../worker/workerPool' +import { + SyncDataResult, + SyncResponseBase, + SyncResult, + peerToString, + updateLastSync, + updatePeersFile, +} from './p2putils' + +interface MergeResult { + newData: Subject[] + newSubjects: Subject[] + localQdbIndex: number + e: Error +} + +interface SyncQuestionsProps { + questionData: (SyncDataResult['questions'] & { peer: PeerInfo })[] + syncStart: number + getQuestionDbs: () => QuestionDb[] + setQuestionDbs: (newVal: QuestionDb[]) => void + selfInfo: PeerInfo + dbsFile: string +} + +// --------------------------------------------------------------------------------------------- +// Getting +// --------------------------------------------------------------------------------------------- +export function getNewQuestionsSince( + subjects: Subject[], + date: number +): Subject[] { + return subjects + .map((subject) => { + return { + ...subject, + Questions: subject.Questions.filter((question) => { + return (question.data.date || 0) >= date + }).map((question) => removeCacheFromQuestion(question)), + } + }) + .filter((subject) => subject.Questions.length !== 0) +} + +export function getQuestions( + questionsSince: number, + getQuestionDbs: () => QuestionDb[] +): SyncResponseBase & { + questionDbs: QuestionDb[] + count: { qdbs: number; subjects: number; questions: number } +} { + const questionDbsWithNewQuestions = Number.isNaN(questionsSince) + ? 
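+          // NaN means no 'since' filter was supplied: send every question DB in full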
getQuestionDbs() + : getQuestionDbs() + .map((qdb) => { + return { + ...qdb, + data: getNewQuestionsSince(qdb.data, questionsSince), + } + }) + .filter((qdb) => { + const { questionCount: questionCount } = countOfQdb(qdb) + return questionCount > 0 + }) + + const { subjCount: subjects, questionCount: questions } = countOfQdbs( + questionDbsWithNewQuestions + ) + + return { + success: true, + questionDbs: questionDbsWithNewQuestions, + count: { + qdbs: questionDbsWithNewQuestions.length, + subjects: subjects, + questions: questions, + }, + } +} + +// --------------------------------------------------------------------------------------------- +// Syncing utils +// --------------------------------------------------------------------------------------------- + +function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) { + return { + ...qdb, + data: qdb.data.map((subj) => { + return { + ...subj, + Questions: subj.Questions.map((q) => { + const initializedQuestion = q.cache ? q : createQuestion(q) + initializedQuestion.data.source = peerToString(peer) + return initializedQuestion + }), + } + }), + } +} + +async function getMergeResults( + remoteQuestionDbs: QuestionDb[], + getQuestionDbs: () => QuestionDb[] +) { + const mergeJobs: Promise[] = [] + const rawNewQuestionDbs: QuestionDb[] = [] + remoteQuestionDbs.forEach((remoteQdb) => { + const localQdb = getQuestionDbs().find( + (lqdb) => lqdb.name === remoteQdb.name + ) + + if (!localQdb) { + rawNewQuestionDbs.push(remoteQdb) + } else { + mergeJobs.push( + queueWork({ + type: 'merge', + data: { + localQdbIndex: localQdb.index, + remoteQdb: remoteQdb, + }, + }) + ) + } + }) + + const mergeResults: MergeResult[] = await Promise.all(mergeJobs) + + return { + mergeResults: mergeResults, + rawNewQuestionDbs: rawNewQuestionDbs, + } +} + +function updateQdbForLocalUse( + qdb: QuestionDb[], + getQuestionDbs: () => QuestionDb[] +) { + const availableIndexes = getAvailableQdbIndexes( + getQuestionDbs(), + qdb.length + ) + return qdb.map((qdb, i) => { + return { + ...qdb, + index: availableIndexes[i], + path: `${publicDir}questionDbs/${qdb.name}.json`, + } + }) +} + +export function mergeSubjects( + subjectsToMergeTo: Subject[], + subjectsToMerge: Subject[], + newSubjects: Subject[] +): Subject[] { + return [ + ...subjectsToMergeTo.map((subj) => { + const newSubjs = subjectsToMerge.filter( + (subjRes) => subjRes.Name === subj.Name + ) + + if (newSubjs) { + const newQuestions = newSubjs.flatMap((subj) => { + return subj.Questions + }) + + return { + ...subj, + Questions: [...subj.Questions, ...newQuestions], + } + } else { + return subj + } + }), + ...newSubjects, + ] +} + +export function mergeQdbs( + qdbToMergeTo: QuestionDb[], + mergeResults: MergeResult[] +): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } { + const changedQdbIndexes: number[] = [] + const mergedQuestionDbs = qdbToMergeTo.map((qdb) => { + const qdbMergeResult = mergeResults.find( + (mergeRes) => mergeRes.localQdbIndex === qdb.index + ) + if ( + qdbMergeResult && + (qdbMergeResult.newData.length > 0 || + qdbMergeResult.newSubjects.length > 0) + ) { + const mergedQdb = { + ...qdb, + data: mergeSubjects( + qdb.data, + qdbMergeResult.newData, + qdbMergeResult.newSubjects + ), + } + changedQdbIndexes.push(qdb.index) + return mergedQdb + } else { + // unchanged + return qdb + } + }) + return { + mergedQuestionDbs: mergedQuestionDbs, + changedQdbIndexes: changedQdbIndexes, + } +} + +function writeNewData( + newQuestionDbs: QuestionDb[], + changedQuestionDbs: 
QuestionDb[], + dbsFilePath: string +) { + const qdbsToWrite = [...changedQuestionDbs, ...newQuestionDbs] + const existingQdbs = utils.ReadJSON(dbsFilePath) + + const qdbDataToWrite = qdbsToWrite + .reduce((acc, qdb) => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { data, index, ...qdbData } = qdb + const existingQdbData = acc.find((data) => { + return data.name === qdbData.name + }) + if (!existingQdbData) { + return [...acc, qdbData] + } else { + return acc + } + }, existingQdbs) + .map((qdb) => { + if (qdb.path.includes(publicDir)) { + return { + ...qdb, + path: qdb.path.replace(publicDir, ''), + } + } else { + return qdb + } + }) + + utils.WriteFile(JSON.stringify(qdbDataToWrite, null, 2), dbsFilePath) + qdbsToWrite.forEach((qdb) => { + try { + writeData(qdb.data, qdb.path) + } catch (e) { + logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg') + console.error(e) + } + }) +} + +async function sendNewDataToWorkers( + mergeResults: MergeResult[], + newQuestionDbs: QuestionDb[] +) { + // FIXME: this might be slow, maybe make a new type of message for workers? + const updatePromises: Promise[] = [] + let newQuestionCount = 0 + let newSubjectCount = 0 + let newQuestionDbCount = 0 + + mergeResults.forEach((mergeRes) => { + if (mergeRes.e) { + logger.Log(`There was an error processing the merge!`, 'redbg') + console.error(mergeRes.e) + return + } + + mergeRes.newData.forEach((subjectWithNewData) => { + newQuestionCount += subjectWithNewData.Questions.length + updatePromises.push( + msgAllWorker({ + type: 'newQuestions', + data: { + subjName: subjectWithNewData.Name, + qdbIndex: mergeRes.localQdbIndex, + newQuestions: subjectWithNewData.Questions, + }, + }) + ) + }) + + newSubjectCount += mergeRes.newSubjects.length + mergeRes.newSubjects.forEach((newSubject) => { + newQuestionCount += newSubject.Questions.length + updatePromises.push( + msgAllWorker({ + type: 'newQuestions', + data: { + subjName: newSubject.Name, + qdbIndex: mergeRes.localQdbIndex, + newQuestions: newSubject.Questions, + }, + }) + ) + }) + }) + newQuestionDbCount += newQuestionDbs.length + newQuestionDbs.forEach((newQdb) => { + const { subjCount: sc, questionCount: qc } = countOfQdb(newQdb) + newSubjectCount += sc + newQuestionCount += qc + msgAllWorker({ + data: newQdb, + type: 'newdb', + }) + }) + + await Promise.all(updatePromises) + + return { + newQuestionDbCount: newQuestionDbCount, + newSubjectCount: newSubjectCount, + newQuestionCount: newQuestionCount, + } +} + +// --------------------------------------------------------------------------------------------- +// Syncing +// --------------------------------------------------------------------------------------------- + +export async function syncQuestions({ + questionData, + syncStart, + getQuestionDbs, + setQuestionDbs, + selfInfo, + dbsFile, +}: SyncQuestionsProps): Promise { + logger.Log('Syncing questions...') + const recievedDataCounts: (number | string)[][] = [] + // all results statistics + const resultsCount: { + [key: string]: { + newQuestionDbs?: number + newSubjects?: number + newQuestions?: number + } + } = {} + + const resultDataWithoutEmptyDbs: (SyncDataResult['questions'] & { + peer: PeerInfo + })[] = [] + questionData.forEach((res) => { + const qdbCount = res.questionDbs.length + const { subjCount, questionCount } = countOfQdbs(res.questionDbs) + + recievedDataCounts.push([ + peerToString(res.peer), + qdbCount, + subjCount, + questionCount, + ]) + + if (questionCount > 0) { + resultDataWithoutEmptyDbs.push(res) + } 
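+            // this peer sent no new questions: still record the sync start time
+            // so the next sync only requests questions newer than this run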
else { + updatePeersFile(res.peer, { + lastQuestionsSync: syncStart, + }) + } + }) + + const resultData = resultDataWithoutEmptyDbs.map((res) => { + return { + ...res, + questionDbs: res.questionDbs.map((qdb) => { + return setupQuestionsForMerge(qdb, res.peer) + }), + } + }) + + const hasNewData = resultData.length > 0 + if (!hasNewData) { + logger.Log( + `No peers returned any new questions. Question sync successfully finished!`, + 'green' + ) + updateLastSync(selfInfo, syncStart) + return { + msg: 'No peers returned any new questions', + } + } + + logger.Log(`\tRecieved data from peers:`) + logger.logTable( + [['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts], + { + colWidth: [20], + rowPrefix: '\t', + } + ) + + // ------------------------------------------------------------------------------------------------------- + // backup + // ------------------------------------------------------------------------------------------------------- + const { subjCount: oldSubjCount, questionCount: oldQuestionCount } = + countOfQdbs(getQuestionDbs()) + const oldQuestionDbCount = getQuestionDbs().length + backupData(getQuestionDbs()) + logger.Log('\tOld data backed up!') + + // ------------------------------------------------------------------------------------------------------- + // adding questions to db + // ------------------------------------------------------------------------------------------------------- + for (let i = 0; i < resultData.length; i++) { + const { questionDbs: remoteQuestionDbs, peer } = resultData[i] + logger.Log( + `\tProcessing result from "${logger.C('blue')}${peerToString( + peer + )}${logger.C()}" (${logger.C('green')}${ + resultData.length + }${logger.C()}/${logger.C('green')}${i + 1}${logger.C()})` + ) + // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging + // process could get wonky. Ideally it should not contain, but we will see + + const { rawNewQuestionDbs, mergeResults } = await getMergeResults( + remoteQuestionDbs, + getQuestionDbs + ) + + const newQuestionDbs = updateQdbForLocalUse( + rawNewQuestionDbs, + getQuestionDbs + ) + + const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs( + getQuestionDbs(), + mergeResults + ) + // setting new index & path + writeNewData( + newQuestionDbs, + getQuestionDbs().filter((qdb) => { + return changedQdbIndexes.includes(qdb.index) + }), + dbsFile + ) + + setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs]) + + const { newQuestionDbCount, newSubjectCount, newQuestionCount } = + await sendNewDataToWorkers(mergeResults, newQuestionDbs) + + resultsCount[peerToString(peer)] = { + ...(resultsCount[peerToString(peer)] || { newUsers: 0 }), + newQuestionDbs: newQuestionDbCount, + newSubjects: newSubjectCount, + newQuestions: newQuestionCount, + } + // Processing result data is successfull + updatePeersFile(peer, { + lastQuestionsSync: syncStart, + }) + } + + // ------------------------------------------------------------------------------------------------------- + updateLastSync(selfInfo, syncStart) + + const newQdb = getQuestionDbs() + const { subjCount: newSubjCount, questionCount: newQuestionCount } = + countOfQdbs(newQdb) + const newQuestionDbCount = newQdb.length + + const resultsTable = Object.entries(resultsCount).map(([key, value]) => { + return [ + key.length > 14 ? key.substring(0, 14) + '...' 
: key, + value.newQuestionDbs, + value.newSubjects, + value.newQuestions, + ] + }) + + const sumNewCount = (key: string) => { + return Object.values(resultsCount).reduce( + (acc, val) => acc + val[key], + 0 + ) + } + + const totalNewQuestions = sumNewCount('newQuestions') + const totalNewSubjects = sumNewCount('newSubjects') + const totalNewQdbs = sumNewCount('newQuestionDbs') + + logger.logTable( + [ + ['', 'QDBs', 'Subjs', 'Questions'], + ['Old', oldQuestionDbCount, oldSubjCount, oldQuestionCount], + ...resultsTable, + ['Added total', totalNewQdbs, totalNewSubjects, totalNewQuestions], + ['Final', newQuestionDbCount, newSubjCount, newQuestionCount], + ], + { colWidth: [20], rowPrefix: '\t' } + ) + + logger.Log(`Successfully synced questions!`, 'green') + + return { + old: { + questionDbs: oldQuestionDbCount, + subjects: oldSubjCount, + questions: oldQuestionCount, + }, + added: { + questionDbs: totalNewQdbs, + subjects: totalNewSubjects, + questions: totalNewQuestions, + }, + final: { + questionDbs: newQuestionDbCount, + subjects: newSubjCount, + questions: newQuestionCount, + }, + } +} diff --git a/src/modules/api/p2p/userFiles.ts b/src/modules/api/p2p/userFiles.ts new file mode 100644 index 0000000..c4a1d53 --- /dev/null +++ b/src/modules/api/p2p/userFiles.ts @@ -0,0 +1,295 @@ +import path from 'node:path' +import { paths } from '../../../utils/files' +import utils from '../../../utils/utils' +import { UserDirDataFile } from '../submodules/userFiles' +import { + SyncDataResult, + SyncResponseBase, + SyncResult, + peerToString, + updatePeersFile, +} from './p2putils' +import constants from '../../../constants' +import { PeerInfo } from '../../../types/basicTypes' +import { downloadFile } from '../../../utils/networkUtils' +import logger from '../../../utils/logger' + +interface UserFileToGet { + fileName: string + dir: string + filePath: string + data: UserDirDataFile + peer: PeerInfo +} + +export interface NewUserFilesRequestBody { + host: string + newFiles: { + [key: string]: { + // key: dir + [key: string]: UserDirDataFile // key: file name + } + } +} + +// --------------------------------------------------------------------------------------------- +// Getting +// --------------------------------------------------------------------------------------------- + +export function getUserFiles(since: number): SyncResponseBase & { + newFiles: { + [key: string]: { + [key: string]: UserDirDataFile + } + } +} { + const newFiles: SyncDataResult['userFiles']['newFiles'] = {} + + const dirs = utils.ReadDir(paths.userFilesDir) + dirs.forEach((dir) => { + const userDirPath = path.join(paths.userFilesDir, dir) + const dataFilePath = path.join( + userDirPath, + constants.userFilesDataFileName + ) + + if (!utils.FileExists(dataFilePath)) { + return + } + const dataFile = + utils.ReadJSON>(dataFilePath) + Object.entries(dataFile).forEach(([fileName, data]) => { + const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime + if (mtime.getTime() >= since) { + if (!newFiles[dir]) { + newFiles[dir] = {} + } + newFiles[dir][fileName] = data + } + }) + }) + + return { success: true, newFiles: newFiles } +} + +function setupFilesToGet( + newFiles: SyncDataResult['userFiles']['newFiles'], + peer: PeerInfo +): UserFileToGet[] { + const filesToGet: UserFileToGet[] = [] + Object.entries(newFiles).forEach(([dirName, userFilesDir]) => { + Object.entries(userFilesDir).forEach(([fileName, data]) => { + filesToGet.push({ + fileName: fileName, + dir: dirName, + filePath: path.join(paths.userFilesDir, dirName, 
fileName), + data: data, + peer: peer, + }) + }) + }) + + return filesToGet +} + +async function downloadUserFiles(filesToGet: UserFileToGet[]) { + let addedFiles = 0 + for (const fileToGet of filesToGet) { + const { peer, dir, fileName, filePath, data } = fileToGet + + try { + await downloadFile( + { + host: peer.host, + port: peer.port, + path: `/api/userFiles/${dir}/${fileName}`, + }, + filePath, + peer.http + ) + + const dataFilePath = path.join( + paths.userFilesDir, + dir, + constants.userFilesDataFileName + ) + if (!utils.FileExists(dataFilePath)) { + utils.WriteFile(JSON.stringify({}), dataFilePath) + } + const dataFile = utils.ReadJSON<{ + [key: string]: UserDirDataFile + }>(dataFilePath) + + if (dataFile[fileName]) { + // dataFile[fileName].views += data.views // views are not unique + dataFile[fileName].upvotes = dataFile[fileName].upvotes + ? dataFile[fileName].upvotes + .concat(data.upvotes) + .reduce((acc, x) => { + if (acc.includes(x)) return acc + return [...acc, x] + }, []) + : [] + dataFile[fileName].downvotes = dataFile[fileName].downvotes + ? dataFile[fileName].downvotes + .concat(data.downvotes) + .reduce((acc, x) => { + if (acc.includes(x)) return acc + return [...acc, x] + }, []) + : [] + } else { + dataFile[fileName] = data + } + + utils.WriteFile(JSON.stringify(dataFile), dataFilePath) + addedFiles += 1 + } catch (e) { + logger.Log(`Unable to download "${fileName}": ${e.message}`) + console.error(e) + } + } + + return addedFiles +} + +// --------------------------------------------------------------------------------------------- +// Adding new +// --------------------------------------------------------------------------------------------- + +export async function handleNewUserFiles( + props: NewUserFilesRequestBody & { peers: PeerInfo[] } +): Promise<{ + success: boolean + addedFileCount?: number + message?: string +}> { + const result = await addNewUserFiles(props) + + if (!result.success) { + logger.Log( + `Error while adding new user files: "${result.message}", from host: "${props.host}"`, + 'yellowbg' + ) + } + + return result +} + +async function addNewUserFiles({ + newFiles, + host, + peers, +}: NewUserFilesRequestBody & { peers: PeerInfo[] }): Promise<{ + success: boolean + addedFileCount?: number + message?: string +}> { + if (!newFiles || !host) { + return { + success: false, + message: 'newFiles or host key are missing from body', + } + } + + const remotePeerInfo = peers.find((peer) => { + return peerToString(peer) === host + }) + + if (!remotePeerInfo) { + return { + success: false, + message: "couldn't find remote peer info based on host", + } + } + + try { + const filesToGet = setupFilesToGet(newFiles, remotePeerInfo) + const addedFileCount = await downloadUserFiles(filesToGet) + + logger.Log( + `\tAdded ${logger.C( + 'blue' + )}${addedFileCount}${logger.C()} new files from ${logger.C( + 'blue' + )}${peerToString(remotePeerInfo)}${logger.C()}` + ) + + return { success: true, addedFileCount: addedFileCount } + } catch (e) { + return { success: false, message: e.message } + } +} + +// --------------------------------------------------------------------------------------------- +// Syncing +// --------------------------------------------------------------------------------------------- + +export async function syncUserFiles( + newData: (SyncDataResult['userFiles'] & { peer: PeerInfo })[], + syncStart: number +): Promise { + logger.Log('Syncing user files...') + + const recievedUserFilesCount: (string | number)[][] = [] + let totalRecievedd = 0 + 
newData.forEach((res) => { + const count = Object.values(res.newFiles).reduce((acc, data) => { + totalRecievedd += Object.keys(data).length + return acc + Object.keys(data).length + }, 0) + recievedUserFilesCount.push([peerToString(res.peer), count]) + }) + + if (totalRecievedd === 0) { + logger.Log( + `No peers returned any new files. User file sync successfully finished!`, + 'green' + ) + return { + old: { + userFiles: 0, + }, + added: { + userFiles: 0, + }, + final: { + userFiles: 0, + }, + } + } + + logger.logTable([['', 'Files'], ...recievedUserFilesCount], { + colWidth: [20], + rowPrefix: '\t', + }) + + const filesToGet: UserFileToGet[] = [] + newData.forEach((res) => { + filesToGet.push(...setupFilesToGet(res.newFiles, res.peer)) + }) + + const addedFiles = await downloadUserFiles(filesToGet) + + newData.forEach((res) => { + updatePeersFile(res.peer, { + lastUserFilesSync: syncStart, + }) + }) + + logger.Log( + `Successfully synced user files! Added ${addedFiles} files`, + 'green' + ) + return { + old: { + userFiles: 0, + }, + added: { + userFiles: addedFiles, + }, + final: { + userFiles: 0, + }, + } +} diff --git a/src/modules/api/p2p/users.ts b/src/modules/api/p2p/users.ts new file mode 100644 index 0000000..1b041a7 --- /dev/null +++ b/src/modules/api/p2p/users.ts @@ -0,0 +1,270 @@ +import { Database } from 'better-sqlite3' +import dbtools from '../../../utils/dbtools' +import { PeerInfo, User } from '../../../types/basicTypes' +import { decrypt, encrypt } from '../../../utils/encryption' +import { + SyncDataResult, + SyncResponseBase, + SyncResult, + peerToString, + updatePeersFile, +} from './p2putils' +import logger from '../../../utils/logger' + +interface SyncUsersProps { + userData: (SyncDataResult['users'] & { peer: PeerInfo })[] + syncStart: number + userDB: Database + privateKey: string +} + +interface HandleNewUsersProps { + encryptedNewUsers: string + host: string + peers: PeerInfo[] + privateKey: string + userDB: Database +} + +// --------------------------------------------------------------------------------------------- +// Getting +// --------------------------------------------------------------------------------------------- + +export function getNewUsersSince(since: number, userDB: Database): User[] { + const users: User[] = dbtools.runStatement( + userDB, + `SELECT * + FROM users + WHERE created >= ${since} + AND id != 1 + AND isAdmin is null + ;` + ) + return users +} + +export function getUsers( + remoteHost: string, + remotePeerInfo: PeerInfo, + usersSince: number, + userDB: Database +): SyncResponseBase & { + encryptedUsers?: string + sentUsers?: number +} { + let sentUsers = 0 + if (!remoteHost) { + return { + success: false, + message: + 'remoteHost key is missing from body. Users will not be sent', + } + } + if (!remotePeerInfo) { + return { + success: false, + message: `couldn't find remote peer info based on remoteHost (${remoteHost}). Users will not be sent`, + } + } + + const remotePublicKey = remotePeerInfo.publicKey + if (remotePublicKey) { + // FIXME: sign data? + const newUsers = getNewUsersSince(usersSince, userDB) + sentUsers = newUsers.length + const encryptedUsers = encrypt( + remotePublicKey, + JSON.stringify(newUsers) + ) + + return { + success: true, + encryptedUsers: encryptedUsers, + sentUsers: sentUsers, + } + } else if (remotePeerInfo) { + return { + success: false, + message: `Warning: "${peerToString( + remotePeerInfo + )}" has no public key saved! 
Users will not be sent`, + } + } + return { success: false, message: 'this shouldt be a case lol' } +} + +// --------------------------------------------------------------------------------------------- +// Adding new +// --------------------------------------------------------------------------------------------- + +export function handleNewUsers(props: HandleNewUsersProps): { + success: boolean + addedUserCount?: number + message?: string +} { + const result = addNewUsers(props) + + if (!result.success) { + logger.Log( + `Error while adding new users file: "${result.message}", from host: "${props.host}"`, + 'yellowbg' + ) + } + + return result +} + +function addNewUsers({ + encryptedNewUsers, + host, + peers, + privateKey, + userDB, +}: HandleNewUsersProps): { + success: boolean + addedUserCount?: number + message?: string +} { + if (!encryptedNewUsers || !host) { + return { + success: false, + message: 'encryptedNewUsers or host key are missing from body', + } + } + + const remotePeerInfo = peers.find((peer) => { + return peerToString(peer) === host + }) + + if (!remotePeerInfo) { + return { + success: false, + message: "couldn't find remote peer info based on host", + } + } + + const decryptedUsers: User[] = JSON.parse( + decrypt(privateKey, encryptedNewUsers) + ) + + const addedUserCount = addUsersToDb(decryptedUsers, userDB, { + sourceHost: peerToString(remotePeerInfo), + }) + + if (addedUserCount > 0) { + logger.Log( + `\tAdded ${addedUserCount} new users from "${peerToString( + remotePeerInfo + )}"`, + 'cyan' + ) + } + + return { success: true, addedUserCount: addedUserCount } +} + +// --------------------------------------------------------------------------------------------- +// Syncing utils +// --------------------------------------------------------------------------------------------- + +function addUsersToDb( + users: User[], + userDB: Database, + extraProps: Partial +) { + let addedUserCount = 0 + users.forEach((remoteUser) => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { id, ...remoteUserWithoutId } = remoteUser + const localUser = dbtools.Select(userDB, 'users', { + pw: remoteUser.pw, + }) + if (localUser.length === 0) { + addedUserCount += 1 + // FIXME: users will not have consistend id across servers. 
This may be + // harmless, will see + dbtools.Insert(userDB, 'users', { + ...(remoteUserWithoutId as Omit), + ...extraProps, + }) + } + }) + return addedUserCount +} + +// --------------------------------------------------------------------------------------------- +// Syncing +// --------------------------------------------------------------------------------------------- + +export async function syncUsers({ + userData, + syncStart, + userDB, + privateKey, +}: SyncUsersProps): Promise { + logger.Log('Syncing users...') + let totalRecievedUsers = 0 + const resultsCount: { + [key: string]: { + newUsers?: number + } + } = {} + const oldUserCount = dbtools.SelectAll(userDB, 'users').length + + try { + userData.forEach((res) => { + if (res.encryptedUsers) { + const decryptedUsers: User[] = JSON.parse( + decrypt(privateKey, res.encryptedUsers) + ) + const addedUserCount = addUsersToDb(decryptedUsers, userDB, { + sourceHost: peerToString(res.peer), + }) + resultsCount[peerToString(res.peer)] = { + newUsers: addedUserCount, + } + totalRecievedUsers += decryptedUsers.length + updatePeersFile(res.peer, { + lastUsersSync: syncStart, + }) + } + }) + } catch (e) { + logger.Log('\tError while trying to sync users: ' + e.message, 'redbg') + console.error(e) + } + const newUserCount = dbtools.SelectAll(userDB, 'users').length + + if (totalRecievedUsers === 0) { + logger.Log( + `No peers returned any new users. User sync successfully finished!`, + 'green' + ) + } else { + logger.logTable( + [ + ['', 'Users'], + ['Old', oldUserCount], + ...Object.entries(resultsCount).map(([key, result]) => { + return [key, result.newUsers] + }), + ['Added total', newUserCount - oldUserCount], + ['Final', newUserCount], + ], + { colWidth: [20], rowPrefix: '\t' } + ) + logger.Log(`Successfully synced users!`, 'green') + } + + return { + old: { + users: oldUserCount, + }, + added: { + users: newUserCount - oldUserCount, + }, + final: { + users: newUserCount, + }, + } +} diff --git a/src/modules/api/submodules/p2p.ts b/src/modules/api/submodules/p2p.ts index 0913e15..c27f5e8 100644 --- a/src/modules/api/submodules/p2p.ts +++ b/src/modules/api/submodules/p2p.ts @@ -26,340 +26,32 @@ import { SubmoduleData, Submodule, PeerInfo, - Subject, - QuestionDb, - User, - DataFile, } from '../../../types/basicTypes' import utils from '../../../utils/utils' -import { backupData, writeData } from '../../../utils/actions' import dbtools from '../../../utils/dbtools' +import { createKeyPair, isKeypairValid } from '../../../utils/encryption' +import { countOfQdbs } from '../../../utils/qdbUtils' +import { files, paths, readAndValidateFile } from '../../../utils/files' +import { GetResult, get } from '../../../utils/networkUtils' +import { setPendingJobsAlertCount } from '../../../worker/workerPool' import { - createKeyPair, - decrypt, - encrypt, - isKeypairValid, -} from '../../../utils/encryption' -import { - countOfQdb, - countOfQdbs, - createQuestion, - getAvailableQdbIndexes, - removeCacheFromQuestion, -} from '../../../utils/qdbUtils' -import { - files, - paths, - publicDir, - readAndValidateFile, -} from '../../../utils/files' -import { GetResult, downloadFile, get } from '../../../utils/networkUtils' -import { - msgAllWorker, - queueWork, - setPendingJobsAlertCount, -} from '../../../worker/workerPool' -import { WorkerResult } from '../../../worker/worker' -import { + RemotePeerInfo, + SyncDataResult, + SyncResult, isPeerSameAs, loginToPeer, peerToString, updatePeersFile, -} from '../../../utils/p2putils' -import { Database } from 
'better-sqlite3' -import constants from '../../../constants' -import path from 'node:path' -import { UserDirDataFile } from './userFiles' - -interface MergeResult { - newData: Subject[] - newSubjects: Subject[] - localQdbIndex: number - e: Error -} - -interface RemotePeerInfo { - selfInfo: PeerInfo - myPeers: PeerInfo[] - serverRevision?: string - scriptRevision?: string - qminingPageRevision?: string - dataEditorRevision?: string - serverLastCommitDate?: number - scriptLastCommitDate?: number - qminingPageLastCommitDate?: number - dataEditorLastCommitDate?: number - serverBuildTime?: number - qminingPageBuildTime?: number - dataEditorBuildTime?: number - scriptVersion?: string - userCount?: number - qdbInfo?: { - questionDbCount: number - subjectCount: number - questionCount: number - } -} - -interface SyncResult { - old?: { [key: string]: number } - added?: { [key: string]: number } - final?: { [key: string]: number } - msg?: string -} - -interface SyncDataResult { - remoteInfo?: RemotePeerInfo - users?: { - encryptedUsers: string - } - questions?: { - questionDbs: QuestionDb[] - count: { - qdbs: number - subjects: number - questions: number - } - } - userFiles?: { - newFiles: { - [key: string]: { - [key: string]: UserDirDataFile - } - } - } -} - -function updateThirdPartyPeers( - newVal: Omit[] -) { - const prevVal = utils.FileExists(paths.thirdPartyPeersFile) - ? utils.ReadJSON(paths.thirdPartyPeersFile) - : [] - - const dataToWrite = newVal.reduce((acc, peer) => { - const isIncluded = acc.find((x) => { - return peerToString(x) === peerToString(peer) - }) - if (!isIncluded) { - return [...acc, peer] - } - return acc - }, prevVal) - - utils.WriteFile( - JSON.stringify(dataToWrite, null, 2), - paths.thirdPartyPeersFile - ) -} - -export function getNewDataSince(subjects: Subject[], date: number): Subject[] { - return subjects - .map((subject) => { - return { - ...subject, - Questions: subject.Questions.filter((question) => { - return (question.data.date || 0) >= date - }).map((question) => removeCacheFromQuestion(question)), - } - }) - .filter((subject) => subject.Questions.length !== 0) -} - -export function mergeSubjects( - subjectsToMergeTo: Subject[], - subjectsToMerge: Subject[], - newSubjects: Subject[] -): Subject[] { - return [ - ...subjectsToMergeTo.map((subj) => { - const newSubjs = subjectsToMerge.filter( - (subjRes) => subjRes.Name === subj.Name - ) - - if (newSubjs) { - const newQuestions = newSubjs.flatMap((subj) => { - return subj.Questions - }) - - return { - ...subj, - Questions: [...subj.Questions, ...newQuestions], - } - } else { - return subj - } - }), - ...newSubjects, - ] -} - -export function mergeQdbs( - qdbToMergeTo: QuestionDb[], - mergeResults: MergeResult[] -): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } { - const changedQdbIndexes: number[] = [] - const mergedQuestionDbs = qdbToMergeTo.map((qdb) => { - const qdbMergeResult = mergeResults.find( - (mergeRes) => mergeRes.localQdbIndex === qdb.index - ) - if ( - qdbMergeResult && - (qdbMergeResult.newData.length > 0 || - qdbMergeResult.newSubjects.length > 0) - ) { - const mergedQdb = { - ...qdb, - data: mergeSubjects( - qdb.data, - qdbMergeResult.newData, - qdbMergeResult.newSubjects - ), - } - changedQdbIndexes.push(qdb.index) - return mergedQdb - } else { - // unchanged - return qdb - } - }) - return { - mergedQuestionDbs: mergedQuestionDbs, - changedQdbIndexes: changedQdbIndexes, - } -} - -async function sendNewDataToWorkers( - mergeResults: MergeResult[], - newQuestionDbs: QuestionDb[] -) 
{ - // FIXME: this might be slow, maybe make a new type of message for workers? - const updatePromises: Promise[] = [] - let newQuestionCount = 0 - let newSubjectCount = 0 - let newQuestionDbCount = 0 - - mergeResults.forEach((mergeRes) => { - if (mergeRes.e) { - logger.Log(`There was an error processing the merge!`, 'redbg') - console.error(mergeRes.e) - return - } - - mergeRes.newData.forEach((subjectWithNewData) => { - newQuestionCount += subjectWithNewData.Questions.length - updatePromises.push( - msgAllWorker({ - type: 'newQuestions', - data: { - subjName: subjectWithNewData.Name, - qdbIndex: mergeRes.localQdbIndex, - newQuestions: subjectWithNewData.Questions, - }, - }) - ) - }) - - newSubjectCount += mergeRes.newSubjects.length - mergeRes.newSubjects.forEach((newSubject) => { - newQuestionCount += newSubject.Questions.length - updatePromises.push( - msgAllWorker({ - type: 'newQuestions', - data: { - subjName: newSubject.Name, - qdbIndex: mergeRes.localQdbIndex, - newQuestions: newSubject.Questions, - }, - }) - ) - }) - }) - newQuestionDbCount += newQuestionDbs.length - newQuestionDbs.forEach((newQdb) => { - const { subjCount: sc, questionCount: qc } = countOfQdb(newQdb) - newSubjectCount += sc - newQuestionCount += qc - msgAllWorker({ - data: newQdb, - type: 'newdb', - }) - }) - - await Promise.all(updatePromises) - - return { - newQuestionDbCount: newQuestionDbCount, - newSubjectCount: newSubjectCount, - newQuestionCount: newQuestionCount, - } -} - -function writeNewData( - newQuestionDbs: QuestionDb[], - changedQuestionDbs: QuestionDb[], - dbsFilePath: string -) { - const qdbsToWrite = [...changedQuestionDbs, ...newQuestionDbs] - const existingQdbs = utils.ReadJSON(dbsFilePath) - - const qdbDataToWrite = qdbsToWrite - .reduce((acc, qdb) => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { data, index, ...qdbData } = qdb - const existingQdbData = acc.find((data) => { - return data.name === qdbData.name - }) - if (!existingQdbData) { - return [...acc, qdbData] - } else { - return acc - } - }, existingQdbs) - .map((qdb) => { - if (qdb.path.includes(publicDir)) { - return { - ...qdb, - path: qdb.path.replace(publicDir, ''), - } - } else { - return qdb - } - }) - - utils.WriteFile(JSON.stringify(qdbDataToWrite, null, 2), dbsFilePath) - qdbsToWrite.forEach((qdb) => { - try { - writeData(qdb.data, qdb.path) - } catch (e) { - logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg') - console.error(e) - } - }) -} - -function updateLastSync(selfInfo: PeerInfo, newDate: number) { - utils.WriteFile( - JSON.stringify({ ...selfInfo, lastSync: newDate }, null, 2), - paths.selfInfoFile - ) -} - -function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) { - return { - ...qdb, - data: qdb.data.map((subj) => { - return { - ...subj, - Questions: subj.Questions.map((q) => { - const initializedQuestion = q.cache ? 
q : createQuestion(q) - initializedQuestion.data.source = peerToString(peer) - return initializedQuestion - }), - } - }), - } -} + updateThirdPartyPeers, +} from '../p2p/p2putils' +import { getQuestions, syncQuestions } from '../p2p/questions' +import { getUsers, handleNewUsers, syncUsers } from '../p2p/users' +import { + NewUserFilesRequestBody, + getUserFiles, + handleNewUserFiles, + syncUserFiles, +} from '../p2p/userFiles' async function authAndGetNewData({ peer, @@ -461,59 +153,24 @@ async function authAndGetNewData({ } } -function addUsersToDb( - users: User[], - userDB: Database, - extraProps: Partial -) { - let addedUserCount = 0 - users.forEach((remoteUser) => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { id, ...remoteUserWithoutId } = remoteUser - const localUser = dbtools.Select(userDB, 'users', { - pw: remoteUser.pw, - }) - if (localUser.length === 0) { - addedUserCount += 1 - // FIXME: users will not have consistend id across servers. This may be - // harmless, will see - dbtools.Insert(userDB, 'users', { - ...(remoteUserWithoutId as Omit), - ...extraProps, - }) - } - }) - return addedUserCount -} - -function getNewUserFilesSince(since: number) { - const newData: SyncDataResult['userFiles']['newFiles'] = {} - - const dirs = utils.ReadDir(paths.userFilesDir) - dirs.forEach((dir) => { - const userDirPath = path.join(paths.userFilesDir, dir) - const dataFilePath = path.join( - userDirPath, - constants.userFilesDataFileName - ) - - if (!utils.FileExists(dataFilePath)) { - return - } - const dataFile = - utils.ReadJSON>(dataFilePath) - Object.entries(dataFile).forEach(([fileName, data]) => { - const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime - if (mtime.getTime() >= since) { - if (!newData[dir]) { - newData[dir] = {} - } - newData[dir][fileName] = data - } - }) - }) - - return newData +function handleNewThirdPartyPeer(remoteHost: string) { + logger.Log( + 'Couldn\'t find remote peer info based on remoteHost: "' + + remoteHost + + '". 
This could mean that the host uses this server as peer, but this server does not ' + + 'use it as a peer.', + 'yellowbg' + ) + if (remoteHost.includes(':')) { + const [host, port] = remoteHost.split(':') + updateThirdPartyPeers([ + { + host: host, + port: +port, + }, + ]) + logger.Log('Host info written to host info file') + } } function setup(data: SubmoduleData): Submodule { @@ -661,64 +318,6 @@ function setup(data: SubmoduleData): Submodule { return result } - function getNewUsersSince(since: number) { - const users: User[] = dbtools.runStatement( - userDB, - `SELECT * - FROM users - WHERE created >= ${since} - AND id != 1 - AND isAdmin is null - ;` - ) - return users - } - - function updateQdbForLocalUse(qdb: QuestionDb[]) { - const availableIndexes = getAvailableQdbIndexes( - getQuestionDbs(), - qdb.length - ) - return qdb.map((qdb, i) => { - return { - ...qdb, - index: availableIndexes[i], - path: `${publicDir}questionDbs/${qdb.name}.json`, - } - }) - } - - async function getMergeResults(remoteQuestionDbs: QuestionDb[]) { - const mergeJobs: Promise[] = [] - const rawNewQuestionDbs: QuestionDb[] = [] - remoteQuestionDbs.forEach((remoteQdb) => { - const localQdb = getQuestionDbs().find( - (lqdb) => lqdb.name === remoteQdb.name - ) - - if (!localQdb) { - rawNewQuestionDbs.push(remoteQdb) - } else { - mergeJobs.push( - queueWork({ - type: 'merge', - data: { - localQdbIndex: localQdb.index, - remoteQdb: remoteQdb, - }, - }) - ) - } - }) - - const mergeResults: MergeResult[] = await Promise.all(mergeJobs) - - return { - mergeResults: mergeResults, - rawNewQuestionDbs: rawNewQuestionDbs, - } - } - async function syncData({ shouldSync, allTime, @@ -855,23 +454,60 @@ function setup(data: SubmoduleData): Submodule { // data syncing // ------------------------------------------------------------------------------------------------------- - const getData = (key: T) => { - return resultDataWithoutErrors - .map((x) => ({ ...x.data[key], peer: x.peer })) - .filter((x) => Object.keys(x).length > 1) + const getData = >( + key: T + ) => { + let data = resultDataWithoutErrors.map((x) => ({ + ...x.data[key], + peer: x.peer, + })) + + data.forEach((x) => { + if (!x.success) { + logger.Log( + `Error syncing "${key}" with ${peerToString( + x.peer + )}: "${x.message}"`, + 'yellowbg' + ) + } + }) + + if ((!data || data.length === 0) && (shouldSync[key] || syncAll)) { + logger.Log( + `"${key}" data was requested, but not received!`, + 'yellowbg' + ) + } + + data = data.filter((x) => x.success) + + return data } const syncResults: SyncResult[] = [] const questionData = getData('questions') if (questionData && questionData.length > 0) { - const res = await syncQuestions(questionData, syncStart) + const res = await syncQuestions({ + questionData: questionData, + syncStart: syncStart, + getQuestionDbs: getQuestionDbs, + setQuestionDbs: setQuestionDbs, + selfInfo: selfInfo, + dbsFile: dbsFile, + }) syncResults.push(res) } const userData = getData('users') if (userData && userData.length > 0) { - const res = await syncUsers(userData, syncStart) + const res = await syncUsers({ + userData: userData, + syncStart: syncStart, + userDB: userDB, + privateKey: privateKey, + }) syncResults.push(res) } @@ -881,6 +517,8 @@ function setup(data: SubmoduleData): Submodule { syncResults.push(res) } + logger.Log('Sync finished', 'green') + return syncResults.reduce( (acc, x) => { return { @@ -893,438 +531,6 @@ function setup(data: SubmoduleData): Submodule { ) } - async function syncUsers( - userData: (SyncDataResult['users'] & { 
peer: PeerInfo })[], - syncStart: number - ): Promise { - logger.Log('Syncing users...') - let totalRecievedUsers = 0 - const resultsCount: { - [key: string]: { - newUsers?: number - } - } = {} - const oldUserCount = dbtools.SelectAll(userDB, 'users').length - - try { - userData.forEach((res) => { - if (res.encryptedUsers) { - const decryptedUsers: User[] = JSON.parse( - decrypt(privateKey, res.encryptedUsers) - ) - const addedUserCount = addUsersToDb( - decryptedUsers, - userDB, - { - sourceHost: peerToString(res.peer), - } - ) - resultsCount[peerToString(res.peer)] = { - newUsers: addedUserCount, - } - totalRecievedUsers += decryptedUsers.length - updatePeersFile(res.peer, { - lastUsersSync: syncStart, - }) - } - }) - } catch (e) { - logger.Log( - '\tError while trying to sync users: ' + e.message, - 'redbg' - ) - console.error(e) - } - const newUserCount = dbtools.SelectAll(userDB, 'users').length - - if (totalRecievedUsers === 0) { - logger.Log( - `No peers returned any new users. User sync successfully finished!`, - 'green' - ) - } else { - logger.logTable( - [ - ['', 'Users'], - ['Old', oldUserCount], - ...Object.entries(resultsCount).map(([key, result]) => { - return [key, result.newUsers] - }), - ['Added total', newUserCount - oldUserCount], - ['Final', newUserCount], - ], - { colWidth: [20], rowPrefix: '\t' } - ) - logger.Log(`Successfully synced users!`, 'green') - } - - return { - old: { - users: oldUserCount, - }, - added: { - users: newUserCount - oldUserCount, - }, - final: { - users: newUserCount, - }, - } - } - - async function syncQuestions( - questionData: (SyncDataResult['questions'] & { peer: PeerInfo })[], - syncStart: number - ): Promise { - logger.Log('Syncing questions...') - const recievedDataCounts: (number | string)[][] = [] - // all results statistics - const resultsCount: { - [key: string]: { - newQuestionDbs?: number - newSubjects?: number - newQuestions?: number - } - } = {} - - const resultDataWithoutEmptyDbs: (SyncDataResult['questions'] & { - peer: PeerInfo - })[] = [] - questionData.forEach((res) => { - const qdbCount = res.questionDbs.length - const { subjCount, questionCount } = countOfQdbs(res.questionDbs) - - recievedDataCounts.push([ - peerToString(res.peer), - qdbCount, - subjCount, - questionCount, - ]) - - if (questionCount > 0) { - resultDataWithoutEmptyDbs.push(res) - } else { - updatePeersFile(res.peer, { - lastQuestionsSync: syncStart, - }) - } - }) - - const resultData = resultDataWithoutEmptyDbs.map((res) => { - return { - ...res, - questionDbs: res.questionDbs.map((qdb) => { - return setupQuestionsForMerge(qdb, res.peer) - }), - } - }) - - const hasNewData = resultData.length > 0 - if (!hasNewData) { - logger.Log( - `No peers returned any new questions. 
Question sync successfully finished!`, - 'green' - ) - updateLastSync(selfInfo, syncStart) - return { - msg: 'No peers returned any new questions', - } - } - - logger.Log(`\tRecieved data from peers:`) - logger.logTable( - [['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts], - { - colWidth: [20], - rowPrefix: '\t', - } - ) - - // ------------------------------------------------------------------------------------------------------- - // backup - // ------------------------------------------------------------------------------------------------------- - const { subjCount: oldSubjCount, questionCount: oldQuestionCount } = - countOfQdbs(getQuestionDbs()) - const oldQuestionDbCount = getQuestionDbs().length - backupData(getQuestionDbs()) - logger.Log('\tOld data backed up!') - - // ------------------------------------------------------------------------------------------------------- - // adding questions to db - // ------------------------------------------------------------------------------------------------------- - for (let i = 0; i < resultData.length; i++) { - const { questionDbs: remoteQuestionDbs, peer } = resultData[i] - logger.Log( - `\tProcessing result from "${logger.C('blue')}${peerToString( - peer - )}${logger.C()}" (${logger.C('green')}${ - resultData.length - }${logger.C()}/${logger.C('green')}${i + 1}${logger.C()})` - ) - // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging - // process could get wonky. Ideally it should not contain, but we will see - - const { rawNewQuestionDbs, mergeResults } = await getMergeResults( - remoteQuestionDbs - ) - - const newQuestionDbs = updateQdbForLocalUse(rawNewQuestionDbs) - - const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs( - getQuestionDbs(), - mergeResults - ) - // setting new index & path - writeNewData( - newQuestionDbs, - getQuestionDbs().filter((qdb) => { - return changedQdbIndexes.includes(qdb.index) - }), - dbsFile - ) - - setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs]) - - const { newQuestionDbCount, newSubjectCount, newQuestionCount } = - await sendNewDataToWorkers(mergeResults, newQuestionDbs) - - resultsCount[peerToString(peer)] = { - ...(resultsCount[peerToString(peer)] || { newUsers: 0 }), - newQuestionDbs: newQuestionDbCount, - newSubjects: newSubjectCount, - newQuestions: newQuestionCount, - } - // Processing result data is successfull - updatePeersFile(peer, { - lastQuestionsSync: syncStart, - }) - } - - // ------------------------------------------------------------------------------------------------------- - updateLastSync(selfInfo, syncStart) - - const newQdb = getQuestionDbs() - const { subjCount: newSubjCount, questionCount: newQuestionCount } = - countOfQdbs(newQdb) - const newQuestionDbCount = newQdb.length - - const resultsTable = Object.entries(resultsCount).map( - ([key, value]) => { - return [ - key.length > 14 ? key.substring(0, 14) + '...' 
: key, - value.newQuestionDbs, - value.newSubjects, - value.newQuestions, - ] - } - ) - - const sumNewCount = (key: string) => { - return Object.values(resultsCount).reduce( - (acc, val) => acc + val[key], - 0 - ) - } - - const totalNewQuestions = sumNewCount('newQuestions') - const totalNewSubjects = sumNewCount('newSubjects') - const totalNewQdbs = sumNewCount('newQuestionDbs') - - logger.logTable( - [ - ['', 'QDBs', 'Subjs', 'Questions'], - ['Old', oldQuestionDbCount, oldSubjCount, oldQuestionCount], - ...resultsTable, - [ - 'Added total', - totalNewQdbs, - totalNewSubjects, - totalNewQuestions, - ], - ['Final', newQuestionDbCount, newSubjCount, newQuestionCount], - ], - { colWidth: [20], rowPrefix: '\t' } - ) - - logger.Log(`Successfully synced questions!`, 'green') - - return { - old: { - questionDbs: oldQuestionDbCount, - subjects: oldSubjCount, - questions: oldQuestionCount, - }, - added: { - questionDbs: totalNewQdbs, - subjects: totalNewSubjects, - questions: totalNewQuestions, - }, - final: { - questionDbs: newQuestionDbCount, - subjects: newSubjCount, - questions: newQuestionCount, - }, - } - } - - async function syncUserFiles( - newData: (SyncDataResult['userFiles'] & { peer: PeerInfo })[], - syncStart: number - ): Promise { - logger.Log('Syncing user files...') - - const recievedUserFilesCount: (string | number)[][] = [] - let totalRecievedd = 0 - newData.forEach((res) => { - const count = Object.values(res.newFiles).reduce((acc, data) => { - totalRecievedd += Object.keys(data).length - return acc + Object.keys(data).length - }, 0) - recievedUserFilesCount.push([peerToString(res.peer), count]) - }) - - if (totalRecievedd === 0) { - logger.Log( - `No peers returned any new files. User file sync successfully finished!`, - 'green' - ) - return { - old: { - userFiles: 0, - }, - added: { - userFiles: 0, - }, - final: { - userFiles: 0, - }, - } - } - - logger.logTable([['', 'Files'], ...recievedUserFilesCount], { - colWidth: [20], - rowPrefix: '\t', - }) - - const filesToGet: { - fileName: string - dir: string - filePath: string - data: UserDirDataFile - peer: PeerInfo - }[] = [] - newData.forEach((res) => { - Object.entries(res.newFiles).forEach(([dirName, userFilesDir]) => { - Object.entries(userFilesDir).forEach(([fileName, data]) => { - filesToGet.push({ - fileName: fileName, - dir: dirName, - filePath: path.join( - paths.userFilesDir, - dirName, - fileName - ), - data: data, - peer: res.peer, - }) - }) - }) - }) - - let addedFiles = 0 - for (const fileToGet of filesToGet) { - const { peer, dir, fileName, filePath, data } = fileToGet - - try { - await downloadFile( - { - host: peer.host, - port: peer.port, - path: `/api/userFiles/${dir}/${fileName}`, - }, - filePath, - peer.http - ) - - const dataFilePath = path.join( - paths.userFilesDir, - dir, - constants.userFilesDataFileName - ) - if (!utils.FileExists(dataFilePath)) { - utils.WriteFile(JSON.stringify({}), dataFilePath) - } - const dataFile = utils.ReadJSON<{ - [key: string]: UserDirDataFile - }>(dataFilePath) - - if (dataFile[fileName]) { - // dataFile[fileName].views += data.views // views are not unique - dataFile[fileName].upvotes = dataFile[fileName].upvotes - .concat(data.upvotes) - .reduce((acc, x) => { - if (acc.includes(x)) return acc - return [...acc, x] - }, []) - dataFile[fileName].downvotes = dataFile[fileName].downvotes - .concat(data.downvotes) - .reduce((acc, x) => { - if (acc.includes(x)) return acc - return [...acc, x] - }, []) - } else { - dataFile[fileName] = data - } - - 
utils.WriteFile(JSON.stringify(dataFile), dataFilePath) - addedFiles += 1 - } catch (e) { - logger.Log(`Unable to download "${fileName}": ${e.message}`) - } - } - - newData.forEach((res) => { - updatePeersFile(res.peer, { - lastUserFilesSync: syncStart, - }) - }) - - logger.Log( - `Successfully synced user files! Added ${addedFiles} files`, - 'green' - ) - return { - old: { - userFiles: 0, - }, - added: { - userFiles: addedFiles, - }, - final: { - userFiles: 0, - }, - } - } - - function handleNewThirdPartyPeer(remoteHost: string) { - logger.Log( - 'Couldn\'t find remote peer info based on remoteHost: "' + - remoteHost + - '". This could mean that the host uses this server as peer, but this server does not ' + - 'use it as a peer.', - 'yellowbg' - ) - if (remoteHost.includes(':')) { - const [host, port] = remoteHost.split(':') - updateThirdPartyPeers([ - { - host: host, - port: +port, - }, - ]) - logger.Log('Host info written to host info file') - } - } - // --------------------------------------------------------------------------------------- // APP SETUP // --------------------------------------------------------------------------------------- @@ -1374,32 +580,7 @@ function setup(data: SubmoduleData): Submodule { } if (questions || sendAll) { - const questionDbsWithNewQuestions = Number.isNaN(questionsSince) - ? getQuestionDbs() - : getQuestionDbs() - .map((qdb) => { - return { - ...qdb, - data: getNewDataSince(qdb.data, questionsSince), - } - }) - .filter((qdb) => { - const { questionCount: questionCount } = - countOfQdb(qdb) - return questionCount > 0 - }) - - const { subjCount: subjects, questionCount: questions } = - countOfQdbs(questionDbsWithNewQuestions) - - result.questions = { - questionDbs: questionDbsWithNewQuestions, - count: { - qdbs: questionDbsWithNewQuestions.length, - subjects: subjects, - questions: questions, - }, - } + result.questions = getQuestions(questionsSince, getQuestionDbs) const questionsSinceDate = questionsSince ? new Date(questionsSince).toLocaleString() @@ -1427,35 +608,14 @@ function setup(data: SubmoduleData): Submodule { } if (users || sendAll) { - let sentUsers = 0 - if (!remoteHost) { - res.json({ - ...result, - success: false, - message: 'remoteHost key is missing from body', - }) - return - } - if (!remotePeerInfo) { - res.json({ - success: false, - message: - "couldn't find remote peer info based on remoteHost", - }) - return - } - const remotePublicKey = remotePeerInfo.publicKey - if (remotePublicKey) { - // FIXME: sign data? - const newUsers = getNewUsersSince(usersSince) - sentUsers = newUsers.length - result.users = { - encryptedUsers: encrypt( - remotePublicKey, - JSON.stringify(newUsers) - ), - } + result.users = getUsers( + remoteHost, + remotePeerInfo, + usersSince, + userDB + ) + if (result.users.success) { const usersSinceDate = usersSince ? new Date(usersSince).toLocaleString() : 'all time' @@ -1467,25 +627,21 @@ function setup(data: SubmoduleData): Submodule { 'blue' )}${usersSinceDate}${logger.C()}. 
Sent users: ${logger.C( 'blue' - )}${sentUsers}${logger.C()}` - ) - } else if (remotePeerInfo) { - logger.Log( - `Warning: "${hostToLog}" has no public key saved!`, - 'yellowbg' + )}${result.users.sentUsers}${logger.C()}` ) + } else { + logger.Log(result.users.message, 'yellowbg') } } if (userFiles || sendAll) { - const newFiles = getNewUserFilesSince(userFilesSince) - const sentFilesCount = Object.values(newFiles).reduce( - (acc, data) => { - return acc + Object.keys(data).length - }, - 0 - ) - result.userFiles = { newFiles: newFiles } + const newUserFilesResult = getUserFiles(userFilesSince) + const sentFilesCount = Object.values( + newUserFilesResult.newFiles + ).reduce((acc, data) => { + return acc + Object.keys(data).length + }, 0) + result.userFiles = newUserFilesResult const userFilesSinceDate = questionsSince ? new Date(questionsSince).toLocaleString() @@ -1565,48 +721,35 @@ function setup(data: SubmoduleData): Submodule { logger.LogReq(req) const encryptedNewUsers = req.body.newUsers + const host = req.body.host + + const result = handleNewUsers({ + encryptedNewUsers: encryptedNewUsers, + host: host, + peers: peers, + privateKey: privateKey, + userDB: userDB, + }) + + res.json(result) + } + ) + + app.post( + '/newuserfilecreated', + async (req: Request, res: Response) => { + logger.LogReq(req) + + const newFiles = req.body.newFiles const remoteHost = req.body.host - if (!encryptedNewUsers || !remoteHost) { - res.json({ - success: false, - message: - 'encryptedNewUsers or remoteHost key are missing from body', - }) - return - } - - const remotePeerInfo = peers.find((peer) => { - return peerToString(peer) === remoteHost + const result = await handleNewUserFiles({ + newFiles: newFiles, + host: remoteHost, + peers: peers, }) - if (!remotePeerInfo) { - res.json({ - success: false, - message: - "couldn't find remote peer info based on remoteHost", - }) - return - } - - const decryptedUsers: User[] = JSON.parse( - decrypt(privateKey, encryptedNewUsers) - ) - - const addedUserCount = addUsersToDb(decryptedUsers, userDB, { - sourceHost: peerToString(remotePeerInfo), - }) - - if (addedUserCount > 0) { - logger.Log( - `\tAdded ${addedUserCount} new users from "${peerToString( - remotePeerInfo - )}"`, - 'cyan' - ) - } - - res.json({ success: true, addedUserCount: addedUserCount }) + res.json(result) } ) diff --git a/src/modules/api/submodules/userFiles.ts b/src/modules/api/submodules/userFiles.ts index 2f42f8b..24d8c00 100644 --- a/src/modules/api/submodules/userFiles.ts +++ b/src/modules/api/submodules/userFiles.ts @@ -25,12 +25,13 @@ import utils from '../../../utils/utils' import { Request, SubmoduleData, User } from '../../../types/basicTypes' import { paths } from '../../../utils/files' import constants from '../../../constants' +import { queueWork } from '../../../worker/workerPool' export interface UserDirDataFile { uid: number - views: number - upvotes: number[] - downvotes: number[] + views?: number + upvotes?: number[] + downvotes?: number[] } function listDir(subdir: string) { @@ -273,6 +274,15 @@ function setup(data: SubmoduleData): void { res.json({ success: true, }) + + queueWork({ + type: 'sendUserFilesToPeers', + data: { + dir: safeDir, + fileName: body.fileName, + fileData: { uid: user.id }, + }, + }) }) .catch(() => { res.json({ success: false, msg: 'something bad happened :s' }) diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 3111293..6096026 100755 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -102,7 +102,7 @@ function GetDateString( } function 
diff --git a/src/utils/utils.ts b/src/utils/utils.ts
index 3111293..6096026 100755
--- a/src/utils/utils.ts
+++ b/src/utils/utils.ts
@@ -102,7 +102,7 @@ function GetDateString(
 }
 
 function CopyFile(from: string, to: string): void {
-    CreatePath(to)
+    createDirsForFile(to)
     fs.copyFileSync(from, to)
 }
 
diff --git a/src/worker/handlers/handleQuestionsToPeers.ts b/src/worker/handlers/handleQuestionsToPeers.ts
index 71388eb..9c313f8 100644
--- a/src/worker/handlers/handleQuestionsToPeers.ts
+++ b/src/worker/handlers/handleQuestionsToPeers.ts
@@ -6,8 +6,11 @@ import { RecievedData } from '../../utils/actions'
 import { removeCacheFromQuestion } from '../../utils/qdbUtils'
 import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi'
 import logger from '../../utils/logger'
-import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
 import { post } from '../../utils/networkUtils'
+import {
+    loginAndPostDataToAllPeers,
+    peerToString,
+} from '../../modules/api/p2p/p2putils'
 
 export type QuestionsToPeersTaskObject = {
     type: 'sendQuestionsToPeers'
@@ -74,7 +77,11 @@ export const handleQuestionsToPeers = async (
         }
     )
 
-    logger.Log(`Peers that added new questions: ${hadNewQuestions.join(', ')}`)
+    if (hadNewQuestions.length > 0) {
+        logger.Log(
+            `\t Peers that added new questions: ${hadNewQuestions.join(', ')}`
+        )
+    }
 
     parentPort.postMessage({
         msg: `From thread #${workerIndex}: sendQuestionsToPeers done`,
diff --git a/src/worker/handlers/handleUserFilesToPeers.ts b/src/worker/handlers/handleUserFilesToPeers.ts
new file mode 100644
index 0000000..f7bdc2e
--- /dev/null
+++ b/src/worker/handlers/handleUserFilesToPeers.ts
@@ -0,0 +1,64 @@
+import { parentPort } from 'node:worker_threads'
+import { PeerInfo, QuestionDb } from '../../types/basicTypes'
+import { files, readAndValidateFile } from '../../utils/files'
+import { post } from '../../utils/networkUtils'
+import {
+    loginAndPostDataToAllPeers,
+    peerToString,
+} from '../../modules/api/p2p/p2putils'
+import { NewUserFilesRequestBody } from '../../modules/api/p2p/userFiles'
+import { UserDirDataFile } from '../../modules/api/submodules/userFiles'
+
+export type UserFilesToPeersTaskObject = {
+    type: 'sendUserFilesToPeers'
+    data: {
+        dir: string
+        fileName: string
+        fileData: UserDirDataFile
+    }
+}
+
+export const handleUserFilesToPeers = async (
+    _qdbs: QuestionDb[],
+    msg: UserFilesToPeersTaskObject,
+    workerIndex: number
+): Promise<void> => {
+    const { dir, fileName, fileData } = msg.data
+
+    const selfInfo = readAndValidateFile<PeerInfo>(files.selfInfoFile)
+    const host = peerToString(selfInfo)
+    const peers = readAndValidateFile<PeerInfo[]>(files.peersFile)
+
+    if (!peers || peers.length === 0) {
+        parentPort.postMessage({
+            msg: `From thread #${workerIndex}: sendUserFilesToPeers done`,
+            workerIndex: workerIndex,
+        })
+        return
+    }
+
+    const dataToSend: NewUserFilesRequestBody = {
+        host: host,
+        newFiles: {
+            [dir]: { [fileName]: fileData },
+        },
+    }
+
+    const postData = (peer: PeerInfo, sessionCookie: string) => {
+        return post({
+            hostname: peer.host,
+            port: peer.port,
+            http: peer.http,
+            path: '/api/newuserfilecreated',
+            bodyObject: dataToSend,
+            cookie: `sessionID=${sessionCookie}`,
+        })
+    }
+
+    await loginAndPostDataToAllPeers(peers, postData)
+
+    parentPort.postMessage({
+        msg: `From thread #${workerIndex}: sendUserFilesToPeers done`,
+        workerIndex: workerIndex,
+    })
+}
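// ---------------------------------------------------------------------------------------
// Editor's sketch (not part of the patch): the request body the handler above POSTs to
// /api/newuserfilecreated on each peer, matching how dataToSend is built. Keys under
// newFiles are the user-files subdirectory and file name; host is peerToString(selfInfo),
// i.e. "host:port". The concrete values below are hypothetical.
//
//     const body: NewUserFilesRequestBody = {
//         host: 'peer-a.example.com:8080',
//         newFiles: {
//             profilePictures: {
//                 'avatar.png': { uid: 42 },
//             },
//         },
//     }
// ---------------------------------------------------------------------------------------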
diff --git a/src/worker/handlers/handleUsersToPeers.ts b/src/worker/handlers/handleUsersToPeers.ts
index b4fc327..999a1b8 100644
--- a/src/worker/handlers/handleUsersToPeers.ts
+++ b/src/worker/handlers/handleUsersToPeers.ts
@@ -2,9 +2,12 @@ import { parentPort } from 'node:worker_threads'
 import { PeerInfo, QuestionDb, User } from '../../types/basicTypes'
 import { files, readAndValidateFile } from '../../utils/files'
 import logger from '../../utils/logger'
-import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
 import { post } from '../../utils/networkUtils'
 import { encrypt } from '../../utils/encryption'
+import {
+    loginAndPostDataToAllPeers,
+    peerToString,
+} from '../../modules/api/p2p/p2putils'
 
 export type UsersToPeersTaskObject = {
     type: 'sendUsersToPeers'
diff --git a/src/worker/worker.ts b/src/worker/worker.ts
index 50c1956..8338308 100644
--- a/src/worker/worker.ts
+++ b/src/worker/worker.ts
@@ -13,6 +13,7 @@ import { handleDbClean } from './handlers/handleDbClean'
 import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers'
 import { handleRmQuestions } from './handlers/handleRmQuestions'
 import { handleUsersToPeers } from './handlers/handleUsersToPeers'
+import { handleUserFilesToPeers } from './handlers/handleUserFilesToPeers'
 
 export interface WorkerResult {
     msg: string
@@ -84,6 +85,8 @@ async function handleMessage(
         await handleQuestionsToPeers(qdbs, msg, workerIndex)
     } else if (msg.type === 'sendUsersToPeers') {
         await handleUsersToPeers(qdbs, msg, workerIndex)
+    } else if (msg.type === 'sendUserFilesToPeers') {
+        await handleUserFilesToPeers(qdbs, msg, workerIndex)
     } else {
         logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
         console.error(msg)
diff --git a/src/worker/workerPool.ts b/src/worker/workerPool.ts
index 5e92d72..984de35 100644
--- a/src/worker/workerPool.ts
+++ b/src/worker/workerPool.ts
@@ -23,6 +23,8 @@ import { v4 as uuidv4 } from 'uuid'
 import { EventEmitter } from 'events'
 import os from 'os'
 
+import logger from '../utils/logger'
+
 import type { QuestionDb } from '../types/basicTypes'
 import { SearchTaskObject } from './handlers/handleSearch'
 import { DbEditTaskObject } from './handlers/handleDbEdit'
@@ -33,8 +35,8 @@ import { RmQuestionsTaskObject } from './handlers/handleRmQuestions'
 import { MergeTaskObject } from './handlers/handleMerge'
 import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers'
 import { WorkerResult } from './worker'
-import logger from '../utils/logger'
 import { UsersToPeersTaskObject } from './handlers/handleUsersToPeers'
+import { UserFilesToPeersTaskObject } from './handlers/handleUserFilesToPeers'
 
 const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length
 
@@ -54,6 +56,7 @@ export type TaskObject =
     | MergeTaskObject
     | QuestionsToPeersTaskObject
     | UsersToPeersTaskObject
+    | UserFilesToPeersTaskObject
 
 interface PendingJob {
     workData: TaskObject