// blob: c490af2624475f6a3d20aacb3d4d677e025d2ced [file] [log] [blame]
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A set CRDT that resolves concurrent modifications by comparing the generation number (a.k.a. the
//! causal length).
//!
//! See [`Set`].
use std::{
collections::{hash_map::RandomState, HashMap, HashSet},
fmt::Debug,
hash::{BuildHasher, Hash},
};
use crate::{
delta::{AsDeltaMut, AsDeltaRef},
utils::{zip_hash_map, ZipItem},
ContentEq, CrdtState,
};
use arbitrary::{Arbitrary, Unstructured};
use derive_where::derive_where;
use serde::{Deserialize, Serialize};
/// The metadata associated with an element in the set. This is an alternative encoding to the
/// "causal-length" described in the paper. The paper uses odd numbers to represent `removed =
/// false` and even numbers to represent `removed = true`, whereas this uses a separate boolean to
/// be more explicit.
///
/// Translation from this struct to causal-length can be done by
/// `causal-length = (generation * 2 - 1) + (if removed { 1 } else { 0 })`. For example,
/// `generation = 1, removed = false` is causal-length 1, `generation = 1, removed = true` is
/// causal-length 2, and the default state (`generation = 0, removed = true`) is causal-length 0.
///
/// The derived [`Ord`] compares the fields in declaration order (`generation` first, then
/// `removed`, with `false < true`). Merging picks the maximum of two states, so the field order
/// below is what makes "higher generation wins, then removal wins" hold; do not reorder the
/// fields.
#[derive(Debug, Clone, Copy, Arbitrary, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct ElementState {
    /// The generation number of this element. During merge, the element with higher generation
    /// wins.
    generation: u32,
    /// Whether the element is removed. The element itself needs to be kept as CRDT metadata (a.k.a.
    /// tombstone) to ensure convergence. During merge, `removed = true` wins for elements with the
    /// same generation number.
    removed: bool,
}
impl Default for ElementState {
    /// Returns the state of an element that has never been added: generation `0`, marked as
    /// removed. Adding from this state would yield generation `1`, the same generation that
    /// [`SetWrite::add`] assigns to an element it has no entry for.
    fn default() -> Self {
        let (generation, removed) = (0, true);
        Self {
            generation,
            removed,
        }
    }
}
/// A Causal Length Set, which is a set CRDT that resolves concurrent modifications by comparing
/// the generation number (a.k.a. the causal length).
///
/// The generation number is incremented when an entry goes from removed state to added state. That
/// means the more a node has observed an entry being (re-)added, the more likely its modification
/// will win during a merge. Adding a value that already exists or removing a value that is not in
/// the set are no-ops.
///
/// Reference: [Causal-Length Set](https://dl.acm.org/doi/abs/10.1145/3380787.3393678)
///
/// Values of this set must implement [`Eq`]. In addition to the reflexive, symmetric, and
/// transitive properties required by `Eq`, the value should also make sure that any non-trivial
/// fields (i.e. fields sent over the wire) participate in the `Eq` comparison. Convergence of
/// fields not participating in the `Eq` comparison is not guaranteed.
///
/// Garbage collection is not supported, meaning removed elements (a.k.a. "tombstones") remain in
/// the set as metadata.
///
/// # Implementations
/// * See [`SetRead`] for methods on read-only references of [`Set`].
/// * See [`SetWrite`] for methods on mutable references of [`Set`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive_where(Default; S)]
#[derive_where(PartialEq, Eq; E: Eq + Hash, S: BuildHasher)]
#[serde(transparent)]
pub struct Set<E, S = RandomState> {
    // The serde bounds are spelled out by hand so that serialization only requires
    // `E: Serialize`, and deserialization only requires what is needed to rebuild the map
    // (`E: Eq + Hash`, `S: BuildHasher + Default`).
    #[serde(bound(
        serialize = "E: Serialize",
        deserialize = "E: Deserialize<'de> + Eq + Hash, S: BuildHasher + Default"
    ))]
    /// Every element this set knows about, mapped to its CRDT metadata. Removed elements stay in
    /// the map as tombstones (`removed = true`); only entries with `removed = false` are members
    /// of the set.
    elements: HashMap<E, ElementState, S>,
}
// Manual implementation of Arbitrary that always have no base, and is not bound on `S: Arbitrary`.
impl<'a, E, S> Arbitrary<'a> for Set<E, S>
where
E: Hash + Eq + Arbitrary<'a>,
S: BuildHasher + Default,
{
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
Ok(Self {
elements: u.arbitrary_iter()?.collect::<Result<_, _>>()?,
})
}
}
/// Cannot add new element to the set because the generation number space is exhausted.
///
/// Returned by [`SetWrite::add`] (and operations built on it) when the element was removed at
/// the maximum generation number, so re-adding it would overflow the `u32` generation counter.
///
/// Implements [`std::error::Error`] so it can be propagated with `?` into boxed or wrapped
/// error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GenerationExhausted;

impl std::fmt::Display for GenerationExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("cannot add element to the set: generation number space is exhausted")
    }
}

impl std::error::Error for GenerationExhausted {}
/// A private helper function, intended for use by [`SetRead`] to look up the state of `value`
/// taking both the base and the delta into account.
///
/// Returns `None` if neither side has an entry for `value`. If only one side has an entry, that
/// entry is returned; if both do, the greater one (per [`ElementState`]'s ordering) is returned.
fn get_entry<E, S>(
    base: Option<&Set<E, S>>,
    delta: Option<&Set<E, S>>,
    value: &E,
) -> Option<ElementState>
where
    E: Hash + Eq,
    S: BuildHasher,
{
    // Looks `value` up in one optional side and copies the (small, `Copy`) state out.
    let lookup = |side: Option<&Set<E, S>>| side.and_then(|set| set.elements.get(value)).copied();

    match (lookup(base), lookup(delta)) {
        (Some(from_base), Some(from_delta)) => Some(from_base.max(from_delta)),
        // At most one side has an entry: pass through whatever was (or was not) found.
        (found, None) | (None, found) => found,
    }
}
/// A private helper function, intended for use by [`SetRead`] to iterate over every element
/// (tombstones included) known to the base and/or the delta.
///
/// Each key is yielded together with a single state: the only state available when the key is on
/// one side, or the greater of the two states when the key is on both sides.
fn iter_entries<'a, E, S>(
    base: Option<&'a Set<E, S>>,
    delta: Option<&'a Set<E, S>>,
) -> impl Iterator<Item = (&'a E, &'a ElementState)>
where
    S: 'static,
    E: Hash + Eq,
    S: BuildHasher + Default,
{
    // The four input combinations produce three different concrete iterator types (plus "no
    // items at all"); this enum unifies them into one type that can be returned as
    // `impl Iterator`.
    enum EntryIter<D, B, M> {
        Nothing,
        DeltaOnly(D),
        BaseOnly(B),
        Merged(M),
    }

    impl<D, B, M> Iterator for EntryIter<D, B, M>
    where
        D: Iterator,
        B: Iterator<Item = D::Item>,
        M: Iterator<Item = D::Item>,
    {
        type Item = D::Item;

        fn next(&mut self) -> Option<Self::Item> {
            match self {
                Self::Nothing => None,
                Self::DeltaOnly(inner) => inner.next(),
                Self::BaseOnly(inner) => inner.next(),
                Self::Merged(inner) => inner.next(),
            }
        }
    }

    match (base, delta) {
        (Some(base), Some(delta)) => {
            let merged = zip_hash_map(&base.elements, &delta.elements).map(
                |(key, item)| -> (&E, &ElementState) {
                    let state = match item {
                        ZipItem::Both(left, right) => left.max(right),
                        ZipItem::Left(only) | ZipItem::Right(only) => only,
                    };
                    (key, state)
                },
            );
            EntryIter::Merged(merged)
        }
        (Some(base), None) => EntryIter::BaseOnly(base.elements.iter()),
        (None, Some(delta)) => EntryIter::DeltaOnly(delta.elements.iter()),
        (None, None) => EntryIter::Nothing,
    }
}
/// Read-only operations for this Set CRDT.
///
/// All methods take both the base and the delta exposed by [`AsDeltaRef`] into account.
///
/// ## See also
/// * See [`Set`] for a description of this set type.
/// * See [`SetWrite`] for mutating operations on this set type.
pub trait SetRead<E, S>: AsDeltaRef<Set<E, S>> {
    /// Get the set of entries currently in the set. Elements that are only kept as tombstones
    /// (i.e. removed elements) are not included.
    fn entries(&self) -> HashSet<&E>
    where
        S: 'static,
        E: Hash + Eq,
        S: BuildHasher + Default,
    {
        iter_entries(self.base(), self.delta())
            .filter_map(|(element, state)| {
                if state.removed {
                    None
                } else {
                    Some(element)
                }
            })
            .collect()
    }

    /// Returns `true` if the set contains `value`, i.e. there is an entry for `value` and it is
    /// not marked as removed.
    fn contains(&self, value: &E) -> bool
    where
        E: Hash + Eq,
        S: BuildHasher,
    {
        matches!(
            get_entry(self.base(), self.delta(), value),
            Some(state) if !state.removed
        )
    }

    /// Copies the data without the associated metadata and returns the plain type.
    ///
    /// See also: [`crate::HasPlainRepresentation`].
    fn to_plain(&self) -> HashSet<E, S>
    where
        E: Hash + Eq + Clone,
        S: BuildHasher + Default + 'static,
    {
        let present = self.entries();
        present.into_iter().cloned().collect()
    }
}
/// Mutating operations for this Set CRDT.
///
/// Every mutation is recorded by inserting the new element state into the delta obtained from
/// [`AsDeltaMut::delta_mut`].
///
/// ## See also
/// * See [`Set`] for a description of this set type.
/// * See [`SetRead`] for read-only operations of this set type.
pub trait SetWrite<E, S>: SetRead<E, S> + AsDeltaMut<Set<E, S>> {
    /// Add an element into the set.
    ///
    /// # Returns
    /// True if the set has changed. False if the value is already in the set.
    ///
    /// # Errors
    /// Returns [`GenerationExhausted`] if `value` is currently removed and its generation number
    /// is already `u32::MAX`, so it cannot be incremented to re-add the element. The set is left
    /// unchanged in that case.
    fn add(&mut self, value: E) -> Result<bool, GenerationExhausted>
    where
        E: Hash + Eq,
        S: BuildHasher,
    {
        let generation = match get_entry(self.base(), self.delta(), &value) {
            Some(entry) => {
                if entry.removed {
                    // Re-adding a removed element starts a new generation.
                    entry.generation.checked_add(1).ok_or(GenerationExhausted)?
                } else {
                    return Ok(false);
                }
            }
            // Never seen before: the first generation in which an element is present is 1.
            None => 1,
        };
        let _ = self.delta_mut().elements.insert(
            value,
            ElementState {
                generation,
                removed: false,
            },
        );
        Ok(true)
    }

    /// Removes an element from the set.
    ///
    /// Note that the element will be kept as metadata, but will be marked as removed such that it
    /// won't show up in [`SetRead::entries`]. The generation number is left unchanged.
    ///
    /// # Returns
    /// True if the value is removed. False if the value was not in the set to begin with.
    fn remove(&mut self, value: E) -> bool
    where
        E: Hash + Eq,
        S: BuildHasher,
    {
        if let Some(v) = get_entry(self.base(), self.delta(), &value) {
            if !v.removed {
                let _ = self.delta_mut().elements.insert(
                    value,
                    ElementState {
                        generation: v.generation,
                        removed: true,
                    },
                );
                return true;
            }
        }
        false
    }

    /// Adds the given `value` to the set, and bumps the generation number by `increment`.
    ///
    /// Intended for testing only, so tests can artificially create a set with a large generation
    /// number, and test the behavior when the generation number is exhausted. Unlike
    /// [`add`][SetWrite::add], this will increment the generation number even if `value` is already
    /// in the set.
    ///
    /// # Errors
    /// Returns [`GenerationExhausted`] if adding `increment` to the current generation number of
    /// `value` overflows `u32`.
    #[cfg(any(test, feature = "testing"))]
    fn bump_generation_number_for_testing(
        &mut self,
        value: E,
        increment: u32,
    ) -> Result<(), GenerationExhausted>
    where
        E: Hash + Eq,
        S: BuildHasher,
    {
        let generation = match get_entry(self.base(), self.delta(), &value) {
            Some(entry) => entry
                .generation
                .checked_add(increment)
                .ok_or(GenerationExhausted)?,
            None => increment,
        };
        let _ = self.delta_mut().elements.insert(
            value,
            ElementState {
                generation,
                removed: false,
            },
        );
        Ok(())
    }

    /// Apply the changes in the plain representation into this CRDT. This assumes that all changes
    /// in the plain representation were made by the calling node, updating the associated CRDT
    /// metadata in the process.
    ///
    /// After applying the changes, [`to_plain`][SetRead::to_plain] should return the same
    /// value as `plain`.
    ///
    /// # Returns
    ///
    /// `true` if applying the change results in an update in the CRDT state, or `false` if `plain`
    /// is the same as [`to_plain`][SetRead::to_plain] to begin with. This can be used to avoid
    /// sending unnecessary update messages over the network, or writing changes to disk
    /// unnecessarily.
    ///
    /// # Errors
    ///
    /// Returns [`GenerationExhausted`] if any element of `plain` would have to be re-added but
    /// its generation number is already `u32::MAX`. This is checked before anything is modified,
    /// so on error the set is left exactly as it was.
    fn apply_changes(&mut self, plain: HashSet<E, S>) -> Result<bool, GenerationExhausted>
    where
        E: Hash + Eq + Clone,
        S: BuildHasher + Default + 'static,
    {
        // Validate up front so a failure cannot leave the delta half-applied. The outcome of
        // adding an element depends only on that element's own state, and the removals below
        // only touch elements that are *not* in `plain`, so this check is exact.
        let exhausted = plain.iter().any(|e| {
            get_entry(self.base(), self.delta(), e)
                .map_or(false, |state| state.removed && state.generation == u32::MAX)
        });
        if exhausted {
            return Err(GenerationExhausted);
        }

        // Collect owned copies first: `entries` borrows `self`, which must be released before
        // `remove` can borrow it mutably.
        let to_be_removed: Vec<_> = self
            .entries()
            .into_iter()
            .filter(|e| !plain.contains(e))
            .cloned()
            .collect();

        let mut changed = false;
        for e in to_be_removed {
            changed |= self.remove(e);
        }
        for e in plain {
            changed |= self.add(e)?;
        }
        Ok(changed)
    }
}
impl<E, S> CrdtState for Set<E, S>
where
E: Hash + Eq + Clone,
S: BuildHasher + Default,
{
fn merge(a: &Self, b: &Self) -> Self {
Self {
elements: {
zip_hash_map(&a.elements, &b.elements)
.map(|(key, item)| match item {
ZipItem::Left(v) | ZipItem::Right(v) => (key.clone(), *v),
ZipItem::Both(l, r) => (key.clone(), *l.max(r)),
})
.collect()
},
}
}
}
impl<E, S> ContentEq for Set<E, S>
where
    E: Hash + Eq,
    S: BuildHasher,
{
    /// Two sets have equal content when their element maps are equal, i.e. they know the same
    /// elements (tombstones included) with the same generation and removed flag. `elements` is
    /// the only field of [`Set`], so this is the same comparison as `==`.
    fn content_eq(&self, other: &Self) -> bool {
        self.elements == other.elements
    }
}
// Blanket implementation: every type that can be viewed as a delta reference over a `Set`
// gets the read-only API. The body is empty because `SetRead` provides all of its methods as
// defaults.
impl<E, S, T> SetRead<E, S> for T where T: AsDeltaRef<Set<E, S>> {}
// Blanket implementation: every type that gives mutable delta access to a `Set` gets the
// mutating API, again entirely from `SetWrite`'s default methods.
impl<E, S, T> SetWrite<E, S> for T where T: AsDeltaMut<Set<E, S>> {}
/// Checker to help implement invariant tests over arbitrary operations on a set.
///
/// Requires the feature _`checker`_.
#[cfg(any(test, feature = "checker"))]
pub mod checker {
    use std::{collections::HashSet, fmt::Debug, hash::Hash};

    use arbitrary::Arbitrary;

    use crate::{
        checker::{
            simulation::{Operation, SimulationContext},
            test_fakes::TriState,
            utils::DeterministicHasher,
        },
        delta::DeltaMut,
        set::SetRead,
    };

    use super::{Set, SetWrite};

    /// Mutation operations on a [`SetWrite`].
    ///
    /// Can be used with [`Arbitrary`] to generate arbitrary operations to be applied on the set.
    #[derive(Debug, Clone, Arbitrary)]
    pub enum SetOp<E: Eq + Hash> {
        /// Represents the [`SetWrite::add`] operation.
        Add {
            /// The value passed to [`SetWrite::add`].
            value: E,
        },
        /// Represents the [`SetWrite::remove`] operation.
        Remove {
            /// The value passed to [`SetWrite::remove`].
            value: E,
        },
        /// Represents the [`SetWrite::apply_changes`] operation.
        ApplyChanges {
            /// The plain representation of the set passed to [`SetWrite::apply_changes`].
            value_set: HashSet<E, DeterministicHasher>,
        },
    }

    impl<V> Operation<Set<V, DeterministicHasher>, TriState> for SetOp<V>
    where
        V: Clone + Eq + Hash + Debug,
    {
        /// Applies this operation to `state` and asserts the post-condition of the operation:
        /// a successful add leaves the value in the set, a remove leaves it out of the set, and
        /// a successful `apply_changes` makes the entries equal to the requested plain set.
        /// Operations that fail with an error are not checked.
        fn apply(
            self,
            mut state: DeltaMut<Set<V, DeterministicHasher>>,
            _: &SimulationContext<TriState>,
        ) {
            match self {
                Self::Add { value } => {
                    let outcome = state.add(value.clone());
                    if outcome.is_ok() {
                        assert!(state.contains(&value));
                    }
                }
                Self::Remove { value } => {
                    let _ = state.remove(value.clone());
                    assert!(!state.contains(&value));
                }
                Self::ApplyChanges { value_set } => {
                    let outcome = state.apply_changes(value_set.clone());
                    if outcome.is_ok() {
                        let expected: HashSet<&V> = value_set.iter().collect();
                        assert_eq!(expected, state.entries());
                    }
                }
            }
        }
    }
}
#[cfg(feature = "proto")]
mod proto {
    use submerge_internal_proto::protos::submerge::{submerge_set::SetElement, SubmergeSet};
    use submerge_internal_proto::{FromProto, FromProtoError, NodeMapping, ToProto};

    use super::{ElementState, Set};

    impl FromProto for Set<Vec<u8>> {
        type Proto = SubmergeSet;

        /// Rebuilds the set from its proto form, one map entry per proto element. The node id
        /// table is not used by this type.
        fn from_proto(proto: &Self::Proto, _node_ids: &[String]) -> Result<Self, FromProtoError> {
            let elements = proto
                .elements
                .iter()
                .map(|elem| {
                    let state = ElementState {
                        generation: elem.generation,
                        removed: elem.removed,
                    };
                    (elem.value.clone(), state)
                })
                .collect();
            Ok(Self { elements })
        }
    }

    impl ToProto for Set<Vec<u8>> {
        type Proto = SubmergeSet;

        /// Converts the set, tombstones included, into its proto form. The node id mapping is
        /// not used by this type.
        fn to_proto(&self, _node_ids: &mut NodeMapping<String>) -> Self::Proto {
            let elements = self
                .elements
                .iter()
                .map(|(value, state)| SetElement {
                    value: value.clone(),
                    generation: state.generation,
                    removed: state.removed,
                    ..Default::default()
                })
                .collect();
            SubmergeSet {
                elements,
                ..Default::default()
            }
        }
    }

    /// Converting an arbitrary set to proto and back must yield an equal set.
    #[cfg(test)]
    #[derive_fuzztest::proptest]
    fn set_roundtrip(set: Set<Vec<u8>>) {
        let mut node_ids = NodeMapping::default();
        let encoded = set.to_proto(&mut node_ids);
        let decoded = Set::<Vec<u8>>::from_proto(&encoded, &node_ids.into_vec()).unwrap();
        assert_eq!(decoded, set);
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use crate::{
        checker::{test_fakes::TriState, utils::DeterministicHasher},
        set::GenerationExhausted,
        CrdtState,
    };

    use super::{Set, SetRead, SetWrite};

    type TestSet = Set<u8, DeterministicHasher>;

    /// Builds a set by adding `values` one by one to an empty set.
    fn set_with(values: &[u8]) -> TestSet {
        let mut set = TestSet::default();
        for &value in values {
            let _ = set.add(value).unwrap();
        }
        set
    }

    #[test]
    fn test_add() {
        let set = set_with(&[1, 2, 3]);
        assert_eq!(HashSet::from_iter(&[1, 2, 3]), set.entries());
    }

    #[test]
    fn test_remove() {
        let mut set = set_with(&[1, 2]);
        // Removing a value that was never added is a no-op.
        let _ = set.remove(3);
        let _ = set.remove(1);
        assert_eq!(HashSet::from_iter(&[2]), set.entries());
    }

    #[test]
    fn test_generation_exhausted() {
        let mut set = Set::<TriState>::default();
        set.bump_generation_number_for_testing(TriState::A, u32::MAX)
            .unwrap();
        // This is a no-op since `TriState::A` is already in the set.
        let _ = set.add(TriState::A).unwrap();
        let _ = set.add(TriState::B).unwrap();
        let _ = set.remove(TriState::A);
        // `TriState::A` is now removed at the maximum generation, so it cannot be re-added.
        let readd = set.add(TriState::A);
        assert_eq!(GenerationExhausted, readd.unwrap_err());
        // Adding other values should still work.
        let _ = set.add(TriState::C).unwrap();
        assert_eq!(
            HashSet::from_iter(&[TriState::B, TriState::C]),
            set.entries()
        );
    }

    #[test]
    fn test_merge() {
        let left = set_with(&[1, 2, 3]);
        let right = set_with(&[2, 3, 4]);
        let merged = CrdtState::merge(&left, &right);
        assert_eq!(HashSet::from_iter(&[1, 2, 3, 4]), merged.entries());
    }

    #[test]
    fn test_remove_remote() {
        let origin = set_with(&[1, 2, 3]);
        let empty = TestSet::default();
        // The second replica learns about the elements through a merge, then removes one.
        let mut replica = CrdtState::merge(&origin, &empty);
        let _ = replica.remove(2);
        assert_eq!(HashSet::from_iter(&[1, 3]), replica.entries());
    }
}