sending new users to peers on user creation

This commit is contained in:
mrfry 2023-04-29 09:59:39 +02:00
parent 2f24f214b2
commit cb0ad03336
9 changed files with 351 additions and 172 deletions

View file

@ -6,22 +6,9 @@ import { RecievedData } from '../../utils/actions'
import { removeCacheFromQuestion } from '../../utils/qdbUtils'
import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi'
import logger from '../../utils/logger'
import {
loginToPeer,
peerToString,
updatePeersFile,
} from '../../utils/p2putils'
import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
import { post } from '../../utils/networkUtils'
const login = async (peer: PeerInfo): Promise<string> => {
const loginResult = await loginToPeer(peer)
if (typeof loginResult === 'string') {
return loginResult
} else {
return null
}
}
export type QuestionsToPeersTaskObject = {
type: 'sendQuestionsToPeers'
data: {
@ -66,18 +53,6 @@ export const handleQuestionsToPeers = async (
}),
}
const results: {
errors: PeerInfo[]
hasNew: PeerInfo[]
sent: PeerInfo[]
loginErrors: PeerInfo[]
} = {
errors: [],
hasNew: [],
sent: [],
loginErrors: [],
}
const postData = (peer: PeerInfo, sessionCookie: string) => {
return post<QuestionAddResponse>({
hostname: peer.host,
@ -89,63 +64,17 @@ export const handleQuestionsToPeers = async (
})
}
for (const peer of peers) {
let sessionCookie = peer.sessionCookie
if (!sessionCookie) {
sessionCookie = await login(peer)
if (!sessionCookie) {
results.loginErrors.push(peer)
continue
const hadNewQuestions: string[] = []
loginAndPostDataToAllPeers<QuestionAddResponse & { success: boolean }>(
peers,
postData,
(peer, res) => {
if (res.data?.totalNewQuestions > 0) {
hadNewQuestions.push(peerToString(peer))
}
updatePeersFile(peer, { sessionCookie: sessionCookie })
}
let res = await postData(peer, sessionCookie)
if (res.data?.result === 'nouser' && sessionCookie) {
sessionCookie = await login(peer)
if (!sessionCookie) {
results.loginErrors.push(peer)
continue
}
updatePeersFile(peer, { sessionCookie: sessionCookie })
res = await postData(peer, sessionCookie)
}
if (res.error || !res.data?.success) {
results.errors.push(peer)
console.error(res.error || JSON.stringify(res.data))
} else if (res.data?.totalNewQuestions > 0) {
results.hasNew.push(peer)
} else {
results.sent.push(peer)
}
}
const logMsg: string[] = []
const addToLogMsg = (
peerResult: PeerInfo[],
prefix: string,
color: string
) => {
if (peerResult.length > 0) {
logMsg.push(
`${logger.C(color)}${prefix}:${logger.C()} ` +
peerResult.map((x) => peerToString(x)).join(', ')
)
}
}
addToLogMsg(results.loginErrors, 'Login error', 'red')
addToLogMsg(results.errors, 'Error', 'red')
addToLogMsg(results.hasNew, 'Had new questions', 'blue')
addToLogMsg(results.sent, 'Sent', 'green')
logger.Log(
`\t${logger.C(
'green'
)}Sent new questions to peers${logger.C()}; ${logMsg.join(', ')}`
)
logger.Log(`Peers that added new questions: ${hadNewQuestions.join(', ')}`)
parentPort.postMessage({
msg: `From thread #${workerIndex}: sendQuestionsToPeers done`,

View file

@ -0,0 +1,92 @@
import { parentPort } from 'node:worker_threads'
import { PeerInfo, QuestionDb, User } from '../../types/basicTypes'
import { files, readAndValidateFile } from '../../utils/files'
import logger from '../../utils/logger'
import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
import { post } from '../../utils/networkUtils'
import { encrypt } from '../../utils/encryption'
export type UsersToPeersTaskObject = {
type: 'sendUsersToPeers'
data: {
newUsers: (Omit<User, 'id'> & { id?: number })[]
}
}
export const handleUsersToPeers = async (
_qdbs: QuestionDb[],
msg: UsersToPeersTaskObject,
workerIndex: number
): Promise<void> => {
const { newUsers } = msg.data
const selfInfo = readAndValidateFile<PeerInfo>(files.selfInfoFile)
const host = peerToString(selfInfo)
const peers = readAndValidateFile<PeerInfo[]>(files.peersFile)
if (!peers || peers.length === 0 || newUsers.length === 0) {
parentPort.postMessage({
msg: `From thread #${workerIndex}: sendUsersToPeers done`,
workerIndex: workerIndex,
})
return
}
const postData = (peer: PeerInfo, sessionCookie: string) => {
if (!peer.publicKey) {
logger.Log(
`"${peerToString(peer)}" has no public key saved!`,
'yellowbg'
)
return Promise.resolve({
error: new Error(
`"${peerToString(peer)}" has no public key saved!`
),
})
}
const encryptedUsers = encrypt(peer.publicKey, JSON.stringify(newUsers))
const dataToSend: { host: string; newUsers: string } = {
host: host,
newUsers: encryptedUsers,
}
return post<{
addedUserCount?: number
result?: string
success?: boolean
}>({
hostname: peer.host,
port: peer.port,
http: peer.http,
path: '/api/newusercreated',
bodyObject: dataToSend,
cookie: `sessionID=${sessionCookie}`,
})
}
const newUserAdded: string[] = []
loginAndPostDataToAllPeers<{
addedUserCount?: number
result?: string
success?: boolean
}>(peers, postData, (peer, res) => {
if (res.data?.addedUserCount > 0) {
newUserAdded.push(peerToString(peer))
}
})
if (newUserAdded.length > 0) {
logger.Log(
`Peers that saved new users: ${newUserAdded.join(', ')}`,
'cyan'
)
}
parentPort.postMessage({
msg: `From thread #${workerIndex}: sendUsersToPeers done`,
workerIndex: workerIndex,
})
}

View file

@ -12,6 +12,7 @@ import { handleNewDb } from './handlers/handleNewDb'
import { handleDbClean } from './handlers/handleDbClean'
import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers'
import { handleRmQuestions } from './handlers/handleRmQuestions'
import { handleUsersToPeers } from './handlers/handleUsersToPeers'
export interface WorkerResult {
msg: string
@ -81,6 +82,8 @@ async function handleMessage(
await handleRmQuestions(qdbs, msg, workerIndex, setQdbs)
} else if (msg.type === 'sendQuestionsToPeers') {
await handleQuestionsToPeers(qdbs, msg, workerIndex)
} else if (msg.type === 'sendUsersToPeers') {
await handleUsersToPeers(qdbs, msg, workerIndex)
} else {
logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
console.error(msg)

View file

@ -34,6 +34,7 @@ import { MergeTaskObject } from './handlers/handleMerge'
import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers'
import { WorkerResult } from './worker'
import logger from '../utils/logger'
import { UsersToPeersTaskObject } from './handlers/handleUsersToPeers'
const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length
@ -52,6 +53,7 @@ export type TaskObject =
| RmQuestionsTaskObject
| MergeTaskObject
| QuestionsToPeersTaskObject
| UsersToPeersTaskObject
interface PendingJob {
workData: TaskObject