/**
 * Joystream chain indexer: mirrors blocks, eras, councils, members, forum
 * content and proposals from a node (via the helpers in ./lib/api) into the
 * Sequelize models in ../db/models.
 *
 * Work that does not have to happen immediately is pushed onto an in-memory
 * task queue that is drained by up to WORKERS concurrent workers.
 */
import { Op } from 'sequelize'
import { ApiPromise } from '@polkadot/api'

// models
import {
  Account,
  Balance,
  Block,
  Category,
  Channel,
  Council,
  Consul,
  Commitment,
  Era,
  Event,
  Member,
  Post,
  Proposal,
  ProposalPost,
  ProposalVote,
  Thread,
  Moderation,
} from '../db/models'

// library
import {
  getBlockHash,
  getHead,
  getTimestamp,
  getEra,
  getEraStake,
  getEvents,
  getCouncil,
  getCouncils,
  getCouncilRound,
  getCouncilElectionStatus,
  getCouncilElectionDurations,
  getCommitment,
  getCommitments,
  getProposalCount,
  getProposal,
  getProposalVotes,
  getProposalPost,
  getProposalPosts,
  getProposalPostCount,
  getProposalThreadCount,
  getNextMember,
  getNextChannel,
  getNextCategory,
  getNextThread,
  getNextPost,
  getCategory,
  getThread,
  getPost,
  getAccount,
  getAccounts,
  getMember,
  getMemberIdByAccount,
  getValidators,
  getValidatorCount,
} from './lib/api'
//import { fetchReports } from './lib/github'
import axios from 'axios'
import moment from 'moment'
import chalk from 'chalk'

// types
import { AccountBalance, Round, Vote, ProposalDetail } from './lib/types'
import {
  AccountId,
  BlockNumber,
  Hash,
  Moment,
  ActiveEraInfo,
  EventRecord,
} from '@polkadot/types/interfaces'
import { HeaderExtended } from '@polkadot/api-derive/types'
//import { AccountInfo } from '@polkadot/types/interfaces/system'
import { SealedVote, Seat } from '@joystream/types/council'
import { MemberId, Membership } from '@joystream/types/members'
import {
  ProposalId,
  DiscussionPost,
  SpendingParams,
  VoteKind,
} from '@joystream/types/proposals'
import { Status } from '../types'
import {
  MemberType,
  CategoryType,
  ChannelType,
  CommitmentType,
  PostType,
  ThreadType,
  CouncilType,
  ModerationType,
  ProposalPostType,
} from '../types/model'

/** Maximum number of queued tasks that run concurrently. */
const WORKERS = 3
/** Pause before a worker looks for its next task. */
const DELAY = 100 // ms

/** A unit of deferred work; its result is ignored by the queue. */
type Task = () => unknown

let lastUpdate = 0
let queuedAll = false
let queue: Task[] = []
let processing = ''
let busy = 0

/**
 * Returns a rejection handler that logs the error with some context.
 * Used for fire-and-forget calls so a failure is reported instead of
 * surfacing as an unhandled rejection.
 */
const logRejection = (context: string) => (e: unknown): void =>
  console.error(`[Joystream] ${context} failed:`, e)

/**
 * Takes one task off the queue and runs it, unless all workers are busy or
 * the queue is empty. While capacity remains, another worker is scheduled
 * right away; after the task settles the next run is scheduled as well.
 */
const processNext = async (): Promise<void> => {
  if (busy >= WORKERS) return // no free worker
  const task = queue.shift()
  if (!task) return // no task
  busy++
  if (busy < WORKERS) setTimeout(processNext, DELAY)
  try {
    await task()
  } catch (e: unknown) {
    console.error(`[Joystream] queued task failed (last status: ${processing})`, e)
  } finally {
    // Always release the worker slot, otherwise every failing task would
    // permanently reduce the number of available workers.
    busy--
    setTimeout(processNext, DELAY)
  }
}

/**
 * Handles a new block header: stores the block, links its validator account,
 * emits the stored block on `io` and logs a status line.
 *
 * @param api    connected chain API
 * @param io     emitter with an `emit(event, payload)` method
 * @param header header of the new block
 * @param status status to hand back when no refresh is due
 * @returns a freshly fetched status on every 10th block, otherwise `status`
 */
export const addBlock = async (
  api: ApiPromise,
  io: any,
  header: HeaderExtended,
  status: Status = {
    block: 0,
    election: {
      durations: [],
      stage: null,
      round: 0,
      stageEndsAt: 0,
      termEndsAt: 0,
    },
    era: 0,
    round: 0,
    members: 0,
    channels: 0,
    categories: 0,
    threads: 0,
    posts: 0,
    proposals: 0,
    proposalPosts: 0,
  }
): Promise<Status> => {
  const id = header.number.toNumber()
  const exists = await Block.findByPk(id)
  if (exists || !header.author) return status

  const key = header.author.toHuman()
  const block = await processBlock(api, id)
  const [account] = await Account.findOrCreate({ where: { key } })
  await block.setValidator(account.key)
  io.emit('block', await Block.findByIdWithIncludes(id))

  // log
  const member = await fetchMemberByAccount(api, header.author)
  const author = member ? member.handle : key
  console.log(`[Joystream] block ${id} ${author} [${logStatus()}]`)

  const shouldUpdate = id % 10 === 0
  return shouldUpdate ? updateStatus(api, id) : status
}

/** Short progress string: busy workers / queue length and the current item. */
const logStatus = () =>
  queue.length ? `${busy}/${queue.length}: ${processing}` : processing

/**
 * Stores block `id` with hash, timestamp and block time, then starts event
 * and era import for it in the background.
 *
 * The original body returned right after `findOrCreate`, which left the
 * rest of the function unreachable; the row is now created first and then
 * populated with `update`, so no second insert for the same id is attempted.
 *
 * @returns the stored block (an already existing one is returned untouched)
 */
const processBlock = async (api: ApiPromise, id: number) => {
  const exists = await Block.findByPk(id)
  if (exists) return exists

  processing = `block ${id}`
  console.log(processing)
  const [block] = await Block.findOrCreate({ where: { id } })

  const hash = await getBlockHash(api, id)
  const timestamp = await getTimestamp(api, hash)

  // Block 0 has no predecessor; treat its block time as 0.
  let lastTimestamp: number = timestamp
  if (id > 0) {
    const last = await Block.findByPk(id - 1)
    lastTimestamp = last?.timestamp
      ? last.timestamp
      : await getTimestamp(api, await getBlockHash(api, id - 1))
  }
  console.log(`timestamp`, timestamp, lastTimestamp)
  const blocktime = timestamp - lastTimestamp
  await block.update({ hash: String(hash), timestamp, blocktime })

  // Follow-up imports run in the background; failures are only logged.
  processEvents(api, id, hash).catch(logRejection(`events block ${id}`))
  importEraAtBlock(api, id, hash).catch(logRejection(`era import at block ${id}`))
  processNext()
  return block
}

/**
 * Queues every block from `startBlock` to `endBlock` (inclusive) for
 * processing and prints the queue status once per second.
 */
export const addBlockRange = async (
  api: ApiPromise,
  startBlock: number,
  endBlock: number
) => {
  for (let block = startBlock; block <= endBlock; block++)
    queue.push(() => processBlock(api, block))

  setInterval(() => {
    const status = logStatus()
    // NOTE(review): nothing visible in this file ever sets the status to
    // `[no tasks]`, so this exit condition currently never triggers - confirm.
    if (status === `[no tasks]`) process.exit()
    console.log(status)
  }, 1000)
  processNext()
}

// TODO only fetchAll() once, then respond to chain events
/**
 * Reads the current counters at `block` and schedules fetching of the newest
 * entities (or of everything, the first time).
 *
 * @returns the status snapshot for `block`
 */
const updateStatus = async (
  api: ApiPromise,
  block: number
): Promise<Status> => {
  const hash = await getBlockHash(api, block)
  const status = {
    block,
    era: await getEra(api, hash),
    round: await getCouncilRound(api, hash),
    election: await getCouncilElectionStatus(api, hash),
    members: (await getNextMember(api, hash)) - 1,
    channels: (await getNextChannel(api, hash)) - 1,
    categories: (await getNextCategory(api, hash)) - 1,
    threads: (await getNextThread(api, hash)) - 1,
    posts: (await getNextPost(api, hash)) - 1,
    proposals: await getProposalCount(api, hash),
    proposalPosts: await getProposalPostCount(api),
  }

  if (!queuedAll) fetchAll(api, status).catch(logRejection(`fetchAll`))
  else {
    fetchMember(api, status.members).catch(logRejection(`member ${status.members}`))
    fetchCategory(api, status.categories).catch(
      logRejection(`category ${status.categories}`)
    )
    fetchThread(api, status.threads).catch(logRejection(`thread ${status.threads}`))
    fetchPost(api, status.posts).catch(logRejection(`post ${status.posts}`))
    fetchProposal(api, status.proposals).catch(
      logRejection(`proposal ${status.proposals}`)
    )
  }
  processNext()
  return status
}

/**
 * Starts the import of all councils and queues a full import of accounts,
 * members, forum content and proposals.
 */
const fetchAll = async (api: ApiPromise, status: Status) => {
  // Claim the flag before the first await so an overlapping updateStatus()
  // cannot start a second full import; release it again if we fail.
  queuedAll = true
  let rounds: Round[]
  try {
    rounds = await getCouncils(api, status.block)
  } catch (e: unknown) {
    queuedAll = false
    throw e
  }
  rounds.forEach((round) =>
    fetchCouncil(api, round).catch(logRejection(`council ${round.round}`))
  )

  queue.push(() => fetchAccounts(api))
  queue.push(() => fetchMember(api, status.members))
  queue.push(() => fetchCategory(api, status.categories))
  queue.push(() => fetchThread(api, status.threads))
  queue.push(() => fetchPost(api, status.posts))
  queue.push(() => fetchProposal(api, status.proposals))
  queue.push(() => fetchProposalPosts(api))
}

/** Loads the events of one block and stores the ones that are not ignored. */
const processEvents = async (api: ApiPromise, blockId: number, hash: Hash) => {
  processing = `events block ${blockId}`
  console.log(processing)
  const events = await getEvents(api, hash)
  events.forEach((event: EventRecord) => saveEvent(blockId, event))
}

/** `section.method` pairs that are never stored. */
const IGNORED_EVENTS = new Set([
  'system.ExtrinsicSuccess',
  'imOnline.HeartbeatReceived',
  'imOnline.AllGood',
  'utility.BatchCompleted',
  'grandpa.NewAuthorities',
  'session.NewSession',
])

/** Stores a single event unless its section/method pair is ignored. */
const saveEvent = (blockId: number, event: EventRecord) => {
  const { section, method, data } = event.event
  if (IGNORED_EVENTS.has(`${section}.${method}`)) return

  console.log(section, method, data)
  // TODO catch votes, posts, proposals
  Event.create({ blockId, section, method, data: JSON.stringify(data) }).catch(
    logRejection(`saving event ${section}.${method} of block ${blockId}`)
  )
}

/**
 * Links `blockId` to its era and, if the era has no validator info yet
 * (`era.active` unset), fills in slots, active/waiting counts, stake and
 * timestamp, then starts a balance update in the background.
 */
const importEraAtBlock = async (
  api: ApiPromise,
  blockId: number,
  hash: Hash
) => {
  const id = await getEra(api, hash)
  const [era] = await Era.findOrCreate({ where: { id } })
  await era.addBlock(blockId)
  if (era.active) return

  processing = `era ${id}`
  const validators: any[] = await getValidators(api, hash)
  const validatorCount = validators.length
  if (!validatorCount) return

  console.log(`[Joystream] Found validator info for era ${id}`)
  era.slots = await getValidatorCount(api, hash)
  era.active = Math.min(era.slots, validatorCount)
  era.waiting = Math.max(0, validatorCount - era.slots)
  era.stake = await getEraStake(api, hash, id)
  const timestamp = await getTimestamp(api, hash)
  console.log(id, timestamp, hash)
  era.timestamp = timestamp
  era.blockId = blockId
  await era.save()
  updateBalances(api, hash).catch(logRejection(`balances era ${id}`))
}

/**
 * Summarises the validator set at a block.
 *
 * @returns counts and timestamp, or `undefined` when no validators are found
 */
const validatorStatus = async (
  api: ApiPromise,
  blockId: BlockNumber | number
) => {
  const hash = await getBlockHash(api, blockId)
  const totalValidators = await getValidators(api, hash)
  if (!totalValidators.length) return

  const totalNrValidators = totalValidators.length
  const maxSlots = await getValidatorCount(api, hash)
  const actives = Math.min(maxSlots, totalNrValidators)
  const waiting =
    totalNrValidators > maxSlots ? totalNrValidators - maxSlots : 0
  const date = await getTimestamp(api, hash)
  console.log(`validator`, date)
  return { blockId, actives, waiting, maxSlots, date }
}

/**
 * Records the balance of every stored account for the era at `hash`.
 * An existing balance row for the account/era pair is updated, otherwise a
 * new one is created and linked. Errors are logged, not rethrown.
 */
const updateBalances = async (api: ApiPromise, hash: Hash) => {
  const currentEra: number = await getEra(api, hash)
  // findOrCreate resolves to [instance, created]
  const [era] = await Era.findOrCreate({ where: { id: currentEra } })
  processing = `balances ${era.id}`
  try {
    const accounts: any[] = await Account.findAll()
    for (const { key } of accounts) {
      if (!key) continue
      console.log(`updating balance of`, key)

      const { data } = await getAccount(api, hash, key)
      const { free, reserved, miscFrozen } = data
      const balance = { available: free, reserved, frozen: miscFrozen }
      console.log(`balance era ${era.id}`, balance)

      const where = { accountKey: key, eraId: era.id }
      const exists = await Balance.findOne({ where })
      if (exists) await Balance.update(balance, { where })
      else {
        const created = await Balance.create(balance)
        await created.setAccount(key)
        await created.setEra(era.id)
      }
    }
  } catch (e: unknown) {
    console.error(`balances era ${era.id}`, e)
  }
}

/** Downloads the public status document; the result is not stored yet. */
const fetchTokenomics = async () => {
  console.debug(`Updating tokenomics`)
  const { data } = await axios.get('https://status.joystream.org/status')
  if (!data) return
  // TODO save 'tokenomics', data
}

/**
 * Ensures forum category `id` is stored and queues the next lower id.
 *
 * @returns the category, or `undefined` for ids <= 0
 */
const fetchCategory = async (api: ApiPromise, id: number) => {
  if (id <= 0) return
  queue.push(() => fetchCategory(api, id - 1))
  const exists = await Category.findByPk(+id)
  if (exists) return exists

  processing = `category ${id}`
  const { created_at, title, description, deleted, archived, moderator_id } =
    await getCategory(api, id)
  const created = created_at.block.toNumber()
  const category: CategoryType = await Category.create({
    id,
    title,
    description,
    created,
    deleted,
    archived,
  })
  if (moderator_id)
    await createModeration(api, { categoryId: id }, moderator_id, category)
  return category
}

/**
 * Ensures forum post `id` is stored (fetching its thread as a side effect)
 * and queues the next lower id.
 *
 * @returns the post, or `undefined` for ids <= 0
 */
const fetchPost = async (
  api: ApiPromise,
  id: number
): Promise<PostType | undefined> => {
  if (id <= 0) return // same guard as the other fetchers
  if (id > 1) queue.push(() => fetchPost(api, id - 1))
  const exists = await Post.findByPk(id)
  if (exists) return exists

  processing = `post ${id}`
  const { created_at, author_id, thread_id, current_text, moderation } =
    await getPost(api, id)

  const member = await fetchMemberByAccount(api, author_id)
  const authorId = member ? member.id : null
  const threadId = Number(thread_id)
  await fetchThread(api, threadId) // make sure the thread gets imported too
  const text = current_text
  const created = created_at.block.toNumber()
  const post = await savePost(id, { authorId, text, created, threadId })
  if (moderation)
    await createModeration(api, { postId: id }, moderation.moderator_id, post)
  return post
}

/** Finds or creates post `id` and applies `data` to it. */
const savePost = async (id: number, data: any): Promise<PostType> => {
  const [post] = await Post.findOrCreate({ where: { id } })
  await post.update(data)
  return post
}

/**
 * Makes sure the moderator's account exists.
 * Persisting the moderation record itself is not enabled yet (see TODO).
 */
const createModeration = async (
  api: ApiPromise,
  association: {},
  accountId: AccountId,
  object: { setModeration: (id: number) => {} }
) => {
  if (!accountId) return
  const key = accountId.toHuman()
  await Account.findOrCreate({ where: { key } })
  // TODO enable once moderations are ready to be stored:
  // const where = { ...association, moderatorKey: key }
  // const [moderation] = await Moderation.findOrCreate({ where })
  // if (moderation) object.setModeration(moderation.id)
}

/**
 * Ensures forum thread `id` is stored and linked to its category and
 * creator.
 *
 * @returns the thread, or `undefined` for ids <= 0
 */
const fetchThread = async (api: ApiPromise, id: number) => {
  if (id <= 0) return
  const exists = await Thread.findByPk(id)
  if (exists) return exists

  processing = `thread ${id}`
  const {
    author_id,
    created_at,
    category_id,
    title,
    moderation,
    nr_in_category,
  } = await getThread(api, id)

  const [thread] = await Thread.findOrCreate({ where: { id } })
  await thread.update({
    id,
    title,
    nrInCategory: +nr_in_category,
    created: +created_at.block,
  })

  const category = await fetchCategory(api, +category_id)
  if (category) await thread.setCategory(category.id)

  const author = await fetchMemberByAccount(api, author_id)
  if (author) await thread.setCreator(author.id)

  if (moderation) {
    const { moderated_at, moderator_id, rationale } = moderation
    const created = moderated_at.block
    const createdAt = moderated_at.time
    await createModeration(
      api,
      { created, createdAt, rationale },
      moderator_id,
      thread
    )
  }
  return thread
}

// council

/**
 * Plain data describing one council term.
 * (Named `CouncilTerm` so it does not clash with the imported `Council` model.)
 */
interface CouncilTerm {
  round: number
  start: number
  startDate?: number
  end: number
  endDate?: number
}

/**
 * Imports one council term: dates, seats (consuls with their backers) and
 * the sealed vote commitments taken from two blocks before the term start.
 */
const fetchCouncil = async (api: ApiPromise, term: Round) => {
  const { round, start, end } = term
  // Re-import is intentional for now:
  //const exists = await Council.findByPk(round); if (exists) return exists

  processing = `council ${round}`
  const council: CouncilTerm = { start, end, round }
  const startHash = await getBlockHash(api, start)
  council.startDate = await getTimestamp(api, startHash)
  const seats: Seat[] = await getCouncil(api, startHash)

  const head = Number(await getHead(api))
  if (end < head) {
    const endHash = await getBlockHash(api, end)
    if (endHash) council.endDate = await getTimestamp(api, endHash)
  } else console.log(`fetchCouncil: round ${round} is ongoing.`)

  // TODO import report generator and save tokenomics

  // The council row must exist before saveCommitments looks it up.
  await saveCouncil(api, council, seats)
  await saveCommitments(api, round, start - 2)
}

/** Stores the sealed votes found at `block` as commitments of council `round`. */
const saveCommitments = async (
  api: ApiPromise,
  round: number,
  block: number
) => {
  const hash = await getBlockHash(api, block)
  const commitments: Hash[] = await getCommitments(api, hash)
  const council = await Council.findByPk(round)
  if (!council)
    return console.warn(`saveCommitments: council ${round} not found.`)

  const votes: SealedVote[] = await Promise.all(
    commitments.map((voteHash: Hash) => getCommitment(api, hash, voteHash))
  )
  await Promise.all(
    votes.map(async (v) => {
      const voter: AccountId = v.voter
      const stake = v.stake.new.toNumber()
      const vote = String(v.vote)
      const member = await fetchMemberByAccount(api, voter)
      const memberId = member?.id
      const [commitment] = await Commitment.findOrCreate({
        where: { councilRound: round, stake, memberId },
      })
      if (vote) await commitment.update({ vote })
      await commitment.setCouncil(council.id)
    })
  )
}

/** Stores the council term data and one consul per seat with a known member. */
const saveCouncil = async (
  api: ApiPromise,
  council: CouncilTerm,
  seats: Seat[]
) => {
  const { round } = council
  // Keep the model instance under its own name: shadowing `council` made the
  // row update itself instead of receiving the term data.
  const [record] = await Council.findOrCreate({ where: { round } })
  await record.update(council)
  await Promise.all(
    seats.map(async (seat) => {
      const member = await fetchMemberByAccount(api, seat.member)
      if (member) await saveConsul(api, round, member.id, seat)
    })
  )
}

/**
 * Finds or creates the consul for a member in a round; when a seat is given,
 * also stores its stake and one commitment per backer.
 */
const saveConsul = async (
  api: ApiPromise,
  councilRound: number,
  memberId: number,
  seat?: Seat
) => {
  const [consul] = await Consul.findOrCreate({
    where: { councilRound, memberId },
  })
  if (!seat) return

  const stake = Number(seat.stake)
  await consul.update({ stake })
  await Promise.all(
    seat.backers.map(async ({ member, stake }) => {
      const backer = await fetchMemberByAccount(api, member)
      // Backing accounts without a membership cannot be linked.
      if (!backer)
        return console.warn(`saveConsul: no member found for backer of consul ${consul.id}`)
      await saveCommitment(Number(stake), consul.id, backer.id)
    })
  )
}

/** Finds or creates a backer commitment and stores `vote` when provided. */
const saveCommitment = async (
  stake: number,
  consulId: number,
  memberId: number,
  vote?: string
) => {
  const [commitment] = await Commitment.findOrCreate({
    where: { stake, consulId, memberId },
  })
  return vote && commitment.update({ vote })
}

/**
 * Ensures proposal `id` is stored and up to date, queues the next lower id
 * and queues fetching of its votes.
 *
 * @returns the stored proposal when it is already finalized, the freshly
 *          fetched proposal otherwise, or `undefined` for ids <= 0
 */
const fetchProposal = async (api: ApiPromise, id: number) => {
  if (id <= 0) return
  queue.push(() => fetchProposal(api, id - 1))
  const exists = await Proposal.findByIdWithIncludes(id)
  if (exists && exists.result !== `Pending`) {
    if (!exists.votes.length) queue.push(() => fetchProposalVotes(api, id))
    return exists
  }

  processing = `proposal ${id}`
  const proposal = await getProposal(api, id as unknown as ProposalId)
  console.log(`proposal ${id}: ${proposal.result}`)
  await fetchMember(api, proposal.authorId)

  // save
  const found = await Proposal.findByPk(id)
  if (found) await Proposal.update(proposal, { where: { id } })
  else await Proposal.create(proposal)

  // Queue the votes only after the row exists; fetchProposalVotes needs it.
  queue.push(() => fetchProposalVotes(api, id))
  return proposal
}

/** Finds or creates a proposal post, applies `data` and links the proposal. */
const saveProposalPost = async (id: number, proposalId: number, data: any) => {
  const [post] = await ProposalPost.findOrCreate({ where: { id } })
  await post.update(data)
  await post.setProposal(proposalId)
  console.log(post)
}

/** Imports all proposal discussion posts together with their proposals. */
const fetchProposalPosts = async (api: ApiPromise) => {
  processing = `proposal posts`
  const posts: [any, DiscussionPost][] = await getProposalPosts(api)
  await Promise.all(
    posts.map(async (p) => {
      const [proposalId, id] = p[0].toHuman()
      await fetchProposal(api, proposalId)
      const { text, created_at, author_id, edition_number } = p[1]
      await saveProposalPost(id, proposalId, {
        text: text.toHuman(),
        created: created_at.toNumber(),
        version: edition_number.toNumber(),
        authorId: author_id.toNumber(),
      })
    })
  )
}

/** Looks up the council whose term contains `block`; nothing for a falsy block. */
const councilAt = (block: number): Promise<CouncilType> | void => {
  if (block)
    return Council.findOne({
      where: { start: { [Op.lte]: block }, end: { [Op.gte]: block } },
    })
}

/**
 * Stores the votes on proposal `id`, attributed to consuls of the councils
 * that were in office when the proposal was created and finalized.
 */
const fetchProposalVotes = async (api: ApiPromise, id: number) => {
  const proposal = await Proposal.findByPk(id)
  if (!proposal)
    return console.warn(`fetchProposalVotes: proposal ${id} not found.`)
  processing = `votes proposal ${proposal.id}`

  // find council for creation and finalization time
  const councils: number[] = []
  const { created, finalizedAt } = proposal
  const councilStart = await councilAt(created)
  if (councilStart) {
    await councilStart.addProposal(proposal.id)
    councils.push(councilStart.round)
  }
  const councilEnd = await councilAt(finalizedAt)
  if (councilEnd) councils.push(councilEnd.round)

  const votes = await getProposalVotes(api, id)
  if (!votes) return
  await Promise.all(
    votes.map(({ memberId, vote }) =>
      saveProposalVote(id, councils, memberId, vote)
    )
  )
}

/**
 * Stores a member's vote on a proposal unless it is already stored.
 * The vote is skipped (and logged) when the member has no consul in any of
 * the given council rounds.
 */
const saveProposalVote = async (
  proposalId: number,
  councils: number[],
  memberId: number,
  vote: string
): Promise<void> => {
  const consul = await Consul.findOne({
    where: { memberId, councilRound: { [Op.or]: councils } },
  })
  if (!consul)
    return console.log(`consul not found: member ${memberId}`, councils)

  const where = { memberId, proposalId, consulId: consul.id }
  const exists = await ProposalVote.findOne({ where })
  if (!exists) await ProposalVote.create({ ...where, vote })
}

// accounts

/** Makes sure every account returned by the chain has a row. */
const fetchAccounts = async (api: ApiPromise) => {
  processing = `accounts`
  const accounts: AccountBalance[] = await getAccounts(api)
  await Promise.all(
    accounts.map(({ accountId }) =>
      Account.findOrCreate({ where: { key: accountId } })
    )
  )
}

/**
 * Resolves the member behind an account: first by stored root key, then by
 * asking the chain for the member id.
 *
 * @returns the member, or `undefined` when the account has no membership
 */
const fetchMemberByAccount = async (
  api: ApiPromise,
  accountId: AccountId
): Promise<MemberType | undefined> => {
  const rootKey = accountId.toHuman()
  const member = await Member.findOne({ where: { rootKey } })
  if (member) return member
  const id = await getMemberIdByAccount(api, accountId)
  if (id) return fetchMember(api, id.toNumber())
}

/**
 * Ensures member `id` is stored with handle, about text, registration block
 * and root key, links the root account, and queues the next lower id.
 *
 * @returns the member, or `undefined` when the chain has no such membership
 */
const fetchMember = async (
  api: ApiPromise,
  id: number
): Promise<MemberType | undefined> => {
  if (id > 0) queue.push(() => fetchMember(api, id - 1))
  const exists = await Member.findByPk(id)
  if (exists && exists.handle) return exists

  processing = `member ${id}`
  const membership = await getMember(api, id)
  if (!membership) {
    console.warn(`fetchMember: empty membership`)
    return
  }
  const about = String(membership.about)
  const handle = String(membership.handle)
  const created = +membership.registered_at_block
  const rootKey = String(membership.root_account)

  const [member] = await Member.findOrCreate({ where: { id } })
  await member.update({ id, about, created, handle, rootKey })
  const [account] = await Account.findOrCreate({ where: { key: rootKey } })
  await account?.setMember(id)
  return member
}