storage.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict';
  19. const { Transform } = require('stream');
  20. const fs = require('fs');
  21. const debug = require('debug')('joystream:storage:storage');
  22. const Promise = require('bluebird');
  23. Promise.config({
  24. cancellation: true,
  25. });
  26. const file_type = require('file-type');
  27. const ipfs_client = require('ipfs-http-client');
  28. const temp = require('temp').track();
  29. const _ = require('lodash');
  // Request timeout (in milliseconds) used when a call does not specify one.
  // It is enforced by this module itself, on top of the IPFS client, since
  // the client does not appear to honor a timeout on its own.
  const DEFAULT_TIMEOUT = 30000;
  // Fallback content ID resolver, used when no `resolve_content_id` option is
  // supplied. It performs no translation: it logs a warning and hands back
  // the very ID it was given (asynchronously, like a real resolver).
  const DEFAULT_RESOLVE_CONTENT_ID = async function (original) {
    debug('Warning: Default resolution returns original CID', original);
    return original;
  };
  // Default file info if nothing could be detected. Treat this object as a
  // template only: fix_file_info() hands out copies of it, so callers that
  // modify the file info they receive cannot alter the shared default.
  const DEFAULT_FILE_INFO = {
    mime_type: 'application/octet-stream',
    ext: 'bin',
  };

  /*
   * fileType is a weird name, because we're really looking at MIME types.
   * Also, the type field includes extension info, so we're going to call
   * it file_info { mime_type, ext } instead.
   * Nitpicking, but it also means we can add our default type if things
   * go wrong.
   *
   * Converts a detection result of the form { ext, mime } in place into
   * { ext, mime_type } and returns it. A falsy `info` (nothing detected)
   * yields a fresh copy of DEFAULT_FILE_INFO.
   */
  function fix_file_info(info)
  {
    if (!info) {
      // Copy, so mutating the result never changes the default for others.
      return Object.assign({}, DEFAULT_FILE_INFO);
    }
    info.mime_type = info.mime;
    delete info.mime;
    return info;
  }

  /*
   * Replace the `fileType` property found on `stream` (as read after
   * file_type.stream()) with a normalized `file_info` property built by
   * fix_file_info(). Returns the same stream object.
   */
  function fix_file_info_on_stream(stream)
  {
    const info = fix_file_info(stream.fileType);
    delete stream.fileType;
    stream.file_info = info;
    return stream;
  }
  /*
   * Internal Transform stream for helping write to a temporary location, adding
   * MIME type detection, and a commit() function.
   *
   * Every chunk written is copied to a temporary file. The leading bytes (up
   * to file_type.minimumBytes) are also kept in memory for MIME type
   * detection. A `file_info` event is emitted as soon as detection succeeds,
   * or otherwise once at the end of the stream, falling back to a default
   * type. This stream pushes no readable output.
   */
  class StorageWriteStream extends Transform
  {
    constructor(storage, options)
    {
      options = _.clone(options || {});
      super(options);
      this.storage = storage;

      // Create temp target. Its errors are re-emitted on this stream, where
      // callers listen, instead of going unhandled.
      this.temp = temp.createWriteStream();
      this.temp.on('error', (err) => this.emit('error', err));

      // Leading bytes kept in memory for MIME type detection.
      this.buf = Buffer.alloc(0);
    }

    _transform(chunk, encoding, callback)
    {
      // Deal with buffers only
      if (typeof chunk === 'string') {
        chunk = Buffer.from(chunk);
      }

      // Logging this all the time is too verbose
      // debug('Writing temporary chunk', chunk.length, chunk);
      const flushed = this.temp.write(chunk);

      // Try to detect file type during streaming. The buffer *length* must be
      // compared: relational operators on a Buffer coerce it via toString(),
      // which makes the check meaningless.
      if (!this.file_info && this.buf.length < file_type.minimumBytes) {
        this.buf = Buffer.concat([this.buf, chunk]);
        if (this.buf.length >= file_type.minimumBytes) {
          const info = file_type(this.buf);
          // No info? The end-of-stream step falls back to the default type.
          if (info) {
            this.file_info = fix_file_info(info);
            this.emit('file_info', this.file_info);
          }
        }
      }

      // Honor backpressure from the temporary file so large uploads are not
      // accumulated in memory.
      if (flushed) {
        callback(null);
      } else {
        this.temp.once('drain', () => callback(null));
      }
    }

    _flush(callback)
    {
      debug('Flushing temporary stream:', this.temp.path);

      // Only signal completion once the temporary file is completely written,
      // so a commit() triggered by the end of this stream uploads the whole
      // file rather than whatever happened to be flushed so far.
      this.temp.end((err) => {
        if (err) {
          callback(err);
          return;
        }

        // Since we're finished, we can try to detect the file type again. The
        // in-memory buffer holds everything detection looks at: the whole
        // input when it is short, otherwise its leading bytes.
        if (!this.file_info) {
          let info = null;
          try {
            info = file_type(this.buf);
          } catch (detect_err) {
            debug('Error trying to detect file type at end-of-stream:', detect_err);
          }
          this.file_info = fix_file_info(info);
          this.emit('file_info', this.file_info);
        }
        callback(null);
      });
    }

    /*
     * Commit this stream to the IPFS backend.
     *
     * Adds the temporary file to IPFS, emits `committed` with the resulting
     * hash, then pins it. Failures are emitted as `error` events. Returns the
     * underlying promise, which always resolves (errors are handled above).
     * Throws if cleanup() has already removed the temporary file.
     */
    commit()
    {
      if (!this.temp) {
        throw new Error('Cannot commit a temporary stream that does not exist. Did you call cleanup()?');
      }
      debug('Committing temporary stream: ', this.temp.path);
      return this.storage.ipfs.addFromFs(this.temp.path)
        .then(async (result) => {
          const hash = result[0].hash;
          debug('Stream committed as', hash);
          this.emit('committed', hash);
          await this.storage.ipfs.pin.add(hash);
        })
        .catch((err) => {
          debug('Error committing stream', err);
          this.emit('error', err);
        });
    }

    /*
     * Clean up temporary data. Safe to call more than once.
     */
    cleanup()
    {
      if (!this.temp) {
        return; // Already cleaned up.
      }
      debug('Cleaning up temporary file: ', this.temp.path);
      fs.unlink(this.temp.path, () => {}); // Ignore errors
      delete this.temp;
    }
  }
  /*
   * Manages the storage backend interaction. This provides a Promise-based API.
   *
   * Usage:
   *
   * const store = await Storage.create({ ... });
   * store.open(...);
   */
  class Storage
  {
    /*
     * Create a Storage instance. Options include:
     *
     * - an `ipfs` property, which is itself a hash containing
     *   - `connect_options` to be passed to the IPFS client library for
     *     connecting to an IPFS node.
     * - a `resolve_content_id` function, which translates Joystream
     *   content IDs to IPFS content IDs or vice versa. The default is to
     *   not perform any translation, which is not practical for a production
     *   system, but serves its function during development and testing. The
     *   function must be asynchronous.
     * - a `timeout` parameter, defaulting to DEFAULT_TIMEOUT. After this time,
     *   requests to the IPFS backend time out.
     *
     * Functions in this class accept an optional timeout parameter. If the
     * timeout is given, it is used - otherwise, the `option.timeout` value
     * above is used.
     */
    static create(options)
    {
      const storage = new Storage();
      storage._init(options);
      return storage;
    }

    /*
     * Store a copy of the options, pick the timeout and content ID resolver,
     * create the IPFS client, and log (without failing) whether the IPFS
     * daemon answered an identity request.
     */
    _init(options)
    {
      this.options = _.clone(options || {});
      this.options.ipfs = this.options.ipfs || {};
      this._timeout = this.options.timeout || DEFAULT_TIMEOUT;
      this._resolve_content_id = this.options.resolve_content_id || DEFAULT_RESOLVE_CONTENT_ID;
      this.ipfs = ipfs_client(this.options.ipfs.connect_options);
      // Resolved content IDs whose pin request is in flight or has succeeded;
      // maintained by synchronize().
      this.pins = {};
      this.ipfs.id((err, identity) => {
        if (err) {
          debug(`Warning IPFS daemon not running: ${err.message}`);
        } else {
          debug(`IPFS node is up with identity: ${identity.id}`);
        }
      });
    }

    /*
     * Uses bluebird's timeout mechanism to return a Promise that times out after
     * the given timeout interval, and tries to execute the given operation within
     * that time.
     *
     * `operation` is a Promise executor receiving (resolve, reject). A falsy
     * `timeout` falls back to the instance default.
     */
    async _with_specified_timeout(timeout, operation)
    {
      return new Promise(operation).timeout(timeout || this._timeout);
    }

    /*
     * Resolve content ID with timeout, using the configured resolver.
     */
    async _resolve_content_id_with_timeout(timeout, content_id)
    {
      // resolve() adopts the resolver's promise; a synchronous throw inside
      // the executor is turned into a rejection by the Promise constructor.
      return this._with_specified_timeout(timeout, (resolve) => {
        resolve(this._resolve_content_id(content_id));
      });
    }

    /*
     * Stat a content ID. Resolves with the IPFS `files.stat` result for
     * `/ipfs/<resolved id>`.
     */
    async stat(content_id, timeout)
    {
      const resolved = await this._resolve_content_id_with_timeout(timeout, content_id);
      return this._with_specified_timeout(timeout, (resolve, reject) => {
        this.ipfs.files.stat(`/ipfs/${resolved}`, { withLocal: true }, (err, res) => {
          if (err) {
            reject(err);
            return;
          }
          resolve(res);
        });
      });
    }

    /*
     * Return the size of a content ID.
     */
    async size(content_id, timeout)
    {
      const stat = await this.stat(content_id, timeout);
      return stat.size;
    }

    /*
     * Opens the specified content in read or write mode, and returns a Promise
     * with the stream.
     *
     * Read streams will contain a file_info property, with:
     * - a `mime_type` field providing the file's MIME type, or a default.
     * - an `ext` property, providing a file extension suggestion, or a default.
     *
     * Write streams have a slightly different flow, in order to allow for MIME
     * type detection and potential filtering. First off, they are written to a
     * temporary location, and only committed to the backend once their
     * `commit()` function is called.
     *
     * When the commit has finished, a `committed` event is emitted, which
     * contains the IPFS backend's content ID.
     *
     * Write streams also emit a `file_info` event during writing. It is passed
     * the `file_info` field as described above. Event listeners may now opt to
     * abort the write or continue and eventually `commit()` the file. There is
     * an explicit `cleanup()` function that removes temporary files as well,
     * in case committing is not desired.
     *
     * Rejects with an Error for any mode other than 'r' or 'w'.
     */
    async open(content_id, mode, timeout)
    {
      if (mode !== 'r' && mode !== 'w') {
        throw new Error('The only supported modes are "r" and "w".');
      }
      if (mode === 'w') {
        return this._create_write_stream(content_id, timeout);
      }
      // Read stream - with file type detection
      return this._create_read_stream(content_id, timeout);
    }

    /*
     * IPFS wants us to just dump a stream into its storage, then returns a
     * content ID (of its own). We need to instead return a stream immediately,
     * which reports the content ID via its `committed` event once available.
     * `content_id` is currently unused.
     */
    async _create_write_stream(content_id)
    {
      return new StorageWriteStream(this);
    }

    /*
     * Resolve `content_id` and stream the IPFS entry whose path equals the
     * resolved ID, decorated with a `file_info` property. Rejects on backend
     * error, when no entry matches, when file type detection fails, or on
     * timeout.
     */
    async _create_read_stream(content_id, timeout)
    {
      const resolved = await this._resolve_content_id_with_timeout(timeout, content_id);
      return this._with_specified_timeout(timeout, (resolve, reject) => {
        let found = false;
        const ls = this.ipfs.getReadableStream(resolved);
        ls.on('data', async (result) => {
          if (result.path !== resolved) {
            return;
          }
          found = true;
          // Handle detection failures here; otherwise the rejection would go
          // unhandled and the caller would only ever see a timeout.
          try {
            const ft_stream = await file_type.stream(result.content);
            resolve(fix_file_info_on_stream(ft_stream));
          } catch (err) {
            debug(err);
            reject(err);
          }
        });
        ls.on('error', (err) => {
          debug(err);
          // Notify the caller first, so a failure while ending the stream
          // cannot prevent the rejection.
          reject(err);
          if (typeof ls.end === 'function') {
            ls.end();
          }
        });
        ls.on('end', () => {
          if (!found) {
            const err = new Error(`No matching content found for ${content_id}`);
            debug(err);
            reject(err);
          }
        });
        ls.resume();
      });
    }

    /*
     * Synchronize the given content ID by pinning it on the IPFS node.
     *
     * Returns once the pin request has been issued, not when it completes;
     * the outcome is only logged. Calls for a content ID that is already being
     * pinned (or was pinned successfully) are no-ops; a failed pin is
     * forgotten so a later call retries it.
     */
    async synchronize(content_id)
    {
      const resolved = await this._resolve_content_id_with_timeout(this._timeout, content_id);
      if (this.pins[resolved]) {
        return;
      }
      debug(`Pinning ${resolved}`);
      // Record the pin before issuing it, so repeated or concurrent calls do
      // not start duplicate pin requests for the same content.
      this.pins[resolved] = true;
      // The callback runs once the file has been retrieved (or on error).
      this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
        if (err) {
          debug(`Error Pinning: ${resolved}`, err);
          delete this.pins[resolved];
        } else {
          debug(`Pinned ${resolved}`);
        }
      });
    }
  }
  // Public API of this module: only the Storage class is exported.
  module.exports = { Storage };