
Merge pull request #2544 from mnaamani/storage-node/sync-improvements

Storage-Node: Improve Sync Run
Martin committed 3 years ago
commit 3e63695b5e

+ 2 - 2
storage-node/packages/colossus/api-base.yml

@@ -1,7 +1,7 @@
 openapi: '3.0.0'
 info:
-  title: 'Joystream Storage Node API.'
-  version: '1.0.0'
+  title: 'Colossus - Joystream Storage Node'
+  version: '1.1.0'
 paths: {} # Will be populated by express-openapi
 
 components:

+ 32 - 4
storage-node/packages/colossus/bin/cli.js

@@ -18,9 +18,6 @@ const debug = require('debug')('joystream:colossus')
 // Project root
 const PROJECT_ROOT = path.resolve(__dirname, '..')
 
-// Number of milliseconds to wait between synchronization runs.
-const SYNC_PERIOD_MS = 120000 // 2min
-
 // Parse CLI
 const FLAG_DEFINITIONS = {
   port: {
@@ -75,6 +72,10 @@ const FLAG_DEFINITIONS = {
     type: 'boolean',
     default: false,
   },
+  maxSync: {
+    type: 'number',
+    default: 200,
+  },
 }
 
 const cli = meow(
@@ -98,6 +99,7 @@ const cli = meow(
     --ipfs-host   hostname  ipfs host to use, default to 'localhost'. Default port 5001 is always used
     --anonymous             Runs server in anonymous mode. Replicates content without need to register
                             on-chain, and can serve content. Cannot be used to upload content.
+    --maxSync               The max number of items to sync concurrently. Defaults to 200.
   `,
   { flags: FLAG_DEFINITIONS }
 )
@@ -139,6 +141,10 @@ function getStorage(runtimeApi, { ipfsHost }) {
 
   const options = {
     resolve_content_id: async (contentId) => {
+      // Resolve accepted content from cache
+      const hash = runtimeApi.assets.resolveContentIdToIpfsHash(contentId)
+      if (hash) return hash
+
       // Resolve via API
       const obj = await runtimeApi.assets.getDataObject(contentId)
       if (!obj) {
@@ -269,6 +275,28 @@ const commands = {
       port = cli.flags.port
     }
 
+    // Get initial data objects into cache
+    while (true) {
+      try {
+        debug('Fetching data objects')
+        await api.assets.fetchDataObjects()
+        break
+      } catch (err) {
+        debug('Failed fetching data objects', err)
+        await sleep(5000)
+      }
+    }
+
+    // Regularly update data objects
+    setInterval(async () => {
+      try {
+        debug('Fetching data objects')
+        await api.assets.fetchDataObjects()
+      } catch (err) {
+        debug('Failed updating data objects from chain', err)
+      }
+    }, 60000)
+
     // TODO: check valid url, and valid port number
     const store = getStorage(api, cli.flags)
 
@@ -276,7 +304,7 @@ const commands = {
     const ipfsHttpGatewayUrl = `http://${ipfsHost}:8080/`
 
     const { startSyncing } = require('../lib/sync')
-    startSyncing(api, { syncPeriod: SYNC_PERIOD_MS, anonymous: cli.flags.anonymous }, store)
+    startSyncing(api, { anonymous: cli.flags.anonymous, maxSync: cli.flags.maxSync }, store)
 
     if (!cli.flags.anonymous) {
       announcePublicUrl(api, publicUrl)

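The startup change above blocks the server until the data-object cache is primed, then refreshes it every minute. A minimal standalone sketch of that pattern, with fetchDataObjects standing in for api.assets.fetchDataObjects:

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function bootstrapCache(fetchDataObjects) {
  // Block until the initial fetch succeeds
  for (;;) {
    try {
      await fetchDataObjects()
      break
    } catch (err) {
      await sleep(5000) // transient failure: wait and retry
    }
  }
  // Keep the cache fresh; swallow errors so the timer keeps running
  setInterval(() => fetchDataObjects().catch(() => {}), 60000)
}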
+ 46 - 109
storage-node/packages/colossus/lib/sync.js

@@ -21,39 +21,41 @@
 const debug = require('debug')('joystream:sync')
 const _ = require('lodash')
 const { ContentId } = require('@joystream/types/storage')
-// The number of concurrent sync sessions allowed. Must be greater than zero.
-const MAX_CONCURRENT_SYNC_ITEMS = 20
+const { nextTick } = require('@joystream/storage-utils/sleep')
 
-async function syncContent({ api, storage, contentBeingSynced, contentCompleteSynced }) {
-  const knownEncodedContentIds = (await api.assets.getAcceptedContentIds()).map((id) => id.encode())
+// Time to wait between sync runs. The lower the value, the better the chance
+// of filling all allowed concurrent sync slots.
+const INTERVAL_BETWEEN_SYNC_RUNS_MS = 3000
 
-  // Select ids which we have not yet fully synced
-  const needsSync = knownEncodedContentIds
-    .filter((id) => !contentCompleteSynced.has(id))
+async function syncRun({ api, storage, contentBeingSynced, contentCompletedSync, flags }) {
+  // The number of items to attempt to fetch concurrently.
+  const MAX_CONCURRENT_SYNC_ITEMS = Math.max(1, flags.maxSync)
+
+  const contentIds = api.assets.getAcceptedIpfsHashes()
+
+  // Select ids which may need to be synced
+  const idsNotSynced = contentIds
+    .filter((id) => !contentCompletedSync.has(id))
     .filter((id) => !contentBeingSynced.has(id))
 
-  // Since we are limiting concurrent content ids being synced, to ensure
+  // We are limiting how many content ids can be synced concurrently, so to ensure
   // better distribution of content across storage nodes during a potentially long
   // sync process we don't want all nodes to replicate items in the same order, so
   // we simply shuffle.
-  const candidatesForSync = _.shuffle(needsSync)
+  const idsToSync = _.shuffle(idsNotSynced)
 
-  // TODO: get the data object
-  // make sure the data object was Accepted by the liaison,
-  // don't just blindly attempt to fetch them
-  while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && candidatesForSync.length) {
-    const id = candidatesForSync.shift()
+  while (contentBeingSynced.size < MAX_CONCURRENT_SYNC_ITEMS && idsToSync.length) {
+    const id = idsToSync.shift()
 
     try {
       contentBeingSynced.set(id)
-      const contentId = ContentId.decode(api.api.registry, id)
-      await storage.synchronize(contentId, (err, status) => {
+      await storage.pin(id, (err, status) => {
         if (err) {
           contentBeingSynced.delete(id)
           debug(`Error Syncing ${err}`)
         } else if (status.synced) {
           contentBeingSynced.delete(id)
-          contentCompleteSynced.set(id)
+          contentCompletedSync.set(id)
         }
       })
     } catch (err) {
@@ -61,121 +63,56 @@ async function syncContent({ api, storage, contentBeingSynced, contentCompleteSy
       debug(`Failed calling synchronize ${err}`)
       contentBeingSynced.delete(id)
     }
-  }
-}
-
-async function createNewRelationships({ api, contentCompleteSynced }) {
-  const roleAddress = api.identities.key.address
-  const providerId = api.storageProviderId
-
-  // Create new relationships for synced content if required and
-  // compose list of relationship ids to be set to ready.
-  return (
-    await Promise.all(
-      [...contentCompleteSynced.keys()].map(async (id) => {
-        const contentId = ContentId.decode(api.api.registry, id)
-        const { relationship, relationshipId } = await api.assets.getStorageRelationshipAndId(providerId, contentId)
-
-        if (relationship) {
-          // maybe prior transaction to set ready failed for some reason..
-          if (!relationship.ready) {
-            return relationshipId
-          }
-        } else {
-          // create relationship
-          debug(`Creating new storage relationship for ${id}`)
-          try {
-            return await api.assets.createStorageRelationship(roleAddress, providerId, contentId)
-          } catch (err) {
-            debug(`Error creating new storage relationship ${id}: ${err.stack}`)
-          }
-        }
 
-        return null
-      })
-    )
-  ).filter((id) => id !== null)
-}
-
-async function setRelationshipsReady({ api, relationshipIds }) {
-  const roleAddress = api.identities.key.address
-  const providerId = api.storageProviderId
-
-  return Promise.all(
-    relationshipIds.map(async (relationshipId) => {
-      try {
-        await api.assets.toggleStorageRelationshipReady(roleAddress, providerId, relationshipId, true)
-      } catch (err) {
-        debug('Error setting relationship ready')
-      }
-    })
-  )
+    // Allow callbacks passed to storage.pin() to be invoked during this sync run.
+    // This will happen if content is found to be local and will speed up the overall sync.
+    await nextTick()
+  }
 }
 
-async function syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced }) {
+async function syncRunner({ api, flags, storage, contentBeingSynced, contentCompletedSync }) {
   const retry = () => {
-    setTimeout(syncPeriodic, flags.syncPeriod, {
+    setTimeout(syncRunner, INTERVAL_BETWEEN_SYNC_RUNS_MS, {
       api,
       flags,
       storage,
       contentBeingSynced,
-      contentCompleteSynced,
+      contentCompletedSync,
     })
   }
 
   try {
-    debug('Sync run started.')
-
-    const chainIsSyncing = await api.chainIsSyncing()
-    if (chainIsSyncing) {
-      debug('Chain is syncing. Postponing sync run.')
-      return retry()
-    }
-
-    if (!flags.anonymous) {
-      // Retry later if provider is not active
-      if (!(await api.providerIsActiveWorker())) {
-        debug(
-          'storage provider role account and storageProviderId are not associated with a worker. Postponing sync run.'
-        )
-        return retry()
-      }
-
-      const recommendedBalance = await api.providerHasMinimumBalance(300)
-      if (!recommendedBalance) {
-        debug('Warning: Provider role account is running low on balance.')
-      }
-
-      const sufficientBalance = await api.providerHasMinimumBalance(100)
-      if (!sufficientBalance) {
-        debug('Provider role account does not have sufficient balance. Postponing sync run!')
-        return retry()
-      }
-    }
-
-    await syncContent({ api, storage, contentBeingSynced, contentCompleteSynced })
-
-    // Only update on-chain state if not in anonymous mode
-    if (!flags.anonymous) {
-      const relationshipIds = await createNewRelationships({ api, contentCompleteSynced })
-      await setRelationshipsReady({ api, relationshipIds })
-      debug(`Sync run completed, set ${relationshipIds.length} new relationships to ready`)
+    if (await api.chainIsSyncing()) {
+      debug('Chain is syncing. Postponing sync.')
+    } else {
+      await syncRun({
+        api,
+        storage,
+        contentBeingSynced,
+        contentCompletedSync,
+        flags,
+      })
     }
   } catch (err) {
-    debug(`Error in sync run ${err.stack}`)
+    debug(`Error during sync ${err.stack}`)
   }
 
-  // always try again
+  // schedule next sync run
   retry()
 }
 
 function startSyncing(api, flags, storage) {
   // ids of content currently being synced
   const contentBeingSynced = new Map()
-  // ids of content that completed sync and may require creating a new relationship
-  const contentCompleteSynced = new Map()
+  // ids of content that completed sync
+  const contentCompletedSync = new Map()
+
+  syncRunner({ api, flags, storage, contentBeingSynced, contentCompletedSync })
 
-  syncPeriodic({ api, flags, storage, contentBeingSynced, contentCompleteSynced })
+  setInterval(() => {
+    debug(`objects syncing: ${contentBeingSynced.size}`)
+    debug(`objects local: ${contentCompletedSync.size}`)
+  }, 60000)
 }
 
 module.exports = {

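The rewritten sync run uses contentBeingSynced as a concurrency limiter: an id is inserted before storage.pin() is called and removed in its callback, so the map's size counts in-flight work. A minimal sketch of that pattern, with a hypothetical pinItem standing in for storage.pin():

const _ = require('lodash')

// Hypothetical stand-in for storage.pin(): completes via callback
function pinItem(id, callback) {
  setTimeout(() => callback(null, { synced: true }), 10)
}

function runOnce(ids, beingSynced, completedSync, maxConcurrent) {
  // Skip ids already done or in flight; shuffle for better distribution across nodes
  const candidates = _.shuffle(ids.filter((id) => !beingSynced.has(id) && !completedSync.has(id)))

  while (beingSynced.size < maxConcurrent && candidates.length) {
    const id = candidates.shift()
    beingSynced.set(id) // Map used as a set; size tracks in-flight work
    pinItem(id, (err, status) => {
      beingSynced.delete(id)
      if (!err && status.synced) completedSync.set(id)
    })
  }
}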
+ 1 - 1
storage-node/packages/colossus/package.json

@@ -1,7 +1,7 @@
 {
   "name": "@joystream/colossus",
   "private": true,
-  "version": "0.3.0",
+  "version": "0.4.0",
   "description": "Colossus - Joystream Storage Node",
   "author": "Joystream",
   "homepage": "https://github.com/Joystream/joystream",

+ 57 - 19
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -38,19 +38,63 @@ function errorHandler(response, err, code) {
 const PROCESS_UPLOAD_TX_COSTS = 3
 
 module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
+  debug('created path handler')
+
   // Create the IPFS HTTP Gateway proxy middleware
   const proxy = ipfsProxy.createProxy(ipfsHttpGatewayUrl)
 
+  // Cache of known content mappings and local availability info
+  const ipfsContentIdMap = new Map()
+
+  // Make sure the id is valid and was 'Accepted'; only then proxy, and only if the content is local
   const proxyAcceptedContentToIpfsGateway = async (req, res, next) => {
-    // make sure id exists and was Accepted only then proxy
-    const dataObject = await runtime.assets.getDataObject(req.params.id)
-
-    if (dataObject && dataObject.liaison_judgement.type === 'Accepted') {
-      req.params.ipfs_content_id = dataObject.ipfs_content_id.toString()
-      proxy(req, res, next)
-    } else {
-      res.status(404).send({ message: 'Content not found' })
+    const content_id = req.params.id
+
+    if (!ipfsContentIdMap.has(content_id)) {
+      const hash = runtime.assets.resolveContentIdToIpfsHash(req.params.id)
+
+      if (!hash) {
+        return res.status(404).send({ message: 'Unknown content' })
+      }
+
+      ipfsContentIdMap.set(content_id, {
+        local: false,
+        ipfs_content_id: hash,
+      })
     }
+
+    const { ipfs_content_id, local } = ipfsContentIdMap.get(content_id)
+
+    // Pass on the ipfs hash to the middleware
+    req.params.ipfs_content_id = ipfs_content_id
+
+    // Serve it if we know we have it, or it was recently synced successfully
+    if (local || storage.syncStatus(ipfs_content_id).synced) {
+      return proxy(req, res, next)
+    }
+
+    // Not yet processed by sync run, check if we have it locally
+    try {
+      const stat = await storage.ipfsStat(ipfs_content_id, 4000)
+
+      if (stat.local) {
+        ipfsContentIdMap.set(content_id, {
+          local: true,
+          ipfs_content_id,
+        })
+
+        // We know we have the full content locally, serve it
+        return proxy(req, res, next)
+      }
+    } catch (_err) {
+      // timeout trying to stat which most likely means we do not have it
+      // debug('Failed to stat', ipfs_content_id)
+    }
+
+    // Valid content, but no certainty that the node has it locally yet.
+    // We avoid serving it to prevent poor performance (the ipfs node would have to
+    // retrieve it on demand, which might be slow and wasteful if it is not cached locally)
+    res.status(404).send({ message: 'Content not available locally' })
   }
 
   const doc = {
@@ -183,6 +227,11 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           // they cannot be different unless we did something stupid!
           assert(hash === dataObject.ipfs_content_id.toString())
 
+          ipfsContentIdMap.set(id, {
+            ipfs_content_id: hash,
+            local: true,
+          })
+
           // Send ok response early, no need for client to wait for relationships to be created.
           debug('Sending OK response.')
           res.status(200).send({ message: 'Asset uploaded.' })
@@ -193,17 +242,6 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
             if (dataObject.liaison_judgement.type === 'Pending') {
               await runtime.assets.acceptContent(roleAddress, providerId, id)
             }
-
-            // Is there any real value in updating this state? Nobody uses it!
-            const { relationshipId } = await runtime.assets.getStorageRelationshipAndId(providerId, id)
-            if (!relationshipId) {
-              debug('creating storage relationship for newly uploaded content')
-              // Create storage relationship and flip it to ready.
-              const dosrId = await runtime.assets.createStorageRelationship(roleAddress, providerId, id)
-
-              debug('toggling storage relationship for newly uploaded content')
-              await runtime.assets.toggleStorageRelationshipReady(roleAddress, providerId, dosrId, true)
-            }
           } catch (err) {
             debug(`${err.message}`)
           }

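The middleware above keeps a small availability cache so repeat requests skip both the chain lookup and the IPFS stat. A compact model of that cache; lookup and markLocal are hypothetical names, and resolve stands in for runtime.assets.resolveContentIdToIpfsHash:

// Availability cache: encoded content id -> { ipfs_content_id, local }
const ipfsContentIdMap = new Map()

function lookup(contentId, resolve) {
  if (!ipfsContentIdMap.has(contentId)) {
    const hash = resolve(contentId)
    if (!hash) return null // unknown content -> respond 404
    ipfsContentIdMap.set(contentId, { ipfs_content_id: hash, local: false })
  }
  return ipfsContentIdMap.get(contentId)
}

// Called after a successful ipfsStat() or a completed upload
function markLocal(contentId, hash) {
  ipfsContentIdMap.set(contentId, { ipfs_content_id: hash, local: true })
}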
+ 61 - 60
storage-node/packages/helios/bin/cli.js

@@ -10,49 +10,47 @@ function makeAssetUrl(contentId, source) {
   return `${source}/asset/v0/${encodeAddress(contentId)}`
 }
 
-async function assetRelationshipState(api, contentId, providers) {
-  const dataObject = await api.query.dataDirectory.dataByContentId(contentId)
-
-  const relationshipIds = await api.query.dataObjectStorageRegistry.relationshipsByContentId(contentId)
-
-  // how many relationships associated with active providers and in ready state
-  const activeRelationships = await Promise.all(
-    relationshipIds.map(async (id) => {
-      let relationship = await api.query.dataObjectStorageRegistry.relationships(id)
-      relationship = relationship.unwrap()
-      // only interested in ready relationships
-      if (!relationship.ready) {
-        return undefined
+// HTTP HEAD, via axios, all known content ids on the given endpoint
+async function countContentAvailability(providerId, endpoint, contentIds) {
+  let found = 0
+  let errored = 0
+  let requestsSent = 0
+  // Avoid opening too many connections; do it in chunks, otherwise we get
+  // "Client network socket disconnected before secure TLS connection was established" errors
+  while (contentIds.length) {
+    const chunk = contentIds.splice(0, 100)
+    requestsSent += chunk.length
+    const results = await Promise.allSettled(chunk.map((id) => axios.head(makeAssetUrl(id, endpoint))))
+
+    results.forEach((result, _ix) => {
+      if (result.status === 'rejected') {
+        errored++
+      } else {
+        found++
       }
-      // Does the relationship belong to an active provider ?
-      return providers.find((provider) => relationship.storage_provider.eq(provider))
     })
-  )
-
-  return [activeRelationships.filter((active) => active).length, dataObject.liaison_judgement]
-}
 
-// HTTP HEAD with axios all known content ids on each provider
-async function countContentAvailability(contentIds, source) {
-  const content = {}
-  let found = 0
-  let missing = 0
-  for (let i = 0; i < contentIds.length; i++) {
-    const assetUrl = makeAssetUrl(contentIds[i], source)
-    try {
-      const info = await axios.head(assetUrl)
-      content[encodeAddress(contentIds[i])] = {
-        type: info.headers['content-type'],
-        bytes: info.headers['content-length'],
-      }
-      // TODO: cross check against dataobject size
-      found++
-    } catch (err) {
-      missing++
-    }
+    // show some progress
+    console.error(`provider: ${providerId}:`, `total checks: ${requestsSent}`, `ok: ${found}`, `errors: ${errored}`)
   }
 
-  return { found, missing, content }
+  return { found, errored }
+}
+
+async function testProviderHasAssets(providerId, endpoint, contentIds) {
+  const total = contentIds.length
+  const startedAt = Date.now()
+  const { found, errored } = await countContentAvailability(providerId, endpoint, contentIds)
+  const completedAt = Date.now()
+  console.log(`
+    ---------------------------------------
+    Final Result for provider ${providerId}
+    url: ${endpoint}
+    fetched: ${found}/${total}
+    failed: ${errored}
+    check took: ${(completedAt - startedAt) / 1000}s
+    ------------------------------------------
+  `)
 }
 
 async function main() {
@@ -85,42 +83,45 @@ async function main() {
         return
       }
       const swaggerUrl = `${stripEndingSlash(provider.endpoint)}/swagger.json`
-      let error
       try {
-        await axios.get(swaggerUrl)
-        // maybe print out api version information to detect which version of colossus is running?
-        // or add anothe api endpoint for diagnostics information
+        const { data } = await axios.get(swaggerUrl)
+        console.log(
+          `${provider.providerId}:`,
+          `${provider.endpoint}`,
+          '- OK -',
+          `storage node version ${data.info.version}`
+        )
       } catch (err) {
-        error = err
+        console.log(`${provider.providerId}`, `${provider.endpoint} - ${err.message}`)
       }
-      console.log(`${provider.providerId}`, `${provider.endpoint} - ${error ? error.message : 'OK'}`)
     })
   )
 
-  const knownContentIds = await runtime.assets.getKnownContentIds()
-  const assetStatuses = {}
-  await Promise.all(
-    knownContentIds.map(async (contentId) => {
-      const [, judgement] = await assetRelationshipState(api, contentId, storageProviders)
-      const j = judgement.toString()
-      assetStatuses[j] = assetStatuses[j] ? assetStatuses[j] + 1 : 1
-    })
-  )
-  console.log(`\nData Directory has ${knownContentIds.length} assets:`, assetStatuses)
+  // Load data objects
+  await runtime.assets.fetchDataObjects()
+
+  const allContentIds = await runtime.assets.getKnownContentIds()
+  const acceptedContentIds = runtime.assets.getAcceptedContentIds()
+  const ipfsHashes = runtime.assets.getAcceptedIpfsHashes()
+
+  console.log('\nData Directory objects:')
+  console.log(allContentIds.length, 'created')
+  console.log(acceptedContentIds.length, 'accepted')
+  console.log(ipfsHashes.length, 'unique accepted hashes')
 
-  // interesting disconnect doesn't work unless an explicit provider was created
-  // for underlying api instance
   // We no longer need a connection to the chain
   api.disconnect()
 
-  console.log(`\nChecking available assets on providers (this can take some time)...`)
+  console.log(`
+    Checking available assets on providers (this can take some time)
+    This is done by sending HEAD requests for all 'Accepted' assets.
+  `)
+
   endpoints.forEach(async ({ providerId, endpoint }) => {
     if (!endpoint) {
       return
     }
-    const total = knownContentIds.length
-    const { found } = await countContentAvailability(knownContentIds, endpoint)
-    console.log(`provider ${providerId}: has ${found} out of ${total}`)
+    return testProviderHasAssets(providerId, endpoint, acceptedContentIds.slice())
   })
 }
 

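The chunked HEAD checks generalize to any URL list; splice() consumes the input array, which is why the call site above passes acceptedContentIds.slice(). A minimal sketch of the pattern, assuming Node 12.9+ for Promise.allSettled:

const axios = require('axios')

// HEAD a list of URLs in chunks of `chunkSize` to bound open sockets
async function headInChunks(urls, chunkSize = 100) {
  let found = 0
  let errored = 0
  while (urls.length) {
    const chunk = urls.splice(0, chunkSize)
    const results = await Promise.allSettled(chunk.map((url) => axios.head(url)))
    results.forEach((result) => (result.status === 'fulfilled' ? found++ : errored++))
  }
  return { found, errored }
}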
+ 36 - 3
storage-node/packages/runtime-api/assets.js

@@ -158,9 +158,12 @@ class AssetsApi {
   /*
    * Returns array of all content ids in storage where liaison judgement was Accepted
    */
-  async getAcceptedContentIds() {
-    const entries = await this.base.api.query.dataDirectory.dataByContentId.entries()
-    return entries
+  getAcceptedContentIds() {
+    if (!this._cachedEntries) {
+      return []
+    }
+
+    return this._cachedEntries
       .filter(([, dataObject]) => dataObject.liaison_judgement.type === 'Accepted')
       .map(
         ([
@@ -170,6 +173,36 @@ class AssetsApi {
         ]) => contentId
       )
   }
+
+  /*
+   * Returns array of all ipfs hashes in storage where liaison judgement was Accepted
+   */
+  getAcceptedIpfsHashes() {
+    if (!this._cachedEntries) {
+      return []
+    }
+    const hashes = new Map()
+    this._cachedEntries
+      .filter(([, dataObject]) => dataObject.liaison_judgement.type === 'Accepted')
+      .forEach(([, dataObject]) => hashes.set(dataObject.ipfs_content_id.toString()))
+    return Array.from(hashes.keys())
+  }
+
+  /*
+   * Fetch and cache all data objects
+   */
+  async fetchDataObjects() {
+    this._cachedEntries = await this.base.api.query.dataDirectory.dataByContentId.entries()
+    this._idMappings = new Map()
+    this._cachedEntries.forEach(([{ args: [contentId] }, dataObject]) =>
+      this._idMappings.set(contentId.encode(), dataObject.ipfs_content_id.toString())
+    )
+  }
+
+  resolveContentIdToIpfsHash(contentId) {
+    if (!this._idMappings) return null
+    return this._idMappings.get(contentId)
+  }
 }
 
 module.exports = {

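With this change the getters become synchronous reads of an in-memory cache, so callers must populate it first. A hypothetical call site, mirroring the helios changes above:

async function example(api) {
  // Populate the cache from chain state (one-off or periodic)
  await api.assets.fetchDataObjects()

  // Subsequent reads are synchronous, served entirely from the cache
  const acceptedIds = api.assets.getAcceptedContentIds()
  const acceptedHashes = api.assets.getAcceptedIpfsHashes()

  if (acceptedIds.length) {
    // The mapping is keyed by encoded content ids
    const hash = api.assets.resolveContentIdToIpfsHash(acceptedIds[0].encode())
    console.log(acceptedHashes.length, hash)
  }
}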
+ 25 - 22
storage-node/packages/storage/storage.js

@@ -300,10 +300,17 @@ class Storage {
    * Stat a content ID.
    */
   async stat(contentId, timeout) {
-    const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)
+    const ipfsHash = await this.resolveContentIdWithTimeout(timeout, contentId)
 
-    return await this.withSpecifiedTimeout(timeout, (resolve, reject) => {
-      this.ipfs.files.stat(`/ipfs/${resolved}`, { withLocal: true }, (err, res) => {
+    return this.ipfsStat(ipfsHash, timeout)
+  }
+
+  /*
+   * Stat IPFS hash
+   */
+  async ipfsStat(hash, timeout) {
+    return this.withSpecifiedTimeout(timeout, (resolve, reject) => {
+      this.ipfs.files.stat(`/ipfs/${hash}`, { withLocal: true }, (err, res) => {
         if (err) {
           reject(err)
           return
@@ -362,13 +369,13 @@ class Storage {
   }
 
   async createReadStream(contentId, timeout) {
-    const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)
+    const ipfsHash = await this.resolveContentIdWithTimeout(timeout, contentId)
 
     let found = false
     return await this.withSpecifiedTimeout(timeout, (resolve, reject) => {
-      const ls = this.ipfs.getReadableStream(resolved)
+      const ls = this.ipfs.getReadableStream(ipfsHash)
       ls.on('data', async (result) => {
-        if (result.path === resolved) {
+        if (result.path === ipfsHash) {
           found = true
 
           const ftStream = await fileType.stream(result.content)
@@ -392,32 +399,28 @@ class Storage {
   }
 
   /*
-   * Synchronize the given content ID
+   * Pin the given IPFS CID
    */
-  async synchronize(contentId, callback) {
-    const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)
-
-    // TODO: validate resolved id is proper ipfs_cid, not null or empty string
-
-    if (!this.pinning[resolved] && !this.pinned[resolved]) {
-      debug(`Pinning hash: ${resolved} content-id: ${contentId}`)
-      this.pinning[resolved] = true
+  async pin(ipfsHash, callback) {
+    if (!this.pinning[ipfsHash] && !this.pinned[ipfsHash]) {
+      // debug(`Pinning hash: ${ipfsHash} content-id: ${contentId}`)
+      this.pinning[ipfsHash] = true
 
       // Callback passed to add() will be called on error or when the entire file
       // is retrieved. So on success we consider the content synced.
-      this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
-        delete this.pinning[resolved]
+      this.ipfs.pin.add(ipfsHash, { quiet: true, pin: true }, (err) => {
+        delete this.pinning[ipfsHash]
         if (err) {
-          debug(`Error Pinning: ${resolved}`)
+          debug(`Error Pinning: ${ipfsHash}`)
           callback && callback(err)
         } else {
-          debug(`Pinned ${resolved}`)
-          this.pinned[resolved] = true
-          callback && callback(null, this.syncStatus(resolved))
+          // debug(`Pinned ${ipfsHash}`)
+          this.pinned[ipfsHash] = true
+          callback && callback(null, this.syncStatus(ipfsHash))
         }
       })
     } else {
-      callback && callback(null, this.syncStatus(resolved))
+      callback && callback(null, this.syncStatus(ipfsHash))
     }
   }
 

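pin() keeps the callback interface and dedupes via the pinning/pinned maps. A caller preferring promises could wrap it as below (a sketch, not part of the commit; note the resolved status may have synced === false while another pin of the same hash is still in flight):

function pinAsync(storage, ipfsHash) {
  return new Promise((resolve, reject) => {
    storage.pin(ipfsHash, (err, status) => (err ? reject(err) : resolve(status)))
  })
}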
+ 7 - 0
storage-node/packages/util/sleep.js

@@ -4,6 +4,13 @@ function sleep(ms) {
   })
 }
 
+function nextTick() {
+  return new Promise((resolve) => {
+    process.nextTick(resolve)
+  })
+}
+
 module.exports = {
   sleep,
+  nextTick,
 }
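nextTick() resolves on the next turn of the event loop, letting a long loop yield so queued callbacks (such as pin completions during a sync run) can fire. A minimal usage sketch:

const { nextTick } = require('@joystream/storage-utils/sleep')

// Yield between iterations so pending I/O callbacks can run
async function processSequentially(items, handle) {
  for (const item of items) {
    handle(item)
    await nextTick()
  }
}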