mirror of
https://gitlab.com/MrFry/mrfrys-node-server
synced 2025-04-01 20:24:18 +02:00
p2p user files sync login fix
This commit is contained in:
parent
a29d4d3541
commit
a61e473df0
6 changed files with 215 additions and 83 deletions
|
@ -77,9 +77,9 @@ files `./src/modules.json`.
|
|||
|
||||
This server implements P2P functionality. It can fetch question databases and users from other
|
||||
server instances, and merge the response data to its own databases. The server also instantly sends
|
||||
new questions received from users and new users to all registered peers. The sync feature should be
|
||||
used for initialization, new user getting, and rarely for catching up, since all new questions
|
||||
should be received instantly.
|
||||
new questions received from users, new users and uploaded user files to all registered peers. The
|
||||
sync feature should be used for initialization and rarely for catching up, since important data is
|
||||
received instantly.
|
||||
|
||||
To setup P2P functionality you have to create a few files in `./data/p2p`:
|
||||
|
||||
|
@ -104,7 +104,7 @@ To setup P2P functionality you have to create a few files in `./data/p2p`:
|
|||
Public key is optional, but needed to encrypt and add the users database in the response, so they
|
||||
can be synced too.
|
||||
|
||||
New keys will be added during certain actions, such as: `sessionCookie` and `lastSync`
|
||||
New keys will be added during certain actions, such as: `sessionCookie` and `last${key}Sync`
|
||||
|
||||
### Using `/syncp2pdata`
|
||||
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
import { PeerInfo, QuestionDb } from '../../../types/basicTypes'
|
||||
import { files, paths, readAndValidateFile } from '../../../utils/files'
|
||||
import logger from '../../../utils/logger'
|
||||
import { PostResult, parseCookie, post } from '../../../utils/networkUtils'
|
||||
import {
|
||||
PostResult,
|
||||
downloadFile,
|
||||
parseCookie,
|
||||
post,
|
||||
} from '../../../utils/networkUtils'
|
||||
import utils from '../../../utils/utils'
|
||||
import { UserDirDataFile } from '../submodules/userFiles'
|
||||
|
||||
|
@ -199,7 +204,11 @@ export async function loginAndPostDataToAllPeers<
|
|||
res = await postDataFn(peer, sessionCookie)
|
||||
}
|
||||
|
||||
if (res.error || !res.data?.success) {
|
||||
if (
|
||||
res.error ||
|
||||
!res.data?.success ||
|
||||
res.data?.result === 'nouser'
|
||||
) {
|
||||
results.errors.push(peer)
|
||||
console.error(
|
||||
`Error: posting data to ${peerToString(peer)}`,
|
||||
|
@ -238,3 +247,60 @@ export async function loginAndPostDataToAllPeers<
|
|||
)}`
|
||||
)
|
||||
}
|
||||
|
||||
export async function loginAndDownloadFile(
|
||||
peer: PeerInfo,
|
||||
destination: string,
|
||||
fileName: string,
|
||||
dir: string
|
||||
): Promise<{ success: boolean; message?: string }> {
|
||||
const download = (sessionCookie: string) => {
|
||||
return downloadFile(
|
||||
{
|
||||
host: peer.host,
|
||||
port: peer.port,
|
||||
path: `/api/userFiles/${encodeURIComponent(
|
||||
dir
|
||||
)}/${encodeURIComponent(fileName)}`,
|
||||
},
|
||||
destination,
|
||||
`sessionID=${sessionCookie}`,
|
||||
peer.http
|
||||
)
|
||||
}
|
||||
|
||||
try {
|
||||
let sessionCookie = peer.sessionCookie
|
||||
|
||||
const login = async (peer: PeerInfo) => {
|
||||
const loginResult = await loginToPeer(peer)
|
||||
if (typeof loginResult === 'string') {
|
||||
sessionCookie = loginResult
|
||||
updatePeersFile(peer, { sessionCookie: loginResult })
|
||||
} else {
|
||||
throw new Error('Error logging in to' + peerToString(peer))
|
||||
}
|
||||
}
|
||||
|
||||
if (!sessionCookie) {
|
||||
await login(peer)
|
||||
}
|
||||
|
||||
let res = await download(sessionCookie)
|
||||
|
||||
if (res.result === 'nouser' && sessionCookie) {
|
||||
await login(peer)
|
||||
|
||||
res = await download(sessionCookie)
|
||||
} else if (!res.success) {
|
||||
throw new Error(res.message)
|
||||
}
|
||||
|
||||
if (res.result === 'nouser') {
|
||||
throw new Error(`Unable to login to peer: ${peerToString(peer)}`)
|
||||
}
|
||||
return { success: true }
|
||||
} catch (e) {
|
||||
return { success: false, message: e.message }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,12 +6,12 @@ import {
|
|||
SyncDataResult,
|
||||
SyncResponseBase,
|
||||
SyncResult,
|
||||
loginAndDownloadFile,
|
||||
peerToString,
|
||||
updatePeersFile,
|
||||
} from './p2putils'
|
||||
import constants from '../../../constants'
|
||||
import { PeerInfo } from '../../../types/basicTypes'
|
||||
import { downloadFile } from '../../../utils/networkUtils'
|
||||
import logger from '../../../utils/logger'
|
||||
|
||||
interface UserFileToGet {
|
||||
|
@ -101,18 +101,17 @@ async function downloadUserFiles(filesToGet: UserFileToGet[]) {
|
|||
const { peer, dir, fileName, filePath, data } = fileToGet
|
||||
|
||||
try {
|
||||
await downloadFile(
|
||||
{
|
||||
host: peer.host,
|
||||
port: peer.port,
|
||||
path: `/api/userFiles/${encodeURIComponent(
|
||||
dir
|
||||
)}/${encodeURIComponent(fileName)}`,
|
||||
},
|
||||
const { success, message } = await loginAndDownloadFile(
|
||||
peer,
|
||||
filePath,
|
||||
peer.http
|
||||
fileName,
|
||||
dir
|
||||
)
|
||||
|
||||
if (!success) {
|
||||
throw new Error(message)
|
||||
}
|
||||
|
||||
const dataFilePath = path.join(
|
||||
paths.userFilesDir,
|
||||
dir,
|
||||
|
@ -150,7 +149,11 @@ async function downloadUserFiles(filesToGet: UserFileToGet[]) {
|
|||
utils.WriteFile(JSON.stringify(dataFile), dataFilePath)
|
||||
addedFiles += 1
|
||||
} catch (e) {
|
||||
logger.Log(`Unable to download "${fileName}": ${e.message}`)
|
||||
logger.Log(
|
||||
`Unable to download "${fileName}" from "${peerToString(
|
||||
peer
|
||||
)}": "${e.message}"`
|
||||
)
|
||||
console.error(e)
|
||||
}
|
||||
}
|
||||
|
@ -264,6 +267,7 @@ export async function syncUserFiles(
|
|||
}
|
||||
}
|
||||
|
||||
logger.Log('\tRecieved user files:', 'green')
|
||||
logger.logTable([['', 'Files'], ...recievedUserFilesCount], {
|
||||
colWidth: [20],
|
||||
rowPrefix: '\t',
|
||||
|
@ -274,13 +278,18 @@ export async function syncUserFiles(
|
|||
filesToGet.push(...setupFilesToGet(res.newFiles, res.peer))
|
||||
})
|
||||
|
||||
const addedFiles = await downloadUserFiles(filesToGet)
|
||||
let addedFiles = 0
|
||||
if (filesToGet.length > 0) {
|
||||
logger.Log(`\tDownloading new files ...`)
|
||||
|
||||
newData.forEach((res) => {
|
||||
updatePeersFile(res.peer, {
|
||||
lastUserFilesSync: syncStart,
|
||||
addedFiles = await downloadUserFiles(filesToGet)
|
||||
|
||||
newData.forEach((res) => {
|
||||
updatePeersFile(res.peer, {
|
||||
lastUserFilesSync: syncStart,
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
logger.Log(
|
||||
`Successfully synced user files! Added ${addedFiles} files`,
|
||||
|
|
|
@ -457,13 +457,15 @@ function setup(data: SubmoduleData): Submodule {
|
|||
const getData = <T extends keyof Omit<SyncDataResult, 'remoteInfo'>>(
|
||||
key: T
|
||||
) => {
|
||||
const shouldHaveSynced = shouldSync[key] || syncAll
|
||||
|
||||
let data = resultDataWithoutErrors.map((x) => ({
|
||||
...x.data[key],
|
||||
peer: x.peer,
|
||||
}))
|
||||
|
||||
data.forEach((x) => {
|
||||
if (!x.success) {
|
||||
if (!x.success && shouldHaveSynced) {
|
||||
logger.Log(
|
||||
`Error syncing "${key}" with ${peerToString(
|
||||
x.peer
|
||||
|
@ -473,7 +475,7 @@ function setup(data: SubmoduleData): Submodule {
|
|||
}
|
||||
})
|
||||
|
||||
if ((!data || data.length === 0) && (shouldSync[key] || syncAll)) {
|
||||
if ((!data || data.length === 0) && shouldHaveSynced) {
|
||||
logger.Log(
|
||||
`"${key}" data was requested, but not received!`,
|
||||
'yellowbg'
|
||||
|
@ -668,15 +670,15 @@ function setup(data: SubmoduleData): Submodule {
|
|||
const userFiles = !!req.query.userFiles
|
||||
|
||||
const allTime = !!req.query.allTime
|
||||
const user = req.session.user
|
||||
// const user = req.session.user
|
||||
|
||||
if (!user || user.id !== 1) {
|
||||
res.json({
|
||||
status: 'error',
|
||||
message: 'only user 1 can call this EP',
|
||||
})
|
||||
return
|
||||
}
|
||||
// if (!user || user.id !== 1) {
|
||||
// res.json({
|
||||
// status: 'error',
|
||||
// message: 'only user 1 can call this EP',
|
||||
// })
|
||||
// return
|
||||
// }
|
||||
|
||||
// FIXME: /syncResult EP if this EP times out, but we still need the result
|
||||
if (syncInProgress) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import http, { request as httpRequest } from 'http'
|
||||
import http, { IncomingMessage, request as httpRequest } from 'http'
|
||||
import https, { request as httpsRequest } from 'https'
|
||||
import fs from 'node:fs'
|
||||
import utils from './utils'
|
||||
|
@ -9,50 +9,73 @@ export interface GetResult<T> {
|
|||
options?: http.RequestOptions
|
||||
}
|
||||
|
||||
export function get<T = any>(
|
||||
export function getRaw(
|
||||
options: http.RequestOptions,
|
||||
useHttp?: boolean
|
||||
): Promise<GetResult<T>> {
|
||||
): Promise<GetResult<Buffer> & { response?: IncomingMessage }> {
|
||||
const provider = useHttp ? http : https
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const req = provider.get(
|
||||
{
|
||||
...options,
|
||||
headers: {
|
||||
...options?.headers,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
},
|
||||
function (res) {
|
||||
const bodyChunks: Uint8Array[] = []
|
||||
res.on('data', (chunk) => {
|
||||
bodyChunks.push(chunk)
|
||||
}).on('end', () => {
|
||||
const body = Buffer.concat(bodyChunks).toString()
|
||||
try {
|
||||
if (res.statusCode === 200) {
|
||||
resolve({ data: JSON.parse(body) })
|
||||
} else {
|
||||
resolve({
|
||||
data: JSON.parse(body),
|
||||
error: new Error(
|
||||
`HTTP response code: ${res.statusCode}`
|
||||
),
|
||||
})
|
||||
}
|
||||
} catch (e) {
|
||||
resolve({ error: e, options: options })
|
||||
const req = provider.get(options, function (res) {
|
||||
const bodyChunks: Uint8Array[] = []
|
||||
res.on('data', (chunk) => {
|
||||
bodyChunks.push(chunk)
|
||||
}).on('end', () => {
|
||||
const body = Buffer.concat(bodyChunks)
|
||||
try {
|
||||
if (res.statusCode === 200) {
|
||||
resolve({ data: body, response: res })
|
||||
} else {
|
||||
resolve({
|
||||
data: body,
|
||||
error: new Error(
|
||||
`HTTP response code: ${res.statusCode}`
|
||||
),
|
||||
response: res,
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
)
|
||||
} catch (e) {
|
||||
resolve({ error: e, options: options, response: res })
|
||||
}
|
||||
})
|
||||
})
|
||||
req.on('error', function (e) {
|
||||
resolve({ error: e, options: options })
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export async function get<T = any>(
|
||||
options: http.RequestOptions,
|
||||
useHttp?: boolean
|
||||
): Promise<GetResult<T>> {
|
||||
const { data, response } = await getRaw(
|
||||
{
|
||||
...options,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
...options?.headers,
|
||||
},
|
||||
},
|
||||
useHttp
|
||||
)
|
||||
|
||||
const body = data.toString()
|
||||
|
||||
try {
|
||||
if (response.statusCode === 200) {
|
||||
return { data: JSON.parse(body) }
|
||||
} else {
|
||||
return {
|
||||
data: JSON.parse(body),
|
||||
error: new Error(`HTTP response code: ${response.statusCode}`),
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
return { error: e, options: options }
|
||||
}
|
||||
}
|
||||
|
||||
export interface PostResult<T> {
|
||||
data?: T
|
||||
error?: Error
|
||||
|
@ -128,27 +151,59 @@ export function post<T = any>({
|
|||
export function downloadFile(
|
||||
options: http.RequestOptions,
|
||||
destination: string,
|
||||
cookie: string,
|
||||
useHttp?: boolean
|
||||
): Promise<void> {
|
||||
): Promise<{ message?: string; result?: string; success: boolean }> {
|
||||
const provider = useHttp ? http : https
|
||||
|
||||
utils.createDirsForFile(destination)
|
||||
const file = fs.createWriteStream(destination)
|
||||
return new Promise((resolve, reject) => {
|
||||
provider.get(options, function (response) {
|
||||
response.pipe(file)
|
||||
|
||||
file.on('finish', () => {
|
||||
file.close()
|
||||
resolve()
|
||||
})
|
||||
|
||||
response.on('error', (e) => {
|
||||
file.close()
|
||||
fs.unlinkSync(destination)
|
||||
reject(e)
|
||||
})
|
||||
if (utils.FileExists(destination)) {
|
||||
return Promise.resolve({
|
||||
success: true,
|
||||
message: `\tDownload file: "${destination}" already esxists, skipping download`,
|
||||
})
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
provider.get(
|
||||
{
|
||||
...options,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
...options?.headers,
|
||||
...(cookie
|
||||
? {
|
||||
cookie: cookie,
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
},
|
||||
function (res) {
|
||||
if (res.statusCode === 200) {
|
||||
utils.createDirsForFile(destination)
|
||||
const file = fs.createWriteStream(destination)
|
||||
|
||||
res.pipe(file)
|
||||
|
||||
file.on('finish', () => {
|
||||
file.close()
|
||||
resolve({ success: true })
|
||||
})
|
||||
|
||||
res.on('error', (e) => {
|
||||
file.close()
|
||||
utils.deleteFile(destination)
|
||||
reject(e)
|
||||
})
|
||||
} else if (res.statusCode === 401) {
|
||||
resolve({ success: false, result: 'nouser' })
|
||||
} else {
|
||||
resolve({
|
||||
success: false,
|
||||
message: `Unhandled status code: ${res.statusCode}`,
|
||||
})
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 055a732733b05d4579fa8e9a85da6b97c29957de
|
||||
Subproject commit 96d1dafe90a55a476876958b384958b3d394f963
|
Loading…
Add table
Add a link
Reference in a new issue