NetworkingService.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. import { ReadonlyConfig } from '../../types/config'
  2. import { QueryNodeApi } from './query-node/api'
  3. import { Logger } from 'winston'
  4. import { LoggingService } from '../logging'
  5. import { StorageNodeApi } from './storage-node/api'
  6. import { PendingDownloadData, StateCacheService } from '../cache/StateCacheService'
  7. import { DataObjectDetailsFragment } from './query-node/generated/queries'
  8. import axios from 'axios'
  9. import {
  10. StorageNodeEndpointData,
  11. DataObjectAccessPoints,
  12. DataObjectData,
  13. DataObjectInfo,
  14. StorageNodeDownloadResponse,
  15. DownloadData,
  16. } from '../../types'
  17. import queue from 'queue'
  18. import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
  19. import http from 'http'
  20. import https from 'https'
  21. import { parseAxiosError } from '../parsers/errors'
  /** Concurrency of the per-download queue that asks storage nodes whether they hold the object. */
  const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10
  /** Concurrency of the queue that runs storage node response-time checks. */
  const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
  /** Delay, in milliseconds, between two consecutive active storage node endpoint checks. */
  const STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS = 60000
  /**
   * Handles the outbound networking of the node: fetching data object metadata
   * through the query node API, measuring storage node response times and
   * downloading data objects from storage nodes.
   */
  export class NetworkingService {
    private config: ReadonlyConfig
    private queryNodeApi: QueryNodeApi
    // private runtimeApi: RuntimeApi
    private logging: LoggingService
    private stateCache: StateCacheService
    private logger: Logger

    private storageNodeEndpointsCheckInterval: NodeJS.Timeout
    private testLatencyQueue: queue
    private downloadQueue: queue

    /**
     * Configures the shared axios defaults, creates the work queues and starts the
     * periodic storage node endpoints check.
     *
     * NOTE: this mutates the process-wide `axios.defaults` (timeout and HTTP(S) agents),
     * so it affects every axios request made through the default instance.
     *
     * @param config Node configuration (limits, endpoints, buckets, worker id)
     * @param stateCache Cache used for pending downloads, content hashes and response times
     * @param logging Logging service used to create this service's logger
     */
    constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
      axios.defaults.timeout = config.limits.outboundRequestsTimeout
      const httpConfig: http.AgentOptions | https.AgentOptions = {
        keepAlive: true,
        timeout: config.limits.outboundRequestsTimeout,
        maxSockets: config.limits.maxConcurrentOutboundConnections,
      }
      axios.defaults.httpAgent = new http.Agent(httpConfig)
      axios.defaults.httpsAgent = new https.Agent(httpConfig)

      this.config = config
      this.logging = logging
      this.stateCache = stateCache
      this.logger = logging.createLogger('NetworkingManager')
      this.queryNodeApi = new QueryNodeApi(config.endpoints.queryNode)
      // this.runtimeApi = new RuntimeApi(config.endpoints.substrateNode)

      // Queues (created before the first endpoints check is triggered, so the check
      // never depends on statement ordering to find an initialized queue)
      this.testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true }).on(
        'end',
        () => {
          this.logger.verbose('Mean response times updated', {
            responseTimes: this.stateCache.getStorageNodeEndpointsMeanResponseTimes(),
          })
        }
      )
      this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
      // A download job rejects when no source can be found. An 'error' event emitted on an
      // emitter without an 'error' listener is thrown, so a listener is required here
      // (same reason as the no-op handler on the availability queue in `downloadJob`).
      this.downloadQueue.on('error', (err) => {
        this.logger.error('Data object download failed', { err })
      })

      // The check is fire-and-forget, so its rejection has to be handled explicitly
      const runEndpointsCheck = (): void => {
        this.checkActiveStorageNodeEndpoints().catch((err: unknown) => {
          this.logger.error('Failed to check active storage node endpoints', { err })
        })
      }
      runEndpointsCheck()
      this.storageNodeEndpointsCheckInterval = setInterval(runEndpointsCheck, STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS)
    }

    /**
     * Stops the periodic storage node endpoints check.
     */
    public clearIntervals(): void {
      clearInterval(this.storageNodeEndpointsCheckInterval)
    }

    /**
     * Ensures the endpoint is a parsable URL using the http or https protocol.
     *
     * @param endpoint Endpoint URL to validate
     * @throws If `endpoint` cannot be parsed by `URL` or uses any other protocol
     */
    private validateNodeEndpoint(endpoint: string): void {
      const endpointUrl = new URL(endpoint)
      if (endpointUrl.protocol !== 'http:' && endpointUrl.protocol !== 'https:') {
        throw new Error(`Invalid endpoint protocol: ${endpointUrl.protocol}`)
      }
    }

    /**
     * Returns only the entries whose endpoint passes `validateNodeEndpoint`.
     * A warning is logged for every rejected entry.
     *
     * @param input Candidate storage node endpoints
     * @returns Entries with a valid http(s) endpoint, in the original order
     */
    private filterStorageNodeEndpoints(input: StorageNodeEndpointData[]): StorageNodeEndpointData[] {
      return input.filter((b) => {
        try {
          this.validateNodeEndpoint(b.endpoint)
          return true
        } catch (err) {
          this.logger.warn(`Invalid storage node endpoint: ${b.endpoint} for bucket ${b.bucketId}`, {
            bucketId: b.bucketId,
            endpoint: b.endpoint,
            err,
            // NOTE(review): presumably a hint for the logging pipeline to throttle this message - confirm
            '@pauseFor': 900,
          })
          return false
        }
      })
    }

    /**
     * Collects the valid endpoints of all storage buckets assigned to the object's bag
     * that have an active operator with a node endpoint set in its metadata.
     *
     * @param details Data object details fetched from the query node
     * @returns Valid storage node endpoints for the data object
     */
    private prepareStorageNodeEndpoints(details: DataObjectDetailsFragment): StorageNodeEndpointData[] {
      const endpointsData: StorageNodeEndpointData[] = []
      for (const { storageBucket } of details.storageBag.storageAssignments) {
        const endpoint = storageBucket.operatorMetadata?.nodeEndpoint
        if (storageBucket.operatorStatus.__typename === 'StorageBucketOperatorStatusActive' && endpoint) {
          endpointsData.push({ bucketId: storageBucket.id, endpoint })
        }
      }
      return this.filterStorageNodeEndpoints(endpointsData)
    }

    /**
     * Builds the access points description of a data object.
     *
     * @param details Data object details fetched from the query node
     */
    private parseDataObjectAccessPoints(details: DataObjectDetailsFragment): DataObjectAccessPoints {
      return {
        storageNodes: this.prepareStorageNodeEndpoints(details),
      }
    }

    /**
     * Fetches the data object details from the query node and describes whether the object
     * exists, whether this node is expected to serve it and where it can be downloaded from.
     * When the object exists, its content hash is stored in the state cache.
     *
     * The object is considered supported when:
     * - `config.buckets` is `'all'` and the configured worker is an active operator of at least
     *   one distribution bucket assigned to the object's bag, or
     * - `config.buckets` is a list and at least one listed bucket is assigned to the object's bag.
     *
     * @param objectId Id of the data object
     * @returns Existence / support flags and, if the object exists, its data
     */
    public async dataObjectInfo(objectId: string): Promise<DataObjectInfo> {
      const details = await this.queryNodeApi.getDataObjectDetails(objectId)
      if (details) {
        this.stateCache.setObjectContentHash(objectId, details.ipfsHash)
      }

      const { buckets, workerId } = this.config
      // `distirbutionAssignments` (sic) is the field name exposed by the generated query types
      const assignments = details?.storageBag.distirbutionAssignments ?? []
      let isSupported = false
      if (buckets === 'all') {
        isSupported = assignments.some((d) =>
          d.distributionBucket.operators.some(
            (o) => o.workerId === workerId && o.status === DistributionBucketOperatorStatus.Active
          )
        )
      } else if (Array.isArray(buckets)) {
        // Build the lookup once instead of re-mapping the assignments for every configured bucket
        const assignedBucketIds = new Set(assignments.map((a) => a.distributionBucket.id))
        isSupported = buckets.some((bucketId) => assignedBucketIds.has(bucketId.toString()))
      }

      return {
        exists: !!details,
        isSupported,
        data: details
          ? {
              objectId,
              accessPoints: this.parseDataObjectAccessPoints(details),
              contentHash: details.ipfsHash,
              size: parseInt(details.size, 10),
            }
          : undefined,
      }
    }

    /**
     * Returns the endpoints ordered ascending by their cached mean response time.
     * The input array is left untouched.
     *
     * @param endpoints Storage node endpoints
     * @returns A new, sorted array
     */
    private sortEndpointsByMeanResponseTime(endpoints: string[]): string[] {
      return [...endpoints].sort(
        (a, b) =>
          this.stateCache.getStorageNodeEndpointMeanResponseTime(a) -
          this.stateCache.getStorageNodeEndpointMeanResponseTime(b)
      )
    }

    /**
     * Finds a storage node that has the object available and starts downloading from it.
     *
     * Two queues cooperate here:
     * - the availability queue asks the candidate endpoints (ordered by mean response time)
     *   whether they hold the object; it is stopped as soon as one of them confirms,
     * - the object download queue (concurrency 1) tries to download from confirmed endpoints;
     *   if an attempt fails, the availability queue is resumed to look for another source.
     *
     * @param pendingDownload Pending download entry whose `status` is updated along the way
     * @param downloadData Object data (content hash, access points) and the download start offset
     * @param onSourceFound Called with the storage node response once a download has started
     * @param onError Called when no source could be found / used
     * @param onFinished Called once when the response stream closes, ends or errors
     * @returns Promise resolved when the response stream is done, rejected when no source is usable
     */
    private downloadJob(
      pendingDownload: PendingDownloadData,
      downloadData: DownloadData,
      onSourceFound: (response: StorageNodeDownloadResponse) => void,
      onError: (error: Error) => void,
      onFinished?: () => void
    ): Promise<void> {
      const {
        objectData: { contentHash, accessPoints },
        startAt,
      } = downloadData

      pendingDownload.status = 'LookingForSource'

      return new Promise<void>((resolve, reject) => {
        let failed = false
        let finished = false

        // Handlers:
        const fail = (message: string) => {
          // Guard against reporting the same failure from more than one queue event
          if (failed) {
            return
          }
          failed = true
          this.stateCache.dropPendingDownload(contentHash)
          onError(new Error(message))
          reject(new Error(message))
        }

        const sourceFound = (response: StorageNodeDownloadResponse) => {
          this.logger.info('Download source chosen', { contentHash, source: response.config.url })
          pendingDownload.status = 'Downloading'
          onSourceFound(response)
        }

        const finish = () => {
          // Registered for 'close', 'error' and 'end' - make sure `onFinished` only runs once
          if (finished) {
            return
          }
          finished = true
          if (onFinished) {
            onFinished()
          }
          resolve()
        }

        const storageEndpoints = this.sortEndpointsByMeanResponseTime(
          accessPoints?.storageNodes.map((n) => n.endpoint) || []
        )

        this.logger.info('Downloading new data object', {
          contentHash,
          possibleSources: storageEndpoints.map((e) => ({
            endpoint: e,
            meanResponseTime: this.stateCache.getStorageNodeEndpointMeanResponseTime(e),
          })),
        })
        if (!storageEndpoints.length) {
          return fail('No storage endpoints available to download the data object from')
        }

        const availabilityQueue = queue({
          concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD,
          autostart: true,
        })
        const objectDownloadQueue = queue({ concurrency: 1, autostart: true })

        storageEndpoints.forEach((endpoint) => {
          availabilityQueue.push(async () => {
            const api = new StorageNodeApi(endpoint, this.logging)
            const available = await api.isObjectAvailable(contentHash)
            if (!available) {
              throw new Error('Not avilable')
            }
            return endpoint
          })
        })

        availabilityQueue.on('success', (endpoint) => {
          // A source was confirmed: pause further checks and try downloading from it
          availabilityQueue.stop()
          const job = async () => {
            const api = new StorageNodeApi(endpoint, this.logging)
            const response = await api.downloadObject(contentHash, startAt)
            return response
          }
          objectDownloadQueue.push(job)
        })

        availabilityQueue.on('error', () => {
          /*
          Do nothing.
          The handler is needed to avoid unhandled promise rejection
          */
        })

        availabilityQueue.on('end', () => {
          // All endpoints were checked and no download attempt is queued or running
          if (!objectDownloadQueue.length) {
            fail('Failed to download the object from any availablable storage provider')
          }
        })

        objectDownloadQueue.on('error', (err) => {
          this.logger.error('Download attempt from storage node failed after availability was confirmed:', { err })
        })

        objectDownloadQueue.on('end', () => {
          // Reached when the download attempts ended without a success
          // (on success the listeners are removed before the queue is ended)
          if (availabilityQueue.length) {
            availabilityQueue.start()
          } else {
            fail('Failed to download the object from any availablable storage provider')
          }
        })

        objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
          availabilityQueue.removeAllListeners().end()
          objectDownloadQueue.removeAllListeners().end()
          response.data.on('close', finish).on('error', finish).on('end', finish)
          sourceFound(response)
        })
      })
    }

    /**
     * Queues a download of the data object, unless a download of the same content
     * is already pending.
     *
     * @param downloadData Object data and the download start offset
     * @returns Promise of the storage node response (settled by the queued download job),
     *   or `null` if the object is already being downloaded
     */
    public downloadDataObject(downloadData: DownloadData): Promise<StorageNodeDownloadResponse> | null {
      const {
        objectData: { contentHash, size },
      } = downloadData

      if (this.stateCache.getPendingDownload(contentHash)) {
        // Already downloading
        return null
      }

      // The promise is settled from within the queued job, so its callbacks are captured here
      let resolveDownload: (response: StorageNodeDownloadResponse) => void, rejectDownload: (err: Error) => void
      const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
        resolveDownload = resolve
        rejectDownload = reject
      })

      // Queue the download
      const pendingDownload = this.stateCache.newPendingDownload(contentHash, size, downloadPromise)
      this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, resolveDownload, rejectDownload))

      return downloadPromise
    }

    /**
     * Fetches all data objects stored in bags assigned to the distribution buckets this node
     * serves: the buckets of the configured worker when `config.buckets` is `'all'`,
     * otherwise the explicitly configured buckets.
     *
     * @returns Content hash, object id and size of every such data object
     */
    public async fetchSupportedDataObjects(): Promise<DataObjectData[]> {
      const data =
        this.config.buckets === 'all'
          ? await this.queryNodeApi.getDistributionBucketsWithObjectsByWorkerId(this.config.workerId)
          : await this.queryNodeApi.getDistributionBucketsWithObjectsByIds(this.config.buckets.map((id) => id.toString()))

      const objectsData: DataObjectData[] = []
      for (const bucket of data) {
        for (const assignment of bucket.bagAssignments) {
          for (const { ipfsHash, id, size } of assignment.storageBag.objects) {
            objectsData.push({ contentHash: ipfsHash, objectId: id, size: parseInt(size, 10) })
          }
        }
      }

      return objectsData
    }

    /**
     * Fetches the active storage bucket operators from the query node and queues
     * a response-time check for each valid endpoint.
     * Operators without a node endpoint in their metadata are skipped with a warning
     * instead of aborting the whole check.
     *
     * @throws (rejects) if the query node request fails
     */
    public async checkActiveStorageNodeEndpoints(): Promise<void> {
      const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()

      const candidates: StorageNodeEndpointData[] = []
      for (const { id, operatorMetadata } of activeStorageOperators) {
        const endpoint = operatorMetadata?.nodeEndpoint
        if (endpoint == null) {
          this.logger.warn(`Missing storage node endpoint for bucket ${id}`, {
            bucketId: id,
            // Same meta as the other warnings of this class
            '@pauseFor': 900,
          })
          continue
        }
        candidates.push({ bucketId: id, endpoint })
      }
      const endpoints = this.filterStorageNodeEndpoints(candidates)

      this.logger.verbose('Checking nearby storage nodes...', { validEndpointsCount: endpoints.length })

      endpoints.forEach(({ endpoint }) =>
        this.testLatencyQueue.push(async () => {
          await this.checkResponseTime(endpoint)
        })
      )
    }

    /**
     * Measures how long the storage node takes to answer a GET request to its endpoint
     * and stores the result in the state cache.
     * A 404 response is the expected outcome; any other outcome is logged as a warning
     * and no response time is recorded. This method does not reject on request errors.
     *
     * @param endpoint Storage node endpoint to check
     */
    public async checkResponseTime(endpoint: string): Promise<void> {
      const start = Date.now()
      this.logger.debug(`Sending storage node response-time check request to: ${endpoint}`, { endpoint })
      try {
        // TODO: Use a status endpoint once available?
        const response = await axios.get(endpoint, {
          headers: {
            connection: 'close',
          },
        })
        // Getting here means the request succeeded, which is not expected for the bare endpoint
        throw new Error(`Unexpected status ${response.status}`)
      } catch (err) {
        if (axios.isAxiosError(err) && err.response?.status === 404) {
          // This is the expected outcome currently
          const responseTime = Date.now() - start
          this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
          this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
        } else {
          this.logger.warn(`${endpoint} check request unexpected response`, {
            endpoint,
            err: axios.isAxiosError(err) ? parseAxiosError(err) : err,
            '@pauseFor': 900,
          })
        }
      }
    }
  }