// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| |
//! Tests for CRDT convergence by simulating the operations, shuffling the resulting update messages
//! and verifying that the merge results are consistent.
//!
//! See [`Simulation`].
| |
| use crate::{ |
| delta::{AsDeltaMut, DeltaMut, DeltaOwned}, |
| CrdtState, |
| }; |
| use arbitrary::Arbitrary; |
| use derive_where::derive_where; |
| use distributed_time::fake_timestamp::FakeTimestamp; |
| use std::{collections::BTreeMap, fmt::Debug}; |
| |
| use super::{ |
| eventual_delivery::EventualDeliveryScenario, |
| test_fakes::{FakeContext, TriState}, |
| }; |
| |
| /// An operation that can be tested in a [`Simulation`]. |
| /// |
| /// Typically an operation is an enum which represents all possible operations for the state `S` |
| /// under test. These operations may include environmental changes like clock value updates. An |
| /// operation should implement [`Arbitrary`] so that an arbitrary [`Simulation`] can be created |
| /// during fuzzing or property testing. |
| pub trait Operation<S, N: Ord + Eq> { |
| /// Applies this operation to the given `state`. |
| /// |
| /// The operation should mutate the given `state`. If `state` contains a non-empty delta |
| /// component when this call returns, the simulation will record the delta as the update |
| /// message. |
| /// |
| /// Implementations can also add `assert!` or `panic!` to check for invariants that should be |
| /// kept after an operation is applied. |
| /// |
| /// # Params |
| /// * `state` - the current state that the operation should be applied to. |
| /// * `context` - the context information for this execution. |
| fn apply(self, state: DeltaMut<S>, context: &SimulationContext<N>); |
| } |
| |
| /// A simulation test for a CRDT implementation `S`. |
| /// |
| /// The simulation starts at an arbitrary `init_state: S`, applies an arbitrary list of operations |
| /// `OP` and gathers the update messages generated during [`Operation::apply`]. It then shuffles the |
| /// gathered messages according to eventual delivery rules (i.e. messages can be reordered and |
| /// duplicated, but not lost). The merge result from the shuffled update messages is then compared |
| /// against the merge result from unshuffled ones, and makes sure that they are equal (i.e. |
| /// satisfies eventual consistency). |
| /// |
| /// ## Params |
| /// * `S`: The CRDT state to test against. |
| /// * `OP`: The operations to be applied onto `S`, typically an enum. |
| /// * `N`: The Node ID. The default is [`TriState`], which means the system has at most 3 nodes. |
| /// |
| /// # Example |
| /// ``` |
| /// use arbitrary::Arbitrary; |
| /// use crdt::{ |
| /// checker::{ |
| /// simulation::{Operation, Simulation, SimulationContext}, |
| /// test_fakes::TriState, |
| /// }, |
| /// delta::{AsDeltaMut, AsDeltaRef, DeltaMut}, |
| /// CrdtState, |
| /// }; |
| /// |
| /// /// Our trivial example CRDT. |
| /// #[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord, Arbitrary)] |
| /// pub struct IntegerMaxCrdt(u8); |
| /// |
| /// /// Merges by taking the max of the two values. |
| /// impl CrdtState for IntegerMaxCrdt { |
| /// fn merge(a: &Self, b: &Self) -> Self { |
| /// a.max(b).clone() |
| /// } |
| /// } |
| /// |
| /// /// Define the operations that can be performed on the CRDT. |
| /// #[derive(Debug, Clone, Arbitrary)] |
| /// pub enum IntegerMaxOp { |
| /// /// Only operation supported in this simulation is setting the integer value |
| /// SetInteger { value: u8 }, |
| /// // Operations outside of the CRDT, like setting the wall clock can also be defined here. |
| /// } |
| /// |
| /// impl Operation<IntegerMaxCrdt, TriState> for IntegerMaxOp { |
| /// fn apply(self, mut state: DeltaMut<IntegerMaxCrdt>, _ctx: &SimulationContext<TriState>) { |
| /// match self { |
| /// IntegerMaxOp::SetInteger { value } => { |
| /// if value > state.as_ref().merge().0 { |
| /// state.delta_mut().0 = value; |
| /// } |
| /// } |
| /// } |
| /// } |
| /// } |
| /// |
| /// #[derive_fuzztest::fuzztest] |
| /// fn simulator(sim: Simulation<IntegerMaxCrdt, IntegerMaxOp>) { |
| /// let _ = sim.run(); |
| /// } |
| /// ``` |
| #[derive(Debug, Clone, Arbitrary)] |
| pub struct Simulation<S, OP, N = TriState> |
| where |
| S: CrdtState, |
| OP: Operation<S, N> + Debug, |
| N: Ord, |
| { |
| init_state: S, |
| operations: Vec<SimulationOperation<N, OP>>, |
| delivery_scenario: EventualDeliveryScenario, |
| } |
| |
| impl<S, OP, N> Simulation<S, OP, N> |
| where |
| S: CrdtState + Clone + Eq + Debug + Default, |
| OP: Operation<S, N> + Debug + Clone, |
| N: Ord, |
| { |
| /// Drains the operations from `self.operations` and gathers the list of resulting messages. |
| fn sent_messages(&mut self) -> (S, Vec<S>) { |
| let mut context = SimulationContext::<N>::default(); |
| let mut update_messages = Vec::with_capacity(self.operations.len()); |
| update_messages.push(self.init_state.clone()); |
| let mut current_state = self.init_state.clone(); |
| for op in self.operations.drain(..) { |
| match op { |
| SimulationOperation::RegularOp(op) => { |
| let mut delta = DeltaOwned::new(¤t_state); |
| op.apply(delta.as_mut(), &context); |
| let delta_component = delta.into_delta(); |
| current_state = CrdtState::merge(¤t_state, &delta_component); |
| update_messages.push(delta_component); |
| } |
| SimulationOperation::ContextOp(op) => { |
| op.apply(&mut context); |
| } |
| } |
| } |
| (current_state, update_messages) |
| } |
| |
| /// Run the simulation test and assert that the results converge after arbitrarily shuffling |
| /// the update messages. |
| /// |
| /// # Returns |
| /// Returns the two resulting states (which are equal according to `Eq`), or `None` if a valid |
| /// shuffling cannot be generated (and the test should be discarded). The returned states can be |
| /// used to verify additional invariants specific to the CRDT implementation, for example if |
| /// there are states that should not be reachable. |
| /// |
| /// # Panics |
| /// Panics if the resulting states from the shuffled and unshuffled operations are not equal. |
| pub fn run(mut self) -> Option<(S, S)> { |
| let (final_state, sent_messages) = self.sent_messages(); |
| let delivered_messages = self.delivery_scenario.scramble(&sent_messages)?; |
| let state1 = sent_messages |
| .into_iter() |
| .fold(self.init_state.clone(), |a, b| CrdtState::merge(&a, &b)); |
| let state2 = delivered_messages |
| .into_iter() |
| .fold(self.init_state.clone(), |a, b| CrdtState::merge(&a, &b)); |
| |
| assert_eq!(state1, state2); |
| assert_eq!(state1, final_state); |
| Some((state1, state2)) |
| } |
| } |
| |
| #[derive(Clone, Debug, Arbitrary)] |
| enum SimulationOperation<N: Ord, OP> { |
| RegularOp(OP), |
| ContextOp(SimulationContextOperation<N>), |
| } |
| |
| /// Stores contextual information for all the nodes in a simulation. |
| /// |
| /// Currently this only stores the wall-clock timestamps for each node. |
| #[derive(Debug, Clone, Arbitrary)] |
| #[derive_where(Default)] |
| pub struct SimulationContext<N: Ord + Eq> { |
| timestamps: BTreeMap<N, FakeTimestamp>, |
| } |
| |
| impl<N> SimulationContext<N> |
| where |
| N: Ord + Eq + Clone, |
| { |
| /// Get the [`UpdateContext`][crate::UpdateContext] for the given node in this simulation. |
| pub fn context(&self, node: &N) -> FakeContext<N, u8> { |
| FakeContext::new( |
| node.clone(), |
| self.timestamps.get(node).cloned().unwrap_or_default().0, |
| ) |
| } |
| } |
| |
| /// A mutation operation on the [`SimulationContext`]. |
| #[derive(Debug, Clone, Arbitrary)] |
| #[allow(missing_docs)] |
| enum SimulationContextOperation<N: Ord + Eq> { |
| Increment { node: N }, |
| Decrement { node: N }, |
| SetValue { node: N, timestamp: FakeTimestamp }, |
| } |
| |
| impl<N: Ord + Eq> SimulationContextOperation<N> { |
| /// Applies the operation on the given simulation context. |
| fn apply(self, state: &mut SimulationContext<N>) { |
| match self { |
| SimulationContextOperation::Increment { node } => { |
| let timestamp = state.timestamps.entry(node).or_default(); |
| if let Some(incremented) = timestamp.0.checked_add(1) { |
| *timestamp = FakeTimestamp(incremented); |
| } |
| } |
| SimulationContextOperation::Decrement { node } => { |
| let timestamp = state.timestamps.entry(node).or_default(); |
| if let Some(decremented) = timestamp.0.checked_sub(1) { |
| *timestamp = FakeTimestamp(decremented); |
| } |
| } |
| SimulationContextOperation::SetValue { node, timestamp } => { |
| let _ = state.timestamps.insert(node, timestamp); |
| } |
| } |
| } |
| } |