
storage-node: do not reject content unless ipfs hash computed

Mokhtar Naamani 4 years ago
commit 499de56dc3

+ 45 - 50
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -21,6 +21,7 @@
 const debug = require('debug')('joystream:colossus:api:asset')
 const filter = require('@joystream/storage-node-backend/filter')
 const ipfsProxy = require('../../../lib/middleware/ipfs_proxy')
+const assert = require('assert')
 
 function errorHandler(response, err, code) {
   debug(err)
@@ -95,48 +96,21 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
       try {
         stream = await storage.open(id, 'w')
 
-        // We don't know whether the filtering occurs before or after the
-        // stream was finished, and can only commit if both passed.
-        let finished = false
-        let accepted = false
-        const possiblyCommit = () => {
-          if (finished && accepted) {
-            debug('Stream is finished and passed filters; committing.')
-            stream.commit()
-          }
-        }
+        let aborted = false
 
-        // May be emitted before entire stream is processed. If there was an error detecting the
-        // file info at end of stream info will be null.
+        // Early file info detection so we can abort the upload early, but we do not reject the
+        // content on-chain because the ipfs hash has not been computed yet
         stream.on('fileInfo', async (info) => {
           try {
-            debug('Detected file info:', info)
-
-            if (!info) {
-              // Do not process unknown content.
-              debug('Failed to detect content type!')
-              stream.end()
-              res.status(403).send({ message: 'Uknown content type' })
-              return
-            }
+            debug('Early file detection info:', info)
 
-            // Filter allowed content types
-            // == We haven't computed ipfs hash yet so is it really fair to reject content?
-            // == It may not be the real uploader doing the upload!
             const filterResult = filter({}, req.headers, info.mimeType)
             if (filterResult.code !== 200) {
-              debug('Rejecting content', filterResult.message)
+              aborted = true
+              debug('Ending stream', filterResult.message)
               stream.end()
+              stream.cleanup()
               res.status(filterResult.code).send({ message: filterResult.message })
-
-              // Reject the content
-              await runtime.assets.rejectContent(roleAddress, providerId, id)
-            } else {
-              debug('Content accepted.')
-              accepted = true
-
-              // We may have to commit the stream.
-              possiblyCommit()
             }
           } catch (err) {
             errorHandler(res, err)
@@ -144,24 +118,49 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
         })
 
         // `finish` comes before `fileInfo` event if file info detection happened at end of stream.
-        stream.on('finish', () => {
-          try {
-            finished = true
-            possiblyCommit()
-          } catch (err) {
-            errorHandler(res, err)
+        stream.on('finish', async () => {
+          if (!aborted) {
+            try {
+              // Try to get file info and compute the ipfs hash before committing the stream to the ipfs node.
+              await stream.info()
+            } catch (err) {
+              errorHandler(res, err)
+            }
           }
         })
 
-        stream.on('committed', async (hash) => {
-          try {
-            if (hash !== dataObject.ipfs_content_id.toString()) {
-              debug('Rejecting content. IPFS hash does not match value in objectId')
+        // At the end of the stream we should have the file info and the computed ipfs hash - this
+        // event is only emitted by explicitly calling stream.info() in the 'finish' handler above
+        stream.once('info', async (info, hash) => {
+          if (hash === dataObject.ipfs_content_id.toString()) {
+            const filterResult = filter({}, req.headers, info.mimeType)
+            if (filterResult.code !== 200) {
+              debug('Rejecting content')
+              stream.cleanup()
               await runtime.assets.rejectContent(roleAddress, providerId, id)
-              res.status(400).send({ message: "Uploaded content doesn't match IPFS hash" })
-              return
+              res.status(400).send({ message: 'Rejecting content type' })
+            } else {
+              try {
+                await stream.commit()
+              } catch (err) {
+                errorHandler(res, err)
+              }
             }
+          } else {
+            stream.cleanup()
+            res.status(400).send({ message: 'Aborting - uploaded content does not match expected IPFS hash' })
+          }
+        })
 
+        stream.on('committed', async (hash) => {
+          // they cannot be different unless we did something stupid!
+          assert(hash === dataObject.ipfs_content_id.toString())
+
+          // Send ok response early, no need for client to wait for relationships to be created.
+          debug('Sending OK response.')
+          res.status(200).send({ message: 'Asset uploaded.' })
+
+          try {
             debug('accepting Content')
             await runtime.assets.acceptContent(roleAddress, providerId, id)
 
@@ -171,12 +170,8 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
 
             debug('toggling storage relationship for newly uploaded content')
             await runtime.assets.toggleStorageRelationshipReady(roleAddress, providerId, dosrId, true)
-
-            debug('Sending OK response.')
-            res.status(200).send({ message: 'Asset uploaded.' })
           } catch (err) {
             debug(`${err.message}`)
-            errorHandler(res, err)
           }
         })
 
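Taken together, the handler in {id}.js now drives the write stream roughly as follows. This is a condensed sketch of the flow in the diff above, not the actual code: HTTP responses and errorHandler calls are omitted, the dependencies (storage, runtime, filter, id, dataObject, roleAddress, providerId) are assumed to be injected exactly as they already exist in the real route, and the final req.pipe(stream) is assumed from context since it is not part of this diff.

// Condensed sketch (not the real handler): the new accept/reject flow for uploads.
async function handleUpload(req, { storage, runtime, filter, id, dataObject, roleAddress, providerId }) {
  const stream = await storage.open(id, 'w')
  let aborted = false

  // Early file info detection: we may end the stream on a disallowed mime type,
  // but we do not reject the content on-chain yet - no ipfs hash has been computed.
  stream.on('fileInfo', (info) => {
    if (filter({}, req.headers, info.mimeType).code !== 200) {
      aborted = true
      stream.end()
      stream.cleanup()
    }
  })

  // Once the upload is finished, ask the stream for file info and the ipfs hash.
  stream.on('finish', async () => {
    if (!aborted) {
      await stream.info() // emits the 'info' event handled below
    }
  })

  // Only now, with the hash known, decide whether to commit, reject, or just drop.
  stream.once('info', async (info, hash) => {
    if (hash !== dataObject.ipfs_content_id.toString()) {
      stream.cleanup() // not the expected content - drop it without committing
      return
    }
    if (filter({}, req.headers, info.mimeType).code !== 200) {
      stream.cleanup()
      await runtime.assets.rejectContent(roleAddress, providerId, id)
      return
    }
    await stream.commit() // adds the content to the ipfs node and pins it
  })

  // Accept on chain only after the content has actually been committed.
  // (The real handler also creates and toggles the storage relationship here.)
  stream.on('committed', async () => {
    await runtime.assets.acceptContent(roleAddress, providerId, id)
  })

  req.pipe(stream) // assumed: the existing route pipes the request body into the stream
}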

+ 30 - 17
storage-node/packages/storage/storage.js

@@ -25,6 +25,8 @@ const debug = require('debug')('joystream:storage:storage')
 
 const Promise = require('bluebird')
 
+const Hash = require('ipfs-only-hash')
+
 Promise.config({
   cancellation: true,
 })
@@ -97,12 +99,10 @@ class StorageWriteStream extends Transform {
       chunk = Buffer.from(chunk)
     }
 
-    // Logging this all the time is too verbose
-    // debug('Writing temporary chunk', chunk.length, chunk);
     this.temp.write(chunk)
 
     // Try to detect file type during streaming.
-    if (!this.fileInfo && this.buf.byteLength < fileType.minimumBytes) {
+    if (!this.fileInfo && this.buf.byteLength <= fileType.minimumBytes) {
       this.buf = Buffer.concat([this.buf, chunk])
 
       if (this.buf >= fileType.minimumBytes) {
@@ -121,26 +121,37 @@ class StorageWriteStream extends Transform {
   _flush(callback) {
     debug('Flushing temporary stream:', this.temp.path)
     this.temp.end()
+    callback(null)
+  }
 
-    // TODO: compute ipfs hash and include it in emitted event fileInfo
+  /*
+   * Get file info and compute the ipfs hash of the buffered content.
+   */
+
+  async info() {
+    if (!this.temp) {
+      throw new Error('Cannot get info on temporary stream that does not exist. Did you call cleanup()?')
+    }
 
-    // Since we're finished, we can try to detect the file type again.
-    // If we don't find type we should still emit with some indication of detection error.
     if (!this.fileInfo) {
       const read = fs.createReadStream(this.temp.path)
-      fileType
-        .stream(read)
-        .then((stream) => {
-          this.fileInfo = fixFileInfoOnStream(stream).fileInfo
-          this.emit('fileInfo', this.fileInfo)
-        })
-        .catch((err) => {
-          debug('Error trying to detect file type at end-of-stream:', err)
-          this.emit('fileInfo', null)
-        })
+
+      const stream = await fileType.stream(read)
+
+      this.fileInfo = fixFileInfoOnStream(stream).fileInfo
     }
 
-    callback(null)
+    if (!this.hash) {
+      const read = fs.createReadStream(this.temp.path)
+      this.hash = await Hash.of(read)
+    }
+
+    this.emit('info', this.fileInfo, this.hash)
+
+    return {
+      info: this.fileInfo,
+      hash: this.hash,
+    }
   }
 
   /*
@@ -159,10 +170,12 @@ class StorageWriteStream extends Transform {
         debug('Stream committed as', hash)
         this.emit('committed', hash)
         await this.storage.ipfs.pin.add(hash)
+        this.cleanup()
       })
       .catch((err) => {
         debug('Error committing stream', err)
         this.emit('error', err)
+        this.cleanup()
       })
   }