Просмотр исходного кода

storage-node: runtime-api, refactor and comment RuntimeApi.signAndSend for better readability

Mokhtar Naamani 4 лет назад
Родитель
Сommit
0deaec60de
1 измененных файлов с 78 добавлено и 103 удалено
  1. 78 103
      storage-node/packages/runtime-api/index.js

+ 78 - 103
storage-node/packages/runtime-api/index.js

@@ -151,9 +151,9 @@ class RuntimeApi {
   }
 
   /*
-   * Nonce-aware signAndSend(). Also allows you to use the accountId instead
-   * of the key, making calls a little simpler. Will lock to prevent concurrent
-   * calls so correct nonce is used.
+   * signAndSend() with nonce tracking, to enable concurrent sending of transactions
+   * so that they can be included in the same block. Allows you to use the accountId instead
+   * of the key, without requiring an external Signer configured on the underlying ApiPromise.
    *
    * If the subscribed events are given, and a callback as well, then the
    * callback is invoked with matching events.
@@ -163,121 +163,96 @@ class RuntimeApi {
 
     // Key must be unlocked
     const fromKey = this.identities.keyring.getPair(accountId)
+
     if (fromKey.isLocked) {
       throw new Error('Must unlock key before using it to sign!')
     }
 
+    // Promise that will be resolved when the submitted transaction is finalized.
+    // It will be rejected if the transaction is rejected by the node.
     const finalizedPromise = newExternallyControlledPromise()
 
+    // function assigned when transaction is successfully submitted
+    let unsubscribe
+
+    // Look for matching events and make a callback. Absorb any exceptions
+    // raised by the callback to ensure we continue to process the complete
+    // transaction lifecycle
+    const handleMatchingEvents = (events) => {
+      try {
+        if (subscribed && callback) {
+          const matched = RuntimeApi.matchingEvents(subscribed, events)
+          debug('Matching events:', matched)
+          if (matched.length) {
+            callback(matched)
+          }
+        }
+      } catch (err) {
+        debug(`Error handling events ${err.stack}`)
+      }
+    }
+
+    const handleTxCycle = ({ events = [], status }) => {
+      // when handling tx life cycle we can never detect api disconnect and could be waiting
+      // for events for ever!
+      handleMatchingEvents(events)
+
+      if (status.isFinalized) {
+        // transaction was included in block (finalized)
+        // resolve with the transaction hash
+        unsubscribe()
+        finalizedPromise.resolve(status.asFinalized)
+      } else if (status.isFuture) {
+        // This can happen if the code is incorrect, but also in a scenario where a joystream-node
+        // lost connectivity, the storage node submitted a few transactions, and incremented the nonce.
+        // The joystream-node later was restarted and storage-node continues using cached nonce.
+
+        // Can we detect api disconnect and reset nonce?
+
+        debug(`== Error: Submitted transaction with future nonce ==`)
+        delete this.nonces[accountId]
+        finalizedPromise.reject('Future Tx Nonce')
+      }
+    }
+
+    // synchronize access to nonce
     await this.executeWithAccountLock(accountId, async () => {
       // Try to get the next nonce to use
       let nonce = this.nonces[accountId]
+      // Remember if we read a previously saved nonce
+      const nonceWasCached = nonce !== undefined
+      // If it wasn't cached read it from chain and save it
+      nonce = this.nonces[accountId] = nonce || (await this.api.query.system.accountNonce(accountId))
 
-      let incrementNonce = () => {
-        // only increment once
-        incrementNonce = () => {
-          /* turn it into a no-op */
+      try {
+        unsubscribe = await tx.sign(fromKey, { nonce }).send(handleTxCycle)
+        // transaction submitted successfully, increment and save nonce,
+        // unless it was reset in handleTxCycle()
+        if (this.nonces[accountId] !== undefined) {
+          this.nonces[accountId] = nonce.addn(1)
+        }
+      } catch (err) {
+        debug('Transaction Rejected:', err.toString())
+        // The error here could simply be due to bad input to the transaction. It may also
+        // be due to a bad nonce, resulting in an attempt to replace a transaction with the same nonce,
+        // either one that was submitted as a future transaction,
+        // or because of a stale nonce (this will happen while a joystream-node is far behind in syncing, because
+        // we read the nonce from chain and by the time we submit the transaction the node has fetched a few more blocks
+        // in which the nonce of the account may have changed to a higher value).
+        // Occasionally the storage node operator will use their role account from another application
+        // to send transactions to manage their role, which will change the nonce, and due to a race condition
+        // between reading the nonce from chain and signing a transaction, the selected nonce may become stale.
+
+        // All we can do is reset the nonce and re-read it from chain on next tx submit attempt.
+        // The storage node will eventually recover.
+        if (nonceWasCached) {
+          delete this.nonces[accountId]
         }
-        nonce = nonce.addn(1)
-        this.nonces[accountId] = nonce
-      }
 
-      // If the nonce isn't available, get it from chain.
-      if (!nonce) {
-        // current nonce
-        // TODO: possible race condition here found by the linter
-        // eslint-disable-next-line require-atomic-updates
-        nonce = await this.api.query.system.accountNonce(accountId)
-        debug(`Got nonce for ${accountId} from chain: ${nonce}`)
+        finalizedPromise.reject(err)
       }
-
-      return new Promise((resolve, reject) => {
-        debug('Signing and sending tx')
-        // send(statusUpdates) returns a function for unsubscribing from status updates
-        const unsubscribe = tx
-          .sign(fromKey, { nonce })
-          .send(({ events = [], status }) => {
-            debug(`TX status: ${status.type}`)
-
-            // Whatever events we get, process them if there's someone interested.
-            // It is critical that this event handling doesn't prevent
-            try {
-              if (subscribed && callback) {
-                const matched = RuntimeApi.matchingEvents(subscribed, events)
-                debug('Matching events:', matched)
-                if (matched.length) {
-                  callback(matched)
-                }
-              }
-            } catch (err) {
-              debug(`Error handling events ${err.stack}`)
-            }
-
-            // We want to release lock as early as possible, sometimes Ready status
-            // doesn't occur, so we do it on Broadcast instead
-            if (status.isReady) {
-              debug('TX Ready.')
-              incrementNonce()
-              resolve(unsubscribe) // releases lock
-            } else if (status.isBroadcast) {
-              debug('TX Broadcast.')
-              incrementNonce()
-              resolve(unsubscribe) // releases lock
-            } else if (status.isFinalized) {
-              debug('TX Finalized.')
-              finalizedPromise.resolve(status)
-            } else if (status.isFuture) {
-              // comes before ready.
-              // does that mean it will remain in mempool or in api internal queue?
-              // nonce was set in the future. Treating it as an error for now.
-              debug('TX Future!')
-              // nonce is likely out of sync, delete it so we reload it from chain on next attempt
-              delete this.nonces[accountId]
-              const err = new Error('transaction nonce set in future')
-              finalizedPromise.reject(err)
-              reject(err)
-            }
-
-            /* why don't we see these status updates on local devchain (single node)
-            isUsurped
-            isBroadcast
-            isDropped
-            isInvalid
-            */
-          })
-          .catch((err) => {
-            // 1014 error: Most likely you are sending transaction with the same nonce,
-            // so it assumes you want to replace existing one, but the priority is too low to replace it (priority = fee = len(encoded_transaction) currently)
-            // Remember this can also happen if in the past we sent a tx with a future nonce, and the current nonce
-            // now matches it.
-            if (err) {
-              const errstr = err.toString()
-              // not the best way to check error code.
-              // https://github.com/polkadot-js/api/blob/master/packages/rpc-provider/src/coder/index.ts#L52
-              if (
-                errstr.indexOf('Error: 1014:') < 0 && // low priority
-                errstr.indexOf('Error: 1010:') < 0
-              ) {
-                // bad transaction
-                // Error but not nonce related. (bad arguments maybe)
-                debug('TX error', err)
-              } else {
-                // nonce is likely out of sync, delete it so we reload it from chain on next attempt
-                delete this.nonces[accountId]
-              }
-            }
-
-            finalizedPromise.reject(err)
-            // releases lock
-            reject(err)
-          })
-      })
     })
 
-    // when does it make sense to manyally unsubscribe?
-    // at this point unsubscribe.then and unsubscribe.catch have been deleted
-    // unsubscribe() // don't unsubscribe if we want to wait for additional status
-    // updates to know when the tx has been finalized
     return finalizedPromise.promise
   }