Browse Source

storage-node: limit concurrent items being synced and shorten sync period

Mokhtar Naamani 4 years ago
parent
commit
f6fccde144

+ 1 - 1
storage-node/packages/colossus/bin/cli.js

@@ -18,7 +18,7 @@ const debug = require('debug')('joystream:colossus')
 const PROJECT_ROOT = path.resolve(__dirname, '..')
 
 // Number of milliseconds to wait between synchronization runs.
-const SYNC_PERIOD_MS = 300000 // 5min
+const SYNC_PERIOD_MS = 120000 // 2min
 
 // Parse CLI
 const FLAG_DEFINITIONS = {

+ 22 - 4
storage-node/packages/colossus/lib/sync.js

@@ -19,6 +19,10 @@
 'use strict'
 
 const debug = require('debug')('joystream:sync')
+const _ = require('lodash')
+
+const MAX_CONCURRENT_SYNC_ITEMS = 15
+const contentBeingSynced = new Map()
 
 async function syncCallback(api, storage) {
   const knownContentIds = await api.assets.getKnownContentIds()
@@ -31,9 +35,16 @@ async function syncCallback(api, storage) {
   // by storage to ipfs cid, maybe we can resolve them locally
   // and cache result to simplify async code below and reduce
   // queries
+
+  // Since we are limiting concurrent content ids being synced, to ensure
+  // better distribution of content across storage nodes during a potentially long
+  // sync process we don't want all nodes to replicate items in the same order, so
+  // we simply shuffle ids around.
+  const shuffledIds = _.shuffle(knownContentIds)
+
   const syncedIds = (
     await Promise.all(
-      knownContentIds.map(async (contentId) => {
+      shuffledIds.map(async (contentId) => {
         // TODO: get the data object
         // make sure the data object was Accepted by the liaison,
         // don't just blindly attempt to fetch them
@@ -45,9 +56,16 @@ async function syncCallback(api, storage) {
             return contentId
           }
 
-          if (!synced && !syncing) {
-            // Todo: limit concurrent syncing ?
-            storage.synchronize(contentId).catch()
+          if (!synced && !syncing && contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS) {
+            try {
+              contentBeingSynced.set(contentId, true)
+
+              await storage.synchronize(contentId, () => {
+                contentBeingSynced.delete(contentId)
+              })
+            } catch (err) {
+              contentBeingSynced.delete(contentId)
+            }
           }
         } catch (err) {
           //

+ 3 - 1
storage-node/packages/storage/storage.js

@@ -364,7 +364,7 @@ class Storage {
   /*
    * Synchronize the given content ID
    */
-  async synchronize(contentId) {
+  async synchronize(contentId, callback) {
     const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)
 
     // TODO: validate resolved id is proper ipfs_cid, not null or empty string
@@ -385,9 +385,11 @@ class Storage {
       delete this.pinning[resolved]
       if (err) {
         debug(`Error Pinning: ${resolved}`)
+        callback && callback(err)
       } else {
         debug(`Pinned ${resolved}`)
         this.pinned[resolved] = true
+        callback && callback()
       }
     })
   }