Browse Source

API - add /status, /buckets and HEAD support for /asset

Leszek Wiesner 3 years ago
parent
commit
727df9ac9b

+ 118 - 9
distributor-node/src/api-spec/openapi.yml

@@ -5,8 +5,8 @@ info:
   contact:
     email: info@joystream.org
   license:
-    name: MIT
-    url: https://opensource.org/licenses/MIT
+    name: GPL-3.0-only
+    url: https://spdx.org/licenses/GPL-3.0-only.html
   version: 0.1.0
 externalDocs:
   description: Distributor node API
@@ -19,23 +19,71 @@ tags:
     description: Public distributor node API
 
 paths:
+  # Node status endpoint. Handled by PublicApiController.status, which the Express server registers at /api/v1/status.
+  /status:
+    get:
+      operationId: public.status
+      description: Returns json object describing current node status.
+      tags:
+        - public
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/StatusResponse'
+        500:
+          description: Unexpected server error
+  # Bucket listing endpoint. Handled by PublicApiController.buckets, which the Express server registers at /api/v1/buckets.
+  /buckets:
+    get:
+      operationId: public.buckets
+      description: Returns list of distributed buckets
+      tags:
+        - public
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/BucketsResponse'
+        500:
+          description: Unexpected server error
   /asset/{objectId}:
+    # HEAD variant of the asset endpoint: status and headers only, no body.
+    # Handled by PublicApiController.assetHead, registered at /api/v1/asset/:objectId.
+    head:
+      operationId: public.assetHead
+      description: Returns asset response headers (cache status, content type and/or length, accepted ranges etc.)
+      tags:
+        - public
+      parameters:
+        # The fragment of a local reference is a JSON Pointer and therefore has to start with "#/".
+        - $ref: '#/components/parameters/ObjectId'
+      responses:
+        200:
+          description: Object is supported and should be sent on GET request.
+          headers:
+            X-Cache:
+              $ref: '#/components/headers/X-Cache'
+        421:
+          description: Misdirected request. Data object not supported by the node.
+        404:
+          description: Data object does not exist.
+        500:
+          description: Unexpected server error
     get:
       operationId: public.asset
       description: Returns a media file.
       tags:
         - public
       parameters:
-        - name: objectId
-          required: true
-          in: path
-          description: Data Object ID
-          schema:
-            type: integer
-            minimum: 0
+        # Shared path parameter; the fragment of a local reference is a JSON Pointer and has to start with "#/".
+        - $ref: '#/components/parameters/ObjectId'
       responses:
         200:
           description: Full available object data sent
+          headers:
+            X-Cache:
+              $ref: '#/components/headers/X-Cache'
+            X-Data-Source:
+              $ref: '#/components/headers/X-Data-Source'
           content:
             image/*:
               schema:
@@ -51,6 +99,11 @@ paths:
                 format: binary
         206:
           description: Requested partial object data sent
+          headers:
+            X-Cache:
+              $ref: '#/components/headers/X-Cache'
+            X-Data-Source:
+              $ref: '#/components/headers/X-Data-Source'
           content:
             image/*:
               schema:
@@ -80,6 +133,33 @@ paths:
           description: Unexpected server error
 
 components:
+  parameters:
+    # Path parameter shared by the GET and HEAD operations of /asset/{objectId}.
+    ObjectId:
+      name: objectId
+      required: true
+      in: path
+      description: Data Object ID
+      schema:
+        type: string
+        # Anchored on both ends: schema patterns are not implicitly anchored, so a bare \d+
+        # would accept any string that merely contains a digit (e.g. "abc1").
+        pattern: '^\d+$'
+  headers:
+    X-Cache:
+      description:
+        Describes cache status of an object.
+        Hit - object is already fully fetched in distributor node's cache.
+        Pending - object is still being fetched from the storage node.
+        Miss - object is neither in cache nor currently being fetched. Fetching from storage node may be triggered.
+      schema:
+        type: string
+        enum: ['hit', 'pending', 'miss']
+    X-Data-Source:
+      description:
+        Describes the source of data stream.
+        External - the request was proxied to a storage node.
+        Local - the data is streamed from local file.
+      schema:
+        type: string
+        enum: ['external', 'local']
   schemas:
     ErrorResponse:
       type: object
@@ -90,3 +170,32 @@ components:
           type: string
         message:
           type: string
+    # Payload of GET /status, assembled by PublicApiController.status.
+    StatusResponse:
+      type: object
+      required:
+        - objectsInCache
+        - storageLimit
+        - storageUsed
+        - uptime
+        - downloadsInProgress
+      properties:
+        # Filled from StateCacheService.getCachedContentLength(), i.e. the summed size of all LRU cache groups.
+        objectsInCache:
+          type: integer
+          minimum: 0
+        # Filled from config.storageLimit. Presumably bytes - confirm against the config schema.
+        storageLimit:
+          type: integer
+          minimum: 0
+        # Filled from ContentService.usedSpace (its running content size sum). Presumably bytes - confirm.
+        storageUsed:
+          type: integer
+          minimum: 0
+        # Filled from Math.floor(process.uptime()), so whole seconds since the node process started.
+        uptime:
+          type: integer
+          minimum: 0
+        # Filled from StateCacheService.getPendingDownloadsCount().
+        downloadsInProgress:
+          type: integer
+          minimum: 0
+    # Payload of GET /buckets: PublicApiController.buckets responds with a copy of config.buckets.
+    BucketsResponse:
+      type: array
+      items:
+        type: integer
+        minimum: 0

+ 4 - 0
distributor-node/src/services/cache/StateCacheService.ts

@@ -83,6 +83,10 @@ export class StateCacheService {
     return hashes
   }
 
+  /**
+   * Counts the entries held across all LRU cache groups.
+   *
+   * @returns The sum of `size` over every group in `storedState.lruCacheGroups` (0 when there are no groups).
+   */
+  public getCachedContentLength(): number {
+    let total = 0
+    for (const group of this.storedState.lruCacheGroups.values()) {
+      total += group.size
+    }
+    return total
+  }
+
   public newContent(contentHash: string, sizeInBytes: number): void {
     const cacheItemData: CacheItemData = {
       popularity: 1,

+ 5 - 1
distributor-node/src/services/content/ContentService.ts

@@ -18,7 +18,11 @@ export class ContentService {
 
   private contentSizeSum = 0
 
-  private get freeSpace(): number {
+  /**
+   * Current value of the running `contentSizeSum` counter.
+   * `freeSpace` subtracts this same counter from the configured storage limit.
+   */
+  public get usedSpace(): number {
+    return this.contentSizeSum
+  }
+
+  public get freeSpace(): number {
     return this.config.storageLimit - this.contentSizeSum
   }
 

+ 5 - 2
distributor-node/src/services/server/ServerService.ts

@@ -43,7 +43,7 @@ export class ServerService {
     this.logger = logging.createLogger('ExpressServer')
     this.config = config
 
-    const publicController = new PublicApiController(logging, networking, stateCache, content)
+    const publicController = new PublicApiController(config, logging, networking, stateCache, content)
 
     const app = express()
     app.use(cors())
@@ -68,7 +68,10 @@ export class ServerService {
     )
 
     // Routes
-    app.use('/api/v1/asset/:objectId', this.routeWrapper(publicController.asset.bind(publicController)))
+    app.head('/api/v1/asset/:objectId', this.routeWrapper(publicController.assetHead.bind(publicController)))
+    app.get('/api/v1/asset/:objectId', this.routeWrapper(publicController.asset.bind(publicController)))
+    app.get('/api/v1/status', this.routeWrapper(publicController.status.bind(publicController)))
+    app.get('/api/v1/buckets', this.routeWrapper(publicController.buckets.bind(publicController)))
 
     // Error logger
     app.use(

+ 70 - 13
distributor-node/src/services/server/controllers/public.ts

@@ -3,25 +3,30 @@ import { Logger } from 'winston'
 import send from 'send'
 import { StateCacheService } from '../../../services/cache/StateCacheService'
 import { NetworkingService } from '../../../services/networking'
-import { ErrorResponse, RouteParams } from '../../../types/api'
+import { AssetRouteParams, BucketsResponse, ErrorResponse, StatusResponse } from '../../../types/api'
 import { LoggingService } from '../../logging'
 import { ContentService, DEFAULT_CONTENT_TYPE } from '../../content/ContentService'
 import proxy from 'express-http-proxy'
+import { ReadonlyConfig } from '../../../types'
 
-const CACHE_MAX_AGE = 31536000
+const CACHED_MAX_AGE = 31536000
+const PENDING_MAX_AGE = 180
 
 export class PublicApiController {
+  private config: ReadonlyConfig
   private logger: Logger
   private networking: NetworkingService
   private stateCache: StateCacheService
   private content: ContentService
 
   public constructor(
+    config: ReadonlyConfig,
     logging: LoggingService,
     networking: NetworkingService,
     stateCache: StateCacheService,
     content: ContentService
   ) {
+    this.config = config
     this.logger = logging.createLogger('PublicApiController')
     this.networking = networking
     this.stateCache = stateCache
@@ -29,7 +34,7 @@ export class PublicApiController {
   }
 
   private serveAssetFromFilesystem(
-    req: express.Request,
+    req: express.Request<AssetRouteParams>,
     res: express.Response,
     next: express.NextFunction,
     contentHash: string
@@ -41,14 +46,14 @@ export class PublicApiController {
 
     const path = this.content.path(contentHash)
     const stream = send(req, path, {
-      maxAge: CACHE_MAX_AGE,
+      maxAge: CACHED_MAX_AGE,
       lastModified: false,
     })
     const mimeType = this.stateCache.getContentMimeType(contentHash)
 
     stream.on('headers', (res) => {
       res.setHeader('x-cache', 'hit')
-      res.setHeader('x-data-source', 'cache')
+      res.setHeader('x-data-source', 'local')
       res.setHeader('content-disposition', 'inline')
       res.setHeader('content-type', mimeType || DEFAULT_CONTENT_TYPE)
     })
@@ -69,7 +74,7 @@ export class PublicApiController {
   }
 
   private async servePendingDownloadAsset(
-    req: express.Request,
+    req: express.Request<AssetRouteParams>,
     res: express.Response,
     next: express.NextFunction,
     contentHash: string
@@ -87,7 +92,7 @@ export class PublicApiController {
     res.setHeader('content-type', contentType)
     // Allow caching pendingDownload reponse only for very short period of time and requite revalidation,
     // since the data coming from the source may not be valid
-    res.setHeader('cache-control', `max-age=180, must-revalidate`)
+    res.setHeader('cache-control', `max-age=${PENDING_MAX_AGE}, must-revalidate`)
 
     // Handle request using pending download file if this makes sense in current context:
     if (this.content.exists(contentHash)) {
@@ -108,7 +113,7 @@ export class PublicApiController {
   }
 
   private async servePendingDownloadAssetFromFile(
-    req: express.Request,
+    req: express.Request<AssetRouteParams>,
     res: express.Response,
     next: express.NextFunction,
     contentHash: string,
@@ -126,7 +131,7 @@ export class PublicApiController {
     })
     res.status(isRange ? 206 : 200)
     res.setHeader('accept-ranges', 'bytes')
-    res.setHeader('x-data-source', 'partial-cache')
+    res.setHeader('x-data-source', 'local')
     res.setHeader('content-disposition', 'inline')
     if (isRange) {
       res.setHeader('content-range', `bytes 0-${rangeEnd}/${objectSize}`)
@@ -134,15 +139,53 @@ export class PublicApiController {
     stream.pipe(res)
   }
 
+  /**
+   * Handles `HEAD /asset/{objectId}`: reports how the node would serve the object, without a body.
+   *
+   * - A pending download exists for the object: 200, `x-cache: pending`, short-lived cache-control,
+   *   content length taken from the pending download.
+   * - The content exists locally and is not pending: 200, `x-cache: hit`, long-lived cache-control,
+   *   content type (or the default one) and the file size.
+   * - Otherwise the object info is requested from the networking service: 404 if the object does not
+   *   exist, 421 if it is not supported, else 200 with `x-cache: miss` and the reported size (0 if absent).
+   *
+   * @param req Request carrying the `objectId` path parameter.
+   * @param res Response that receives the status code and headers.
+   */
+  public async assetHead(req: express.Request<AssetRouteParams>, res: express.Response): Promise<void> {
+    const { objectId } = req.params
+    const contentHash = this.stateCache.getObjectContentHash(objectId)
+    const pendingDownload = contentHash && this.stateCache.getPendingDownload(contentHash)
+    const shortLivedCacheControl = `max-age=${PENDING_MAX_AGE}, must-revalidate`
+
+    // Writes the status and the headers shared by every 200 variant, always in the same order.
+    const startOkResponse = (cacheStatus: string, cacheControl: string): void => {
+      res.status(200)
+      res.setHeader('accept-ranges', 'bytes')
+      res.setHeader('x-cache', cacheStatus)
+      res.setHeader('content-disposition', 'inline')
+      res.setHeader('cache-control', cacheControl)
+    }
+
+    if (contentHash && pendingDownload) {
+      startOkResponse('pending', shortLivedCacheControl)
+      res.setHeader('content-length', pendingDownload.objectSize)
+    } else if (contentHash && this.content.exists(contentHash)) {
+      // No pending download at this point, so the local file is the complete object.
+      startOkResponse('hit', `max-age=${CACHED_MAX_AGE}`)
+      res.setHeader('content-type', this.stateCache.getContentMimeType(contentHash) || DEFAULT_CONTENT_TYPE)
+      res.setHeader('content-length', this.content.fileSize(contentHash))
+    } else {
+      const objectInfo = await this.networking.dataObjectInfo(objectId)
+      if (objectInfo.exists && objectInfo.isSupported) {
+        startOkResponse('miss', shortLivedCacheControl)
+        res.setHeader('content-length', objectInfo.data?.size || 0)
+      } else {
+        // Unknown object -> 404; known but not supported by this node -> 421.
+        res.status(objectInfo.exists ? 421 : 404)
+      }
+    }
+
+    res.send()
+  }
+
   public async asset(
-    req: express.Request<RouteParams<'public.asset'>>,
+    req: express.Request<AssetRouteParams>,
     res: express.Response,
     next: express.NextFunction
   ): Promise<void> {
     req.on('close', () => {
       res.end()
     })
-    // TODO: objectId validation
     const objectId = req.params.objectId
     const contentHash = this.stateCache.getObjectContentHash(objectId)
     const pendingDownload = contentHash && this.stateCache.getPendingDownload(contentHash)
@@ -155,7 +198,6 @@ export class PublicApiController {
 
     if (contentHash && !pendingDownload && this.content.exists(contentHash)) {
       this.logger.info('Requested file found in filesystem', { path: this.content.path(contentHash) })
-      this.stateCache.useContent(contentHash)
       return this.serveAssetFromFilesystem(req, res, next, contentHash)
     } else if (contentHash && pendingDownload) {
       this.logger.info('Requested file is in pending download state', { path: this.content.path(contentHash) })
@@ -188,7 +230,7 @@ export class PublicApiController {
         if (downloadResponse) {
           // Note: Await will only wait unil the file is created, so we may serve the response from it
           await this.content.handleNewContent(contentHash, size, downloadResponse.data)
-          res.setHeader('x-cache', 'fetch-triggered')
+          res.setHeader('x-cache', 'miss')
         } else {
           res.setHeader('x-cache', 'pending')
         }
@@ -196,4 +238,19 @@ export class PublicApiController {
       }
     }
   }
+
+  /**
+   * Handles `GET /status`: responds with a JSON snapshot of the node's current state.
+   *
+   * @param req Incoming request (not inspected).
+   * @param res Response that receives the `StatusResponse` payload with status 200.
+   */
+  public async status(req: express.Request, res: express.Response<StatusResponse>): Promise<void> {
+    const { stateCache, config, content } = this
+    const snapshot: StatusResponse = {
+      objectsInCache: stateCache.getCachedContentLength(),
+      storageLimit: config.storageLimit,
+      storageUsed: content.usedSpace,
+      uptime: Math.floor(process.uptime()),
+      downloadsInProgress: stateCache.getPendingDownloadsCount(),
+    }
+    res.status(200)
+    res.json(snapshot)
+  }
+
+  /**
+   * Handles `GET /buckets`: responds with a fresh array copy of the configured buckets.
+   *
+   * @param req Incoming request (not inspected).
+   * @param res Response that receives the `BucketsResponse` payload with status 200.
+   */
+  public async buckets(req: express.Request, res: express.Response<BucketsResponse>): Promise<void> {
+    const bucketsCopy = Array.from(this.config.buckets)
+    res.status(200)
+    res.json(bucketsCopy)
+  }
 }

+ 3 - 1
distributor-node/src/types/api.ts

@@ -1,3 +1,5 @@
 import { components, operations } from './generated/OpenApi'
-export type RouteParams<Name extends keyof operations> = operations[Name]['parameters']['path']
+/** Path parameters of the `public.asset` operation (the `objectId` of the requested data object). */
+export type AssetRouteParams = operations['public.asset']['parameters']['path']
+/** Error payload schema taken from the generated OpenAPI components. */
 export type ErrorResponse = components['schemas']['ErrorResponse']
+/** JSON body of the 200 response of the `public.status` operation. */
+export type StatusResponse = components['schemas']['StatusResponse']
+/** JSON body of the 200 response of the `public.buckets` operation. */
+export type BucketsResponse = components['schemas']['BucketsResponse']

+ 80 - 5
distributor-node/src/types/generated/OpenApi.ts

@@ -4,9 +4,19 @@
  */
 
 export interface paths {
+  '/status': {
+    /** Returns json object describing current node status. */
+    'get': operations['public.status']
+  }
+  '/buckets': {
+    /** Returns list of distributed buckets */
+    'get': operations['public.buckets']
+  }
   '/asset/{objectId}': {
     /** Returns a media file. */
     'get': operations['public.asset']
+    /** Returns asset response headers (cache status, content type and/or length, accepted ranges etc.) */
+    'head': operations['public.assetHead']
   }
 }
 
@@ -16,21 +26,66 @@ export interface components {
       'type'?: string
       'message': string
     }
+    'StatusResponse': {
+      'objectsInCache': number
+      'storageLimit': number
+      'storageUsed': number
+      'uptime': number
+      'downloadsInProgress'?: number
+    }
+    'BucketsResponse': number[]
+  }
+  parameters: {
+    /** Data Object ID */
+    'ObjectId': string
+  }
+  headers: {
+    /** Describes cache status of an object. Hit - object is already fully fetch in distributor node's cache. Pending - object is still beeing fetched from the storage node. Miss - object is neither in cache not currently beeing fetched. Fetching from storage node may be triggered. */
+    'X-Cache'?: 'hit' | 'pending' | 'miss'
+    /** Describes the source of data stream. External - the request was proxied to a storage node. Local - the data is streamed from local file. */
+    'X-Data-Source'?: 'external' | 'local'
   }
 }
 
 export interface operations {
+  /** Returns json object describing current node status. */
+  'public.status': {
+    responses: {
+      /** OK */
+      200: {
+        content: {
+          'application/json': components['schemas']['StatusResponse']
+        }
+      }
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
+  /** Returns list of distributed buckets */
+  'public.buckets': {
+    responses: {
+      /** OK */
+      200: {
+        content: {
+          'application/json': components['schemas']['BucketsResponse']
+        }
+      }
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
   /** Returns a media file. */
   'public.asset': {
     parameters: {
       path: {
         /** Data Object ID */
-        'objectId': string
+        'objectId': components['parameters']['ObjectId']
       }
     }
     responses: {
       /** Full available object data sent */
       200: {
+        headers: {}
         content: {
           'image/*': string
           'audio/*': string
@@ -39,20 +94,21 @@ export interface operations {
       }
       /** Requested partial object data sent */
       206: {
+        headers: {}
         content: {
           'image/*': string
           'audio/*': string
           'video/*': string
         }
       }
-      /** Invalid request. Data object not supported. */
-      400: {
+      /** Data object does not exist. */
+      404: {
         content: {
           'application/json': components['schemas']['ErrorResponse']
         }
       }
-      /** Data object does not exist. */
-      404: {
+      /** Misdirected request. Data object not supported. */
+      421: {
         content: {
           'application/json': components['schemas']['ErrorResponse']
         }
@@ -61,6 +117,25 @@ export interface operations {
       500: unknown
     }
   }
+  /** Returns asset response headers (cache status, content type and/or length, accepted ranges etc.) */
+  'public.assetHead': {
+    parameters: {
+      path: {
+        /** Data Object ID */
+        'objectId': components['parameters']['ObjectId']
+      }
+    }
+    responses: {
+      /** Object is supported and should be send on GET request. */
+      200: unknown
+      /** Data object does not exist. */
+      404: unknown
+      /** Misdirected request. Data object not supported by the node. */
+      421: unknown
+      /** Unexpected server error */
+      500: unknown
+    }
+  }
 }
 
 export interface external {}