index.ts 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. import {
  2. Account,
  3. Block,
  4. Era,
  5. Event,
  6. ValidatorStats
  7. } from '../db/models'
  8. import moment from 'moment'
  9. import chalk from 'chalk'
  10. import { HeaderExtended } from '@polkadot/api-derive/type/types';
  11. import {
  12. Api,
  13. } from '../types'
  14. import {
  15. AccountId,
  16. Moment,
  17. EventRecord,
  18. BlockHash,
  19. } from '@polkadot/types/interfaces'
  20. import { Vec } from '@polkadot/types'
  21. import { ApiPromise } from '@polkadot/api';
  22. let queue: any[] = []
  23. let processing = ''
  24. export const processNext = async () => {
  25. const task = queue.shift()
  26. if (!task) return
  27. await task()
  28. processNext()
  29. }
  30. const accounts = new Map<string, any>()
  31. const getBlockHash = (api: ApiPromise, blockId: number) =>
  32. api.rpc.chain.getBlockHash(blockId).then((hash: BlockHash) => hash.toString())
  33. const getEraAtHash = (api: ApiPromise, hash: string) =>
  34. api.query.staking.activeEra
  35. .at(hash)
  36. .then((era) => era.unwrap().index.toNumber())
  37. const getAccount = async (address: string) => {
  38. if (accounts.get(address)) {
  39. return accounts.get(address)
  40. } else {
  41. const account = (await Account.findOrCreate({where: {key : address}}))[0].get({plain: true})
  42. accounts.set(address, account)
  43. return account
  44. }
  45. }
  46. const getEraAtBlock = async (api: ApiPromise, block: number) =>
  47. getEraAtHash(api, await getBlockHash(api, block))
  48. const getTimestamp = async (api: ApiPromise, hash?: string) => {
  49. const timestamp = hash
  50. ? await api.query.timestamp.now.at(hash)
  51. : await api.query.timestamp.now()
  52. return moment.utc(timestamp.toNumber()).valueOf()
  53. }
  54. export const addBlock = async (
  55. api: ApiPromise,
  56. header: { number: number; author: string }
  57. ) => {
  58. const id = +header.number
  59. const exists = await Block.findByPk(id)
  60. if (exists) {
  61. console.error(`TODO handle fork`, String(header.author))
  62. }
  63. await processBlock(api, id)
  64. // logging
  65. //const handle = await getHandleOrKey(api, key)
  66. const q = queue.length ? chalk.green(` [${queue.length}:${processing}]`) : ''
  67. console.log(`[Joystream] block ${id} ${q}`)
  68. }
  69. const processBlock = async (api: ApiPromise, id: number) => {
  70. const exists = (await Block.findByPk(id))
  71. if (exists) return exists.get({plain: true})
  72. processing = `block ${id}`
  73. console.log(processing)
  74. const previousBlockModel = (await Block.findByPk(id - 1))
  75. let lastBlockTimestamp = 0
  76. let lastBlockHash = ''
  77. let lastEraId = 0
  78. if (previousBlockModel) {
  79. const previousDbBlock = previousBlockModel.get({plain: true})
  80. lastBlockTimestamp = previousDbBlock.timestamp.getTime();
  81. lastBlockHash = previousDbBlock.hash
  82. lastEraId = previousDbBlock.eraId
  83. } else {
  84. lastBlockHash = await getBlockHash(api, id - 1);
  85. lastBlockTimestamp = await getTimestamp(api, lastBlockHash);
  86. lastEraId = await getEraAtHash(api, lastBlockHash)
  87. }
  88. const hash = await getBlockHash(api, id)
  89. const currentBlockTimestamp = await getTimestamp(api, hash)
  90. const extendedHeader = await api.derive.chain.getHeader(hash) as HeaderExtended
  91. const eraId = await getEraAtHash(api, hash)
  92. let chainTime
  93. if(eraId - lastEraId === 1) {
  94. console.log('This block marks the start of new era. Updating the previous era stats')
  95. const {total, individual} = await api.query.staking.erasRewardPoints.at(lastBlockHash, lastEraId)
  96. const slots = (await api.query.staking.validatorCount.at(lastBlockHash)).toNumber()
  97. const newEraTime = (await api.query.timestamp.now.at(hash)) as Moment
  98. chainTime = moment(newEraTime.toNumber())
  99. await Era.upsert({ // update stats for previous era
  100. id: lastEraId,
  101. slots: slots,
  102. stake: await api.query.staking.erasTotalStake.at(hash, lastEraId),
  103. eraPoints: total
  104. })
  105. const validatorStats = await ValidatorStats.findAll({where: {eraId: lastEraId}, include: [Account]})
  106. for (let stat of validatorStats) {
  107. const validatorStats = stat.get({plain: true})
  108. const validatorAccount = validatorStats.account.key
  109. console.log(validatorAccount)
  110. let pointVal = 0;
  111. for(const [key, value] of individual.entries()) {
  112. if(key == validatorAccount) {
  113. pointVal = value.toNumber()
  114. break
  115. }
  116. }
  117. const {total, own} = await api.query.staking.erasStakers.at(lastBlockHash, lastEraId, validatorAccount)
  118. ValidatorStats.upsert({
  119. eraId: lastEraId,
  120. accountId: validatorStats.accountId,
  121. stake_own: own,
  122. stake_total: total,
  123. points: pointVal,
  124. commission: (await api.query.staking.erasValidatorPrefs.at(lastBlockHash, eraId, validatorAccount)).commission.toNumber() / 10000000
  125. })
  126. }
  127. }
  128. const [era, created] = await Era.upsert({ // add the new are with just a timestamp of its first block
  129. id: eraId,
  130. timestamp: chainTime
  131. }, {returning: true})
  132. const block = await Block.upsert({
  133. id: id,
  134. hash: hash,
  135. timestamp: moment.utc(currentBlockTimestamp).toDate(),
  136. blocktime: (currentBlockTimestamp - lastBlockTimestamp),
  137. eraId: era.get({plain: true}).id,
  138. validatorId: (await getAccount(extendedHeader.author.toHuman())).id
  139. }, {returning: true})
  140. await importEraAtBlock(api, id, hash, era)
  141. processEvents(api, id, eraId, hash)
  142. return block
  143. }
  144. export const addBlockRange = async (
  145. api: ApiPromise,
  146. startBlock: number,
  147. endBlock: number
  148. ) => {
  149. for (let block = startBlock; block <= endBlock; block++) {
  150. queue.push(() => processBlock(api, block))
  151. }
  152. }
  153. const processEvents = async (api: ApiPromise, blockId: number, eraId: number, hash: string) => {
  154. processing = `events block ${blockId}`
  155. try {
  156. const blockEvents = await api.query.system.events.at(hash)
  157. blockEvents.forEach(async ({ event }: EventRecord) => {
  158. let { section, method, data } = event
  159. if(section == 'staking' && method == 'Reward') {
  160. const addressCredited = data[0].toString()
  161. await Event.create({ blockId, section, method, data: JSON.stringify(data) })
  162. Account.findOne(
  163. {
  164. where: {
  165. key: addressCredited
  166. }
  167. }
  168. ).then(async (beneficiaryAccount: Account) => {
  169. let address = ''
  170. if (beneficiaryAccount == null) {
  171. address = (await Account.create({key: addressCredited}, {returning: true})).get({plain: true}).id
  172. } else {
  173. address = beneficiaryAccount.get({plain: true}).id
  174. }
  175. await ValidatorStats.upsert(
  176. {
  177. accountId: address,
  178. eraId: eraId,
  179. rewards: Number(data[1])
  180. }
  181. )
  182. })
  183. }
  184. })
  185. } catch (e) {
  186. console.log(`failed to fetch events for block ${blockId} ${hash}`)
  187. }
  188. }
  189. const importEraAtBlock = async (api: Api, blockId: number, hash: string, eraModel: Era) => {
  190. const era = eraModel.get({plain: true})
  191. if (era.active) return
  192. const id = era.id
  193. processing = `era ${id}`
  194. try {
  195. const snapshotValidators = await api.query.staking.snapshotValidators.at(hash);
  196. if (!snapshotValidators.isEmpty) {
  197. console.log(`[Joystream] Found validator info for era ${id}`)
  198. const validators = snapshotValidators.unwrap() as Vec<AccountId>;
  199. const validatorCount = validators.length
  200. for (let validator of validators) {
  201. // create stub records, which will be populated with stats on the first block of the next era
  202. ValidatorStats.upsert({
  203. eraId: id,
  204. accountId: (await getAccount(validator.toHuman())).id,
  205. })
  206. }
  207. const slots = (await api.query.staking.validatorCount.at(hash)).toNumber()
  208. await Era.upsert({
  209. id: id,
  210. allValidators: validatorCount,
  211. waitingValidators: validatorCount > slots ? validatorCount - slots : 0,
  212. })
  213. }
  214. const snapshotNominators = await api.query.staking.snapshotNominators.at(hash);
  215. if (!snapshotNominators.isEmpty) {
  216. const nominators = snapshotNominators.unwrap() as Vec<AccountId>;
  217. await Era.upsert({
  218. id: id,
  219. nominators: nominators.length
  220. })
  221. }
  222. return id;
  223. } catch (e) {
  224. console.error(`import era ${blockId} ${hash}`, e)
  225. }
  226. }
  227. module.exports = { addBlock, addBlockRange, processNext }