Prechádzať zdrojové kódy

Graceful exit, downloads concurrency limit, more logs, cache logic fixes

Leszek Wiesner 3 rokov pred
rodič
commit
e8fc3d70d5

+ 4 - 3
distributor-node/src/api-spec/openapi.yml

@@ -31,7 +31,8 @@ paths:
           in: path
           description: Data Object ID
           schema:
-            type: string
+            type: integer
+            minimum: 0
       responses:
         200:
           description: Full available object data sent
@@ -63,8 +64,8 @@ paths:
               schema:
                 type: string
                 format: binary
-        400:
-          description: Invalid request. Data object not supported.
+        421:
+          description: Misdirected request. Data object not supported.
           content:
             application/json:
               schema:

+ 36 - 9
distributor-node/src/app/index.ts

@@ -51,26 +51,51 @@ export class App {
     this.checkConfigDirectories()
     this.stateCache.load()
     const dataObjects = await this.networking.fetchSupportedDataObjects()
-    // TODO: Try to actually save as much content as possible by downloading missing data
-    await this.content.startupSync(dataObjects)
+    await this.content.startupInit(dataObjects)
     this.server.start()
     nodeCleanup(this.exitHandler.bind(this))
   }
 
   private async exitGracefully(): Promise<void> {
-    this.logger.info('Graceful exit initialized')
     // Async exit handler - ideally should not take more than 10 sec
     // We can try to wait until some pending downloads are finished here etc.
-    await this.stateCache.save()
-    this.logger.info('Graceful exit succesful')
+    this.logger.info('Graceful exit initialized')
+
+    // Stop accepting any new requests and save cache
+    this.server.stop()
+    this.stateCache.clearInterval()
+    this.stateCache.saveSync()
+
+    // Try to process remaining downloads
+    const MAX_RETRY_ATTEMPTS = 3
+    let retryCounter = 0
+    while (retryCounter < MAX_RETRY_ATTEMPTS && this.stateCache.getPendingDownloadsCount()) {
+      const pendingDownloadsCount = this.stateCache.getPendingDownloadsCount()
+      this.logger.info(`${pendingDownloadsCount} pending downloads in progress... Retrying exit in 5 sec...`, {
+        retryCounter,
+        pendingDownloadsCount,
+      })
+      await new Promise((resolve) => setTimeout(resolve, 5000))
+      this.stateCache.saveSync()
+      ++retryCounter
+    }
+
+    if (this.stateCache.getPendingDownloadsCount()) {
+      this.logger.warn('Limit reached: Could not finish all pending downloads.', {
+        pendingDownloadsCount: this.stateCache.getPendingDownloadsCount(),
+      })
+    }
+
+    this.logger.info('Graceful exit finished')
   }
 
   private exitCritically(): void {
     this.logger.info('Critical exit initialized')
     // Handling exits due to an error - only some critical, synchronous work can be done here
+    this.server.stop()
+    this.stateCache.clearInterval()
     this.stateCache.saveSync()
-    this.logger.close()
-    this.logger.info('Critical exit succesful')
+    this.logger.info('Critical exit finished')
   }
 
   private exitHandler(exitCode: number | null, signal: string | null): boolean | undefined {
@@ -78,14 +103,15 @@ export class App {
     this.stateCache.clearInterval()
     if (signal) {
       // Async exit can be executed
+      // TODO: this.logging.end() currently doesn't seem to be enough to make sure all logs are flushed to a file
       this.exitGracefully()
         .then(() => {
-          this.logger.close()
+          this.logging.end()
           process.kill(process.pid, signal)
         })
         .catch((err) => {
           this.logger.error('Graceful exit error', { err })
-          this.logger.close()
+          this.logging.end()
           process.kill(process.pid, signal)
         })
       nodeCleanup.uninstall()
@@ -93,6 +119,7 @@ export class App {
     } else {
       // Only synchronous work can be done here
       this.exitCritically()
+      this.logging.end()
     }
   }
 }

+ 70 - 25
distributor-node/src/services/cache/StateCacheService.ts

@@ -9,8 +9,11 @@ import fs from 'fs'
 export const CACHE_GROUP_LOG_BASE = 2
 export const CACHE_GROUPS_COUNT = 24
 
+type PendingDownloadStatus = 'Waiting' | 'LookingForSource' | 'Downloading'
+
 export interface PendingDownloadData {
   objectSize: number
+  status: PendingDownloadStatus
   promise: Promise<StorageNodeDownloadResponse>
 }
 
@@ -34,10 +37,10 @@ export class StateCacheService {
     pendingDownloadsByContentHash: new Map<string, PendingDownloadData>(),
     contentHashByObjectId: new Map<string, string>(),
     storageNodeEndpointDataByEndpoint: new Map<string, StorageNodeEndpointData>(),
+    groupNumberByContentHash: new Map<string, number>(),
   }
 
   private storedState = {
-    groupNumberByContentHash: new Map<string, number>(),
     lruCacheGroups: Array.from({ length: CACHE_GROUPS_COUNT }).map(() => new Map<string, CacheItemData>()),
     mimeTypeByContentHash: new Map<string, string>(),
   }
@@ -72,6 +75,14 @@ export class StateCacheService {
     )
   }
 
+  public getCachedContentHashes(): string[] {
+    let hashes: string[] = []
+    for (const [, group] of this.storedState.lruCacheGroups.entries()) {
+      hashes = hashes.concat(Array.from(group.keys()))
+    }
+    return hashes
+  }
+
   public newContent(contentHash: string, sizeInBytes: number): void {
     const cacheItemData: CacheItemData = {
       popularity: 1,
@@ -79,12 +90,19 @@ export class StateCacheService {
       sizeKB: Math.ceil(sizeInBytes / 1024),
     }
     const groupNumber = this.calcCacheGroup(cacheItemData)
-    this.storedState.groupNumberByContentHash.set(contentHash, groupNumber)
+    this.memoryState.groupNumberByContentHash.set(contentHash, groupNumber)
     this.storedState.lruCacheGroups[groupNumber].set(contentHash, cacheItemData)
   }
 
+  public peekContent(contentHash: string): CacheItemData | undefined {
+    const groupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
+    if (groupNumber !== undefined) {
+      return this.storedState.lruCacheGroups[groupNumber].get(contentHash)
+    }
+  }
+
   public useContent(contentHash: string): void {
-    const groupNumber = this.storedState.groupNumberByContentHash.get(contentHash)
+    const groupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
     if (groupNumber === undefined) {
       this.logger.warn('groupNumberByContentHash missing when trying to update LRU of content', { contentHash })
       return
@@ -96,7 +114,7 @@ export class StateCacheService {
         contentHash,
         groupNumber,
       })
-      this.storedState.groupNumberByContentHash.delete(contentHash)
+      this.memoryState.groupNumberByContentHash.delete(contentHash)
       return
     }
     cacheItemData.lastAccessTime = Date.now()
@@ -107,7 +125,7 @@ export class StateCacheService {
     group.delete(contentHash)
     targetGroup.set(contentHash, cacheItemData)
     if (targetGroupNumber !== groupNumber) {
-      this.storedState.groupNumberByContentHash.set(contentHash, targetGroupNumber)
+      this.memoryState.groupNumberByContentHash.set(contentHash, targetGroupNumber)
     }
   }
 
@@ -116,12 +134,14 @@ export class StateCacheService {
     let bestCandidate: string | null = null
     for (const group of this.storedState.lruCacheGroups) {
       const lastItemInGroup = Array.from(group.entries())[0]
-      const [contentHash, objectData] = lastItemInGroup
-      const elapsedSinceLastAccessed = Math.ceil((Date.now() - objectData.lastAccessTime) / 60_000)
-      const itemCost = (elapsedSinceLastAccessed * objectData.sizeKB) / objectData.popularity
-      if (itemCost >= highestCost) {
-        highestCost = itemCost
-        bestCandidate = contentHash
+      if (lastItemInGroup) {
+        const [contentHash, objectData] = lastItemInGroup
+        const elapsedSinceLastAccessed = Math.ceil((Date.now() - objectData.lastAccessTime) / 60_000)
+        const itemCost = (elapsedSinceLastAccessed * objectData.sizeKB) / objectData.popularity
+        if (itemCost >= highestCost) {
+          highestCost = itemCost
+          bestCandidate = contentHash
+        }
       }
     }
     return bestCandidate
@@ -133,6 +153,7 @@ export class StateCacheService {
     promise: Promise<StorageNodeDownloadResponse>
   ): PendingDownloadData {
     const pendingDownload: PendingDownloadData = {
+      status: 'Waiting',
       objectSize,
       promise,
     }
@@ -140,6 +161,10 @@ export class StateCacheService {
     return pendingDownload
   }
 
+  public getPendingDownloadsCount(): number {
+    return this.memoryState.pendingDownloadsByContentHash.size
+  }
+
   public getPendingDownload(contentHash: string): PendingDownloadData | undefined {
     return this.memoryState.pendingDownloadsByContentHash.get(contentHash)
   }
@@ -149,11 +174,13 @@ export class StateCacheService {
   }
 
   public dropByHash(contentHash: string): void {
+    this.logger.debug('Dropping all state by content hash', contentHash)
     this.storedState.mimeTypeByContentHash.delete(contentHash)
     this.memoryState.pendingDownloadsByContentHash.delete(contentHash)
-    const cacheGroupNumber = this.storedState.groupNumberByContentHash.get(contentHash)
+    const cacheGroupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
+    this.logger.debug('Cache group by hash established', { contentHash, cacheGroupNumber })
     if (cacheGroupNumber) {
-      this.storedState.groupNumberByContentHash.delete(contentHash)
+      this.memoryState.groupNumberByContentHash.delete(contentHash)
       this.storedState.lruCacheGroups[cacheGroupNumber].delete(contentHash)
     }
   }
@@ -168,12 +195,15 @@ export class StateCacheService {
   }
 
   private serializeData() {
-    const { groupNumberByContentHash, lruCacheGroups, mimeTypeByContentHash } = this.storedState
-    return JSON.stringify({
-      lruCacheGroups: lruCacheGroups.map((g) => Array.from(g.entries())),
-      mimeTypeByContentHash: Array.from(mimeTypeByContentHash.entries()),
-      groupNumberByContentHash: Array.from(groupNumberByContentHash.entries()),
-    })
+    const { lruCacheGroups, mimeTypeByContentHash } = this.storedState
+    return JSON.stringify(
+      {
+        lruCacheGroups: lruCacheGroups.map((g) => Array.from(g.entries())),
+        mimeTypeByContentHash: Array.from(mimeTypeByContentHash.entries()),
+      },
+      null,
+      2 // TODO: Only for debugging
+    )
   }
 
   public async save(): Promise<boolean> {
@@ -198,15 +228,30 @@ export class StateCacheService {
     fs.writeFileSync(this.cacheFilePath, serialized)
   }
 
+  private loadGroupNumberByContentHashMap() {
+    for (const [groupNumber, group] of this.storedState.lruCacheGroups.entries()) {
+      for (const contentHash of group.keys()) {
+        this.memoryState.groupNumberByContentHash.set(contentHash, groupNumber)
+      }
+    }
+  }
+
   public load(): void {
     if (fs.existsSync(this.cacheFilePath)) {
       this.logger.info('Loading cache from file', { file: this.cacheFilePath })
-      const fileContent = JSON.parse(fs.readFileSync(this.cacheFilePath).toString())
-      this.storedState.lruCacheGroups = (fileContent.lruCacheGroups || []).map(
-        (g: any) => new Map<string, CacheItemData>(g)
-      )
-      this.storedState.groupNumberByContentHash = new Map<string, number>(fileContent.groupNumberByContentHash || [])
-      this.storedState.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
+      try {
+        const fileContent = JSON.parse(fs.readFileSync(this.cacheFilePath).toString())
+        ;((fileContent.lruCacheGroups || []) as Array<Array<[string, CacheItemData]>>).forEach((group, groupIndex) => {
+          this.storedState.lruCacheGroups[groupIndex] = new Map<string, CacheItemData>(group)
+        })
+        this.storedState.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
+        this.loadGroupNumberByContentHashMap()
+      } catch (err) {
+        this.logger.error('Error while trying to load data from cache file! Will start from scratch', {
+          file: this.cacheFilePath,
+          err,
+        })
+      }
     } else {
       this.logger.warn(`Cache file (${this.cacheFilePath}) is empty. Starting from scratch`)
     }

+ 130 - 73
distributor-node/src/services/content/ContentService.ts

@@ -6,7 +6,7 @@ import { Logger } from 'winston'
 import { FileContinousReadStream, FileContinousReadStreamOptions } from './FileContinousReadStream'
 import FileType from 'file-type'
 import _ from 'lodash'
-import { Readable } from 'stream'
+import { Readable, pipeline } from 'stream'
 
 export const DEFAULT_CONTENT_TYPE = 'application/octet-stream'
 
@@ -16,7 +16,7 @@ export class ContentService {
   private logger: Logger
   private stateCache: StateCacheService
 
-  private contentSizeSum = 0 // TODO: Assign on startup
+  private contentSizeSum = 0
 
   private get freeSpace(): number {
     return this.config.storageLimit - this.contentSizeSum
@@ -29,37 +29,83 @@ export class ContentService {
     this.dataDir = config.directories.data
   }
 
-  public async startupSync(supportedObjects: DataObjectData[]): Promise<void> {
+  public async startupInit(supportedObjects: DataObjectData[]): Promise<void> {
     const dataObjectsByHash = _.groupBy(supportedObjects, (o) => o.contentHash)
     const dataDirFiles = fs.readdirSync(this.dataDir)
+
+    let filesCountOnStartup = 0
+    let filesDropped = 0
     for (const contentHash of dataDirFiles) {
+      ++filesCountOnStartup
       this.logger.verbose('Checking content file', { contentHash })
+      // Add fileSize to contentSizeSum for each file. If the file ends up dropped - contentSizeSum will be reduced by this.drop().
+      const fileSize = this.fileSize(contentHash)
+      this.contentSizeSum += fileSize
+
+      // Drop files that are not part of current chain assignment
       const objectsByHash = dataObjectsByHash[contentHash] || []
       if (!objectsByHash.length) {
         this.drop(contentHash, 'Not supported')
-        return
-      }
-      const { size } = objectsByHash[0]
-      const fileSize = fs.statSync(this.path(contentHash)).size
-      if (fileSize !== size) {
-        this.drop(contentHash, 'Invalid file size')
-        return
+        continue
       }
-      if (!this.stateCache.getContentMimeType(contentHash)) {
-        this.stateCache.setContentMimeType(contentHash, await this.guessMimeType(contentHash))
+
+      // Compare file size to expected one
+      const { size: dataObjectSize } = objectsByHash[0]
+      if (fileSize !== dataObjectSize) {
+        // Existing file size does not match the expected one
+        const msg = `Unexpected file size. Expected: ${dataObjectSize}, actual: ${fileSize}`
+        this.logger.warn(msg, { fileSize, dataObjectSize })
+        this.drop(contentHash, msg)
+        ++filesDropped
+      } else {
+        // Existing file size is OK - detect mimeType if missing
+        if (!this.stateCache.getContentMimeType(contentHash)) {
+          this.stateCache.setContentMimeType(contentHash, await this.guessMimeType(contentHash))
+        }
       }
+
+      // Recreate contentHashByObjectId map for all supported data objects
       objectsByHash.forEach(({ contentHash, objectId }) => {
         this.stateCache.setObjectContentHash(objectId, contentHash)
       })
     }
+
+    const cachedContentHashes = this.stateCache.getCachedContentHashes()
+    const cacheItemsOnStartup = cachedContentHashes.length
+    let cacheItemsDropped = 0
+    for (const contentHash of cachedContentHashes) {
+      if (!this.exists(contentHash)) {
+        // Content is part of cache data, but does not exist in filesystem - drop from cache
+        this.stateCache.dropByHash(contentHash)
+        ++cacheItemsDropped
+      }
+    }
+
+    this.logger.info('ContentService initialized', {
+      filesCountOnStartup,
+      filesDropped,
+      cacheItemsOnStartup,
+      cacheItemsDropped,
+      contentSizeSum: this.contentSizeSum,
+    })
   }
 
   public drop(contentHash: string, reason?: string): void {
-    this.logger.info('Dropping content', { contentHash, reason })
-    fs.unlinkSync(this.path(contentHash))
+    if (this.exists(contentHash)) {
+      const size = this.fileSize(contentHash)
+      fs.unlinkSync(this.path(contentHash))
+      this.contentSizeSum -= size
+      this.logger.verbose('Dropping content', { contentHash, reason, size, contentSizeSum: this.contentSizeSum })
+    } else {
+      this.logger.verbose('Trying to drop content that no loger exists', { contentHash, reason })
+    }
     this.stateCache.dropByHash(contentHash)
   }
 
+  public fileSize(contentHash: string): number {
+    return fs.statSync(this.path(contentHash)).size
+  }
+
   public path(contentHash: string): string {
     return `${this.dataDir}/${contentHash}`
   }
@@ -88,74 +134,85 @@ export class ContentService {
     return guessResult?.mime || DEFAULT_CONTENT_TYPE
   }
 
-  private dropCacheItemsUntilFreeSpaceReached(expectedFreeSpace: number): void {
-    let evictCandidateHash: string | null
-    while ((evictCandidateHash = this.stateCache.getCacheEvictCandidateHash())) {
-      this.drop(evictCandidateHash)
-      if (this.freeSpace === expectedFreeSpace) {
-        return
+  private async dropCacheItemsUntilFreeSpaceReached(expectedFreeSpace: number): Promise<void> {
+    this.logger.verbose(`Cache eviction free space target: ${expectedFreeSpace}`)
+    while (this.freeSpace < expectedFreeSpace) {
+      const evictCandidateHash = this.stateCache.getCacheEvictCandidateHash()
+      if (evictCandidateHash) {
+        this.drop(evictCandidateHash, 'Cache eviction')
+      } else {
+        this.logger.verbose('Nothing to drop from cache, waiting...', { freeSpace: this.freeSpace, expectedFreeSpace })
+        await new Promise((resolve) => setTimeout(resolve, 1000))
       }
     }
   }
 
-  public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<boolean> {
-    return new Promise((resolve, reject) => {
-      if (this.freeSpace < expectedSize) {
-        this.dropCacheItemsUntilFreeSpaceReached(expectedSize)
-      }
-
-      const fileStream = this.createWriteStream(contentHash)
-
-      let bytesRecieved = 0
-
-      // TODO: Use NodeJS pipeline for easier error handling (https://nodejs.org/es/docs/guides/backpressuring-in-streams/#the-problem-with-data-handling)
-
-      fileStream.on('ready', () => {
-        dataStream.pipe(fileStream)
-        // Attach handler after pipe, otherwise some data will be lost!
-        dataStream.on('data', (chunk) => {
-          bytesRecieved += chunk.length
-          if (bytesRecieved > expectedSize) {
-            dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
-          }
-        })
-        // Note: The promise is resolved on "ready" event, since that's what's awaited in the current flow
-        resolve(true)
+  public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {
+    return new Promise<void>((resolve, reject) => {
+      this.logger.verbose('Handling new content', {
+        contentHash,
+        expectedSize,
       })
+      this.dropCacheItemsUntilFreeSpaceReached(expectedSize)
+        .then(() => {
+          // Reserve space for new object
+          this.contentSizeSum += expectedSize
+          this.logger.verbose('Reserved space for new data object', {
+            contentHash,
+            expectedSize,
+            newContentSizeSum: this.contentSizeSum,
+          })
 
-      dataStream.on('error', (e) => {
-        fileStream.destroy(e)
-      })
+          const fileStream = this.createWriteStream(contentHash)
+
+          let bytesRecieved = 0
+
+          pipeline(dataStream, fileStream, async (err) => {
+            const { bytesWritten } = fileStream
+            const logMetadata = {
+              contentHash,
+              expectedSize,
+              bytesRecieved,
+              bytesWritten,
+            }
+            if (err) {
+              this.logger.error(`Error while processing content data stream`, {
+                err,
+                ...logMetadata,
+              })
+              this.drop(contentHash)
+              reject(err)
+            } else {
+              if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
+                const mimeType = await this.guessMimeType(contentHash)
+                this.logger.info('New content accepted', { ...logMetadata })
+                this.stateCache.dropPendingDownload(contentHash)
+                this.stateCache.newContent(contentHash, expectedSize)
+                this.stateCache.setContentMimeType(contentHash, mimeType)
+              } else {
+                this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
+                  ...logMetadata,
+                })
+                this.drop(contentHash)
+              }
+            }
+          })
 
-      fileStream.on('error', (err) => {
-        reject(err)
-        this.logger.error(`Content data stream error`, {
-          err,
-          contentHash,
-          expectedSize,
-          bytesRecieved,
-        })
-        this.drop(contentHash)
-      })
+          fileStream.on('open', () => {
+            // Note: The promise is resolved on the "open" event, since that's what's awaited in the current flow
+            resolve()
+          })
 
-      fileStream.on('close', async () => {
-        const { bytesWritten } = fileStream
-        if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
-          this.logger.info('New content accepted', { contentHash, bytesRecieved, written: bytesWritten })
-          this.stateCache.dropPendingDownload(contentHash)
-          const mimeType = await this.guessMimeType(contentHash)
-          this.stateCache.newContent(contentHash, expectedSize)
-          this.stateCache.setContentMimeType(contentHash, mimeType)
-        } else {
-          this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
-            contentHash,
-            expectedSize,
-            bytesWritten,
-            bytesRecieved,
+          dataStream.on('data', (chunk) => {
+            bytesRecieved += chunk.length
+            if (bytesRecieved > expectedSize) {
+              dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
+            }
           })
-          this.drop(contentHash)
-        }
-      })
+        })
+        .catch((err) => {
+          this.logger.error('Error while trying to drop items from cache', { err })
+        })
     })
   }
 }

+ 7 - 6
distributor-node/src/services/logging/LoggingService.ts

@@ -24,10 +24,10 @@ const cliFormat = winston.format.combine(
 )
 
 export class LoggingService {
-  private loggerOptions: LoggerOptions
+  private rootLogger: Logger
 
   private constructor(options: LoggerOptions) {
-    this.loggerOptions = options
+    this.rootLogger = winston.createLogger(options)
   }
 
   public static withAppConfig(config: ReadonlyConfig): LoggingService {
@@ -62,9 +62,10 @@ export class LoggingService {
   }
 
   public createLogger(label: string, ...meta: unknown[]): Logger {
-    return winston.createLogger({
-      ...this.loggerOptions,
-      defaultMeta: { label, ...meta },
-    })
+    return this.rootLogger.child({ label, ...meta })
+  }
+
+  public end(): void {
+    this.rootLogger.end()
   }
 }

+ 66 - 25
distributor-node/src/services/networking/NetworkingService.ts

@@ -3,7 +3,7 @@ import { QueryNodeApi } from './query-node/api'
 import { Logger } from 'winston'
 import { LoggingService } from '../logging'
 import { StorageNodeApi } from './storage-node/api'
-import { StateCacheService } from '../cache/StateCacheService'
+import { PendingDownloadData, StateCacheService } from '../cache/StateCacheService'
 import { DataObjectDetailsFragment } from './query-node/generated/queries'
 import axios from 'axios'
 import {
@@ -12,6 +12,7 @@ import {
   DataObjectData,
   DataObjectInfo,
   StorageNodeDownloadResponse,
+  DownloadData,
 } from '../../types'
 import queue from 'queue'
 import _ from 'lodash'
@@ -34,6 +35,7 @@ export class NetworkingService {
 
   private storageNodeEndpointsCheckInterval: NodeJS.Timeout
   private testLatencyQueue = queue({ concurrency: MAX_CONCURRENT_RESPONSE_TIME_CHECKS, autostart: true })
+  private downloadQueue = queue({ concurrency: MAX_CONCURRENT_DOWNLOADS, autostart: true })
 
   constructor(config: ReadonlyConfig, stateCache: StateCacheService, logging: LoggingService) {
     this.config = config
@@ -127,27 +129,46 @@ export class NetworkingService {
     })
   }
 
-  public downloadDataObject(objectData: DataObjectData): Promise<StorageNodeDownloadResponse> | null {
-    const { contentHash, accessPoints, size } = objectData
-
-    if (this.stateCache.getPendingDownload(contentHash)) {
-      // Already downloading
-      return null
-    }
+  private downloadJob(
+    pendingDownload: PendingDownloadData,
+    downloadData: DownloadData,
+    onSuccess: (response: StorageNodeDownloadResponse) => void,
+    onError: (error: Error) => void
+  ): Promise<StorageNodeDownloadResponse> {
+    const {
+      objectData: { contentHash, accessPoints },
+      startAt,
+    } = downloadData
+
+    pendingDownload.status = 'LookingForSource'
+
+    return new Promise((resolve, reject) => {
+      const fail = (message: string) => {
+        this.stateCache.dropPendingDownload(contentHash)
+        onError(new Error(message))
+        reject(new Error(message))
+      }
+      const success = (response: StorageNodeDownloadResponse) => {
+        this.logger.info('Download source found', { contentHash, source: response.config.url })
+        pendingDownload.status = 'Downloading'
+        onSuccess(response)
+        resolve(response)
+      }
 
-    const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
       const storageEndpoints = this.sortEndpointsByMeanResponseTime(
         accessPoints?.storageNodes.map((n) => n.endpoint) || []
       )
 
       this.logger.info('Downloading new data object', { contentHash, storageEndpoints })
       if (!storageEndpoints.length) {
-        reject(new Error('No storage endpoints available to download the data object from'))
-        return
+        return fail('No storage endpoints available to download the data object from')
       }
 
-      const availabilityQueue = queue({ concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD, autostart: true })
-      const downloadQueue = queue({ concurrency: 1, autostart: true })
+      const availabilityQueue = queue({
+        concurrency: MAX_CONCURRENT_AVAILABILITY_CHECKS_PER_DOWNLOAD,
+        autostart: true,
+      })
+      const objectDownloadQueue = queue({ concurrency: 1, autostart: true })
 
       storageEndpoints.forEach(async (endpoint) => {
         availabilityQueue.push(async () => {
@@ -162,11 +183,12 @@ export class NetworkingService {
 
       availabilityQueue.on('success', (endpoint) => {
         availabilityQueue.stop()
-        const job = () => {
+        const job = async () => {
           const api = new StorageNodeApi(endpoint, this.logging)
-          return api.downloadObject(contentHash)
+          const response = await api.downloadObject(contentHash, startAt)
+          return response
         }
-        downloadQueue.push(job)
+        objectDownloadQueue.push(job)
       })
 
       availabilityQueue.on('error', () => {
@@ -176,32 +198,51 @@ export class NetworkingService {
         */
       })
 
-      downloadQueue.on('error', (err) => {
+      objectDownloadQueue.on('error', (err) => {
         this.logger.error('Download attempt from storage node failed after availability was confirmed:', { err })
       })
 
-      downloadQueue.on('end', () => {
+      objectDownloadQueue.on('end', () => {
         if (availabilityQueue.length) {
           availabilityQueue.start()
         } else {
-          reject(new Error('Failed to download the object from any availablable storage provider'))
+          fail('Failed to download the object from any availablable storage provider')
         }
       })
 
       availabilityQueue.on('end', () => {
-        if (!downloadQueue.length) {
-          reject(new Error('Failed to download the object from any availablable storage provider'))
+        if (!objectDownloadQueue.length) {
+          fail('Failed to download the object from any availablable storage provider')
         }
       })
 
-      downloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
+      objectDownloadQueue.on('success', (response: StorageNodeDownloadResponse) => {
         availabilityQueue.removeAllListeners().end()
-        downloadQueue.removeAllListeners().end()
-        resolve(response)
+        objectDownloadQueue.removeAllListeners().end()
+        success(response)
       })
     })
+  }
+
+  public downloadDataObject(downloadData: DownloadData): Promise<StorageNodeDownloadResponse> | null {
+    const {
+      objectData: { contentHash, size },
+    } = downloadData
+
+    if (this.stateCache.getPendingDownload(contentHash)) {
+      // Already downloading
+      return null
+    }
+
+    let resolveDownload: (response: StorageNodeDownloadResponse) => void, rejectDownload: (err: Error) => void
+    const downloadPromise = new Promise<StorageNodeDownloadResponse>((resolve, reject) => {
+      resolveDownload = resolve
+      rejectDownload = reject
+    })
 
-    this.stateCache.newPendingDownload(contentHash, size, downloadPromise)
+    // Queue the download
+    const pendingDownload = this.stateCache.newPendingDownload(contentHash, size, downloadPromise)
+    this.downloadQueue.push(() => this.downloadJob(pendingDownload, downloadData, resolveDownload, rejectDownload))
 
     return downloadPromise
   }

+ 5 - 2
distributor-node/src/services/networking/storage-node/api.ts

@@ -46,11 +46,14 @@ export class StorageNodeApi {
     }
   }
 
-  public async downloadObject(contentHash: string): Promise<StorageNodeDownloadResponse> {
-    this.logger.info('Sending download request', { contentHash })
+  public async downloadObject(contentHash: string, startAt?: number): Promise<StorageNodeDownloadResponse> {
+    this.logger.info('Sending download request', { contentHash, startAt })
     const options: AxiosRequestConfig = {
       responseType: 'stream',
     }
+    if (startAt) {
+      options.headers.Range = `bytes=${startAt}-`
+    }
     return this.publicApi.publicApiFiles(contentHash, options)
   }
 }

+ 9 - 1
distributor-node/src/services/server/ServerService.ts

@@ -11,6 +11,7 @@ import { StateCacheService } from '../cache/StateCacheService'
 import { NetworkingService } from '../networking'
 import { Logger } from 'winston'
 import { ContentService } from '../content/ContentService'
+import { Server } from 'http'
 
 const OPENAPI_SPEC_PATH = path.join(__dirname, '../../api-spec/openapi.yml')
 
@@ -18,6 +19,7 @@ export class ServerService {
   private config: ReadonlyConfig
   private logger: Logger
   private expressApp: express.Application
+  private httpServer: Server | undefined
 
   private routeWrapper<T>(
     handler: (req: express.Request<T>, res: express.Response, next: express.NextFunction) => Promise<void>
@@ -72,6 +74,7 @@ export class ServerService {
     app.use(
       expressWinston.errorLogger({
         winstonInstance: this.logger,
+        level: 'error',
       })
     )
 
@@ -99,8 +102,13 @@ export class ServerService {
 
   public start(): void {
     const { port } = this.config
-    this.expressApp.listen(port, () => {
+    this.httpServer = this.expressApp.listen(port, () => {
       this.logger.info(`Express server started listening on port ${port}`)
     })
   }
+
+  public stop(): void {
+    this.httpServer?.close()
+    this.logger.info(`Express server stopped`)
+  }
 }

+ 7 - 3
distributor-node/src/services/server/controllers/public.ts

@@ -147,7 +147,11 @@ export class PublicApiController {
     const contentHash = this.stateCache.getObjectContentHash(objectId)
     const pendingDownload = contentHash && this.stateCache.getPendingDownload(contentHash)
 
-    this.logger.verbose('Data object state', { contentHash, pendingDownload })
+    this.logger.verbose('Data object requested', {
+      objectId,
+      contentHash,
+      status: pendingDownload && pendingDownload.status,
+    })
 
     if (contentHash && !pendingDownload && this.content.exists(contentHash)) {
       this.logger.info('Requested file found in filesystem', { path: this.content.path(contentHash) })
@@ -170,7 +174,7 @@ export class PublicApiController {
         //   const errorRes: ErrorResponse = {
         //     message: 'Data object not served by this node',
         //   }
-        //   res.status(400).json(errorRes)
+        //   res.status(421).json(errorRes)
         //   // TODO: Redirect to other node that supports it?
       } else {
         const { data: objectData } = objectInfo
@@ -179,7 +183,7 @@ export class PublicApiController {
         }
         const { contentHash, size } = objectData
 
-        const downloadResponse = await this.networking.downloadDataObject(objectData)
+        const downloadResponse = await this.networking.downloadDataObject({ objectData })
 
         if (downloadResponse) {
           // Note: Await will only wait until the file is created, so we may serve the response from it

+ 6 - 0
distributor-node/src/types/networking.ts

@@ -1,4 +1,10 @@
 import { AxiosResponse } from 'axios'
 import { Readable } from 'stream'
+import { DataObjectData } from './storage'
 
 export type StorageNodeDownloadResponse = AxiosResponse<Readable>
+
+export type DownloadData = {
+  startAt?: number
+  objectData: DataObjectData
+}

+ 1 - 1
distributor-node/tsconfig.json

@@ -6,7 +6,7 @@
     "outDir": "lib",
     "rootDir": "src",
     "strict": true,
-    "target": "es2017",
+    "target": "es2020",
     "skipLibCheck": true,
     "baseUrl": ".",
     "esModuleInterop": true,