UploadCommandBase.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. import ContentDirectoryCommandBase from './ContentDirectoryCommandBase'
  2. import {
  3. AssetToUpload,
  4. ResolvedAsset,
  5. StorageNodeInfo,
  6. TokenRequest,
  7. TokenRequestData,
  8. VideoFFProbeMetadata,
  9. VideoFileMetadata,
  10. } from '../Types'
  11. import { MultiBar, Options, SingleBar } from 'cli-progress'
  12. import ExitCodes from '../ExitCodes'
  13. import fs from 'fs'
  14. import _ from 'lodash'
  15. import axios from 'axios'
  16. import ffprobeInstaller from '@ffprobe-installer/ffprobe'
  17. import ffmpeg from 'fluent-ffmpeg'
  18. import path from 'path'
  19. import mimeTypes from 'mime-types'
  20. import { Assets } from '../schemas/typings/Assets.schema'
  21. import chalk from 'chalk'
  22. import { DataObjectCreationParameters } from '@joystream/types/storage'
  23. import { createHash } from 'blake3'
  24. import * as multihash from 'multihashes'
  25. import { u8aToHex, formatBalance } from '@polkadot/util'
  26. import { KeyringPair } from '@polkadot/keyring/types'
  27. import FormData from 'form-data'
  28. import BN from 'bn.js'
  29. import { createTypeFromConstructor } from '@joystream/types'
  30. import { StorageAssets } from '@joystream/types/content'
  // Point fluent-ffmpeg at the ffprobe binary path exposed by @ffprobe-installer/ffprobe.
  const { path: ffprobeBinaryPath } = ffprobeInstaller
  ffmpeg.setFfprobePath(ffprobeBinaryPath)
  32. /**
  33. * Abstract base class for commands that require uploading functionality
  34. */
  35. export default abstract class UploadCommandBase extends ContentDirectoryCommandBase {
  /** File sizes keyed by file path; consulted by getFileSize before it falls back to fs.statSync. */
  private fileSizeCache = new Map<string, number>()

  /** Maximum data object size; stays undefined until validateFile reads it from `consts.storage.maxDataObjectSize`. */
  private maxFileSize: BN | undefined = undefined

  /** Options passed to the SingleBar / MultiBar instances created by this class. */
  private progressBarOptions: Options = {
    format: `{barTitle} | {bar} | {value}/{total} KB processed`,
    noTTYOutput: true,
  }

  // NOTE(review): presumably read by the base command class to decide whether a query node is needed - confirm.
  protected requiresQueryNode = true
  /**
   * Returns the size of the file under `path`, as reported by `fs.statSync`.
   *
   * The size is looked up on the filesystem only once per path and then served
   * from `fileSizeCache`. As a consequence, a file modified after its first
   * lookup keeps the originally recorded size for the lifetime of this command
   * instance.
   *
   * @param path - Path of the file to measure.
   * @returns The file size (`fs.Stats.size`).
   * @throws Whatever `fs.statSync` throws when the file cannot be stat'ed.
   */
  getFileSize(path: string): number {
    const cachedSize = this.fileSizeCache.get(path)
    if (cachedSize !== undefined) {
      return cachedSize
    }

    const { size } = fs.statSync(path)
    // Store the result so the cache is actually populated; without this every
    // caller (validation, hashing, upload, progress bars) would stat the file again.
    this.fileSizeCache.set(path, size)
    return size
  }
  /**
   * Opens a paused read stream for `filePath` and wires it to a progress bar
   * that counts the kilobytes read so far.
   *
   * @param filePath - File to read.
   * @param barTitle - Value substituted for `{barTitle}` in the bar format.
   * @param multiBar - When provided, the bar is created inside this MultiBar;
   *   otherwise a standalone SingleBar is used.
   * @returns The (paused) file stream and the progress bar tracking it.
   * @throws Error when `multiBar.create` does not return a bar.
   */
  createReadStreamWithProgressBar(
    filePath: string,
    barTitle: string,
    multiBar?: MultiBar
  ): {
    fileStream: fs.ReadStream
    progressBar: SingleBar
  } {
    // Progress CLI UX:
    // https://github.com/oclif/cli-ux#cliprogress
    // https://www.npmjs.com/package/cli-progress
    const totalKB = Math.ceil(this.getFileSize(filePath) / 1024)
    let readKB = 0

    let maybeBar: SingleBar | undefined
    if (multiBar) {
      maybeBar = multiBar.create(totalKB, readKB, { barTitle }) as SingleBar | undefined
    } else {
      maybeBar = new SingleBar(this.progressBarOptions)
    }
    if (!maybeBar) {
      throw new Error('Provided multibar does not support noTTY mode!')
    }
    const progressBar: SingleBar = maybeBar
    progressBar.start(totalKB, readKB, { barTitle })

    // Explicitly pause to prevent switching to flowing mode (https://nodejs.org/api/stream.html#stream_event_data)
    const fileStream = fs.createReadStream(filePath).pause()

    fileStream.on('error', () => {
      progressBar.stop()
      this.error(`Error while trying to read data from: ${filePath}!`, {
        exit: ExitCodes.FsOperationFailed,
      })
    })
    fileStream.on('data', (chunk) => {
      // Progress is tracked in (possibly fractional) kilobytes.
      readKB += chunk.length / 1024
      progressBar.update(readKB)
    })
    fileStream.on('end', () => {
      // Snap to the rounded-up total before stopping the bar.
      progressBar.update(totalKB)
      progressBar.stop()
    })

    return { fileStream, progressBar }
  }
  /**
   * Runs ffprobe on `filePath` and extracts metadata of its first video stream.
   *
   * @param filePath - File to probe.
   * @returns Width, height, codec names and duration of the first stream whose
   *   `codec_type` is `'video'`. A defined duration is converted with
   *   `Math.ceil(Number(...))`, falling back to 0 when that yields NaN or 0.
   * @throws Rejects with the ffprobe error, or with an Error when the file has
   *   no video stream.
   */
  async getVideoFFProbeMetadata(filePath: string): Promise<VideoFFProbeMetadata> {
    return new Promise<VideoFFProbeMetadata>((resolve, reject) => {
      ffmpeg.ffprobe(filePath, (err, data) => {
        if (err) {
          reject(err)
          return
        }

        const videoStream = data.streams.find(({ codec_type: codecType }) => codecType === 'video')
        if (!videoStream) {
          reject(new Error('No video stream found in file'))
          return
        }

        const {
          width,
          height,
          codec_name: codecName,
          codec_long_name: codecFullName,
          duration: rawDuration,
        } = videoStream

        let duration: number | undefined
        if (rawDuration !== undefined) {
          duration = Math.ceil(Number(rawDuration)) || 0
        }

        resolve({ width, height, codecName, codecFullName, duration })
      })
    })
  }
  /**
   * Collects metadata for a video file: size, container (the file extension
   * without the dot), mime type and whatever ffprobe can provide.
   *
   * A failing ffprobe call is not fatal: a warning is emitted and only the
   * file-derived fields are returned.
   *
   * @param filePath - Path of the video file.
   * @returns File-derived fields merged with the ffprobe fields (ffprobe fields are spread last).
   */
  async getVideoFileMetadata(filePath: string): Promise<VideoFileMetadata> {
    let probed: VideoFFProbeMetadata = {}
    try {
      probed = await this.getVideoFFProbeMetadata(filePath)
    } catch (e) {
      const reason = e instanceof Error ? e.message : e
      this.warn(`Failed to get video metadata via ffprobe (${reason})`)
    }

    const size = this.getFileSize(filePath)
    const container = path.extname(filePath).slice(1)
    const fileDerived = {
      size,
      container,
      // Falls back to the literal "unknown" when the extension is not recognized.
      mimeType: mimeTypes.lookup(container) || `unknown`,
    }

    return { ...fileDerived, ...probed }
  }
  /**
   * Streams the file through a blake3 hasher (showing a progress bar) and
   * returns the digest as a base58-encoded multihash string.
   *
   * @param filePath - File to hash.
   * @returns Base58 string of the 'blake3' multihash of the file content.
   * @throws Rejects when the hash stream emits an error.
   */
  async calculateFileHash(filePath: string): Promise<string> {
    const { fileStream } = this.createReadStreamWithProgressBar(filePath, 'Calculating file hash')

    return new Promise<string>((resolve, reject) => {
      // Holds the most recent chunk emitted by the hash stream.
      let digest: Uint8Array

      const hashStream = fileStream.pipe(createHash())
      hashStream.on('data', (chunk) => {
        digest = chunk
      })
      hashStream.on('end', () => {
        const encoded = multihash.encode(digest, 'blake3')
        resolve(multihash.toB58String(encoded))
      })
      hashStream.on('error', (err) => reject(err))
    })
  }
  /**
   * Checks that the file exists and does not exceed the maximum data object size.
   *
   * The limit is read from `consts.storage.maxDataObjectSize` on first use and
   * kept in `maxFileSize` for subsequent calls.
   *
   * @param filePath - Path of the file to validate.
   * @throws Via `this.error` when the file is missing (exit code
   *   `ExitCodes.FileNotFound`) or larger than the limit.
   */
  async validateFile(filePath: string): Promise<void> {
    // Basic file validation
    if (!fs.existsSync(filePath)) {
      this.error(`${filePath} - file does not exist under provided path!`, { exit: ExitCodes.FileNotFound })
    }

    if (!this.maxFileSize) {
      this.maxFileSize = await this.getOriginalApi().consts.storage.maxDataObjectSize
    }

    // Compare BN to BN. The number-argument variant (`ltn`) is only valid for
    // arguments up to 0x3ffffff (~64 MiB): beyond that it either trips an
    // assertion or treats a multi-word limit as always greater, which would
    // let oversized files through.
    const fileSize = new BN(this.getFileSize(filePath))
    if (this.maxFileSize.lt(fileSize)) {
      this.error(`${filePath} - file is too big. Max file size is ${this.maxFileSize.toString()} bytes`)
    }
  }
  /**
   * Picks a storage node for the given bag that currently answers on its
   * `/version` endpoint.
   *
   * Candidate nodes are fetched from the query node and shuffled on every
   * attempt. When none responds, the lookup is repeated after `retryTime`
   * seconds, up to `retryCount` additional times.
   *
   * @param bagId - Bag whose storage nodes should be considered.
   * @param retryTime - Delay between attempts, in seconds.
   * @param retryCount - Number of retries after the first attempt.
   * @returns Info of the first responsive node, or null when all attempts failed.
   */
  async getRandomActiveStorageNodeInfo(bagId: string, retryTime = 6, retryCount = 5): Promise<StorageNodeInfo | null> {
    let attempt = 0
    while (attempt <= retryCount) {
      const candidates = _.shuffle(await this.getQNApi().storageNodesInfoByBagId(bagId))

      for (const candidate of candidates) {
        try {
          await axios.get(`${candidate.apiEndpoint}/version`, {
            headers: {
              connection: 'close',
            },
          })
          return candidate
        } catch (err) {
          // This node did not answer - move on to the next candidate.
        }
      }

      const isLastAttempt = attempt === retryCount
      if (!isLastAttempt) {
        this.log(
          `No storage provider can serve the request yet, retrying in ${retryTime}s (${attempt + 1}/${retryCount})...`
        )
        await new Promise((resolve) => setTimeout(resolve, retryTime * 1000))
      }
      attempt += 1
    }

    return null
  }
  /**
   * Builds the `DataObjectCreationParameters` for a file from its size and
   * content hash.
   *
   * @param filePath - File to describe.
   * @returns Parameters with `size` and `ipfsContentId` set.
   */
  async generateDataObjectParameters(filePath: string): Promise<DataObjectCreationParameters> {
    const size = this.getFileSize(filePath)
    const ipfsContentId = await this.calculateFileHash(filePath)
    return createTypeFromConstructor(DataObjectCreationParameters, { size, ipfsContentId })
  }
  /**
   * Resolves asset paths, validates every file and computes its data object
   * creation parameters.
   *
   * @param paths - Asset paths; resolved against the directory of `basePath`
   *   when `basePath` is non-empty, used as-is otherwise.
   * @param basePath - Path of the file the asset paths are relative to.
   * @returns One `{ path, parameters }` entry per asset, in input order.
   */
  async resolveAndValidateAssets(paths: string[], basePath: string): Promise<ResolvedAsset[]> {
    // Resolve assets
    const baseDir = basePath ? path.dirname(basePath) : undefined
    const assetPaths = baseDir === undefined ? paths : paths.map((p) => path.resolve(baseDir, p))

    // Validate assets
    await Promise.all(assetPaths.map((p) => this.validateFile(p)))

    // Return data
    const describeAsset = async (assetPath: string) => ({
      path: assetPath,
      parameters: await this.generateDataObjectParameters(assetPath),
    })
    return await Promise.all(assetPaths.map((p) => describeAsset(p)))
  }
  /**
   * Requests an upload token from a storage node.
   *
   * The token request payload is serialized with `JSON.stringify`, signed with
   * `account`, and posted to the node's `/authToken` endpoint together with
   * the hex-encoded signature.
   *
   * @param storageNodeInfo - Node to request the token from.
   * @param account - Key pair used to sign the request.
   * @param memberId - Member id included in the request.
   * @param objectId - Data object id the token is requested for.
   * @param bagId - Bag the data object belongs to.
   * @returns The token returned by the node.
   * @throws Via `this.error` (exit code `ExitCodes.StorageNodeError`) when the
   *   node returns an empty token.
   */
  async getStorageNodeUploadToken(
    storageNodeInfo: StorageNodeInfo,
    account: KeyringPair,
    memberId: number,
    objectId: BN,
    bagId: string
  ): Promise<string> {
    const { bucketId, apiEndpoint } = storageNodeInfo

    // Property order is kept stable because the serialized form is what gets signed.
    const data: TokenRequestData = {
      storageBucketId: bucketId,
      accountId: account.address,
      bagId,
      memberId,
      dataObjectId: objectId.toNumber(),
    }
    const serialized = JSON.stringify(data)
    const postData: TokenRequest = {
      data,
      signature: u8aToHex(account.sign(serialized)),
    }

    const response = await axios.post(`${apiEndpoint}/authToken`, postData)
    const token = response.data.token
    if (!token) {
      this.error('Recieved empty token from the storage node!', { exit: ExitCodes.StorageNodeError })
    }

    return token
  }
  /**
   * Uploads a single file to a storage node as the content of a data object.
   *
   * @param account - Key pair used to sign the upload token request.
   * @param memberId - Member id included in the token request.
   * @param objectId - Id of the data object being uploaded.
   * @param bagId - Bag the data object belongs to.
   * @param filePath - File to upload.
   * @param storageNode - Node to upload to; when omitted, a responsive node is
   *   looked up via `getRandomActiveStorageNodeInfo`.
   * @param multiBar - Optional MultiBar to attach the upload progress bar to.
   * @throws Via `this.error` when no storage node is available (exit code
   *   `ExitCodes.ActionCurrentlyUnavailable`) or when the upload request fails
   *   with an axios error (exit code `ExitCodes.StorageNodeError`); any other
   *   error is rethrown unchanged.
   */
  async uploadAsset(
    account: KeyringPair,
    memberId: number,
    objectId: BN,
    bagId: string,
    filePath: string,
    storageNode?: StorageNodeInfo,
    multiBar?: MultiBar
  ): Promise<void> {
    const storageNodeInfo = storageNode || (await this.getRandomActiveStorageNodeInfo(bagId))
    if (!storageNodeInfo) {
      this.error('No active storage node found!', { exit: ExitCodes.ActionCurrentlyUnavailable })
    }
    this.log(`Chosen storage node endpoint: ${storageNodeInfo.apiEndpoint}`)

    const token = await this.getStorageNodeUploadToken(storageNodeInfo, account, memberId, objectId, bagId)
    const { fileStream, progressBar } = this.createReadStreamWithProgressBar(
      filePath,
      `Uploading ${filePath}`,
      multiBar
    )
    fileStream.on('end', () => {
      // Temporarly disable because with Promise.all it breaks the UI
      // cli.action.start('Waiting for the file to be processed...')
    })

    const formData = new FormData()
    formData.append('dataObjectId', objectId.toString())
    formData.append('storageBucketId', storageNodeInfo.bucketId)
    formData.append('bagId', bagId)
    formData.append('file', fileStream, {
      filename: path.basename(filePath),
      filepath: filePath,
      knownLength: this.getFileSize(filePath),
    })

    this.log(`Uploading object ${objectId.toString()} (${filePath})`)
    try {
      await axios.post(`${storageNodeInfo.apiEndpoint}/files`, formData, {
        maxBodyLength: Infinity,
        maxContentLength: Infinity,
        headers: {
          'x-api-key': token,
          'content-type': 'multipart/form-data',
          // Spread last so the boundary-carrying content-type from form-data wins.
          ...formData.getHeaders(),
        },
      })
    } catch (e) {
      progressBar.stop()
      // The request is over, so nothing will consume the rest of the file:
      // close the stream to release its file descriptor instead of leaking it.
      fileStream.destroy()
      if (axios.isAxiosError(e)) {
        const msg = e.response && e.response.data ? JSON.stringify(e.response.data) : e.message
        this.error(`Unexpected error when trying to upload a file: ${msg}`, {
          exit: ExitCodes.StorageNodeError,
        })
      } else {
        throw e
      }
    }
  }
  /**
   * Uploads several assets concurrently to one storage node and records the
   * ones that failed so they can be re-uploaded later.
   *
   * When no storage node is available, all assets are reported as rejected and
   * the command exits with `ExitCodes.ActionCurrentlyUnavailable`.
   *
   * @param account - Key pair used to sign upload token requests.
   * @param memberId - Member id included in token requests.
   * @param bagId - Bag the data objects belong to.
   * @param assets - Assets (data object id + path) to upload.
   * @param inputFilePath - Path used to derive the rejected-assets output file name.
   * @param outputFilePostfix - Postfix of the rejected-assets output file name.
   */
  async uploadAssets(
    account: KeyringPair,
    memberId: number,
    bagId: string,
    assets: AssetToUpload[],
    inputFilePath: string,
    outputFilePostfix = '__rejectedContent'
  ): Promise<void> {
    const storageNodeInfo = await this.getRandomActiveStorageNodeInfo(bagId)
    if (!storageNodeInfo) {
      this.warn('No storage provider is currently available!')
      const allRejected = assets.map(() => false)
      this.handleRejectedUploads(bagId, assets, allRejected, inputFilePath, outputFilePostfix)
      this.exit(ExitCodes.ActionCurrentlyUnavailable)
    }

    const multiBar = new MultiBar(this.progressBarOptions)
    const failures: [string, string][] = []

    // Never rejects: a failed upload is recorded and reported as `false`.
    const tryUpload = async (asset: AssetToUpload): Promise<boolean> => {
      try {
        await this.uploadAsset(account, memberId, asset.dataObjectId, bagId, asset.path, storageNodeInfo, multiBar)
        return true
      } catch (e) {
        const reason = e instanceof Error ? e.message : 'Unknown error'
        failures.push([asset.dataObjectId.toString(), reason])
        return false
      }
    }

    // Workaround replacement for Promise.allSettled (which is only available in ES2020)
    const outcomes = await Promise.all(assets.map((asset) => tryUpload(asset)))

    for (const [objectId, message] of failures) {
      this.warn(`Upload of object ${objectId} failed: ${message}`)
    }
    this.handleRejectedUploads(bagId, assets, outcomes, inputFilePath, outputFilePostfix)
    multiBar.stop()
  }
  /**
   * Maps every original path to a running index if it is present in
   * `filteredPaths`, and to `undefined` otherwise.
   *
   * Indexes start at 0 and grow by one for each original path that is found
   * in `filteredPaths`, in the order of `originalPaths`.
   *
   * @param originalPaths - Paths (possibly undefined) in their original order.
   * @param filteredPaths - Paths that should receive an index.
   * @returns Array parallel to `originalPaths` holding the assigned index or undefined.
   */
  public assetsIndexes(originalPaths: (string | undefined)[], filteredPaths: string[]): (number | undefined)[] {
    let nextIndex = 0
    return originalPaths.map((candidate) => {
      if (!filteredPaths.includes(candidate as string)) {
        return undefined
      }
      return nextIndex++
    })
  }
  /**
   * Builds the `StorageAssets` extrinsic argument for the given assets after
   * asking the user to confirm the total storage fee.
   *
   * The fee shown is the per-megabyte fee multiplied by the combined asset
   * size rounded up to whole megabytes (1024 * 1024 bytes).
   *
   * @param resolvedAssets - Assets with their data object creation parameters.
   * @returns The `StorageAssets` instance, or undefined when there are no assets.
   */
  async prepareAssetsForExtrinsic(resolvedAssets: ResolvedAsset[]): Promise<StorageAssets | undefined> {
    const feePerMB = await this.getOriginalApi().query.storage.dataObjectPerMegabyteFee()
    if (!resolvedAssets.length) {
      return undefined
    }

    let totalSize = new BN(0)
    for (const { parameters } of resolvedAssets) {
      totalSize = totalSize.add(parameters.getField('size'))
    }
    const totalMegabytes = Math.ceil(totalSize.toNumber() / 1024 / 1024)
    const totalFee = feePerMB.muln(totalMegabytes)

    await this.requireConfirmation(
      `Total fee of ${chalk.cyan(formatBalance(totalFee))} ` +
        `will have to be paid in order to store the provided assets. Are you sure you want to continue?`
    )

    return createTypeFromConstructor(StorageAssets, {
      expected_data_size_fee: feePerMB,
      object_creation_list: resolvedAssets.map(({ parameters }) => parameters),
    })
  }
  /**
   * Reports assets whose upload failed and tries to save them to a JSON file
   * so they can be re-uploaded later.
   *
   * The output file is placed next to the input file: a trailing `.json`
   * extension (if any) is stripped from `inputFilePath`, then
   * `outputFilePostfix` and `.json` are appended. The output path therefore
   * always differs from the input path, so the input file is never overwritten.
   *
   * @param bagId - Bag the assets belong to.
   * @param assets - All assets that were scheduled for upload.
   * @param results - Upload outcome per asset (same order as `assets`); `false` marks a rejected upload.
   * @param inputFilePath - Path of the input file the output file name is derived from.
   * @param outputFilePostfix - Postfix inserted before the `.json` extension of the output file.
   */
  private handleRejectedUploads(
    bagId: string,
    assets: AssetToUpload[],
    results: boolean[],
    inputFilePath: string,
    outputFilePostfix: string
  ): void {
    // Try to save rejected contentIds and paths for reupload purposes
    const rejectedAssetsOutput: Assets = {
      bagId,
      assets: assets
        .filter((_asset, i) => results[i] === false)
        .map(({ dataObjectId, path: assetPath }) => ({ objectId: dataObjectId.toString(), path: assetPath })),
    }
    if (!rejectedAssetsOutput.assets.length) {
      return
    }

    this.warn(
      `Some assets were not uploaded successfully. Try reuploading them with ${chalk.magentaBright(
        'content:reuploadAssets'
      )}!`
    )
    // Printed so the user can copy it if writing the file below fails.
    console.log(rejectedAssetsOutput)

    // Only strip a real trailing extension. A plain string replace of '.json'
    // would hit the first occurrence anywhere in the path and, when there is
    // none, produce the input path itself (overwriting the input file).
    const extension = path.extname(inputFilePath)
    const inputWithoutExtension =
      extension.toLowerCase() === '.json' ? inputFilePath.slice(0, -extension.length) : inputFilePath
    const outputPath = `${inputWithoutExtension}${outputFilePostfix}.json`

    try {
      fs.writeFileSync(outputPath, JSON.stringify(rejectedAssetsOutput, null, 4))
      this.log(`Rejected content ids successfully saved to: ${chalk.magentaBright(outputPath)}!`)
    } catch (e) {
      console.error(e)
      this.warn(
        `Could not write rejected content output to ${outputPath}. Try copying the output above and creating the file manually!`
      )
    }
  }
  374. }