index.ts 21 KB


  1. /*
  2. eslint-disable @typescript-eslint/naming-convention
  3. */
  4. import { DatabaseManager, EventContext, StoreContext } from '@joystream/hydra-common'
  5. import { Storage } from '../generated/types/storage'
  6. import {
  7. DistributionBucket,
  8. DistributionBucketFamily,
  9. DistributionBucketOperator,
  10. DistributionBucketOperatorStatus,
  11. StorageBag,
  12. StorageBagOwner,
  13. StorageBagOwnerChannel,
  14. StorageBagOwnerCouncil,
  15. StorageBagOwnerMember,
  16. StorageBagOwnerWorkingGroup,
  17. StorageBucket,
  18. StorageBucketOperatorStatusActive,
  19. StorageBucketOperatorStatusInvited,
  20. StorageBucketOperatorStatusMissing,
  21. StorageDataObject,
  22. StorageSystemParameters,
  23. } from 'query-node/dist/model'
  24. import BN from 'bn.js'
  25. import { getById, getWorkingGroupModuleName } from '../common'
  26. import { BTreeSet } from '@polkadot/types'
  27. import { DataObjectCreationParameters } from '@joystream/types/storage'
  28. import { registry } from '@joystream/types'
  29. import { In } from 'typeorm'
  30. import _ from 'lodash'
  31. import { DataObjectId, BagId, DynamicBagId, StaticBagId } from '@joystream/types/augment/all'
  32. import { processDistributionOperatorMetadata, processStorageOperatorMetadata } from './metadata'
  /**
   * Loads the data objects with the given ids that are assigned to the given bag.
   *
   * @param store - database manager used to run the query
   * @param bagId - runtime id of the bag the objects are expected to belong to
   * @param dataObjectIds - set of runtime data object ids to load
   * @returns the `StorageDataObject` entities matching both the ids and the bag
   * @throws Error when the number of entities found differs from the number of ids requested
   */
  async function getDataObjectsInBag(
    store: DatabaseManager,
    bagId: BagId,
    dataObjectIds: BTreeSet<DataObjectId>
  ): Promise<StorageDataObject[]> {
    // Entity ids are stored as strings, so convert the runtime ids once up front.
    const requestedIds = Array.from(dataObjectIds).map((objectId) => objectId.toString())
    const bagEntityId = getBagId(bagId)

    const found = await store.getMany(StorageDataObject, {
      where: {
        id: In(requestedIds),
        storageBag: { id: bagEntityId },
      },
    })

    if (found.length === requestedIds.length) {
      return found
    }

    // Report exactly which of the requested ids could not be found in the bag.
    const missingIds = _.difference(
      requestedIds,
      found.map((dataObject) => dataObject.id)
    )
    throw new Error(`Missing data objects: ${missingIds} in bag ${bagEntityId}`)
  }
  /**
   * Builds the owner variant for a static bag.
   *
   * @param bagId - runtime static bag id (council or working group)
   * @returns a council owner, or a working group owner carrying the working group module name
   * @throws Error when the bag id is neither a council nor a working group bag
   */
  function getStaticBagOwner(bagId: StaticBagId): typeof StorageBagOwner {
    if (bagId.isCouncil) {
      return new StorageBagOwnerCouncil()
    }
    if (bagId.isWorkingGroup) {
      const workingGroupOwner = new StorageBagOwnerWorkingGroup()
      workingGroupOwner.workingGroupId = getWorkingGroupModuleName(bagId.asWorkingGroup)
      return workingGroupOwner
    }
    throw new Error(`Unexpected static bag type: ${bagId.type}`)
  }
  /**
   * Builds the owner variant for a dynamic bag.
   *
   * @param bagId - runtime dynamic bag id (channel or member)
   * @returns a channel owner or a member owner, with the numeric channel / member id set
   * @throws Error when the bag id is neither a channel nor a member bag
   */
  function getDynamicBagOwner(bagId: DynamicBagId) {
    if (bagId.isChannel) {
      const channelOwner = new StorageBagOwnerChannel()
      channelOwner.channelId = bagId.asChannel.toNumber()
      return channelOwner
    }
    if (bagId.isMember) {
      const memberOwner = new StorageBagOwnerMember()
      memberOwner.memberId = bagId.asMember.toNumber()
      return memberOwner
    }
    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
  }
  /**
   * Derives the string entity id of a static bag.
   *
   * @param bagId - runtime static bag id
   * @returns `CO` for the council bag, or `WG-<working group type>` for a working group bag
   * @throws Error when the bag id is neither a council nor a working group bag
   */
  function getStaticBagId(bagId: StaticBagId): string {
    if (bagId.isCouncil) {
      return 'CO'
    }
    if (bagId.isWorkingGroup) {
      const workingGroup = bagId.asWorkingGroup
      return `WG-${workingGroup.type}`
    }
    throw new Error(`Unexpected static bag type: ${bagId.type}`)
  }
  /**
   * Derives the string entity id of a dynamic bag.
   *
   * @param bagId - runtime dynamic bag id
   * @returns `CH-<channel id>` for a channel bag, or `M-<member id>` for a member bag
   * @throws Error when the bag id is neither a channel nor a member bag
   */
  function getDynamicBagId(bagId: DynamicBagId): string {
    if (bagId.isChannel) {
      const channelId = bagId.asChannel.toString()
      return `CH-${channelId}`
    }
    if (bagId.isMember) {
      const memberId = bagId.asMember.toString()
      return `M-${memberId}`
    }
    throw new Error(`Unexpected dynamic bag type: ${bagId.type}`)
  }
  /**
   * Derives the string entity id of any bag by dispatching on whether the
   * runtime bag id is static or dynamic.
   *
   * @param bagId - runtime bag id
   * @returns the id produced by `getStaticBagId` or `getDynamicBagId`
   */
  function getBagId(bagId: BagId) {
    if (bagId.isStatic) {
      return getStaticBagId(bagId.asStatic)
    }
    return getDynamicBagId(bagId.asDynamic)
  }
  /**
   * Fetches the `StorageBag` entity of a dynamic bag via `getById`.
   *
   * @param store - database manager used for the lookup
   * @param bagId - runtime dynamic bag id
   * @param relations - optional bag relations to load together with the entity
   * @returns the result of `getById` for the derived bag entity id
   */
  async function getDynamicBag(
    store: DatabaseManager,
    bagId: DynamicBagId,
    relations?: ('storedBy' | 'distributedBy' | 'objects')[]
  ): Promise<StorageBag> {
    const bagEntityId = getDynamicBagId(bagId)
    return getById(store, StorageBag, bagEntityId, relations)
  }
  /**
   * Fetches the `StorageBag` entity of a static bag, creating and saving it
   * first if it does not exist in the store yet.
   *
   * @param store - database manager used for the lookup and the save
   * @param bagId - runtime static bag id
   * @param relations - optional bag relations to load together with an existing entity
   * @returns the existing bag, or the newly created one (only `id` and `owner` set)
   */
  async function getStaticBag(
    store: DatabaseManager,
    bagId: StaticBagId,
    relations?: ('storedBy' | 'distributedBy' | 'objects')[]
  ): Promise<StorageBag> {
    const id = getStaticBagId(bagId)
    const existingBag = await store.get(StorageBag, { where: { id }, relations })
    if (existingBag) {
      return existingBag
    }

    // No entity for this static bag yet: create it on first use.
    console.log(`Creating new static bag: ${id}`)
    const createdBag = new StorageBag({
      id,
      owner: getStaticBagOwner(bagId),
    })
    await store.save<StorageBag>(createdBag)
    return createdBag
  }
  /**
   * Fetches the `StorageBag` entity for any runtime bag id, delegating to
   * `getStaticBag` or `getDynamicBag` depending on the id variant.
   *
   * @param store - database manager used for the lookup
   * @param bagId - runtime bag id
   * @param relations - optional bag relations to load together with the entity
   * @returns the bag entity
   */
  async function getBag(
    store: DatabaseManager,
    bagId: BagId,
    relations?: ('storedBy' | 'distributedBy' | 'objects')[]
  ): Promise<StorageBag> {
    if (bagId.isStatic) {
      return getStaticBag(store, bagId.asStatic, relations)
    }
    return getDynamicBag(store, bagId.asDynamic, relations)
  }
  /**
   * Loads a distribution bucket operator together with its metadata, the
   * metadata's node location and that location's coordinates.
   *
   * @param store - database manager used for the lookup
   * @param id - operator entity id
   * @returns the operator entity with the listed relations loaded
   * @throws Error when no operator with the given id exists
   */
  async function getDistributionBucketOperatorWithMetadata(store: DatabaseManager, id: string) {
    const metadataRelations = ['metadata', 'metadata.nodeLocation', 'metadata.nodeLocation.coordinates']
    const operator = await store.get(DistributionBucketOperator, {
      where: { id },
      relations: metadataRelations,
    })
    if (operator) {
      return operator
    }
    throw new Error(`DistributionBucketOperator not found by id: ${id}`)
  }
  /**
   * Loads a storage bucket together with its operator metadata, the metadata's
   * node location and that location's coordinates.
   *
   * @param store - database manager used for the lookup
   * @param id - storage bucket entity id
   * @returns the bucket entity with the listed relations loaded
   * @throws Error when no storage bucket with the given id exists
   */
  async function getStorageBucketWithOperatorMetadata(store: DatabaseManager, id: string) {
    const metadataRelations = [
      'operatorMetadata',
      'operatorMetadata.nodeLocation',
      'operatorMetadata.nodeLocation.coordinates',
    ]
    const bucket = await store.get(StorageBucket, {
      where: { id },
      relations: metadataRelations,
    })
    if (bucket) {
      return bucket
    }
    throw new Error(`StorageBucket not found by id: ${id}`)
  }
  /**
   * Loads a distribution bucket family together with its metadata and the
   * metadata's boundary.
   *
   * @param store - database manager used for the lookup
   * @param id - family entity id
   * @returns the family entity with the listed relations loaded
   * @throws Error when no family with the given id exists
   */
  async function getDistributionBucketFamilyWithMetadata(store: DatabaseManager, id: string) {
    const metadataRelations = ['metadata', 'metadata.boundary']
    const family = await store.get(DistributionBucketFamily, {
      where: { id },
      relations: metadataRelations,
    })
    if (family) {
      return family
    }
    throw new Error(`DistributionBucketFamily not found by id: ${id}`)
  }
  163. // BUCKETS
  /**
   * Handles the `StorageBucketCreated` event: creates a `StorageBucket` entity
   * with the limits and status from the event. The operator status is
   * "invited" (with the worker id) when the event carries an invited worker,
   * and "missing" otherwise.
   *
   * @param event - the raw event to decode
   * @param store - database manager the new bucket is saved to
   */
  export async function storage_StorageBucketCreated({ event, store }: EventContext & StoreContext): Promise<void> {
    const eventParams = new Storage.StorageBucketCreatedEvent(event).params
    const [bucketId, invitedWorkerId, acceptingNewBags, dataObjectSizeLimit, dataObjectCountLimit] = eventParams

    const bucket = new StorageBucket({
      id: bucketId.toString(),
      acceptingNewBags: acceptingNewBags.isTrue,
      dataObjectCountLimit: new BN(dataObjectCountLimit.toString()),
      dataObjectsSizeLimit: new BN(dataObjectSizeLimit.toString()),
    })

    if (!invitedWorkerId.isSome) {
      // Nobody was invited at creation time.
      bucket.operatorStatus = new StorageBucketOperatorStatusMissing()
    } else {
      const invitedStatus = new StorageBucketOperatorStatusInvited()
      invitedStatus.workerId = invitedWorkerId.unwrap().toNumber()
      bucket.operatorStatus = invitedStatus
    }

    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageOperatorMetadataSet` event: passes the bucket's current
   * operator metadata and the metadata bytes from the event to
   * `processStorageOperatorMetadata`, then stores the result on the bucket.
   *
   * @param event - the raw event to decode (bucket id is the 1st param, metadata bytes the 3rd)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageOperatorMetadataSet({ event, store }: EventContext & StoreContext): Promise<void> {
    const [bucketId, , metadataBytes] = new Storage.StorageOperatorMetadataSetEvent(event).params
    const bucket = await getStorageBucketWithOperatorMetadata(store, bucketId.toString())
    const updatedMetadata = await processStorageOperatorMetadata(store, bucket.operatorMetadata, metadataBytes)
    bucket.operatorMetadata = updatedMetadata
    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageBucketStatusUpdated` event: updates the bucket's
   * `acceptingNewBags` flag from the event.
   *
   * @param event - the raw event to decode (bucket id is the 1st param, the flag the 3rd)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageBucketStatusUpdated({ event, store }: EventContext & StoreContext): Promise<void> {
    const eventParams = new Storage.StorageBucketStatusUpdatedEvent(event).params
    const [bucketId, , acceptingNewBags] = eventParams
    const bucket = await getById(store, StorageBucket, bucketId.toString())
    bucket.acceptingNewBags = acceptingNewBags.isTrue
    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageBucketInvitationAccepted` event: marks the bucket's
   * operator status as "active" for the worker id carried by the event.
   *
   * @param event - the raw event to decode (bucket id, worker id)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageBucketInvitationAccepted(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [bucketId, workerId] = new Storage.StorageBucketInvitationAcceptedEvent(event).params
    const bucket = await getById(store, StorageBucket, bucketId.toString())

    const activeStatus = new StorageBucketOperatorStatusActive()
    activeStatus.workerId = workerId.toNumber()
    bucket.operatorStatus = activeStatus

    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageBucketInvitationCancelled` event: resets the bucket's
   * operator status to "missing".
   *
   * @param event - the raw event to decode (bucket id is the 1st param)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageBucketInvitationCancelled(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [bucketId] = new Storage.StorageBucketInvitationCancelledEvent(event).params
    const bucket = await getById(store, StorageBucket, bucketId.toString())
    bucket.operatorStatus = new StorageBucketOperatorStatusMissing()
    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageBucketOperatorInvited` event: marks the bucket's
   * operator status as "invited" for the worker id carried by the event.
   *
   * @param event - the raw event to decode (bucket id, worker id)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageBucketOperatorInvited(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [bucketId, workerId] = new Storage.StorageBucketOperatorInvitedEvent(event).params
    const bucket = await getById(store, StorageBucket, bucketId.toString())

    const invitedStatus = new StorageBucketOperatorStatusInvited()
    invitedStatus.workerId = workerId.toNumber()
    bucket.operatorStatus = invitedStatus

    await store.save<StorageBucket>(bucket)
  }
  /**
   * Handles the `StorageBucketOperatorRemoved` event: resets the bucket's
   * operator status to "missing".
   *
   * The event is decoded with its own generated class
   * (`StorageBucketOperatorRemovedEvent`) rather than the class of the
   * unrelated `StorageBucketInvitationCancelled` event, so the decoder matches
   * the event this handler is registered for.
   *
   * @param event - the raw event to decode (bucket id is the 1st param)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_StorageBucketOperatorRemoved({
    event,
    store,
  }: EventContext & StoreContext): Promise<void> {
    const [bucketId] = new Storage.StorageBucketOperatorRemovedEvent(event).params
    const storageBucket = await getById(store, StorageBucket, bucketId.toString())
    // With the operator removed the bucket has no operator any more.
    storageBucket.operatorStatus = new StorageBucketOperatorStatusMissing()
    await store.save<StorageBucket>(storageBucket)
  }
  /**
   * Handles the `StorageBucketsUpdatedForBag` event: drops the removed buckets
   * from the bag's `storedBy` list and appends the added ones.
   *
   * @param event - the raw event to decode (bag id, added bucket ids, removed bucket ids)
   * @param store - database manager used to load and save the bag
   */
  export async function storage_StorageBucketsUpdatedForBag(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [bagId, addedBucketsIds, removedBucketsIds] = new Storage.StorageBucketsUpdatedForBagEvent(event).params
    const bag = await getBag(store, bagId, ['storedBy'])

    const removedIds = Array.from(removedBucketsIds)
    // Keep only the buckets whose id matches none of the removed ids.
    const keptBuckets = (bag.storedBy || []).filter((bucket) => removedIds.every((id) => !id.eq(bucket.id)))
    // Added buckets are referenced by id only.
    const addedBuckets = Array.from(addedBucketsIds).map((id) => new StorageBucket({ id: id.toString() }))

    bag.storedBy = [...keptBuckets, ...addedBuckets]
    await store.save<StorageBag>(bag)
  }
  /**
   * Handles the `StorageBucketDeleted` event: removes every entity loaded
   * through the bucket's `storedBags` relation, then removes the bucket itself.
   *
   * NOTE(review): the entities removed here are `StorageBag`s, i.e. the bags
   * themselves rather than just their link to this bucket — confirm against the
   * schema that this is intended.
   *
   * @param event - the raw event to decode (bucket id is the 1st param)
   * @param store - database manager used to load and remove the entities
   */
  export async function storage_StorageBucketDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
    const [bucketId] = new Storage.StorageBucketDeletedEvent(event).params
    // TODO: Delete or just change status?
    // TODO: Cascade remove on db level?
    // We shouldn't have to worry about deleting DataObjects, since this is already enforced by the runtime
    const bucket = await getById(store, StorageBucket, bucketId.toString(), ['storedBags'])
    const bagRemovals = (bucket.storedBags || []).map((bag) => store.remove<StorageBag>(bag))
    await Promise.all(bagRemovals)
    await store.remove<StorageBucket>(bucket)
  }
  265. // DYNAMIC BAGS
  /**
   * Handles the `DynamicBagCreated` event: creates a `StorageBag` entity whose
   * id and owner are derived from the dynamic bag id in the event.
   *
   * @param event - the raw event to decode (bag id is the 1st param)
   * @param store - database manager the new bag is saved to
   */
  export async function storage_DynamicBagCreated({ event, store }: EventContext & StoreContext): Promise<void> {
    const [bagId] = new Storage.DynamicBagCreatedEvent(event).params
    const id = getDynamicBagId(bagId)
    const owner = getDynamicBagOwner(bagId)
    await store.save<StorageBag>(new StorageBag({ id, owner }))
  }
  /**
   * Handles the `DynamicBagDeleted` event: removes all data objects loaded
   * through the bag's `objects` relation, then removes the bag itself.
   *
   * @param event - the raw event to decode (bag id is the 2nd param)
   * @param store - database manager used to load and remove the entities
   */
  export async function storage_DynamicBagDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
    const [, bagId] = new Storage.DynamicBagDeletedEvent(event).params
    // TODO: Delete or just change status?
    // TODO: Cascade remove on db level?
    const bag = await getDynamicBag(store, bagId, ['objects'])
    const objectRemovals = (bag.objects || []).map((dataObject) => store.remove<StorageDataObject>(dataObject))
    await Promise.all(objectRemovals)
    await store.remove<StorageBag>(bag)
  }
  282. // DATA OBJECTS
  283. // Note: "Uploaded" here actually means "created" (the real upload happens later)
  /**
   * Handles the `DataObjectdUploaded` event: creates one not-yet-accepted
   * `StorageDataObject` entity per data object id in the event, assigned to the
   * bag named in the upload parameters.
   *
   * The i-th data object id is paired with the i-th entry of the upload
   * parameters' object creation list, which supplies the IPFS hash and size.
   *
   * @param event - the raw event to decode (data object ids, upload parameters)
   * @param store - database manager used to load the bag and save the objects
   */
  export async function storage_DataObjectdUploaded({ event, store }: EventContext & StoreContext): Promise<void> {
    const [dataObjectIds, uploadParams] = new Storage.DataObjectdUploadedEvent(event).params
    const { bagId, authenticationKey, objectCreationList } = uploadParams
    const storageBag = await getBag(store, bagId)

    const createdObjects = dataObjectIds.map((objectId, index) => {
      // Re-create the creation parameters from their JSON form to access the typed fields.
      const creationParams = new DataObjectCreationParameters(registry, objectCreationList[index].toJSON() as any)
      const entity = new StorageDataObject({
        id: objectId.toString(),
        authenticationKey: authenticationKey.toString(),
        isAccepted: false,
        ipfsHash: creationParams.ipfsContentId.toString(),
        size: new BN(creationParams.getField('size').toString()),
        storageBag,
      })
      return entity
    })

    const pendingSaves = createdObjects.map((dataObject) => store.save<StorageDataObject>(dataObject))
    await Promise.all(pendingSaves)
  }
  /**
   * Handles the `PendingDataObjectsAccepted` event: flags every data object
   * listed in the event as accepted.
   *
   * @param event - the raw event to decode (bag id is the 3rd param, data object ids the 4th)
   * @param store - database manager used to load and save the data objects
   */
  export async function storage_PendingDataObjectsAccepted({ event, store }: EventContext & StoreContext): Promise<void> {
    const [, , bagId, dataObjectIds] = new Storage.PendingDataObjectsAcceptedEvent(event).params
    const pendingObjects = await getDataObjectsInBag(store, bagId, dataObjectIds)

    const markAccepted = async (dataObject: StorageDataObject): Promise<void> => {
      dataObject.isAccepted = true
      // TODO: Do we still want other storage providers to accept it? How long should the key be valid?
      // dataObject.authenticationKey = null as any
      await store.save<StorageDataObject>(dataObject)
    }

    await Promise.all(pendingObjects.map((dataObject) => markAccepted(dataObject)))
  }
  /**
   * Handles the `DataObjectsMoved` event: reassigns the listed data objects
   * from the source bag to the destination bag.
   *
   * @param event - the raw event to decode (source bag id, destination bag id, data object ids)
   * @param store - database manager used to load the entities and save the objects
   */
  export async function storage_DataObjectsMoved({ event, store }: EventContext & StoreContext): Promise<void> {
    const [srcBagId, destBagId, dataObjectIds] = new Storage.DataObjectsMovedEvent(event).params
    const movedObjects = await getDataObjectsInBag(store, srcBagId, dataObjectIds)
    const destinationBag = await getBag(store, destBagId)

    const moveToDestination = async (dataObject: StorageDataObject): Promise<void> => {
      dataObject.storageBag = destinationBag
      await store.save<StorageDataObject>(dataObject)
    }

    await Promise.all(movedObjects.map((dataObject) => moveToDestination(dataObject)))
  }
  /**
   * Handles the `DataObjectsDeleted` event: removes the listed data objects of
   * the given bag from the store.
   *
   * @param event - the raw event to decode (bag id is the 2nd param, data object ids the 3rd)
   * @param store - database manager used to load and remove the data objects
   */
  export async function storage_DataObjectsDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
    const [, bagId, dataObjectIds] = new Storage.DataObjectsDeletedEvent(event).params
    const deletedObjects = await getDataObjectsInBag(store, bagId, dataObjectIds)
    // TODO: Delete them or just change status?
    // (may not be so optimal if we expect a large amount of data objects)
    const removals = deletedObjects.map((dataObject) => store.remove<StorageDataObject>(dataObject))
    await Promise.all(removals)
  }
  331. // BLACKLIST
  /**
   * Handles the `UpdateBlacklist` event: drops the removed content ids from the
   * blacklist held by the `StorageSystemParameters` entity and appends the
   * added ones (as strings).
   *
   * @param event - the raw event to decode (removed content ids, added content ids)
   * @param store - database manager used to load and save the system parameters
   * @throws Error when no `StorageSystemParameters` entity exists
   */
  export async function storage_UpdateBlacklist({ event, store }: EventContext & StoreContext): Promise<void> {
    const [removedContentIds, addedContentIds] = new Storage.UpdateBlacklistEvent(event).params
    const systemParams = await store.get(StorageSystemParameters, {})
    if (!systemParams) {
      throw new Error('StorageSystemParameters entity not found!')
    }

    const removedIds = Array.from(removedContentIds)
    // Keep only the entries matching none of the removed content ids.
    const retainedEntries = systemParams.blacklist.filter((cid) => removedIds.every((id) => !id.eq(cid)))
    const addedEntries = Array.from(addedContentIds).map((id) => id.toString())

    systemParams.blacklist = [...retainedEntries, ...addedEntries]
    await store.save<StorageSystemParameters>(systemParams)
  }
  /**
   * Handles the `DistributionBucketFamilyCreated` event: creates a
   * `DistributionBucketFamily` entity with the family id from the event.
   *
   * @param event - the raw event to decode (family id is the 1st param)
   * @param store - database manager the new family is saved to
   */
  export async function storage_DistributionBucketFamilyCreated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [familyId] = new Storage.DistributionBucketFamilyCreatedEvent(event).params
    const id = familyId.toString()
    await store.save<DistributionBucketFamily>(new DistributionBucketFamily({ id }))
  }
  /**
   * Handles the `DistributionBucketFamilyDeleted` event: removes the family
   * entity identified by the event.
   *
   * @param event - the raw event to decode (family id is the 1st param)
   * @param store - database manager used to load and remove the family
   */
  export async function storage_DistributionBucketFamilyDeleted(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [familyId] = new Storage.DistributionBucketFamilyDeletedEvent(event).params
    const deletedFamily = await getById(store, DistributionBucketFamily, familyId.toString())
    await store.remove(deletedFamily)
  }
  /**
   * Handles the `DistributionBucketCreated` event: creates a
   * `DistributionBucket` entity in the family named by the event, with
   * `distributing` initially set to `true`.
   *
   * @param event - the raw event to decode (family id, accepting-new-bags flag, bucket id)
   * @param store - database manager used to load the family and save the bucket
   */
  export async function storage_DistributionBucketCreated({ event, store }: EventContext & StoreContext): Promise<void> {
    const eventParams = new Storage.DistributionBucketCreatedEvent(event).params
    const [familyId, acceptingNewBags, bucketId] = eventParams
    const family = await getById(store, DistributionBucketFamily, familyId.toString())

    const createdBucket = new DistributionBucket({
      id: bucketId.toString(),
      acceptingNewBags: acceptingNewBags.valueOf(),
      // Runtime default
      distributing: true,
      family,
    })
    await store.save<DistributionBucket>(createdBucket)
  }
  /**
   * Handles the `DistributionBucketStatusUpdated` event: updates the bucket's
   * `acceptingNewBags` flag from the event.
   *
   * @param event - the raw event to decode (bucket id is the 2nd param, the flag the 3rd)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_DistributionBucketStatusUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const eventParams = new Storage.DistributionBucketStatusUpdatedEvent(event).params
    const [, bucketId, acceptingNewBags] = eventParams
    const updatedBucket = await getById(store, DistributionBucket, bucketId.toString())
    updatedBucket.acceptingNewBags = acceptingNewBags.valueOf()
    await store.save<DistributionBucket>(updatedBucket)
  }
  /**
   * Handles the `DistributionBucketDeleted` event: removes the distribution
   * bucket entity identified by the event.
   *
   * @param event - the raw event to decode (bucket id is the 2nd param)
   * @param store - database manager used to load and remove the bucket
   */
  export async function storage_DistributionBucketDeleted({ event, store }: EventContext & StoreContext): Promise<void> {
    const eventParams = new Storage.DistributionBucketDeletedEvent(event).params
    const [, bucketId] = eventParams
    const deletedBucket = await getById(store, DistributionBucket, bucketId.toString())
    await store.remove<DistributionBucket>(deletedBucket)
  }
  /**
   * Handles the `DistributionBucketsUpdatedForBag` event: drops the removed
   * buckets from the bag's `distributedBy` list and appends the added ones.
   *
   * @param event - the raw event to decode (bag id is the 1st param, added bucket ids the 3rd,
   *   removed bucket ids the 4th)
   * @param store - database manager used to load and save the bag
   */
  export async function storage_DistributionBucketsUpdatedForBag(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [bagId, , addedBucketsIds, removedBucketsIds] = new Storage.DistributionBucketsUpdatedForBagEvent(event).params
    const bag = await getBag(store, bagId, ['distributedBy'])

    const removedIds = Array.from(removedBucketsIds)
    // Keep only the buckets whose id matches none of the removed ids.
    const keptBuckets = (bag.distributedBy || []).filter((bucket) => removedIds.every((id) => !id.eq(bucket.id)))
    // Added buckets are referenced by id only.
    const addedBuckets = Array.from(addedBucketsIds).map((id) => new DistributionBucket({ id: id.toString() }))

    bag.distributedBy = [...keptBuckets, ...addedBuckets]
    await store.save<StorageBag>(bag)
  }
  /**
   * Handles the `DistributionBucketModeUpdated` event: updates the bucket's
   * `distributing` flag from the event.
   *
   * @param event - the raw event to decode (bucket id is the 2nd param, the flag the 3rd)
   * @param store - database manager used to load and save the bucket
   */
  export async function storage_DistributionBucketModeUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const eventParams = new Storage.DistributionBucketModeUpdatedEvent(event).params
    const [, bucketId, distributing] = eventParams
    const updatedBucket = await getById(store, DistributionBucket, bucketId.toString())
    updatedBucket.distributing = distributing.valueOf()
    await store.save<DistributionBucket>(updatedBucket)
  }
  /**
   * Handles the `DistributionBucketOperatorInvited` event: creates a
   * `DistributionBucketOperator` entity in `INVITED` status, identified by
   * `<bucket id>-<worker id>`.
   *
   * @param event - the raw event to decode (bucket id is the 2nd param, worker id the 3rd)
   * @param store - database manager used to load the bucket and save the operator
   */
  export async function storage_DistributionBucketOperatorInvited(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [, bucketId, workerId] = new Storage.DistributionBucketOperatorInvitedEvent(event).params
    const distributionBucket = await getById(store, DistributionBucket, bucketId.toString())
    const operatorId = `${bucketId}-${workerId}`

    const operator = new DistributionBucketOperator({
      id: operatorId,
      distributionBucket,
      status: DistributionBucketOperatorStatus.INVITED,
      workerId: workerId.toNumber(),
    })
    await store.save<DistributionBucketOperator>(operator)
  }
  /**
   * Handles the `DistributionBucketInvitationCancelled` event: removes the
   * invited `DistributionBucketOperator` entity identified by
   * `<bucket id>-<worker id>`.
   *
   * The event is decoded with its own generated class
   * (`DistributionBucketInvitationCancelledEvent`) rather than the class of the
   * `DistributionBucketOperatorInvited` event, so the decoder matches the event
   * this handler is registered for.
   *
   * @param event - the raw event to decode (bucket id is the 2nd param, worker id the 3rd)
   * @param store - database manager used to load and remove the operator
   */
  export async function storage_DistributionBucketInvitationCancelled({
    event,
    store,
  }: EventContext & StoreContext): Promise<void> {
    const [, bucketId, workerId] = new Storage.DistributionBucketInvitationCancelledEvent(event).params
    // Operator entities are keyed by "<bucket id>-<worker id>" (same scheme used when inviting).
    const invitedOperator = await getById(store, DistributionBucketOperator, `${bucketId}-${workerId}`)
    await store.remove<DistributionBucketOperator>(invitedOperator)
  }
  /**
   * Handles the `DistributionBucketInvitationAccepted` event: switches the
   * operator identified by `<bucket id>-<worker id>` to `ACTIVE` status.
   *
   * @param event - the raw event to decode (worker id is the 1st param, bucket id the 3rd)
   * @param store - database manager used to load and save the operator
   */
  export async function storage_DistributionBucketInvitationAccepted(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [workerId, , bucketId] = new Storage.DistributionBucketInvitationAcceptedEvent(event).params
    const operatorId = `${bucketId}-${workerId}`
    const operator = await getById(store, DistributionBucketOperator, operatorId)
    operator.status = DistributionBucketOperatorStatus.ACTIVE
    await store.save<DistributionBucketOperator>(operator)
  }
  /**
   * Handles the `DistributionBucketMetadataSet` event: passes the operator's
   * current metadata and the metadata bytes from the event to
   * `processDistributionOperatorMetadata`, then stores the result on the
   * operator identified by `<bucket id>-<worker id>`.
   *
   * @param event - the raw event to decode (worker id is the 1st param, bucket id the 3rd,
   *   metadata bytes the 4th)
   * @param store - database manager used to load and save the operator
   */
  export async function storage_DistributionBucketMetadataSet(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    const [workerId, , bucketId, metadataBytes] = new Storage.DistributionBucketMetadataSetEvent(event).params
    const operatorId = `${bucketId}-${workerId}`
    const operator = await getDistributionBucketOperatorWithMetadata(store, operatorId)
    const updatedMetadata = await processDistributionOperatorMetadata(store, operator.metadata, metadataBytes)
    operator.metadata = updatedMetadata
    await store.save<DistributionBucketOperator>(operator)
  }
  /**
   * Handler stub named after the `DistributionBucketsPerBagLimitUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_DistributionBucketsPerBagLimitUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `FamiliesInDynamicBagCreationPolicyUpdated`
   * event. It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_FamiliesInDynamicBagCreationPolicyUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `StorageBucketVoucherLimitsSet` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_StorageBucketVoucherLimitsSet(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `UploadingBlockStatusUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_UploadingBlockStatusUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `DataObjectPerMegabyteFeeUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_DataObjectPerMegabyteFeeUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `StorageBucketsPerBagLimitUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_StorageBucketsPerBagLimitUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `StorageBucketsVoucherMaxLimitsUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_StorageBucketsVoucherMaxLimitsUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `DeletionPrizeChanged` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_DeletionPrizeChanged({
    event,
    store,
  }: EventContext & StoreContext): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the `VoucherChanged` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_VoucherChanged({
    event,
    store,
  }: EventContext & StoreContext): Promise<void> {
    // To be implemented
  }
  /**
   * Handler stub named after the
   * `NumberOfStorageBucketsInDynamicBagCreationPolicyUpdated` event.
   * It currently performs no processing.
   *
   * @param event - the raw event (unused)
   * @param store - database manager (unused)
   */
  export async function storage_NumberOfStorageBucketsInDynamicBagCreationPolicyUpdated(
    { event, store }: EventContext & StoreContext
  ): Promise<void> {
    // To be implemented
  }