Browse Source

storage-node-v2: Add URL resolving for data objects.

Shamil Gadelshin 3 years ago
parent
commit
f0b62f767b

+ 34 - 1
storage-node-v2/scripts/generate-test-data.ts

@@ -10,7 +10,7 @@ async function doJob(): Promise<void> {
   const uploadDirectory = '/Users/shamix/uploads2'
   const fileSize = 100
 
-  const objectNumber = 10
+  const objectNumber = 1000
   const bagNumber = 10
   const bucketNumber = 10
 
@@ -39,6 +39,7 @@ async function doJob(): Promise<void> {
     await createBuckets(client, bucketNumber)
     await createBagBucketLinks(client)
     await createBucketWorkerLinks(client)
+    await createBucketOperatorUrls(client)
     const dbTasks = createDataObjects(client, objectNumber)
     await Promise.all(dbTasks)
 
@@ -136,6 +137,11 @@ async function createBagBucketLinks(client: Client): Promise<void> {
     await client.query(
       `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
        values('1', '2')`
+    )    
+    // Bucket3 to Bag1
+    await client.query(
+      `INSERT INTO storage_bag_storage_bucket(storage_bag_id, storage_bucket_id) 
+       values('1', '3')`
     )
 }
 
@@ -144,6 +150,7 @@ async function createBucketWorkerLinks(client: Client): Promise<void> {
 
     const assignedWorker0 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 0}`
     const assignedWorker1 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 1}`
+    const assignedWorker2 = `{"isTypeOf": "StorageBucketOperatorStatusActive", "workerId": 2}`
 
     // Bucket1 to Worker0
     await client.query(
@@ -156,6 +163,32 @@ async function createBucketWorkerLinks(client: Client): Promise<void> {
       `UPDATE storage_bucket
        SET operator_status = '${assignedWorker1}'
        WHERE id = '2'`
+    )   
+     // Bucket3 to Worker2
+    await client.query(
+      `UPDATE storage_bucket
+       SET operator_status = '${assignedWorker2}'
+       WHERE id = '3'`
+    )
+}
+
+async function createBucketOperatorUrls(client: Client): Promise<void> {
+    console.log(`Writing bucket operator URLs...`)
+
+    const metadata1 = `http://localhost:3333/`
+    const metadata3 = `http://localhost:3334/`
+
+    // Bucket1
+    await client.query(
+      `UPDATE storage_bucket
+       SET operator_metadata = '${metadata1}'
+       WHERE id = '1'`
+    )
+     // Bucket3
+    await client.query(
+      `UPDATE storage_bucket
+       SET operator_metadata = '${metadata3}'
+       WHERE id = '3'`
     )
 }
 

+ 3 - 3
storage-node-v2/src/commands/dev/sync.ts

@@ -27,7 +27,7 @@ export default class DevSync extends Command {
       required: false,
       description:
         'Storage node host and port (e.g.: some.com:8081) to get data from.',
-    }),    
+    }),
     uploads: flags.string({
       char: 'd',
       required: true,
@@ -42,7 +42,7 @@ export default class DevSync extends Command {
 
     const queryNodeHost = flags.queryNodeHost ?? 'localhost:8081'
     const queryNodeUrl = `http://${queryNodeHost}/graphql`
-    const processNumber = flags.processNumber ?? 30
+    const processNumber = flags.processNumber ?? 20
     const dataSourceOperatorHost =
       flags.dataSourceOperatorHost ?? 'localhost:3333'
     const operatorUrl = `http://${dataSourceOperatorHost}/`
@@ -62,7 +62,7 @@ export default class DevSync extends Command {
   }
 }
 
-//TODO: implement periodical sync
+// TODO: implement periodical sync
 // import sleep from 'sleep-promise'
 // export function runSyncWithInterval() {
 //   setTimeout(async () => {

+ 6 - 6
storage-node-v2/src/services/sync/dataObjectsModel.ts

@@ -35,15 +35,15 @@ export async function getRuntimeModel(
 ): Promise<Model> {
   const api = new QueryNodeApi(queryNodeUrl)
 
-  let allBuckets = await getAllBuckets(api)
+  const allBuckets = await getAllBuckets(api)
 
-  let bucketIds = allBuckets
+  const bucketIds = allBuckets
     .filter((bucket) => bucket.operatorStatus?.workerId === workerId)
     .map((bucket) => bucket.id)
-  let assignedBags = await getAllAssignedBags(api, bucketIds)
+  const assignedBags = await getAllAssignedBags(api, bucketIds)
 
-  let bagIds = assignedBags.map((bag) => bag.id)
-  let assignedDataObjects = await getAllAssignedDataObjects(api, bagIds)
+  const bagIds = assignedBags.map((bag) => bag.id)
+  const assignedDataObjects = await getAllAssignedDataObjects(api, bagIds)
 
   const model: Model = {
     storageBuckets: allBuckets.map((bucket) => ({
@@ -99,7 +99,7 @@ async function getAllObjectsWithPaging<T>(
   objectName: string,
   query: (offset: number, limit: number) => Promise<T[]>
 ): Promise<T[]> {
-  let result = []
+  const result = []
   const limit = 1000 // TODO: make as parameter?
   let offset = 0
 

+ 63 - 0
storage-node-v2/src/services/sync/remoteData.ts

@@ -0,0 +1,63 @@
+import fetch from 'node-fetch'
+import urljoin from 'url-join'
+import logger from '../../services/logger'
+import NodeCache from 'node-cache'
+
+// Expiration period in seconds for the local cache.
+const ExpirationPeriod: number = 5 * (60 * 1000) // minutes
+
+// Max data entries in local cache
+const MaxEntries = 10000
+
+// Local in-memory cache for CIDs by operator URL.
+const availableCidsCache = new NodeCache({
+  stdTTL: ExpirationPeriod,
+  deleteOnExpire: true,
+  maxKeys: MaxEntries,
+})
+
+// Local in-memory cache for faulty operator URL. Prevents fetching from the
+// offline storage nodes.
+const badOperatorUrls = new NodeCache({
+  stdTTL: ExpirationPeriod,
+  deleteOnExpire: true,
+  maxKeys: MaxEntries,
+})
+
+export async function getAvailableData(operatorUrl: string): Promise<string[]> {
+  const url = urljoin(operatorUrl, 'api/v1/sync')
+
+  const faultyOperator = badOperatorUrls.has(operatorUrl)
+  if (faultyOperator) {
+    logger.debug(`Sync - cached error for the ${url} skipping ....`)
+    return []
+  }
+
+  const cachedData = availableCidsCache.get<string[]>(url)
+  if (!!cachedData) {
+    logger.debug(`Sync - getting from cache available data for ${url}`)
+    return cachedData
+  }
+
+  try {
+    logger.debug(`Sync - fetching available data for ${url}`)
+    const response = await fetch(url)
+    if (!response.ok) {
+      logger.error(
+        `Sync - unexpected response for ${url}: ${response.statusText}`
+      )
+
+      return []
+    }
+
+    const data = await response.json()
+    availableCidsCache.set(url, data, ExpirationPeriod)
+
+    return data
+  } catch (err) {
+    logger.error(`Sync - fetching data error from ${url}: ${err}`)
+    badOperatorUrls.set(operatorUrl, null, ExpirationPeriod)
+  }
+
+  return []
+}

+ 128 - 24
storage-node-v2/src/services/sync/synchronizer.ts

@@ -1,4 +1,5 @@
 import { getRuntimeModel, Model } from '../../services/sync/dataObjectsModel'
+import { getAvailableData } from '../../services/sync/remoteData'
 import logger from '../../services/logger'
 import _ from 'lodash'
 import fs from 'fs'
@@ -11,7 +12,7 @@ import AwaitLock from 'await-lock'
 import sleep from 'sleep-promise'
 const fsPromises = fs.promises
 
-//TODO: use caching
+// TODO: use caching
 export async function getLocalDataObjects(
   uploadDirectory: string
 ): Promise<string[]> {
@@ -25,7 +26,7 @@ export async function performSync(
   processNumber: number,
   queryNodeUrl: string,
   uploadDirectory: string,
-  operatorUrl: string
+  operatorUrl?: string
 ): Promise<void> {
   logger.info('Started syncing...')
   const [model, files] = await Promise.all([
@@ -42,17 +43,29 @@ export async function performSync(
   logger.debug(`Sync - added objects: ${added.length}`)
   logger.debug(`Sync - deleted objects: ${deleted.length}`)
 
+  const workingStack = new WorkingStack()
   const deletedTasks = deleted.map(
     (fileName) => new DeleteLocalFileTask(uploadDirectory, fileName)
   )
-  const addedTasks = await getDownloadTasks(model, operatorUrl, added, uploadDirectory)
+
+  let addedTasks: SyncTask[]
+  if (operatorUrl === undefined) {
+    addedTasks = await getPrepareDownloadTasks(
+      model,
+      added,
+      uploadDirectory,
+      workingStack
+    )
+  } else {
+    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory)
+  }
 
   logger.debug(`Sync - started processing...`)
-  const workingStack = new WorkingStack()
+
   const processSpawner = new TaskProcessorSpawner(workingStack, processNumber)
 
-  workingStack.add(addedTasks)
-  workingStack.add(deletedTasks)
+  await workingStack.add(addedTasks)
+  await workingStack.add(deletedTasks)
 
   await processSpawner.process()
   logger.info('Sync ended.')
@@ -108,11 +121,11 @@ class DownloadFileTask implements SyncTask {
       try {
         await streamPipeline(response.body, fs.createWriteStream(this.filepath))
       } catch (err) {
-        logger.error(`Fetching data error for ${this.url}: ${err}`)
+        logger.error(`Sync - fetching data error for ${this.url}: ${err}`)
       }
     } else {
       logger.error(
-        `Unexpected response for ${this.url}: ${response.statusText}`
+        `Sync - unexpected response for ${this.url}: ${response.statusText}`
       )
     }
   }
@@ -168,7 +181,7 @@ class TaskProcessorSpawner {
   async process(): Promise<void> {
     const processes = []
 
-    for (let i: number = 0; i < this.processNumber; i++) {
+    for (let i = 0; i < this.processNumber; i++) {
       const processor = new TaskProcessor(this.taskSource)
       processes.push(processor.process())
     }
@@ -181,7 +194,7 @@ class TaskProcessor {
   taskSource: TaskSource
   exitOnCompletion: boolean
 
-  constructor(taskSource: TaskSource, exitOnCompletion: boolean = true) {
+  constructor(taskSource: TaskSource, exitOnCompletion = true) {
     this.taskSource = taskSource
     this.exitOnCompletion = exitOnCompletion
   }
@@ -204,32 +217,123 @@ class TaskProcessor {
   }
 }
 
-
-async function getDownloadTasks(
+async function getPrepareDownloadTasks(
   model: Model,
-  operatorUrl: string,
   addedCids: string[],
-  uploadDirectory: string
-): Promise<DownloadFileTask[]> {
-  //model.dataObjects.
+  uploadDirectory: string,
+  taskSink: TaskSink
+): Promise<PrepareDownloadFileTask[]> {
+  const cidMap = new Map()
+  for (const entry of model.dataObjects) {
+    cidMap.set(entry.cid, entry.bagId)
+  }
 
-  model.bags[0].buckets[0]
+  const bucketMap = new Map()
+  for (const entry of model.storageBuckets) {
+    bucketMap.set(entry.id, entry.operatorUrl)
+  }
+
+  const bagMap = new Map()
+  for (const entry of model.bags) {
+    const operatorUrls = []
 
-  const buckets = new Set()
+    for (const bucket of entry.buckets) {
+      if (bucketMap.has(bucket)) {
+        const operatorUrl = bucketMap.get(bucket)
+        if (operatorUrl) {
+          operatorUrls.push(operatorUrl)
+        }
+      }
+    }
 
-  for (const bag of bags) {
-    buckets.
-} 
+    bagMap.set(entry.id, operatorUrls)
+  }
 
-  const urls = new Map()
-  
-  for 
+  const tasks = addedCids.map((cid) => {
+    let operatorUrls: string[] = [] // can be empty after look up
+    if (cidMap.has(cid)) {
+      const bagid = cidMap.get(cid)
+      if (bagMap.has(bagid)) {
+        operatorUrls = bagMap.get(bagid)
+      }
+    }
 
+    return new PrepareDownloadFileTask(
+      operatorUrls,
+      cid,
+      uploadDirectory,
+      taskSink
+    )
+  })
 
+  return tasks
+}
 
+async function getDownloadTasks(
+  operatorUrl: string,
+  addedCids: string[],
+  uploadDirectory: string
+): Promise<DownloadFileTask[]> {
   const addedTasks = addedCids.map(
     (fileName) => new DownloadFileTask(operatorUrl, fileName, uploadDirectory)
   )
 
   return addedTasks
 }
+
+class PrepareDownloadFileTask implements SyncTask {
+  cid: string
+  operatorUrlCandidates: string[]
+  taskSink: TaskSink
+  uploadsDirectory: string
+
+  constructor(
+    operatorUrlCandidates: string[],
+    cid: string,
+    uploadsDirectory: string,
+    taskSink: TaskSink
+  ) {
+    this.cid = cid
+    this.taskSink = taskSink
+    // Cloning is critical here. The list will be modified.
+    this.operatorUrlCandidates = _.cloneDeep(operatorUrlCandidates)
+    this.uploadsDirectory = uploadsDirectory
+  }
+
+  description(): string {
+    return `Sync - preparing for download of: ${this.cid} ....`
+  }
+
+  async execute(): Promise<void> {
+    while (!_.isEmpty(this.operatorUrlCandidates)) {
+      const randomUrl = _.sample(this.operatorUrlCandidates)
+      if (!randomUrl) {
+        break // cannot get random URL
+      }
+
+      // Remove random url from the original list.
+      _.remove(this.operatorUrlCandidates, (url) => url === randomUrl)
+
+      try {
+        const chosenBaseUrl = randomUrl
+        const remoteOperatorCids: string[] = await getAvailableData(
+          chosenBaseUrl
+        )
+
+        if (remoteOperatorCids.includes(this.cid)) {
+          const newTask = new DownloadFileTask(
+            chosenBaseUrl,
+            this.cid,
+            this.uploadsDirectory
+          )
+
+          return this.taskSink.add([newTask])
+        }
+      } catch (err) {
+        logger.error(`Sync - fetching data error for ${this.cid}: ${err}`)
+      }
+    }
+
+    logger.warn(`Sync - cannot get operator URLs for ${this.cid}`)
+  }
+}

+ 1 - 3
storage-node-v2/src/services/webApi/controllers/publicApi.ts

@@ -433,11 +433,10 @@ function getHttpStatusCodeByError(err: Error): number {
 
   return 500
 }
-
 /**
  * A public endpoint: return all local data objects.
  */
- export async function getAllLocalDataObjects(
+export async function getAllLocalDataObjects(
   req: express.Request,
   res: express.Response
 ): Promise<void> {
@@ -448,7 +447,6 @@ function getHttpStatusCodeByError(err: Error): number {
 
     res.status(200).json(cids)
   } catch (err) {
-
     res.status(500).json({
       type: 'local_data_objects',
       message: err.toString(),