@@ -1,16 +1,14 @@
-import { getRuntimeModel, Model } from '../../services/sync/dataObjectsModel'
-import { getAvailableData } from '../../services/sync/remoteData'
+import { getRuntimeModel, Model } from './storageObligations'
import logger from '../../services/logger'
+import {
+ SyncTask,
+ DownloadFileTask,
+ DeleteLocalFileTask,
+ PrepareDownloadFileTask,
+} from './tasks'
+import { WorkingStack, TaskProcessorSpawner, TaskSink } from './workingProcess'
import _ from 'lodash'
import fs from 'fs'
-import path from 'path'
-import { pipeline } from 'stream'
-import { promisify } from 'util'
-import superagent from 'superagent'
-import urljoin from 'url-join'
-import AwaitLock from 'await-lock'
-import sleep from 'sleep-promise'
-import { v4 as uuidv4 } from 'uuid'
const fsPromises = fs.promises
// TODO: use caching
@@ -78,162 +76,6 @@ async function getLocalFileNames(directory: string): Promise<string[]> {
return fsPromises.readdir(directory)
-interface SyncTask {
- description(): string
- execute(): Promise<void>
-class DeleteLocalFileTask implements SyncTask {
- uploadsDirectory: string
- filename: string
- constructor(uploadsDirectory: string, filename: string) {
- this.uploadsDirectory = uploadsDirectory
- this.filename = filename
- }
- description(): string {
- return `Sync - deleting local file: ${this.filename} ....`
- }
- async execute(): Promise<void> {
- const fullPath = path.join(this.uploadsDirectory, this.filename)
- return fsPromises.unlink(fullPath)
- }
-class DownloadFileTask implements SyncTask {
- id: string
- uploadsDirectory: string
- url: string
- constructor(baseUrl: string, id: string, uploadsDirectory: string) {
- this.id = id
- this.uploadsDirectory = uploadsDirectory
- this.url = urljoin(baseUrl, 'api/v1/files', id)
- }
- description(): string {
- return `Sync - downloading file: ${this.url} to ${this.uploadsDirectory} ....`
- }
- async execute(): Promise<void> {
- const streamPipeline = promisify(pipeline)
- const filepath = path.join(this.uploadsDirectory, this.id)
- try {
- const timeoutMs = 30 * 60 * 1000 // 30 min for large files (~ 10 GB)
- // Casting because of:
- // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
- const request = superagent
- .get(this.url)
- .timeout(timeoutMs) as unknown as NodeJS.ReadableStream
- // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
- // This partial downloads will be cleaned up during the next sync iteration.
- const tempFilePath = path.join(this.uploadsDirectory, uuidv4())
- const fileStream = fs.createWriteStream(tempFilePath)
- await streamPipeline(request, fileStream)
- await fsPromises.rename(tempFilePath, filepath)
- } catch (err) {
- logger.error(`Sync - fetching data error for ${this.url}: ${err}`)
- try {
- logger.warn(`Cleaning up file ${filepath}`)
- await fs.unlinkSync(filepath)
- } catch (err) {
- logger.error(`Sync - cannot cleanup file ${filepath}: ${err}`)
- }
- }
- }
-interface TaskSink {
- add(tasks: SyncTask[]): Promise<void>
-interface TaskSource {
- get(): Promise<SyncTask | null>
-class WorkingStack implements TaskSink, TaskSource {
- workingStack: SyncTask[]
- lock: AwaitLock
- constructor() {
- this.workingStack = []
- this.lock = new AwaitLock()
- }
- async get(): Promise<SyncTask | null> {
- await this.lock.acquireAsync()
- const task = this.workingStack.pop()
- this.lock.release()
- if (task !== undefined) {
- return task
- } else {
- return null
- }
- }
- async add(tasks: SyncTask[]): Promise<void> {
- await this.lock.acquireAsync()
- if (tasks !== null) {
- this.workingStack.push(...tasks)
- }
- this.lock.release()
- }
-class TaskProcessorSpawner {
- processNumber: number
- taskSource: TaskSource
- constructor(taskSource: TaskSource, processNumber: number) {
- this.taskSource = taskSource
- this.processNumber = processNumber
- }
- async process(): Promise<void> {
- const processes = []
- for (let i = 0; i < this.processNumber; i++) {
- const processor = new TaskProcessor(this.taskSource)
- processes.push(processor.process())
- }
- await Promise.all(processes)
- }
-class TaskProcessor {
- taskSource: TaskSource
- exitOnCompletion: boolean
- constructor(taskSource: TaskSource, exitOnCompletion = true) {
- this.taskSource = taskSource
- this.exitOnCompletion = exitOnCompletion
- }
- async process(): Promise<void> {
- while (true) {
- const task = await this.taskSource.get()
- if (task !== null) {
- logger.debug(task.description())
- await task.execute()
- } else {
- if (this.exitOnCompletion) {
- return
- }
- await sleep(3000)
- }
- }
- }
async function getPrepareDownloadTasks(
model: Model,
addedCids: string[],
@@ -297,61 +139,3 @@ async function getDownloadTasks(
return addedTasks
-class PrepareDownloadFileTask implements SyncTask {
- cid: string
- operatorUrlCandidates: string[]
- taskSink: TaskSink
- uploadsDirectory: string
- constructor(
- operatorUrlCandidates: string[],
- cid: string,
- uploadsDirectory: string,
- taskSink: TaskSink
- ) {
- this.cid = cid
- this.taskSink = taskSink
- // TODO: remove heavy operation
- // Cloning is critical here. The list will be modified.
- this.operatorUrlCandidates = _.cloneDeep(operatorUrlCandidates)
- this.uploadsDirectory = uploadsDirectory
- }
- description(): string {
- return `Sync - preparing for download of: ${this.cid} ....`
- }
- async execute(): Promise<void> {
- while (!_.isEmpty(this.operatorUrlCandidates)) {
- const randomUrl = _.sample(this.operatorUrlCandidates)
- if (!randomUrl) {
- break // cannot get random URL
- }
- // Remove random url from the original list.
- _.remove(this.operatorUrlCandidates, (url) => url === randomUrl)
- try {
- const chosenBaseUrl = randomUrl
- const remoteOperatorCids: string[] = await getAvailableData(
- chosenBaseUrl
- )
- if (remoteOperatorCids.includes(this.cid)) {
- const newTask = new DownloadFileTask(
- chosenBaseUrl,
- this.cid,
- this.uploadsDirectory
- )
- return this.taskSink.add([newTask])
- }
- } catch (err) {
- logger.error(`Sync - fetching data error for ${this.cid}: ${err}`)
- }
- }
- logger.warn(`Sync - cannot get operator URLs for ${this.cid}`)
- }