Procházet zdrojové kódy

storage node: update helios reporting tool

Mokhtar Naamani před 4 roky
rodič
revize
d462b587cf

+ 107 - 88
storage-node/packages/helios/bin/cli.js

@@ -1,125 +1,127 @@
 #!/usr/bin/env node
 
-const { RuntimeApi } = require('@joystream/runtime-api');
+const { RuntimeApi } = require('@joystream/runtime-api')
 const { encodeAddress } = require('@polkadot/keyring')
-const { discover } = require('@joystream/discovery');
-const axios = require('axios');
-const stripEndingSlash = require('@joystream/util/stripEndingSlash');
+const { discover } = require('@joystream/discovery')
+const axios = require('axios')
+const stripEndingSlash = require('@joystream/util/stripEndingSlash')
 
-(async function main () {
-
-  const runtime = await RuntimeApi.create();
-  const api  = runtime.api;
+async function main () {
+  const runtime = await RuntimeApi.create()
+  const { api } = runtime
 
   // get current blockheight
-  const currentHeader = await api.rpc.chain.getHeader();
-  const currentHeight = currentHeader.number.toBn();
+  const currentHeader = await api.rpc.chain.getHeader()
+  const currentHeight = currentHeader.number.toBn()
 
   // get all providers
-  const storageProviders = await api.query.actors.accountIdsByRole(0);
+  const storageProviders = await runtime.workers.getAllProviders()
+  console.log(`Found ${storageProviders.length} staked providers`)
 
-  const storageProviderAccountInfos = await Promise.all(storageProviders.map(async (account) => {
+  const storageProviderAccountInfos = await Promise.all(storageProviders.map(async (providerId) => {
     return ({
-      account,
-      info: await runtime.discovery.getAccountInfo(account),
-      joined: (await api.query.actors.actorByAccountId(account)).unwrap().joined_at
-    });
-  }));
+      providerId,
+      info: await runtime.discovery.getAccountInfo(providerId)
+    })
+  }))
 
-  const liveProviders = storageProviderAccountInfos.filter(({account, info}) => {
+  // providers that have updated their account info and published ipfs id
+  // considered live if the record hasn't expired yet
+  const liveProviders = storageProviderAccountInfos.filter(({info}) => {
     return info && info.expires_at.gte(currentHeight)
-  });
+  })
 
-  const downProviders = storageProviderAccountInfos.filter(({account, info}) => {
+  const downProviders = storageProviderAccountInfos.filter(({info}) => {
     return info == null
-  });
+  })
 
-  const expiredTtlProviders = storageProviderAccountInfos.filter(({account, info}) => {
+  const expiredTtlProviders = storageProviderAccountInfos.filter(({info}) => {
     return info && currentHeight.gte(info.expires_at)
-  });
+  })
 
-  let providersStatuses = mapInfoToStatus(liveProviders, currentHeight);
-  console.log('\n== Live Providers\n', providersStatuses);
+  let providersStatuses = mapInfoToStatus(liveProviders, currentHeight)
+  console.log('\n== Live Providers\n', providersStatuses)
 
   let expiredProviderStatuses = mapInfoToStatus(expiredTtlProviders, currentHeight)
-  console.log('\n== Expired Providers\n', expiredProviderStatuses);
+  console.log('\n== Expired Providers\n', expiredProviderStatuses)
 
-  // check when actor account was created consider grace period before removing
   console.log('\n== Down Providers!\n', downProviders.map(provider => {
     return ({
-      account: provider.account.toString(),
-      age: currentHeight.sub(provider.joined).toNumber()
+      providerId: provider.providerId
     })
-  }));
+  }))
 
   // Resolve IPNS identities of providers
   console.log('\nResolving live provider API Endpoints...')
-  //providersStatuses = providersStatuses.concat(expiredProviderStatuses);
-  let endpoints = await Promise.all(providersStatuses.map(async (status) => {
+  let endpoints = await Promise.all(providersStatuses.map(async ({providerId}) => {
     try {
-      let serviceInfo = await discover.discover_over_joystream_discovery_service(status.address, runtime);
-      let info = JSON.parse(serviceInfo.serialized);
-      console.log(`${status.address} -> ${info.asset.endpoint}`);
-      return { address: status.address, endpoint: info.asset.endpoint};
+      let serviceInfo = await discover.discover_over_joystream_discovery_service(providerId, runtime)
+
+      if (serviceInfo == null) {
+        console.log(`provider ${providerId} has not published service information`)
+        return { providerId, endpoint: null }
+      }
+
+      let info = JSON.parse(serviceInfo.serialized)
+      console.log(`${providerId} -> ${info.asset.endpoint}`)
+      return { providerId, endpoint: info.asset.endpoint }
     } catch (err) {
-      console.log('resolve failed', status.address, err.message);
-      return { address: status.address, endpoint: null};
+      console.log('resolve failed for id', providerId, err.message)
+      return { providerId, endpoint: null }
     }
-  }));
+  }))
 
-  console.log('\nChecking API Endpoint is online')
+  console.log('\nChecking API Endpoints are online')
   await Promise.all(endpoints.map(async (provider) => {
     if (!provider.endpoint) {
-      console.log('skipping', provider.address);
+      console.log('skipping', provider.address)
       return
     }
-    const swaggerUrl = `${stripEndingSlash(provider.endpoint)}/swagger.json`;
-    let error;
+    const swaggerUrl = `${stripEndingSlash(provider.endpoint)}/swagger.json`
+    let error
     try {
       await axios.get(swaggerUrl)
-    } catch (err) {error = err}
-    console.log(`${provider.endpoint} - ${error ? error.message : 'OK'}`);
-  }));
+      // maybe print out api version information to detect which version of colossus is running?
+      // or add anothe api endpoint for diagnostics information
+    } catch (err) { error = err }
+    console.log(`${provider.endpoint} - ${error ? error.message : 'OK'}`)
+  }))
 
-  // after resolving for each resolved provider, HTTP HEAD with axios all known content ids
-  // report available/known
   let knownContentIds = await runtime.assets.getKnownContentIds()
+  console.log(`\nData Directory has ${knownContentIds.length} assets`)
 
-  console.log(`\nContent Directory has ${knownContentIds.length} assets`);
-
+  // Check which providers are reporting a ready relationship for each asset
   await Promise.all(knownContentIds.map(async (contentId) => {
-    let [relationships, judgement] = await assetRelationshipState(api, contentId, storageProviders);
-    console.log(`${encodeAddress(contentId)} replication ${relationships}/${storageProviders.length} - ${judgement}`);
-  }));
-
-  console.log('\nChecking available assets on providers...');
-
-  endpoints.map(async ({address, endpoint}) => {
-    if (!endpoint) { return }
-    let { found, content } = await countContentAvailability(knownContentIds, endpoint);
-    console.log(`${address}: has ${found} assets`);
-    return content
-  });
-
+    let [relationshipsCount, judgement] = await assetRelationshipState(api, contentId, storageProviders)
+    console.log(`${encodeAddress(contentId)} replication ${relationshipsCount}/${storageProviders.length} - ${judgement}`)
+  }))
 
   // interesting disconnect doesn't work unless an explicit provider was created
   // for underlying api instance
-  runtime.api.disconnect();
-})();
+  // We no longer need a connection to the chain
+  api.disconnect()
+
+  console.log(`\nChecking available assets on providers (this can take some time)...`)
+  endpoints.forEach(async ({ providerId, endpoint }) => {
+    if (!endpoint) { return }
+    const total = knownContentIds.length
+    let { found, missing } = await countContentAvailability(knownContentIds, endpoint)
+    console.log(`provider ${providerId}: has ${found} out of ${total}`)
+  })
+}
 
-function mapInfoToStatus(providers, currentHeight) {
-  return providers.map(({account, info, joined}) => {
+function mapInfoToStatus (providers, currentHeight) {
+  return providers.map(({providerId, info}) => {
     if (info) {
       return {
-        address: account.toString(),
-        age: currentHeight.sub(joined).toNumber(),
+        providerId: providerId.toNumber(),
         identity: info.identity.toString(),
         expiresIn: info.expires_at.sub(currentHeight).toNumber(),
-        expired: currentHeight.gte(info.expires_at),
+        expired: currentHeight.gte(info.expires_at)
       }
     } else {
       return {
-        address: account.toString(),
+        providerId: providerId.toNumber(),
         identity: null,
         status: 'down'
       }
@@ -127,40 +129,57 @@ function mapInfoToStatus(providers, currentHeight) {
   })
 }
 
-async function countContentAvailability(contentIds, source) {
+// HTTP HEAD with axios all known content ids on each provider
+async function countContentAvailability (contentIds, source) {
   let content = {}
-  let found = 0;
-  for(let i = 0; i < contentIds.length; i++) {
-    const assetUrl = makeAssetUrl(contentIds[i], source);
+  let found = 0
+  let missing = 0
+  for (let i = 0; i < contentIds.length; i++) {
+    const assetUrl = makeAssetUrl(contentIds[i], source)
     try {
       let info = await axios.head(assetUrl)
       content[encodeAddress(contentIds[i])] = {
         type: info.headers['content-type'],
         bytes: info.headers['content-length']
       }
+      // TODO: cross check against dataobject size
       found++
-    } catch(err) { console.log(`${assetUrl} ${err.message}`); continue; }
+    } catch (err) {
+      // console.log(`${assetUrl} ${err.message}`)
+      missing++
+      continue
+    }
   }
-  console.log(content);
-  return { found, content };
+
+  return { found, missing, content }
 }
 
-function makeAssetUrl(contentId, source) {
-  source = stripEndingSlash(source);
+function makeAssetUrl (contentId, source) {
+  source = stripEndingSlash(source)
   return `${source}/asset/v0/${encodeAddress(contentId)}`
 }
 
-async function assetRelationshipState(api, contentId, providers) {
-  let dataObject = await api.query.dataDirectory.dataObjectByContentId(contentId);
+async function assetRelationshipState (api, contentId, providers) {
+  let dataObject = await api.query.dataDirectory.dataObjectByContentId(contentId)
 
-  // how many relationships out of active providers?
-  let relationshipIds = await api.query.dataObjectStorageRegistry.relationshipsByContentId(contentId);
+  let relationshipIds = await api.query.dataObjectStorageRegistry.relationshipsByContentId(contentId)
 
+  // how many relationships associated with active providers and in ready state
   let activeRelationships = await Promise.all(relationshipIds.map(async (id) => {
-    let relationship = await api.query.dataObjectStorageRegistry.relationships(id);
+    let relationship = await api.query.dataObjectStorageRegistry.relationships(id)
     relationship = relationship.unwrap()
+    // only interested in ready relationships
+    if (!relationship.ready) {
+      return undefined
+    }
+    // Does the relationship belong to an active provider ?
     return providers.find((provider) => relationship.storage_provider.eq(provider))
-  }));
+  }))
+
+  return ([
+    activeRelationships.filter(active => active).length,
+    dataObject.unwrap().liaison_judgement
+  ])
+}
 
-  return [activeRelationships.filter(active => active).length, dataObject.unwrap().liaison_judgement]
-}
+main()

+ 5 - 0
storage-node/packages/runtime-api/workers.js

@@ -71,6 +71,11 @@ class WorkersApi {
       return worker.role_account
     }
   }
+
+  async getAllProviders () {
+    const workerEntries = await this.base.api.query.storageWorkingGroup.workerById()
+    return workerEntries[0] // keys
+  }
 }
 
 module.exports = {