Browse Source

storage-node-v2: Add ‘sync’ feature to the `server` command.

Shamil Gadelshin 3 years ago
parent
commit
b07dd26fef

+ 1 - 1
storage-node-v2/scripts/generate-test-data.ts

@@ -10,7 +10,7 @@ async function doJob(): Promise<void> {
   const uploadDirectory = '/Users/shamix/uploads2'
   const fileSize = 100
 
-  const objectNumber = 1000
+  const objectNumber = 2000
   const bagNumber = 10
   const bucketNumber = 10
 

+ 8 - 18
storage-node-v2/src/commands/dev/sync.ts

@@ -1,5 +1,6 @@
 import { Command, flags } from '@oclif/command'
 import { performSync } from '../../services/sync/synchronizer'
+import logger from '../../services/logger'
 
 export default class DevSync extends Command {
   static description =
@@ -12,7 +13,7 @@ export default class DevSync extends Command {
       required: true,
       description: 'Storage node operator worker ID.',
     }),
-    processNumber: flags.integer({
+    syncWorkersNumber: flags.integer({
       char: 'p',
       required: false,
       description: 'Sync workers number (max async operations in progress).',
@@ -35,14 +36,14 @@ export default class DevSync extends Command {
     }),
   }
 
-  async run() {
+  async run(): Promise<void> {
     const { flags } = this.parse(DevSync)
 
-    console.log('Syncing...')
+    logger.info('Syncing...')
 
     const queryNodeHost = flags.queryNodeHost ?? 'localhost:8081'
     const queryNodeUrl = `http://${queryNodeHost}/graphql`
-    const processNumber = flags.processNumber ?? 20
+    const syncWorkersNumber = flags.syncWorkersNumber ?? 20
     const dataSourceOperatorHost =
       flags.dataSourceOperatorHost ?? 'localhost:3333'
     const operatorUrl = `http://${dataSourceOperatorHost}/`
@@ -50,25 +51,14 @@ export default class DevSync extends Command {
     try {
       await performSync(
         flags.workerId,
-        processNumber,
+        syncWorkersNumber,
         queryNodeUrl,
         flags.uploads,
         operatorUrl
       )
     } catch (err) {
-      console.log(err)
-      console.log(JSON.stringify(err, null, 2))
+      logger.error(err)
+      logger.error(JSON.stringify(err, null, 2))
     }
   }
 }
-
-// TODO: implement periodical sync
-// import sleep from 'sleep-promise'
-// export function runSyncWithInterval() {
-//   setTimeout(async () => {
-//     await sleep(5000)
-//     console.log('Syncing with timeout...')
-//     await performSync()
-//     runSyncWithInterval()
-//   }, 0)
-// }

+ 77 - 0
storage-node-v2/src/commands/server.ts

@@ -2,6 +2,9 @@ import { flags } from '@oclif/command'
 import { createApp } from '../services/webApi/app'
 import ApiCommandBase from '../command-base/ApiCommandBase'
 import logger from '../services/logger'
+import { performSync } from '../services/sync/synchronizer'
+import sleep from 'sleep-promise'
+import _ from 'lodash'
 
 /**
  * CLI command:
@@ -29,6 +32,27 @@ export default class Server extends ApiCommandBase {
       required: true,
       description: 'Server port.',
     }),
+    sync: flags.boolean({
+      char: 's',
+      description: 'Enable data synchronization.',
+      default: false, // TODO: turn on by default
+    }),
+    syncInterval: flags.integer({
+      char: 'i',
+      description: 'Interval between syncronizations (in minutes)',
+      default: 1,
+    }),
+    queryNodeHost: flags.string({
+      char: 'q',
+      required: false,
+      description: 'Query node host and port (e.g.: some.com:8081)',
+    }),
+    syncWorkersNumber: flags.integer({
+      char: 'r',
+      required: false,
+      description: 'Sync workers number (max async operations in progress).',
+      default: 20,
+    }),
     ...ApiCommandBase.flags,
   }
 
@@ -39,6 +63,25 @@ export default class Server extends ApiCommandBase {
       await this.ensureDevelopmentChain()
     }
 
+    if (flags.sync) {
+      logger.info(`Synchronization enabled.`)
+
+      if (_.isEmpty(flags.queryNodeHost)) {
+        this.error('Query node host parameter required with enabled sync.')
+      }
+
+      const queryNodeUrl = `http://${flags.queryNodeHost}/graphql`
+      logger.info(`Query node endpoint set: ${queryNodeUrl}`)
+
+      runSyncWithInterval(
+        flags.worker,
+        queryNodeUrl,
+        flags.uploads,
+        flags.syncWorkersNumber,
+        flags.syncInterval
+      )
+    }
+
     const account = this.getAccount(flags)
     const api = await this.getApi()
 
@@ -60,3 +103,37 @@ export default class Server extends ApiCommandBase {
   /* eslint-disable @typescript-eslint/no-empty-function */
   async finally(): Promise<void> {}
 }
+
+/**
+ * Runs data synchronization in an endless background loop.
+ *
+ * Every cycle is scheduled through `setTimeout`, so this call returns
+ * immediately. A cycle first sleeps for the configured interval, then calls
+ * `performSync`, and finally schedules the next cycle. Because the sleep comes
+ * first, the first sync starts one full interval after this function is called.
+ * Errors thrown by `performSync` are logged and do not stop the loop.
+ *
+ * @param workerId - worker ID passed through to `performSync`
+ * @param queryNodeUrl - query node GraphQL endpoint URL
+ * @param uploadsDirectory - uploads directory passed through to `performSync`
+ * @param syncWorkersNumber - sync workers number (max async operations in progress)
+ * @param syncIntervalMinutes - pause before each sync run, in minutes
+ */
+function runSyncWithInterval(
+  workerId: number,
+  queryNodeUrl: string,
+  uploadsDirectory: string,
+  syncWorkersNumber: number,
+  syncIntervalMinutes: number
+): void {
+  setTimeout(async () => {
+    // minutes * 60 * 1000 is a duration in milliseconds (the unit `sleep` expects).
+    const sleepIntervalMs = syncIntervalMinutes * 60 * 1000
+    logger.info(`Sync paused for ${syncIntervalMinutes} minute(s).`)
+    await sleep(sleepIntervalMs)
+    logger.info(`Resume syncing....`)
+
+    try {
+      await performSync(
+        workerId,
+        syncWorkersNumber,
+        queryNodeUrl,
+        uploadsDirectory
+      )
+    } catch (err) {
+      // A failed run must not break the loop: log it and schedule the next cycle.
+      logger.error(`Critical sync error: ${err}`)
+    }
+
+    // Re-arm the loop only after the current run has finished (no overlapping runs).
+    runSyncWithInterval(
+      workerId,
+      queryNodeUrl,
+      uploadsDirectory,
+      syncWorkersNumber,
+      syncIntervalMinutes
+    )
+  }, 0)
+}

+ 1 - 1
storage-node-v2/src/services/helpers/tokenNonceKeeper.ts

@@ -1,7 +1,7 @@
 import NodeCache from 'node-cache'
 
 // Expiration period in seconds for the local nonce cache.
-const TokenExpirationPeriod: number = 30 * 1000 // seconds
+const TokenExpirationPeriod = 30 // seconds
 
 // Max nonce number in local cache
 const MaxNonces = 100000

+ 2 - 2
storage-node-v2/src/services/sync/remoteData.ts

@@ -4,7 +4,7 @@ import logger from '../../services/logger'
 import NodeCache from 'node-cache'
 
 // Expiration period in seconds for the local cache.
-const ExpirationPeriod: number = 5 * (60 * 1000) // minutes
+const ExpirationPeriod: number = 5 * 60 // 5 minutes, expressed in seconds
 
 // Max data entries in local cache
 const MaxEntries = 10000
@@ -34,7 +34,7 @@ export async function getAvailableData(operatorUrl: string): Promise<string[]> {
   }
 
   const cachedData = availableCidsCache.get<string[]>(url)
-  if (!!cachedData) {
+  if (cachedData) {
     logger.debug(`Sync - getting from cache available data for ${url}`)
     return cachedData
   }

+ 6 - 3
storage-node-v2/src/services/sync/synchronizer.ts

@@ -23,7 +23,7 @@ export async function getLocalDataObjects(
 
 export async function performSync(
   workerId: number,
-  processNumber: number,
+  syncWorkersNumber: number,
   queryNodeUrl: string,
   uploadDirectory: string,
   operatorUrl?: string
@@ -33,7 +33,6 @@ export async function performSync(
     getRuntimeModel(queryNodeUrl, workerId),
     getLocalFileNames(uploadDirectory),
   ])
-  console.log(model)
 
   const requiredCids = model.dataObjects.map((obj) => obj.cid)
 
@@ -62,7 +61,10 @@ export async function performSync(
 
   logger.debug(`Sync - started processing...`)
 
-  const processSpawner = new TaskProcessorSpawner(workingStack, processNumber)
+  const processSpawner = new TaskProcessorSpawner(
+    workingStack,
+    syncWorkersNumber
+  )
 
   await workingStack.add(addedTasks)
   await workingStack.add(deletedTasks)
@@ -295,6 +297,7 @@ class PrepareDownloadFileTask implements SyncTask {
   ) {
     this.cid = cid
     this.taskSink = taskSink
+    // TODO: remove heavy operation
     // Cloning is critical here. The list will be modified.
     this.operatorUrlCandidates = _.cloneDeep(operatorUrlCandidates)
     this.uploadsDirectory = uploadsDirectory