@@ -140,86 +140,89 @@ export class ContentService {
return guessResult?.mime || DEFAULT_CONTENT_TYPE
- private async dropCacheItemsUntilFreeSpaceReached(targetFreeSpace: number): Promise<void> {
- this.logger.verbose('Cache eviction initialized.', { targetFreeSpace })
+ private async evictCacheUntilFreeSpaceReached(targetFreeSpace: number): Promise<void> {
+ this.logger.verbose('Cache eviction triggered.', { targetFreeSpace, currentFreeSpace: this.freeSpace })
+ let itemsDropped = 0
while (this.freeSpace < targetFreeSpace) {
const evictCandidateHash = this.stateCache.getCacheEvictCandidateHash()
if (evictCandidateHash) {
this.drop(evictCandidateHash, 'Cache eviction')
+ ++itemsDropped
} else {
this.logger.verbose('Nothing to drop from cache, waiting...', { freeSpace: this.freeSpace, targetFreeSpace })
await new Promise((resolve) => setTimeout(resolve, 1000))
- this.logger.verbose('Cache eviction finalized.', { freeSpace: this.freeSpace })
+ this.logger.verbose('Cache eviction finalized.', { currentfreeSpace: this.freeSpace, itemsDropped })
- public handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {
+ public async handleNewContent(contentHash: string, expectedSize: number, dataStream: Readable): Promise<void> {
+ this.logger.verbose('Handling new content', {
+ contentHash,
+ expectedSize,
+ })
+ // Trigger cache eviction if required
+ if (this.freeSpace < expectedSize) {
+ await this.evictCacheUntilFreeSpaceReached(expectedSize)
+ }
+ // Reserve space for the new object
+ this.contentSizeSum += expectedSize
+ this.logger.verbose('Reserved space for new data object', {
+ contentHash,
+ expectedSize,
+ newContentSizeSum: this.contentSizeSum,
+ })
+ // Return a promise that resolves when the new file is created
return new Promise<void>((resolve, reject) => {
- this.logger.verbose('Handling new content', {
- contentHash,
- expectedSize,
- })
- this.dropCacheItemsUntilFreeSpaceReached(expectedSize)
- .then(() => {
- // Reserve space for new object
- this.contentSizeSum += expectedSize
- this.logger.verbose('Reserved space for new data object', {
- contentHash,
- expectedSize,
- newContentSizeSum: this.contentSizeSum,
- })
+ const fileStream = this.createWriteStream(contentHash)
- const fileStream = this.createWriteStream(contentHash)
- let bytesRecieved = 0
- pipeline(dataStream, fileStream, async (err) => {
- const { bytesWritten } = fileStream
- const logMetadata = {
- contentHash,
- expectedSize,
- bytesRecieved,
- bytesWritten,
- }
- if (err) {
- this.logger.error(`Error while processing content data stream`, {
- err,
- ...logMetadata,
- })
- this.drop(contentHash)
- reject(err)
- } else {
- if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
- const mimeType = await this.guessMimeType(contentHash)
- this.logger.info('New content accepted', { ...logMetadata })
- this.stateCache.dropPendingDownload(contentHash)
- this.stateCache.newContent(contentHash, expectedSize)
- this.stateCache.setContentMimeType(contentHash, mimeType)
- } else {
- this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
- ...logMetadata,
- })
- this.drop(contentHash)
- }
- }
- })
+ let bytesRecieved = 0
- fileStream.on('open', () => {
- // Note: The promise is resolved on "ready" event, since that's what's awaited in the current flow
- resolve()
+ pipeline(dataStream, fileStream, async (err) => {
+ const { bytesWritten } = fileStream
+ const logMetadata = {
+ contentHash,
+ expectedSize,
+ bytesRecieved,
+ bytesWritten,
+ }
+ if (err) {
+ this.logger.error(`Error while processing content data stream`, {
+ err,
+ ...logMetadata,
+ this.drop(contentHash)
+ reject(err)
+ } else {
+ if (bytesWritten === bytesRecieved && bytesWritten === expectedSize) {
+ const mimeType = await this.guessMimeType(contentHash)
+ this.logger.info('New content accepted', { ...logMetadata })
+ this.stateCache.dropPendingDownload(contentHash)
+ this.stateCache.newContent(contentHash, expectedSize)
+ this.stateCache.setContentMimeType(contentHash, mimeType)
+ } else {
+ this.logger.error('Content rejected: Bytes written/recieved/expected mismatch!', {
+ ...logMetadata,
+ })
+ this.drop(contentHash)
+ }
+ }
+ })
- dataStream.on('data', (chunk) => {
- bytesRecieved += chunk.length
- if (bytesRecieved > expectedSize) {
- dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
- }
- })
- })
- .catch((err) => {
- this.logger.error('Error while trying to drop items from cache', { err })
- })
+ fileStream.on('open', () => {
+ // Note: The promise is resolved on "ready" event, since that's what's awaited in the current flow
+ resolve()
+ })
+ dataStream.on('data', (chunk) => {
+ bytesRecieved += chunk.length
+ if (bytesRecieved > expectedSize) {
+ dataStream.destroy(new Error('Unexpected content size: Too much data recieved from source!'))
+ }
+ })