
Merge branch 'giza' into test-merge-giza

Mokhtar Naamani 3 years ago
parent commit
88421ef77d

+ 1 - 0
utils/migration-scripts/.prettierignore

@@ -1,2 +1,3 @@
+lib
 results
 src/sumer-giza/sumer-query-node/generated

+ 3 - 1
utils/migration-scripts/package.json

@@ -31,7 +31,9 @@
     "@types/sharp": "^0.29.2",
     "form-data": "^4.0.0",
     "node-cleanup": "^2.1.2",
-    "@types/node-cleanup": "^2.1.2"
+    "@types/node-cleanup": "^2.1.2",
+    "winston": "^3.3.3",
+    "fast-safe-stringify": "^2.1.1"
   },
   "devDependencies": {
     "@graphql-codegen/cli": "^1.21.4",

+ 7 - 3
utils/migration-scripts/src/RuntimeApi.ts

@@ -61,7 +61,7 @@ export class RuntimeApi extends ApiPromise {
     return (events.sort((a, b) => new BN(a.index).cmp(new BN(b.index))) as unknown) as EventType<S, M>[]
   }
 
-  private formatDispatchError(err: DispatchError): string {
+  public formatDispatchError(err: DispatchError): string {
     try {
       const { name, docs } = this.registry.findMetaError(err.asModule)
       return `${name} (${docs.join(', ')})`
@@ -81,7 +81,11 @@ export class RuntimeApi extends ApiPromise {
     return entries.sort((a, b) => a[0].toNumber() - b[0].toNumber())
   }
 
-  sendExtrinsic(keyPair: KeyringPair, tx: SubmittableExtrinsic<'promise'>): Promise<SubmittableResult> {
+  sendExtrinsic(
+    keyPair: KeyringPair,
+    tx: SubmittableExtrinsic<'promise'>,
+    waitUntilFinalized = true
+  ): Promise<SubmittableResult> {
     let txName = `${tx.method.section}.${tx.method.method}`
     if (txName === 'sudo.sudo') {
       const innerCall = tx.args[0] as Call
@@ -95,7 +99,7 @@ export class RuntimeApi extends ApiPromise {
           return
         }
 
-        if (result.status.isInBlock) {
+        if ((!waitUntilFinalized && result.status.isInBlock) || result.status.isFinalized) {
           unsubscribe()
           result.events
             .filter(({ event }) => event.section === 'system')
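
sendExtrinsic now waits for finalization by default; passing waitUntilFinalized = false restores the previous resolve-on-inclusion behaviour. A hypothetical call site (keyPair and tx are placeholders):

// resolves as soon as the tx is included in a block (that block may still be reverted)
const inBlockResult = await api.sendExtrinsic(keyPair, tx, false)
// default: resolves only once the block containing the tx is finalized
const finalizedResult = await api.sendExtrinsic(keyPair, tx)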

+ 4 - 1
utils/migration-scripts/src/commands/sumer-giza/retryFailedUploads.ts

@@ -38,7 +38,10 @@ export class RetryFailedUploadsCommand extends Command {
       await api.isReadyOrError
       const assetsManager = await AssetsManager.create({
         api,
-        config: opts,
+        config: {
+          ...opts,
+          migrationStatePath: path.dirname(opts.failedUploadsPath),
+        },
       })
       assetsManager.loadQueue(opts.failedUploadsPath)
       await assetsManager.processQueuedUploads()

+ 27 - 0
utils/migration-scripts/src/logging.ts

@@ -0,0 +1,27 @@
+import winston, { Logger } from 'winston'
+import stringify from 'fast-safe-stringify'
+
+const colors = {
+  error: 'red',
+  warn: 'yellow',
+  info: 'green',
+}
+winston.addColors(colors)
+
+export function createLogger(label: string): Logger {
+  return winston.createLogger({
+    level: 'debug',
+    transports: [new winston.transports.Console()],
+    defaultMeta: { label },
+    format: winston.format.combine(
+      winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
+      winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }),
+      winston.format.colorize(),
+      winston.format.printf(
+        (info) =>
+          `${info.timestamp} ${info.label} [${info.level}]: ${info.message}` +
+          (Object.keys(info.metadata).length ? `\n${stringify(info.metadata, undefined, 4)}` : '')
+      )
+    ),
+  })
+}
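
The resulting logger prints one timestamped, colorized line per call and appends any extra metadata as a pretty-printed block. A hedged usage sketch (label and payload are illustrative):

import { createLogger } from './logging'

const logger = createLogger('Example')
logger.info('Uploading 5 data objects...')
// extra properties are collected by winston.format.metadata() and
// appended below the message via fast-safe-stringify
logger.error('Failed to migrate:', { failedIds: [1, 2, 3] })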

+ 35 - 23
utils/migration-scripts/src/sumer-giza/AssetsManager.ts

@@ -16,13 +16,16 @@ import { ContentHash } from './ContentHash'
 import { promisify } from 'util'
 import { createType } from '@joystream/types'
 import { Readable, pipeline } from 'stream'
+import { Logger } from 'winston'
 import _ from 'lodash'
+import { createLogger } from '../logging'
 
 export type AssetsManagerConfig = {
   preferredDownloadSpEndpoints?: string[]
   uploadSpBucketId: number
   uploadSpEndpoint: string
   dataDir: string
+  migrationStatePath: string
 }
 
 export type AssetsManagerParams = {
@@ -56,6 +59,8 @@ export class AssetsManager {
   private resizer: ImageResizer
   private queuedUploads: Set<string>
   private isQueueProcessing = false
+  private queueFilePath: string
+  private logger: Logger
 
   public get queueSize(): number {
     return this.queuedUploads.size
@@ -79,8 +84,11 @@ export class AssetsManager {
     this.config = config
     this.resizer = new ImageResizer()
     this.queuedUploads = new Set()
+    this.logger = createLogger('Assets Manager')
     fs.mkdirSync(this.tmpAssetPath(''), { recursive: true })
     fs.mkdirSync(this.assetPath(''), { recursive: true })
+    this.queueFilePath = path.join(this.config.migrationStatePath, `unprocessedUploads_${Date.now()}.json`)
+    this.logger.info(`Failed/pending uploads will be saved to ${this.queueFilePath}`)
   }
 
   private static async getSumerStorageProviderEndpoints(queryNodeApi: QueryNodeApi): Promise<string[]> {
@@ -116,26 +124,20 @@ export class AssetsManager {
     targetSize?: [number, number]
   ): Promise<DataObjectCreationParameters | undefined> {
     if (data.liaisonJudgement !== 'ACCEPTED') {
-      console.error(
+      this.logger.warn(
        `Data object ${data.joystreamContentId} has invalid liaison judgement: ${data.liaisonJudgement}. Skipping...`
       )
       return
     }
     let objectSize = new BN(data.size).toNumber()
-    let path: string
-    try {
-      path = await this.fetchAssetWithRetry(data.joystreamContentId, objectSize)
-    } catch (e) {
-      console.error(`Data object ${data.joystreamContentId} was not fetched: ${(e as Error).message}`)
-      return
-    }
+    const path = await this.fetchAssetWithRetry(data.joystreamContentId, objectSize)
     if (targetSize) {
       try {
         await this.resizer.resize(path, targetSize)
        // Re-establish object size
         objectSize = fs.statSync(path).size
       } catch (e) {
-        console.error(
+        this.logger.error(
           `Could not resize image ${path} to target size ${targetSize[0]}/${targetSize[1]}: ${(e as Error).message}`
         )
       }
@@ -256,17 +258,11 @@ export class AssetsManager {
     } catch (e) {
       uploadSuccesful = false
       const msg = this.reqErrorMessage(e)
-      console.error(`Upload of object ${dataObjectId} to ${uploadSpEndpoint} failed: ${msg}`)
+      this.logger.error(`Upload of object ${dataObjectId} to ${uploadSpEndpoint} failed: ${msg}`)
     }
 
     if (uploadSuccesful) {
-      // Remove asset from queuedUploads and temporary storage
-      this.queuedUploads.delete(`${bagId}|${dataObjectId}`)
-      try {
-        fs.rmSync(dataPath)
-      } catch (e) {
-        console.error(`Could not remove file "${dataPath}" after succesful upload...`)
-      }
+      this.finalizeUpload(bagId, dataObjectId, dataPath)
     }
   }
 
@@ -275,7 +271,7 @@ export class AssetsManager {
      throw new Error('Uploads queue is already being processed!')
     }
     this.isQueueProcessing = true
-    console.log(`Uploading ${this.queueSize} data objects...`)
+    this.logger.info(`Uploading ${this.queueSize} data objects...`)
     await Promise.all(
       Array.from(this.queuedUploads).map((queuedUpload) => {
         const [bagId, objectId] = queuedUpload.split('|')
@@ -290,19 +286,35 @@ export class AssetsManager {
     this.queuedUploads = new Set(queue)
   }
 
-  public saveQueue(queueFilePath: string): void {
-    fs.writeFileSync(queueFilePath, JSON.stringify(Array.from(this.queuedUploads)))
+  public saveQueue(): void {
+    fs.writeFileSync(this.queueFilePath, JSON.stringify(Array.from(this.queuedUploads)))
+    this.logger.debug(`${this.queueFilePath} updated`, { queueSize: this.queuedUploads.size })
   }
 
   private queueUpload(bagId: BagId, objectId: DataObjectId): void {
     const bagIdStr = `dynamic:channel:${bagId.asType('Dynamic').asType('Channel').toString()}`
     this.queuedUploads.add(`${bagIdStr}|${objectId.toString()}`)
+    this.saveQueue()
   }
 
-  public async uploadFromEvents(events: IEvent<[Vec<DataObjectId>, UploadParameters, Balance]>[]): Promise<void> {
+  private finalizeUpload(bagId: string, dataObjectId: number, dataPath: string) {
+    this.queuedUploads.delete(`${bagId}|${dataObjectId}`)
+    this.saveQueue()
+    try {
+      fs.rmSync(dataPath)
+    } catch (e) {
+      this.logger.error(`Could not remove file "${dataPath}" after successful upload...`)
+    }
+  }
+
+  public queueUploadsFromEvents(events: IEvent<[Vec<DataObjectId>, UploadParameters, Balance]>[]): void {
+    let queuedUploads = 0
     events.map(({ data: [objectIds, uploadParams] }) => {
-      objectIds.forEach((objectId) => this.queueUpload(uploadParams.bagId, objectId))
+      objectIds.forEach((objectId) => {
+        this.queueUpload(uploadParams.bagId, objectId)
+        ++queuedUploads
+      })
     })
-    await this.processQueuedUploads()
+    this.logger.info(`Added ${queuedUploads} new data object uploads to the upload queue`)
   }
 }
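
Since queueUpload and finalizeUpload both call saveQueue, the unprocessedUploads_<timestamp>.json file always reflects the current queue. Judging from saveQueue above, its contents are a JSON array of bagId|objectId strings; a hypothetical example (ids invented):

// hypothetical contents of unprocessedUploads_1634567890123.json,
// in the shape consumed by loadQueue():
const exampleQueue: string[] = [
  'dynamic:channel:123|456', // bagId|dataObjectId
  'dynamic:channel:123|457',
]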

+ 5 - 25
utils/migration-scripts/src/sumer-giza/AssetsMigration.ts

@@ -1,40 +1,20 @@
-import { BaseMigration, BaseMigrationConfig, BaseMigrationParams, MigrationResult } from './BaseMigration'
+import { BaseMigration, BaseMigrationConfig, BaseMigrationParams } from './BaseMigration'
 import { AssetsManager, AssetsManagerConfig } from './AssetsManager'
 
 export type AssetsMigrationConfig = BaseMigrationConfig & AssetsManagerConfig
 
 export type AssetsMigrationParams = BaseMigrationParams & {
+  assetsManager: AssetsManager
   config: AssetsMigrationConfig
 }
 
 export abstract class AssetsMigration extends BaseMigration {
   protected config: AssetsMigrationConfig
-  protected assetsManager!: AssetsManager
+  protected assetsManager: AssetsManager
 
-  public constructor({ api, queryNodeApi, config }: AssetsMigrationParams) {
+  public constructor({ api, queryNodeApi, config, assetsManager }: AssetsMigrationParams) {
     super({ api, queryNodeApi, config })
     this.config = config
-  }
-
-  public async init(): Promise<void> {
-    await super.init()
-    this.assetsManager = await AssetsManager.create({
-      api: this.api,
-      queryNodeApi: this.queryNodeApi,
-      config: this.config,
-    })
-  }
-
-  public abstract run(): Promise<MigrationResult>
-
-  protected saveMigrationState(): void {
-    super.saveMigrationState()
-    if (this.assetsManager.queueSize) {
-      const failedUploadsFilePath = this.getMigrationStateFilePath().replace(
-        '.json',
-        `FailedUploads_${Date.now()}.json`
-      )
-      this.assetsManager.saveQueue(failedUploadsFilePath)
-    }
+    this.assetsManager = assetsManager
   }
 }

+ 82 - 11
utils/migration-scripts/src/sumer-giza/BaseMigration.ts

@@ -3,10 +3,12 @@ import { KeyringPair } from '@polkadot/keyring/types'
 import { QueryNodeApi } from './sumer-query-node/api'
 import { RuntimeApi } from '../RuntimeApi'
 import { Keyring } from '@polkadot/keyring'
+import { Logger } from 'winston'
 import path from 'path'
 import nodeCleanup from 'node-cleanup'
 import _ from 'lodash'
 import fs from 'fs'
+import { SubmittableExtrinsic } from '@polkadot/api/types'
 
 export type MigrationResult = {
   idsMap: Map<number, number>
@@ -37,6 +39,8 @@ export abstract class BaseMigration {
   protected config: BaseMigrationConfig
   protected failedMigrations: Set<number>
   protected idsMap: Map<number, number>
+  protected pendingMigrationIteration: Promise<void> | undefined
+  protected abstract logger: Logger
 
   public constructor({ api, queryNodeApi, config }: BaseMigrationParams) {
     this.api = api
@@ -54,12 +58,14 @@ export abstract class BaseMigration {
 
   public async init(): Promise<void> {
     this.loadMigrationState()
-    nodeCleanup(() => this.saveMigrationState())
+    nodeCleanup(this.onExit.bind(this))
     await this.loadSudoKey()
   }
 
   public abstract run(): Promise<MigrationResult>
 
+  protected abstract migrateBatch(batchTx: SubmittableExtrinsic<'promise'>, batch: { id: string }[]): Promise<void>
+
   protected getMigrationStateJson(): MigrationStateJson {
     return {
       idsMapEntries: Array.from(this.idsMap.entries()),
@@ -76,7 +82,25 @@ export abstract class BaseMigration {
     }
   }
 
-  protected saveMigrationState(): void {
+  protected onExit(exitCode: number | null, signal: string | null): void | false {
+    nodeCleanup.uninstall() // don't call cleanup handler again
+    this.logger.info('Exiting...')
+    if (signal && this.pendingMigrationIteration) {
+      this.logger.info('Waiting for currently pending migration iteration to finalize...')
+      this.pendingMigrationIteration.then(() => {
+        this.saveMigrationState(true)
+        this.logger.info('Done.')
+        process.kill(process.pid, signal)
+      })
+      return false
+    } else {
+      this.saveMigrationState(true)
+      this.logger.info('Done.')
+    }
+  }
+
+  protected saveMigrationState(isExitting: boolean): void {
+    this.logger.info(`Saving ${isExitting ? 'final' : 'intermediate'} migration state...`)
     const stateFilePath = this.getMigrationStateFilePath()
     const migrationState = this.getMigrationStateJson()
     fs.writeFileSync(stateFilePath, JSON.stringify(migrationState, undefined, 2))
@@ -92,22 +116,69 @@ export abstract class BaseMigration {
     }
   }
 
-  protected extractFailedSudoAsMigrations<T extends { id: string }>(result: SubmittableResult, batch: T[]): void {
+  protected async executeBatchMigration<T extends { id: string }>(
+    batchTx: SubmittableExtrinsic<'promise'>,
+    batch: T[]
+  ): Promise<void> {
+    this.pendingMigrationIteration = (async () => {
+      await this.migrateBatch(batchTx, batch)
+      this.saveMigrationState(false)
+    })()
+    await this.pendingMigrationIteration
+    this.pendingMigrationIteration = undefined
+  }
+
+  /**
+   * Extract failed migrations (entity ids) from batch transaction result.
+   * Assumptions:
+   * - Each entity is migrated with a constant number of calls (2 by default: balances.transferKeepAlive and sudo.sudoAs)
+   * - Ordering of the entities in the `batch` array matches the ordering of the batched calls through which they are migrated
+   * - Last call for each entity is always sudo.sudoAs
+   * - There is only one sudo.sudoAs call per entity
+   *
+   * Entity migration is considered failed if sudo.sudoAs call failed or was not executed at all, regardless of
+   * the result of any of the previous calls associated with that entity migration.
+   * (This means that regardless of whether balances.transferKeepAlive failed and interrupted the batch or balances.transferKeepAlive
+   * succeeded, but sudo.sudoAs failed - in both cases the migration is considered failed and should be fully re-executed on
+   * the next script run)
+   */
+  protected extractFailedMigrations<T extends { id: string }>(
+    result: SubmittableResult,
+    batch: T[],
+    callsPerEntity = 2
+  ): void {
     const { api } = this
+    const batchInterruptedEvent = api.findEvent(result, 'utility', 'BatchInterrupted')
     const sudoAsDoneEvents = api.findEvents(result, 'sudo', 'SudoAsDone')
-    if (sudoAsDoneEvents.length !== batch.length) {
-      throw new Error(`Could not extract failed migrations from: ${JSON.stringify(result.toHuman())}`)
+    const numberOfSuccesfulCalls = batchInterruptedEvent
+      ? batchInterruptedEvent.data[0].toNumber()
+      : callsPerEntity * batch.length
+    const numberOfMigratedEntites = Math.floor(numberOfSuccesfulCalls / callsPerEntity)
+    if (sudoAsDoneEvents.length !== numberOfMigratedEntites) {
+      throw new Error(
+        `Unexpected number of SudoAsDone events (expected: ${numberOfMigratedEntites}, got: ${sudoAsDoneEvents.length})! ` +
+          `Could not extract failed migrations from: ${JSON.stringify(result.toHuman())}`
+      )
     }
     const failedIds: number[] = []
-    sudoAsDoneEvents.forEach(({ data: [sudoAsDone] }, i) => {
-      if (sudoAsDone.isFalse) {
-        const id = parseInt(batch[i].id)
-        failedIds.push(id)
-        this.failedMigrations.add(id)
+    batch.forEach((entity, i) => {
+      const entityId = parseInt(entity.id)
+      if (i >= numberOfMigratedEntites || sudoAsDoneEvents[i].data[0].isFalse) {
+        failedIds.push(entityId)
+        this.failedMigrations.add(entityId)
       }
     })
+
+    if (batchInterruptedEvent) {
+      this.logger.error(
+        `Batch interrupted at call ${numberOfSuccesfulCalls}: ${this.api.formatDispatchError(
+          batchInterruptedEvent.data[1]
+        )}`
+      )
+    }
+
     if (failedIds.length) {
-      console.error(`Failed to migrate:`, failedIds)
+      this.logger.error(`Failed to migrate:`, { failedIds })
     }
   }
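
A worked example of the index math above, assuming the default callsPerEntity = 2: if a batch of 4 entities (8 calls) is interrupted at call index 5, calls 0-4 succeeded, so floor(5 / 2) = 2 entities were fully migrated. Entities at positions 2 and 3 are marked failed outright; entities 0 and 1 are failed only if their SudoAsDone event reports false.

// sketch of the arithmetic, with invented numbers
const callsPerEntity = 2
const interruptedAtCallIndex = 5 // BatchInterrupted.data[0]: calls 0..4 succeeded
const migratedEntities = Math.floor(interruptedAtCallIndex / callsPerEntity) // === 2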
 

+ 38 - 26
utils/migration-scripts/src/sumer-giza/ChannelsMigration.ts

@@ -8,6 +8,9 @@ import { CHANNEL_AVATAR_TARGET_SIZE, CHANNEL_COVER_TARGET_SIZE } from './ImageRe
 import { ChannelId } from '@joystream/types/common'
 import _ from 'lodash'
 import { MigrationResult } from './BaseMigration'
+import { Logger } from 'winston'
+import { createLogger } from '../logging'
+import { SubmittableExtrinsic } from '@polkadot/api/types'
 
 export type ChannelsMigrationConfig = AssetsMigrationConfig & {
   channelIds: number[]
@@ -30,11 +33,13 @@ export class ChannelMigration extends AssetsMigration {
   protected config: ChannelsMigrationConfig
   protected videoIds: number[] = []
   protected forcedChannelOwner: { id: string; controllerAccount: string } | undefined
+  protected logger: Logger
 
   public constructor(params: ChannelsMigrationParams) {
     super(params)
     this.config = params.config
     this.forcedChannelOwner = params.forcedChannelOwner
+    this.logger = createLogger(this.name)
   }
 
   private getChannelOwnerMember({ id, ownerMember }: ChannelFieldsFragment) {
@@ -49,6 +54,31 @@ export class ChannelMigration extends AssetsMigration {
     return ownerMember
   }
 
+  protected async migrateBatch(tx: SubmittableExtrinsic<'promise'>, channels: ChannelFieldsFragment[]): Promise<void> {
+    const { api } = this
+    const result = await api.sendExtrinsic(this.sudo, tx)
+    const channelCreatedEvents = api.findEvents(result, 'content', 'ChannelCreated')
+    const newChannelIds: ChannelId[] = channelCreatedEvents.map((e) => e.data[1])
+    if (channelCreatedEvents.length !== channels.length) {
+      this.extractFailedMigrations(result, channels)
+    }
+    const newChannelMapEntries: [number, number][] = []
+    let newChannelIdIndex = 0
+    channels.forEach(({ id }) => {
+      if (this.failedMigrations.has(parseInt(id))) {
+        return
+      }
+      const newChannelId = newChannelIds[newChannelIdIndex++].toNumber()
+      this.idsMap.set(parseInt(id), newChannelId)
+      newChannelMapEntries.push([parseInt(id), newChannelId])
+    })
+    if (newChannelMapEntries.length) {
+      this.logger.info('Channel map entries added!', { newChannelMapEntries })
+      const dataObjectsUploadedEvents = this.api.findEvents(result, 'storage', 'DataObjectsUploaded')
+      this.assetsManager.queueUploadsFromEvents(dataObjectsUploadedEvents)
+    }
+  }
+
   public async run(): Promise<ChannelsMigrationResult> {
     await this.init()
     const {
@@ -58,12 +88,12 @@ export class ChannelMigration extends AssetsMigration {
     const ids = channelIds.sort((a, b) => a - b)
     while (ids.length) {
       const idsBatch = ids.splice(0, channelBatchSize)
-      console.log(`Fetching a batch of ${idsBatch.length} channels...`)
+      this.logger.info(`Fetching a batch of ${idsBatch.length} channels...`)
       const channelsBatch = (await this.queryNodeApi.getChannelsByIds(idsBatch)).sort(
         (a, b) => parseInt(a.id) - parseInt(b.id)
       )
       if (channelsBatch.length < idsBatch.length) {
-        console.error(
+        this.logger.warn(
          `Some channels could not be found: ${_.difference(
             idsBatch,
             channelsBatch.map((c) => parseInt(c.id))
@@ -72,34 +102,16 @@ export class ChannelMigration extends AssetsMigration {
       }
       const channelsToMigrate = channelsBatch.filter((c) => !this.idsMap.has(parseInt(c.id)))
       if (channelsToMigrate.length < channelsBatch.length) {
-        console.log(
+        this.logger.info(
           `${channelsToMigrate.length ? 'Some' : 'All'} channels in this batch were already migrated ` +
             `(${channelsBatch.length - channelsToMigrate.length}/${channelsBatch.length})`
         )
       }
       if (channelsToMigrate.length) {
-        const txs = _.flatten(await Promise.all(channelsToMigrate.map((c) => this.prepareChannel(c))))
-        const result = await api.sendExtrinsic(this.sudo, api.tx.utility.batch(txs))
-        const channelCreatedEvents = api.findEvents(result, 'content', 'ChannelCreated')
-        const newChannelIds: ChannelId[] = channelCreatedEvents.map((e) => e.data[1])
-        if (channelCreatedEvents.length !== channelsToMigrate.length) {
-          this.extractFailedSudoAsMigrations(result, channelsToMigrate)
-        }
-        const dataObjectsUploadedEvents = api.findEvents(result, 'storage', 'DataObjectsUploaded')
-        const newChannelMapEntries: [number, number][] = []
-        let newChannelIdIndex = 0
-        channelsToMigrate.forEach(({ id }) => {
-          if (this.failedMigrations.has(parseInt(id))) {
-            return
-          }
-          const newChannelId = newChannelIds[newChannelIdIndex++].toNumber()
-          this.idsMap.set(parseInt(id), newChannelId)
-          newChannelMapEntries.push([parseInt(id), newChannelId])
-        })
-        if (newChannelMapEntries.length) {
-          console.log('Channel map entries added!', newChannelMapEntries)
-          await this.assetsManager.uploadFromEvents(dataObjectsUploadedEvents)
-        }
+        const calls = _.flatten(await Promise.all(channelsToMigrate.map((c) => this.prepareChannel(c))))
+        const batchTx = api.tx.utility.batch(calls)
+        await this.executeBatchMigration(batchTx, channelsToMigrate)
+        await this.assetsManager.processQueuedUploads()
       }
       const videoIdsToMigrate: number[] = channelsBatch.reduce<number[]>(
         (res, { id, videos }) =>
@@ -110,7 +122,7 @@ export class ChannelMigration extends AssetsMigration {
       )
       this.videoIds = this.videoIds.concat(videoIdsToMigrate)
       if (videoIdsToMigrate.length) {
-        console.log(`Added ${videoIdsToMigrate.length} video ids to the list of videos to migrate`)
+        this.logger.info(`Added ${videoIdsToMigrate.length} video ids to the list of videos to migrate`)
       }
     }
     return {

+ 7 - 0
utils/migration-scripts/src/sumer-giza/ContentMigration.ts

@@ -3,6 +3,7 @@ import { QueryNodeApi } from './sumer-query-node/api'
 import { RuntimeApi } from '../RuntimeApi'
 import { VideosMigration } from './VideosMigration'
 import { ChannelMigration } from './ChannelsMigration'
+import { AssetsManager } from './AssetsManager'
 
 export type ContentMigrationConfig = {
   queryNodeUri: string
@@ -52,11 +53,16 @@ export class ContentMigration {
     const { api, queryNodeApi, config } = this
     await this.api.isReadyOrError
     const forcedChannelOwner = await this.getForcedChannelOwner()
+    const assetsManager = await AssetsManager.create({
+      api,
+      config,
+    })
     const { idsMap: channelsMap, videoIds } = await new ChannelMigration({
       api,
       queryNodeApi,
       config,
       forcedChannelOwner,
+      assetsManager,
     }).run()
     await new VideosMigration({
       api,
@@ -65,6 +71,7 @@ export class ContentMigration {
       channelsMap,
       videoIds,
       forcedChannelOwner,
+      assetsManager,
     }).run()
   }
 }

+ 44 - 33
utils/migration-scripts/src/sumer-giza/VideosMigration.ts

@@ -8,6 +8,9 @@ import moment from 'moment'
 import { VIDEO_THUMB_TARGET_SIZE } from './ImageResizer'
 import { AssetsMigration, AssetsMigrationConfig, AssetsMigrationParams } from './AssetsMigration'
 import { MigrationResult } from './BaseMigration'
+import { Logger } from 'winston'
+import { createLogger } from '../logging'
+import { SubmittableExtrinsic } from '@polkadot/api/types'
 
 export type VideosMigrationConfig = AssetsMigrationConfig & {
   videoBatchSize: number
@@ -26,13 +29,15 @@ export class VideosMigration extends AssetsMigration {
   protected channelsMap: Map<number, number>
   protected videoIds: number[]
   protected forcedChannelOwner: { id: string; controllerAccount: string } | undefined
-
-  public constructor({ api, queryNodeApi, config, videoIds, channelsMap, forcedChannelOwner }: VideosMigrationParams) {
-    super({ api, queryNodeApi, config })
-    this.config = config
-    this.channelsMap = channelsMap
-    this.videoIds = videoIds
-    this.forcedChannelOwner = forcedChannelOwner
+  protected logger: Logger
+
+  public constructor(params: VideosMigrationParams) {
+    super(params)
+    this.config = params.config
+    this.channelsMap = params.channelsMap
+    this.videoIds = params.videoIds
+    this.forcedChannelOwner = params.forcedChannelOwner
+    this.logger = createLogger(this.name)
   }
 
   private getNewChannelId(oldChannelId: number): number {
@@ -43,6 +48,31 @@ export class VideosMigration extends AssetsMigration {
     return newChannelId
   }
 
+  protected async migrateBatch(tx: SubmittableExtrinsic<'promise'>, videos: VideoFieldsFragment[]): Promise<void> {
+    const { api } = this
+    const result = await api.sendExtrinsic(this.sudo, tx)
+    const videoCreatedEvents = api.findEvents(result, 'content', 'VideoCreated')
+    const newVideoIds: VideoId[] = videoCreatedEvents.map((e) => e.data[2])
+    if (videoCreatedEvents.length !== videos.length) {
+      this.extractFailedMigrations(result, videos)
+    }
+    const newVideoMapEntries: [number, number][] = []
+    let newVideoIdIndex = 0
+    videos.forEach(({ id }) => {
+      if (this.failedMigrations.has(parseInt(id))) {
+        return
+      }
+      const newVideoId = newVideoIds[newVideoIdIndex++].toNumber()
+      this.idsMap.set(parseInt(id), newVideoId)
+      newVideoMapEntries.push([parseInt(id), newVideoId])
+    })
+    if (newVideoMapEntries.length) {
+      this.logger.info('Video map entries added!', { newVideoMapEntries })
+      const dataObjectsUploadedEvents = api.findEvents(result, 'storage', 'DataObjectsUploaded')
+      this.assetsManager.queueUploadsFromEvents(dataObjectsUploadedEvents)
+    }
+  }
+
   public async run(): Promise<MigrationResult> {
     await this.init()
     const {
@@ -53,48 +83,29 @@ export class VideosMigration extends AssetsMigration {
     const idsToMigrate = videoIds.filter((id) => !this.idsMap.has(id)).sort((a, b) => a - b)
     if (idsToMigrate.length < videoIds.length) {
       const alreadyMigratedVideosNum = videoIds.length - idsToMigrate.length
-      console.log(
+      this.logger.info(
         (idsToMigrate.length ? `${alreadyMigratedVideosNum}/${videoIds.length}` : 'All') +
          ' videos already migrated, skipping...'
       )
     }
     while (idsToMigrate.length) {
       const idsBatch = idsToMigrate.splice(0, videoBatchSize)
-      console.log(`Fetching a batch of ${idsBatch.length} videos...`)
+      this.logger.info(`Fetching a batch of ${idsBatch.length} videos...`)
       const videosBatch = (await this.queryNodeApi.getVideosByIds(idsBatch)).sort(
         (a, b) => parseInt(a.id) - parseInt(b.id)
       )
       if (videosBatch.length < idsBatch.length) {
-        console.error(
+        this.logger.warn(
          `Some videos could not be found: ${_.difference(
             idsBatch,
             videosBatch.map((v) => parseInt(v.id))
           )}`
         )
       }
-      const txs = _.flatten(await Promise.all(videosBatch.map((v) => this.prepareVideo(v))))
-      const result = await api.sendExtrinsic(this.sudo, api.tx.utility.batch(txs))
-      const videoCreatedEvents = api.findEvents(result, 'content', 'VideoCreated')
-      const newVideoIds: VideoId[] = videoCreatedEvents.map((e) => e.data[2])
-      if (videoCreatedEvents.length !== videosBatch.length) {
-        this.extractFailedSudoAsMigrations(result, videosBatch)
-      }
-
-      const dataObjectsUploadedEvents = api.findEvents(result, 'storage', 'DataObjectsUploaded')
-      const newVideoMapEntries: [number, number][] = []
-      let newVideoIdIndex = 0
-      videosBatch.forEach(({ id }) => {
-        if (this.failedMigrations.has(parseInt(id))) {
-          return
-        }
-        const newVideoId = newVideoIds[newVideoIdIndex++].toNumber()
-        this.idsMap.set(parseInt(id), newVideoId)
-        newVideoMapEntries.push([parseInt(id), newVideoId])
-      })
-      if (newVideoMapEntries.length) {
-        console.log('Video map entries added!', newVideoMapEntries)
-        await this.assetsManager.uploadFromEvents(dataObjectsUploadedEvents)
-      }
+      const calls = _.flatten(await Promise.all(videosBatch.map((v) => this.prepareVideo(v))))
+      const batchTx = api.tx.utility.batch(calls)
+      await this.executeBatchMigration(batchTx, videosBatch)
+      await this.assetsManager.processQueuedUploads()
     }
     return this.getResult()
   }

+ 6 - 2
utils/migration-scripts/src/sumer-giza/sumer-query-node/api.ts

@@ -30,12 +30,15 @@ import {
   VideoFieldsFragment,
   WorkerFieldsFragment,
 } from './generated/queries'
+import { Logger } from 'winston'
+import { createLogger } from '../../logging'
 
 export class QueryNodeApi {
   private endpoint: string
   private apolloClient: ApolloClient<NormalizedCacheObject>
   private retryAttempts: number
   private retryIntervalMs: number
+  private logger: Logger
 
   public constructor(endpoint: string, retryAttempts = 5, retryIntervalMs = 5000) {
     this.endpoint = endpoint
@@ -46,6 +49,7 @@ export class QueryNodeApi {
       cache: new InMemoryCache(),
       defaultOptions: { query: { fetchPolicy: 'no-cache', errorPolicy: 'all' } },
     })
+    this.logger = createLogger('Query Node Api')
   }
 
   private async query<T>(queryFunc: () => Promise<ApolloQueryResult<T>>): Promise<ApolloQueryResult<T>> {
@@ -56,11 +60,11 @@ export class QueryNodeApi {
         return result
       } catch (e) {
         if (e instanceof Error && isApolloError(e) && e.networkError) {
-          console.error(`Query node (${this.endpoint}) network error: ${e.networkError.message}`)
+          this.logger.error(`${this.endpoint} network error: ${e.networkError.message}`)
           if (attempts++ > this.retryAttempts) {
             throw new Error(`Maximum number of query retry attempts reached for ${this.endpoint}`)
           }
-          console.log(`Retrying in ${this.retryIntervalMs}ms...`)
+          this.logger.info(`Retrying in ${this.retryIntervalMs}ms...`)
           await new Promise((resolve) => setTimeout(resolve, this.retryIntervalMs))
         } else {
           throw e