storage-node: sync run refactor

- shorter interval between sync runs
- separate interval for fetching new content ids
- maxSync as argument

Mokhtar Naamani, 3 years ago
commit ce1ea66868

2 changed files with 53 additions and 37 deletions:
  1. storage-node/packages/colossus/bin/cli.js (+6 -4)
  2. storage-node/packages/colossus/lib/sync.js (+47 -33)
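
With this commit the sync concurrency cap becomes an operator-tunable setting rather than a hardcoded constant. A hypothetical invocation (the exact subcommand depends on how colossus is launched; `server` is assumed here):

    colossus server --maxSync 50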

+ 6 - 4
storage-node/packages/colossus/bin/cli.js

@@ -18,9 +18,6 @@ const debug = require('debug')('joystream:colossus')
 // Project root
 const PROJECT_ROOT = path.resolve(__dirname, '..')
 
-// Number of milliseconds to wait between synchronization runs.
-const SYNC_PERIOD_MS = 30000
-
 // Parse CLI
 const FLAG_DEFINITIONS = {
   port: {
@@ -75,6 +72,10 @@ const FLAG_DEFINITIONS = {
     type: 'boolean',
     default: false,
   },
+  maxSync: {
+    type: 'number',
+    default: 30,
+  },
 }
 
 const cli = meow(
@@ -98,6 +99,7 @@ const cli = meow(
     --ipfs-host   hostname  ipfs host to use, default to 'localhost'. Default port 5001 is always used
     --anonymous             Runs server in anonymous mode. Replicates content without need to register
                             on-chain, and can serve content. Cannot be used to upload content.
+    --maxSync               The max number of items to sync concurrently. Defaults to 30. Must be greater than 0.
   `,
   { flags: FLAG_DEFINITIONS }
 )
@@ -276,7 +278,7 @@ const commands = {
     const ipfsHttpGatewayUrl = `http://${ipfsHost}:8080/`
 
     const { startSyncing } = require('../lib/sync')
-    startSyncing(api, { syncPeriod: SYNC_PERIOD_MS, anonymous: cli.flags.anonymous }, store)
+    startSyncing(api, { anonymous: cli.flags.anonymous, maxSync: cli.flags.maxSync }, store)
 
     if (!cli.flags.anonymous) {
       announcePublicUrl(api, publicUrl)
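
The flag is parsed by meow and forwarded verbatim to the sync module. A minimal standalone sketch (not part of the commit) of how the flag definition above becomes cli.flags.maxSync:

const meow = require('meow')

const cli = meow(
  `
    Options
      --maxSync  Max number of items to sync concurrently. Defaults to 30.
  `,
  {
    flags: {
      maxSync: {
        type: 'number',
        default: 30,
      },
    },
  }
)

// e.g. `node cli.js --maxSync 50` prints { maxSync: 50 }
console.log(cli.flags)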

+ 47 - 33
storage-node/packages/colossus/lib/sync.js

@@ -23,26 +23,28 @@ const _ = require('lodash')
 const { ContentId } = require('@joystream/types/storage')
 const { nextTick } = require('@joystream/storage-utils/sleep')
 
-// The number of concurrent items to attemp to fetch. Must be greater than zero.
-const MAX_CONCURRENT_SYNC_ITEMS = 30
-
-async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
-  if (contentBeingSynced.size === MAX_CONCURRENT_SYNC_ITEMS) return
-
-  const knownEncodedContentIds = (await api.assets.getAcceptedContentIds()).map((id) => id.encode())
-
-  // Select ids which we have not yet fully synced
-  const needsSync = knownEncodedContentIds
-    .filter((id) => !contentCompleteSynced.has(id))
-    .filter((id) => !contentBeingSynced.has(id))
-
-  // Since we are limiting concurrent content ids being synced, to ensure
+// Time to wait between sync runs. The shorter the interval, the better the
+// chance of consuming all available concurrent sync slots.
+const INTERVAL_BETWEEN_SYNC_RUNS_MS = 3000
+// Time between refreshing content ids from the chain
+const CONTENT_ID_REFRESH_INTERVAL_MS = 60000
+// Minimum concurrency. Must be greater than zero.
+const MIN_CONCURRENT_SYNC_ITEMS = 5
+
+async function syncRun({ api, storage, contentBeingSynced, contentCompletedSync, flags, contentIds }) {
+  // The number of concurrent items to attempt to fetch.
+  const MAX_CONCURRENT_SYNC_ITEMS = Math.max(MIN_CONCURRENT_SYNC_ITEMS, flags.maxSync)
+
+  // Select ids which may need to be synced
+  const needsSync = contentIds.filter((id) => !contentCompletedSync.has(id)).filter((id) => !contentBeingSynced.has(id))
+
+  // We limit how many content ids can be synced concurrently. To ensure
   // better distribution of content across storage nodes during a potentially long
   // sync process we don't want all nodes to replicate items in the same order, so
   // we simply shuffle.
   const candidatesForSync = _.shuffle(needsSync)
 
-  debug(`${candidatesForSync.length} items remaining to process`)
+  // Number of items synced in the current run
   let syncedItemsCount = 0
 
   while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
@@ -63,57 +65,69 @@ async function syncContent({ api, storage, contentBeingSynced, contentCompleteSy
         } else if (status.synced) {
           syncedItemsCount++
           contentBeingSynced.delete(id)
-          contentCompleteSynced.set(id)
+          contentCompletedSync.set(id)
         }
       })
-
-      // Allow short time for checking if content is already stored locally.
-      // So we can handle more new items per run.
-      await nextTick()
     } catch (err) {
       // Most likely failed to resolve the content id
       debug(`Failed calling synchronize ${err}`)
       contentBeingSynced.delete(id)
     }
-  }
 
-  debug(`Items processed in this sync run ${syncedItemsCount}`)
+    // Allow callbacks passed to storage.synchronize() to be invoked during this sync run.
+    // This happens when content is found to already be stored locally, and speeds up the overall sync.
+    await nextTick()
+  }
 }
 
-async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced }) {
+async function syncRunner({ api, flags, storage, contentBeingSynced, contentCompletedSync, contentIds }) {
   const retry = () => {
-    setTimeout(syncPeriodic, flags.syncPeriod, {
+    setTimeout(syncRunner, INTERVAL_BETWEEN_SYNC_RUNS_MS, {
       api,
       flags,
       storage,
       contentBeingSynced,
-      contentCompleteSynced,
+      contentCompletedSync,
+      contentIds,
     })
   }
 
   try {
-    const chainIsSyncing = await api.chainIsSyncing()
-
-    if (chainIsSyncing) {
+    if (await api.chainIsSyncing()) {
       debug('Chain is syncing. Postponing sync.')
     } else {
-      await syncContent({ api, storage, contentBeingSynced, contentCompleteSynced })
+      // Do not fetch content ids on every single run, as it is an expensive operation
+      const now = Date.now()
+      if (!contentIds || now - contentIds.fetchedAt > CONTENT_ID_REFRESH_INTERVAL_MS) {
+        // re-fetch content ids
+        contentIds = (await api.assets.getAcceptedContentIds()).map((id) => id.encode())
+        contentIds.fetchedAt = Date.now()
+      }
+
+      await syncRun({
+        api,
+        storage,
+        contentBeingSynced,
+        contentCompletedSync,
+        flags,
+        contentIds,
+      })
     }
   } catch (err) {
     debug(`Error during sync ${err.stack}`)
   }
 
-  // always try again
+  // schedule next sync run
   retry()
 }
 
 function startSyncing(api, flags, storage) {
   // ids of content currently being synced
   const contentBeingSynced = new Map()
-  // ids of content that completed sync and may require creating a new relationship
-  const contentCompleteSynced = new Map()
+  // ids of content that completed sync
+  const contentCompletedSync = new Map()
 
-  syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
+  syncRunner({ api, flags, storage, contentBeingSynced, contentCompletedSync })
 }
 
 module.exports = {
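
Taken together, the refactor separates two cadences: a fast loop that tries to fill the concurrency slots every 3 seconds, and a slow refresh of the content id list every 60 seconds. A condensed, runnable sketch of that pattern (not the actual module; the hypothetical fetchAcceptedContentIds() stands in for api.assets.getAcceptedContentIds()):

const INTERVAL_BETWEEN_SYNC_RUNS_MS = 3000
const CONTENT_ID_REFRESH_INTERVAL_MS = 60000
const MIN_CONCURRENT_SYNC_ITEMS = 5

// Placeholder for the chain query; returns encoded content ids.
async function fetchAcceptedContentIds() {
  return []
}

async function syncRunner(flags, contentIds) {
  try {
    // Re-fetch content ids only when the cached list is missing or stale.
    if (!contentIds || Date.now() - contentIds.fetchedAt > CONTENT_ID_REFRESH_INTERVAL_MS) {
      contentIds = await fetchAcceptedContentIds()
      contentIds.fetchedAt = Date.now() // timestamp stored directly on the array
    }

    // Concurrency cap: flag values below the minimum are silently raised to it.
    const maxConcurrent = Math.max(MIN_CONCURRENT_SYNC_ITEMS, flags.maxSync)
    console.log(`would sync up to ${maxConcurrent} of ${contentIds.length} items`)
  } catch (err) {
    console.log(`Error during sync ${err.stack}`)
  }

  // Always schedule the next run; the short interval keeps the sync slots busy.
  setTimeout(syncRunner, INTERVAL_BETWEEN_SYNC_RUNS_MS, flags, contentIds)
}

syncRunner({ maxSync: 30 })

Note the clamp: although the help text only requires --maxSync to be greater than 0, Math.max raises any value below MIN_CONCURRENT_SYNC_ITEMS to 5, so the effective concurrency never drops below that floor.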