p2p submodule split, bug fixes

This commit is contained in:
mrfry 2023-05-03 16:07:45 +02:00
parent 0fea32c204
commit 54c3e1252b
12 changed files with 1426 additions and 995 deletions

View file

@ -1,8 +1,67 @@
import { PeerInfo } from '../types/basicTypes'
import { files, paths, readAndValidateFile } from './files'
import logger from './logger'
import { PostResult, parseCookie, post } from './networkUtils'
import utils from './utils'
import { PeerInfo, QuestionDb } from '../../../types/basicTypes'
import { files, paths, readAndValidateFile } from '../../../utils/files'
import logger from '../../../utils/logger'
import { PostResult, parseCookie, post } from '../../../utils/networkUtils'
import utils from '../../../utils/utils'
import { UserDirDataFile } from '../submodules/userFiles'
// Base shape of every sync endpoint response.
export interface SyncResponseBase {
    success: boolean
    message?: string // human-readable reason, set when success is false
}
// Info a peer reports about itself: its own peer list plus revision /
// build metadata and question db statistics.
export interface RemotePeerInfo {
    selfInfo: PeerInfo
    myPeers: PeerInfo[]
    serverRevision?: string
    scriptRevision?: string
    qminingPageRevision?: string
    dataEditorRevision?: string
    serverLastCommitDate?: number
    scriptLastCommitDate?: number
    qminingPageLastCommitDate?: number
    dataEditorLastCommitDate?: number
    serverBuildTime?: number
    qminingPageBuildTime?: number
    dataEditorBuildTime?: number
    scriptVersion?: string
    userCount?: number
    qdbInfo?: {
        questionDbCount: number
        subjectCount: number
        questionCount: number
    }
}
// Per-category counts reported by a sync run: state before, what was
// added, and state after. Keys are category names (e.g. 'questions').
export interface SyncResult {
    old?: { [key: string]: number }
    added?: { [key: string]: number }
    final?: { [key: string]: number }
    msg?: string
}
// Aggregate payload fetched from a peer during a full sync.
export interface SyncDataResult {
    remoteInfo?: RemotePeerInfo
    users?: SyncResponseBase & {
        encryptedUsers?: string // JSON user array encrypted with our public key
        sentUsers?: number
    }
    questions?: SyncResponseBase & {
        questionDbs: QuestionDb[]
        count: {
            qdbs: number
            subjects: number
            questions: number
        }
    }
    userFiles?: SyncResponseBase & {
        // outer key: user files dir, inner key: file name
        newFiles: {
            [key: string]: {
                [key: string]: UserDirDataFile
            }
        }
    }
}
export function peerToString(peer: {
host: string
@ -18,6 +77,36 @@ export function isPeerSameAs(
return peer1.host === peer2.host && peer1.port === peer2.port
}
/**
 * Persists the peer's own info file with an updated `lastSync` timestamp.
 */
export function updateLastSync(selfInfo: PeerInfo, newDate: number): void {
    const updatedInfo = { ...selfInfo, lastSync: newDate }
    utils.WriteFile(JSON.stringify(updatedInfo, null, 2), paths.selfInfoFile)
}
/**
 * Appends the given peers to the third-party peers file, skipping any
 * entry whose host:port is already stored.
 */
export function updateThirdPartyPeers(
    newVal: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[]
): void {
    const stored = utils.FileExists(paths.thirdPartyPeersFile)
        ? utils.ReadJSON<PeerInfo[]>(paths.thirdPartyPeersFile)
        : []
    const merged: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[] = [
        ...stored,
    ]
    for (const peer of newVal) {
        const alreadyStored = merged.some(
            (x) => peerToString(x) === peerToString(peer)
        )
        if (!alreadyStored) {
            merged.push(peer)
        }
    }
    utils.WriteFile(JSON.stringify(merged, null, 2), paths.thirdPartyPeersFile)
}
export function updatePeersFile(
peerToUpdate: PeerInfo,
updatedPeer: Partial<PeerInfo>
@ -112,7 +201,10 @@ export async function loginAndPostDataToAllPeers<
if (res.error || !res.data?.success) {
results.errors.push(peer)
console.error(res.error || JSON.stringify(res.data))
console.error(
`Error: posting data to ${peerToString(peer)}`,
res.error || JSON.stringify(res.data)
)
} else {
results.sent.push(peer)
}

View file

@ -0,0 +1,541 @@
import {
DataFile,
PeerInfo,
QuestionDb,
Subject,
} from '../../../types/basicTypes'
import { backupData, writeData } from '../../../utils/actions'
import { publicDir } from '../../../utils/files'
import logger from '../../../utils/logger'
import {
countOfQdb,
countOfQdbs,
createQuestion,
getAvailableQdbIndexes,
removeCacheFromQuestion,
} from '../../../utils/qdbUtils'
import utils from '../../../utils/utils'
import { WorkerResult } from '../../../worker/worker'
import { msgAllWorker, queueWork } from '../../../worker/workerPool'
import {
SyncDataResult,
SyncResponseBase,
SyncResult,
peerToString,
updateLastSync,
updatePeersFile,
} from './p2putils'
// Result of one 'merge' worker job against a single local qdb.
interface MergeResult {
    newData: Subject[] // locally known subjects that received new questions
    newSubjects: Subject[] // subjects that did not exist locally
    localQdbIndex: number // index of the local qdb this result applies to
    e: Error // set when the merge job failed
}
// Inputs and db accessors required by syncQuestions.
interface SyncQuestionsProps {
    questionData: (SyncDataResult['questions'] & { peer: PeerInfo })[]
    syncStart: number // timestamp the sync started at; written to peers file
    getQuestionDbs: () => QuestionDb[]
    setQuestionDbs: (newVal: QuestionDb[]) => void
    selfInfo: PeerInfo
    dbsFile: string // path of the question dbs metadata file
}
// ---------------------------------------------------------------------------------------------
// Getting
// ---------------------------------------------------------------------------------------------
/**
 * Narrows every subject down to questions created at or after `date`,
 * stripping cached data from each returned question. Subjects left with
 * no matching questions are omitted entirely.
 */
export function getNewQuestionsSince(
    subjects: Subject[],
    date: number
): Subject[] {
    const result: Subject[] = []
    for (const subject of subjects) {
        const recentQuestions = subject.Questions.filter(
            (question) => (question.data.date || 0) >= date
        ).map((question) => removeCacheFromQuestion(question))
        if (recentQuestions.length !== 0) {
            result.push({ ...subject, Questions: recentQuestions })
        }
    }
    return result
}
/**
 * Collects question dbs to send to a peer. When `questionsSince` is NaN
 * every db is returned unfiltered; otherwise each db is narrowed to
 * questions newer than the timestamp and empty dbs are dropped.
 */
export function getQuestions(
    questionsSince: number,
    getQuestionDbs: () => QuestionDb[]
): SyncResponseBase & {
    questionDbs: QuestionDb[]
    count: { qdbs: number; subjects: number; questions: number }
} {
    let selectedQdbs: QuestionDb[]
    if (Number.isNaN(questionsSince)) {
        selectedQdbs = getQuestionDbs()
    } else {
        selectedQdbs = getQuestionDbs()
            .map((qdb) => ({
                ...qdb,
                data: getNewQuestionsSince(qdb.data, questionsSince),
            }))
            .filter((qdb) => countOfQdb(qdb).questionCount > 0)
    }
    const { subjCount, questionCount } = countOfQdbs(selectedQdbs)
    return {
        success: true,
        questionDbs: selectedQdbs,
        count: {
            qdbs: selectedQdbs.length,
            subjects: subjCount,
            questions: questionCount,
        },
    }
}
// ---------------------------------------------------------------------------------------------
// Syncing utils
// ---------------------------------------------------------------------------------------------
/**
 * Ensures every question in the db has an initialized cache and tags each
 * question's source with the originating peer's host:port string.
 */
function setupQuestionsForMerge(qdb: QuestionDb, peer: PeerInfo) {
    const source = peerToString(peer)
    const preparedData = qdb.data.map((subj) => {
        const preparedQuestions = subj.Questions.map((q) => {
            const question = q.cache ? q : createQuestion(q)
            question.data.source = source
            return question
        })
        return { ...subj, Questions: preparedQuestions }
    })
    return { ...qdb, data: preparedData }
}
/**
 * Splits remote qdbs in two: dbs with no local counterpart (matched by
 * name) are collected as-is, the rest are merged with their local
 * counterpart on the worker pool.
 *
 * @returns the resolved merge results plus the untouched new qdbs
 */
async function getMergeResults(
    remoteQuestionDbs: QuestionDb[],
    getQuestionDbs: () => QuestionDb[]
) {
    // FIX: typed as Promise<MergeResult>[] instead of Promise<any>[] so the
    // fields read from merge results are actually checked
    const mergeJobs: Promise<MergeResult>[] = []
    const rawNewQuestionDbs: QuestionDb[] = []
    remoteQuestionDbs.forEach((remoteQdb) => {
        const localQdb = getQuestionDbs().find(
            (lqdb) => lqdb.name === remoteQdb.name
        )
        if (!localQdb) {
            rawNewQuestionDbs.push(remoteQdb)
        } else {
            mergeJobs.push(
                queueWork({
                    type: 'merge',
                    data: {
                        localQdbIndex: localQdb.index,
                        remoteQdb: remoteQdb,
                    },
                })
            )
        }
    })
    const mergeResults: MergeResult[] = await Promise.all(mergeJobs)
    return {
        mergeResults: mergeResults,
        rawNewQuestionDbs: rawNewQuestionDbs,
    }
}
/**
 * Prepares newly adopted remote qdbs for local storage: assigns each one a
 * free local index and a path under the public questionDbs directory.
 *
 * FIX: the parameter was named `qdb` (singular) and was shadowed by the
 * map callback's `qdb`; renamed to `qdbs` to remove the shadowing.
 */
function updateQdbForLocalUse(
    qdbs: QuestionDb[],
    getQuestionDbs: () => QuestionDb[]
) {
    const availableIndexes = getAvailableQdbIndexes(
        getQuestionDbs(),
        qdbs.length
    )
    return qdbs.map((qdb, i) => {
        return {
            ...qdb,
            index: availableIndexes[i],
            path: `${publicDir}questionDbs/${qdb.name}.json`,
        }
    })
}
/**
 * Merges questions from `subjectsToMerge` into the matching subjects (by
 * Name) of `subjectsToMergeTo`, then appends the entirely new subjects.
 *
 * @param subjectsToMergeTo base subject list (kept in order)
 * @param subjectsToMerge subjects whose questions get appended to matches
 * @param newSubjects subjects with no local counterpart, appended verbatim
 */
export function mergeSubjects(
    subjectsToMergeTo: Subject[],
    subjectsToMerge: Subject[],
    newSubjects: Subject[]
): Subject[] {
    return [
        ...subjectsToMergeTo.map((subj) => {
            const matching = subjectsToMerge.filter(
                (subjRes) => subjRes.Name === subj.Name
            )
            // FIX: filter() always returns an array, which is truthy, so
            // the original `if (newSubjs)` never took its else branch;
            // check the length so unmatched subjects are returned as-is
            if (matching.length > 0) {
                const newQuestions = matching.flatMap((s) => s.Questions)
                return {
                    ...subj,
                    Questions: [...subj.Questions, ...newQuestions],
                }
            }
            return subj
        }),
        ...newSubjects,
    ]
}
/**
 * Applies merge results to the matching local qdbs (matched on index).
 *
 * @returns the updated qdb list plus the indexes of the qdbs that changed
 */
export function mergeQdbs(
    qdbToMergeTo: QuestionDb[],
    mergeResults: MergeResult[]
): { mergedQuestionDbs: QuestionDb[]; changedQdbIndexes: number[] } {
    const changedQdbIndexes: number[] = []
    const mergedQuestionDbs = qdbToMergeTo.map((qdb) => {
        const result = mergeResults.find(
            (mergeRes) => mergeRes.localQdbIndex === qdb.index
        )
        if (
            result &&
            (result.newData.length > 0 || result.newSubjects.length > 0)
        ) {
            changedQdbIndexes.push(qdb.index)
            return {
                ...qdb,
                data: mergeSubjects(
                    qdb.data,
                    result.newData,
                    result.newSubjects
                ),
            }
        }
        // no merge result for this qdb, or nothing new to add: keep as-is
        return qdb
    })
    return {
        mergedQuestionDbs: mergedQuestionDbs,
        changedQdbIndexes: changedQdbIndexes,
    }
}
/**
 * Persists changed and new qdbs: appends missing entries to the dbs
 * metadata file (paths stored relative to the public dir) and writes each
 * qdb's question data file.
 */
function writeNewData(
    newQuestionDbs: QuestionDb[],
    changedQuestionDbs: QuestionDb[],
    dbsFilePath: string
) {
    const qdbsToWrite = [...changedQuestionDbs, ...newQuestionDbs]
    const existingQdbs = utils.ReadJSON<DataFile[]>(dbsFilePath)
    // add metadata entries for qdbs not yet present in the dbs file
    const mergedMetadata = [...existingQdbs]
    for (const qdb of qdbsToWrite) {
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        const { data, index, ...qdbData } = qdb
        const known = mergedMetadata.some((x) => x.name === qdbData.name)
        if (!known) {
            mergedMetadata.push(qdbData)
        }
    }
    // paths are stored relative to the public directory
    const qdbDataToWrite = mergedMetadata.map((qdb) => {
        if (!qdb.path.includes(publicDir)) {
            return qdb
        }
        return { ...qdb, path: qdb.path.replace(publicDir, '') }
    })
    utils.WriteFile(JSON.stringify(qdbDataToWrite, null, 2), dbsFilePath)
    for (const qdb of qdbsToWrite) {
        try {
            writeData(qdb.data, qdb.path)
        } catch (e) {
            logger.Log(`Error writing ${qdb.name} qdb to file!`, 'redbg')
            console.error(e)
        }
    }
}
/**
 * Broadcasts merged questions/subjects and whole new qdbs to all worker
 * threads and tallies how much new data was added.
 *
 * @returns counts of new qdbs, subjects and questions sent to workers
 */
async function sendNewDataToWorkers(
    mergeResults: MergeResult[],
    newQuestionDbs: QuestionDb[]
) {
    // FIXME: this might be slow, maybe make a new type of message for workers?
    const updatePromises: Promise<WorkerResult[]>[] = []
    let newQuestionCount = 0
    let newSubjectCount = 0
    let newQuestionDbCount = 0
    mergeResults.forEach((mergeRes) => {
        if (mergeRes.e) {
            logger.Log(`There was an error processing the merge!`, 'redbg')
            console.error(mergeRes.e)
            return
        }
        mergeRes.newData.forEach((subjectWithNewData) => {
            newQuestionCount += subjectWithNewData.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: subjectWithNewData.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: subjectWithNewData.Questions,
                    },
                })
            )
        })
        newSubjectCount += mergeRes.newSubjects.length
        mergeRes.newSubjects.forEach((newSubject) => {
            newQuestionCount += newSubject.Questions.length
            updatePromises.push(
                msgAllWorker({
                    type: 'newQuestions',
                    data: {
                        subjName: newSubject.Name,
                        qdbIndex: mergeRes.localQdbIndex,
                        newQuestions: newSubject.Questions,
                    },
                })
            )
        })
    })
    newQuestionDbCount += newQuestionDbs.length
    newQuestionDbs.forEach((newQdb) => {
        const { subjCount: sc, questionCount: qc } = countOfQdb(newQdb)
        newSubjectCount += sc
        newQuestionCount += qc
        // FIX: these messages were fire-and-forget while the question
        // updates were awaited; collect them too so the caller only
        // continues once every worker has received the new db
        updatePromises.push(
            msgAllWorker({
                data: newQdb,
                type: 'newdb',
            })
        )
    })
    await Promise.all(updatePromises)
    return {
        newQuestionDbCount: newQuestionDbCount,
        newSubjectCount: newSubjectCount,
        newQuestionCount: newQuestionCount,
    }
}
// ---------------------------------------------------------------------------------------------
// Syncing
// ---------------------------------------------------------------------------------------------
/**
 * Merges question data received from peers into the local question dbs.
 *
 * For every peer result: unknown qdbs are adopted with fresh local indexes,
 * known qdbs are merged on worker threads, results are written to disk and
 * broadcast to all workers. Each processed peer's lastQuestionsSync is
 * updated, and finally the self info lastSync is set to `syncStart`.
 *
 * @returns counts of qdbs / subjects / questions before the sync, added by
 * it, and after it (or just a message when no peer had new data)
 */
export async function syncQuestions({
    questionData,
    syncStart,
    getQuestionDbs,
    setQuestionDbs,
    selfInfo,
    dbsFile,
}: SyncQuestionsProps): Promise<SyncResult> {
    logger.Log('Syncing questions...')
    const recievedDataCounts: (number | string)[][] = []
    // per-peer statistics of everything added during this sync
    const resultsCount: {
        [key: string]: {
            newQuestionDbs?: number
            newSubjects?: number
            newQuestions?: number
        }
    } = {}
    const resultDataWithoutEmptyDbs: (SyncDataResult['questions'] & {
        peer: PeerInfo
    })[] = []
    questionData.forEach((res) => {
        const qdbCount = res.questionDbs.length
        const { subjCount, questionCount } = countOfQdbs(res.questionDbs)
        recievedDataCounts.push([
            peerToString(res.peer),
            qdbCount,
            subjCount,
            questionCount,
        ])
        if (questionCount > 0) {
            resultDataWithoutEmptyDbs.push(res)
        } else {
            // peer had nothing new: just record the sync time
            updatePeersFile(res.peer, {
                lastQuestionsSync: syncStart,
            })
        }
    })
    const resultData = resultDataWithoutEmptyDbs.map((res) => {
        return {
            ...res,
            questionDbs: res.questionDbs.map((qdb) => {
                return setupQuestionsForMerge(qdb, res.peer)
            }),
        }
    })
    const hasNewData = resultData.length > 0
    if (!hasNewData) {
        logger.Log(
            `No peers returned any new questions. Question sync successfully finished!`,
            'green'
        )
        updateLastSync(selfInfo, syncStart)
        return {
            msg: 'No peers returned any new questions',
        }
    }
    logger.Log(`\tRecieved data from peers:`)
    logger.logTable(
        [['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
        {
            colWidth: [20],
            rowPrefix: '\t',
        }
    )
    // -------------------------------------------------------------------------------------------------------
    // backup
    // -------------------------------------------------------------------------------------------------------
    const { subjCount: oldSubjCount, questionCount: oldQuestionCount } =
        countOfQdbs(getQuestionDbs())
    const oldQuestionDbCount = getQuestionDbs().length
    backupData(getQuestionDbs())
    logger.Log('\tOld data backed up!')
    // -------------------------------------------------------------------------------------------------------
    // adding questions to db
    // -------------------------------------------------------------------------------------------------------
    for (let i = 0; i < resultData.length; i++) {
        const { questionDbs: remoteQuestionDbs, peer } = resultData[i]
        // FIX: progress is logged as current/total (it was total/current)
        logger.Log(
            `\tProcessing result from "${logger.C('blue')}${peerToString(
                peer
            )}${logger.C()}" (${logger.C('green')}${i + 1}${logger.C()}/${logger.C(
                'green'
            )}${resultData.length}${logger.C()})`
        )
        // FIXME: if remoteQuestionDbs contain multiple dbs with the same name, then the merging
        // process could get wonky. Ideally it should not contain, but we will see
        const { rawNewQuestionDbs, mergeResults } = await getMergeResults(
            remoteQuestionDbs,
            getQuestionDbs
        )
        // setting new index & path
        const newQuestionDbs = updateQdbForLocalUse(
            rawNewQuestionDbs,
            getQuestionDbs
        )
        const { mergedQuestionDbs, changedQdbIndexes } = mergeQdbs(
            getQuestionDbs(),
            mergeResults
        )
        writeNewData(
            newQuestionDbs,
            getQuestionDbs().filter((qdb) => {
                return changedQdbIndexes.includes(qdb.index)
            }),
            dbsFile
        )
        setQuestionDbs([...mergedQuestionDbs, ...newQuestionDbs])
        const { newQuestionDbCount, newSubjectCount, newQuestionCount } =
            await sendNewDataToWorkers(mergeResults, newQuestionDbs)
        // FIX: dropped the stray `{ newUsers: 0 }` default that was
        // copy-pasted from the users sync; this object only tracks
        // question-related counts
        resultsCount[peerToString(peer)] = {
            ...(resultsCount[peerToString(peer)] || {}),
            newQuestionDbs: newQuestionDbCount,
            newSubjects: newSubjectCount,
            newQuestions: newQuestionCount,
        }
        // Processing result data is successfull
        updatePeersFile(peer, {
            lastQuestionsSync: syncStart,
        })
    }
    // -------------------------------------------------------------------------------------------------------
    updateLastSync(selfInfo, syncStart)
    const newQdb = getQuestionDbs()
    const { subjCount: newSubjCount, questionCount: newQuestionCount } =
        countOfQdbs(newQdb)
    const newQuestionDbCount = newQdb.length
    const resultsTable = Object.entries(resultsCount).map(([key, value]) => {
        return [
            key.length > 14 ? key.substring(0, 14) + '...' : key,
            value.newQuestionDbs,
            value.newSubjects,
            value.newQuestions,
        ]
    })
    // sums one stat key across every peer's result counts
    const sumNewCount = (
        key: 'newQuestionDbs' | 'newSubjects' | 'newQuestions'
    ) => {
        return Object.values(resultsCount).reduce(
            (acc, val) => acc + (val[key] || 0),
            0
        )
    }
    const totalNewQuestions = sumNewCount('newQuestions')
    const totalNewSubjects = sumNewCount('newSubjects')
    const totalNewQdbs = sumNewCount('newQuestionDbs')
    logger.logTable(
        [
            ['', 'QDBs', 'Subjs', 'Questions'],
            ['Old', oldQuestionDbCount, oldSubjCount, oldQuestionCount],
            ...resultsTable,
            ['Added total', totalNewQdbs, totalNewSubjects, totalNewQuestions],
            ['Final', newQuestionDbCount, newSubjCount, newQuestionCount],
        ],
        { colWidth: [20], rowPrefix: '\t' }
    )
    logger.Log(`Successfully synced questions!`, 'green')
    return {
        old: {
            questionDbs: oldQuestionDbCount,
            subjects: oldSubjCount,
            questions: oldQuestionCount,
        },
        added: {
            questionDbs: totalNewQdbs,
            subjects: totalNewSubjects,
            questions: totalNewQuestions,
        },
        final: {
            questionDbs: newQuestionDbCount,
            subjects: newSubjCount,
            questions: newQuestionCount,
        },
    }
}

View file

@ -0,0 +1,295 @@
import path from 'node:path'
import { paths } from '../../../utils/files'
import utils from '../../../utils/utils'
import { UserDirDataFile } from '../submodules/userFiles'
import {
SyncDataResult,
SyncResponseBase,
SyncResult,
peerToString,
updatePeersFile,
} from './p2putils'
import constants from '../../../constants'
import { PeerInfo } from '../../../types/basicTypes'
import { downloadFile } from '../../../utils/networkUtils'
import logger from '../../../utils/logger'
// Descriptor of one remote user file scheduled for download.
interface UserFileToGet {
    fileName: string
    dir: string // user files subdirectory the file belongs to
    filePath: string // local destination path
    data: UserDirDataFile // metadata accompanying the file
    peer: PeerInfo // peer to download the file from
}
// Body of the /api/newuserfilecreated request exchanged between peers.
export interface NewUserFilesRequestBody {
    host: string
    newFiles: {
        [key: string]: {
            // key: dir
            [key: string]: UserDirDataFile // key: file name
        }
    }
}
// ---------------------------------------------------------------------------------------------
// Getting
// ---------------------------------------------------------------------------------------------
/**
 * Collects user files modified at or after `since`, grouped by user dir.
 * Dirs without a metadata file are skipped.
 */
export function getUserFiles(since: number): SyncResponseBase & {
    newFiles: {
        [key: string]: {
            [key: string]: UserDirDataFile
        }
    }
} {
    const newFiles: SyncDataResult['userFiles']['newFiles'] = {}
    const dirs = utils.ReadDir(paths.userFilesDir)
    dirs.forEach((dir) => {
        const userDirPath = path.join(paths.userFilesDir, dir)
        const dataFilePath = path.join(
            userDirPath,
            constants.userFilesDataFileName
        )
        if (!utils.FileExists(dataFilePath)) {
            return
        }
        // FIX: JSON.parse never produces a Map; the data file is a plain
        // object keyed by file name, so type it as a record (Object.entries
        // on a real Map would have yielded nothing anyway)
        const dataFile = utils.ReadJSON<{ [key: string]: UserDirDataFile }>(
            dataFilePath
        )
        Object.entries(dataFile).forEach(([fileName, data]) => {
            const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime
            if (mtime.getTime() >= since) {
                if (!newFiles[dir]) {
                    newFiles[dir] = {}
                }
                newFiles[dir][fileName] = data
            }
        })
    })
    return { success: true, newFiles: newFiles }
}
/**
 * Flattens the nested dir → fileName → data structure into a flat list of
 * download descriptors, each tagged with the peer it comes from.
 */
function setupFilesToGet(
    newFiles: SyncDataResult['userFiles']['newFiles'],
    peer: PeerInfo
): UserFileToGet[] {
    return Object.entries(newFiles).flatMap(([dirName, userFilesDir]) =>
        Object.entries(userFilesDir).map(([fileName, data]) => ({
            fileName: fileName,
            dir: dirName,
            filePath: path.join(paths.userFilesDir, dirName, fileName),
            data: data,
            peer: peer,
        }))
    )
}
/**
 * Downloads each file in `filesToGet` from its peer and merges the
 * accompanying metadata (upvotes/downvotes) into the local data file.
 *
 * @returns the number of files successfully downloaded
 */
async function downloadUserFiles(filesToGet: UserFileToGet[]) {
    let addedFiles = 0
    // merges two optional vote arrays into one deduplicated list,
    // preserving local order first
    const mergeVotes = (local?: number[], remote?: number[]): number[] => {
        return [...new Set([...(local || []), ...(remote || [])])]
    }
    for (const fileToGet of filesToGet) {
        const { peer, dir, fileName, filePath, data } = fileToGet
        try {
            await downloadFile(
                {
                    host: peer.host,
                    port: peer.port,
                    path: `/api/userFiles/${dir}/${fileName}`,
                },
                filePath,
                peer.http
            )
            const dataFilePath = path.join(
                paths.userFilesDir,
                dir,
                constants.userFilesDataFileName
            )
            if (!utils.FileExists(dataFilePath)) {
                utils.WriteFile(JSON.stringify({}), dataFilePath)
            }
            const dataFile = utils.ReadJSON<{
                [key: string]: UserDirDataFile
            }>(dataFilePath)
            if (dataFile[fileName]) {
                // dataFile[fileName].views += data.views // views are not unique
                // FIX: the original discarded the remote votes entirely
                // (setting []) whenever the local entry had no vote array;
                // merge both sides and deduplicate instead. Also replaces
                // the O(n²) reduce-includes dedupe with a Set.
                dataFile[fileName].upvotes = mergeVotes(
                    dataFile[fileName].upvotes,
                    data.upvotes
                )
                dataFile[fileName].downvotes = mergeVotes(
                    dataFile[fileName].downvotes,
                    data.downvotes
                )
            } else {
                dataFile[fileName] = data
            }
            utils.WriteFile(JSON.stringify(dataFile), dataFilePath)
            addedFiles += 1
        } catch (e) {
            logger.Log(`Unable to download "${fileName}": ${e.message}`)
            console.error(e)
        }
    }
    return addedFiles
}
// ---------------------------------------------------------------------------------------------
// Adding new
// ---------------------------------------------------------------------------------------------
/**
 * Wrapper around addNewUserFiles that logs failures with the source host.
 */
export async function handleNewUserFiles(
    props: NewUserFilesRequestBody & { peers: PeerInfo[] }
): Promise<{
    success: boolean
    addedFileCount?: number
    message?: string
}> {
    const result = await addNewUserFiles(props)
    if (result.success) {
        return result
    }
    logger.Log(
        `Error while adding new user files: "${result.message}", from host: "${props.host}"`,
        'yellowbg'
    )
    return result
}
/**
 * Validates a peer's "new user files" notification and downloads the
 * referenced files from that peer.
 */
async function addNewUserFiles({
    newFiles,
    host,
    peers,
}: NewUserFilesRequestBody & { peers: PeerInfo[] }): Promise<{
    success: boolean
    addedFileCount?: number
    message?: string
}> {
    if (!newFiles || !host) {
        return {
            success: false,
            message: 'newFiles or host key are missing from body',
        }
    }
    const sourcePeer = peers.find((peer) => peerToString(peer) === host)
    if (!sourcePeer) {
        return {
            success: false,
            message: "couldn't find remote peer info based on host",
        }
    }
    try {
        const filesToGet = setupFilesToGet(newFiles, sourcePeer)
        const addedFileCount = await downloadUserFiles(filesToGet)
        logger.Log(
            `\tAdded ${logger.C(
                'blue'
            )}${addedFileCount}${logger.C()} new files from ${logger.C(
                'blue'
            )}${peerToString(sourcePeer)}${logger.C()}`
        )
        return { success: true, addedFileCount: addedFileCount }
    } catch (e) {
        return { success: false, message: e.message }
    }
}
// ---------------------------------------------------------------------------------------------
// Syncing
// ---------------------------------------------------------------------------------------------
export async function syncUserFiles(
newData: (SyncDataResult['userFiles'] & { peer: PeerInfo })[],
syncStart: number
): Promise<SyncResult> {
logger.Log('Syncing user files...')
const recievedUserFilesCount: (string | number)[][] = []
let totalRecievedd = 0
newData.forEach((res) => {
const count = Object.values(res.newFiles).reduce((acc, data) => {
totalRecievedd += Object.keys(data).length
return acc + Object.keys(data).length
}, 0)
recievedUserFilesCount.push([peerToString(res.peer), count])
})
if (totalRecievedd === 0) {
logger.Log(
`No peers returned any new files. User file sync successfully finished!`,
'green'
)
return {
old: {
userFiles: 0,
},
added: {
userFiles: 0,
},
final: {
userFiles: 0,
},
}
}
logger.logTable([['', 'Files'], ...recievedUserFilesCount], {
colWidth: [20],
rowPrefix: '\t',
})
const filesToGet: UserFileToGet[] = []
newData.forEach((res) => {
filesToGet.push(...setupFilesToGet(res.newFiles, res.peer))
})
const addedFiles = await downloadUserFiles(filesToGet)
newData.forEach((res) => {
updatePeersFile(res.peer, {
lastUserFilesSync: syncStart,
})
})
logger.Log(
`Successfully synced user files! Added ${addedFiles} files`,
'green'
)
return {
old: {
userFiles: 0,
},
added: {
userFiles: addedFiles,
},
final: {
userFiles: 0,
},
}
}

View file

@ -0,0 +1,270 @@
import { Database } from 'better-sqlite3'
import dbtools from '../../../utils/dbtools'
import { PeerInfo, User } from '../../../types/basicTypes'
import { decrypt, encrypt } from '../../../utils/encryption'
import {
SyncDataResult,
SyncResponseBase,
SyncResult,
peerToString,
updatePeersFile,
} from './p2putils'
import logger from '../../../utils/logger'
// Inputs of syncUsers.
interface SyncUsersProps {
    userData: (SyncDataResult['users'] & { peer: PeerInfo })[]
    syncStart: number // timestamp the sync started at; written to peers file
    userDB: Database
    privateKey: string // used to decrypt the received user payloads
}
// Inputs of handleNewUsers / addNewUsers.
interface HandleNewUsersProps {
    encryptedNewUsers: string // encrypted JSON array of User objects
    host: string // host:port string identifying the sending peer
    peers: PeerInfo[]
    privateKey: string
    userDB: Database
}
// ---------------------------------------------------------------------------------------------
// Getting
// ---------------------------------------------------------------------------------------------
/**
 * Returns users created at or after `since`, excluding the primary account
 * (id 1) and admin accounts.
 *
 * @throws Error when `since` is not a finite number — the value is
 * interpolated into the SQL text, so nothing non-numeric may reach it
 */
export function getNewUsersSince(since: number, userDB: Database): User[] {
    // guard: `since` is spliced directly into the statement string; reject
    // NaN / Infinity / anything that is not a plain finite number so no
    // malformed value can alter the query
    if (!Number.isFinite(since)) {
        throw new Error(`getNewUsersSince: invalid 'since' value: ${since}`)
    }
    const users: User[] = dbtools.runStatement(
        userDB,
        `SELECT *
        FROM users
        WHERE created >= ${since}
        AND id != 1
        AND isAdmin is null
        ;`
    )
    return users
}
/**
 * Prepares the users created since `usersSince` for a remote peer,
 * encrypted with that peer's public key.
 *
 * @returns success with the encrypted payload and count, or a failure with
 * a message when the host / peer info / public key is missing
 */
export function getUsers(
    remoteHost: string,
    remotePeerInfo: PeerInfo,
    usersSince: number,
    userDB: Database
): SyncResponseBase & {
    encryptedUsers?: string
    sentUsers?: number
} {
    if (!remoteHost) {
        return {
            success: false,
            message:
                'remoteHost key is missing from body. Users will not be sent',
        }
    }
    if (!remotePeerInfo) {
        return {
            success: false,
            message: `couldn't find remote peer info based on remoteHost (${remoteHost}). Users will not be sent`,
        }
    }
    const remotePublicKey = remotePeerInfo.publicKey
    // FIX: the original ended in `else if (remotePeerInfo)` — always true
    // after the guard above — followed by an unreachable fallback return;
    // a plain missing-key guard covers that case
    if (!remotePublicKey) {
        return {
            success: false,
            message: `Warning: "${peerToString(
                remotePeerInfo
            )}" has no public key saved! Users will not be sent`,
        }
    }
    // FIXME: sign data?
    const newUsers = getNewUsersSince(usersSince, userDB)
    const encryptedUsers = encrypt(remotePublicKey, JSON.stringify(newUsers))
    return {
        success: true,
        encryptedUsers: encryptedUsers,
        sentUsers: newUsers.length,
    }
}
// ---------------------------------------------------------------------------------------------
// Adding new
// ---------------------------------------------------------------------------------------------
/**
 * Wrapper around addNewUsers that logs failures with the source host.
 */
export function handleNewUsers(props: HandleNewUsersProps): {
    success: boolean
    addedUserCount?: number
    message?: string
} {
    const result = addNewUsers(props)
    if (result.success) {
        return result
    }
    logger.Log(
        `Error while adding new users file: "${result.message}", from host: "${props.host}"`,
        'yellowbg'
    )
    return result
}
/**
 * Decrypts the received user payload and inserts any users not already in
 * the local db, tagging them with the source host.
 */
function addNewUsers({
    encryptedNewUsers,
    host,
    peers,
    privateKey,
    userDB,
}: HandleNewUsersProps): {
    success: boolean
    addedUserCount?: number
    message?: string
} {
    if (!encryptedNewUsers || !host) {
        return {
            success: false,
            message: 'encryptedNewUsers or host key are missing from body',
        }
    }
    const remotePeerInfo = peers.find((peer) => {
        return peerToString(peer) === host
    })
    if (!remotePeerInfo) {
        return {
            success: false,
            message: "couldn't find remote peer info based on host",
        }
    }
    try {
        // FIX: decrypt / JSON.parse can throw on malformed or wrongly
        // encrypted input; report that as a failed result (mirroring
        // addNewUserFiles) instead of letting the exception escape to the
        // route handler
        const decryptedUsers: User[] = JSON.parse(
            decrypt(privateKey, encryptedNewUsers)
        )
        const addedUserCount = addUsersToDb(decryptedUsers, userDB, {
            sourceHost: peerToString(remotePeerInfo),
        })
        if (addedUserCount > 0) {
            logger.Log(
                `\tAdded ${addedUserCount} new users from "${peerToString(
                    remotePeerInfo
                )}"`,
                'cyan'
            )
        }
        return { success: true, addedUserCount: addedUserCount }
    } catch (e) {
        return { success: false, message: e.message }
    }
}
// ---------------------------------------------------------------------------------------------
// Syncing utils
// ---------------------------------------------------------------------------------------------
/**
 * Inserts every user not already present locally (matched on pw), merging
 * `extraProps` into each inserted row. Remote ids are dropped.
 *
 * @returns the number of users actually inserted
 */
function addUsersToDb(
    users: User[],
    userDB: Database,
    extraProps: Partial<User>
) {
    let addedUserCount = 0
    for (const remoteUser of users) {
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        const { id, ...remoteUserWithoutId } = remoteUser
        const existing = dbtools.Select(userDB, 'users', {
            pw: remoteUser.pw,
        })
        if (existing.length !== 0) {
            continue
        }
        addedUserCount += 1
        // FIXME: users will not have consistend id across servers. This may be
        // harmless, will see
        dbtools.Insert(userDB, 'users', {
            ...(remoteUserWithoutId as Omit<User, 'id'>),
            ...extraProps,
        })
    }
    return addedUserCount
}
// ---------------------------------------------------------------------------------------------
// Syncing
// ---------------------------------------------------------------------------------------------
export async function syncUsers({
userData,
syncStart,
userDB,
privateKey,
}: SyncUsersProps): Promise<SyncResult> {
logger.Log('Syncing users...')
let totalRecievedUsers = 0
const resultsCount: {
[key: string]: {
newUsers?: number
}
} = {}
const oldUserCount = dbtools.SelectAll(userDB, 'users').length
try {
userData.forEach((res) => {
if (res.encryptedUsers) {
const decryptedUsers: User[] = JSON.parse(
decrypt(privateKey, res.encryptedUsers)
)
const addedUserCount = addUsersToDb(decryptedUsers, userDB, {
sourceHost: peerToString(res.peer),
})
resultsCount[peerToString(res.peer)] = {
newUsers: addedUserCount,
}
totalRecievedUsers += decryptedUsers.length
updatePeersFile(res.peer, {
lastUsersSync: syncStart,
})
}
})
} catch (e) {
logger.Log('\tError while trying to sync users: ' + e.message, 'redbg')
console.error(e)
}
const newUserCount = dbtools.SelectAll(userDB, 'users').length
if (totalRecievedUsers === 0) {
logger.Log(
`No peers returned any new users. User sync successfully finished!`,
'green'
)
} else {
logger.logTable(
[
['', 'Users'],
['Old', oldUserCount],
...Object.entries(resultsCount).map(([key, result]) => {
return [key, result.newUsers]
}),
['Added total', newUserCount - oldUserCount],
['Final', newUserCount],
],
{ colWidth: [20], rowPrefix: '\t' }
)
logger.Log(`Successfully synced users!`, 'green')
}
return {
old: {
users: oldUserCount,
},
added: {
users: newUserCount - oldUserCount,
},
final: {
users: newUserCount,
},
}
}

File diff suppressed because it is too large Load diff

View file

@ -25,12 +25,13 @@ import utils from '../../../utils/utils'
import { Request, SubmoduleData, User } from '../../../types/basicTypes'
import { paths } from '../../../utils/files'
import constants from '../../../constants'
import { queueWork } from '../../../worker/workerPool'
export interface UserDirDataFile {
uid: number
views: number
upvotes: number[]
downvotes: number[]
views?: number
upvotes?: number[]
downvotes?: number[]
}
function listDir(subdir: string) {
@ -273,6 +274,15 @@ function setup(data: SubmoduleData): void {
res.json({
success: true,
})
queueWork({
type: 'sendUserFilesToPeers',
data: {
dir: safeDir,
fileName: body.fileName,
fileData: { uid: user.id },
},
})
})
.catch(() => {
res.json({ success: false, msg: 'something bad happened :s' })

View file

@ -102,7 +102,7 @@ function GetDateString(
}
function CopyFile(from: string, to: string): void {
CreatePath(to)
createDirsForFile(to)
fs.copyFileSync(from, to)
}

View file

@ -6,8 +6,11 @@ import { RecievedData } from '../../utils/actions'
import { removeCacheFromQuestion } from '../../utils/qdbUtils'
import { QuestionAddResponse } from '../../modules/api/submodules/qminingapi'
import logger from '../../utils/logger'
import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
import { post } from '../../utils/networkUtils'
import {
loginAndPostDataToAllPeers,
peerToString,
} from '../../modules/api/p2p/p2putils'
export type QuestionsToPeersTaskObject = {
type: 'sendQuestionsToPeers'
@ -74,7 +77,11 @@ export const handleQuestionsToPeers = async (
}
}
)
logger.Log(`Peers that added new questions: ${hadNewQuestions.join(', ')}`)
if (hadNewQuestions.length > 0) {
logger.Log(
`\t Peers that added new questions: ${hadNewQuestions.join(', ')}`
)
}
parentPort.postMessage({
msg: `From thread #${workerIndex}: sendQuestionsToPeers done`,

View file

@ -0,0 +1,64 @@
import { parentPort } from 'node:worker_threads'
import { PeerInfo, QuestionDb } from '../../types/basicTypes'
import { files, readAndValidateFile } from '../../utils/files'
import { post } from '../../utils/networkUtils'
import {
loginAndPostDataToAllPeers,
peerToString,
} from '../../modules/api/p2p/p2putils'
import { NewUserFilesRequestBody } from '../../modules/api/p2p/userFiles'
import { UserDirDataFile } from '../../modules/api/submodules/userFiles'
// Worker task payload for broadcasting a newly created user file to peers.
export type UserFilesToPeersTaskObject = {
    type: 'sendUserFilesToPeers'
    data: {
        dir: string // user files subdirectory the file lives in
        fileName: string
        fileData: UserDirDataFile // metadata to send along with the notice
    }
}
/**
 * Worker task: notifies every known peer about a newly created user file
 * so they can download it via /api/newuserfilecreated.
 */
export const handleUserFilesToPeers = async (
    _qdbs: QuestionDb[],
    msg: UserFilesToPeersTaskObject,
    workerIndex: number
): Promise<void> => {
    const { dir, fileName, fileData } = msg.data
    const selfInfo = readAndValidateFile<PeerInfo>(files.selfInfoFile)
    const host = peerToString(selfInfo)
    const peers = readAndValidateFile<PeerInfo[]>(files.peersFile)
    if (!peers || peers.length === 0) {
        // nothing to notify; still report completion to the pool
        parentPort.postMessage({
            msg: `From thread #${workerIndex}: sendUserFilesToPeers done`,
            workerIndex: workerIndex,
        })
        return
    }
    const dataToSend: NewUserFilesRequestBody = {
        host: host,
        newFiles: {
            [dir]: { [fileName]: fileData },
        },
    }
    const postData = (peer: PeerInfo, sessionCookie: string) => {
        return post({
            hostname: peer.host,
            port: peer.port,
            http: peer.http,
            path: '/api/newuserfilecreated',
            bodyObject: dataToSend,
            cookie: `sessionID=${sessionCookie}`,
        })
    }
    // FIX: await the posting before reporting completion — the promise was
    // previously left floating, so "done" could be sent while requests
    // were still in flight
    await loginAndPostDataToAllPeers(peers, postData)
    parentPort.postMessage({
        // FIX: the completion message said 'sendQuestionsToPeers done'
        // (copy-pasted from another handler)
        msg: `From thread #${workerIndex}: sendUserFilesToPeers done`,
        workerIndex: workerIndex,
    })
}

View file

@ -2,9 +2,12 @@ import { parentPort } from 'node:worker_threads'
import { PeerInfo, QuestionDb, User } from '../../types/basicTypes'
import { files, readAndValidateFile } from '../../utils/files'
import logger from '../../utils/logger'
import { peerToString, loginAndPostDataToAllPeers } from '../../utils/p2putils'
import { post } from '../../utils/networkUtils'
import { encrypt } from '../../utils/encryption'
import {
loginAndPostDataToAllPeers,
peerToString,
} from '../../modules/api/p2p/p2putils'
export type UsersToPeersTaskObject = {
type: 'sendUsersToPeers'

View file

@ -13,6 +13,7 @@ import { handleDbClean } from './handlers/handleDbClean'
import { handleQuestionsToPeers } from './handlers/handleQuestionsToPeers'
import { handleRmQuestions } from './handlers/handleRmQuestions'
import { handleUsersToPeers } from './handlers/handleUsersToPeers'
import { handleUserFilesToPeers } from './handlers/handleUserFilesToPeers'
export interface WorkerResult {
msg: string
@ -84,6 +85,8 @@ async function handleMessage(
await handleQuestionsToPeers(qdbs, msg, workerIndex)
} else if (msg.type === 'sendUsersToPeers') {
await handleUsersToPeers(qdbs, msg, workerIndex)
} else if (msg.type === 'sendUserFilesToPeers') {
await handleUserFilesToPeers(qdbs, msg, workerIndex)
} else {
logger.Log(`Invalid msg type!`, logger.GetColor('redbg'))
console.error(msg)

View file

@ -23,6 +23,8 @@ import { v4 as uuidv4 } from 'uuid'
import { EventEmitter } from 'events'
import os from 'os'
import logger from '../utils/logger'
import type { QuestionDb } from '../types/basicTypes'
import { SearchTaskObject } from './handlers/handleSearch'
import { DbEditTaskObject } from './handlers/handleDbEdit'
@ -33,8 +35,8 @@ import { RmQuestionsTaskObject } from './handlers/handleRmQuestions'
import { MergeTaskObject } from './handlers/handleMerge'
import { QuestionsToPeersTaskObject } from './handlers/handleQuestionsToPeers'
import { WorkerResult } from './worker'
import logger from '../utils/logger'
import { UsersToPeersTaskObject } from './handlers/handleUsersToPeers'
import { UserFilesToPeersTaskObject } from './handlers/handleUserFilesToPeers'
const threadCount = +process.env.NS_THREAD_COUNT || os.cpus().length
@ -54,6 +56,7 @@ export type TaskObject =
| MergeTaskObject
| QuestionsToPeersTaskObject
| UsersToPeersTaskObject
| UserFilesToPeersTaskObject
interface PendingJob {
workData: TaskObject