Browse Source

Introduce authorized OperatorApi

Leszek Wiesner 3 years ago
parent
commit
ea01bf917b
26 changed files with 951 additions and 217 deletions
  1. 5 1
      distributor-node/config.yml
  2. 10 4
      distributor-node/package.json
  3. 123 0
      distributor-node/src/api-spec/operator.yml
  4. 10 15
      distributor-node/src/api-spec/public.yml
  5. 34 9
      distributor-node/src/app/index.ts
  6. 58 0
      distributor-node/src/command-base/node.ts
  7. 39 0
      distributor-node/src/commands/node/set-buckets.ts
  8. 29 0
      distributor-node/src/commands/node/set-worker.ts
  9. 17 0
      distributor-node/src/commands/node/shutdown.ts
  10. 17 0
      distributor-node/src/commands/node/start-public-api.ts
  11. 17 0
      distributor-node/src/commands/node/stop-public-api.ts
  12. 2 1
      distributor-node/src/commands/start.ts
  13. 18 6
      distributor-node/src/schemas/configSchema.ts
  14. 1 0
      distributor-node/src/schemas/utils.ts
  15. 145 0
      distributor-node/src/services/httpApi/HttpApiBase.ts
  16. 0 127
      distributor-node/src/services/httpApi/HttpApiService.ts
  17. 90 0
      distributor-node/src/services/httpApi/OperatorApiService.ts
  18. 60 0
      distributor-node/src/services/httpApi/PublicApiService.ts
  19. 76 0
      distributor-node/src/services/httpApi/controllers/operator.ts
  20. 9 2
      distributor-node/src/services/httpApi/controllers/public.ts
  21. 66 50
      distributor-node/src/types/generated/ConfigJson.d.ts
  22. 114 0
      distributor-node/src/types/generated/OperatorApi.ts
  23. 4 0
      distributor-node/src/types/generated/PublicApi.ts
  24. 2 1
      distributor-node/src/types/index.ts
  25. 4 0
      distributor-node/src/types/operatorApi.ts
  26. 1 1
      distributor-node/src/types/publicApi.ts

+ 5 - 1
distributor-node/config.yml

@@ -27,7 +27,11 @@ intervals:
   saveCacheState: 60
   checkStorageNodeResponseTimes: 60
   cacheCleanup: 60
-port: 3334
+publicApi:
+  port: 3334
+operatorApi:
+  port: 3335
+  hmacSecret: this-is-not-so-secret
 keys:
   - suri: //Alice
   # - mnemonic: "escape naive annual throw tragic achieve grunt verify cram note harvest problem"

+ 10 - 4
distributor-node/package.json

@@ -44,7 +44,8 @@
     "js-image-generator": "^1.0.3",
     "url-join": "^4.0.1",
     "@types/url-join": "^4.0.1",
-    "winston-daily-rotate-file": "^4.5.5"
+    "winston-daily-rotate-file": "^4.5.5",
+    "jsonwebtoken": "^8.5.1"
   },
   "devDependencies": {
     "@graphql-codegen/cli": "^1.21.4",
@@ -108,6 +109,9 @@
       "operator": {
         "description": "Commands for performing node operator (Distribution Working Group worker) on-chain duties (like accepting bucket invitations, setting node metadata)"
       },
+      "node": {
+        "description": "Commands for interacting with a running distributor node through OperatorApi"
+      },
       "dev": {
         "description":"Developer utility commands"
       }
@@ -125,13 +129,15 @@
     "version": "generate:docs:cli && git add docs/cli/*",
     "generate:types:json-schema": "yarn ts-node ./src/schemas/scripts/generateTypes.ts",
     "generate:types:graphql": "yarn graphql-codegen -c ./src/services/networking/query-node/codegen.yml",
-    "generate:types:openapi": "yarn openapi-typescript ./src/api-spec/openapi.yml -o ./src/types/generated/OpenApi.ts -c ../prettierrc.js",
-    "generate:types:all": "yarn generate:types:json-schema && yarn generate:types:graphql && yarn generate:types:openapi",
+    "generate:types:public-api": "yarn openapi-typescript ./src/api-spec/public.yml -o ./src/types/generated/PublicApi.ts -c ../prettierrc.js",
+    "generate:types:operator-api": "yarn openapi-typescript ./src/api-spec/operator.yml -o ./src/types/generated/OperatorApi.ts -c ../prettierrc.js",
+    "generate:types:api": "yarn generate:types:public-api && yarn generate:types:operator-api",
+    "generate:types:all": "yarn generate:types:json-schema && yarn generate:types:graphql && yarn generate:types:api",
     "generate:api:storage-node": "yarn openapi-generator-cli generate -i ../storage-node-v2/src/api-spec/openapi.yaml -g typescript-axios -o ./src/services/networking/storage-node/generated",
     "generate:api:all": "yarn generate:api:storage-node",
     "generate:docs:cli": "yarn oclif-dev readme --multi --dir ./docs/commands",
     "generate:docs:config": "yarn ts-node --transpile-only ./src/schemas/scripts/generateConfigDoc.ts",
-    "generate:docs:api": "yarn widdershins ./src/api-spec/openapi.yml --language_tabs javascript:JavaScript shell:Shell -o ./docs/api/index.md -u ./docs/api/templates",
+    "generate:docs:public-api": "yarn widdershins ./src/api-spec/public.yml --language_tabs javascript:JavaScript shell:Shell -o ./docs/api/index.md -u ./docs/api/templates",
     "generate:docs:toc": "yarn md-magic --path ./docs/**/*.md",
     "generate:docs:all": "yarn generate:docs:cli && yarn generate:docs:config && yarn generate:docs:api && yarn generate:docs:toc",
     "generate:all": "yarn generate:types:all && yarn generate:api:all && yarn generate:docs:all",

+ 123 - 0
distributor-node/src/api-spec/operator.yml

@@ -0,0 +1,123 @@
+openapi: 3.0.3
+info:
+  title: Distributor node operator API
+  description: Distributor node operator API
+  contact:
+    email: info@joystream.org
+  license:
+    name: GPL-3.0-only
+    url: https://spdx.org/licenses/GPL-3.0-only.html
+  version: 0.1.0
+servers:
+  - url: http://localhost:3335/api/v1/
+
+paths:
+  /stop-api:
+    post:
+      operationId: operator.stopApi
+      description: Turns off the public api.
+      responses:
+        200:
+          description: OK
+        401:
+          description: Not authorized
+        409:
+          description: Already stopped
+        500:
+          description: Unexpected server error
+  /start-api:
+    post:
+      operationId: operator.startApi
+      description: Turns on the public api.
+      responses:
+        200:
+          description: OK
+        401:
+          description: Not authorized
+        409:
+          description: Already started
+        500:
+          description: Unexpected server error
+  /shutdown:
+    post:
+      operationId: operator.shutdown
+      description: Shuts down the node.
+      responses:
+        200:
+          description: OK
+        401:
+          description: Not authorized
+        409:
+          description: Already shutting down
+        500:
+          description: Unexpected server error
+  /set-worker:
+    post:
+      operationId: operator.setWorker
+      description: Updates the operator worker id.
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/SetWorkerOperation'
+      responses:
+        200:
+          description: OK
+        401:
+          description: Not authorized
+        500:
+          description: Unexpected server error
+  /set-buckets:
+    post:
+      operationId: operator.setBuckets
+      description: Updates buckets supported by the node.
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/SetBucketsOperation'
+      responses:
+        200:
+          description: OK
+        401:
+          description: Not authorized
+        500:
+          description: Unexpected server error
+
+components:
+  securitySchemes:
+    OperatorAuth:
+      type: http
+      scheme: bearer
+      bearerFormat: "JWT signed with HMAC (HS256) secret key specified in distributor node's `config.operator.hmacSecret`.
+        The payload should include:
+        - `reqBody` - content of the request body
+        - `reqUrl` - request url (only pathname + query string, without origin. For example: `/api/v1/set-buckets`)"
+  schemas:
+    SetWorkerOperation:
+      type: object
+      required:
+        - workerId
+      properties:
+        workerId:
+          type: integer
+          minimum: 0
+    SetBucketsOperation:
+      type: object
+      required:
+        - buckets
+      properties:
+        buckets:
+          oneOf:
+            - type: string
+              enum: ['all']
+              description: All buckets assigned to configured workerId.
+            - type: array
+              minItems: 1
+              items:
+                type: integer
+                minimum: 0
+
+security:
+  - OperatorAuth: []
+

+ 10 - 15
distributor-node/src/api-spec/openapi.yml → distributor-node/src/api-spec/public.yml

@@ -1,7 +1,7 @@
 openapi: 3.0.3
 info:
-  title: Distributor node API
-  description: Distributor node API
+  title: Distributor node public API
+  description: Distributor node public API
   contact:
     email: info@joystream.org
   license:
@@ -9,22 +9,16 @@ info:
     url: https://spdx.org/licenses/GPL-3.0-only.html
   version: 0.1.0
 externalDocs:
-  description: Distributor node API
+  description: Distributor node public API
   url: https://github.com/Joystream/joystream/issues/2224
 servers:
   - url: http://localhost:3334/api/v1/
 
-tags:
-  - name: public
-    description: Public distributor node API
-
 paths:
   /status:
     get:
       operationId: public.status
       description: Returns json object describing current node status.
-      tags:
-        - public
       responses:
         200:
           description: OK
@@ -38,8 +32,6 @@ paths:
     get:
       operationId: public.buckets
       description: Returns list of distributed buckets
-      tags:
-        - public
       responses:
         200:
           description: OK
@@ -53,8 +45,6 @@ paths:
     head:
       operationId: public.assetHead
       description: Returns asset response headers (cache status, content type and/or length, accepted ranges etc.)
-      tags:
-        - public
       parameters:
         - $ref: '#/components/parameters/ObjectId'
       responses:
@@ -72,8 +62,6 @@ paths:
     get:
       operationId: public.asset
       description: Returns a media file.
-      tags:
-        - public
       parameters:
         - $ref: '#/components/parameters/ObjectId'
       responses:
@@ -159,6 +147,13 @@ components:
         type: string
         enum: ['external', 'local']
   schemas:
+    SetConfigBody:
+      type: object
+      properties:
+        path:
+          description: Config setting path (ie. limits.storage)
+          type: string
+
     ErrorResponse:
       type: object
       required:

+ 34 - 9
distributor-node/src/app/index.ts

@@ -1,31 +1,37 @@
-import { ReadonlyConfig } from '../types'
+import { Config } from '../types'
 import { NetworkingService } from '../services/networking'
 import { LoggingService } from '../services/logging'
 import { StateCacheService } from '../services/cache/StateCacheService'
 import { ContentService } from '../services/content/ContentService'
-import { HttpApiService } from '../services/httpApi/HttpApiService'
 import { Logger } from 'winston'
 import fs from 'fs'
 import nodeCleanup from 'node-cleanup'
 import { AppIntervals } from '../types/app'
+import { PublicApiService } from '../services/httpApi/PublicApiService'
+import { OperatorApiService } from '../services/httpApi/OperatorApiService'
 
 export class App {
-  private config: ReadonlyConfig
+  private config: Config
   private content: ContentService
   private stateCache: StateCacheService
   private networking: NetworkingService
-  private httpApi: HttpApiService
+  private publicApi: PublicApiService
+  private operatorApi: OperatorApiService | undefined
   private logging: LoggingService
   private logger: Logger
   private intervals: AppIntervals | undefined
+  private isStopping = false
 
-  constructor(config: ReadonlyConfig) {
+  constructor(config: Config) {
     this.config = config
     this.logging = LoggingService.withAppConfig(config)
     this.stateCache = new StateCacheService(config, this.logging)
     this.networking = new NetworkingService(config, this.stateCache, this.logging)
     this.content = new ContentService(config, this.logging, this.networking, this.stateCache)
-    this.httpApi = new HttpApiService(config, this.stateCache, this.content, this.logging, this.networking)
+    this.publicApi = new PublicApiService(config, this.stateCache, this.content, this.logging, this.networking)
+    if (this.config.operatorApi) {
+      this.operatorApi = new OperatorApiService(config, this, this.logging, this.publicApi)
+    }
     this.logger = this.logging.createLogger('App')
   }
 
@@ -70,6 +76,8 @@ export class App {
         throw new Error(`${dirInfo} is not writable`)
       }
     })
+
+    // TODO: Logging dir if specified
   }
 
   public async start(): Promise<void> {
@@ -79,7 +87,8 @@ export class App {
       this.stateCache.load()
       await this.content.startupInit()
       this.setIntervals()
-      this.httpApi.start()
+      this.publicApi.start()
+      this.operatorApi?.start()
     } catch (err) {
       this.logger.error('Node initialization failed!', { err })
       process.exit(-1)
@@ -87,6 +96,21 @@ export class App {
     nodeCleanup(this.exitHandler.bind(this))
   }
 
+  public stop(timeoutSec?: number): boolean {
+    if (this.isStopping) {
+      return false
+    }
+    this.logger.info(`Stopping the app${timeoutSec ? ` in ${timeoutSec} sec...` : ''}`)
+    this.isStopping = true
+    if (timeoutSec) {
+      setTimeout(() => process.kill(process.pid, 'SIGINT'), timeoutSec * 1000)
+    } else {
+      process.kill(process.pid, 'SIGINT')
+    }
+
+    return true
+  }
+
   private async exitGracefully(): Promise<void> {
     // Async exit handler - ideally should not take more than 10 sec
     // We can try to wait until some pending downloads are finished here etc.
@@ -125,8 +149,9 @@ export class App {
     this.logger.info('Exiting...')
     // Clear intervals
     this.clearIntervals()
-    // Stop the http api
-    this.httpApi.stop()
+    // Stop the http apis
+    this.publicApi.stop()
+    this.operatorApi?.stop()
     // Save cache
     try {
       this.stateCache.saveSync()

+ 58 - 0
distributor-node/src/command-base/node.ts

@@ -0,0 +1,58 @@
+import axios from 'axios'
+import urljoin from 'url-join'
+import DefaultCommandBase, { flags } from './default'
+import jwt from 'jsonwebtoken'
+import ExitCodes from './ExitCodes'
+
+export default abstract class NodeCommandBase extends DefaultCommandBase {
+  static flags = {
+    url: flags.string({
+      char: 'u',
+      description: 'Distributor node operator api base url (ie. http://localhost:3335)',
+      required: true,
+    }),
+    secret: flags.string({
+      char: 's',
+      description: 'HMAC secret key to use (will default to config.operatorApi.hmacSecret if present)',
+      required: false,
+    }),
+    ...DefaultCommandBase.flags,
+  }
+
+  protected abstract reqUrl(): string
+
+  protected reqBody(): Record<string, unknown> {
+    return {}
+  }
+
+  async run(): Promise<void> {
+    const { url, secret } = this.parse(this.constructor as typeof NodeCommandBase).flags
+
+    const hmacSecret = secret || this.appConfig.operatorApi?.hmacSecret
+
+    if (!hmacSecret) {
+      this.error('No --secret was provided and no config.operatorApi.hmacSecret is set!', {
+        exit: ExitCodes.InvalidInput,
+      })
+    }
+
+    const reqUrl = this.reqUrl()
+    const reqBody = this.reqBody()
+    const payload = { reqUrl, reqBody }
+    try {
+      await axios.post(urljoin(url, reqUrl), reqBody, {
+        headers: {
+          authorization: `bearer ${jwt.sign(payload, hmacSecret, { expiresIn: 60 })}`,
+        },
+      })
+      this.log('Request successful')
+    } catch (e) {
+      if (axios.isAxiosError(e)) {
+        this.error(`Request failed: ${e.response ? JSON.stringify(e.response.data) : e.message}`, {
+          exit: ExitCodes.ApiError,
+        })
+      }
+      this.error(e instanceof Error ? e.message : JSON.stringify(e), { exit: ExitCodes.ApiError })
+    }
+  }
+}

+ 39 - 0
distributor-node/src/commands/node/set-buckets.ts

@@ -0,0 +1,39 @@
+import { flags } from '@oclif/command'
+import ExitCodes from '../../command-base/ExitCodes'
+import NodeCommandBase from '../../command-base/node'
+import { SetBucketsOperation } from '../../types'
+
+export default class NodeSetBucketsCommand extends NodeCommandBase {
+  static description = `Send an api request to change the set of buckets distributed by given distributor node.`
+
+  static flags = {
+    all: flags.boolean({
+      char: 'a',
+      description: 'Distribute all buckets belonging to configured worker',
+      exclusive: ['bucketIds'],
+    }),
+    bucketIds: flags.integer({
+      char: 'B',
+      description: 'Set of bucket ids to distribute',
+      exclusive: ['all'],
+      multiple: true,
+    }),
+    ...NodeCommandBase.flags,
+  }
+
+  protected reqUrl(): string {
+    return '/api/v1/set-buckets'
+  }
+
+  protected reqBody(): SetBucketsOperation {
+    const {
+      flags: { all, bucketIds },
+    } = this.parse(NodeSetBucketsCommand)
+    if (!all && !bucketIds) {
+      this.error('You must provide either --bucketIds or --all flag!', { exit: ExitCodes.InvalidInput })
+    }
+    return {
+      buckets: all ? 'all' : bucketIds,
+    }
+  }
+}

+ 29 - 0
distributor-node/src/commands/node/set-worker.ts

@@ -0,0 +1,29 @@
+import { flags } from '@oclif/command'
+import NodeCommandBase from '../../command-base/node'
+import { SetWorkerOperation } from '../../types'
+
+export default class NodeSetWorkerCommand extends NodeCommandBase {
+  static description = `Send an api request to change workerId assigned to given distributor node instance.`
+
+  static flags = {
+    workerId: flags.integer({
+      char: 'w',
+      description: 'New workerId to set',
+      required: true,
+    }),
+    ...NodeCommandBase.flags,
+  }
+
+  protected reqUrl(): string {
+    return '/api/v1/set-worker'
+  }
+
+  protected reqBody(): SetWorkerOperation {
+    const {
+      flags: { workerId },
+    } = this.parse(NodeSetWorkerCommand)
+    return {
+      workerId,
+    }
+  }
+}

+ 17 - 0
distributor-node/src/commands/node/shutdown.ts

@@ -0,0 +1,17 @@
+import NodeCommandBase from '../../command-base/node'
+
+export default class NodeShutdownCommand extends NodeCommandBase {
+  static description = `Send an api request to shutdown given distributor node.`
+
+  static flags = {
+    ...NodeCommandBase.flags,
+  }
+
+  protected reqUrl(): string {
+    return '/api/v1/shutdown'
+  }
+
+  protected reqBody(): Record<string, unknown> {
+    return {}
+  }
+}

+ 17 - 0
distributor-node/src/commands/node/start-public-api.ts

@@ -0,0 +1,17 @@
+import NodeCommandBase from '../../command-base/node'
+
+export default class NodeStartPublicApiCommand extends NodeCommandBase {
+  static description = `Send an api request to start public api of given distributor node.`
+
+  static flags = {
+    ...NodeCommandBase.flags,
+  }
+
+  protected reqUrl(): string {
+    return '/api/v1/start-api'
+  }
+
+  protected reqBody(): Record<string, unknown> {
+    return {}
+  }
+}

+ 17 - 0
distributor-node/src/commands/node/stop-public-api.ts

@@ -0,0 +1,17 @@
+import NodeCommandBase from '../../command-base/node'
+
+export default class NodeStopPublicApiCommand extends NodeCommandBase {
+  static description = `Send an api request to stop public api of given distributor node.`
+
+  static flags = {
+    ...NodeCommandBase.flags,
+  }
+
+  protected reqUrl(): string {
+    return '/api/v1/stop-api'
+  }
+
+  protected reqBody(): Record<string, unknown> {
+    return {}
+  }
+}

+ 2 - 1
distributor-node/src/commands/start.ts

@@ -1,5 +1,6 @@
 import DefaultCommandBase from '../command-base/default'
 import { App } from '../app'
+import { Config } from '../types'
 
 export default class StartNode extends DefaultCommandBase {
   static description = 'Start the node'
@@ -9,7 +10,7 @@ export default class StartNode extends DefaultCommandBase {
   }
 
   async run(): Promise<void> {
-    const app = new App(this.appConfig)
+    const app = new App(this.appConfig as Config)
     await app.start()
   }
 

+ 18 - 6
distributor-node/src/schemas/configSchema.ts

@@ -18,13 +18,11 @@ const offSwitch: JSONSchema4 = {
   enum: ['off'],
 }
 
-export const configSchema: JSONSchema4 = {
+export const configSchema: JSONSchema4 = objectSchema({
   '$id': 'https://joystream.org/schemas/argus/config',
   title: 'Distributor node configuration',
   description: 'Configuration schema for distirubtor CLI and node',
-  type: 'object',
-  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'port', 'workerId', 'limits', 'intervals'],
-  additionalProperties: false,
+  required: ['id', 'endpoints', 'directories', 'buckets', 'keys', 'workerId', 'limits', 'intervals', 'publicApi'],
   properties: {
     id: {
       type: 'string',
@@ -207,7 +205,21 @@ export const configSchema: JSONSchema4 = {
       },
       required: ['saveCacheState', 'checkStorageNodeResponseTimes', 'cacheCleanup'],
     }),
-    port: { description: 'Distributor node http api port', type: 'integer', minimum: 0 },
+    publicApi: objectSchema({
+      description: 'Public api configuration',
+      properties: {
+        port: { description: 'Distributor node public api port', type: 'integer', minimum: 0 },
+      },
+      required: ['port'],
+    }),
+    operatorApi: objectSchema({
+      description: 'Operator api configuration',
+      properties: {
+        port: { description: 'Distributor node operator api port', type: 'integer', minimum: 0 },
+        hmacSecret: { description: 'HMAC (HS256) secret key used for JWT authorization', type: 'string' },
+      },
+      required: ['port', 'hmacSecret'],
+    }),
     keys: {
       description: 'Specifies the keys available within distributor node CLI.',
       type: 'array',
@@ -267,6 +279,6 @@ export const configSchema: JSONSchema4 = {
       minimum: 0,
     },
   },
-}
+})
 
 export default configSchema

+ 1 - 0
distributor-node/src/schemas/utils.ts

@@ -1,6 +1,7 @@
 import { JSONSchema4 } from 'json-schema'
 
 export function objectSchema<P extends NonNullable<JSONSchema4['properties']>>(props: {
+  $id?: string
   title?: string
   description?: string
   properties: P

+ 145 - 0
distributor-node/src/services/httpApi/HttpApiBase.ts

@@ -0,0 +1,145 @@
+import express from 'express'
+import * as OpenApiValidator from 'express-openapi-validator'
+import { HttpError, OpenApiValidatorOpts } from 'express-openapi-validator/dist/framework/types'
+import { ReadonlyConfig } from '../../types/config'
+import expressWinston from 'express-winston'
+import { Logger } from 'winston'
+import { Server } from 'http'
+import cors from 'cors'
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export type HttpApiRoute = ['get' | 'head' | 'post', string, express.RequestHandler<any>]
+
+export abstract class HttpApiBase {
+  protected abstract port: number
+  protected expressApp: express.Application
+  protected config: ReadonlyConfig
+  protected logger: Logger
+  private httpServer: Server | undefined
+  private isInitialized = false
+  private isOn = false
+
+  protected routeWrapper(handler: express.RequestHandler) {
+    return async (req: express.Request, res: express.Response, next: express.NextFunction): Promise<void> => {
+      // Fix for express-winston in order to also log prematurely closed requests
+      res.on('close', () => {
+        res.locals.prematurelyClosed = !res.writableFinished
+        res.end()
+      })
+      try {
+        await handler(req, res, next)
+      } catch (err) {
+        next(err)
+      }
+    }
+  }
+
+  public constructor(config: ReadonlyConfig, logger: Logger) {
+    this.expressApp = express()
+    this.logger = logger
+    this.config = config
+  }
+
+  protected createRoutes(routes: HttpApiRoute[]): void {
+    routes.forEach(([type, path, handler]) => {
+      this.expressApp[type](path, this.routeWrapper(handler))
+    })
+  }
+
+  protected abstract routes(): HttpApiRoute[]
+
+  protected defaultOpenApiValidatorConfig(): Partial<OpenApiValidatorOpts> {
+    const isProd = process.env.NODE_ENV === 'prod'
+    return {
+      validateResponses: !isProd,
+    }
+  }
+
+  protected abstract openApiValidatorConfig(): OpenApiValidatorOpts
+
+  protected defaultRequestLoggerConfig(): expressWinston.LoggerOptions {
+    return {
+      winstonInstance: this.logger,
+      level: 'http',
+      dynamicMeta: (req, res) => {
+        return { prematurelyClosed: res.locals.prematurelyClosed ?? false }
+      },
+    }
+  }
+
+  protected requestLoggerConfig(): expressWinston.LoggerOptions {
+    return this.defaultRequestLoggerConfig()
+  }
+
+  protected defaultErrorLoggerConfig(): expressWinston.ErrorLoggerOptions {
+    return {
+      winstonInstance: this.logger,
+      level: 'error',
+      metaField: null,
+      exceptionToMeta: (err) => ({ err }),
+    }
+  }
+
+  protected errorLoggerConfig(): expressWinston.ErrorLoggerOptions {
+    return this.defaultErrorLoggerConfig()
+  }
+
+  protected errorHandler() {
+    return (err: HttpError, req: express.Request, res: express.Response, next: express.NextFunction): void => {
+      if (res.headersSent) {
+        return next(err)
+      }
+      if (err.status && err.status >= 400 && err.status < 500) {
+        res
+          .status(err.status)
+          .json({
+            type: 'request_validation',
+            message: err.message,
+            errors: err.errors,
+          })
+          .end()
+      } else {
+        res.status(err.status || 500).json({ type: 'exception', message: err.message })
+      }
+    }
+  }
+
+  protected initApp(): void {
+    if (this.isInitialized) {
+      return
+    }
+    const { expressApp: app } = this
+    app.use(express.json())
+    app.use(cors())
+    app.use(expressWinston.logger(this.requestLoggerConfig()))
+    app.use(OpenApiValidator.middleware(this.openApiValidatorConfig()))
+    this.createRoutes(this.routes())
+    app.use(expressWinston.errorLogger(this.errorLoggerConfig()))
+    app.use(this.errorHandler())
+    this.isInitialized = true
+  }
+
+  public start(): boolean {
+    if (this.isOn) {
+      return false
+    }
+    if (!this.isInitialized) {
+      this.initApp()
+    }
+    this.httpServer = this.expressApp.listen(this.port, () => {
+      this.logger.info(`Express server started listening on port ${this.port}`)
+    })
+    this.isOn = true
+    return true
+  }
+
+  public stop(): boolean {
+    if (!this.isOn) {
+      return false
+    }
+    this.httpServer?.close()
+    this.logger.info(`Express server stopped`)
+    this.isOn = false
+    return true
+  }
+}

+ 0 - 127
distributor-node/src/services/httpApi/HttpApiService.ts

@@ -1,127 +0,0 @@
-import express from 'express'
-import path from 'path'
-import cors from 'cors'
-import * as OpenApiValidator from 'express-openapi-validator'
-import { HttpError } from 'express-openapi-validator/dist/framework/types'
-import { ReadonlyConfig } from '../../types/config'
-import expressWinston from 'express-winston'
-import { LoggingService } from '../logging'
-import { PublicApiController } from './controllers/public'
-import { StateCacheService } from '../cache/StateCacheService'
-import { NetworkingService } from '../networking'
-import { Logger } from 'winston'
-import { ContentService } from '../content/ContentService'
-import { Server } from 'http'
-
-const OPENAPI_SPEC_PATH = path.join(__dirname, '../../api-spec/openapi.yml')
-
-export class HttpApiService {
-  private config: ReadonlyConfig
-  private logger: Logger
-  private expressApp: express.Application
-  private httpServer: Server | undefined
-
-  private routeWrapper<T>(
-    handler: (req: express.Request<T>, res: express.Response, next: express.NextFunction) => Promise<void>
-  ) {
-    return async (req: express.Request<T>, res: express.Response, next: express.NextFunction) => {
-      // Fix for express-winston in order to also log prematurely closed requests
-      res.on('close', () => {
-        res.locals.prematurelyClosed = !res.writableFinished
-        res.end()
-      })
-      try {
-        await handler(req, res, next)
-      } catch (err) {
-        next(err)
-      }
-    }
-  }
-
-  public constructor(
-    config: ReadonlyConfig,
-    stateCache: StateCacheService,
-    content: ContentService,
-    logging: LoggingService,
-    networking: NetworkingService
-  ) {
-    this.logger = logging.createLogger('ExpressServer')
-    this.config = config
-
-    const publicController = new PublicApiController(config, logging, networking, stateCache, content)
-
-    const app = express()
-    app.use(cors())
-    app.use(express.json())
-
-    // Request logger
-    app.use(
-      expressWinston.logger({
-        winstonInstance: this.logger,
-        level: 'http',
-        dynamicMeta: (req, res) => {
-          return { prematurelyClosed: res.locals.prematurelyClosed ?? false }
-        },
-      })
-    )
-
-    // Setup OpenAPiValidator
-    app.use(
-      OpenApiValidator.middleware({
-        apiSpec: OPENAPI_SPEC_PATH,
-        validateApiSpec: true,
-        validateResponses: true,
-        validateRequests: true,
-      })
-    )
-
-    // Routes
-    app.head('/api/v1/assets/:objectId', this.routeWrapper(publicController.assetHead.bind(publicController)))
-    app.get('/api/v1/assets/:objectId', this.routeWrapper(publicController.asset.bind(publicController)))
-    app.get('/api/v1/status', this.routeWrapper(publicController.status.bind(publicController)))
-    app.get('/api/v1/buckets', this.routeWrapper(publicController.buckets.bind(publicController)))
-
-    // Error logger
-    app.use(
-      expressWinston.errorLogger({
-        winstonInstance: this.logger,
-        level: 'error',
-        metaField: null,
-        exceptionToMeta: (err) => ({ err }),
-      })
-    )
-
-    // Error handler
-    app.use((err: HttpError, req: express.Request, res: express.Response, next: express.NextFunction) => {
-      if (res.headersSent) {
-        return next(err)
-      }
-      if (err.status && err.status >= 400 && err.status < 500) {
-        res
-          .status(err.status)
-          .json({
-            type: 'request_validation',
-            message: err.message,
-            errors: err.errors,
-          })
-          .end()
-      } else {
-        res.status(err.status || 500).json({ type: 'exception', message: err.message })
-      }
-    })
-
-    this.expressApp = app
-  }
-
-  public start(): void {
-    const { port } = this.config
-    this.httpServer = this.expressApp.listen(port, () => {
-      this.logger.info(`Express server started listening on port ${port}`)
-    })
-  }
-
-  public stop(): void {
-    this.httpServer?.close()
-    this.logger.info(`Express server stopped`)
-  }
-}

+ 90 - 0
distributor-node/src/services/httpApi/OperatorApiService.ts

@@ -0,0 +1,90 @@
+import express from 'express'
+import path from 'path'
+import { OpenApiValidatorOpts } from 'express-openapi-validator/dist/framework/types'
+import { Config } from '../../types/config'
+import { LoggingService } from '../logging'
+import jwt from 'jsonwebtoken'
+import { OperatorApiController } from './controllers/operator'
+import { HttpApiBase, HttpApiRoute } from './HttpApiBase'
+import { PublicApiService } from './PublicApiService'
+import _ from 'lodash'
+import { App } from '../../app'
+
+const OPENAPI_SPEC_PATH = path.join(__dirname, '../../api-spec/operator.yml')
+const JWT_TOKEN_MAX_AGE = '5m'
+
+export class OperatorApiService extends HttpApiBase {
+  protected port: number
+  protected operatorSecretKey: string
+  protected config: Config
+  protected app: App
+  protected publicApi: PublicApiService
+  protected logging: LoggingService
+
+  public constructor(config: Config, app: App, logging: LoggingService, publicApi: PublicApiService) {
+    super(config, logging.createLogger('OperatorApi'))
+    if (!config.operatorApi) {
+      throw new Error('Cannot construct OperatorApiService - missing operatorApi config!')
+    }
+    this.port = config.operatorApi.port
+    this.operatorSecretKey = config.operatorApi.hmacSecret
+    this.config = config
+    this.app = app
+    this.logging = logging
+    this.publicApi = publicApi
+    this.initApp()
+  }
+
+  protected openApiValidatorConfig(): OpenApiValidatorOpts {
+    return {
+      apiSpec: OPENAPI_SPEC_PATH,
+      validateSecurity: {
+        handlers: {
+          OperatorAuth: this.operatorRequestValidator(),
+        },
+      },
+      ...this.defaultOpenApiValidatorConfig(),
+    }
+  }
+
+  protected routes(): HttpApiRoute[] {
+    const controller = new OperatorApiController(this.config, this.app, this.publicApi, this.logging)
+    return [
+      ['post', '/api/v1/stop-api', controller.stopApi.bind(controller)],
+      ['post', '/api/v1/start-api', controller.startApi.bind(controller)],
+      ['post', '/api/v1/shutdown', controller.shutdown.bind(controller)],
+      ['post', '/api/v1/set-worker', controller.setWorker.bind(controller)],
+      ['post', '/api/v1/set-buckets', controller.setBuckets.bind(controller)],
+    ]
+  }
+
+  private operatorRequestValidator() {
+    return (req: express.Request): boolean => {
+      const authHeader = req.headers.authorization
+      if (!authHeader) {
+        throw new Error('Authrorization header missing')
+      }
+
+      const [authType, token] = authHeader.split(' ')
+      if (authType.toLowerCase() !== 'bearer') {
+        throw new Error(`Unexpected authorization type: ${authType}`)
+      }
+
+      if (!token) {
+        throw new Error(`Bearer token missing`)
+      }
+
+      const decoded = jwt.verify(token, this.operatorSecretKey, { maxAge: JWT_TOKEN_MAX_AGE }) as jwt.JwtPayload
+
+      if (!_.isEqual(req.body, decoded.reqBody)) {
+        throw new Error('Invalid token: Request body does not match')
+      }
+
+      if (req.originalUrl !== decoded.reqUrl) {
+        throw new Error('Invalid token: Request url does not match')
+      }
+
+      return true
+    }
+  }
+}

+ 60 - 0
distributor-node/src/services/httpApi/PublicApiService.ts

@@ -0,0 +1,60 @@
+import path from 'path'
+import { ReadonlyConfig } from '../../types/config'
+import { LoggingService } from '../logging'
+import { PublicApiController } from './controllers/public'
+import { StateCacheService } from '../cache/StateCacheService'
+import { NetworkingService } from '../networking'
+import { ContentService } from '../content/ContentService'
+import { HttpApiBase, HttpApiRoute } from './HttpApiBase'
+import { OpenApiValidatorOpts } from 'express-openapi-validator/dist/openapi.validator'
+
+const OPENAPI_SPEC_PATH = path.join(__dirname, '../../api-spec/public.yml')
+
+export class PublicApiService extends HttpApiBase {
+  protected port: number
+
+  private loggingService: LoggingService
+  private networkingService: NetworkingService
+  private stateCache: StateCacheService
+  private contentService: ContentService
+
+  public constructor(
+    config: ReadonlyConfig,
+    stateCache: StateCacheService,
+    content: ContentService,
+    logging: LoggingService,
+    networking: NetworkingService
+  ) {
+    super(config, logging.createLogger('PublicApi'))
+    this.stateCache = stateCache
+    this.loggingService = logging
+    this.networkingService = networking
+    this.contentService = content
+    this.port = config.publicApi.port
+    this.initApp()
+  }
+
+  protected openApiValidatorConfig(): OpenApiValidatorOpts {
+    return {
+      apiSpec: OPENAPI_SPEC_PATH,
+      ...this.defaultOpenApiValidatorConfig(),
+    }
+  }
+
+  protected routes(): HttpApiRoute[] {
+    const publicController = new PublicApiController(
+      this.config,
+      this.loggingService,
+      this.networkingService,
+      this.stateCache,
+      this.contentService
+    )
+
+    return [
+      ['head', '/api/v1/assets/:objectId', publicController.assetHead.bind(publicController)],
+      ['get', '/api/v1/assets/:objectId', publicController.asset.bind(publicController)],
+      ['get', '/api/v1/status', publicController.status.bind(publicController)],
+      ['get', '/api/v1/buckets', publicController.buckets.bind(publicController)],
+    ]
+  }
+}

+ 76 - 0
distributor-node/src/services/httpApi/controllers/operator.ts

@@ -0,0 +1,76 @@
+import { Logger } from 'winston'
+import * as express from 'express'
+import { PublicApiService } from '../PublicApiService'
+import { LoggingService } from '../../logging'
+import { App } from '../../../app'
+import { Config, SetBucketsOperation, SetWorkerOperation } from '../../../types'
+import { ParamsDictionary } from 'express-serve-static-core'
+
+export class OperatorApiController {
+  private config: Config
+  private app: App
+  private publicApi: PublicApiService
+  private logger: Logger
+
+  public constructor(config: Config, app: App, publicApi: PublicApiService, logging: LoggingService) {
+    this.config = config
+    this.app = app
+    this.publicApi = publicApi
+    this.logger = logging.createLogger('OperatorApiController')
+  }
+
+  public async stopApi(req: express.Request, res: express.Response): Promise<void> {
+    this.logger.info(`Stopping public api on operator request from ${req.ip}`, { ip: req.ip })
+    const stopped = this.publicApi.stop()
+    if (!stopped) {
+      res.status(409).json({ message: 'Already stopped' })
+    }
+    res.status(200).send()
+  }
+
+  public async startApi(req: express.Request, res: express.Response): Promise<void> {
+    this.logger.info(`Starting public api on operator request from ${req.ip}`, { ip: req.ip })
+    const started = this.publicApi.start()
+    if (!started) {
+      res.status(409).json({ message: 'Already started' })
+    }
+    res.status(200).send()
+  }
+
+  public async shutdown(req: express.Request, res: express.Response): Promise<void> {
+    this.logger.info(`Shutting down the app on operator request from ${req.ip}`, { ip: req.ip })
+    const shutdown = this.app.stop(5)
+    if (!shutdown) {
+      res.status(409).json({ message: 'Already shutting down' })
+    }
+    res.status(200).send()
+  }
+
+  public async setWorker(
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    req: express.Request<ParamsDictionary, any, SetWorkerOperation>,
+    res: express.Response
+  ): Promise<void> {
+    const { workerId } = req.body
+    this.logger.info(`Updating workerId to ${workerId} on operator request from ${req.ip}`, {
+      workerId,
+      ip: req.ip,
+    })
+    this.config.workerId = workerId
+    res.status(200).send()
+  }
+
+  public async setBuckets(
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    req: express.Request<ParamsDictionary, any, SetBucketsOperation>,
+    res: express.Response
+  ): Promise<void> {
+    const { buckets } = req.body
+    this.logger.info(`Updating buckets to ${JSON.stringify(buckets)} on operator request from ${req.ip}`, {
+      buckets,
+      ip: req.ip,
+    })
+    this.config.buckets = buckets
+    res.status(200).send()
+  }
+}

+ 9 - 2
distributor-node/src/services/httpApi/controllers/public.ts

@@ -3,11 +3,18 @@ import { Logger } from 'winston'
 import send from 'send'
 import { StateCacheService } from '../../cache/StateCacheService'
 import { NetworkingService } from '../../networking'
-import { AssetRouteParams, BucketsResponse, ErrorResponse, StatusResponse } from '../../../types/api'
+import {
+  AssetRouteParams,
+  BucketsResponse,
+  ErrorResponse,
+  StatusResponse,
+  DataObjectData,
+  ObjectStatusType,
+  ReadonlyConfig,
+} from '../../../types'
 import { LoggingService } from '../../logging'
 import { ContentService, DEFAULT_CONTENT_TYPE } from '../../content/ContentService'
 import proxy from 'express-http-proxy'
-import { DataObjectData, ObjectStatusType, ReadonlyConfig } from '../../../types'
 import { PendingDownloadStatusDownloading, PendingDownloadStatusType } from '../../networking/PendingDownload'
 import urljoin from 'url-join'
 

+ 66 - 50
distributor-node/src/types/generated/ConfigJson.d.ts

@@ -5,6 +5,7 @@
  * and run json-schema-to-typescript to regenerate this file.
  */
 
+export type SwitchOff = 'off'
 /**
  * List of distribution bucket ids
  */
@@ -52,54 +53,9 @@ export interface DistributorNodeConfiguration {
    * Specifies the logging configuration
    */
   logs?: {
-    file?:
-      | {
-          /**
-           * Minimum level of logs sent to this output
-           */
-          level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
-          /**
-           * Path where the logs will be stored (absolute or relative to config file)
-           */
-          path: string
-          /**
-           * Maximum number of log files to store
-           */
-          maxFiles?: number
-          /**
-           * Maximum size of a single log file in bytes
-           */
-          maxSize?: number
-          /**
-           * The frequency of creating new log files (regardless of maxSize)
-           */
-          frequency?: 'yearly' | 'monthly' | 'daily' | 'hourly'
-          /**
-           * Whether to archive old logs
-           */
-          archive?: boolean
-        }
-      | 'off'
-    console?:
-      | {
-          /**
-           * Minimum level of logs sent to this output
-           */
-          level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
-        }
-      | 'off'
-    elastic?:
-      | {
-          /**
-           * Minimum level of logs sent to this output
-           */
-          level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
-          /**
-           * Elastichsearch endpoint to push the logs to (for example: http://localhost:9200)
-           */
-          endpoint: string
-        }
-      | 'off'
+    file?: FileLoggingOptions | SwitchOff
+    console?: ConsoleLoggingOptions | SwitchOff
+    elastic?: ElasticsearchLoggingOptions | SwitchOff
   }
   /**
    * Specifies node limits w.r.t. storage, outbound connections etc.
@@ -152,9 +108,27 @@ export interface DistributorNodeConfiguration {
     cacheCleanup: number
   }
   /**
-   * Distributor node http api port
+   * Public api configuration
    */
-  port: number
+  publicApi: {
+    /**
+     * Distributor node public api port
+     */
+    port: number
+  }
+  /**
+   * Operator api configuration
+   */
+  operatorApi?: {
+    /**
+     * Distributor node operator api port
+     */
+    port: number
+    /**
+     * HMAC (HS256) secret key used for JWT authorization
+     */
+    hmacSecret: string
+  }
   /**
    * Specifies the keys available within distributor node CLI.
    */
@@ -168,6 +142,48 @@ export interface DistributorNodeConfiguration {
    */
   workerId: number
 }
+export interface FileLoggingOptions {
+  /**
+   * Minimum level of logs sent to this output
+   */
+  level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
+  /**
+   * Path where the logs will be stored (absolute or relative to config file)
+   */
+  path: string
+  /**
+   * Maximum number of log files to store
+   */
+  maxFiles?: number
+  /**
+   * Maximum size of a single log file in bytes
+   */
+  maxSize?: number
+  /**
+   * The frequency of creating new log files (regardless of maxSize)
+   */
+  frequency?: 'yearly' | 'monthly' | 'daily' | 'hourly'
+  /**
+   * Whether to archive old logs
+   */
+  archive?: boolean
+}
+export interface ConsoleLoggingOptions {
+  /**
+   * Minimum level of logs sent to this output
+   */
+  level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
+}
+export interface ElasticsearchLoggingOptions {
+  /**
+   * Minimum level of logs sent to this output
+   */
+  level: 'error' | 'warn' | 'info' | 'http' | 'verbose' | 'debug' | 'silly'
+  /**
+   * Elastichsearch endpoint to push the logs to (for example: http://localhost:9200)
+   */
+  endpoint: string
+}
 /**
  * Keypair's substrate uri (for example: //Alice)
  */

+ 114 - 0
distributor-node/src/types/generated/OperatorApi.ts

@@ -0,0 +1,114 @@
+/**
+ * This file was auto-generated by openapi-typescript.
+ * Do not make direct changes to the file.
+ */
+
+export interface paths {
+  '/stop-api': {
+    /** Turns off the public api. */
+    'post': operations['operator.stopApi']
+  }
+  '/start-api': {
+    /** Turns on the public api. */
+    'post': operations['operator.startApi']
+  }
+  '/shutdown': {
+    /** Shuts down the node. */
+    'post': operations['operator.shutdown']
+  }
+  '/set-worker': {
+    /** Updates the operator worker id. */
+    'post': operations['operator.setWorker']
+  }
+  '/set-buckets': {
+    /** Updates buckets supported by the node. */
+    'post': operations['operator.setBuckets']
+  }
+}
+
+export interface components {
+  schemas: {
+    'SetWorkerOperation': {
+      'workerId': number
+    }
+    'SetBucketsOperation': {
+      'buckets': 'all' | number[]
+    }
+  }
+}
+
+export interface operations {
+  /** Turns off the public api. */
+  'operator.stopApi': {
+    responses: {
+      /** OK */
+      200: unknown
+      /** Not authorized */
+      401: unknown
+      /** Already stopped */
+      409: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
+  /** Turns on the public api. */
+  'operator.startApi': {
+    responses: {
+      /** OK */
+      200: unknown
+      /** Not authorized */
+      401: unknown
+      /** Already started */
+      409: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
+  /** Shuts down the node. */
+  'operator.shutdown': {
+    responses: {
+      /** OK */
+      200: unknown
+      /** Not authorized */
+      401: unknown
+      /** Already shutting down */
+      409: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
+  /** Updates the operator worker id. */
+  'operator.setWorker': {
+    responses: {
+      /** OK */
+      200: unknown
+      /** Not authorized */
+      401: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+    requestBody: {
+      content: {
+        'application/json': components['schemas']['SetWorkerOperation']
+      }
+    }
+  }
+  /** Updates buckets supported by the node. */
+  'operator.setBuckets': {
+    responses: {
+      /** OK */
+      200: unknown
+      /** Not authorized */
+      401: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+    requestBody: {
+      content: {
+        'application/json': components['schemas']['SetBucketsOperation']
+      }
+    }
+  }
+}
+
+export interface external {}

+ 4 - 0
distributor-node/src/types/generated/OpenApi.ts → distributor-node/src/types/generated/PublicApi.ts

@@ -22,6 +22,10 @@ export interface paths {
 
 export interface components {
   schemas: {
+    'SetConfigBody': {
+      /** Config setting path (ie. limits.storage) */
+      'path'?: string
+    }
     'ErrorResponse': {
       'type'?: string
       'message': string

+ 2 - 1
distributor-node/src/types/index.ts

@@ -1,4 +1,5 @@
-export * from './api'
+export * from './publicApi'
+export * from './operatorApi'
 export * from './common'
 export * from './config'
 export * from './content'

+ 4 - 0
distributor-node/src/types/operatorApi.ts

@@ -0,0 +1,4 @@
+import { components } from './generated/OperatorApi'
+
+export type SetWorkerOperation = components['schemas']['SetWorkerOperation']
+export type SetBucketsOperation = components['schemas']['SetBucketsOperation']

+ 1 - 1
distributor-node/src/types/api.ts → distributor-node/src/types/publicApi.ts

@@ -1,4 +1,4 @@
-import { components, operations } from './generated/OpenApi'
+import { components, operations } from './generated/PublicApi'
 export type AssetRouteParams = operations['public.asset']['parameters']['path']
 export type ErrorResponse = components['schemas']['ErrorResponse']
 export type StatusResponse = components['schemas']['StatusResponse']