Browse Source

Cache fixes

Leszek Wiesner 3 years ago
parent
commit
633b8105cc

+ 36 - 13
distributor-node/src/services/cache/StateCacheService.ts

@@ -85,18 +85,24 @@ export class StateCacheService {
   }
 
   /**
    * Returns the total number of entries held across all LRU cache groups,
    * computed by summing the `size` of every group in `storedState.lruCacheGroups`.
    */
   public getCachedContentLength(): number {
-    return Array.from(this.storedState.lruCacheGroups.values()).reduce((a, b) => a + b.size, 0)
+    return this.storedState.lruCacheGroups.reduce((a, b) => a + b.size, 0)
   }
 
   /**
    * Registers a newly stored content object in the cache state.
    *
    * Creates a cache item with popularity 1, the current time as last access time and
    * the size rounded up to whole KB, asks `calcCacheGroup` which LRU group it belongs to,
    * then records it both in the in-memory hash -> group index and in that group.
    * If the content hash is already indexed, a warning is logged and the call is ignored.
    *
    * @param contentHash identifier of the content object
    * @param sizeInBytes size of the content in bytes
    */
   public newContent(contentHash: string, sizeInBytes: number): void {
+    const { groupNumberByContentHash } = this.memoryState
+    const { lruCacheGroups } = this.storedState
+    // Use `has` instead of a truthiness check on `get`: group number 0 is a valid (but falsy)
+    // value, so content already stored in the first group would otherwise be treated as new.
+    if (groupNumberByContentHash.has(contentHash)) {
+      this.logger.warn('newContent was called for content that already exists, ignoring the call', { contentHash })
+      return
+    }
     const cacheItemData: CacheItemData = {
       popularity: 1,
       lastAccessTime: Date.now(),
       sizeKB: Math.ceil(sizeInBytes / 1024),
     }
     const groupNumber = this.calcCacheGroup(cacheItemData)
-    this.memoryState.groupNumberByContentHash.set(contentHash, groupNumber)
-    this.storedState.lruCacheGroups[groupNumber].set(contentHash, cacheItemData)
+    groupNumberByContentHash.set(contentHash, groupNumber)
+    lruCacheGroups[groupNumber].set(contentHash, cacheItemData)
   }
 
   public peekContent(contentHash: string): CacheItemData | undefined {
@@ -107,30 +113,32 @@ export class StateCacheService {
   }
 
   /**
    * Records an access to cached content: refreshes its last access time, increments its
    * popularity and re-inserts it into the LRU group chosen by `calcCacheGroup`.
    *
    * Logs a warning and returns without changes when the hash is not in the in-memory index.
    * When the index points at a group that does not hold the item, the stale index entry
    * is removed and a warning is logged.
    *
    * @param contentHash identifier of the content object that was accessed
    */
   public useContent(contentHash: string): void {
-    const groupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
+    const { groupNumberByContentHash } = this.memoryState
+    const { lruCacheGroups } = this.storedState
+    const groupNumber = groupNumberByContentHash.get(contentHash)
     // Explicit `undefined` comparison: a group number of 0 must not be mistaken for "missing"
     if (groupNumber === undefined) {
       this.logger.warn('groupNumberByContentHash missing when trying to update LRU of content', { contentHash })
       return
     }
-    const group = this.storedState.lruCacheGroups[groupNumber]
+    const group = lruCacheGroups[groupNumber]
     const cacheItemData = group.get(contentHash)
     if (!cacheItemData) {
       this.logger.warn('Cache inconsistency: item missing in group retrieved from by groupNumberByContentHash map!', {
         contentHash,
         groupNumber,
       })
       // Drop the stale index entry so the index no longer points at a group that lacks the item
-      this.memoryState.groupNumberByContentHash.delete(contentHash)
+      groupNumberByContentHash.delete(contentHash)
       return
     }
     cacheItemData.lastAccessTime = Date.now()
     ++cacheItemData.popularity
     // Move object to the top of the current group / new group
     // (delete followed by set re-inserts the key at the end of the Map's insertion order,
     // even when the target group is the same as the current one)
     const targetGroupNumber = this.calcCacheGroup(cacheItemData)
-    const targetGroup = this.storedState.lruCacheGroups[targetGroupNumber]
+    const targetGroup = lruCacheGroups[targetGroupNumber]
     group.delete(contentHash)
     targetGroup.set(contentHash, cacheItemData)
     // The index only needs rewriting when the item actually changed groups
     if (targetGroupNumber !== groupNumber) {
-      this.memoryState.groupNumberByContentHash.set(contentHash, targetGroupNumber)
+      groupNumberByContentHash.set(contentHash, targetGroupNumber)
     }
   }
 
@@ -248,11 +256,26 @@ export class StateCacheService {
   }
 
   /**
    * Rebuilds the in-memory contentHash -> groupNumber index from the stored LRU cache groups.
    *
    * For every cached content hash, the first group (lowest index) that contains it becomes
    * its indexed group. If the same hash is also found in another group, that duplicate
    * entry is removed from the other group and a warning is logged.
    *
    * A hash that is already indexed to the group currently being inspected is left alone,
    * so running this against an already populated index does not evict valid entries.
    */
   private loadGroupNumberByContentHashMap() {
-    for (const [groupNumber, group] of this.storedState.lruCacheGroups.entries()) {
-      for (const contentHash of group.keys()) {
-        this.memoryState.groupNumberByContentHash.set(contentHash, groupNumber)
-      }
-    }
+    const contentHashes = _.uniq(this.getCachedContentHashes())
+    const { lruCacheGroups: groups } = this.storedState
+    const { groupNumberByContentHash } = this.memoryState
+
+    contentHashes.forEach((contentHash) => {
+      groups.forEach((group, groupNumber) => {
+        if (!group.has(contentHash)) {
+          return
+        }
+        const firstGroup = groupNumberByContentHash.get(contentHash)
+        if (firstGroup === undefined) {
+          groupNumberByContentHash.set(contentHash, groupNumber)
+        } else if (firstGroup !== groupNumber) {
+          // Content duplicated in multiple groups - remove!
+          this.logger.warn(
+            `Content hash ${contentHash} was found in multiple lru cache groups. Removing from group ${groupNumber}...`,
+            { firstGroup, currentGroup: groupNumber }
+          )
+          group.delete(contentHash)
+        }
+      })
+    })
   }
 
   public load(): void {

+ 67 - 64
distributor-node/src/services/content/ContentService.ts

@@ -140,86 +140,89 @@ export class ContentService {
     return guessResult?.mime || DEFAULT_CONTENT_TYPE
   }
 
-  private async dropCacheItemsUntilFreeSpaceReached(targetFreeSpace: number): Promise<void> {
-    this.logger.verbose('Cache eviction initialized.', { targetFreeSpace })
   /**
    * Drops cache items one by one until `this.freeSpace` reaches `targetFreeSpace`.
    *
    * Each iteration asks the state cache for an eviction candidate and drops it. When no
    * candidate is available, the loop waits one second and checks again, so the returned
    * promise stays pending for as long as free space remains below the target.
    * NOTE(review): presumably `drop` increases `freeSpace`; confirm, since the loop has no
    * other exit condition or timeout.
    *
    * @param targetFreeSpace free space to reach before returning (same unit as `freeSpace`)
    */
+  private async evictCacheUntilFreeSpaceReached(targetFreeSpace: number): Promise<void> {
+    this.logger.verbose('Cache eviction triggered.', { targetFreeSpace, currentFreeSpace: this.freeSpace })
+    let itemsDropped = 0
     while (this.freeSpace < targetFreeSpace) {
       const evictCandidateHash = this.stateCache.getCacheEvictCandidateHash()
       if (evictCandidateHash) {
         this.drop(evictCandidateHash, 'Cache eviction')
+        ++itemsDropped
       } else {
         this.logger.verbose('Nothing to drop from cache, waiting...', { freeSpace: this.freeSpace, targetFreeSpace })
         await new Promise((resolve) => setTimeout(resolve, 1000))
       }
     }
-    this.logger.verbose('Cache eviction finalized.', { freeSpace: this.freeSpace })
     // Same metadata key casing as the "triggered" log entry above, so both entries can be correlated
+    this.logger.verbose('Cache eviction finalized.', { currentFreeSpace: this.freeSpace, itemsDropped })
   }
 
-  public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {
+  public async handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {
     // Stores a new content object by piping `dataStream` into a file write stream.
     // Flow: (1) evict cache items when free space is below `expectedSize`,
     // (2) reserve `expectedSize` by adding it to `contentSizeSum`,
     // (3) pipe the data into the file and validate the byte counts once the pipeline ends.
     // The returned promise resolves as soon as the file stream emits 'open' - not when the
     // transfer completes. It rejects if eviction throws, or if the pipeline fails before
     // the promise has been resolved.
+    this.logger.verbose('Handling new content', {
+      contentHash,
+      expectedSize,
+    })
+
+    // Trigger cache eviction if required
+    if (this.freeSpace < expectedSize) {
+      await this.evictCacheUntilFreeSpaceReached(expectedSize)
+    }
+
+    // Reserve space for the new object
+    this.contentSizeSum += expectedSize
+    this.logger.verbose('Reserved space for new data object', {
+      contentHash,
+      expectedSize,
+      newContentSizeSum: this.contentSizeSum,
+    })
+
+    // Return a promise that resolves when the new file is created
     return new Promise<void>((resolve, reject) => {
-      this.logger.verbose('Handling new content', {
-        contentHash,
-        expectedSize,
-      })
-      this.dropCacheItemsUntilFreeSpaceReached(expectedSize)
-        .then(() => {
-          // Reserve space for new object
-          this.contentSizeSum += expectedSize
-          this.logger.verbose('Reserved space for new data object', {
-            contentHash,
-            expectedSize,
-            newContentSizeSum: this.contentSizeSum,
-          })
+      const fileStream = this.createWriteStream(contentHash)
 
-          const fileStream = this.createWriteStream(contentHash)
-
-          let bytesRecieved = 0
-
-          pipeline(dataStream, fileStream, async (err) => {
-            const { bytesWritten } = fileStream
-            const logMetadata = {
-              contentHash,
-              expectedSize,
-              bytesRecieved,
-              bytesWritten,
-            }
-            if (err) {
-              this.logger.error(`Error while processing content data stream`, {
-                err,
-                ...logMetadata,
-              })
-              this.drop(contentHash)
-              reject(err)
-            } else {
-              if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
-                const mimeType = await this.guessMimeType(contentHash)
-                this.logger.info('New content accepted', { ...logMetadata })
-                this.stateCache.dropPendingDownload(contentHash)
-                this.stateCache.newContent(contentHash, expectedSize)
-                this.stateCache.setContentMimeType(contentHash, mimeType)
-              } else {
-                this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
-                  ...logMetadata,
-                })
-                this.drop(contentHash)
-              }
-            }
-          })
+      let bytesRecieved = 0
 
-          fileStream.on('open', () => {
-            // Note: The promise is resolved on "ready" event, since that's what's awaited in the current flow
-            resolve()
       // The pipeline callback runs once the transfer has ended (successfully or not).
       // Content is only registered in the state cache when written, received and expected
       // byte counts all match; otherwise the content is dropped.
       // NOTE(review): the callback is async and awaits guessMimeType; a rejection from that
       // call is not caught inside this block - confirm it is handled elsewhere.
+      pipeline(dataStream, fileStream, async (err) => {
+        const { bytesWritten } = fileStream
+        const logMetadata = {
+          contentHash,
+          expectedSize,
+          bytesRecieved,
+          bytesWritten,
+        }
+        if (err) {
+          this.logger.error(`Error while processing content data stream`, {
+            err,
+            ...logMetadata,
           })
+          this.drop(contentHash)
+          reject(err)
+        } else {
+          if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
+            const mimeType = await this.guessMimeType(contentHash)
+            this.logger.info('New content accepted', { ...logMetadata })
+            this.stateCache.dropPendingDownload(contentHash)
+            this.stateCache.newContent(contentHash, expectedSize)
+            this.stateCache.setContentMimeType(contentHash, mimeType)
+          } else {
+            this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
+              ...logMetadata,
+            })
+            this.drop(contentHash)
+          }
+        }
+      })
 
-          dataStream.on('data', (chunk) => {
-            bytesRecieved += chunk.length
-            if (bytesRecieved > expectedSize) {
-              dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
-            }
-          })
-        })
-        .catch((err) => {
-          this.logger.error('Error while trying to drop items from cache', { err })
-        })
+      fileStream.on('open', () => {
+        // Note: The promise is resolved on the "open" event, since that's what's awaited in the current flow
+        resolve()
+      })
+
       // Count incoming bytes and abort the source stream as soon as it exceeds the expected size
+      dataStream.on('data', (chunk) => {
+        bytesRecieved += chunk.length
+        if (bytesRecieved > expectedSize) {
+          dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
+        }
+      })
     })
   }
 }