//! # Data directory module //! Data directory module for the Joystream platform manages IPFS content id, storage providers, //! owners of the content. It allows to add and accept or reject the content in the system. //! //! ## Comments //! //! Data object type registry module uses working group module to authorize actions. //! //! ## Supported extrinsics //! //! ### Public extrinsic //! - [add_content](./struct.Module.html#method.add_content) - Adds the content to the system. //! //! ### Private extrinsics //! - accept_content - Storage provider accepts a content. //! - reject_content - Storage provider rejects a content. //! - remove_known_content_id - Removes the content id from the list of known content ids. Requires root privileges. //! - set_known_content_id - Sets the content id from the list of known content ids. Requires root privileges. //! // Do not delete! Cannot be uncommented by default, because of Parity decl_module! issue. //#![warn(missing_docs)] use codec::{Decode, Encode}; use frame_support::dispatch::DispatchResult; use frame_support::traits::Get; use frame_support::{decl_error, decl_event, decl_module, decl_storage, ensure}; use sp_std::collections::btree_map::BTreeMap; use sp_std::vec::Vec; use system::ensure_root; #[cfg(feature = "std")] use serde::{Deserialize, Serialize}; use common::origin::ActorOriginValidator; pub use common::storage::{ContentParameters, StorageObjectOwner}; pub(crate) use common::BlockAndTime; use crate::data_object_type_registry; use crate::data_object_type_registry::IsActiveDataObjectType; use crate::*; /// The _Data directory_ main _Trait_. pub trait Trait: pallet_timestamp::Trait + system::Trait + data_object_type_registry::Trait + membership::Trait + working_group::Trait + common::MembershipTypes + common::StorageOwnership { /// _Data directory_ event type. type Event: From> + Into<::Event>; /// Provides random storage provider id. type StorageProviderHelper: StorageProviderHelper; /// Active data object type validator. type IsActiveDataObjectType: data_object_type_registry::IsActiveDataObjectType; /// Validates member id and origin combination. type MemberOriginValidator: ActorOriginValidator, Self::AccountId>; type MaxObjectsPerInjection: Get; /// Default content quota for all actors. type DefaultQuota: Get; } decl_error! { /// _Data object storage registry_ module predefined errors. pub enum Error for Module{ /// Content with this ID not found. CidNotFound, /// Only the liaison for the content may modify its status. LiaisonRequired, /// Cannot create content for inactive or missing data object type. DataObjectTypeMustBeActive, /// "Data object already added under this content id". DataObjectAlreadyAdded, /// Require root origin in extrinsics. RequireRootOrigin, /// DataObject Injection Failed. Too Many DataObjects. DataObjectsInjectionExceededLimit, /// Contant uploading failed. Actor quota objects limit exceeded. QuotaObjectsLimitExceeded, /// Contant uploading failed. Actor quota size limit exceeded. QuotaSizeLimitExceeded, /// Quota size limit upper bound exceeded QuotaSizeLimitUpperBoundExceeded, /// Quota objects limit upper bound exceeded QuotaObjectsLimitUpperBoundExceeded, /// Contant uploading failed. Actor quota size limit exceeded. GlobalQuotaSizeLimitExceeded, /// Contant uploading failed. Actor quota objects limit exceeded. GlobalQuotaObjectsLimitExceeded, } } /// The decision of the storage provider when it acts as liaison. #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] #[derive(Clone, Encode, Decode, PartialEq, Debug)] pub enum LiaisonJudgement { /// Content awaits for a judgment. Pending, /// Content accepted. Accepted, /// Content rejected. Rejected, } impl Default for LiaisonJudgement { fn default() -> Self { LiaisonJudgement::Pending } } /// Alias for DataObjectInternal pub type DataObject = DataObjectInternal< MemberId, ChannelId, DAOId, ::BlockNumber, ::Moment, DataObjectTypeId, StorageProviderId, >; /// Manages content ids, type and storage provider decision about it. #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] #[derive(Clone, Encode, Decode, PartialEq, Debug)] pub struct DataObjectInternal< MemberId, ChannelId, DAOId, BlockNumber, Moment, DataObjectTypeId, StorageProviderId, > { /// Content owner. pub owner: StorageObjectOwner, /// Content added at. pub added_at: BlockAndTime, /// Content type id. pub type_id: DataObjectTypeId, /// Content size in bytes. pub size: u64, /// Storage provider id of the liaison. pub liaison: StorageProviderId, /// Storage provider as liaison judgment. pub liaison_judgement: LiaisonJudgement, /// IPFS content id. pub ipfs_content_id: Vec, } #[derive(Clone, Copy)] pub struct Voucher { pub size: u64, pub objects: u64, } /// Uploading quota for StorageObjectOwner #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] #[derive(Clone, Copy, Encode, Decode, PartialEq, Eq, Debug, Default)] pub struct Quota { // Total objects size limit per StorageObjectOwner pub size_limit: u64, // Total objects number limit per StorageObjectOwner pub objects_limit: u64, pub size_used: u64, pub objects_used: u64, } impl Quota { /// Create new quota with provided size & objects limits pub const fn new(size_limit: u64, objects_limit: u64) -> Self { Self { size_limit, objects_limit, size_used: 0, objects_used: 0, } } /// Calculate free quota pub fn calculate_voucher(&self) -> Voucher { Voucher { size: self.size_limit - self.size_used, objects: self.objects_limit - self.objects_used, } } pub fn fill_quota(self, voucher: Voucher) -> Self { Self { size_used: self.size_used + voucher.size, objects_used: self.objects_used + voucher.objects, ..self } } pub fn release_quota(self, voucher: Voucher) -> Self { Self { size_used: self.size_used - voucher.size, objects_used: self.objects_used - voucher.objects, ..self } } pub fn set_new_size_limit(&mut self, new_size_limit: u64) { self.size_limit = new_size_limit; } pub fn set_new_objects_limit(&mut self, new_objects_limit: u64) { self.objects_limit = new_objects_limit; } } /// A map collection of unique DataObjects keyed by the ContentId pub type DataObjectsMap = BTreeMap, DataObject>; decl_storage! { trait Store for Module as DataDirectory { /// List of ids known to the system. pub KnownContentIds get(fn known_content_ids) config(): Vec> = Vec::new(); /// Maps data objects by their content id. pub DataObjectByContentId get(fn data_object_by_content_id) config(): map hasher(blake2_128_concat) T::ContentId => Option>; /// Maps storage owner to it`s quota. Created when the first upload by the new actor occured. pub Quotas get(fn quotas) config(): map hasher(blake2_128_concat) StorageObjectOwner, ChannelId, DAOId> => Quota; /// Upper bound for the Quota size limit. pub QuotaSizeLimitUpperBound get(fn quota_size_limit_upper_bound) config(): u64; /// Upper bound for the Quota objects number limit. pub QuotaObjectsLimitUpperBound get(fn quota_objects_limit_upper_bound) config(): u64; /// Global quota. pub GlobalQuota get(fn global_quota) config(): Quota; } } decl_event! { /// _Data directory_ events pub enum Event where StorageObjectOwner = StorageObjectOwner, ChannelId, DAOId>, StorageProviderId = StorageProviderId, Content = Vec, DataObjectTypeId>>, ContentId = ContentId, QuotaLimit = u64 { /// Emits on adding of the content. /// Params: /// - Content parameters representation. /// - StorageObjectOwner enum. ContentAdded(Content, StorageObjectOwner), /// Emits when the storage provider accepts a content. /// Params: /// - Id of the relationship. /// - Id of the storage provider. ContentAccepted(ContentId, StorageProviderId), /// Emits when the storage provider rejects a content. /// Params: /// - Id of the relationship. /// - Id of the storage provider. ContentRejected(ContentId, StorageProviderId), /// Emits when the storage object owner quota size limit update performed. /// Params: /// - StorageObjectOwner enum. /// - quota size limit. StorageObjectOwnerQuotaSizeLimitUpdated(StorageObjectOwner, QuotaLimit), /// Emits when the storage object owner quota objects limit update performed. /// Params: /// - StorageObjectOwner enum. /// - quota objects limit. StorageObjectOwnerQuotaObjectsLimitUpdated(StorageObjectOwner, QuotaLimit), } } decl_module! { /// _Data directory_ substrate module. pub struct Module for enum Call where origin: T::Origin { /// Default deposit_event() handler fn deposit_event() = default; /// Predefined errors. type Error = Error; /// Maximum objects allowed per inject_data_objects() transaction const MaxObjectsPerInjection: u32 = T::MaxObjectsPerInjection::get(); /// Adds the content to the system. The created DataObject /// awaits liaison to accept or reject it. #[weight = 10_000_000] // TODO: adjust weight pub fn add_content( origin, owner: StorageObjectOwner, ChannelId, DAOId>, content: Vec, DataObjectTypeId>> ) { // Ensure given origin can perform operation under specific storage object owner Self::ensure_storage_object_owner_origin(origin, &owner)?; Self::ensure_content_is_valid(&content)?; let owner_quota = Self::get_quota(&owner); // Ensure owner quota constraints satisfied. // Calculate upload voucher let upload_voucher = Self::ensure_owner_quota_constraints_satisfied(owner_quota, &content)?; // Ensure global quota constraints satisfied. Self::ensure_global_quota_constraints_satisfied(upload_voucher)?; let liaison = T::StorageProviderHelper::get_random_storage_provider()?; // // == MUTATION SAFE == // // Let's create the entry then Self::upload_content(owner_quota, upload_voucher, liaison, content.clone(), owner.clone()); Self::deposit_event(RawEvent::ContentAdded(content, owner)); } /// Updates storage object owner quota objects limit. Requires leader privileges. #[weight = 10_000_000] // TODO: adjust weight pub fn update_storage_object_owner_quota_objects_limit( origin, abstract_owner: StorageObjectOwner, ChannelId, DAOId>, new_quota_objects_limit: u64 ) { >::ensure_origin_is_active_leader(origin)?; ensure!(new_quota_objects_limit <= Self::quota_objects_limit_upper_bound(), Error::::QuotaSizeLimitUpperBoundExceeded); // // == MUTATION SAFE == // if >::contains_key(&abstract_owner) { >::mutate(&abstract_owner, |quota| { quota.set_new_objects_limit(new_quota_objects_limit); }); } else { let mut quota = T::DefaultQuota::get(); quota.set_new_objects_limit(new_quota_objects_limit); >::insert(&abstract_owner, quota); }; Self::deposit_event(RawEvent::StorageObjectOwnerQuotaObjectsLimitUpdated(abstract_owner, new_quota_objects_limit)); } /// Updates storage object owner quota size limit. Requires leader privileges. #[weight = 10_000_000] // TODO: adjust weight pub fn update_storage_object_owner_quota_size_limit( origin, abstract_owner: StorageObjectOwner, ChannelId, DAOId>, new_quota_size_limit: u64 ) { >::ensure_origin_is_active_leader(origin)?; ensure!(new_quota_size_limit <= Self::quota_size_limit_upper_bound(), Error::::QuotaObjectsLimitUpperBoundExceeded); // // == MUTATION SAFE == // if >::contains_key(&abstract_owner) { >::mutate(&abstract_owner, |quota| { quota.set_new_size_limit(new_quota_size_limit); }); } else { let mut quota = T::DefaultQuota::get(); quota.set_new_size_limit(new_quota_size_limit); >::insert(&abstract_owner, quota); }; Self::deposit_event(RawEvent::StorageObjectOwnerQuotaSizeLimitUpdated(abstract_owner, new_quota_size_limit)); } /// Storage provider accepts a content. Requires signed storage provider account and its id. /// The LiaisonJudgement can be updated, but only by the liaison. #[weight = 10_000_000] // TODO: adjust weight pub(crate) fn accept_content( origin, storage_provider_id: StorageProviderId, content_id: T::ContentId ) { >::ensure_worker_signed(origin, &storage_provider_id)?; // == MUTATION SAFE == Self::update_content_judgement(&storage_provider_id, content_id, LiaisonJudgement::Accepted)?; >::mutate(|ids| ids.push(content_id)); Self::deposit_event(RawEvent::ContentAccepted(content_id, storage_provider_id)); } /// Storage provider rejects a content. Requires signed storage provider account and its id. /// The LiaisonJudgement can be updated, but only by the liaison. #[weight = 10_000_000] // TODO: adjust weight pub(crate) fn reject_content( origin, storage_provider_id: StorageProviderId, content_id: T::ContentId ) { >::ensure_worker_signed(origin, &storage_provider_id)?; // == MUTATION SAFE == Self::update_content_judgement(&storage_provider_id, content_id, LiaisonJudgement::Rejected)?; Self::deposit_event(RawEvent::ContentRejected(content_id, storage_provider_id)); } // Sudo methods /// Removes the content id from the list of known content ids. Requires root privileges. #[weight = 10_000_000] // TODO: adjust weight fn remove_known_content_id(origin, content_id: T::ContentId) { ensure_root(origin)?; // == MUTATION SAFE == let upd_content_ids: Vec = Self::known_content_ids() .into_iter() .filter(|&id| id != content_id) .collect(); >::put(upd_content_ids); } /// Injects a set of data objects and their corresponding content id into the directory. /// The operation is "silent" - no events will be emitted as objects are added. /// The number of objects that can be added per call is limited to prevent the dispatch /// from causing the block production to fail if it takes too much time to process. /// Existing data objects will be overwritten. #[weight = 10_000_000] // TODO: adjust weight pub(crate) fn inject_data_objects(origin, objects: DataObjectsMap) { ensure_root(origin)?; // Must provide something to inject ensure!(objects.len() <= T::MaxObjectsPerInjection::get() as usize, Error::::DataObjectsInjectionExceededLimit); for (id, object) in objects.into_iter() { // append to known content ids // duplicates will be removed at the end >::mutate(|ids| ids.push(id)); >::insert(id, object); } // remove duplicate ids >::mutate(|ids| { ids.sort(); ids.dedup(); }); } } } impl Module { // Ensure given origin can perform operation under specific storage object owner fn ensure_storage_object_owner_origin( origin: T::Origin, owner: &StorageObjectOwner, ChannelId, DAOId>, ) -> DispatchResult { if let StorageObjectOwner::Member(member_id) = owner { T::MemberOriginValidator::ensure_actor_origin(origin, *member_id)?; } else { ensure_root(origin)?; }; Ok(()) } // Get owner quota if exists, otherwise return default one. fn get_quota(owner: &StorageObjectOwner, ChannelId, DAOId>) -> Quota { if >::contains_key(owner) { Self::quotas(owner) } else { T::DefaultQuota::get() } } // Ensure owner quota constraints satisfied, returns total object length and total size voucher for this upload. fn ensure_owner_quota_constraints_satisfied( owner_quota: Quota, content: &[ContentParameters>], ) -> Result> { let owner_quota_voucher = owner_quota.calculate_voucher(); // Ensure total content length is less or equal then available per given owner quota let content_length = content.len() as u64; ensure!( owner_quota_voucher.objects >= content_length, Error::::QuotaObjectsLimitExceeded ); // Ensure total content size is less or equal then available per given owner quota let content_size = content .iter() .fold(0, |total_size, content| total_size + content.size); ensure!( owner_quota_voucher.size >= content_size, Error::::QuotaSizeLimitExceeded ); Ok(Voucher { size: content_size, objects: content_length, }) } // Ensures global quota constraints satisfied. fn ensure_global_quota_constraints_satisfied(upload_voucher: Voucher) -> DispatchResult { let global_quota_voucher = Self::global_quota().calculate_voucher(); ensure!( global_quota_voucher.objects >= upload_voucher.objects, Error::::GlobalQuotaObjectsLimitExceeded ); ensure!( global_quota_voucher.size >= upload_voucher.size, Error::::GlobalQuotaSizeLimitExceeded ); Ok(()) } // Complete content upload, update quotas fn upload_content( owner_quota: Quota, upload_voucher: Voucher, liaison: StorageProviderId, multi_content: Vec>>, owner: StorageObjectOwner, ChannelId, DAOId>, ) { for content in multi_content { let data: DataObject = DataObjectInternal { type_id: content.type_id, size: content.size, added_at: common::current_block_time::(), owner: owner.clone(), liaison, liaison_judgement: LiaisonJudgement::Pending, ipfs_content_id: content.ipfs_content_id, }; >::insert(content.content_id, data); } // Updade or create owner quota. >::insert(owner, owner_quota.fill_quota(upload_voucher)); // Update global quota ::mutate(|global_quota| global_quota.fill_quota(upload_voucher)); } fn ensure_content_is_valid( multi_content: &[ContentParameters>], ) -> DispatchResult { for content in multi_content { ensure!( T::IsActiveDataObjectType::is_active_data_object_type(&content.type_id), Error::::DataObjectTypeMustBeActive ); ensure!( !>::contains_key(&content.content_id), Error::::DataObjectAlreadyAdded ); } Ok(()) } fn update_content_judgement( storage_provider_id: &StorageProviderId, content_id: T::ContentId, judgement: LiaisonJudgement, ) -> DispatchResult { let mut data = Self::data_object_by_content_id(&content_id).ok_or(Error::::CidNotFound)?; // Make sure the liaison matches ensure!( data.liaison == *storage_provider_id, Error::::LiaisonRequired ); data.liaison_judgement = judgement; >::insert(content_id, data); Ok(()) } } /// Provides random storage provider id. We use it when assign the content to the storage provider. pub trait StorageProviderHelper { /// Provides random storage provider id. fn get_random_storage_provider() -> Result, &'static str>; } /// Content access helper. pub trait ContentIdExists { /// Verifies the content existence. fn has_content(id: &T::ContentId) -> bool; /// Returns the data object for the provided content id. fn get_data_object(id: &T::ContentId) -> Result, &'static str>; } impl ContentIdExists for Module { fn has_content(content_id: &T::ContentId) -> bool { Self::data_object_by_content_id(*content_id).is_some() } fn get_data_object(content_id: &T::ContentId) -> Result, &'static str> { match Self::data_object_by_content_id(*content_id) { Some(data) => Ok(data), None => Err(Error::::LiaisonRequired.into()), } } } impl common::storage::StorageSystem for Module { fn atomically_add_content( owner: StorageObjectOwner, ChannelId, DAOId>, content: Vec>>, ) -> DispatchResult { Self::ensure_content_is_valid(&content)?; let liaison = T::StorageProviderHelper::get_random_storage_provider()?; let owner_quota = Self::get_quota(&owner); let upload_voucher = Self::ensure_owner_quota_constraints_satisfied(owner_quota, &content)?; Self::upload_content(owner_quota, upload_voucher, liaison, content, owner); Ok(()) } fn can_add_content( owner: StorageObjectOwner, ChannelId, DAOId>, content: Vec>>, ) -> DispatchResult { T::StorageProviderHelper::get_random_storage_provider()?; let owner_quota = Self::get_quota(&owner); Self::ensure_owner_quota_constraints_satisfied(owner_quota, &content)?; Self::ensure_content_is_valid(&content) } }