workingGroup.ts 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import { EventContext, StoreContext, DatabaseManager, SubstrateEvent } from '@joystream/hydra-common'
  2. import { bytesToString, inconsistentState, logger } from './common'
  3. import { Worker, WorkerType } from 'query-node/dist/model'
  4. import { StorageWorkingGroup } from './generated/types'
  5. import { WorkerId } from '@joystream/types/augment'
  6. export async function workingGroup_OpeningFilled({ event, store }: EventContext & StoreContext): Promise<void> {
  7. const workerType = getWorkerType(event)
  8. if (!workerType) {
  9. return
  10. }
  11. const [, applicationIdToWorkerIdMap] = new StorageWorkingGroup.OpeningFilledEvent(event).params
  12. const workerIds = [...applicationIdToWorkerIdMap.values()]
  13. for (const workerId of workerIds) {
  14. await createWorker(store, workerId, workerType, event)
  15. }
  16. // emit log event
  17. logger.info('Workers have been created', { ids: workerIds.map((item) => item.toString()), workerType })
  18. }
  19. export async function workingGroup_WorkerStorageUpdated({ event, store }: EventContext & StoreContext): Promise<void> {
  20. const workerType = getWorkerType(event)
  21. if (!workerType) {
  22. return
  23. }
  24. const [workerId, newMetadata] = new StorageWorkingGroup.WorkerStorageUpdatedEvent(event).params
  25. // load worker
  26. const worker = await store.get(Worker, {
  27. where: {
  28. workerId: workerId.toString(),
  29. type: workerType,
  30. },
  31. })
  32. // ensure worker exists
  33. if (!worker) {
  34. return inconsistentState('Non-existing worker update requested', workerId)
  35. }
  36. worker.metadata = bytesToString(newMetadata)
  37. await store.save<Worker>(worker)
  38. // emit log event
  39. logger.info('Worker has been updated', { workerId, workerType })
  40. }
  41. export async function workingGroup_TerminatedWorker({ event, store }: EventContext & StoreContext): Promise<void> {
  42. const workerType = getWorkerType(event)
  43. if (!workerType) {
  44. return
  45. }
  46. const [workerId] = new StorageWorkingGroup.TerminatedWorkerEvent(event).params
  47. // do removal logic
  48. await deactivateWorker(store, event, workerType, workerId)
  49. // emit log event
  50. logger.info('Worker has been removed (worker terminated)', { workerId, workerType })
  51. }
  52. export async function workingGroup_WorkerExited({ event, store }: EventContext & StoreContext): Promise<void> {
  53. const workerType = getWorkerType(event)
  54. if (!workerType) {
  55. return
  56. }
  57. const [workerId] = new StorageWorkingGroup.WorkerExitedEvent(event).params
  58. // do removal logic
  59. await deactivateWorker(store, event, workerType, workerId)
  60. // emit log event
  61. logger.info('Worker has been removed (worker exited)', { workerId, workerType })
  62. }
  63. export async function workingGroup_TerminatedLeader({ event, store }: EventContext & StoreContext): Promise<void> {
  64. const workerType = getWorkerType(event)
  65. if (!workerType) {
  66. return
  67. }
  68. const [workerId] = new StorageWorkingGroup.WorkerExitedEvent(event).params
  69. // do removal logic
  70. await deactivateWorker(store, event, workerType, workerId)
  71. // emit log event
  72. logger.info('Working group leader has been removed (worker exited)', { workerId, workerType })
  73. }
  74. /// ///////////////// Helpers ////////////////////////////////////////////////////
  75. function getWorkerType(event: SubstrateEvent): WorkerType | null {
  76. if (event.section === 'storageWorkingGroup') {
  77. return WorkerType.STORAGE
  78. }
  79. if (event.section === 'gatewayWorkingGroup') {
  80. return WorkerType.GATEWAY
  81. }
  82. return null
  83. }
  84. async function createWorker(
  85. db: DatabaseManager,
  86. workerId: WorkerId,
  87. workerType: WorkerType,
  88. event: SubstrateEvent
  89. ): Promise<void> {
  90. // create entity
  91. const newWorker = new Worker({
  92. id: `${workerType}-${workerId.toString()}`,
  93. workerId: workerId.toString(),
  94. type: workerType,
  95. isActive: true,
  96. createdAt: new Date(event.blockTimestamp),
  97. updatedAt: new Date(event.blockTimestamp),
  98. })
  99. // save worker
  100. await db.save<Worker>(newWorker)
  101. }
  102. async function deactivateWorker(
  103. db: DatabaseManager,
  104. event: SubstrateEvent,
  105. workerType: WorkerType,
  106. workerId: WorkerId
  107. ) {
  108. // load worker
  109. const worker = await db.get(Worker, {
  110. where: {
  111. workerId: workerId.toString(),
  112. type: workerType,
  113. },
  114. })
  115. // ensure worker exists
  116. if (!worker) {
  117. return inconsistentState('Non-existing worker deletion requested', workerId)
  118. }
  119. // update worker
  120. worker.isActive = false
  121. // set last update time
  122. worker.updatedAt = new Date(event.blockTimestamp)
  123. // save worker
  124. await db.save<Worker>(worker)
  125. }