service.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2019 Joystream Contributors
  2. // This file is part of Joystream node.
  3. // Joystream node is free software: you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation, either version 3 of the License, or
  6. // (at your option) any later version.
  7. // Joystream node is distributed in the hope that it will be useful,
  8. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. // GNU General Public License for more details.
  11. // You should have received a copy of the GNU General Public License
  12. // along with Joystream node. If not, see <http://www.gnu.org/licenses/>.
  13. #![warn(unused_extern_crates)]
  14. //! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
  15. use client_db::Backend;
  16. use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
  17. use inherents::InherentDataProviders;
  18. use network::{construct_simple_protocol, NetworkService};
  19. use node_runtime::{self, opaque::Block, GenesisConfig, RuntimeApi};
  20. use offchain::OffchainWorkers;
  21. use primitives::Blake2Hasher;
  22. use runtime_primitives::traits::Block as BlockT;
  23. use std::sync::Arc;
  24. use substrate_client::{Client, LocalCallExecutor, LongestChain};
  25. pub use substrate_executor::{native_executor_instance, NativeExecutor};
  26. use substrate_service::{
  27. error::Error as ServiceError, AbstractService, Configuration, NetworkStatus, Service,
  28. ServiceBuilder,
  29. };
  30. use transaction_pool::{self, txpool::Pool as TransactionPool};
  31. construct_simple_protocol! {
  32. /// Demo protocol attachment for substrate.
  33. pub struct NodeProtocol where Block = Block { }
  34. }
  35. // Declare an instance of the native executor named `Executor`. Include the wasm binary as the
  36. // equivalent wasm code.
  37. native_executor_instance!(
  38. pub Executor,
  39. node_runtime::api::dispatch,
  40. node_runtime::native_version
  41. );
  /// Starts a `ServiceBuilder` for a full service.
  ///
  /// Use this macro if you don't actually need the full service, but just the builder in
  /// order to be able to perform chain operations.
  ///
  /// Expands to the tuple `(builder, import_setup, inherent_data_providers)`.
  /// `import_setup` starts out as `None` and is set to
  /// `Some((block_import, grandpa_link, babe_link))` by the import-queue closure.
  ///
  /// The expansion uses `?`, so it can only be invoked inside a function returning a
  /// `Result` whose error type the builder errors convert into.
  #[macro_export]
  macro_rules! new_full_start {
      ($config:expr) => {{
          // Filled in by the import-queue closure with the handles the caller needs
          // afterwards to start the consensus tasks.
          let mut import_setup = None;
          let inherent_data_providers = inherents::InherentDataProviders::new();

          let builder = substrate_service::ServiceBuilder::new_full::<
              node_runtime::opaque::Block,
              node_runtime::RuntimeApi,
              crate::service::Executor,
          >($config)?
              .with_select_chain(|_config, backend| {
                  Ok(substrate_client::LongestChain::new(backend.clone()))
              })?
              .with_transaction_pool(|pool_config, client| {
                  let chain_api = transaction_pool::FullChainApi::new(client);
                  Ok(transaction_pool::txpool::Pool::new(pool_config, chain_api))
              })?
              .with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
                  let chain_selector = select_chain
                      .take()
                      .ok_or(substrate_service::Error::SelectChainRequired)?;

                  // GRANDPA block import; a clone of it is passed to the queue as the
                  // justification import.
                  let (finality_import, grandpa_link) =
                      grandpa::block_import::<_, _, _, node_runtime::RuntimeApi, _>(
                          client.clone(),
                          &*client,
                          chain_selector,
                      )?;
                  let justification_import = finality_import.clone();

                  // BABE block import, layered on top of the GRANDPA one.
                  let babe_config = babe::Config::get_or_compute(&*client)?;
                  let (block_import, babe_link) = babe::block_import(
                      babe_config,
                      finality_import,
                      client.clone(),
                      client.clone(),
                  )?;

                  let queue = babe::import_queue(
                      babe_link.clone(),
                      block_import.clone(),
                      Some(Box::new(justification_import)),
                      None,
                      client.clone(),
                      client,
                      inherent_data_providers.clone(),
                  )?;

                  import_setup = Some((block_import, grandpa_link, babe_link));
                  Ok(queue)
              })?;

          // No custom RPC extensions are registered on the builder.
          (builder, import_setup, inherent_data_providers)
      }};
  }
  /// Creates a full service from the configuration.
  ///
  /// We need to use a macro because the test suite doesn't work with an opaque service. It
  /// expects concrete types instead.
  ///
  /// Two forms are accepted:
  ///
  /// * `new_full!(config, with_startup_data)`: `with_startup_data` is invoked as
  ///   `with_startup_data(&block_import, &babe_link)` once the service has been built and
  ///   before any consensus task is spawned.
  /// * `new_full!(config)`: same as above with a callback that does nothing.
  ///
  /// The expansion evaluates to `Ok((service, inherent_data_providers))`. It uses `?`
  /// internally, so it can only be invoked inside a function returning a `Result` whose
  /// error type the service errors convert into.
  ///
  /// # Panics
  ///
  /// Panics if `new_full_start!` handed back an empty import setup.
  macro_rules! new_full {
      ($config:expr, $with_startup_data: expr) => {{
          // Read everything that is needed from the configuration up front: `$config`
          // is passed by value to `new_full_start!` further down.
          let (
              is_authority,
              force_authoring,
              name,
              disable_grandpa
          ) = (
              $config.roles.is_authority(),
              $config.force_authoring,
              $config.name.clone(),
              $config.disable_grandpa
          );

          // Sentry nodes announce themselves as authorities to the network and should
          // run the same protocols authorities do, but they should never actively
          // participate in any consensus process.
          let participates_in_consensus = is_authority && !$config.sentry_mode;

          let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config);

          // No DHT event sender is registered on the builder: nothing in this service
          // consumes DHT events (there is no authority-discovery task), so a channel
          // whose receiver is never polled would only buffer events until it is full.
          let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))?
              .with_finality_proof_provider(|client, backend|
                  Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _)
              )?
              .build()?;

          let (block_import, grandpa_link, babe_link) = import_setup.take()
              .expect("Link Half and Block Import are present for Full Services or setup failed before. qed");

          // Give the caller (e.g. a test harness) access to the import handles before
          // they are moved into the consensus tasks.
          ($with_startup_data)(&block_import, &babe_link);

          if participates_in_consensus {
              let proposer = substrate_basic_authorship::ProposerFactory {
                  client: service.client(),
                  transaction_pool: service.transaction_pool(),
              };

              let client = service.client();
              let select_chain = service.select_chain()
                  .ok_or(substrate_service::Error::SelectChainRequired)?;

              let babe_config = babe::BabeParams {
                  keystore: service.keystore(),
                  client,
                  select_chain,
                  env: proposer,
                  block_import,
                  sync_oracle: service.network(),
                  inherent_data_providers: inherent_data_providers.clone(),
                  force_authoring,
                  babe_link,
              };

              // Block authoring is essential: the task is spawned as an essential task.
              let babe = babe::start_babe(babe_config)?;
              service.spawn_essential_task(babe);
          }

          // If the node isn't actively participating in consensus then it doesn't
          // need a keystore, regardless of which protocol we use below.
          let keystore = if participates_in_consensus {
              Some(service.keystore())
          } else {
              None
          };

          let config = grandpa::Config {
              // FIXME #1578 make this available through chainspec
              gossip_duration: std::time::Duration::from_millis(333),
              justification_period: 512,
              name: Some(name),
              observer_enabled: true,
              keystore,
              is_authority,
          };

          match (is_authority, disable_grandpa) {
              (false, false) => {
                  // Start the lightweight GRANDPA observer.
                  service.spawn_task(Box::new(grandpa::run_grandpa_observer(
                      config,
                      grandpa_link,
                      service.network(),
                      service.on_exit(),
                  )?));
              },
              (true, false) => {
                  // Start the full GRANDPA voter.
                  let grandpa_config = grandpa::GrandpaParams {
                      config,
                      link: grandpa_link,
                      network: service.network(),
                      inherent_data_providers: inherent_data_providers.clone(),
                      on_exit: service.on_exit(),
                      telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
                      voting_rule: grandpa::VotingRulesBuilder::default().build(),
                  };

                  // The GRANDPA voter task is considered infallible, i.e.
                  // if it fails we take down the service with it.
                  service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?);
              },
              (_, true) => {
                  // GRANDPA is switched off in the configuration.
                  grandpa::setup_disabled_grandpa(
                      service.client(),
                      &inherent_data_providers,
                      service.network(),
                  )?;
              },
          }

          Ok((service, inherent_data_providers))
      }};
      ($config:expr) => {{
          new_full!($config, |_, _| {})
      }}
  }
  218. #[allow(dead_code)]
  219. type ConcreteBlock = node_runtime::opaque::Block;
  220. #[allow(dead_code)]
  221. type ConcreteClient = Client<
  222. Backend<ConcreteBlock>,
  223. LocalCallExecutor<Backend<ConcreteBlock>, NativeExecutor<Executor>>,
  224. ConcreteBlock,
  225. node_runtime::RuntimeApi,
  226. >;
  227. #[allow(dead_code)]
  228. type ConcreteBackend = Backend<ConcreteBlock>;
  229. /// A specialized configuration object for setting up the node..
  230. pub type NodeConfiguration<C> =
  231. Configuration<C, GenesisConfig /*, crate::chain_spec::Extensions*/>;
  232. /// Builds a new service for a full client.
  233. pub fn new_full<C: Send + Default + 'static>(config: NodeConfiguration<C>)
  234. -> Result<
  235. Service<
  236. ConcreteBlock,
  237. ConcreteClient,
  238. LongestChain<ConcreteBackend, ConcreteBlock>,
  239. NetworkStatus<ConcreteBlock>,
  240. NetworkService<ConcreteBlock, crate::service::NodeProtocol, <ConcreteBlock as BlockT>::Hash>,
  241. TransactionPool<transaction_pool::FullChainApi<ConcreteClient, ConcreteBlock>>,
  242. OffchainWorkers<
  243. ConcreteClient,
  244. <ConcreteBackend as substrate_client::backend::Backend<Block, Blake2Hasher>>::OffchainStorage,
  245. ConcreteBlock,
  246. >
  247. >,
  248. ServiceError,
  249. >
  250. {
  251. new_full!(config).map(|(service, _)| service)
  252. }
  253. /// Builds a new service for a light client.
  254. pub fn new_light<C: Send + Default + 'static>(
  255. config: NodeConfiguration<C>,
  256. ) -> Result<impl AbstractService, ServiceError> {
  257. // type RpcExtension = jsonrpc_core::IoHandler<substrate_rpc::Metadata>;
  258. let inherent_data_providers = InherentDataProviders::new();
  259. let service = ServiceBuilder::new_light::<Block, RuntimeApi, Executor>(config)?
  260. .with_select_chain(|_config, backend| Ok(LongestChain::new(backend.clone())))?
  261. .with_transaction_pool(|config, client| {
  262. Ok(TransactionPool::new(
  263. config,
  264. transaction_pool::FullChainApi::new(client),
  265. ))
  266. })?
  267. .with_import_queue_and_fprb(
  268. |_config, client, backend, fetcher, _select_chain, _tx_pool| {
  269. let fetch_checker = fetcher
  270. .map(|fetcher| fetcher.checker().clone())
  271. .ok_or_else(|| {
  272. "Trying to start light import queue without active fetch checker"
  273. })?;
  274. let grandpa_block_import = grandpa::light_block_import::<_, _, _, RuntimeApi>(
  275. client.clone(),
  276. backend,
  277. &*client,
  278. Arc::new(fetch_checker),
  279. )?;
  280. let finality_proof_import = grandpa_block_import.clone();
  281. let finality_proof_request_builder =
  282. finality_proof_import.create_finality_proof_request_builder();
  283. let (babe_block_import, babe_link) = babe::block_import(
  284. babe::Config::get_or_compute(&*client)?,
  285. grandpa_block_import,
  286. client.clone(),
  287. client.clone(),
  288. )?;
  289. let import_queue = babe::import_queue(
  290. babe_link,
  291. babe_block_import,
  292. None,
  293. Some(Box::new(finality_proof_import)),
  294. client.clone(),
  295. client,
  296. inherent_data_providers.clone(),
  297. )?;
  298. Ok((import_queue, finality_proof_request_builder))
  299. },
  300. )?
  301. .with_network_protocol(|_| Ok(NodeProtocol::new()))?
  302. .with_finality_proof_provider(|client, backend| {
  303. Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _)
  304. })?
  305. // We don't have any custom rpc extensions
  306. // .with_rpc_extensions(|client, pool| -> RpcExtension {
  307. // node_rpc::create(client, pool)
  308. // })?
  309. .build()?;
  310. Ok(service)
  311. }