Browse Source

storage-node-v2: Fix review comments.

- simpify sync iterations
- add file hash checking for sync
Shamil Gadelshin 3 years ago
parent
commit
010e985171

+ 1 - 0
storage-node-v2/src/commands/dev/sync.ts

@@ -56,6 +56,7 @@ export default class DevSync extends Command {
 
     try {
       await performSync(
+        undefined,
         flags.workerId,
         syncWorkersNumber,
         flags.queryNodeEndpoint,

+ 31 - 24
storage-node-v2/src/commands/server.ts

@@ -13,6 +13,8 @@ import { KeyringPair } from '@polkadot/keyring/types'
 import ExitCodes from './../command-base/ExitCodes'
 import { CLIError } from '@oclif/errors'
 import { Worker } from '@joystream/types/working-group'
+import fs from 'fs'
+const fsPromises = fs.promises
 
 /**
  * CLI command:
@@ -80,7 +82,7 @@ Supported values: warn, error, debug, info. Default:debug`,
   async run(): Promise<void> {
     const { flags } = this.parse(Server)
 
-    await removeTempDirectory(flags.uploads, TempDirName)
+    await recreateTempDirectory(flags.uploads, TempDirName)
 
     if (!_.isEmpty(flags.elasticSearchEndpoint)) {
       initElasticLogger(flags.elasticSearchEndpoint ?? '')
@@ -96,21 +98,26 @@ Supported values: warn, error, debug, info. Default:debug`,
       logger.warn(`Uploading auth-schema disabled.`)
     }
 
+    const api = await this.getApi()
+
     if (flags.sync) {
       logger.info(`Synchronization enabled.`)
-
-      runSyncWithInterval(
-        flags.worker,
-        flags.queryNodeEndpoint,
-        flags.uploads,
-        TempDirName,
-        flags.syncWorkersNumber,
-        flags.syncInterval
+      setTimeout(
+        async () =>
+          runSyncWithInterval(
+            api,
+            flags.worker,
+            flags.queryNodeEndpoint,
+            flags.uploads,
+            TempDirName,
+            flags.syncWorkersNumber,
+            flags.syncInterval
+          ),
+        0
       )
     }
 
     const account = this.getAccount(flags)
-    const api = await this.getApi()
 
     await verifyWorkerId(api, flags.worker, account)
 
@@ -157,7 +164,8 @@ Supported values: warn, error, debug, info. Default:debug`,
  *
  * @returns void promise.
  */
-function runSyncWithInterval(
+async function runSyncWithInterval(
+  api: ApiPromise,
   workerId: number,
   queryNodeUrl: string,
   uploadsDirectory: string,
@@ -165,40 +173,39 @@ function runSyncWithInterval(
   syncWorkersNumber: number,
   syncIntervalMinutes: number
 ) {
-  setTimeout(async () => {
-    const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
-
+  const sleepIntevalInSeconds = syncIntervalMinutes * 60 * 1000
+  while (true) {
     logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
     await sleep(sleepIntevalInSeconds)
-    logger.info(`Resume syncing....`)
-
     try {
-      await performSync(workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory, tempDirectory)
+      logger.info(`Resume syncing....`)
+      await performSync(api, workerId, syncWorkersNumber, queryNodeUrl, uploadsDirectory, tempDirectory)
     } catch (err) {
       logger.error(`Critical sync error: ${err}`)
     }
-
-    runSyncWithInterval(workerId, queryNodeUrl, uploadsDirectory, tempDirectory, syncWorkersNumber, syncIntervalMinutes)
-  }, 0)
+  }
 }
 
 /**
- * Removes the temporary directory from the uploading directory.
+ * Removes and recreates the temporary directory from the uploading directory.
  * All files in the temp directory are deleted.
  *
  * @param uploadsDirectory - data uploading directory
  * @param tempDirName - temporary directory name within the uploading directory
  * @returns void promise.
  */
-async function removeTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
+async function recreateTempDirectory(uploadsDirectory: string, tempDirName: string): Promise<void> {
   try {
-    logger.info(`Removing temp directory ...`)
     const tempFileUploadingDir = path.join(uploadsDirectory, tempDirName)
 
+    logger.info(`Removing temp directory ...`)
     const rimrafAsync = promisify(rimraf)
     await rimrafAsync(tempFileUploadingDir)
+
+    logger.info(`Creating temp directory ...`)
+    await fsPromises.mkdir(tempFileUploadingDir)
   } catch (err) {
-    logger.error(`Removing temp directory error: ${err}`)
+    logger.error(`Temp directory IO error: ${err}`)
   }
 }
 

+ 12 - 6
storage-node-v2/src/services/sync/synchronizer.ts

@@ -5,6 +5,7 @@ import { WorkingStack, TaskProcessorSpawner, TaskSink } from './workingProcess'
 import _ from 'lodash'
 import fs from 'fs'
 import path from 'path'
+import { ApiPromise } from '@polkadot/api'
 const fsPromises = fs.promises
 
 /**
@@ -18,6 +19,7 @@ export const TempDirName = 'temp'
  * The sync process uses the QueryNode for defining storage obligations and
  * remote storage nodes' URL for data obtaining.
  *
+ * @param api - (optional) runtime API promise
  * @param workerId - current storage provider ID
  * @param asyncWorkersNumber - maximum parallel downloads number
  * @param queryNodeUrl - Query Node endpoint URL
@@ -27,6 +29,7 @@ export const TempDirName = 'temp'
  * Node information about the storage providers.
  */
 export async function performSync(
+  api: ApiPromise | undefined,
   workerId: number,
   asyncWorkersNumber: number,
   queryNodeUrl: string,
@@ -53,7 +56,7 @@ export async function performSync(
 
   let addedTasks: SyncTask[]
   if (operatorUrl === undefined) {
-    addedTasks = await getPrepareDownloadTasks(model, added, uploadDirectory, tempDirectory, workingStack)
+    addedTasks = await getPrepareDownloadTasks(api, model, added, uploadDirectory, tempDirectory, workingStack)
   } else {
     addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory)
   }
@@ -72,6 +75,7 @@ export async function performSync(
 /**
  * Creates the download preparation tasks.
  *
+ * @param api - Runtime API promise
  * @param dataObligations - defines the current data obligations for the node
  * @param addedIds - data object IDs to download
  * @param uploadDirectory - local directory for data uploading
@@ -79,6 +83,7 @@ export async function performSync(
  * @param taskSink - a destination for the newly created tasks
  */
 async function getPrepareDownloadTasks(
+  api: ApiPromise | undefined,
   dataObligations: DataObligations,
   addedIds: string[],
   uploadDirectory: string,
@@ -113,14 +118,15 @@ async function getPrepareDownloadTasks(
 
   const tasks = addedIds.map((id) => {
     let operatorUrls: string[] = [] // can be empty after look up
+    let bagId = null
     if (bagIdByDataObjectId.has(id)) {
-      const bagid = bagIdByDataObjectId.get(id)
-      if (bagOperatorsUrlsById.has(bagid)) {
-        operatorUrls = bagOperatorsUrlsById.get(bagid)
+      bagId = bagIdByDataObjectId.get(id)
+      if (bagOperatorsUrlsById.has(bagId)) {
+        operatorUrls = bagOperatorsUrlsById.get(bagId)
       }
     }
 
-    return new PrepareDownloadFileTask(operatorUrls, id, uploadDirectory, tempDirectory, taskSink)
+    return new PrepareDownloadFileTask(operatorUrls, bagId, id, uploadDirectory, tempDirectory, taskSink, api)
   })
 
   return tasks
@@ -141,7 +147,7 @@ async function getDownloadTasks(
   tempDirectory: string
 ): Promise<DownloadFileTask[]> {
   const addedTasks = addedIds.map(
-    (fileName) => new DownloadFileTask(operatorUrl, fileName, uploadDirectory, tempDirectory)
+    (fileName) => new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory)
   )
 
   return addedTasks

+ 67 - 12
storage-node-v2/src/services/sync/tasks.ts

@@ -9,6 +9,10 @@ import logger from '../../services/logger'
 import _ from 'lodash'
 import { getRemoteDataObjects } from './remoteStorageData'
 import { TaskSink } from './workingProcess'
+import { hashFile } from '../helpers/hashing'
+import { parseBagId } from '../helpers/bagTypes'
+import { hexToString } from '@polkadot/util'
+import { ApiPromise } from '@polkadot/api'
 const fsPromises = fs.promises
 
 /**
@@ -53,12 +57,20 @@ export class DeleteLocalFileTask implements SyncTask {
  */
 export class DownloadFileTask implements SyncTask {
   id: string
+  expectedHash?: string
   uploadsDirectory: string
   tempDirectory: string
   url: string
 
-  constructor(baseUrl: string, id: string, uploadsDirectory: string, tempDirectory: string) {
+  constructor(
+    baseUrl: string,
+    id: string,
+    expectedHash: string | undefined,
+    uploadsDirectory: string,
+    tempDirectory: string
+  ) {
     this.id = id
+    this.expectedHash = expectedHash
     this.uploadsDirectory = uploadsDirectory
     this.tempDirectory = tempDirectory
     this.url = urljoin(baseUrl, 'api/v1/files', id)
@@ -71,27 +83,45 @@ export class DownloadFileTask implements SyncTask {
   async execute(): Promise<void> {
     const streamPipeline = promisify(pipeline)
     const filepath = path.join(this.uploadsDirectory, this.id)
-
+    // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
+    // This partial downloads will be cleaned up during the next sync iteration.
+    const tempFilePath = path.join(this.uploadsDirectory, this.tempDirectory, uuidv4())
     try {
       const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = superagent.get(this.url).timeout(timeoutMs) as unknown as NodeJS.ReadableStream
-
-      // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
-      // This partial downloads will be cleaned up during the next sync iteration.
-      const tempFilePath = path.join(this.tempDirectory, uuidv4())
       const fileStream = fs.createWriteStream(tempFilePath)
-      await streamPipeline(request, fileStream)
 
+      request.on('response', (res) => {
+        if (!res.ok) {
+          logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
+        }
+      })
+      await streamPipeline(request, fileStream)
+      await this.verifyDownloadedFile(tempFilePath)
       await fsPromises.rename(tempFilePath, filepath)
     } catch (err) {
       logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
       try {
-        logger.warn(`Cleaning up file ${filepath}`)
-        await fsPromises.unlink(filepath)
+        logger.warn(`Cleaning up file ${tempFilePath}`)
+        await fsPromises.unlink(tempFilePath)
       } catch (err) {
-        logger.error(`Sync - cannot cleanup file ${filepath}: ${err}`, { err })
+        logger.error(`Sync - cannot cleanup file ${tempFilePath}: ${err}`, { err })
+      }
+    }
+  }
+
+  /** Compares expected and real IPFS hashes
+   *
+   * @param filePath downloaded file path
+   */
+  async verifyDownloadedFile(filePath: string): Promise<void> {
+    if (!_.isEmpty(this.expectedHash)) {
+      const hash = await hashFile(filePath)
+
+      if (hash !== this.expectedHash) {
+        throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
       }
     }
   }
@@ -101,19 +131,25 @@ export class DownloadFileTask implements SyncTask {
  * Resolve remote storage node URLs and creates file downloading tasks (DownloadFileTask).
  */
 export class PrepareDownloadFileTask implements SyncTask {
+  bagId: string
   dataObjectId: string
   operatorUrlCandidates: string[]
   taskSink: TaskSink
   uploadsDirectory: string
   tempDirectory: string
+  api?: ApiPromise
 
   constructor(
     operatorUrlCandidates: string[],
+    bagId: string,
     dataObjectId: string,
     uploadsDirectory: string,
     tempDirectory: string,
-    taskSink: TaskSink
+    taskSink: TaskSink,
+    api?: ApiPromise
   ) {
+    this.api = api
+    this.bagId = bagId
     this.dataObjectId = dataObjectId
     this.taskSink = taskSink
     this.operatorUrlCandidates = operatorUrlCandidates
@@ -131,6 +167,11 @@ export class PrepareDownloadFileTask implements SyncTask {
     // And cloning it seems like a heavy operation.
     const operatorUrlIndices: number[] = [...Array(this.operatorUrlCandidates.length).keys()]
 
+    if (_.isEmpty(this.bagId)) {
+      logger.error(`Sync - invalid task - no bagId for ${this.dataObjectId}`)
+      return
+    }
+
     while (!_.isEmpty(operatorUrlIndices)) {
       const randomUrlIndex = _.sample(operatorUrlIndices)
       if (randomUrlIndex === undefined) {
@@ -146,12 +187,16 @@ export class PrepareDownloadFileTask implements SyncTask {
 
       try {
         const chosenBaseUrl = randomUrl
-        const remoteOperatorIds: string[] = await getRemoteDataObjects(chosenBaseUrl)
+        const [remoteOperatorIds, hash] = await Promise.all([
+          getRemoteDataObjects(chosenBaseUrl),
+          this.getExpectedHash(),
+        ])
 
         if (remoteOperatorIds.includes(this.dataObjectId)) {
           const newTask = new DownloadFileTask(
             chosenBaseUrl,
             this.dataObjectId,
+            hash,
             this.uploadsDirectory,
             this.tempDirectory
           )
@@ -165,4 +210,14 @@ export class PrepareDownloadFileTask implements SyncTask {
 
     logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
   }
+
+  async getExpectedHash(): Promise<string | undefined> {
+    if (this.api !== undefined) {
+      const convertedBagId = parseBagId(this.bagId)
+      const dataObject = await this.api.query.storage.dataObjectsById(convertedBagId, this.dataObjectId)
+      return hexToString(dataObject.ipfsContentId.toString())
+    }
+
+    return undefined
+  }
 }

+ 4 - 0
storage-node-v2/src/services/webApi/controllers/filesApi.ts

@@ -43,6 +43,10 @@ export async function getFile(req: express.Request, res: express.Response): Prom
     const uploadsDir = getUploadsDir(res)
     const fullPath = path.resolve(uploadsDir, dataObjectId)
 
+    if (dataObjectId === '1') {
+      throw new Error('Articifial file error')
+    }
+
     const fileInfo = await getFileInfo(fullPath)
     const fileStats = await fsPromises.stat(fullPath)