user files syncing

This commit is contained in:
mrfry 2023-05-01 19:44:46 +02:00
parent 5ececa2de5
commit 1a70024a98
3 changed files with 189 additions and 53 deletions

View file

@ -53,7 +53,7 @@ import {
publicDir, publicDir,
readAndValidateFile, readAndValidateFile,
} from '../../../utils/files' } from '../../../utils/files'
import { GetResult, get } from '../../../utils/networkUtils' import { GetResult, downloadFile, get } from '../../../utils/networkUtils'
import { import {
msgAllWorker, msgAllWorker,
queueWork, queueWork,
@ -130,12 +130,6 @@ interface SyncDataResult {
} }
} }
interface NewDataResult {
peer: PeerInfo
result?: GetResult<SyncDataResult>
error?: Error
}
function updateThirdPartyPeers( function updateThirdPartyPeers(
newVal: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[] newVal: Omit<PeerInfo, 'publicKey' | 'name' | 'contact'>[]
) { ) {
@ -381,7 +375,7 @@ async function authAndGetNewData({
users: boolean users: boolean
userFiles: boolean userFiles: boolean
} }
}): Promise<NewDataResult> { }): Promise<GetResult<SyncDataResult> & { peer: PeerInfo }> {
try { try {
const syncAll = const syncAll =
!shouldSync || !shouldSync ||
@ -421,27 +415,28 @@ async function authAndGetNewData({
) )
} }
let result: NewDataResult['result'] let result: GetResult<SyncDataResult>
const setResult = async () => { const setResult = async () => {
let url = `/api/getnewdata?host=${encodeURIComponent( let url = `/api/getnewdata?host=${encodeURIComponent(
peerToString(selfInfo) peerToString(selfInfo)
)}` )}`
if (allTime) url += '&allTime=true' if (!allTime) {
url += `&questionsSince=${peer.lastQuestionsSync}`
url += `&usersSince=${peer.lastUsersSync}`
url += `&userFilesSince=${peer.lastUserFilesSync}`
}
if (!syncAll) { if (!syncAll) {
if (shouldSync.questions) { if (shouldSync.questions) {
url += '&questions=true' url += '&questions=true'
url += `&questionsSince=${peer.lastQuestionsSync}`
} }
if (shouldSync.users) { if (shouldSync.users) {
url += '&users=true' url += '&users=true'
url += `&usersSince=${peer.lastUsersSync}`
} }
if (shouldSync.userFiles) { if (shouldSync.userFiles) {
url += '&userFiles=true' url += '&userFiles=true'
url += `&userFilesSince=${peer.lastUserFilesSync}`
} }
} }
@ -459,7 +454,7 @@ async function authAndGetNewData({
await setResult() await setResult()
} }
return { result: result, peer: peer } return { ...result, peer: peer }
} catch (e) { } catch (e) {
console.error(e) console.error(e)
return { error: e, peer: peer } return { error: e, peer: peer }
@ -505,12 +500,14 @@ function getNewUserFilesSince(since: number) {
if (!utils.FileExists(dataFilePath)) { if (!utils.FileExists(dataFilePath)) {
return return
} }
newData[dir] = {}
const dataFile = const dataFile =
utils.ReadJSON<Map<string, UserDirDataFile>>(dataFilePath) utils.ReadJSON<Map<string, UserDirDataFile>>(dataFilePath)
Object.entries(dataFile).forEach(([fileName, data]) => { Object.entries(dataFile).forEach(([fileName, data]) => {
const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime const mtime = utils.statFile(path.join(userDirPath, fileName)).mtime
if (mtime.getTime() >= since) { if (mtime.getTime() >= since) {
if (!newData[dir]) {
newData[dir] = {}
}
newData[dir][fileName] = data newData[dir][fileName] = data
} }
}) })
@ -816,9 +813,9 @@ function setup(data: SubmoduleData): Submodule {
} }
}) })
const resultDataWithoutErrors = allResults const resultDataWithoutErrors = allResults.filter(
.filter((resData) => !resData.error && resData.result.data) (resData) => !resData.error && resData.data
.map((resData) => resData.result.data) )
if (resultDataWithoutErrors.length === 0) { if (resultDataWithoutErrors.length === 0) {
logger.Log( logger.Log(
@ -835,7 +832,7 @@ function setup(data: SubmoduleData): Submodule {
// ------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------
const peersHosts = [...peers, selfInfo] const peersHosts = [...peers, selfInfo]
const thirdPartyPeers = resultDataWithoutErrors const thirdPartyPeers = resultDataWithoutErrors
.map((res) => res.remoteInfo) .map((res) => res.data.remoteInfo)
.flatMap((res) => res.myPeers) .flatMap((res) => res.myPeers)
.filter((res) => { .filter((res) => {
return !peersHosts.some((localPeer) => { return !peersHosts.some((localPeer) => {
@ -859,10 +856,9 @@ function setup(data: SubmoduleData): Submodule {
// ------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------
const getData = <T extends keyof SyncDataResult>(key: T) => { const getData = <T extends keyof SyncDataResult>(key: T) => {
return resultDataWithoutErrors.map((x) => ({ return resultDataWithoutErrors
...x[key], .map((x) => ({ ...x.data[key], peer: x.peer }))
peer: x.remoteInfo.selfInfo, .filter((x) => Object.keys(x).length > 1)
}))
} }
const syncResults: SyncResult[] = [] const syncResults: SyncResult[] = []
@ -902,6 +898,7 @@ function setup(data: SubmoduleData): Submodule {
syncStart: number syncStart: number
): Promise<SyncResult> { ): Promise<SyncResult> {
logger.Log('Syncing users...') logger.Log('Syncing users...')
let totalRecievedUsers = 0
const resultsCount: { const resultsCount: {
[key: string]: { [key: string]: {
newUsers?: number newUsers?: number
@ -925,6 +922,7 @@ function setup(data: SubmoduleData): Submodule {
resultsCount[peerToString(res.peer)] = { resultsCount[peerToString(res.peer)] = {
newUsers: addedUserCount, newUsers: addedUserCount,
} }
totalRecievedUsers += decryptedUsers.length
updatePeersFile(res.peer, { updatePeersFile(res.peer, {
lastUsersSync: syncStart, lastUsersSync: syncStart,
}) })
@ -939,8 +937,11 @@ function setup(data: SubmoduleData): Submodule {
} }
const newUserCount = dbtools.SelectAll(userDB, 'users').length const newUserCount = dbtools.SelectAll(userDB, 'users').length
if (Object.keys(resultsCount).length === 0) { if (totalRecievedUsers === 0) {
logger.Log('No new users received') logger.Log(
`No peers returned any new users. User sync successfully finished!`,
'green'
)
} else { } else {
logger.logTable( logger.logTable(
[ [
@ -954,8 +955,8 @@ function setup(data: SubmoduleData): Submodule {
], ],
{ colWidth: [20], rowPrefix: '\t' } { colWidth: [20], rowPrefix: '\t' }
) )
}
logger.Log(`Successfully synced users!`, 'green') logger.Log(`Successfully synced users!`, 'green')
}
return { return {
old: { old: {
@ -1003,20 +1004,11 @@ function setup(data: SubmoduleData): Submodule {
resultDataWithoutEmptyDbs.push(res) resultDataWithoutEmptyDbs.push(res)
} else { } else {
updatePeersFile(res.peer, { updatePeersFile(res.peer, {
lastSync: syncStart, lastQuestionsSync: syncStart,
}) })
} }
}) })
logger.Log(`\tRecieved data from peers:`)
logger.logTable(
[['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
{
colWidth: [20],
rowPrefix: '\t',
}
)
const resultData = resultDataWithoutEmptyDbs.map((res) => { const resultData = resultDataWithoutEmptyDbs.map((res) => {
return { return {
...res, ...res,
@ -1038,6 +1030,15 @@ function setup(data: SubmoduleData): Submodule {
} }
} }
logger.Log(`\tRecieved data from peers:`)
logger.logTable(
[['', 'QDBs', 'Subjs', 'Questions'], ...recievedDataCounts],
{
colWidth: [20],
rowPrefix: '\t',
}
)
// ------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------
// backup // backup
// ------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------
@ -1094,7 +1095,7 @@ function setup(data: SubmoduleData): Submodule {
} }
// Processing result data is successfull // Processing result data is successfull
updatePeersFile(peer, { updatePeersFile(peer, {
lastSync: syncStart, lastQuestionsSync: syncStart,
}) })
} }
@ -1171,24 +1172,21 @@ function setup(data: SubmoduleData): Submodule {
): Promise<SyncResult> { ): Promise<SyncResult> {
logger.Log('Syncing user files...') logger.Log('Syncing user files...')
// ... magic magic code code code ... const recievedUserFilesCount: (string | number)[][] = []
console.log(newData, syncStart) let totalRecievedd = 0
const filesToGet = []
newData.forEach((res) => { newData.forEach((res) => {
Object.entries(res.newFiles).forEach(([dirName, userFilesDir]) => { const count = Object.values(res.newFiles).reduce((acc, data) => {
Object.entries(userFilesDir).forEach(([fileName, data]) => { totalRecievedd += Object.keys(data).length
console.log(fileName, data) return acc + Object.keys(data).length
filesToGet.push({ }, 0)
fileName: fileName, recievedUserFilesCount.push([peerToString(res.peer), count])
path: path.join(publicDir, dirName, fileName),
data: data,
})
})
})
}) })
logger.Log(`Successfully synced user files!`, 'green') if (totalRecievedd === 0) {
logger.Log(
`No peers returned any new files. User file sync successfully finished!`,
'green'
)
return { return {
old: { old: {
userFiles: 0, userFiles: 0,
@ -1202,6 +1200,111 @@ function setup(data: SubmoduleData): Submodule {
} }
} }
logger.logTable([['', 'Files'], ...recievedUserFilesCount], {
colWidth: [20],
rowPrefix: '\t',
})
const filesToGet: {
fileName: string
dir: string
filePath: string
data: UserDirDataFile
peer: PeerInfo
}[] = []
newData.forEach((res) => {
Object.entries(res.newFiles).forEach(([dirName, userFilesDir]) => {
Object.entries(userFilesDir).forEach(([fileName, data]) => {
filesToGet.push({
fileName: fileName,
dir: dirName,
filePath: path.join(
paths.userFilesDir,
dirName,
fileName
),
data: data,
peer: res.peer,
})
})
})
})
let addedFiles = 0
for (const fileToGet of filesToGet) {
const { peer, dir, fileName, filePath, data } = fileToGet
try {
await downloadFile(
{
host: peer.host,
port: peer.port,
path: `/api/userFiles/${dir}/${fileName}`,
},
filePath,
peer.http
)
const dataFilePath = path.join(
paths.userFilesDir,
dir,
constants.userFilesDataFileName
)
if (!utils.FileExists(dataFilePath)) {
utils.WriteFile(JSON.stringify({}), dataFilePath)
}
const dataFile = utils.ReadJSON<{
[key: string]: UserDirDataFile
}>(dataFilePath)
if (dataFile[fileName]) {
// dataFile[fileName].views += data.views // views are not unique
dataFile[fileName].upvotes = dataFile[fileName].upvotes
.concat(data.upvotes)
.reduce((acc, x) => {
if (acc.includes(x)) return acc
return [...acc, x]
}, [])
dataFile[fileName].downvotes = dataFile[fileName].downvotes
.concat(data.downvotes)
.reduce((acc, x) => {
if (acc.includes(x)) return acc
return [...acc, x]
}, [])
} else {
dataFile[fileName] = data
}
utils.WriteFile(JSON.stringify(dataFile), dataFilePath)
addedFiles += 1
} catch (e) {
logger.Log(`Unable to download "${fileName}": ${e.message}`)
}
}
newData.forEach((res) => {
updatePeersFile(res.peer, {
lastUserFilesSync: syncStart,
})
})
logger.Log(
`Successfully synced user files! Added ${addedFiles} files`,
'green'
)
return {
old: {
userFiles: 0,
},
added: {
userFiles: addedFiles,
},
final: {
userFiles: 0,
},
}
}
function handleNewThirdPartyPeer(remoteHost: string) { function handleNewThirdPartyPeer(remoteHost: string) {
logger.Log( logger.Log(
'Couldn\'t find remote peer info based on remoteHost: "' + 'Couldn\'t find remote peer info based on remoteHost: "' +
@ -1311,8 +1414,9 @@ function setup(data: SubmoduleData): Submodule {
) )
logger.logTable( logger.logTable(
[ [
['QDBs', 'Subjs', 'Questions'], ['', 'QDBs', 'Subjs', 'Questions'],
[ [
'Count',
result.questions.questionDbs.length, result.questions.questionDbs.length,
result.questions.count.subjects, result.questions.count.subjects,
result.questions.count.questions, result.questions.count.questions,
@ -1375,6 +1479,12 @@ function setup(data: SubmoduleData): Submodule {
if (userFiles || sendAll) { if (userFiles || sendAll) {
const newFiles = getNewUserFilesSince(userFilesSince) const newFiles = getNewUserFilesSince(userFilesSince)
const sentFilesCount = Object.values(newFiles).reduce(
(acc, data) => {
return acc + Object.keys(data).length
},
0
)
result.userFiles = { newFiles: newFiles } result.userFiles = { newFiles: newFiles }
const userFilesSinceDate = questionsSince const userFilesSinceDate = questionsSince
@ -1386,12 +1496,9 @@ function setup(data: SubmoduleData): Submodule {
'blue' 'blue'
)}${hostToLog}${logger.C()} since ${logger.C( )}${hostToLog}${logger.C()} since ${logger.C(
'blue' 'blue'
)}${userFilesSinceDate}${logger.C()}` )}${userFilesSinceDate}${logger.C()} Sent files: ${logger.C(
) 'blue'
logger.Log( )}${sentFilesCount}${logger.C()}`
`\tSent files count: ${
Object.keys(result.userFiles.newFiles).length
}`
) )
} }

View file

@ -1,5 +1,7 @@
import http, { request as httpRequest } from 'http' import http, { request as httpRequest } from 'http'
import https, { request as httpsRequest } from 'https' import https, { request as httpsRequest } from 'https'
import fs from 'node:fs'
import utils from './utils'
export interface GetResult<T> { export interface GetResult<T> {
data?: T data?: T
@ -123,6 +125,33 @@ export function post<T = any>({
}) })
} }
export function downloadFile(
options: http.RequestOptions,
destination: string,
useHttp?: boolean
): Promise<void> {
const provider = useHttp ? http : https
utils.createDirsForFile(destination)
const file = fs.createWriteStream(destination)
return new Promise((resolve, reject) => {
provider.get(options, function (response) {
response.pipe(file)
file.on('finish', () => {
file.close()
resolve()
})
response.on('error', (e) => {
file.close()
fs.unlinkSync(destination)
reject(e)
})
})
})
}
export function parseCookie(responseCookie: string[]): { export function parseCookie(responseCookie: string[]): {
[key: string]: string [key: string]: string
} { } {

View file

@ -42,7 +42,7 @@ export default {
} }
import * as child_process from 'child_process' import * as child_process from 'child_process'
import fs from 'fs' import fs from 'node:fs'
import { v4 as uuidv4 } from 'uuid' import { v4 as uuidv4 } from 'uuid'
import logger from '../utils/logger' import logger from '../utils/logger'
@ -151,7 +151,7 @@ function WatchFile(file: string, callback: Function): void {
} }
} }
function createDirsForFile(path: string) { function createDirsForFile(path: string): void {
let pathPart = path let pathPart = path
if (pathPart.endsWith('/')) { if (pathPart.endsWith('/')) {
pathPart = pathPart.slice(0, -1) pathPart = pathPart.slice(0, -1)