index.ts 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. import { Op } from 'sequelize'
  2. import {
  3. Account,
  4. Balance,
  5. Block,
  6. Category,
  7. Channel,
  8. Council,
  9. Consul,
  10. ConsulStake,
  11. Era,
  12. Event,
  13. Member,
  14. Post,
  15. Proposal,
  16. ProposalPost,
  17. ProposalVote,
  18. Thread,
  19. Moderation,
  20. } from '../db/models'
  21. import * as get from './lib/getters'
  22. //import {fetchReports} from './lib/github'
  23. import axios from 'axios'
  24. import moment from 'moment'
  25. import chalk from 'chalk'
  26. import { VoteKind } from '@joystream/types/proposals'
  27. import { Seats } from '@joystream/types/council'
  28. import { AccountInfo } from '@polkadot/types/interfaces/system'
  29. import {
  30. Api,
  31. Handles,
  32. IState,
  33. MemberType,
  34. CategoryType,
  35. ChannelType,
  36. PostType,
  37. Seat,
  38. ThreadType,
  39. CouncilType,
  40. ProposalDetail,
  41. Status,
  42. } from '../types'
  43. import {
  44. AccountId,
  45. Moment,
  46. ActiveEraInfo,
  47. EventRecord,
  48. } from '@polkadot/types/interfaces'
  49. import Option from '@polkadot/types/codec/Option'
  50. import { Vec } from '@polkadot/types'
  51. // TODO fetch consts from db/chain
  52. const TERMDURATION = 144000
  53. const VOTINGDURATION = 57601
  54. const CYCLE = VOTINGDURATION + TERMDURATION
  55. const DELAY = 0 // ms
  56. let lastUpdate = 0
  57. let queuedAll = false
  58. let queue: any[] = []
  59. let processing = ''
  60. let busy = false
  61. const processNext = async () => {
  62. if (busy) return
  63. const task = queue.shift()
  64. if (!task) return
  65. const result = await task()
  66. busy = false
  67. setTimeout(() => processNext(), DELAY)
  68. }
  69. const getBlockHash = (api: Api, blockId: number) =>
  70. api.rpc.chain.getBlockHash(blockId).then((array: any) => array.toHuman())
  71. const getEraAtHash = (api: Api, hash: string) =>
  72. api.query.staking.activeEra
  73. .at(hash)
  74. .then((era: Option<ActiveEraInfo>) => era.unwrap().index.toNumber())
  75. const getEraAtBlock = async (api: Api, block: number) =>
  76. getEraAtHash(api, await getBlockHash(api, block))
  77. const getTimestamp = async (api: Api, hash?: string) => {
  78. const timestamp = hash
  79. ? await api.query.timestamp.now.at(hash)
  80. : await api.query.timestamp.now()
  81. return moment.utc(timestamp.toNumber()).valueOf()
  82. }
  83. const findCouncilAtBlock = (api: Api, block: number) =>
  84. Council.findOne({
  85. where: {
  86. start: { [Op.lte]: block },
  87. end: { [Op.gte]: block - VOTINGDURATION },
  88. },
  89. })
  90. const addBlock = async (
  91. api: Api,
  92. io: any,
  93. header: { number: number; author: string },
  94. status: Status
  95. ): Promise<Status> => {
  96. const id = +header.number
  97. const exists = await Block.findByPk(id)
  98. if (exists) {
  99. console.error(`TODO handle fork`, String(header.author))
  100. return status
  101. }
  102. const block = await processBlock(api, id)
  103. const key = header.author?.toString()
  104. const [account] = await Account.findOrCreate({ where: { key } })
  105. await block.setValidator(account.key)
  106. //account.addBlock(block.id) // TODO needed?
  107. io.emit('block', await Block.findByIdWithIncludes(id))
  108. // logging
  109. const handle = await getHandleOrKey(api, key)
  110. const q = queue.length ? chalk.green(` [${queue.length}:${processing}]`) : ''
  111. console.log(`[Joystream] block ${id} ${handle} ${q}`)
  112. return updateStatus(api, status, id)
  113. }
  114. const processBlock = async (api: Api, id: number) => {
  115. const exists = await Block.findByPk(id)
  116. if (exists) return exists
  117. processing = `block ${id}`
  118. const last = await Block.findByPk(id - 1)
  119. const [block] = await Block.findOrCreate({ where: { id } })
  120. block.hash = await getBlockHash(api, id)
  121. block.timestamp = await getTimestamp(api, block.hash)
  122. block.blocktime =
  123. last && last.timestamp > 0 ? block.timestamp - last.timestamp : 6000
  124. block.save()
  125. processEvents(api, id, block.hash)
  126. await importEraAtBlock(api, id, block.hash)
  127. return block
  128. }
  129. const addBlockRange = async (
  130. api: Api,
  131. startBlock: number,
  132. endBlock: number
  133. ) => {
  134. for (let block = startBlock; block <= endBlock; block++)
  135. queue.push(() => processBlock(api, block))
  136. }
  137. const updateStatus = async (api: Api, old: Status, block: number) => {
  138. const status = {
  139. block,
  140. era: await getEraAtBlock(api, block),
  141. round: Number(await api.query.councilElection.round()),
  142. members: (await api.query.members.nextMemberId()) - 1,
  143. channels: await get.currentChannelId(api),
  144. categories: await get.currentCategoryId(api),
  145. threads: await get.currentThreadId(api),
  146. posts: await get.currentPostId(api),
  147. proposals: await get.proposalCount(api),
  148. proposalPosts: (await api.query.proposalsDiscussion.postCount()).toHuman(),
  149. }
  150. if (!queuedAll) fetchAll(api, status)
  151. else {
  152. // TODO catch if more than one are added
  153. status.members > old.members && fetchMember(api, status.members)
  154. status.posts > old.posts && fetchPost(api, status.posts)
  155. status.proposals > old.proposals && fetchProposal(api, status.proposals)
  156. status.channels > old.channels && fetchChannel(api, status.channels)
  157. status.categories > old.categories && fetchCategory(api, status.categories)
  158. status.proposalPosts > old.proposalPosts &&
  159. fetchProposalPosts(api, status.proposalPosts)
  160. }
  161. return status
  162. }
  163. const fetchAll = async (api: Api, status: Status) => {
  164. queue.push(() => fetchAccounts(api, status.block))
  165. for (let id = status.members; id > 0; id--) {
  166. queue.push(() => fetchMember(api, id))
  167. }
  168. for (let id = status.round; id > 0; id--) {
  169. queue.push(() => fetchCouncil(api, id))
  170. }
  171. for (let id = status.proposals; id > 0; id--) {
  172. queue.push(() => fetchProposal(api, id))
  173. }
  174. for (let id = status.channels; id > 0; id--) {
  175. queue.push(() => fetchChannel(api, id))
  176. }
  177. for (let id = status.categories; id > 0; id--) {
  178. queue.push(() => fetchCategory(api, id))
  179. }
  180. for (let id = status.threads; id > 0; id--) {
  181. queue.push(() => fetchThread(api, id))
  182. }
  183. for (let id = status.posts; id > 0; id--) {
  184. queue.push(() => fetchPost(api, id))
  185. }
  186. queue.push(() => fetchProposalPosts(api, status.proposalPosts))
  187. queue.push(() => addBlockRange(api, 0, status.block))
  188. queuedAll = true
  189. processNext()
  190. }
  191. const processEvents = async (api: Api, blockId: number, hash: string) => {
  192. processing = `events block ${blockId}`
  193. try {
  194. const blockEvents = await api.query.system.events.at(hash)
  195. blockEvents.forEach(({ event }: EventRecord) => {
  196. let { section, method, data } = event
  197. Event.create({ blockId, section, method, data: JSON.stringify(data) })
  198. })
  199. } catch (e) {
  200. console.log(`failed to fetch events for block ${blockId} ${hash}`)
  201. }
  202. // TODO catch votes, posts, proposals?
  203. }
  204. const fetchValidators = async (api: Api, hash: string) =>
  205. api.query.staking.snapshotValidators.at(hash) as Option<Vec<AccountId>>
  206. const importEraAtBlock = async (api: Api, blockId: number, hash: string) => {
  207. const id = await getEraAtHash(api, hash)
  208. const [era] = await Era.findOrCreate({ where: { id } })
  209. if (era.active) return
  210. era.addBlock(blockId)
  211. processing = `era ${id}`
  212. try {
  213. fetchValidators(api, hash).then(
  214. async (snapshot: Option<Vec<AccountId>>) => {
  215. if (snapshot.isEmpty) return
  216. console.log(`[Joystream] Found validator info for era ${id}`)
  217. const validatorCount = snapshot.unwrap().length
  218. era.slots = (await api.query.staking.validatorCount.at(hash)).toNumber()
  219. era.active = Math.min(era.slots, validatorCount)
  220. era.waiting =
  221. validatorCount > era.slots ? validatorCount - era.slots : 0
  222. era.stake = await api.query.staking.erasTotalStake.at(hash, id)
  223. const chainTimestamp = (await api.query.timestamp.now.at(
  224. hash
  225. )) as Moment
  226. era.timestamp = moment(chainTimestamp.toNumber())
  227. // era.update({ slots, active, waiting, stake, timestamp })
  228. era.blockId = id
  229. era.save()
  230. updateBalances(api, hash)
  231. }
  232. )
  233. } catch (e) {
  234. console.error(`import era ${blockId} ${hash}`, e)
  235. }
  236. }
  237. const updateBalances = async (api: Api, blockHash: string) => {
  238. const currentEra: number = await api.query.staking.currentEra.at(blockHash)
  239. const era = await Era.findOrCreate({ where: { id: currentEra } })
  240. try {
  241. processing = `balances ${era}`
  242. Account.findAll().then(async (account: any) => {
  243. const { key } = account
  244. if (!key) return
  245. console.log(`updating balance of`, key, key)
  246. const { data } = await getAccountAtBlock(api, blockHash, key)
  247. const { free, reserved, miscFrozen, feeFrozen } = data
  248. const balance = { available: free, reserved, frozen: miscFrozen }
  249. console.log(`balance ${era}`, balance)
  250. Balance.create(balance).then((balance: any) => {
  251. balance.setAccount(key)
  252. balance.setEra(era.id)
  253. console.log(`balance`, era.id, key, balance.available)
  254. })
  255. })
  256. } catch (e) {
  257. console.error(`balances era ${era}`)
  258. }
  259. }
  260. const fetchTokenomics = async () => {
  261. console.debug(`Updating tokenomics`)
  262. const { data } = await axios.get('https://status.joystream.org/status')
  263. if (!data) return
  264. // TODO save 'tokenomics', data
  265. }
  266. const fetchChannel = async (api: Api, id: number) => {
  267. if (id <= 0) return
  268. const exists = await Channel.findByPk(id)
  269. if (exists) return exists
  270. processing = `channel ${id}`
  271. const data = await api.query.contentWorkingGroup.channelById(id)
  272. const { handle, title, description, avatar, banner, content, created } = data
  273. // TODO const accountId = String(data.role_account)
  274. const channel = {
  275. id,
  276. handle: String(handle),
  277. title: String(title),
  278. description: String(description),
  279. avatar: String(avatar),
  280. banner: String(banner),
  281. content: String(content),
  282. publicationStatus: data.publication_status === 'Public' ? true : false,
  283. curation: String(data.curation_status),
  284. createdAt: +created,
  285. principal: Number(data.principal_id),
  286. }
  287. const chan = await Channel.create(channel)
  288. const owner = await fetchMember(api, data.owner)
  289. chan.setOwner(owner)
  290. return chan
  291. }
  292. const fetchCategory = async (api: Api, id: number) => {
  293. if (id <= 0) return
  294. const exists = await Category.findByPk(+id)
  295. if (exists) return exists
  296. processing = `category ${id}`
  297. const data = await api.query.forum.categoryById(id)
  298. const { title, description, deleted, archived } = data
  299. const category = await Category.create({
  300. id,
  301. title,
  302. threadId: +data.thread_id, // TODO needed?
  303. description,
  304. createdAt: +data.created_at.block,
  305. deleted,
  306. archived,
  307. subcategories: Number(data.num_direct_subcategories),
  308. moderatedThreads: Number(data.num_direct_moderated_threads),
  309. unmoderatedThreads: Number(data.num_direct_unmoderated_threads),
  310. //position:+data.position_in_parent_category // TODO sometimes NaN,
  311. })
  312. createModeration(api, { categoryId: id }, String(data.moderator_id), category)
  313. return category
  314. }
  315. const fetchPost = async (api: Api, id: number) => {
  316. if (id <= 0) return
  317. const exists = await Post.findByPk(id)
  318. if (exists) return exists
  319. processing = `post ${id}`
  320. const data = await api.query.forum.postById(id)
  321. const author: string = String(data.author_id)
  322. const member = await fetchMemberByAccount(api, author)
  323. const authorId = member ? member.id : null
  324. const threadId = Number(data.thread_id)
  325. const thread = await fetchThread(api, threadId)
  326. const text = data.current_text
  327. const history = data.text_change_history // TODO needed?
  328. const createdAt = data.created_at.block
  329. const post = await Post.create({ id, authorId, text, createdAt, threadId })
  330. if (data.moderation)
  331. createModeration(api, { postId: id }, data.moderation, post)
  332. return post
  333. }
  334. const createModeration = async (
  335. api: Api,
  336. where: {},
  337. key: string,
  338. object: { setModeration: (id: number) => {} }
  339. ) => {
  340. if (key === '') return
  341. await Account.findOrCreate({ where: { key } })
  342. const moderation = await Moderation.create({ moderatorKey: key })
  343. object.setModeration(moderation.id)
  344. return moderation
  345. }
  346. const fetchThread = async (api: Api, id: number) => {
  347. if (id <= 0) return
  348. const exists = await Thread.findByPk(id)
  349. if (exists) return exists
  350. processing = `thread ${id}`
  351. const data = await api.query.forum.threadById(id)
  352. const { title, moderation, nr_in_category } = data
  353. const account = String(data.author_id)
  354. const t = {
  355. id,
  356. title,
  357. nrInCategory: +nr_in_category,
  358. createdAt: +data.created_at.block,
  359. }
  360. const thread = await Thread.create(t)
  361. const category = await fetchCategory(api, +data.category_id)
  362. if (category) thread.setCategory(category.id)
  363. const author = await fetchMemberByAccount(api, account)
  364. if (author) thread.setCreator(author.id)
  365. if (moderation) {
  366. const { moderated_at, moderator_id, rationale } = moderation
  367. const created = moderated_at.block
  368. const createdAt = moment.utc(moderated_at.time)
  369. createModeration(
  370. api,
  371. { created, createdAt, rationale },
  372. moderator_id.toHuman(),
  373. thread
  374. )
  375. }
  376. return thread
  377. }
  378. const fetchCouncil = async (api: Api, round: number) => {
  379. if (round <= 0) return console.log(chalk.red(`[fetchCouncil] round:${round}`))
  380. const exists = await Council.findByPk(round)
  381. if (exists) return exists
  382. processing = `council ${round}`
  383. const start = 57601 + (round - 1) * CYCLE
  384. const end = start + TERMDURATION
  385. let council = { round, start, end, startDate: 0, endDate: 0 }
  386. let seats: Seats
  387. try {
  388. const startHash = await getBlockHash(api, start)
  389. council.startDate = await getTimestamp(api, startHash)
  390. seats = await api.query.council.activeCouncil.at(startHash)
  391. } catch (e) {
  392. return console.log(`council term ${round} lies in the future ${start}`)
  393. }
  394. try {
  395. const endHash = await getBlockHash(api, end)
  396. council.endDate = await getTimestamp(api, endHash)
  397. } catch (e) {
  398. console.warn(`end of council term ${round} lies in the future ${end}`)
  399. }
  400. try {
  401. Council.create(council).then(({ round }: any) =>
  402. seats.map(({ member, stake, backers }) =>
  403. fetchMemberByAccount(api, member.toHuman()).then(({ id }: any) =>
  404. Consul.create({
  405. stake: Number(stake),
  406. councilRound: round,
  407. memberId: id,
  408. }).then((consul: any) =>
  409. backers.map(async ({ member, stake }) =>
  410. fetchMemberByAccount(api, member.toHuman()).then(({ id }: any) =>
  411. ConsulStake.create({
  412. stake: Number(stake),
  413. consulId: consul.id,
  414. memberId: id,
  415. })
  416. )
  417. )
  418. )
  419. )
  420. )
  421. )
  422. } catch (e) {
  423. console.error(`Failed to save council ${round}`, e)
  424. }
  425. }
  426. const fetchProposal = async (api: Api, id: number) => {
  427. if (id <= 0) return
  428. const exists = await Proposal.findByPk(+id)
  429. if (exists) {
  430. fetchProposalVotes(api, exists)
  431. return exists
  432. }
  433. processing = `proposal ${id}`
  434. const proposal = await get.proposalDetail(api, id)
  435. await fetchMember(api, proposal.authorId)
  436. fetchProposalVotes(api, proposal)
  437. return Proposal.create(proposal)
  438. }
  439. const fetchProposalPost = (api: Api, threadId: number, postId: number) =>
  440. api.query.proposalsDiscussion.postThreadIdByPostId(threadId, postId)
  441. const fetchProposalPosts = async (api: Api, posts: number) => {
  442. const threads = (await api.query.proposalsDiscussion.threadCount()).toNumber()
  443. let proposalId = 1
  444. for (let id = 1; id <= posts && proposalId <= threads; ) {
  445. const exists = await ProposalPost.findByPk(id)
  446. if (exists) {
  447. id++
  448. proposalId = 1
  449. continue
  450. }
  451. processing = `proposal post ${id}/${posts} ${proposalId}/${threads}`
  452. const post = await fetchProposalPost(api, proposalId, id)
  453. if (!post.text.length) {
  454. proposalId++
  455. continue
  456. }
  457. const proposal = await Proposal.findByPk(proposalId)
  458. if (!proposal) {
  459. console.warn(`[fetchProposalPosts] proposal ${proposalId} not found.`)
  460. id++
  461. continue
  462. }
  463. ProposalPost.create({
  464. id,
  465. text: post.text.toHuman(),
  466. created: Number(post.created_at),
  467. updated: Number(post.updated_at),
  468. edition: Number(post.edition_number),
  469. authorId: Number(post.author_id),
  470. }).then((p: any) => proposal.addPost(p))
  471. id++
  472. proposalId = 1
  473. }
  474. }
  475. const fetchProposalVotes = async (api: Api, proposal: ProposalDetail) => {
  476. if (!proposal) return console.error(`[fetchProposalVotes] empty proposal`)
  477. processing = `votes proposal ${proposal.id}`
  478. const { createdAt, finalizedAt } = proposal
  479. try {
  480. const start = createdAt ? await findCouncilAtBlock(api, createdAt) : null
  481. if (start) start.addProposal(proposal.id)
  482. else
  483. return console.error(
  484. `[fetchProposalVotes] no council found for proposal ${proposal.id}`
  485. )
  486. // some proposals make it into a second term
  487. const end = finalizedAt ? await findCouncilAtBlock(api, finalizedAt) : null
  488. const councils = [start && start.round, end && end.round]
  489. const consuls = await Consul.findAll({
  490. where: { councilRound: { [Op.or]: councils } },
  491. })
  492. consuls.map(({ id, memberId }: any) =>
  493. fetchProposalVoteByConsul(api, proposal.id, id, memberId)
  494. )
  495. } catch (e) {
  496. console.log(`failed to fetch votes of proposal ${proposal.id}`, e)
  497. }
  498. }
  499. const fetchProposalVoteByConsul = async (
  500. api: Api,
  501. proposalId: number,
  502. consulId: number,
  503. memberId: number
  504. ): Promise<any> => {
  505. processing = `vote by ${consulId} for proposal ${proposalId}`
  506. const exists = await ProposalVote.findOne({
  507. where: { proposalId, memberId, consulId },
  508. })
  509. if (exists) return exists
  510. const query = api.query.proposalsEngine
  511. const args = [proposalId, memberId]
  512. const hasVoted = await query.voteExistsByProposalByVoter.size(...args)
  513. if (!hasVoted.toNumber()) return
  514. const vote = (await query.voteExistsByProposalByVoter(...args)).toHuman()
  515. await fetchMember(api, memberId) // TODO needed?
  516. return ProposalVote.create({ vote: vote, proposalId, consulId, memberId })
  517. }
  518. // accounts
  519. const getHandleOrKey = async (api: Api, key: string) => {
  520. const member = await fetchMemberByAccount(api, key)
  521. return member ? member.handle : key //abbrKey(key)
  522. }
  523. const abbrKey = (key: string) =>
  524. `${key.slice(0, 5)}..${key.slice(key.length - 5)}`
  525. const getAccountAtBlock = (
  526. api: Api,
  527. hash: string,
  528. account: string
  529. ): Promise<AccountInfo> => api.query.system.account.at(hash, account)
  530. const fetchAccounts = async (api: Api, blockId: number) => {
  531. processing = `accounts`
  532. api.query.system.account
  533. .entries()
  534. .then((account: any) =>
  535. Account.findOrCreate({ where: { key: account[0][0].toHuman()[0] } })
  536. )
  537. }
  538. const fetchMemberByAccount = async (api: Api, rootKey: string) => {
  539. const member = await Member.findOne({ where: { rootKey } })
  540. if (member) return member
  541. const id = Number(await get.memberIdByAccount(api, rootKey))
  542. if (id) return fetchMember(api, id)
  543. else Account.findOrCreate({ where: { key: rootKey } })
  544. }
  545. const fetchMember = async (
  546. api: Api,
  547. id: number
  548. ): Promise<MemberType | undefined> => {
  549. if (id <= 0) return
  550. const exists = await Member.findByPk(+id)
  551. if (exists) return exists
  552. processing = `member ${id}`
  553. const membership = await get.membership(api, id)
  554. const about = String(membership.about)
  555. const handle = String(membership.handle)
  556. const createdAt = +membership.registered_at_block
  557. const rootKey = String(membership.root_account)
  558. return Member.create({ id, about, createdAt, handle, rootKey }).then(
  559. (member: any) => {
  560. Account.findOrCreate({ where: { key: rootKey } }).then(([account]: any) =>
  561. account.setMember(id)
  562. )
  563. return member
  564. }
  565. )
  566. }
  567. module.exports = { addBlock, addBlockRange }