1
0

index.ts 19 KB


  1. import { Op } from 'sequelize'
  2. import {
  3. Account,
  4. Balance,
  5. Block,
  6. Category,
  7. Channel,
  8. Council,
  9. Consul,
  10. ConsulStake,
  11. Era,
  12. Event,
  13. Member,
  14. Post,
  15. Proposal,
  16. ProposalPost,
  17. ProposalVote,
  18. Thread,
  19. Moderation,
  20. } from '../db/models'
  21. import * as get from './lib/getters'
  22. //import {fetchReports} from './lib/github'
  23. import axios from 'axios'
  24. import moment from 'moment'
  25. import chalk from 'chalk'
  26. import { MemberId } from '@joystream/types/members'
  27. import { VoteKind } from '@joystream/types/proposals'
  28. import { Seats } from '@joystream/types/council'
  29. import { AccountInfo } from '@polkadot/types/interfaces/system'
  30. import {
  31. Api,
  32. Handles,
  33. IState,
  34. MemberType,
  35. CategoryType,
  36. ChannelType,
  37. PostType,
  38. Seat,
  39. ThreadType,
  40. CouncilType,
  41. ProposalDetail,
  42. Status,
  43. } from '../types'
  44. import {
  45. AccountId,
  46. Moment,
  47. ActiveEraInfo,
  48. EventRecord,
  49. } from '@polkadot/types/interfaces'
  50. import { Vec } from '@polkadot/types'
  // TODO fetch consts from db/chain

  /** Number of blocks between a council's start and end block (see fetchCouncil). */
  const TERMDURATION = 144000
  /** Block offset of the first council term; also the look-back used by findCouncilAtBlock. */
  const VOTINGDURATION = 57601
  /** Number of blocks between the start blocks of two consecutive councils. */
  const CYCLE = TERMDURATION + VOTINGDURATION

  /** Upper bound for queued tasks running at the same time. */
  const WORKERS = 3
  /** Pause before the scheduler looks at the queue again. */
  const DELAY = 100 // ms

  /** Not referenced by the code visible in this file. */
  let lastUpdate = 0
  /** Set once fetchAll has queued the initial import tasks. */
  let queuedAll = false
  /** Pending tasks; each entry is a function that starts the work when called. */
  let queue: any[] = []
  /** Human-readable label of the most recently started piece of work (for logStatus). */
  let processing = ''
  /** Number of queued tasks currently running. */
  let busy = 0
  /**
   * Runs the next queued task, keeping at most WORKERS tasks in flight.
   *
   * Each invocation takes one task off the queue. While there is spare
   * capacity it schedules another invocation, and once its own task has
   * settled it schedules a follow-up, so the queue keeps draining.
   *
   * A failing task is logged and does not stop the scheduler: the worker slot
   * is always released, otherwise `busy` would stay incremented forever.
   */
  const processNext = async () => {
    if (busy >= WORKERS) return
    const task = queue.shift()
    if (!task) return

    busy++
    // start another worker right away if capacity is left
    if (busy < WORKERS) setTimeout(processNext, DELAY)
    try {
      await task()
    } catch (e) {
      console.error(`[processNext] task failed`, e)
    } finally {
      busy--
      setTimeout(processNext, DELAY)
    }
  }
  /**
   * Asks the node for the hash of block `blockId`.
   *
   * @returns whatever `toHuman()` yields for the hash returned by the node
   */
  const getBlockHash = async (api: Api, blockId: number) => {
    //console.log(`getBlockHash`, blockId)
    const hash: any = await api.rpc.chain.getBlockHash(blockId)
    return hash.toHuman()
  }
  /**
   * Reads `staking.activeEra` at the given block hash.
   *
   * @returns the era index as a number
   */
  const getEraAtHash = async (api: Api, hash: string) => {
    const activeEra: any = await api.query.staking.activeEra.at(hash)
    return activeEra.unwrap().index.toNumber()
  }
  /** Resolves the hash of block number `block` and returns the era index at that hash. */
  const getEraAtBlock = async (api: Api, block: number) => {
    const hash = await getBlockHash(api, block)
    return getEraAtHash(api, hash)
  }
  /**
   * Reads the chain's `timestamp.now` value.
   *
   * @param hash block hash to query at; when omitted the current value is read
   * @returns the timestamp passed through `moment.utc(...).valueOf()`
   */
  const getTimestamp = async (api: Api, hash?: string) => {
    let timestamp
    if (hash) timestamp = await api.query.timestamp.now.at(hash)
    else timestamp = await api.query.timestamp.now()
    const millis = timestamp.toNumber()
    return moment.utc(millis).valueOf()
  }
  /**
   * Looks up a stored council whose `start` is at or before `block` and whose
   * `end` is at or after `block - VOTINGDURATION`.
   *
   * `api` is accepted but not used by the lookup.
   */
  const findCouncilAtBlock = (api: Api, block: number) => {
    const where = {
      start: { [Op.lte]: block },
      end: { [Op.gte]: block - VOTINGDURATION },
    }
    return Council.findOne({ where })
  }
  /**
   * Imports a newly announced block and broadcasts it to connected clients.
   *
   * Flow: skip blocks that are already stored, import the block via
   * processBlock, link its author as validator, emit the stored block on `io`
   * and log a status line. Every tenth block the chain status is refreshed.
   *
   * @param api chain API handle
   * @param io emitter with an `emit(event, payload)` method
   * @param header block number and author of the announced block
   * @param status last known status; returned unchanged unless refreshed
   * @returns the refreshed status on every tenth block, otherwise `status`
   */
  export const addBlock = async (
    api: Api,
    io: any,
    header: { number: number; author: string },
    status: Status = {
      block: 0,
      era: 0,
      round: 0,
      members: 0,
      channels: 0,
      categories: 0,
      threads: 0,
      posts: 0,
      proposals: 0,
      proposalPosts: 0,
    }
  ): Promise<Status> => {
    const id = +header.number
    const exists = await Block.findByPk(id)
    if (exists) {
      console.error(`TODO handle fork`, String(header.author))
      return status
    }

    const block = await processBlock(api, id)

    // The author may be missing; an undefined key must not reach the
    // account lookup, so the validator link is skipped in that case.
    const key = header.author?.toString()
    if (key) {
      const [account] = await Account.findOrCreate({ where: { key } })
      await block.setValidator(account.key)
    } else {
      console.warn(`[Joystream] block ${id} has no author`)
    }

    io.emit('block', await Block.findByIdWithIncludes(id))
    const handle = key ? await getHandleOrKey(api, key) : 'unknown author'
    console.log(`[Joystream] block ${id} ${handle} ${logStatus()}`)

    return id % 10 === 0 ? updateStatus(api, status, id) : status
  }
  /**
   * Describes the task queue for log output.
   *
   * @returns `[no tasks]` when the queue is empty, otherwise a green
   *   ` [busy/queued:current work]` summary
   */
  const logStatus = () => {
    if (!queue.length) return '[no tasks]'
    return chalk.green(` [${busy}/${queue.length}:${processing}]`)
  }
  /**
   * Stores block `id` with its hash, timestamp and block time, then starts the
   * event and era import for it in the background.
   *
   * The block time is the difference to the previous block's timestamp, which
   * is taken from the database when the previous block is stored there and
   * from the chain otherwise.
   *
   * @returns the stored block (the existing row if it was imported before)
   */
  const processBlock = async (api: Api, id: number) => {
    const exists = await Block.findByPk(id)
    if (exists) return exists
    processing = `block ${id}`

    // Block 0 has no predecessor to compare against.
    let lastBlockTimestamp: number | undefined
    if (id > 0) {
      const last = await Block.findByPk(id - 1)
      if (last) {
        if (last.timestamp) lastBlockTimestamp = last.timestamp.getTime()
      } else {
        const lastBlockHash = await getBlockHash(api, id - 1)
        lastBlockTimestamp = await getTimestamp(api, lastBlockHash)
      }
    }

    const [block] = await Block.findOrCreate({ where: { id } })
    block.hash = await getBlockHash(api, id)
    const currentBlockTimestamp = await getTimestamp(api, block.hash)
    block.timestamp = new Date(currentBlockTimestamp)
    if (lastBlockTimestamp)
      block.blocktime = currentBlockTimestamp - lastBlockTimestamp
    // Wait for the write so a failure surfaces here, not as a lost promise.
    await block.save()

    // Events and era data are imported in the background; failures are logged
    // so they cannot turn into unhandled rejections.
    const logFailure = (what: string) => (e: unknown) =>
      console.error(`[processBlock] ${what} of block ${id} failed`, e)
    processEvents(api, id, block.hash).catch(logFailure(`events`))
    importEraAtBlock(api, id, block.hash).catch(logFailure(`era import`))
    processNext().catch(logFailure(`scheduling after import`))
    return block
  }
  /**
   * Queues the import of every block from `startBlock` to `endBlock`
   * (inclusive) and starts working through the queue.
   *
   * A one-second timer logs the queue status and ends the process once all
   * work is done. "Done" means the queue is empty AND no task is still
   * running; checking the queue alone would exit while the last tasks are
   * in flight.
   */
  export const addBlockRange = async (
    api: Api,
    startBlock: number,
    endBlock: number
  ) => {
    for (let block = startBlock; block <= endBlock; block++)
      queue.push(() => processBlock(api, block))

    setInterval(() => {
      if (!queue.length && busy === 0) process.exit()
      console.log(logStatus())
    }, 1000)
    processNext()
  }
  /**
   * Reads the current counters from the chain and triggers imports for them.
   *
   * On the first call the full import is queued via fetchAll. Later calls
   * fetch the newest member, category, thread and post directly and refresh
   * every proposal that is still stored as `Pending`.
   *
   * @param old previous status; not used
   * @param block number of the block the status is taken at
   * @returns the freshly read status, with all counters as numbers
   */
  const updateStatus = async (api: Api, old: Status, block: number) => {
    const status = {
      block,
      era: await getEraAtBlock(api, block),
      round: Number(await api.query.councilElection.round()),
      // nextMemberId is a chain value, convert before doing arithmetic on it
      members: Number(await api.query.members.nextMemberId()) - 1,
      channels: 0, //await get.currentChannelId(api),
      categories: await get.currentCategoryId(api),
      threads: await get.currentThreadId(api),
      posts: await get.currentPostId(api),
      proposals: await get.proposalCount(api),
      // toNumber (as used for threadCount in fetchProposalPosts) rather than
      // toHuman, whose string result breaks the numeric comparisons made on
      // this counter
      proposalPosts: (await api.query.proposalsDiscussion.postCount()).toNumber(),
    }
    console.log(`updated status`)

    // The imports run in the background; log failures instead of leaving
    // rejected promises unhandled.
    const background = (label: string, task: Promise<unknown>) =>
      task.catch((e: unknown) =>
        console.error(`[updateStatus] ${label} failed`, e)
      )

    if (!queuedAll) background(`initial import`, fetchAll(api, status))
    else {
      background(`member`, fetchMember(api, status.members))
      background(`category`, fetchCategory(api, status.categories))
      background(`thread`, fetchThread(api, status.threads))
      background(`post`, fetchPost(api, status.posts))
      background(
        `pending proposals`,
        Proposal.findAll({
          where: { result: `Pending` },
        }).then((pp: { id: number }[]) =>
          pp.forEach((p: { id: number }) => {
            background(`proposal ${p.id}`, fetchProposal(api, p.id))
            // NOTE(review): this removes every stored `Reject` vote, not only
            // those of proposal p - confirm that this is intended.
            background(
              `vote cleanup`,
              ProposalVote.destroy({ where: { vote: `Reject` } })
            )
          })
        )
      )
    }
    return status
  }
  /**
   * Queues the initial import (proposals, accounts, members, proposal posts,
   * councils) based on the counters in `status` and marks it as queued.
   */
  const fetchAll = async (api: Api, status: Status) => {
    const tasks = [
      () => fetchProposal(api, status.proposals),
      () => fetchAccounts(api, status.block),
      () => fetchMember(api, status.members),
      () => fetchProposalPosts(api, status.proposalPosts),
      () => fetchCouncil(api, status.round, status.block),
      //() => addBlockRange(api, 1, status.block),
    ]
    for (const task of tasks) queue.push(task)
    queuedAll = true
  }
  /**
   * Stores all events emitted in a block.
   *
   * Each event is saved with its section, method and JSON-encoded data. The
   * function resolves only after every write has settled. A failed write is
   * logged and does not prevent the other events from being stored.
   *
   * @param blockId number of the block the events belong to
   * @param hash hash of that block, used to query the events
   */
  const processEvents = async (api: Api, blockId: number, hash: string) => {
    processing = `events block ${blockId}`
    console.log(processing)

    let blockEvents
    try {
      blockEvents = await api.query.system.events.at(hash)
    } catch (e) {
      console.log(`failed to fetch events for block ${blockId} ${hash}`, e)
      return
    }

    const writes: Promise<unknown>[] = []
    blockEvents.forEach(({ event }: EventRecord) => {
      const { section, method, data } = event
      const row = { blockId, section, method, data: JSON.stringify(data) }
      writes.push(
        Promise.resolve(Event.create(row)).catch((e: unknown) =>
          console.error(
            `failed to save event ${section}.${method} of block ${blockId}`,
            e
          )
        )
      )
    })
    await Promise.all(writes)
    // TODO catch votes, posts, proposals?
  }
  /** Reads `staking.snapshotValidators` at the given block hash. */
  const fetchValidators = async (api: Api, hash: string) => {
    return api.query.staking.snapshotValidators.at(hash)
  }
  /**
   * Assigns a block to its era and, while the era has no validator data yet,
   * tries to complete it from the validator snapshot at that block.
   *
   * When a snapshot exists, the era's slots, active/waiting counts, total
   * stake and timestamp are stored and the account balances are updated.
   * All steps are awaited inside one try block so that any failure is logged
   * here instead of being lost in a detached promise chain.
   *
   * @param blockId number of the block being imported
   * @param hash hash of that block
   */
  const importEraAtBlock = async (api: Api, blockId: number, hash: string) => {
    try {
      const id = await getEraAtHash(api, hash)
      const [era] = await Era.findOrCreate({ where: { id } })
      await era.addBlock(blockId)
      if (era.active) return // validator data was stored earlier

      processing = `era ${id}`
      const snapshot: any = await fetchValidators(api, hash)
      if (snapshot.isEmpty) return
      console.log(`[Joystream] Found validator info for era ${id}`)

      const validatorCount = snapshot.unwrap().length
      era.slots = (await api.query.staking.validatorCount.at(hash)).toNumber()
      era.active = Math.min(era.slots, validatorCount)
      era.waiting = validatorCount > era.slots ? validatorCount - era.slots : 0
      era.stake = await api.query.staking.erasTotalStake.at(hash, id)
      const chainTimestamp = (await api.query.timestamp.now.at(hash)) as Moment
      era.timestamp = moment(chainTimestamp.toNumber())
      // era.update({ slots, active, waiting, stake, timestamp })
      // NOTE(review): stores the block the data was read at; the previous code
      // assigned the era index here - confirm against the Era model.
      era.blockId = blockId
      await era.save()
      await updateBalances(api, hash)
    } catch (e) {
      console.error(`import era ${blockId} ${hash}`, e)
    }
  }
  /**
   * Summarises the validator snapshot at block `blockId`.
   *
   * @returns `undefined` when there is no snapshot at that block, otherwise
   *   the number of active and waiting validators, the slot limit and the
   *   block's timestamp
   */
  const validatorStatus = async (api: Api, blockId: number) => {
    const hash = await getBlockHash(api, blockId)
    const snapshot = await api.query.staking.snapshotValidators.at(hash)
    if (snapshot.isEmpty) return

    const candidates = snapshot.unwrap().length
    const maxSlots = Number(await api.query.staking.validatorCount.at(hash))
    const actives = Math.min(maxSlots, candidates)
    // everyone beyond the slot limit is waiting
    let waiting = 0
    if (candidates > maxSlots) waiting = candidates - maxSlots

    const chainTime = await api.query.timestamp.now.at(hash)
    const date = moment(chainTime.toNumber()).valueOf()
    return { blockId, actives, waiting, maxSlots, date }
  }
  /**
   * Stores one balance record per known account for the era current at
   * `blockHash`.
   *
   * Accounts are handled one after another; a failure for one account is
   * logged and does not stop the remaining accounts.
   *
   * @param blockHash hash of the block at which balances are read
   */
  const updateBalances = async (api: Api, blockHash: string) => {
    // presumably an optional era index on chain; Number() relies on its string form - verify
    const currentEra = Number(await api.query.staking.currentEra.at(blockHash))
    // findOrCreate resolves to [instance, created]
    const [era] = await Era.findOrCreate({ where: { id: currentEra } })
    processing = `balances ${era.id}`

    // findAll resolves to a list of accounts
    const accounts = await Account.findAll()
    for (const { key } of accounts) {
      if (!key) continue
      try {
        console.log(`updating balance of`, key)
        const { data } = await getAccountAtBlock(api, blockHash, key)
        const { free, reserved, miscFrozen } = data
        const balance = await Balance.create({
          available: free,
          reserved,
          frozen: miscFrozen,
        })
        await balance.setAccount(key)
        await balance.setEra(era.id)
        console.log(`balance`, era.id, key, balance.available)
      } catch (e) {
        console.error(`balances era ${era.id} ${key}`, e)
      }
    }
  }
  /**
   * Requests the status document from status.joystream.org.
   * The response is not stored yet (see TODO).
   */
  const fetchTokenomics = async () => {
    console.debug(`Updating tokenomics`)
    const response = await axios.get('https://status.joystream.org/status')
    if (!response.data) return
    // TODO save 'tokenomics', data
  }
  /**
   * Imports forum category `id` unless it is stored already.
   *
   * Every call with a positive id also queues the import of the preceding
   * category, so that all lower ids are eventually visited.
   *
   * @returns the stored category, or `undefined` for ids of 0 or below
   */
  const fetchCategory = async (api: Api, id: number) => {
    if (id <= 0) return
    queue.push(() => fetchCategory(api, id - 1))

    const known = await Category.findByPk(+id)
    if (known) return known

    processing = `category ${id}`
    console.log(processing)
    const raw = await api.query.forum.categoryById(id)
    const values = {
      id,
      title: raw.title,
      threadId: +raw.thread_id, // TODO needed?
      description: raw.description,
      createdAt: +raw.created_at.block,
      deleted: raw.deleted,
      archived: raw.archived,
      subcategories: Number(raw.num_direct_subcategories),
      moderatedThreads: Number(raw.num_direct_moderated_threads),
      unmoderatedThreads: Number(raw.num_direct_unmoderated_threads),
      //position:+raw.position_in_parent_category // TODO sometimes NaN,
    }
    const category = await Category.create(values)
    createModeration(api, { categoryId: id }, String(raw.moderator_id), category)
    return category
  }
  /**
   * Imports forum post `id` unless it is stored already.
   *
   * Every call with a positive id also queues the import of the preceding
   * post. Before the post is stored, its author is resolved to a member and
   * fetchThread is called for its thread.
   *
   * @returns the stored post, or `undefined` for ids of 0 or below
   */
  const fetchPost = async (api: Api, id: number) => {
    if (id <= 0) return
    queue.push(() => fetchPost(api, id - 1))
    const exists = await Post.findByPk(id)
    if (exists) return exists

    processing = `post ${id}`
    console.log(processing)
    const data = await api.query.forum.postById(id)

    const author: string = String(data.author_id)
    const member = await fetchMemberByAccount(api, author)
    const authorId = member ? member.id : null

    const threadId = Number(data.thread_id)
    // called for its import side effect; the returned thread is not needed
    await fetchThread(api, threadId)

    const text = data.current_text
    // coerce to a number like fetchCategory and fetchThread do
    const createdAt = Number(data.created_at.block)
    const post = await Post.create({ id, authorId, text, createdAt, threadId })
    if (data.moderation)
      await createModeration(api, { postId: id }, data.moderation, post)
    return post
  }
  /**
   * Placeholder for storing a moderation record.
   *
   * The import is switched off for now (TODO), so every call resolves to
   * `undefined` without touching the database. The statements after the
   * guard are the implementation to enable later: make sure the moderator
   * account exists, create the moderation and attach it to `object`.
   *
   * @param where not used yet
   * @param key account key of the moderator
   * @param object record the moderation would be attached to
   */
  const createModeration = async (
    api: Api,
    where: {},
    key: string,
    object: { setModeration: (id: number) => {} }
  ) => {
    const enabled: boolean = false // TODO
    if (key === '' || !enabled) return

    await Account.findOrCreate({ where: { key } })
    const moderation = await Moderation.create({ moderatorKey: key })
    object.setModeration(moderation.id)
    return moderation
  }
  /**
   * Imports forum thread `id` unless it is stored already.
   *
   * The thread is created first, then linked to its category and to the
   * member who created it. Both links are awaited so that the thread is fully
   * connected when this function resolves and a failed link surfaces to the
   * caller.
   *
   * @returns the stored thread, or `undefined` for ids of 0 or below
   */
  const fetchThread = async (api: Api, id: number) => {
    if (id <= 0) return
    const exists = await Thread.findByPk(id)
    if (exists) return exists

    processing = `thread ${id}`
    console.log(processing)
    const data = await api.query.forum.threadById(id)
    const { title, moderation, nr_in_category } = data
    const account = String(data.author_id)
    const thread = await Thread.create({
      id,
      title,
      nrInCategory: +nr_in_category,
      createdAt: +data.created_at.block,
    })

    const category = await fetchCategory(api, +data.category_id)
    if (category) await thread.setCategory(category.id)
    const author = await fetchMemberByAccount(api, account)
    if (author) await thread.setCreator(author.id)

    // NOTE(review): assumes a truthy `moderation` exposes moderated_at,
    // moderator_id and rationale directly - confirm against the chain type.
    if (moderation) {
      const { moderated_at, moderator_id, rationale } = moderation
      const created = moderated_at.block
      const createdAt = moment.utc(moderated_at.time)
      await createModeration(
        api,
        { created, createdAt, rationale },
        moderator_id.toHuman(),
        thread
      )
    }
    return thread
  }
  /**
   * Imports the council of election round `round` with its seats and backers.
   *
   * Every call with a positive round also queues the import of the previous
   * round. A council is stored only when its whole term (start and end block)
   * lies at or before `head`. Seats or backers whose account cannot be
   * resolved to a member are skipped with a warning.
   *
   * @param round election round to import
   * @param head current chain height
   * @returns the stored council if it existed already, otherwise `undefined`
   */
  const fetchCouncil = async (api: Api, round: number, head: number) => {
    if (round <= 0) return console.log(chalk.red(`[fetchCouncil] round:${round}`))
    queue.push(() => fetchCouncil(api, round - 1, head))
    const exists = await Council.findByPk(round)
    if (exists) return exists

    processing = `council ${round}`
    console.log(processing)
    const start = VOTINGDURATION + (round - 1) * CYCLE
    const end = start + TERMDURATION
    const council = { round, start, end, startDate: 0, endDate: 0 }
    if (start > head || end > head) return

    let seats: Seats
    try {
      const startHash = await getBlockHash(api, start)
      console.log(`start`, start, `hash`, startHash)
      council.startDate = await getTimestamp(api, startHash)
      seats = await api.query.council.activeCouncil.at(startHash)
    } catch (e) {
      return console.log(`council term ${round} lies in the future ${start}`)
    }

    try {
      const endHash = await getBlockHash(api, end)
      council.endDate = await getTimestamp(api, endHash)
    } catch (e) {
      console.warn(`end of council term ${round} lies in the future ${end}`)
    }

    // Every step is awaited so that a failure is caught and logged below.
    try {
      const { round: councilRound } = await Council.create(council)
      for (const { member, stake, backers } of seats) {
        const seatKey = member.toHuman()
        const seatMember: any = await fetchMemberByAccount(api, seatKey)
        if (!seatMember) {
          console.warn(`[fetchCouncil] no member found for seat ${seatKey}`)
          continue
        }
        const consul = await Consul.create({
          stake: Number(stake),
          councilRound,
          memberId: seatMember.id,
        })

        for (const backer of backers) {
          const backerKey = backer.member.toHuman()
          const backerMember: any = await fetchMemberByAccount(api, backerKey)
          if (!backerMember) {
            console.warn(`[fetchCouncil] no member found for backer ${backerKey}`)
            continue
          }
          await ConsulStake.create({
            stake: Number(backer.stake),
            consulId: consul.id,
            memberId: backerMember.id,
          })
        }
      }
    } catch (e) {
      console.error(`Failed to save council ${round}`, e)
    }
  }
  /**
   * Imports proposal `id`, or refreshes it while it is still `Pending`.
   *
   * Every call with a positive id also queues the import of the preceding
   * proposal. The vote import is queued only after the proposal has been
   * written, because it looks the proposal up in the database.
   *
   * @returns the stored proposal record (callers use it as a model instance),
   *   or `undefined` for ids of 0 or below
   */
  const fetchProposal = async (api: Api, id: number) => {
    if (id <= 0) return
    queue.push(() => fetchProposal(api, id - 1))
    const exists = await Proposal.findByIdWithIncludes(id)
    if (exists && exists.result !== `Pending`) {
      //if (!exists.votes.length) queue.push(() => fetchProposalVotes(api, id))
      return exists
    }

    processing = `proposal ${id}`
    const proposal = await get.proposalDetail(api, id)
    console.log(`proposal ${id}: ${proposal.result}`)
    await fetchMember(api, proposal.authorId)
    console.log(`createdAt`, proposal.createdAt)

    let saved
    if (exists) {
      await Proposal.update(proposal, { where: { id } })
      // read the refreshed row; fall back to the instance loaded above
      saved = (await Proposal.findByPk(id)) || exists
    } else {
      saved = await Proposal.create(proposal)
    }
    queue.push(() => fetchProposalVotes(api, id))
    return saved
  }
  /** Queries `proposalsDiscussion.postThreadIdByPostId` for one thread/post pair. */
  const fetchProposalPost = (api: Api, threadId: number, postId: number) => {
    return api.query.proposalsDiscussion.postThreadIdByPostId(threadId, postId)
  }
  /**
   * Imports the proposal discussion posts 1..`posts` that are not stored yet.
   *
   * The thread a post belongs to is found by querying the post against each
   * thread (1..threadCount) until one returns a non-empty text. A post that
   * is found in no thread, or whose proposal cannot be loaded, is skipped
   * with a warning and the import continues with the next post.
   *
   * @param posts highest post id to import
   */
  const fetchProposalPosts = async (api: Api, posts: number) => {
    // tolerate a formatted count such as "1,234"
    const lastPost = Number(String(posts).replace(/,/g, ''))
    const threads = (await api.query.proposalsDiscussion.threadCount()).toNumber()

    for (let id = 1; id <= lastPost; id++) {
      const exists = await ProposalPost.findByPk(id)
      if (exists) continue

      // find the thread (= proposal) this post belongs to
      let post: any
      let proposalId = 1
      for (; proposalId <= threads; proposalId++) {
        processing = `proposal post ${id}/${posts} ${proposalId}/${threads}`
        console.log(processing)
        const candidate = await fetchProposalPost(api, proposalId, id)
        if (candidate.text.length) {
          post = candidate
          break
        }
      }
      if (!post) {
        console.warn(`[fetchProposalPosts] post ${id} not found in any thread.`)
        continue
      }

      let proposal = await Proposal.findByPk(proposalId)
      if (!proposal) {
        await fetchProposal(api, proposalId)
        // re-read to get a record the post can be attached to
        proposal = await Proposal.findByPk(proposalId)
      }
      if (!proposal) {
        console.warn(`[fetchProposalPosts] proposal ${proposalId} not found.`)
        continue
      }

      const created = await ProposalPost.create({
        id,
        text: post.text.toHuman(),
        created: Number(post.created_at),
        updated: Number(post.updated_at),
        edition: Number(post.edition_number),
        authorId: Number(post.author_id),
      })
      await proposal.addPost(created)
    }
  }
  /**
   * Imports the votes cast on proposal `id`.
   *
   * The proposal is attached to the council found for its creation block. A
   * vote is stored when the voter held a seat in that council or in the
   * council found for the proposal's finalization block, and the vote is not
   * stored yet. Votes are processed one after another and the function
   * resolves only when all of them are handled.
   */
  const fetchProposalVotes = async (api: Api, id: number) => {
    const proposal = await Proposal.findByPk(id)
    if (!proposal) return
    processing = `votes proposal ${proposal.id}`
    console.log(processing)

    // find council for creation and finalization time
    const { createdAt, finalizedAt } = proposal
    const start = createdAt ? await findCouncilAtBlock(api, createdAt) : null
    if (!start) return
    await start.addProposal(proposal.id)

    // some proposals make it into a second term
    const end = finalizedAt ? await findCouncilAtBlock(api, finalizedAt) : null
    // only pass real rounds to the query, never null
    const councils = [start.round, end && end.round].filter(
      (round: any) => round != null
    )

    // fetch votes
    const entries = await api.query.proposalsEngine.voteExistsByProposalByVoter.entries(
      id
    )

    // find memberId and consulId for each vote
    for (const entry of entries) {
      const memberId = Number(entry[0].args[1])
      const vote = String(entry[1])
      const consul = await Consul.findOne({
        where: { memberId, councilRound: { [Op.or]: councils } },
      })
      if (!consul) continue

      const stored = await ProposalVote.findOne({
        where: { proposalId: id, consulId: consul.id, memberId },
      })
      if (stored) continue

      const v = { vote, proposalId: id, consulId: consul.id, memberId }
      console.log(v)
      await ProposalVote.create(v)
    }
  }
  // accounts

  /**
   * Returns the handle of the member that owns account `key`, or the key
   * itself when no member is found for it.
   */
  const getHandleOrKey = async (api: Api, key: string) => {
    const member = await fetchMemberByAccount(api, key)
    if (member) return member.handle
    return key //abbrKey(key)
  }
  /**
   * Shortens a key to its first five characters, two dots, and the part of
   * the key starting five characters before its end.
   */
  const abbrKey = (key: string) => {
    const head = key.slice(0, 5)
    const tail = key.slice(key.length - 5)
    return [head, tail].join('..')
  }
  /** Reads `system.account` for `account` at the block with the given hash. */
  const getAccountAtBlock = (
    api: Api,
    hash: string,
    account: string
  ): Promise<AccountInfo> => {
    return api.query.system.account.at(hash, account)
  }
  /**
   * Makes sure every account found in `system.account` has a row in the
   * database.
   *
   * All storage entries are processed, one after another. A failure for one
   * account is logged and does not stop the others.
   *
   * @param blockId not used
   */
  const fetchAccounts = async (api: Api, blockId: number) => {
    processing = `accounts`
    const entries = await api.query.system.account.entries()
    for (const entry of entries) {
      // the first element of the human-readable storage key is used as account key
      const key = entry[0].toHuman()[0]
      if (!key) continue
      try {
        await Account.findOrCreate({ where: { key } })
      } catch (e) {
        console.error(`[fetchAccounts] failed to store account ${key}`, e)
      }
    }
  }
  /**
   * Finds the member whose root account is `rootKey`.
   *
   * The database is checked first. Otherwise the member id is looked up via
   * `get.memberIdByAccount` and the member is imported. When the account
   * belongs to no member, the account itself is stored and `undefined` is
   * returned.
   *
   * @returns the member, or `undefined` when the account has no membership
   */
  const fetchMemberByAccount = async (api: Api, rootKey: string) => {
    const member = await Member.findOne({ where: { rootKey } })
    if (member) return member

    const id = Number(await get.memberIdByAccount(api, rootKey))
    if (id) return fetchMember(api, id)

    // awaited so that a failed write reaches the caller
    await Account.findOrCreate({ where: { key: rootKey } })
    return undefined
  }
  /**
   * Imports member `id` unless it is stored already.
   *
   * Every call with a positive id also queues the import of the preceding
   * member. After the member is created, its root account is stored and
   * linked to it. A failure of that link is logged and the member is still
   * returned.
   *
   * @returns the stored member, or `undefined` when `id` is not a positive
   *   number
   */
  const fetchMember = async (
    api: Api,
    id: number
  ): Promise<MemberType | undefined> => {
    // also rejects NaN/undefined, which would otherwise re-queue themselves forever
    if (!(id > 0)) return
    queue.push(() => fetchMember(api, id - 1))
    const exists = await Member.findByPk(+id)
    if (exists) return exists

    processing = `member ${id}`
    const membership = await get.membership(api, id)
    const about = String(membership.about)
    const handle = String(membership.handle)
    const createdAt = +membership.registered_at_block
    const rootKey = String(membership.root_account)
    const member = await Member.create({ id, about, createdAt, handle, rootKey })

    try {
      const [account] = await Account.findOrCreate({ where: { key: rootKey } })
      await account.setMember(id)
    } catch (e) {
      console.error(`[fetchMember] failed to link account of member ${id}`, e)
    }
    return member
  }