workingGroup.ts 7.5 KB


  1. import { SubstrateEvent } from '@dzlzv/hydra-common'
  2. import { DatabaseManager } from '@dzlzv/hydra-db-utils'
  3. import { FindConditions } from 'typeorm'
  4. import { Bytes } from '@polkadot/types'
  5. import { fixBlockTimestamp } from './eventFix'
  6. import { convertBytesToString, inconsistentState, logger, getNextId } from './common'
  7. import { Channel, Worker, WorkerType } from 'query-node'
  8. import { GatewayWorkingGroup, StorageWorkingGroup } from '../../generated/types'
  9. import { ApplicationId, ApplicationIdToWorkerIdMap, WorkerId } from '@joystream/types/augment'
  10. /// ///////////////// Storage working group //////////////////////////////////////
  11. export async function storageWorkingGroup_OpeningFilled(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  12. // read event data
  13. const { applicationIdToWorkerIdMap } = new StorageWorkingGroup.OpeningFilledEvent(event).data
  14. // call generic processing
  15. await workingGroup_OpeningFilled(db, WorkerType.STORAGE, applicationIdToWorkerIdMap, event)
  16. }
  17. export async function storageWorkingGroup_WorkerStorageUpdated(
  18. db: DatabaseManager,
  19. event: SubstrateEvent
  20. ): Promise<void> {
  21. // read event data
  22. const { workerId, bytes: newMetadata } = new StorageWorkingGroup.WorkerStorageUpdatedEvent(event).data
  23. // call generic processing
  24. await workingGroup_WorkerStorageUpdated(db, WorkerType.STORAGE, workerId, newMetadata)
  25. }
  26. export async function storageWorkingGroup_TerminatedWorker(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  27. // read event data
  28. const { workerId } = new StorageWorkingGroup.TerminatedWorkerEvent(event).data
  29. // call generic processing
  30. await workingGroup_TerminatedWorker(db, event, WorkerType.STORAGE, workerId)
  31. }
  32. export async function storageWorkingGroup_WorkerExited(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  33. // read event data
  34. const { workerId } = new StorageWorkingGroup.WorkerExitedEvent(event).data
  35. // call generic processing
  36. await workingGroup_WorkerExited(db, event, WorkerType.STORAGE, workerId)
  37. }
  38. export async function storageWorkingGroup_TerminatedLeader(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  39. // read event data
  40. const { workerId } = new StorageWorkingGroup.TerminatedLeaderEvent(event).data
  41. // call generic processing
  42. await workingGroup_TerminatedLeader(db, event, WorkerType.STORAGE, workerId)
  43. }
  44. /// ///////////////// Gateway working group //////////////////////////////////////
  45. export async function gatewayWorkingGroup_OpeningFilled(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  46. // read event data
  47. const { applicationIdToWorkerIdMap } = new GatewayWorkingGroup.OpeningFilledEvent(event).data
  48. // call generic processing
  49. await workingGroup_OpeningFilled(db, WorkerType.GATEWAY, applicationIdToWorkerIdMap, event)
  50. }
  51. export async function gatewayWorkingGroup_WorkerStorageUpdated(
  52. db: DatabaseManager,
  53. event: SubstrateEvent
  54. ): Promise<void> {
  55. // read event data
  56. const { workerId, bytes: newMetadata } = new GatewayWorkingGroup.WorkerStorageUpdatedEvent(event).data
  57. // call generic processing
  58. await workingGroup_WorkerStorageUpdated(db, WorkerType.GATEWAY, workerId, newMetadata)
  59. }
  60. export async function gatewayWorkingGroup_TerminatedWorker(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  61. // read event data
  62. const { workerId } = new GatewayWorkingGroup.TerminatedWorkerEvent(event).data
  63. // call generic processing
  64. await workingGroup_TerminatedWorker(db, event, WorkerType.GATEWAY, workerId)
  65. }
  66. export async function gatewayWorkingGroup_WorkerExited(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  67. // read event data
  68. const { workerId } = new GatewayWorkingGroup.WorkerExitedEvent(event).data
  69. // call generic processing
  70. await workingGroup_WorkerExited(db, event, WorkerType.GATEWAY, workerId)
  71. }
  72. export async function gatewayWorkingGroup_TerminatedLeader(db: DatabaseManager, event: SubstrateEvent): Promise<void> {
  73. // read event data
  74. const { workerId } = new GatewayWorkingGroup.TerminatedLeaderEvent(event).data
  75. // call generic processing
  76. await workingGroup_TerminatedLeader(db, event, WorkerType.GATEWAY, workerId)
  77. }
  78. /// ///////////////// Generic working group processing ///////////////////////////
  79. export async function workingGroup_OpeningFilled(
  80. db: DatabaseManager,
  81. workerType: WorkerType,
  82. applicationIdToWorkerIdMap: ApplicationIdToWorkerIdMap,
  83. event: SubstrateEvent
  84. ): Promise<void> {
  85. const workerIds = [...applicationIdToWorkerIdMap.values()]
  86. for (const workerId of workerIds) {
  87. await createWorker(db, workerId, workerType, event)
  88. }
  89. // emit log event
  90. logger.info('Workers have been created', { ids: workerIds.map((item) => item.toString()), workerType })
  91. }
  92. export async function workingGroup_WorkerStorageUpdated(
  93. db: DatabaseManager,
  94. workerType: WorkerType,
  95. workerId: WorkerId,
  96. newMetadata: Bytes
  97. ): Promise<void> {
  98. // load worker
  99. const worker = await db.get(Worker, {
  100. where: {
  101. workerId: workerId.toString(),
  102. type: workerType,
  103. } as FindConditions<Worker>,
  104. })
  105. // ensure worker exists
  106. if (!worker) {
  107. return inconsistentState('Non-existing worker update requested', workerId)
  108. }
  109. worker.metadata = convertBytesToString(newMetadata)
  110. await db.save<Worker>(worker)
  111. // emit log event
  112. logger.info('Worker has been updated', { workerId, workerType })
  113. }
  114. export async function workingGroup_TerminatedWorker(
  115. db: DatabaseManager,
  116. event: SubstrateEvent,
  117. workerType: WorkerType,
  118. workerId: WorkerId
  119. ): Promise<void> {
  120. // do removal logic
  121. await deactivateWorker(db, event, workerType, workerId)
  122. // emit log event
  123. logger.info('Worker has been removed (worker terminated)', { workerId, workerType })
  124. }
  125. export async function workingGroup_WorkerExited(
  126. db: DatabaseManager,
  127. event: SubstrateEvent,
  128. workerType: WorkerType,
  129. workerId: WorkerId
  130. ): Promise<void> {
  131. // do removal logic
  132. await deactivateWorker(db, event, workerType, workerId)
  133. // emit log event
  134. logger.info('Worker has been removed (worker exited)', { workerId, workerType })
  135. }
  136. export async function workingGroup_TerminatedLeader(
  137. db: DatabaseManager,
  138. event: SubstrateEvent,
  139. workerType: WorkerType,
  140. workerId: WorkerId
  141. ): Promise<void> {
  142. // do removal logic
  143. await deactivateWorker(db, event, workerType, workerId)
  144. // emit log event
  145. logger.info('Working group leader has been removed (worker exited)', { workerId, workerType })
  146. }
  147. /// ///////////////// Helpers ////////////////////////////////////////////////////
  148. async function createWorker(
  149. db: DatabaseManager,
  150. workerId: WorkerId,
  151. workerType: WorkerType,
  152. event: SubstrateEvent
  153. ): Promise<void> {
  154. // create entity
  155. const newWorker = new Worker({
  156. id: await getNextId(db),
  157. workerId: workerId.toString(),
  158. type: workerType,
  159. isActive: true,
  160. createdAt: new Date(fixBlockTimestamp(event.blockTimestamp).toNumber()),
  161. updatedAt: new Date(fixBlockTimestamp(event.blockTimestamp).toNumber()),
  162. })
  163. // save worker
  164. await db.save<Worker>(newWorker)
  165. }
  166. async function deactivateWorker(
  167. db: DatabaseManager,
  168. event: SubstrateEvent,
  169. workerType: WorkerType,
  170. workerId: WorkerId
  171. ) {
  172. // load worker
  173. const worker = await db.get(Worker, {
  174. where: {
  175. workerId: workerId.toString(),
  176. type: workerType,
  177. } as FindConditions<Worker>,
  178. })
  179. // ensure worker exists
  180. if (!worker) {
  181. return inconsistentState('Non-existing worker deletion requested', workerId)
  182. }
  183. // update worker
  184. worker.isActive = false
  185. // set last update time
  186. worker.updatedAt = new Date(fixBlockTimestamp(event.blockTimestamp).toNumber())
  187. // save worker
  188. await db.save<Worker>(worker)
  189. }