From 1a70024a989d5654c72ede2f0ab7853843843fdb Mon Sep 17 00:00:00 2001 From: mrfry Date: Mon, 1 May 2023 19:44:46 +0200 Subject: [PATCH] user files syncing --- src/modules/api/submodules/p2p.ts | 209 ++++++++++++++++++++++-------- src/utils/networkUtils.ts | 29 +++++ src/utils/utils.ts | 4 +- 3 files changed, 189 insertions(+), 53 deletions(-) diff --git a/src/modules/api/submodules/p2p.ts b/src/modules/api/submodules/p2p.ts index 22deb38..0913e15 100644 --- a/src/modules/api/submodules/p2p.ts +++ b/src/modules/api/submodules/p2p.ts @@ -53,7 +53,7 @@ import { publicDir, readAndValidateFile, } from '../../../utils/files' -import { GetResult, get } from '../../../utils/networkUtils' +import { GetResult, downloadFile, get } from '../../../utils/networkUtils' import { msgAllWorker, queueWork, @@ -130,12 +130,6 @@ interface SyncDataResult { } } -interface NewDataResult { - peer: PeerInfo - result?: GetResult - error?: Error -} - function updateThirdPartyPeers( newVal: Omit[] ) { @@ -381,7 +375,7 @@ async function authAndGetNewData({ users: boolean userFiles: boolean } -}): Promise { +}): Promise & { peer: PeerInfo }> { try { const syncAll = !shouldSync || @@ -421,27 +415,28 @@ async function authAndGetNewData({ ) } - let result: NewDataResult['result'] + let result: GetResult const setResult = async () => { let url = `/api/getnewdata?host=${encodeURIComponent( peerToString(selfInfo) )}` - if (allTime) url += '&allTime=true' + if (!allTime) { + url += `&questionsSince=${peer.lastQuestionsSync}` + url += `&usersSince=${peer.lastUsersSync}` + url += `&userFilesSince=${peer.lastUserFilesSync}` + } if (!syncAll) { if (shouldSync.questions) { url += '&questions=true' - url += `&questionsSince=${peer.lastQuestionsSync}` } if (shouldSync.users) { url += '&users=true' - url += `&usersSince=${peer.lastUsersSync}` } if (shouldSync.userFiles) { url += '&userFiles=true' - url += `&userFilesSince=${peer.lastUserFilesSync}` } } @@ -459,7 +454,7 @@ async function authAndGetNewData({ await setResult() } - return { result: result, peer: peer } + return { ...result, peer: peer } } catch (e) { console.error(e) return { error: e, peer: peer } @@ -505,12 +500,14 @@ function getNewUserFilesSince(since: number) { if (!utils.FileExists(dataFilePath)) { return } - newData[dir] = {} const dataFile = utils.ReadJSON>(dataFilePath) Object.entries(dataFile).forEach(([fileName, data]) => { const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime if (mtime.getTime() >= since) { + if (!newData[dir]) { + newData[dir] = {} + } newData[dir][fileName] = data } }) @@ -816,9 +813,9 @@ function setup(data: SubmoduleData): Submodule { } }) - const resultDataWithoutErrors = allResults - .filter((resData) => !resData.error && resData.result.data) - .map((resData) => resData.result.data) + const resultDataWithoutErrors = allResults.filter( + (resData) => !resData.error && resData.data + ) if (resultDataWithoutErrors.length === 0) { logger.Log( @@ -835,7 +832,7 @@ function setup(data: SubmoduleData): Submodule { // ------------------------------------------------------------------------------------------------------- const peersHosts = [...peers, selfInfo] const thirdPartyPeers = resultDataWithoutErrors - .map((res) => res.remoteInfo) + .map((res) => res.data.remoteInfo) .flatMap((res) => res.myPeers) .filter((res) => { return !peersHosts.some((localPeer) => { @@ -859,10 +856,9 @@ function setup(data: SubmoduleData): Submodule { // ------------------------------------------------------------------------------------------------------- const getData = (key: T) => { - return resultDataWithoutErrors.map((x) => ({ - ...x[key], - peer: x.remoteInfo.selfInfo, - })) + return resultDataWithoutErrors + .map((x) => ({ ...x.data[key], peer: x.peer })) + .filter((x) => Object.keys(x).length > 1) } const syncResults: SyncResult[] = [] @@ -902,6 +898,7 @@ function setup(data: SubmoduleData): Submodule { syncStart: number ): Promise { logger.Log('Syncing users...') + let totalRecievedUsers = 0 const resultsCount: { [key: string]: { newUsers?: number @@ -925,6 +922,7 @@ function setup(data: SubmoduleData): Submodule { resultsCount[peerToString(res.peer)] = { newUsers: addedUserCount, } + totalRecievedUsers += decryptedUsers.length updatePeersFile(res.peer, { lastUsersSync: syncStart, }) @@ -939,8 +937,11 @@ function setup(data: SubmoduleData): Submodule { } const newUserCount = dbtools.SelectAll(userDB, 'users').length - if (Object.keys(resultsCount).length === 0) { - logger.Log('No new users received') + if (totalRecievedUsers === 0) { + logger.Log( + `No peers returned any new users. User sync successfully finished!`, + 'green' + ) } else { logger.logTable( [ @@ -954,8 +955,8 @@ function setup(data: SubmoduleData): Submodule { ], { colWidth: [20], rowPrefix: '\t' } ) + logger.Log(`Successfully synced users!`, 'green') } - logger.Log(`Successfully synced users!`, 'green') return { old: { @@ -1003,20 +1004,11 @@ function setup(data: SubmoduleData): Submodule { resultDataWithoutEmptyDbs.push(res) } else { updatePeersFile(res.peer, { - lastSync: syncStart, + lastQuestionsSync: syncStart, }) } }) - logger.Log(`\tRecieved data from peers:`) - logger.logTable( - [['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts], - { - colWidth: [20], - rowPrefix: '\t', - } - ) - const resultData = resultDataWithoutEmptyDbs.map((res) => { return { ...res, @@ -1038,6 +1030,15 @@ function setup(data: SubmoduleData): Submodule { } } + logger.Log(`\tRecieved data from peers:`) + logger.logTable( + [['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts], + { + colWidth: [20], + rowPrefix: '\t', + } + ) + // ------------------------------------------------------------------------------------------------------- // backup // ------------------------------------------------------------------------------------------------------- @@ -1094,7 +1095,7 @@ function setup(data: SubmoduleData): Submodule { } // Processing result data is successfull updatePeersFile(peer, { - lastSync: syncStart, + lastQuestionsSync: syncStart, }) } @@ -1171,30 +1172,132 @@ function setup(data: SubmoduleData): Submodule { ): Promise { logger.Log('Syncing user files...') - // ... magic magic code code code ... - console.log(newData, syncStart) + const recievedUserFilesCount: (string | number)[][] = [] + let totalRecievedd = 0 + newData.forEach((res) => { + const count = Object.values(res.newFiles).reduce((acc, data) => { + totalRecievedd += Object.keys(data).length + return acc + Object.keys(data).length + }, 0) + recievedUserFilesCount.push([peerToString(res.peer), count]) + }) - const filesToGet = [] + if (totalRecievedd === 0) { + logger.Log( + `No peers returned any new files. User file sync successfully finished!`, + 'green' + ) + return { + old: { + userFiles: 0, + }, + added: { + userFiles: 0, + }, + final: { + userFiles: 0, + }, + } + } + + logger.logTable([['', 'Files'], ...recievedUserFilesCount], { + colWidth: [20], + rowPrefix: '\t', + }) + + const filesToGet: { + fileName: string + dir: string + filePath: string + data: UserDirDataFile + peer: PeerInfo + }[] = [] newData.forEach((res) => { Object.entries(res.newFiles).forEach(([dirName, userFilesDir]) => { Object.entries(userFilesDir).forEach(([fileName, data]) => { - console.log(fileName, data) filesToGet.push({ fileName: fileName, - path: path.join(publicDir, dirName, fileName), + dir: dirName, + filePath: path.join( + paths.userFilesDir, + dirName, + fileName + ), data: data, + peer: res.peer, }) }) }) }) - logger.Log(`Successfully synced user files!`, 'green') + let addedFiles = 0 + for (const fileToGet of filesToGet) { + const { peer, dir, fileName, filePath, data } = fileToGet + + try { + await downloadFile( + { + host: peer.host, + port: peer.port, + path: `/api/userFiles/${dir}/${fileName}`, + }, + filePath, + peer.http + ) + + const dataFilePath = path.join( + paths.userFilesDir, + dir, + constants.userFilesDataFileName + ) + if (!utils.FileExists(dataFilePath)) { + utils.WriteFile(JSON.stringify({}), dataFilePath) + } + const dataFile = utils.ReadJSON<{ + [key: string]: UserDirDataFile + }>(dataFilePath) + + if (dataFile[fileName]) { + // dataFile[fileName].views += data.views // views are not unique + dataFile[fileName].upvotes = dataFile[fileName].upvotes + .concat(data.upvotes) + .reduce((acc, x) => { + if (acc.includes(x)) return acc + return [...acc, x] + }, []) + dataFile[fileName].downvotes = dataFile[fileName].downvotes + .concat(data.downvotes) + .reduce((acc, x) => { + if (acc.includes(x)) return acc + return [...acc, x] + }, []) + } else { + dataFile[fileName] = data + } + + utils.WriteFile(JSON.stringify(dataFile), dataFilePath) + addedFiles += 1 + } catch (e) { + logger.Log(`Unable to download "${fileName}": ${e.message}`) + } + } + + newData.forEach((res) => { + updatePeersFile(res.peer, { + lastUserFilesSync: syncStart, + }) + }) + + logger.Log( + `Successfully synced user files! Added ${addedFiles} files`, + 'green' + ) return { old: { userFiles: 0, }, added: { - userFiles: 0, + userFiles: addedFiles, }, final: { userFiles: 0, @@ -1311,8 +1414,9 @@ function setup(data: SubmoduleData): Submodule { ) logger.logTable( [ - ['QDBs', 'Subjs', 'Questions'], + ['', 'QDBs', 'Subjs', 'Questions'], [ + 'Count', result.questions.questionDbs.length, result.questions.count.subjects, result.questions.count.questions, @@ -1375,6 +1479,12 @@ function setup(data: SubmoduleData): Submodule { if (userFiles || sendAll) { const newFiles = getNewUserFilesSince(userFilesSince) + const sentFilesCount = Object.values(newFiles).reduce( + (acc, data) => { + return acc + Object.keys(data).length + }, + 0 + ) result.userFiles = { newFiles: newFiles } const userFilesSinceDate = questionsSince @@ -1386,12 +1496,9 @@ function setup(data: SubmoduleData): Submodule { 'blue' )}${hostToLog}${logger.C()} since ${logger.C( 'blue' - )}${userFilesSinceDate}${logger.C()}` - ) - logger.Log( - `\tSent files count: ${ - Object.keys(result.userFiles.newFiles).length - }` + )}${userFilesSinceDate}${logger.C()} Sent files: ${logger.C( + 'blue' + )}${sentFilesCount}${logger.C()}` ) } diff --git a/src/utils/networkUtils.ts b/src/utils/networkUtils.ts index 1daf002..9b34dfa 100644 --- a/src/utils/networkUtils.ts +++ b/src/utils/networkUtils.ts @@ -1,5 +1,7 @@ import http, { request as httpRequest } from 'http' import https, { request as httpsRequest } from 'https' +import fs from 'node:fs' +import utils from './utils' export interface GetResult { data?: T @@ -123,6 +125,33 @@ export function post({ }) } +export function downloadFile( + options: http.RequestOptions, + destination: string, + useHttp?: boolean +): Promise { + const provider = useHttp ? http : https + + utils.createDirsForFile(destination) + const file = fs.createWriteStream(destination) + return new Promise((resolve, reject) => { + provider.get(options, function (response) { + response.pipe(file) + + file.on('finish', () => { + file.close() + resolve() + }) + + response.on('error', (e) => { + file.close() + fs.unlinkSync(destination) + reject(e) + }) + }) + }) +} + export function parseCookie(responseCookie: string[]): { [key: string]: string } { diff --git a/src/utils/utils.ts b/src/utils/utils.ts index bd696fe..faa882a 100755 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -42,7 +42,7 @@ export default { } import * as child_process from 'child_process' -import fs from 'fs' +import fs from 'node:fs' import { v4 as uuidv4 } from 'uuid' import logger from '../utils/logger' @@ -151,7 +151,7 @@ function WatchFile(file: string, callback: Function): void { } } -function createDirsForFile(path: string) { +function createDirsForFile(path: string): void { let pathPart = path if (pathPart.endsWith('/')) { pathPart = pathPart.slice(0, -1)