storage.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict'
  19. const { Transform } = require('stream')
  20. const fs = require('fs')
  21. const debug = require('debug')('joystream:storage:storage')
  22. const Promise = require('bluebird')
  23. Promise.config({
  24. cancellation: true,
  25. })
  26. const fileType = require('file-type')
  27. const ipfsClient = require('ipfs-http-client')
  28. const temp = require('temp').track()
  29. const _ = require('lodash')
  // How long (in milliseconds) a request to the IPFS backend may run before it
  // is abandoned. This module enforces it on top of the IPFS client, which does
  // not appear to honour a timeout of its own.
  const DEFAULT_TIMEOUT = 30000

  // Fallback content ID resolver, used when no `resolve_content_id` option is
  // configured: it performs no translation, resolving to its argument as-is
  // and logging a warning on every call.
  const DEFAULT_RESOLVE_CONTENT_ID = async function (original) {
    debug('Warning: Default resolution returns original CID', original)
    return original
  }
  // Default file info if nothing could be detected. Frozen because it is a
  // shared template; fixFileInfo() hands out copies of it, never the object.
  const DEFAULT_FILE_INFO = Object.freeze({
    mimeType: 'application/octet-stream',
    ext: 'bin',
  })

  /*
   * fileType is a weird name, because we're really looking at MIME types.
   * Also, the type field includes extension info, so we're going to call
   * it fileInfo { mimeType, ext } instead.
   * Nitpicking, but it also means we can add our default type if things
   * go wrong.
   *
   * Converts a file-type detection result `{ ext, mime }` into
   * `{ ext, mimeType }`, or returns a copy of DEFAULT_FILE_INFO when `info`
   * is falsy (nothing detected). Always returns a new object: the argument
   * is left untouched, and callers that modify the result cannot corrupt
   * the shared default for later streams.
   */
  function fixFileInfo(info) {
    if (!info) {
      return { ...DEFAULT_FILE_INFO }
    }
    const { mime, ...rest } = info
    return { ...rest, mimeType: mime }
  }

  /*
   * Replace the `fileType` property found on streams returned by
   * `fileType.stream()` with a normalized `fileInfo` property (see
   * fixFileInfo). Mutates and returns the given stream object.
   */
  function fixFileInfoOnStream(stream) {
    const info = fixFileInfo(stream.fileType)
    delete stream.fileType
    stream.fileInfo = info
    return stream
  }
  /*
   * Internal Transform stream for helping write to a temporary location, adding
   * MIME type detection, and a commit() function.
   *
   * Written data is not passed through to the readable side; it only goes to
   * the temporary file. Besides the regular stream events, instances emit:
   * - `fileInfo` ({ mimeType, ext }) once the content type is known, either
   *   while data is streaming or, failing that, after all data was written.
   * - `committed` (IPFS content ID) once commit() has added the file to IPFS.
   * - `error` when writing the temporary file or committing to IPFS fails.
   */
  class StorageWriteStream extends Transform {
    constructor(storage, options) {
      options = _.clone(options || {})
      super(options)
      this.storage = storage

      // Create temp target.
      this.temp = temp.createWriteStream()
      this.buf = Buffer.alloc(0)

      const tempStream = this.temp

      // Settles once all writes to the temp file have completed ('finish'), or
      // once the temp stream closes for another reason (e.g. a write error, or
      // cleanup() destroying it). Never rejects. Reading the file back before
      // this point can observe truncated content.
      this._tempDone = new Promise((resolve) => {
        tempStream.once('finish', resolve)
        tempStream.once('close', resolve)
      })

      // An 'error' on the temp stream without a listener would be thrown as an
      // uncaught exception; report it on this stream instead.
      tempStream.on('error', (err) => {
        debug('Error writing temporary file:', err)
        this.emit('error', err)
      })
    }

    _transform(chunk, encoding, callback) {
      // After cleanup() there is no temp file left to write to; discard.
      if (!this.temp) {
        callback(null)
        return
      }

      // Deal with buffers only
      if (typeof chunk === 'string') {
        chunk = Buffer.from(chunk)
      }
      this.temp.write(chunk)

      // Try to detect file type during streaming. Compare buffer *lengths*:
      // comparing the Buffer object itself with a number coerces it to NaN
      // (or 0 when empty), which would stop detection from ever running here.
      if (!this.fileInfo && this.buf.length < fileType.minimumBytes) {
        this.buf = Buffer.concat([this.buf, chunk])
        if (this.buf.length >= fileType.minimumBytes) {
          const info = fileType(this.buf)
          // No info? We can try again at the end of the stream.
          if (info) {
            this.fileInfo = fixFileInfo(info)
            this.emit('fileInfo', this.fileInfo)
          }
        }
      }
      callback(null)
    }

    _flush(callback) {
      if (!this.temp) {
        // Already cleaned up; nothing to flush or inspect.
        callback(null)
        return
      }
      const tempPath = this.temp.path
      debug('Flushing temporary stream:', tempPath)
      this.temp.end()

      // Wait until the temp file is completely written before reading it back,
      // and report the flush as done only after end-of-stream detection ran.
      this._tempDone
        .then(() => {
          if (this.fileInfo || !this.temp) {
            return null
          }
          return this._detectFileInfoFromTemp(tempPath)
        })
        .then(() => callback(null))
    }

    /*
     * End-of-stream fallback for file type detection: inspect the beginning of
     * the (completely written) temp file and emit `fileInfo`. Detection errors
     * are logged and otherwise ignored, so the returned promise always
     * fulfills.
     */
    _detectFileInfoFromTemp(tempPath) {
      const read = fs.createReadStream(tempPath)
      return Promise.try(() => fileType.stream(read))
        .then((stream) => {
          this.fileInfo = fixFileInfoOnStream(stream).fileInfo
          this.emit('fileInfo', this.fileInfo)
        })
        .catch((err) => {
          debug('Error trying to detect file type at end-of-stream:', err)
        })
        .finally(() => {
          // Only the first bytes were needed; release the file descriptor
          // instead of leaving the read stream paused and open.
          read.destroy()
        })
    }

    /*
     * Commit this stream to the IPFS backend. The result is reported through
     * the `committed` / `error` events. Throws synchronously if cleanup() has
     * already been called.
     */
    commit() {
      if (!this.temp) {
        throw new Error('Cannot commit a temporary stream that does not exist. Did you call cleanup()?')
      }
      const tempPath = this.temp.path
      debug('Committing temporary stream: ', tempPath)
      // Hand the file to IPFS only after every buffered write has landed.
      this._tempDone
        .then(() => this.storage.ipfs.addFromFs(tempPath))
        .then(async (result) => {
          const hash = result[0].hash
          debug('Stream committed as', hash)
          this.emit('committed', hash)
          await this.storage.ipfs.pin.add(hash)
        })
        .catch((err) => {
          debug('Error committing stream', err)
          this.emit('error', err)
        })
    }

    /*
     * Clean up temporary data: close the temp file if still open and delete
     * it. Safe to call more than once.
     */
    cleanup() {
      if (!this.temp) {
        return
      }
      const tempPath = this.temp.path
      debug('Cleaning up temporary file: ', tempPath)
      // Closes the descriptor when cleanup() happens before the stream ended.
      this.temp.destroy()
      fs.unlink(tempPath, () => {
        /* Ignore errors. */
      })
      delete this.temp
    }
  }
  /*
   * Manages the storage backend interaction. This provides a Promise-based API.
   *
   * Usage:
   *
   * const store = await Storage.create({ ... });
   * store.open(...);
   */
  class Storage {
    /*
     * Create a Storage instance. Options include:
     *
     * - an `ipfs` property, which is itself a hash containing
     *   - `connect_options` to be passed to the IPFS client library for
     *     connecting to an IPFS node.
     * - a `resolve_content_id` function, which translates Joystream
     *   content IDs to IPFS content IDs or vice versa. The default is to
     *   not perform any translation, which is not practical for a production
     *   system, but serves its function during development and testing. The
     *   function must be asynchronous.
     * - a `timeout` parameter, defaulting to DEFAULT_TIMEOUT. After this time,
     *   requests to the IPFS backend time out.
     *
     * Functions in this class accept an optional timeout parameter. If the
     * timeout is given, it is used - otherwise, the `option.timeout` value
     * above is used.
     *
     * The instance is returned synchronously; awaiting it is harmless.
     */
    static create(options) {
      const storage = new Storage()
      storage._init(options)
      return storage
    }

    /*
     * Apply options, create the IPFS client, and log (via debug) whether the
     * IPFS daemon answers an identity request. That check is informational
     * only; a failure is logged, not thrown.
     */
    _init(options) {
      this.options = _.clone(options || {})
      this.options.ipfs = this.options.ipfs || {}
      this._timeout = this.options.timeout || DEFAULT_TIMEOUT
      this._resolve_content_id = this.options.resolve_content_id || DEFAULT_RESOLVE_CONTENT_ID
      this.ipfs = ipfsClient(this.options.ipfs.connect_options)
      // Resolved IPFS content IDs for which pinning was started and has not
      // failed (see synchronize()).
      this.pins = {}
      this.ipfs.id((err, identity) => {
        if (err) {
          debug(`Warning IPFS daemon not running: ${err.message}`)
        } else {
          debug(`IPFS node is up with identity: ${identity.id}`)
        }
      })
    }

    /*
     * Run `operation` as a (bluebird) Promise executor - it is called with
     * `(resolve, reject)` - and reject with bluebird's TimeoutError if it has
     * not settled within `timeout` ms. A falsy `timeout` falls back to the
     * instance default.
     */
    async withSpecifiedTimeout(timeout, operation) {
      return new Promise(operation).timeout(timeout || this._timeout)
    }

    /*
     * Resolve content ID with timeout.
     */
    async resolveContentIdWithTimeout(timeout, contentId) {
      return await this.withSpecifiedTimeout(timeout, async (resolve, reject) => {
        try {
          resolve(await this._resolve_content_id(contentId))
        } catch (err) {
          reject(err)
        }
      })
    }

    /*
     * Stat a content ID. Resolves with the result of the IPFS client's
     * `files.stat` for `/ipfs/<resolved id>`.
     */
    async stat(contentId, timeout) {
      const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)
      return await this.withSpecifiedTimeout(timeout, (resolve, reject) => {
        this.ipfs.files.stat(`/ipfs/${resolved}`, { withLocal: true }, (err, res) => {
          if (err) {
            reject(err)
            return
          }
          resolve(res)
        })
      })
    }

    /*
     * Return the size of a content ID.
     */
    async size(contentId, timeout) {
      const stat = await this.stat(contentId, timeout)
      return stat.size
    }

    /*
     * Opens the specified content in read or write mode, and returns a Promise
     * with the stream.
     *
     * Read streams will contain a fileInfo property, with:
     * - a `mimeType` field providing the file's MIME type, or a default.
     * - an `ext` property, providing a file extension suggestion, or a default.
     *
     * Write streams have a slightly different flow, in order to allow for MIME
     * type detection and potential filtering. First off, they are written to a
     * temporary location, and only committed to the backend once their
     * `commit()` function is called.
     *
     * When the commit has finished, a `committed` event is emitted, which
     * contains the IPFS backend's content ID.
     *
     * Write streams also emit a `fileInfo` event during writing. It is passed
     * the `fileInfo` field as described above. Event listeners may now opt to
     * abort the write or continue and eventually `commit()` the file. There is
     * an explicit `cleanup()` function that removes temporary files as well,
     * in case committing is not desired.
     *
     * Rejects with an Error for any mode other than 'r' or 'w'.
     */
    async open(contentId, mode, timeout) {
      if (mode !== 'r' && mode !== 'w') {
        throw new Error('The only supported modes are "r" and "w".')
      }

      // Write stream
      if (mode === 'w') {
        return await this.createWriteStream(contentId, timeout)
      }

      // Read stream - with file type detection
      return await this.createReadStream(contentId, timeout)
    }

    /*
     * IPFS wants us to just dump a stream into its storage, then returns a
     * content ID (of its own). We need to instead return a stream
     * immediately, that we eventually decorate with the content ID when that's
     * available. Arguments passed by open() are not used.
     */
    async createWriteStream() {
      return new StorageWriteStream(this)
    }

    /*
     * Resolve `contentId`, stream it from IPFS, and resolve with a readable
     * stream carrying a `fileInfo` property. Rejects if the IPFS stream
     * errors, if file type detection fails, if no entry's path equals the
     * resolved ID, or on timeout.
     */
    async createReadStream(contentId, timeout) {
      const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)
      let found = false
      return await this.withSpecifiedTimeout(timeout, (resolve, reject) => {
        const ls = this.ipfs.getReadableStream(resolved)
        ls.on('data', async (result) => {
          if (result.path !== resolved) {
            return
          }
          found = true
          // Without this try/catch a failure would be an unhandled rejection
          // of this async listener, and the caller would wait for the timeout.
          try {
            const ftStream = await fileType.stream(result.content)
            resolve(fixFileInfoOnStream(ftStream))
          } catch (err) {
            debug(err)
            reject(err)
          }
        })
        ls.on('error', (err) => {
          ls.end()
          debug(err)
          reject(err)
        })
        ls.on('end', () => {
          if (!found) {
            const err = new Error(`No matching content found for ${contentId}`)
            debug(err)
            reject(err)
          }
        })
        ls.resume()
      })
    }

    /*
     * Synchronize the given content ID by pinning it in IPFS. Returns once the
     * pin request has been issued, without waiting for it to complete. Content
     * already being pinned (or pinned) is skipped; a failed pin is forgotten so
     * a later call retries it.
     */
    async synchronize(contentId) {
      const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)

      // TODO: validate resolved id is proper ipfs_cid, not null or empty string

      if (this.pins[resolved]) {
        return
      }

      debug(`Pinning ${resolved}`)
      // Record the pin as in flight so repeated or concurrent calls do not
      // issue duplicate pin requests for the same content.
      this.pins[resolved] = true

      // The callback fires on error or once the file has been retrieved.
      this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
        if (err) {
          debug(`Error Pinning: ${resolved}`, err)
          delete this.pins[resolved]
        } else {
          debug(`Pinned ${resolved}`)
        }
      })
    }
  }
  // Only the Storage manager is public; write streams are obtained through
  // Storage#open(contentId, 'w') rather than constructed directly.
  module.exports = { Storage: Storage }