GraphQLSource.ts 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import { IProcessorSource, EventFilter } from '.';
  2. import { SubstrateEvent } from '../model';
  3. import { ProcessorOptions } from '../node';
  4. import { GraphQLClient } from 'graphql-request';
  5. import { Inject } from 'typedi';
  6. import Debug from 'debug';
  7. import { EventEmitter } from 'events';
  // Logger created with the `debug` package under the 'index-builder:processor' namespace.
  8. const debug = Debug('index-builder:processor');
  // GraphQL query `GetEventsAfterID`, sent by `GraphQLSource.nextBatch()`.
  // Variables: $afterID (ID), $names ([String!]!, required), $fromBlock (Int), $toBlock (Int), $size (Int).
  // It calls `substrateEventsAfter` with a `where` filter built from $names (name_in) and the
  // $fromBlock/$toBlock bounds (blockNumber_gte / blockNumber_lte), passing $afterID as `afterID`
  // and $size as `limit`. For each event it selects id, name, method, params, index, blockNumber
  // and the nested extrinsic fields.
  // NOTE: everything between the backticks is part of the query string sent at runtime.
  9. const GET_EVENTS_AFTER_QUERY = `
  10. query GetEventsAfterID( $afterID: ID, $names: [String!]!, $fromBlock: Int, $toBlock: Int, $size: Int) {
  11. substrateEventsAfter(where: { name_in: $names, blockNumber_gte: $fromBlock, blockNumber_lte: $toBlock }, afterID: $afterID, limit: $size) {
  12. id
  13. name
  14. method
  15. params {
  16. name
  17. type
  18. value
  19. }
  20. index
  21. blockNumber
  22. extrinsic {
  23. method
  24. section
  25. versionInfo
  26. signer
  27. args
  28. signature
  29. hash
  30. tip
  31. }
  32. }
  33. }
  34. `
  // GraphQL query that selects `head` from `indexerStatus`; sent by `GraphQLSource.indexerHead()`.
  35. // to be replaced with a ws subscription
  36. const GET_INDEXER_HEAD = `
  37. query {
  38. indexerStatus {
  39. head
  40. }
  41. }
  42. `
  /**
   * Processor event source backed by an indexer GraphQL API.
   *
   * The endpoint URL is taken from `options.indexerEndpointURL`, falling back
   * to the `INDEXER_ENDPOINT_URL` environment variable.
   */
  export class GraphQLSource extends EventEmitter implements IProcessorSource {
    private readonly graphClient: GraphQLClient;

    /**
     * @param options - processor options; only `indexerEndpointURL` is read here.
     * @throws Error when neither the option nor the environment variable
     *   provides an endpoint.
     */
    constructor(@Inject('ProcessorOptions') protected options: ProcessorOptions) {
      super();
      // `||` rather than `??`: an empty-string option also falls back to the env variable.
      const endpoint = options.indexerEndpointURL || process.env.INDEXER_ENDPOINT_URL;
      if (!endpoint) {
        throw new Error(`Indexer endpoint is not provided`);
      }
      debug(`Using Indexer API endpoint ${endpoint}`);
      this.graphClient = new GraphQLClient(endpoint);
    }

    /**
     * Not implemented yet.
     *
     * NOTE: this throws synchronously instead of returning a rejected promise.
     *
     * @param events - currently unused.
     * @throws Error always.
     */
    // TODO: implement
    subscribe(events: string[]): Promise<void> {
      throw new Error("Method not implemented.");
    }

    /**
     * Sends `GET_INDEXER_HEAD` to the indexer.
     *
     * @returns the `indexerStatus.head` value from the response.
     */
    async indexerHead(): Promise<number> {
      const status = await this.graphClient.request<{ indexerStatus: { head: number } }>(
        GET_INDEXER_HEAD
      );
      return status.indexerStatus.head;
    }

    /**
     * Sends `GET_EVENTS_AFTER_QUERY` to the indexer and returns the events it yields.
     *
     * @param filter - supplies the `names`, `afterID`, `fromBlock` and `toBlock`
     *   query variables.
     * @param size - passed as the `size` query variable (the query's `limit`).
     * @returns the `substrateEventsAfter` array from the response.
     */
    async nextBatch(filter: EventFilter, size: number): Promise<SubstrateEvent[]> {
      // Pretty-printing is only worth paying for when the debug namespace is on.
      if (debug.enabled) {
        debug(`Filter: ${JSON.stringify(filter, null, 2)}`);
      }

      const data = await this.graphClient.request<{ substrateEventsAfter: SubstrateEvent[] }>(
        GET_EVENTS_AFTER_QUERY,
        {
          size,
          names: filter.names,
          afterID: filter.afterID,
          fromBlock: filter.fromBlock,
          toBlock: filter.toBlock,
        }
      );

      debug(`Fetched ${data.substrateEventsAfter.length} events`);
      // Serialising the whole payload is expensive; skip it unless it will be printed.
      if (debug.enabled) {
        debug(`Events: ${JSON.stringify(data, null, 2)} events`);
      }
      return data.substrateEventsAfter;
    }
  }