Selaa lähdekoodia

Improve configuration & error handling

Leszek Wiesner 3 vuotta sitten
vanhempi
commit
7352d3139d

+ 5 - 1
distributor-node/config.yml

@@ -11,7 +11,11 @@ log:
   file: debug
   console: info
   # elastic: info
-storageLimit: 100G
+limits:
+  storage: 100G
+  maxConcurrentStorageNodeDownloads: 100
+  maxConcurrentOutboundConnections: 300
+  outboundRequestsTimeout: 5000
 port: 3334
 keys: [//Alice]
 buckets: 'all'

+ 5 - 1
distributor-node/config/docker/config.docker.yml

@@ -10,7 +10,11 @@ directories:
 log:
   console: info
   # elastic: info
-storageLimit: 100G
+limits:
+  storage: 100G
+  maxConcurrentStorageNodeDownloads: 100
+  maxConcurrentOutboundConnections: 300
+  outboundRequestsTimeout: 5000
 port: 3334
 keys: [//Alice]
 buckets: 'all'

+ 3 - 1
distributor-node/package.json

@@ -16,6 +16,8 @@
     "@oclif/plugin-help": "^2",
     "@apollo/client": "^3.2.5",
     "graphql": "^14.7.0",
+    "winston": "^3.3.3",
+    "fast-safe-stringify": "^2.1.1",
     "ajv": "^7",
     "axios": "^0.21.1",
     "cross-fetch": "^3.1.4",
@@ -105,7 +107,7 @@
     "generate:api:distributor-node": "yarn openapi-generator-cli generate -i ./src/api-spec/openapi.yml -g typescript-axios -o ./src/services/networking/distributor-node/generated",
     "generate:api:all": "yarn generate:api:storage-node && yarn generate:api:distributor-node",
     "generate:all": "yarn generate:types:all && yarn generate:api:all",
-    "build": "tsc --build tsconfig.json && cp -r ./src/api-spec ./lib/api-spec",
+    "build": "rm -rf lib && tsc --build tsconfig.json && cp -r ./src/api-spec ./lib/api-spec",
     "lint": "eslint ./src --ext .ts",
     "format": "prettier ./ --write",
     "checks": "tsc --noEmit --pretty && prettier ./ --check && yarn lint",

+ 1 - 1
distributor-node/src/services/content/ContentService.ts

@@ -23,7 +23,7 @@ export class ContentService {
   }
 
   public get freeSpace(): number {
-    return this.config.storageLimit - this.contentSizeSum
+    return this.config.limits.storage - this.contentSizeSum
   }
 
   public constructor(config: ReadonlyConfig, logging: LoggingService, stateCache: StateCacheService) {

+ 2 - 1
distributor-node/src/services/logging/LoggingService.ts

@@ -4,6 +4,7 @@ import { ElasticsearchTransport } from 'winston-elasticsearch'
 import { ReadonlyConfig } from '../../types'
 import { blake2AsHex } from '@polkadot/util-crypto'
 import { Format } from 'logform'
+import stringify from 'fast-safe-stringify'
 import NodeCache from 'node-cache'
 
 const cliColors = {
@@ -43,7 +44,7 @@ const cliFormat = winston.format.combine(
   winston.format.printf(
     (info) =>
       `${info.timestamp} ${info.label} ${info.level}: ${info.message}` +
-      (Object.keys(info.metadata).length ? `\n${JSON.stringify(info.metadata, null, 4)}` : '')
+      (Object.keys(info.metadata).length ? `\n${stringify(info.metadata, undefined, 4)}` : '')
   )
 )
 

+ 32 - 21
distributor-node/src/services/networking/NetworkingService.ts

@@ -15,19 +15,14 @@ import {
   DownloadData,
 } from '../../types'
 import queue from 'queue'
-import _ from 'lodash'
 import { DistributionBucketOperatorStatus } from './query-node/generated/schema'
 import http from 'http'
 import https from 'https'
+import { parseAxiosError } from '../parsers/errors'
 
-// TODO: Adjust limits and intervals
+const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10
 const MAX_CONCURRENT_RESPONSE_TIME_CHECKS = 10
-const MAX_CONCURRENT_DOWNLOADS = 10
-const MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD = 10 // 10 pending download * 10 availibility checks per download = 100 concurrent requests
-
 const STORAGE_NODE_ENDPOINTS_CHECK_INTERVAL_MS = 60000
-const STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT = 5000
-const GLOBAL_AXIOS_TIMEOUT = 10000
 
 export class NetworkingService {
   private config: ReadonlyConfig
@@ -42,9 +37,14 @@ export class NetworkingService {
   private downloadQueue: queue
 
   constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
-    axios.defaults.timeout = GLOBAL_AXIOS_TIMEOUT
-    axios.defaults.httpAgent = new http.Agent({ keepAlive: true, timeout: GLOBAL_AXIOS_TIMEOUT })
-    axios.defaults.httpsAgent = new https.Agent({ keepAlive: true, timeout: GLOBAL_AXIOS_TIMEOUT })
+    axios.defaults.timeout = config.limits.outboundRequestsTimeout
+    const httpConfig: http.AgentOptions | https.AgentOptions = {
+      keepAlive: true,
+      timeout: config.limits.outboundRequestsTimeout,
+      maxSockets: config.limits.maxConcurrentOutboundConnections,
+    }
+    axios.defaults.httpAgent = new http.Agent(httpConfig)
+    axios.defaults.httpsAgent = new https.Agent(httpConfig)
     this.config = config
     this.logging = logging
     this.stateCache = stateCache
@@ -65,7 +65,7 @@ export class NetworkingService {
         })
       }
     )
-    this.downloadQueue = queue({ concurrency: MAX_CONCURRENT_DOWNLOADS, autostart: true })
+    this.downloadQueue = queue({ concurrency: config.limits.maxConcurrentStorageNodeDownloads, autostart: true })
   }
 
   public clearIntervals(): void {
@@ -159,9 +159,10 @@ export class NetworkingService {
   private downloadJob(
     pendingDownload: PendingDownloadData,
     downloadData: DownloadData,
-    onSuccess: (response: StorageNodeDownloadResponse) => void,
-    onError: (error: Error) => void
-  ): Promise<StorageNodeDownloadResponse> {
+    onSourceFound: (response: StorageNodeDownloadResponse) => void,
+    onError: (error: Error) => void,
+    onFinished?: () => void
+  ): Promise<void> {
     const {
       objectData: { contentHash, accessPoints },
       startAt,
@@ -169,17 +170,23 @@ export class NetworkingService {
 
     pendingDownload.status = 'LookingForSource'
 
-    return new Promise((resolve, reject) => {
+    return new Promise<void>((resolve, reject) => {
+      // Handlers:
       const fail = (message: string) => {
         this.stateCache.dropPendingDownload(contentHash)
         onError(new Error(message))
         reject(new Error(message))
       }
-      const success = (response: StorageNodeDownloadResponse) => {
+
+      const sourceFound = (response: StorageNodeDownloadResponse) => {
         this.logger.info('Download source chosen', { contentHash, source: response.config.url })
         pendingDownload.status = 'Downloading'
-        onSuccess(response)
-        resolve(response)
+        onSourceFound(response)
+      }
+
+      const finish = () => {
+        onFinished && onFinished()
+        resolve()
       }
 
       const storageEndpoints = this.sortEndpointsByMeanResponseTime(
@@ -252,7 +259,8 @@ export class NetworkingService {
       objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
         availabilityQueue.removeAllListeners().end()
         objectDownloadQueue.removeAllListeners().end()
-        success(response)
+        response.data.on('close', finish).on('error', finish).on('end', finish)
+        sourceFound(response)
       })
     })
   }
@@ -321,7 +329,6 @@ export class NetworkingService {
     try {
       // TODO: Use a status endpoint once available?
       await axios.get(endpoint, {
-        timeout: STORAGE_NODE_ENDPOINT_CHECK_TIMEOUT,
         headers: {
           connection: 'close',
         },
@@ -334,7 +341,11 @@ export class NetworkingService {
         this.logger.debug(`${endpoint} check request response time: ${responseTime}`, { endpoint, responseTime })
         this.stateCache.setStorageNodeEndpointResponseTime(endpoint, responseTime)
       } else {
-        this.logger.warn(`${endpoint} check request unexpected response`, { endpoint, err, '@pauseFor': 900 })
+        this.logger.warn(`${endpoint} check request unexpected response`, {
+          endpoint,
+          err: axios.isAxiosError(err) ? parseAxiosError(err) : err,
+          '@pauseFor': 900,
+        })
       }
     }
   }

+ 2 - 1
distributor-node/src/services/networking/storage-node/api.ts

@@ -4,6 +4,7 @@ import axios, { AxiosRequestConfig } from 'axios'
 import { LoggingService } from '../../logging'
 import { Logger } from 'winston'
 import { StorageNodeDownloadResponse } from '../../../types'
+import { parseAxiosError } from '../../parsers/errors'
 
 export class StorageNodeApi {
   private logger: Logger
@@ -27,7 +28,7 @@ export class StorageNodeApi {
       return true
     } catch (err) {
       if (axios.isAxiosError(err)) {
-        this.logger.debug('Data object not available', { err })
+        this.logger.debug('Data object not available', { err: parseAxiosError(err) })
         return false
       }
       this.logger.error('Unexpected error while requesting data object', { err })

+ 5 - 2
distributor-node/src/services/parsers/ConfigParserService.ts

@@ -94,7 +94,7 @@ export class ConfigParserService {
 
     // Normalize values
     const directories = this.resolveConfigDirectoryPaths(configJson.directories, configPath)
-    const storageLimit = this.parseBytesize(configJson.storageLimit)
+    const storageLimit = this.parseBytesize(configJson.limits.storage)
 
     if (storageLimit < MIN_CACHE_SIZE) {
       throw new Error('Cache storage limit should be at least 20G!')
@@ -103,7 +103,10 @@ export class ConfigParserService {
     const parsedConfig: Config = {
       ...configJson,
       directories,
-      storageLimit,
+      limits: {
+        ...configJson.limits,
+        storage: storageLimit,
+      },
     }
 
     return parsedConfig

+ 14 - 0
distributor-node/src/services/parsers/errors.ts

@@ -0,0 +1,14 @@
+import { AxiosError } from 'axios'
+
+export function parseAxiosError(e: AxiosError) { // Reduces an AxiosError to a plain object holding only the fields selected below.
+  return {
+    message: e.message,
+    stack: e.stack,
+    response: { // Always emitted; each field is undefined when e.response is absent (optional chaining).
+      data: e.response?.data,
+      status: e.response?.status,
+      statusText: e.response?.statusText,
+      headers: e.response?.headers,
+    },
+  }
+}

+ 1 - 1
distributor-node/src/services/server/controllers/public.ts

@@ -241,7 +241,7 @@ export class PublicApiController {
     const data: StatusResponse = {
       id: this.config.id,
       objectsInCache: this.stateCache.getCachedContentLength(),
-      storageLimit: this.config.storageLimit,
+      storageLimit: this.config.limits.storage,
       storageUsed: this.content.usedSpace,
       uptime: Math.floor(process.uptime()),
       downloadsInProgress: this.stateCache.getPendingDownloadsCount(),

+ 7 - 2
distributor-node/src/services/validation/schemas/configSchema.ts

@@ -7,7 +7,7 @@ export const bytesizeRegex = new RegExp(`^[0-9]+(${bytesizeUnits.join('|')})$`)
 
 export const configSchema: JSONSchema4 = {
   type: 'object',
-  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'storageLimit', 'workerId'],
+  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'workerId', 'limits'],
   additionalProperties: false,
   properties: {
     id: { type: 'string' },
@@ -35,7 +35,12 @@ export const configSchema: JSONSchema4 = {
         elastic: { type: 'string', enum: [...Object.keys(winston.config.npm.levels), 'off'] },
       },
     },
-    storageLimit: { type: 'string', pattern: bytesizeRegex.source },
+    limits: strictObject({
+      storage: { type: 'string', pattern: bytesizeRegex.source },
+      maxConcurrentStorageNodeDownloads: { type: 'integer', minimum: 1 },
+      maxConcurrentOutboundConnections: { type: 'integer', minimum: 1 },
+      outboundRequestsTimeout: { type: 'integer', minimum: 1 },
+    }),
     port: { type: 'integer', minimum: 0 },
     keys: { type: 'array', items: { type: 'string' }, minItems: 1 },
     buckets: {

+ 4 - 2
distributor-node/src/types/config.ts

@@ -1,7 +1,9 @@
 import { ConfigJson } from './generated/ConfigJson'
 import { DeepReadonly } from './common'
 
-export type Config = Omit<ConfigJson, 'storageLimit'> & {
-  storageLimit: number
+export type Config = Omit<ConfigJson, 'limits'> & {
+  limits: Omit<ConfigJson['limits'], 'storage'> & {
+    storage: number
+  }
 }
 export type ReadonlyConfig = DeepReadonly<Config>

+ 6 - 1
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -22,7 +22,12 @@ export interface ConfigJson {
     console?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly' | 'off'
     elastic?: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly' | 'off'
   }
-  storageLimit: string
+  limits: {
+    storage: string
+    maxConcurrentStorageNodeDownloads: number
+    maxConcurrentOutboundConnections: number
+    outboundRequestsTimeout: number
+  }
   port: number
   keys: [string, ...string[]]
   buckets: [number, ...number[]] | 'all'

+ 1 - 1
docker-compose.yml

@@ -63,7 +63,7 @@ services:
     #   JOYSTREAM_DISTRIBUTOR__LOG__CONSOLE: "off"
     #   JOYSTREAM_DISTRIBUTOR__LOG__FILE: "off"
     #   JOYSTREAM_DISTRIBUTOR__LOG__ELASTIC: "off"
-    #   JOYSTREAM_DISTRIBUTOR__STORAGE_LIMIT: 50G
+    #   JOYSTREAM_DISTRIBUTOR__LIMITS__STORAGE: 50G
     #   JOYSTREAM_DISTRIBUTOR__PORT: 1234
     #   JOYSTREAM_DISTRIBUTOR__KEYS: "[\"//Bob\"]"
     #   JOYSTREAM_DISTRIBUTOR__BUCKETS: "[1,2]"