Browse Source

storage-node-v2: Add temp file on syncing.

Shamil Gadelshin 3 years ago
parent
commit
2b4c5c2bdc

+ 1 - 0
storage-node-v2/package.json

@@ -43,6 +43,7 @@
     "superagent": "^6.1.0",
     "tslib": "^1",
     "url-join": "^4.0.1",
+    "uuid": "^8.3.2",
     "winston": "^3.3.3",
     "winston-elasticsearch": "^0.15.8"
   },

+ 1 - 1
storage-node-v2/scripts/generate-test-data.ts

@@ -21,7 +21,7 @@ async function doJob(): Promise<void> {
   ]
 
   const updateDb = true
-  const generateFiles = true
+  const generateFiles = false
 
   if (updateDb) {
     const config : ClientConfig = {

+ 17 - 6
storage-node-v2/src/services/logger.ts

@@ -1,5 +1,4 @@
-import winston, { Logger, transport } from 'winston'
-
+import winston, { transport } from 'winston'
 import expressWinston from 'express-winston'
 import { Handler, ErrorRequestHandler } from 'express'
 import { ElasticsearchTransport } from 'winston-elasticsearch'
@@ -66,8 +65,10 @@ function createDefaultLogger(): winston.Logger {
   return winston.createLogger(defaultOptions)
 }
 
+// Default global logger variable
 let InnerLogger = createDefaultLogger()
 
+// Enables changing the underlying logger, which is the default import in other modules.
 const proxy = new Proxy(InnerLogger, {
   get(target: winston.Logger, propKey: symbol) {
     const method = Reflect.get(target, propKey)
@@ -82,6 +83,7 @@ export default proxy
 /**
  * Creates Express-Winston logger handler.
  *
+ * @param elasticSearchEndpoint - elastic search engine endpoint (optional).
  * @returns  Express-Winston logger handler
  *
  */
@@ -185,19 +187,28 @@ function createElasticLogger(elasticSearchEndpoint: string): winston.Logger {
   return logger
 }
 
-export function initElasticLogger(elasticSearchEndpoint: string): Logger {
+/**
+ * Updates the default system logger with elastic search capabilities.
+ *
+ * @param elasticSearchEndpoint - elastic search engine endpoint.
+ */
+export function initElasticLogger(elasticSearchEndpoint: string): void {
   InnerLogger = createElasticLogger(elasticSearchEndpoint)
-
-  return InnerLogger
 }
 
+/**
+ * Creates winston logger transport for the elastic search engine.
+ *
+ * @param elasticSearchEndpoint - elastic search engine endpoint.
+ * @returns elastic search winston transport
+ */
 function createElasticTransport(
   elasticSearchEndpoint: string
 ): winston.transport {
   const esTransportOpts = {
     level: 'debug', // TODO: consider changing to warn
     clientOpts: { node: elasticSearchEndpoint, maxRetries: 5 },
-    index: 'storage-node'
+    index: 'storage-node',
   }
   return new ElasticsearchTransport(esTransportOpts)
 }

+ 19 - 10
storage-node-v2/src/services/sync/synchronizer.ts

@@ -10,6 +10,7 @@ import superagent from 'superagent'
 import urljoin from 'url-join'
 import AwaitLock from 'await-lock'
 import sleep from 'sleep-promise'
+import { v4 as uuidv4 } from 'uuid'
 const fsPromises = fs.promises
 
 // TODO: use caching
@@ -102,20 +103,24 @@ class DeleteLocalFileTask implements SyncTask {
 }
 
 class DownloadFileTask implements SyncTask {
-  filepath: string
+  id: string
+  uploadsDirectory: string
   url: string
 
-  constructor(baseUrl: string, filename: string, uploadsDirectory: string) {
-    this.filepath = path.join(uploadsDirectory, filename)
-    this.url = urljoin(baseUrl, 'api/v1/files', filename)
+  constructor(baseUrl: string, id: string, uploadsDirectory: string) {
+    this.id = id
+    this.uploadsDirectory = uploadsDirectory
+    this.url = urljoin(baseUrl, 'api/v1/files', id)
   }
 
   description(): string {
-    return `Sync - downloading file: ${this.url} as ${this.filepath} ....`
+    return `Sync - downloading file: ${this.url} to ${this.uploadsDirectory} ....`
   }
 
   async execute(): Promise<void> {
     const streamPipeline = promisify(pipeline)
+    const filepath = path.join(this.uploadsDirectory, this.id)
+
     try {
       const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
       // Casting because of:
@@ -124,16 +129,20 @@ class DownloadFileTask implements SyncTask {
         .get(this.url)
         .timeout(timeoutMs) as unknown as NodeJS.ReadableStream
 
-      const fileStream = fs.createWriteStream(this.filepath)
-
+      // We create a temp file first to mitigate partial downloads on app (or remote node) crash.
+      // These partial downloads will be cleaned up during the next sync iteration.
+      const tempFilePath = path.join(this.uploadsDirectory, uuidv4())
+      const fileStream = fs.createWriteStream(tempFilePath)
       await streamPipeline(request, fileStream)
+
+      await fsPromises.rename(tempFilePath, filepath)
     } catch (err) {
       logger.error(`Sync - fetching data error for ${this.url}: ${err}`)
       try {
-        logger.warn(`Cleaning up file ${this.filepath}`)
-        await fs.unlinkSync(this.filepath)
+        logger.warn(`Cleaning up file ${filepath}`)
+        await fs.unlinkSync(filepath)
       } catch (err) {
-        logger.error(`Sync - cannot cleanup file ${this.filepath}: ${err}`)
+        logger.error(`Sync - cannot cleanup file ${filepath}: ${err}`)
       }
     }
   }

+ 5 - 0
yarn.lock

@@ -31358,6 +31358,11 @@ uuid@^8.2.0, uuid@^8.3.0:
   resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.0.tgz#ab738085ca22dc9a8c92725e459b1d507df5d6ea"
   integrity sha512-fX6Z5o4m6XsXBdli9g7DtWgAx+osMsRRZFKma1mIUsLCz6vRvv+pz5VNbyu9UEDzpMWulZfvpgb/cmDXVulYFQ==
 
+uuid@^8.3.2:
+  version "8.3.2"
+  resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
+  integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==
+
 v8-compile-cache@^2.0.3:
   version "2.1.0"
   resolved "https://registry.yarnpkg.com/v8-compile-cache/-/v8-compile-cache-2.1.0.tgz#e14de37b31a6d194f5690d67efc4e7f6fc6ab30e"