StateCacheService.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. import { Logger } from 'winston'
  2. import { ReadonlyConfig, StorageNodeDownloadResponse } from '../../types'
  3. import { LoggingService } from '../logging'
  4. import _ from 'lodash'
  5. import fs from 'fs'
  // LRU-SP cache parameters
  // Since size is in KB, these parameters should be enough for grouping objects of size up to 2^24 KB = 16 GB
  // TODO: Introduce MAX_CACHED_ITEM_SIZE and skip caching for large objects entirely? (e.g. 10 GB objects)

  /**
   * Base of the logarithm used by StateCacheService to turn an item's
   * size-to-popularity ratio into a cache group number.
   */
  export const CACHE_GROUP_LOG_BASE = 2

  /**
   * Number of LRU cache groups kept by StateCacheService.
   * Computed group numbers are clamped to the range [0, CACHE_GROUPS_COUNT - 1].
   */
  export const CACHE_GROUPS_COUNT = 24
  /** Stage a pending download can be in; newly registered downloads start as 'Waiting'. */
  type PendingDownloadStatus = 'Waiting' | 'LookingForSource' | 'Downloading'
  /** Bookkeeping record for a download that has been registered but not yet dropped. */
  export interface PendingDownloadData {
    /** Size of the object being downloaded, as reported by the caller that registered the download */
    objectSize: number
    /** Current stage of the download */
    status: PendingDownloadStatus
    /** Promise of the storage node's download response */
    promise: Promise<StorageNodeDownloadResponse>
  }
  /** Response-time statistics kept for a single storage node endpoint. */
  export interface StorageNodeEndpointData {
    /** Most recent response times recorded for the endpoint, oldest first (at most 10 are kept) */
    last10ResponseTimes: number[]
  }
  /** Per-item data stored in the LRU cache groups and persisted to the cache file. */
  export interface CacheItemData {
    /** Item size in kilobytes (byte size divided by 1024, rounded up) */
    sizeKB: number
    /** Usage counter: starts at 1 when the item is added and grows by 1 on every use */
    popularity: number
    /** Timestamp (as returned by Date.now()) of when the item was added or last used */
    lastAccessTime: number
  }
  /**
   * Keeps track of everything the node knows about its local content cache:
   *
   * - which content hashes are cached, grouped into size/popularity-based LRU groups
   *   (the scheme the surrounding comments refer to as LRU-SP),
   * - mime types of cached content,
   * - downloads currently in progress,
   * - recent response times of storage node endpoints.
   *
   * The LRU groups and mime types are periodically written to `cache.json` inside the
   * configured cache directory and can be restored with `load()`.
   */
  export class StateCacheService {
    private logger: Logger
    private config: ReadonlyConfig
    private cacheFilePath: string
    private saveInterval: NodeJS.Timeout

    // State that is never written to the cache file
    private memoryState = {
      pendingDownloadsByContentHash: new Map<string, PendingDownloadData>(),
      contentHashByObjectId: new Map<string, string>(),
      storageNodeEndpointDataByEndpoint: new Map<string, StorageNodeEndpointData>(),
      // Reverse index into storedState.lruCacheGroups (rebuilt from the groups by load())
      groupNumberByContentHash: new Map<string, number>(),
    }

    // State that is serialized by save() / saveSync() and restored by load()
    private storedState = {
      // Within each group, Map insertion order doubles as LRU order: the first entry is the least recently used
      lruCacheGroups: Array.from({ length: CACHE_GROUPS_COUNT }).map(() => new Map<string, CacheItemData>()),
      mimeTypeByContentHash: new Map<string, string>(),
    }

    /**
     * @param config Node configuration; `config.directories.cache` is the directory holding `cache.json`
     * @param logging Service used to create this class' logger
     * @param saveIntervalMs How often (in milliseconds) the stored state is written to the cache file
     */
    public constructor(config: ReadonlyConfig, logging: LoggingService, saveIntervalMs = 60 * 1000) {
      this.logger = logging.createLogger('StateCacheService')
      this.cacheFilePath = `${config.directories.cache}/cache.json`
      this.config = config
      // save() reports failures by resolving to false, so the returned promise is intentionally discarded
      this.saveInterval = setInterval(() => void this.save(), saveIntervalMs)
    }

    /** Remembers the mime type of the content identified by `contentHash`. */
    public setContentMimeType(contentHash: string, mimeType: string): void {
      this.storedState.mimeTypeByContentHash.set(contentHash, mimeType)
    }

    /** @returns The remembered mime type of the content, or `undefined` if none was set */
    public getContentMimeType(contentHash: string): string | undefined {
      return this.storedState.mimeTypeByContentHash.get(contentHash)
    }

    /** Remembers which content hash the given object id maps to. */
    public setObjectContentHash(objectId: string, hash: string): void {
      this.memoryState.contentHashByObjectId.set(objectId, hash)
    }

    /** @returns The content hash remembered for the object id, or `undefined` if none was set */
    public getObjectContentHash(objectId: string): string | undefined {
      return this.memoryState.contentHashByObjectId.get(objectId)
    }

    /**
     * Computes the LRU group an item belongs to:
     * ceil(log_base(sizeKB / popularity)), clamped to [0, CACHE_GROUPS_COUNT - 1].
     */
    private calcCacheGroup({ sizeKB, popularity }: CacheItemData): number {
      const rawGroup = Math.ceil(Math.log(sizeKB / popularity) / Math.log(CACHE_GROUP_LOG_BASE))
      return Math.min(Math.max(rawGroup, 0), CACHE_GROUPS_COUNT - 1)
    }

    /** @returns Content hashes of all cached items, ordered by group number and then by LRU order */
    public getCachedContentHashes(): string[] {
      return this.storedState.lruCacheGroups.flatMap((group) => Array.from(group.keys()))
    }

    /** @returns Total number of items across all cache groups */
    public getCachedContentLength(): number {
      return this.storedState.lruCacheGroups.reduce((total, group) => total + group.size, 0)
    }

    /**
     * Registers newly cached content with popularity 1 and the current time as its last access time.
     * The call is ignored (with a warning) if the content hash is already registered.
     *
     * @param contentHash Hash identifying the content
     * @param sizeInBytes Content size in bytes (stored rounded up to whole kilobytes)
     */
    public newContent(contentHash: string, sizeInBytes: number): void {
      const { groupNumberByContentHash } = this.memoryState
      const { lruCacheGroups } = this.storedState

      // has() instead of a truthiness check on get(): group number 0 is a valid group
      if (groupNumberByContentHash.has(contentHash)) {
        this.logger.warn('newContent was called for content that already exists, ignoring the call', { contentHash })
        return
      }

      const cacheItemData: CacheItemData = {
        popularity: 1,
        lastAccessTime: Date.now(),
        sizeKB: Math.ceil(sizeInBytes / 1024),
      }
      const groupNumber = this.calcCacheGroup(cacheItemData)
      groupNumberByContentHash.set(contentHash, groupNumber)
      lruCacheGroups[groupNumber].set(contentHash, cacheItemData)
    }

    /** @returns Cache data of the content without affecting its popularity or LRU position, or `undefined` if unknown */
    public peekContent(contentHash: string): CacheItemData | undefined {
      const groupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
      if (groupNumber === undefined) {
        return undefined
      }
      return this.storedState.lruCacheGroups[groupNumber].get(contentHash)
    }

    /**
     * Records a use of cached content: bumps its popularity, refreshes its last access time and moves it
     * to the most-recently-used end of the group matching its new size/popularity ratio.
     * Logs a warning and does nothing if the content is not registered.
     */
    public useContent(contentHash: string): void {
      const { groupNumberByContentHash } = this.memoryState
      const { lruCacheGroups } = this.storedState

      const groupNumber = groupNumberByContentHash.get(contentHash)
      if (groupNumber === undefined) {
        this.logger.warn('groupNumberByContentHash missing when trying to update LRU of content', { contentHash })
        return
      }

      const group = lruCacheGroups[groupNumber]
      const cacheItemData = group.get(contentHash)
      if (!cacheItemData) {
        this.logger.warn('Cache inconsistency: item missing in group retrieved from by groupNumberByContentHash map!', {
          contentHash,
          groupNumber,
        })
        // The index entry points at nothing - drop it so that the state is consistent again
        groupNumberByContentHash.delete(contentHash)
        return
      }

      cacheItemData.lastAccessTime = Date.now()
      ++cacheItemData.popularity

      // Move object to the top of the current group / new group.
      // Delete + set re-inserts the key at the end of the Map even when the group stays the same.
      const targetGroupNumber = this.calcCacheGroup(cacheItemData)
      const targetGroup = lruCacheGroups[targetGroupNumber]
      group.delete(contentHash)
      targetGroup.set(contentHash, cacheItemData)
      if (targetGroupNumber !== groupNumber) {
        groupNumberByContentHash.set(contentHash, targetGroupNumber)
      }
    }

    /**
     * Picks the item that should be evicted next. Only the least recently used item of each group is
     * considered; among those, the one with the highest cost
     * (minutes since last access * sizeKB / popularity) wins. On equal cost the later group wins.
     *
     * @returns Content hash of the best eviction candidate, or `null` if the cache is empty
     */
    public getCacheEvictCandidateHash(): string | null {
      const now = Date.now()
      let highestCost = 0
      let bestCandidate: string | null = null

      for (const group of this.storedState.lruCacheGroups) {
        // First entry in insertion order = least recently used item of the group
        const leastRecentlyUsed = group.entries().next()
        if (leastRecentlyUsed.done) {
          continue
        }
        const [contentHash, objectData] = leastRecentlyUsed.value
        const elapsedSinceLastAccessed = Math.ceil((now - objectData.lastAccessTime) / 60_000)
        const itemCost = (elapsedSinceLastAccessed * objectData.sizeKB) / objectData.popularity
        if (itemCost >= highestCost) {
          highestCost = itemCost
          bestCandidate = contentHash
        }
      }

      return bestCandidate
    }

    /**
     * Registers a download in progress (initial status: 'Waiting'), replacing any previous entry
     * for the same content hash.
     *
     * @returns The created pending download record
     */
    public newPendingDownload(
      contentHash: string,
      objectSize: number,
      promise: Promise<StorageNodeDownloadResponse>
    ): PendingDownloadData {
      const pendingDownload: PendingDownloadData = {
        status: 'Waiting',
        objectSize,
        promise,
      }
      this.memoryState.pendingDownloadsByContentHash.set(contentHash, pendingDownload)
      return pendingDownload
    }

    /** @returns Number of downloads currently registered as pending */
    public getPendingDownloadsCount(): number {
      return this.memoryState.pendingDownloadsByContentHash.size
    }

    /** @returns The pending download registered for the content hash, or `undefined` if there is none */
    public getPendingDownload(contentHash: string): PendingDownloadData | undefined {
      return this.memoryState.pendingDownloadsByContentHash.get(contentHash)
    }

    /** Removes the pending download registered for the content hash (if any). */
    public dropPendingDownload(contentHash: string): void {
      this.memoryState.pendingDownloadsByContentHash.delete(contentHash)
    }

    /**
     * Removes the mime type, pending download and LRU cache entry associated with the content hash.
     */
    public dropByHash(contentHash: string): void {
      this.logger.debug('Dropping all state by content hash', contentHash)
      this.storedState.mimeTypeByContentHash.delete(contentHash)
      this.memoryState.pendingDownloadsByContentHash.delete(contentHash)
      const cacheGroupNumber = this.memoryState.groupNumberByContentHash.get(contentHash)
      this.logger.debug('Cache group by hash established', { contentHash, cacheGroupNumber })
      // Explicit undefined check: group number 0 is a valid group and must be cleaned up as well
      if (cacheGroupNumber !== undefined) {
        this.memoryState.groupNumberByContentHash.delete(contentHash)
        this.storedState.lruCacheGroups[cacheGroupNumber].delete(contentHash)
      }
    }

    /**
     * Records a response time for the endpoint, keeping only the 10 most recent values.
     */
    public setStorageNodeEndpointResponseTime(endpoint: string, time: number): void {
      const { storageNodeEndpointDataByEndpoint } = this.memoryState
      const data = storageNodeEndpointDataByEndpoint.get(endpoint) || { last10ResponseTimes: [] }
      if (data.last10ResponseTimes.length === 10) {
        // Make room for the new value by discarding the oldest one
        data.last10ResponseTimes.shift()
      }
      data.last10ResponseTimes.push(time)
      if (!storageNodeEndpointDataByEndpoint.has(endpoint)) {
        storageNodeEndpointDataByEndpoint.set(endpoint, data)
      }
    }

    /**
     * @param endpoint Storage node endpoint
     * @param max Value returned when no response time has been recorded for the endpoint
     * @returns Mean of the recorded response times of the endpoint
     */
    public getStorageNodeEndpointMeanResponseTime(endpoint: string, max = 99999): number {
      const data = this.memoryState.storageNodeEndpointDataByEndpoint.get(endpoint)
      return _.mean(data?.last10ResponseTimes || [max])
    }

    /**
     * @param max Passed through to getStorageNodeEndpointMeanResponseTime
     * @returns [endpoint, mean response time] pairs for every endpoint with recorded response times
     */
    public getStorageNodeEndpointsMeanResponseTimes(max = 99999): [string, number][] {
      return Array.from(this.memoryState.storageNodeEndpointDataByEndpoint.keys()).map((endpoint) => [
        endpoint,
        this.getStorageNodeEndpointMeanResponseTime(endpoint, max),
      ])
    }

    /** Serializes the stored state to JSON, with each Map represented as an array of [key, value] entries. */
    private serializeData(): string {
      const { lruCacheGroups, mimeTypeByContentHash } = this.storedState
      return JSON.stringify(
        {
          lruCacheGroups: lruCacheGroups.map((g) => Array.from(g.entries())),
          mimeTypeByContentHash: Array.from(mimeTypeByContentHash.entries()),
        },
        null,
        2 // TODO: Only for debugging
      )
    }

    /**
     * Asynchronously writes the stored state to the cache file.
     *
     * @returns `true` if the file was written, `false` if anything went wrong (the error is logged)
     */
    public async save(): Promise<boolean> {
      try {
        await fs.promises.writeFile(this.cacheFilePath, this.serializeData())
        this.logger.verbose('Cache file updated')
        return true
      } catch (err) {
        this.logger.error('Cache file save error', { err })
        return false
      }
    }

    /** Synchronously writes the stored state to the cache file. Throws if the file cannot be written. */
    public saveSync(): void {
      const serialized = this.serializeData()
      fs.writeFileSync(this.cacheFilePath, serialized)
    }

    /**
     * Rebuilds the contentHash => groupNumber index from the LRU groups.
     * A content hash found in more than one group is kept only in the lowest-numbered one.
     */
    private loadGroupNumberByContentHashMap(): void {
      const { lruCacheGroups: groups } = this.storedState
      const { groupNumberByContentHash } = this.memoryState

      // Start from a clean index, otherwise entries left over from a previous load would make
      // every freshly loaded item look like a duplicate
      groupNumberByContentHash.clear()

      groups.forEach((group, groupNumber) => {
        // Iterate over a snapshot of the keys, as duplicates are removed from the group inside the loop
        for (const contentHash of Array.from(group.keys())) {
          if (!groupNumberByContentHash.has(contentHash)) {
            groupNumberByContentHash.set(contentHash, groupNumber)
            continue
          }
          // Content duplicated in multiple groups - remove!
          this.logger.warn(
            `Content hash ${contentHash} was found in in multiple lru cache groups. Removing from group ${groupNumber}...`,
            { firstGroup: groupNumberByContentHash.get(contentHash), currentGroup: groupNumber }
          )
          group.delete(contentHash)
        }
      })
    }

    /**
     * Restores the stored state from the cache file, if it exists, and rebuilds the in-memory group index.
     * Errors while reading or parsing the file are logged and not rethrown.
     */
    public load(): void {
      if (!fs.existsSync(this.cacheFilePath)) {
        this.logger.warn(`Cache file (${this.cacheFilePath}) is empty. Starting from scratch`)
        return
      }

      this.logger.info('Loading cache from file', { file: this.cacheFilePath })
      try {
        const fileContent = JSON.parse(fs.readFileSync(this.cacheFilePath).toString())
        const storedGroups = (fileContent.lruCacheGroups || []) as Array<Array<[string, CacheItemData]>>
        storedGroups.forEach((group, groupIndex) => {
          this.storedState.lruCacheGroups[groupIndex] = new Map<string, CacheItemData>(group)
        })
        this.storedState.mimeTypeByContentHash = new Map<string, string>(fileContent.mimeTypeByContentHash || [])
        this.loadGroupNumberByContentHashMap()
      } catch (err) {
        this.logger.error('Error while trying to load data from cache file! Will start from scratch', {
          file: this.cacheFilePath,
          err,
        })
      }
    }

    /** Stops the periodic saving of the state that was scheduled by the constructor. */
    public clearInterval(): void {
      clearInterval(this.saveInterval)
    }
  }