import { Op } from "sequelize";
import { ApiPromise } from "@polkadot/api";

// models
import {
  Account,
  Balance,
  Block,
  Category,
  Channel,
  Council,
  Consul,
  Commitment,
  Era,
  Event,
  Member,
  Post,
  Proposal,
  ProposalPost,
  ProposalVote,
  Thread,
  Moderation,
} from "../db/models";

// library
import {
  getBlockHash,
  getHead,
  getTimestamp,
  getEra,
  getEraStake,
  getEvents,
  getCouncil,
  getCouncils,
  getCouncilRound,
  getCouncilElectionStatus,
  getCouncilElectionDurations,
  getCommitment,
  getCommitments,
  getProposalCount,
  getProposal,
  getProposalVotes,
  getProposalPost,
  getProposalPosts,
  getProposalPostCount,
  getProposalThreadCount,
  getNextMember,
  getNextChannel,
  getNextCategory,
  getNextThread,
  getNextPost,
  getCategory,
  getThread,
  getPost,
  getAccount,
  getAccounts,
  getMember,
  getMemberIdByAccount,
  getValidators,
  getValidatorCount,
} from "./lib/api";
//import { fetchReports } from './lib/github'
import axios from "axios";
import moment from "moment";
import chalk from "chalk";

// types
import {
  AccountBalance,
  CacheEvent,
  BlockEvent,
  Round,
  Vote,
  ProposalDetail,
} from "./lib/types";
import {
  AccountId,
  BlockNumber,
  Hash,
  Moment,
  ActiveEraInfo,
  EventRecord,
} from "@polkadot/types/interfaces";
import { HeaderExtended } from "@polkadot/api-derive/types";
//import { AccountInfo } from '@polkadot/types/interfaces/system'
import { SealedVote, Seat } from "@joystream/types/council";
import { MemberId, Membership } from "@joystream/types/members";
import {
  ProposalId,
  DiscussionPost,
  SpendingParams,
  VoteKind,
} from "@joystream/types/proposals";
import { Status } from "../types";
import {
  MemberType,
  CategoryType,
  ChannelType,
  CommitmentType,
  PostType,
  ThreadType,
  CouncilType,
  ModerationType,
  ProposalPostType,
} from "../types/model";

const WORKERS = 3; // maximum number of queued tasks running at the same time
const DELAY = 100; // ms between attempts to start the next queued task
let lastUpdate = 0;
let queuedAll = false; // true once fetchAll() has claimed the initial full import
let queue: Array<() => Promise<unknown>> = []; // pending background tasks (FIFO)
let processing = ""; // label of the most recently started job, shown in logs
let busy = 0; // number of queued tasks currently running

// Events listed here (section -> methods) are never stored.
const skipEvents: { [key: string]: string[] } = {
  system: ["ExtrinsicSuccess"],
  imOnline: ["HeartbeatReceived", "AllGood"],
  utility: ["BatchCompleted"],
  grandpa: ["NewAuthorities"],
  session: ["NewSession"],
};

/** Returns true when `section.method` is listed in `skipEvents`. */
const isSkippedEvent = (section: string, method: string): boolean =>
  skipEvents[section]?.includes(method) ?? false;

/**
 * Starts `task` without waiting for it and logs a rejection instead of leaving
 * it unhandled. Used for work that must not delay the caller.
 */
const detach = (label: string, task: Promise<unknown>): void => {
  task.catch((e) => console.error(`[Joystream] ${label} failed:`, e));
};

/**
 * Runs the next queued task if a worker slot is free and re-schedules itself
 * so that up to WORKERS tasks run concurrently.
 *
 * A failing task is logged and its slot is always released; previously a
 * rejected task skipped `busy--`, so WORKERS failures stalled the queue.
 */
const processNext = async (): Promise<void> => {
  if (busy >= WORKERS) return; // no free worker
  const task = queue.shift();
  if (!task) return; // nothing queued
  busy++;
  if (busy < WORKERS) setTimeout(processNext, DELAY);
  try {
    await task();
  } catch (e) {
    console.error(`[Joystream] queued task failed (last job: ${processing})`, e);
  } finally {
    busy--;
    setTimeout(processNext, DELAY);
  }
};

/**
 * Stores a new block header, links it to its author account, emits it on `io`
 * as a "block" message and logs it.
 *
 * @param api    connected chain API
 * @param io     object with an `emit(event, payload)` method (e.g. a socket server)
 * @param header header of the block to add
 * @param status last known status, returned unchanged unless it is refreshed
 * @returns a freshly fetched status on every 10th block, otherwise `status`
 */
export const addBlock = async (
  api: ApiPromise,
  io: any,
  header: HeaderExtended,
  status: Status = {
    block: 0,
    election: {
      durations: [],
      stage: null,
      round: 0,
      stageEndsAt: 0,
      termEndsAt: 0,
    },
    era: 0,
    round: 0,
    members: 0,
    channels: 0,
    categories: 0,
    threads: 0,
    posts: 0,
    proposals: 0,
    proposalPosts: 0,
  }
): Promise<Status> => {
  const id = header.number.toNumber();
  const exists = await Block.findByPk(id);
  if (exists || !header.author) return status;

  const key = header.author.toHuman();
  const block = await processBlock(api, id);
  const [account] = await Account.findOrCreate({ where: { key } });
  await block.setValidator(account.key);
  io.emit("block", await Block.findByIdWithIncludes(id));

  // log
  const member = await fetchMemberByAccount(api, header.author);
  const author = member ? member.handle : key;
  console.log(`[Joystream] block ${id} ${author} [${logStatus()}]`);

  const shouldUpdate = id % 10 === 0;
  return shouldUpdate ? updateStatus(api, id) : status;
};

/** Short description of queue load and current job for log lines. */
const logStatus = () =>
  queue.length ? `${busy}/${queue.length}: ${processing}` : processing;

/**
 * Returns the stored block `id`, creating it when missing.
 *
 * For a new block the hash, timestamp and blocktime (difference to the
 * previous block's timestamp) are saved, then event and era import are started
 * in the background.
 *
 * NOTE(review): the original returned right after `findOrCreate`, which left
 * everything below unreachable, and a second stray `return Block.create(...)`
 * would have tried to insert the same primary key again. Both are removed; the
 * row created by `findOrCreate` is updated instead. Confirm the early return
 * was not a deliberate switch-off.
 */
const processBlock = async (api: ApiPromise, id: number) => {
  const exists = await Block.findByPk(id);
  if (exists) return exists;

  processing = `block ${id}`;
  console.log(processing);
  const [block] = await Block.findOrCreate({ where: { id } });

  const hash = await getBlockHash(api, id);
  const last = await Block.findByPk(id - 1);
  const lastTimestamp: number = last?.timestamp
    ? last.timestamp
    : await getTimestamp(api, await getBlockHash(api, id - 1));
  const timestamp = await getTimestamp(api, hash);
  console.log(`timestamp`, timestamp, lastTimestamp);
  const blocktime = timestamp - lastTimestamp;
  await block.update({ hash: String(hash), timestamp, blocktime });

  // Not awaited on purpose: the caller should not wait for events and era data.
  detach(`events block ${id}`, processEvents(api, id, hash));
  detach(`era at block ${id}`, importEraAtBlock(api, id, hash));
  processNext();
  return block;
};

/**
 * Loads an events cache module and converts it into rows for bulk insertion.
 *
 * The module at `path` must be iterable as `[blockId, events[]]` pairs. Events
 * listed in `skipEvents` are dropped.
 *
 * @returns `[blocks, events]` where blocks are `{ id }` stubs
 */
const loadEventsCache = (path: string): [{ id: number }[], BlockEvent[]] => {
  const blocks: { id: number }[] = [];
  const events: BlockEvent[] = [];
  for (const [blockId, cache] of require(path)) {
    blocks.push({ id: blockId });
    cache.forEach((event: CacheEvent) => {
      const { section, method } = event;
      if (isSkippedEvent(section, method)) return;
      const data = JSON.stringify(event.data);
      events.push({ blockId, section, method, data });
    });
  }
  return [blocks, events];
};

/**
 * Imports cached events from the given files (paths relative to `../../`).
 *
 * Blocks are inserted before their events, and each file is fully written
 * before the next is loaded. Previously the event insert was not awaited.
 */
export const importEvents = async (files: string[]): Promise<void> => {
  for (const filename of files) {
    console.log(`-> Loading ${filename}`);
    const [blocks, events] = loadEventsCache(`../../${filename}`);
    console.log(`-> Adding ${blocks.length} blocks`);
    await Block.bulkCreate(blocks, { ignoreDuplicates: true });
    console.log(`-> Adding ${events.length} events`);
    await Event.bulkCreate(events, { ignoreDuplicates: true });
  }
};

// TODO only fetchAll() once, then respond to chain events
/**
 * Reads the current counters from the chain at `block` and starts importing
 * whatever they point at: the full import on the first call, afterwards only
 * the newest member, category, thread, post and proposal.
 */
const updateStatus = async (
  api: ApiPromise,
  block: number
): Promise<Status> => {
  const hash = await getBlockHash(api, block);
  const status = {
    block,
    era: await getEra(api, hash),
    round: await getCouncilRound(api, hash),
    election: await getCouncilElectionStatus(api, hash),
    members: (await getNextMember(api, hash)) - 1,
    channels: (await getNextChannel(api, hash)) - 1,
    categories: (await getNextCategory(api, hash)) - 1,
    threads: (await getNextThread(api, hash)) - 1,
    posts: (await getNextPost(api, hash)) - 1,
    proposals: await getProposalCount(api, hash),
    proposalPosts: await getProposalPostCount(api),
  };

  // The imports run in the background; failures are logged, not thrown.
  if (!queuedAll) detach(`fetchAll`, fetchAll(api, status));
  else {
    detach(`member ${status.members}`, fetchMember(api, status.members));
    detach(
      `category ${status.categories}`,
      fetchCategory(api, status.categories)
    );
    detach(`thread ${status.threads}`, fetchThread(api, status.threads));
    detach(`post ${status.posts}`, fetchPost(api, status.posts));
    detach(
      `proposal ${status.proposals}`,
      fetchProposal(api, status.proposals)
    );
  }
  processNext();
  return status;
};

/**
 * Starts the import of all council rounds and queues the full import of
 * accounts, members, categories, threads, posts, proposals and proposal posts.
 *
 * `queuedAll` is claimed before the first await so that overlapping status
 * updates cannot queue everything twice; it is released again if the council
 * list cannot be loaded, so a later status update retries.
 */
const fetchAll = async (api: ApiPromise, status: Status) => {
  queuedAll = true;
  try {
    const rounds: Round[] = await getCouncils(api, status.block);
    rounds.forEach((round) =>
      detach(`council ${round.round}`, fetchCouncil(api, round))
    );
  } catch (e) {
    queuedAll = false;
    throw e;
  }
  queue.push(() => fetchAccounts(api));
  queue.push(() => fetchMember(api, status.members));
  queue.push(() => fetchCategory(api, status.categories));
  queue.push(() => fetchThread(api, status.threads));
  queue.push(() => fetchPost(api, status.posts));
  queue.push(() => fetchProposal(api, status.proposals));
  queue.push(() => fetchProposalPosts(api));
};

/** Fetches all events of the block with `hash` and stores them one by one. */
const processEvents = async (api: ApiPromise, blockId: number, hash: Hash) => {
  processing = `events block ${blockId}`;
  console.log(processing);
  const events = await getEvents(api, hash);
  for (const event of events) await saveEvent(blockId, event);
};

/**
 * Stores one chain event unless it is listed in `skipEvents`.
 *
 * The lookup values are passed as `where`, which `findOrCreate` requires; the
 * original passed them as top-level options.
 */
const saveEvent = async (blockId: number, event: EventRecord) => {
  const { section, method, data } = event.event;
  if (isSkippedEvent(section, method)) return;

  console.log(section, method, data);
  // TODO catch votes, posts, proposals
  await Event.findOrCreate({
    where: { blockId, section, method, data: JSON.stringify(data) },
  });
};

/**
 * Links block `blockId` to the era it belongs to. While the era has no
 * `active` value yet, validator counts, stake and timestamp are read at `hash`
 * and stored, then account balances are updated.
 */
const importEraAtBlock = async (
  api: ApiPromise,
  blockId: number,
  hash: Hash
) => {
  const id = await getEra(api, hash);
  const [era] = await Era.findOrCreate({ where: { id } });
  await era.addBlock(blockId);
  if (era.active) return;

  processing = `era ${id}`;
  const validators = await getValidators(api, hash);
  const validatorCount = validators.length;
  if (!validatorCount) return;

  console.log(`[Joystream] Found validator info for era ${id}`);
  era.slots = await getValidatorCount(api, hash);
  era.active = Math.min(era.slots, validatorCount);
  era.waiting = validatorCount > era.slots ? validatorCount - era.slots : 0;
  era.stake = await getEraStake(api, hash, id);
  const timestamp = await getTimestamp(api, hash);
  console.log(id, timestamp, hash);
  era.timestamp = timestamp;
  era.blockId = blockId;
  await era.save();
  await updateBalances(api, hash);
};

/**
 * Summarises the validator set at `blockId`.
 *
 * @returns active/waiting counts, slot limit and block timestamp, or
 *          `undefined` when no validators are found
 */
const validatorStatus = async (
  api: ApiPromise,
  blockId: BlockNumber | number
) => {
  const hash = await getBlockHash(api, blockId);
  const totalValidators = await getValidators(api, hash);
  if (!totalValidators.length) return;

  const totalNrValidators = totalValidators.length;
  const maxSlots = await getValidatorCount(api, hash);
  const actives = Math.min(maxSlots, totalNrValidators);
  const waiting =
    totalNrValidators > maxSlots ? totalNrValidators - maxSlots : 0;
  const date = await getTimestamp(api, hash);
  console.log(`validator`, date);
  return { blockId, actives, waiting, maxSlots, date };
};

/**
 * Stores the balance of every known account for the era found at `hash`.
 *
 * Fixes over the original: the `findOrCreate` tuple is destructured, the
 * result of `findAll()` is iterated (it was treated as a single account, so
 * nothing was ever updated) and the existence check is awaited (the pending
 * promise was always truthy). Errors are logged, not thrown.
 */
const updateBalances = async (api: ApiPromise, hash: Hash) => {
  const currentEra: number = await getEra(api, hash);
  const [era] = await Era.findOrCreate({ where: { id: currentEra } });
  processing = `balances ${era.id}`;
  try {
    const accounts: any[] = await Account.findAll();
    for (const { key } of accounts) {
      if (!key) continue;
      console.log(`updating balance of`, key, key);

      const { data } = await getAccount(api, hash, key);
      const { free, reserved, miscFrozen } = data;
      const balance = { available: free, reserved, frozen: miscFrozen };
      console.log(`balance era ${era.id}`, balance);

      const where = { accountKey: key, eraId: era.id };
      const exists = await Balance.findOne({ where });
      if (exists) await Balance.update(balance, { where });
      else {
        const created = await Balance.create(balance);
        await created.setAccount(key);
        await created.setEra(era.id);
      }
    }
  } catch (e) {
    console.error(`balances era ${era.id}`, e);
  }
};

/** Downloads the public status document; saving it is still to be done. */
const fetchTokenomics = async () => {
  console.debug(`Updating tokenomics`);
  const { data } = await axios.get("https://status.joystream.org/status");
  if (!data) return;
  // TODO save 'tokenomics', data
};

/**
 * Returns category `id`, importing it from the chain when it is not stored
 * yet, and queues the import of category `id - 1`.
 *
 * @returns the category, or `undefined` for `id <= 0`
 */
const fetchCategory = async (api: ApiPromise, id: number) => {
  if (id <= 0) return;
  queue.push(() => fetchCategory(api, id - 1));
  const exists = await Category.findByPk(+id);
  if (exists) return exists;

  processing = `category ${id}`;
  const { created_at, title, description, deleted, archived, moderator_id } =
    await getCategory(api, id);
  const created = created_at.block.toNumber();
  const category: CategoryType = await Category.create({
    id,
    title,
    description,
    created,
    deleted,
    archived,
  });
  if (moderator_id)
    await createModeration(api, { categoryId: id }, moderator_id, category);
  return category;
};

/**
 * Returns post `id`, importing it together with its author and thread when it
 * is not stored yet, and queues the import of post `id - 1`.
 *
 * @returns the post, or `undefined` for `id <= 0` (guard added to match the
 *          category, thread and proposal fetchers)
 */
const fetchPost = async (
  api: ApiPromise,
  id: number
): Promise<PostType | undefined> => {
  if (id <= 0) return;
  if (id > 1) queue.push(() => fetchPost(api, id - 1));
  const exists = await Post.findByPk(id);
  if (exists) return exists;

  processing = `post ${id}`;
  const { created_at, author_id, thread_id, current_text, moderation } =
    await getPost(api, id);

  const member = await fetchMemberByAccount(api, author_id);
  const authorId = member ? member.id : null;
  const threadId = Number(thread_id);
  await fetchThread(api, threadId); // make sure the thread is stored as well

  const created = created_at.block.toNumber();
  const post = await savePost(id, {
    authorId,
    text: current_text,
    created,
    threadId,
  });
  if (moderation)
    await createModeration(
      api,
      { postId: id },
      moderation.moderator_id,
      post
    );
  return post;
};

/** Creates post `id` if needed and writes `data` to it. */
const savePost = async (id: number, data: any): Promise<PostType> => {
  const [post] = await Post.findOrCreate({ where: { id } });
  await post.update(data);
  return post;
};

/**
 * Makes sure the moderator's account exists.
 *
 * Creating the moderation record itself is switched off by the `return`
 * marked TODO below; the statements after it are kept for when it is enabled.
 */
const createModeration = async (
  api: ApiPromise,
  association: {},
  accountId: AccountId,
  object: { setModeration: (id: number) => {} }
) => {
  if (!accountId) return;
  const key = accountId.toHuman();
  await Account.findOrCreate({ where: { key } });
  const where = { ...association, moderatorKey: key };
  return; // TODO
  const [moderation] = await Moderation.findOrCreate({ where });
  if (moderation) object.setModeration(moderation.id);
};

/**
 * Returns thread `id`, importing it with its category, creator and moderation
 * when it is not stored yet.
 *
 * Writes to the thread row are awaited one after another so they cannot race.
 *
 * @returns the thread, or `undefined` for `id <= 0`
 */
const fetchThread = async (api: ApiPromise, id: number) => {
  if (id <= 0) return;
  const exists = await Thread.findByPk(id);
  if (exists) return exists;

  processing = `thread ${id}`;
  const {
    author_id,
    created_at,
    category_id,
    title,
    moderation,
    nr_in_category,
  } = await getThread(api, id);

  const [thread] = await Thread.findOrCreate({ where: { id } });
  await thread.update({
    id,
    title,
    nrInCategory: +nr_in_category,
    created: +created_at.block,
  });

  const category = await fetchCategory(api, +category_id);
  if (category) await thread.setCategory(category.id);

  const author = await fetchMemberByAccount(api, author_id);
  if (author) await thread.setCreator(author.id);

  if (moderation) {
    const { moderated_at, moderator_id, rationale } = moderation;
    const created = moderated_at.block;
    const createdAt = moderated_at.time;
    await createModeration(
      api,
      { created, createdAt, rationale },
      moderator_id,
      thread
    );
  }
  return thread;
};

// council
// Plain data of one council term; named CouncilTerm so it cannot be confused
// with the imported `Council` model.
interface CouncilTerm {
  round: number;
  start: number;
  startDate?: number;
  end: number;
  endDate?: number;
}

/**
 * Imports one council round: start/end dates, seats and the commitments read
 * two blocks before the term starts.
 *
 * The council is saved before commitments are imported, because
 * `saveCommitments` looks the council row up and skips the import without it.
 */
const fetchCouncil = async (api: ApiPromise, term: Round) => {
  const { round, start, end } = term;
  // Existing rounds are deliberately imported again (no early return).
  processing = `council ${round}`;

  const council: CouncilTerm = { start, end, round };
  const startHash = await getBlockHash(api, start);
  council.startDate = await getTimestamp(api, startHash);
  const seats: Seat[] = await getCouncil(api, startHash);

  const head = Number(await getHead(api));
  if (end < head) {
    const endHash = await getBlockHash(api, end);
    if (endHash) council.endDate = await getTimestamp(api, endHash);
  } else console.log(`fetchCouncil: round ${round} is ongoing.`);

  // TODO import report generator and save tokenomics
  await saveCouncil(api, council, seats);
  await saveCommitments(api, round, start - 2);
};

/**
 * Stores the sealed votes found at `block` as commitments of council `round`.
 * Logs a warning and does nothing when the council is not stored.
 */
const saveCommitments = async (
  api: ApiPromise,
  round: number,
  block: number
) => {
  const hash = await getBlockHash(api, block);
  const commitments: Hash[] = await getCommitments(api, hash);
  const council = await Council.findByPk(round);
  if (!council)
    return console.warn(`saveCommitments: council ${round} not found.`);

  const votes: SealedVote[] = await Promise.all(
    commitments.map((voteHash: Hash) => getCommitment(api, hash, voteHash))
  );
  await Promise.all(
    votes.map(async (v) => {
      const voter: AccountId = v.voter;
      const stake = v.stake.new.toNumber();
      const vote = String(v.vote);
      const member = await fetchMemberByAccount(api, voter);
      const memberId = member?.id;
      const [c]: [CommitmentType] = await Commitment.findOrCreate({
        where: { councilRound: round, stake, memberId },
      });
      if (vote) await c.update({ vote });
      await c.setCouncil(council.id);
    })
  );
};

/**
 * Creates or updates the council row for `council.round` and stores a consul
 * for every seat whose member can be resolved.
 *
 * The row is written from the `council` argument; the original shadowed it
 * with the model instance and updated the row with itself.
 */
const saveCouncil = async (
  api: ApiPromise,
  council: CouncilTerm,
  seats: Seat[]
) => {
  const { round } = council;
  const [record]: [CouncilType] = await Council.findOrCreate({
    where: { round },
  });
  await record.update(council);
  await Promise.all(
    seats.map(async (seat) => {
      const member = await fetchMemberByAccount(api, seat.member);
      if (member) await saveConsul(api, round, member.id, seat);
    })
  );
};

/**
 * Creates the consul (council member of a round). When `seat` is given its
 * stake is stored and each backer is saved as a commitment; backers whose
 * member cannot be resolved are skipped (previously this threw).
 */
const saveConsul = async (
  api: ApiPromise,
  councilRound: number,
  memberId: number,
  seat?: Seat
) => {
  const [consul] = await Consul.findOrCreate({
    where: { councilRound, memberId },
  });
  if (!seat) return;

  await consul.update({ stake: Number(seat.stake) });
  await Promise.all(
    seat.backers.map(async ({ member, stake }) => {
      const backer = await fetchMemberByAccount(api, member);
      if (backer) await saveCommitment(Number(stake), consul.id, backer.id);
    })
  );
};

/** Finds or creates a commitment and stores `vote` on it when given. */
const saveCommitment = async (
  stake: number,
  consulId: number,
  memberId: number,
  vote?: string
) =>
  Commitment.findOrCreate({ where: { stake, consulId, memberId } }).then(
    ([c]: [CommitmentType]) => vote && c.update({ vote })
  );

/**
 * Imports proposal `id` and queues the import of proposal `id - 1`.
 *
 * Stored proposals whose result is not `Pending` are returned as they are
 * (their votes are queued when none are stored). Otherwise the proposal is
 * read from the chain and saved, and only then is the vote import queued,
 * since that import looks the proposal up in the database.
 *
 * @returns the stored or fetched proposal, or `undefined` for `id <= 0`
 */
const fetchProposal = async (api: ApiPromise, id: number) => {
  if (id <= 0) return;
  queue.push(() => fetchProposal(api, id - 1));
  const exists = await Proposal.findByIdWithIncludes(id);
  if (exists && exists.result !== `Pending`) {
    if (!exists.votes.length) queue.push(() => fetchProposalVotes(api, id));
    return exists;
  }

  processing = `proposal ${id}`;
  const proposal = await getProposal(api, id as unknown as ProposalId);
  console.log(`proposal ${id}: ${proposal.result}`);
  await fetchMember(api, proposal.authorId);

  // save
  const found = await Proposal.findByPk(id);
  if (found) await Proposal.update(proposal, { where: { id } });
  else await Proposal.create(proposal);

  queue.push(() => fetchProposalVotes(api, id));
  return proposal;
};

/** Creates proposal post `id` if needed, writes `data` and links the proposal. */
const saveProposalPost = async (id: number, proposalId: number, data: any) => {
  const [post]: [ProposalPostType] = await ProposalPost.findOrCreate({
    where: { id },
  });
  await post.update(data);
  await post.setProposal(proposalId);
  console.log(post);
};

/**
 * Imports all proposal discussion posts, making sure each post's proposal is
 * imported first. A failing post is logged and does not stop the others.
 */
const fetchProposalPosts = async (api: ApiPromise) => {
  processing = `proposal posts`;
  const posts: [any, DiscussionPost][] = await getProposalPosts(api);
  for (const [key, post] of posts) {
    // NOTE(review): ids come from toHuman(), presumably as strings - confirm
    // they never contain thousands separators.
    const [proposalId, id] = key.toHuman();
    try {
      await fetchProposal(api, proposalId);
      const { text, created_at, author_id, edition_number } = post;
      await saveProposalPost(id, proposalId, {
        text: text.toHuman(),
        created: created_at.toNumber(),
        version: edition_number.toNumber(),
        authorId: author_id.toNumber(),
      });
    } catch (e) {
      console.error(`fetchProposalPosts: post ${id} failed`, e);
    }
  }
};

/** Finds the council whose term contains `block`; nothing for a falsy block. */
const councilAt = (block: number): Promise<CouncilType | null> | void => {
  if (block)
    return Council.findOne({
      where: { start: { [Op.lte]: block }, end: { [Op.gte]: block } },
    });
};

/**
 * Imports the votes on proposal `id`. The councils in office at creation and
 * at finalization are looked up to find the consuls that voted.
 */
const fetchProposalVotes = async (api: ApiPromise, id: number) => {
  const proposal = await Proposal.findByPk(id);
  if (!proposal)
    return console.warn(`fetchProposalVotes: proposal ${id} not found.`);
  processing = `votes proposal ${proposal.id}`;

  // find council for creation and finalization time
  const councils: number[] = [];
  const { created, finalizedAt } = proposal;
  const councilStart = await councilAt(created);
  if (councilStart) {
    await councilStart.addProposal(proposal.id);
    councils.push(councilStart.round);
  }
  const councilEnd = await councilAt(finalizedAt);
  if (councilEnd) councils.push(councilEnd.round);

  const votes = await getProposalVotes(api, id);
  await Promise.all(
    (votes ?? []).map(({ memberId, vote }) =>
      saveProposalVote(id, councils, memberId, vote)
    )
  );
};

/**
 * Stores `vote` of `memberId` on `proposalId` for the consul the member was
 * in one of `councils`. Logs and skips when no such consul is stored.
 */
const saveProposalVote = async (
  proposalId: number,
  councils: number[],
  memberId: number,
  vote: string
): Promise<void> => {
  const consul = await Consul.findOne({
    where: { memberId, councilRound: { [Op.or]: councils } },
  });
  if (!consul) {
    console.log(`consul not found: member ${memberId}`, councils);
    return;
  }
  const where = { memberId, proposalId, consulId: consul.id };
  // Single find-or-create instead of a separate lookup followed by create.
  await ProposalVote.findOrCreate({ where, defaults: { vote } });
};

// accounts
/** Makes sure every account reported by the chain has a row. */
const fetchAccounts = async (api: ApiPromise) => {
  processing = `accounts`;
  const accounts: AccountBalance[] = await getAccounts(api);
  await Promise.all(
    accounts.map(({ accountId }) =>
      Account.findOrCreate({ where: { key: accountId } })
    )
  );
};

/**
 * Resolves the member whose root key is `accountId`, first from the database,
 * then from the chain.
 *
 * @returns the member, or `undefined` when the account has no member id
 */
const fetchMemberByAccount = async (
  api: ApiPromise,
  accountId: AccountId
): Promise<MemberType | undefined> => {
  const rootKey = accountId.toHuman();
  const member = await Member.findOne({ where: { rootKey } });
  if (member) return member;
  const id = await getMemberIdByAccount(api, accountId);
  if (id) return fetchMember(api, id.toNumber());
};

/**
 * Returns member `id`, importing it from the chain when it is missing or has
 * no handle yet, and queues the import of member `id - 1`.
 *
 * @returns the member, or `undefined` for a negative id or empty membership
 */
const fetchMember = async (
  api: ApiPromise,
  id: number
): Promise<MemberType | undefined> => {
  // The status counter is "next id - 1", so it can be -1; ids go down to 0.
  if (id < 0) return;
  if (id > 0) queue.push(() => fetchMember(api, id - 1));
  const exists = await Member.findByPk(id);
  if (exists && exists.handle) return exists;

  processing = `member ${id}`;
  const membership = await getMember(api, id);
  if (!membership) {
    console.warn(`fetchMember: empty membership`);
    return;
  }

  const about = String(membership.about);
  const handle = String(membership.handle);
  const created = +membership.registered_at_block;
  const rootKey = String(membership.root_account);

  const [member]: [MemberType] = await Member.findOrCreate({ where: { id } });
  await member.update({ id, about, created, handle, rootKey });
  const [account]: any = await Account.findOrCreate({
    where: { key: rootKey },
  });
  await account?.setMember(id);
  return member;
};