From cb0ad03336312f89db2d49f5a8d8bf0c3783ace6 Mon Sep 17 00:00:00 2001 From: mrfry Date: Sat, 29 Apr 2023 09:59:39 +0200 Subject: [PATCH] sending new users to peers on user creation --- src/modules/api/submodules/p2p.ts | 224 +++++++++++------- src/modules/api/submodules/qminingapi.ts | 3 +- src/modules/api/submodules/userManagement.ts | 14 +- src/types/basicTypes.ts | 10 +- src/utils/p2putils.ts | 86 ++++++- src/worker/handlers/handleQuestionsToPeers.ts | 89 +------ src/worker/handlers/handleUsersToPeers.ts | 92 +++++++ src/worker/worker.ts | 3 + src/worker/workerPool.ts | 2 + 9 files changed, 351 insertions(+), 172 deletions(-) create mode 100644 src/worker/handlers/handleUsersToPeers.ts diff --git a/src/modules/api/submodules/p2p.ts b/src/modules/api/submodules/p2p.ts index 4540f3f..61d2961 100644 --- a/src/modules/api/submodules/p2p.ts +++ b/src/modules/api/submodules/p2p.ts @@ -60,6 +60,7 @@ import { peerToString, updatePeersFile, } from '../../../utils/p2putils' +import { Database } from 'better-sqlite3' interface MergeResult { newData: Subject[] @@ -456,6 +457,31 @@ async function authAndGetNewData({ } } +function addUsersToDb( + users: User[], + userDB: Database, + extraProps: Partial +) { + let addedUserCount = 0 + users.forEach((remoteUser) => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { id, ...remoteUserWithoutId } = remoteUser + const localUser = dbtools.Select(userDB, 'users', { + pw: remoteUser.pw, + }) + if (localUser.length === 0) { + addedUserCount += 1 + // FIXME: users will not have consistend id across servers. This may be + // harmless, will see + dbtools.Insert(userDB, 'users', { + ...(remoteUserWithoutId as Omit), + ...extraProps, + }) + } + }) + return addedUserCount +} + function setup(data: SubmoduleData): Submodule { const { app, @@ -696,6 +722,7 @@ function setup(data: SubmoduleData): Submodule { syncAll ? 'everything' : Object.entries(shouldSync) + // eslint-disable-next-line @typescript-eslint/no-unused-vars .filter(([_key, value]) => value) .map(([key]) => key) .join(', ') @@ -870,26 +897,16 @@ function setup(data: SubmoduleData): Submodule { try { userData.forEach((res) => { if (res.encryptedUsers) { - let addedUserCount = 0 const decryptedUsers: User[] = JSON.parse( decrypt(privateKey, res.encryptedUsers) ) - decryptedUsers.forEach((remoteUser) => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { id, ...remoteUserWithoutId } = remoteUser - const localUser = dbtools.Select(userDB, 'users', { - pw: remoteUser.pw, - }) - if (localUser.length === 0) { - addedUserCount += 1 - // FIXME: users will not have consistend id across servers. This may be - // harmless, will see - dbtools.Insert(userDB, 'users', { - ...(remoteUserWithoutId as Omit), - sourceHost: peerToString(res.peer), - }) + const addedUserCount = addUsersToDb( + decryptedUsers, + userDB, + { + sourceHost: peerToString(res.peer), } - }) + ) resultsCount[peerToString(res.peer)] = { newUsers: addedUserCount, } @@ -1169,39 +1186,17 @@ function setup(data: SubmoduleData): Submodule { app.get('/getnewfilessince', (req: Request, res: Response) => { const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since - const remoteHost = req.query.host - const hostToLog = remoteHost || 'Unknown host' - - const result: any = { - remoteInfo: getSelfInfo(), - } - - if (remoteHost) { - const remotePeerInfo = peers.find((peer) => { - return peerToString(peer) === remoteHost - }) - if (!remotePeerInfo) { - handleNewThirdPartyPeer(remoteHost) - } - } - const usersSinceDate = since - ? new Date(since).toLocaleString() - : 'all time' - - logger.Log( - `\tSending new files to ${logger.C( - 'blue' - )}${hostToLog}${logger.C()} since ${logger.C( - 'blue' - )}${usersSinceDate}${logger.C()}` - ) - - res.json(result) + res.json({ since: since, message: 'unimplemented' }) }) app.get( '/getnewuserssince', - (req: Request, res: Response) => { + ( + req: Request, + res: Response< + UserSyncDataRes & { message?: string; success?: boolean } + > + ) => { logger.LogReq(req) const since = Number.isNaN(+req.query.since) ? 0 : +req.query.since @@ -1213,47 +1208,61 @@ function setup(data: SubmoduleData): Submodule { remoteInfo: getSelfInfo(), } - if (remoteHost) { - const remotePeerInfo = peers.find((peer) => { - return peerToString(peer) === remoteHost + if (!remoteHost) { + res.json({ + ...result, + success: false, + message: 'remoteHost key is missing from body', }) - if (!remotePeerInfo) { - handleNewThirdPartyPeer(remoteHost) - } else { - hostToLog = peerToString(remotePeerInfo) - } + return + } - if (remotePeerInfo) { - const remotePublicKey = remotePeerInfo?.publicKey - if (remotePublicKey) { - // FIXME: sign data? - const newUsers = getNewUsersSince(since) - sentUsers = newUsers.length - result.encryptedUsers = encrypt( - remotePublicKey, - JSON.stringify(newUsers) - ) + const remotePeerInfo = peers.find((peer) => { + return peerToString(peer) === remoteHost + }) + if (!remotePeerInfo) { + handleNewThirdPartyPeer(remoteHost) + } else { + hostToLog = peerToString(remotePeerInfo) + } - const usersSinceDate = since - ? new Date(since).toLocaleString() - : 'all time' + if (!remotePeerInfo) { + res.json({ + success: false, + message: + "couldn't find remote peer info based on remoteHost", + }) + return + } - logger.Log( - `\tSending new users to ${logger.C( - 'blue' - )}${hostToLog}${logger.C()} since ${logger.C( - 'blue' - )}${usersSinceDate}${logger.C()}. Sent users: ${logger.C( - 'blue' - )}${sentUsers}${logger.C()}` - ) - } else if (remotePeerInfo) { - logger.Log( - `Warning: "${hostToLog}" has no public key saved!`, - 'yellowbg' - ) - } - } + const remotePublicKey = remotePeerInfo.publicKey + if (remotePublicKey) { + // FIXME: sign data? + const newUsers = getNewUsersSince(since) + sentUsers = newUsers.length + result.encryptedUsers = encrypt( + remotePublicKey, + JSON.stringify(newUsers) + ) + + const usersSinceDate = since + ? new Date(since).toLocaleString() + : 'all time' + + logger.Log( + `\tSending new users to ${logger.C( + 'blue' + )}${hostToLog}${logger.C()} since ${logger.C( + 'blue' + )}${usersSinceDate}${logger.C()}. Sent users: ${logger.C( + 'blue' + )}${sentUsers}${logger.C()}` + ) + } else if (remotePeerInfo) { + logger.Log( + `Warning: "${hostToLog}" has no public key saved!`, + 'yellowbg' + ) } res.json(result) @@ -1389,6 +1398,57 @@ function setup(data: SubmoduleData): Submodule { }) }) + app.post( + '/newusercreated', + (req: Request<{ host: string; newUsers: string }>, res: Response) => { + logger.LogReq(req) + + const encryptedNewUsers = req.body.newUsers + const remoteHost = req.body.host + + if (!encryptedNewUsers || !remoteHost) { + res.json({ + success: false, + message: + 'encryptedNewUsers or remoteHost key are missing from body', + }) + return + } + + const remotePeerInfo = peers.find((peer) => { + return peerToString(peer) === remoteHost + }) + + if (!remotePeerInfo) { + res.json({ + success: false, + message: + "couldn't find remote peer info based on remoteHost", + }) + return + } + + const decryptedUsers: User[] = JSON.parse( + decrypt(privateKey, encryptedNewUsers) + ) + + const addedUserCount = addUsersToDb(decryptedUsers, userDB, { + sourceHost: peerToString(remotePeerInfo), + }) + + if (addedUserCount > 0) { + logger.Log( + `\tAdded ${addedUserCount} new users from "${peerToString( + remotePeerInfo + )}"`, + 'cyan' + ) + } + + res.json({ success: true, addedUserCount: addedUserCount }) + } + ) + logger.Log( 'P2P functionality set up. Peers (' + peers.length + diff --git a/src/modules/api/submodules/qminingapi.ts b/src/modules/api/submodules/qminingapi.ts index 9ecba81..022c009 100644 --- a/src/modules/api/submodules/qminingapi.ts +++ b/src/modules/api/submodules/qminingapi.ts @@ -73,9 +73,8 @@ interface SavedQuestionData { export interface QuestionAddResponse { success: boolean - newQuestions: number totalNewQuestions: number - result?: string + result: string } const line = '====================================================' // lol diff --git a/src/modules/api/submodules/userManagement.ts b/src/modules/api/submodules/userManagement.ts index 20d9d34..0f561e7 100644 --- a/src/modules/api/submodules/userManagement.ts +++ b/src/modules/api/submodules/userManagement.ts @@ -30,6 +30,7 @@ import { Submodule, } from '../../../types/basicTypes' import dbtools from '../../../utils/dbtools' +import { queueWork } from '../../../worker/workerPool' const minimumAlowwedSessions = 2 // how many sessions are allowed for a user const usersDbBackupPath = 'data/dbs/backup' @@ -150,12 +151,14 @@ function setup(data: SubmoduleData): Submodule { ) const pw = uuidv4() - const insertRes = dbtools.Insert(userDB, 'users', { + const newUser: Omit = { pw: pw, avaiblePWRequests: 0, created: new Date().getTime(), createdBy: requestingUser.id, - }) + } + + const insertRes = dbtools.Insert(userDB, 'users', newUser) logger.Log( `User #${requestingUser.id} created new user #${insertRes.lastInsertRowid}`, @@ -173,6 +176,13 @@ function setup(data: SubmoduleData): Submodule { dayDiff: getDayDiff(requestingUser.created), userCount: dbtools.TableInfo(userDB, 'users').dataCount, }) + + queueWork({ + type: 'sendUsersToPeers', + data: { + newUsers: [newUser], + }, + }) }) app.post('/login', (req: Request, res: any) => { diff --git a/src/types/basicTypes.ts b/src/types/basicTypes.ts index 22ba3d3..0d89211 100644 --- a/src/types/basicTypes.ts +++ b/src/types/basicTypes.ts @@ -89,14 +89,14 @@ export interface User { id: number pw: string notes?: string - loginCount: number + loginCount?: number avaiblePWRequests: number - pwRequestCount: number + pwRequestCount?: number createdBy: number created: number - lastLogin: number - lastAccess: number - sourceHost?: number + lastLogin?: number + lastAccess?: number + sourceHost?: string // isAdmin: boolean // TODO } diff --git a/src/utils/p2putils.ts b/src/utils/p2putils.ts index ba0890f..ecd8d25 100644 --- a/src/utils/p2putils.ts +++ b/src/utils/p2putils.ts @@ -1,6 +1,7 @@ import { PeerInfo } from '../types/basicTypes' import { files, paths, readAndValidateFile } from './files' -import { parseCookie, post } from './networkUtils' +import logger from './logger' +import { PostResult, parseCookie, post } from './networkUtils' import utils from './utils' export function peerToString(peer: { @@ -62,3 +63,86 @@ export async function loginToPeer(peer: PeerInfo): Promise { const parsedCookies = parseCookie(cookie) return parsedCookies.sessionID } + +export async function loginAndPostDataToAllPeers< + T extends { result?: string; success?: boolean } +>( + peers: PeerInfo[], + postDataFn: ( + peer: PeerInfo, + sessionCookie: string + ) => Promise>, + resultCallback?: (peer: PeerInfo, res: PostResult) => void +): Promise { + const results: { + errors: PeerInfo[] + sent: PeerInfo[] + loginErrors: PeerInfo[] + } = { + errors: [], + sent: [], + loginErrors: [], + } + + for (const peer of peers) { + try { + let sessionCookie = peer.sessionCookie + + const login = async (peer: PeerInfo) => { + const loginResult = await loginToPeer(peer) + if (typeof loginResult === 'string') { + sessionCookie = loginResult + updatePeersFile(peer, { sessionCookie: loginResult }) + } else { + throw new Error('Error logging in to' + peerToString(peer)) + } + } + + if (!sessionCookie) { + await login(peer) + } + + let res = await postDataFn(peer, sessionCookie) + + if (res.data?.result === 'nouser' && sessionCookie) { + await login(peer) + + res = await postDataFn(peer, sessionCookie) + } + + if (res.error || !res.data?.success) { + results.errors.push(peer) + console.error(res.error || JSON.stringify(res.data)) + } else { + results.sent.push(peer) + } + + if (resultCallback) resultCallback(peer, res) + } catch (e) { + results.loginErrors.push(peer) + } + } + + const logMsg: string[] = [] + const addToLogMsg = ( + peerResult: PeerInfo[], + prefix: string, + color: string + ) => { + if (peerResult.length > 0) { + logMsg.push( + `${logger.C(color)}${prefix}:${logger.C()} ` + + peerResult.map((x) => peerToString(x)).join(', ') + ) + } + } + addToLogMsg(results.loginErrors, 'Login error', 'red') + addToLogMsg(results.errors, 'Error', 'red') + addToLogMsg(results.sent, 'Sent', 'green') + + logger.Log( + `\t${logger.C('green')}Sent data to peers${logger.C()}; ${logMsg.join( + ', ' + )}` + ) +} diff --git a/src/worker/handlers/handleQuestionsToPeers.ts b/src/worker/handlers/handleQuestionsToPeers.ts index 2115de1..71388eb 100644 --- a/src/worker/handlers/handleQuestionsToPeers.ts +++ b/src/worker/handlers/handleQuestionsToPeers.ts @@ -6,22 +6,9 @@ import { RecievedData } from '../../utils/actions' import { removeCacheFromQuestion } from '../../utils/qdbUtils' import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi' import logger from '../../utils/logger' -import { - loginToPeer, - peerToString, - updatePeersFile, -} from '../../utils/p2putils' +import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils' import { post } from '../../utils/networkUtils' -const login = async (peer: PeerInfo): Promise => { - const loginResult = await loginToPeer(peer) - if (typeof loginResult === 'string') { - return loginResult - } else { - return null - } -} - export type QuestionsToPeersTaskObject = { type: 'sendQuestionsToPeers' data: { @@ -66,18 +53,6 @@ export const handleQuestionsToPeers = async ( }), } - const results: { - errors: PeerInfo[] - hasNew: PeerInfo[] - sent: PeerInfo[] - loginErrors: PeerInfo[] - } = { - errors: [], - hasNew: [], - sent: [], - loginErrors: [], - } - const postData = (peer: PeerInfo, sessionCookie: string) => { return post({ hostname: peer.host, @@ -89,63 +64,17 @@ export const handleQuestionsToPeers = async ( }) } - for (const peer of peers) { - let sessionCookie = peer.sessionCookie - - if (!sessionCookie) { - sessionCookie = await login(peer) - if (!sessionCookie) { - results.loginErrors.push(peer) - continue + const hadNewQuestions: string[] = [] + loginAndPostDataToAllPeers( + peers, + postData, + (peer, res) => { + if (res.data?.totalNewQuestions > 0) { + hadNewQuestions.push(peerToString(peer)) } - updatePeersFile(peer, { sessionCookie: sessionCookie }) } - - let res = await postData(peer, sessionCookie) - - if (res.data?.result === 'nouser' && sessionCookie) { - sessionCookie = await login(peer) - if (!sessionCookie) { - results.loginErrors.push(peer) - continue - } - updatePeersFile(peer, { sessionCookie: sessionCookie }) - res = await postData(peer, sessionCookie) - } - - if (res.error || !res.data?.success) { - results.errors.push(peer) - console.error(res.error || JSON.stringify(res.data)) - } else if (res.data?.totalNewQuestions > 0) { - results.hasNew.push(peer) - } else { - results.sent.push(peer) - } - } - - const logMsg: string[] = [] - const addToLogMsg = ( - peerResult: PeerInfo[], - prefix: string, - color: string - ) => { - if (peerResult.length > 0) { - logMsg.push( - `${logger.C(color)}${prefix}:${logger.C()} ` + - peerResult.map((x) => peerToString(x)).join(', ') - ) - } - } - addToLogMsg(results.loginErrors, 'Login error', 'red') - addToLogMsg(results.errors, 'Error', 'red') - addToLogMsg(results.hasNew, 'Had new questions', 'blue') - addToLogMsg(results.sent, 'Sent', 'green') - - logger.Log( - `\t${logger.C( - 'green' - )}Sent new questions to peers${logger.C()}; ${logMsg.join(', ')}` ) + logger.Log(`Peers that added new questions: ${hadNewQuestions.join(', ')}`) parentPort.postMessage({ msg: `From thread #${workerIndex}: sendQuestionsToPeers done`, diff --git a/src/worker/handlers/handleUsersToPeers.ts b/src/worker/handlers/handleUsersToPeers.ts new file mode 100644 index 0000000..b4fc327 --- /dev/null +++ b/src/worker/handlers/handleUsersToPeers.ts @@ -0,0 +1,92 @@ +import { parentPort } from 'node:worker_threads' +import { PeerInfo, QuestionDb, User } from '../../types/basicTypes' +import { files, readAndValidateFile } from '../../utils/files' +import logger from '../../utils/logger' +import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils' +import { post } from '../../utils/networkUtils' +import { encrypt } from '../../utils/encryption' + +export type UsersToPeersTaskObject = { + type: 'sendUsersToPeers' + data: { + newUsers: (Omit & { id?: number })[] + } +} + +export const handleUsersToPeers = async ( + _qdbs: QuestionDb[], + msg: UsersToPeersTaskObject, + workerIndex: number +): Promise => { + const { newUsers } = msg.data + + const selfInfo = readAndValidateFile(files.selfInfoFile) + const host = peerToString(selfInfo) + const peers = readAndValidateFile(files.peersFile) + + if (!peers || peers.length === 0 || newUsers.length === 0) { + parentPort.postMessage({ + msg: `From thread #${workerIndex}: sendUsersToPeers done`, + workerIndex: workerIndex, + }) + return + } + + const postData = (peer: PeerInfo, sessionCookie: string) => { + if (!peer.publicKey) { + logger.Log( + `"${peerToString(peer)}" has no public key saved!`, + 'yellowbg' + ) + + return Promise.resolve({ + error: new Error( + `"${peerToString(peer)}" has no public key saved!` + ), + }) + } + + const encryptedUsers = encrypt(peer.publicKey, JSON.stringify(newUsers)) + + const dataToSend: { host: string; newUsers: string } = { + host: host, + newUsers: encryptedUsers, + } + + return post<{ + addedUserCount?: number + result?: string + success?: boolean + }>({ + hostname: peer.host, + port: peer.port, + http: peer.http, + path: '/api/newusercreated', + bodyObject: dataToSend, + cookie: `sessionID=${sessionCookie}`, + }) + } + + const newUserAdded: string[] = [] + loginAndPostDataToAllPeers<{ + addedUserCount?: number + result?: string + success?: boolean + }>(peers, postData, (peer, res) => { + if (res.data?.addedUserCount > 0) { + newUserAdded.push(peerToString(peer)) + } + }) + + if (newUserAdded.length > 0) { + logger.Log( + `Peers that saved new users: ${newUserAdded.join(', ')}`, + 'cyan' + ) + } + + parentPort.postMessage({ + msg: `From thread #${workerIndex}: sendUsersToPeers done`, + workerIndex: workerIndex, + }) +} diff --git a/src/worker/worker.ts b/src/worker/worker.ts index b3e4181..50c1956 100644 --- a/src/worker/worker.ts +++ b/src/worker/worker.ts @@ -12,6 +12,7 @@ import { handleNewDb } from './handlers/handleNewDb' import { handleDbClean } from './handlers/handleDbClean' import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers' import { handleRmQuestions } from './handlers/handleRmQuestions' +import { handleUsersToPeers } from './handlers/handleUsersToPeers' export interface WorkerResult { msg: string @@ -81,6 +82,8 @@ async function handleMessage( await handleRmQuestions(qdbs, msg, workerIndex, setQdbs) } else if (msg.type === 'sendQuestionsToPeers') { await handleQuestionsToPeers(qdbs, msg, workerIndex) + } else if (msg.type === 'sendUsersToPeers') { + await handleUsersToPeers(qdbs, msg, workerIndex) } else { logger.Log(`Invalid msg type!`, logger.GetColor('redbg')) console.error(msg) diff --git a/src/worker/workerPool.ts b/src/worker/workerPool.ts index 196c739..4b329db 100644 --- a/src/worker/workerPool.ts +++ b/src/worker/workerPool.ts @@ -34,6 +34,7 @@ import { MergeTaskObject } from './handlers/handleMerge' import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers' import { WorkerResult } from './worker' import logger from '../utils/logger' +import { UsersToPeersTaskObject } from './handlers/handleUsersToPeers' const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length @@ -52,6 +53,7 @@ export type TaskObject = | RmQuestionsTaskObject | MergeTaskObject | QuestionsToPeersTaskObject + | UsersToPeersTaskObject interface PendingJob { workData: TaskObject