blob: 44b81d5be49253a4657cb067eeab4cf5a21f180d [file] [log] [blame]
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A vector of data with one entry per node, where each node can only modify its own entry.
//!
//! See [`VectorData`] for details.
use std::{cmp::max_by_key, collections::BTreeMap};
use crate::{
delta::{AsDeltaMut, AsDeltaRef},
utils::{zip_btree_map, Either, ZipItem},
ContentEq, CrdtState, UpdateContext,
};
use arbitrary::Arbitrary;
use derive_where::derive_where;
use serde::{Deserialize, Serialize};
/// An entry in the vector data.
///
/// Each entry pairs the payload written by a node with the version number of that write.
/// When `data` is modified, the `version` must also be incremented.
#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Serialize, Deserialize)]
struct VecEntry<V> {
/// Version number of this entry. [`VectorDataWrite::set`] assigns `1` to a brand-new entry and
/// adds one each time the data changes.
version: u32,
/// The payload of this entry. An entry whose data is `None` is skipped by
/// [`VectorDataRead::entries`].
data: Option<V>,
}
/// A vector of data with one entry per node, where each node can only modify their own entry.
///
/// The name draws similarity to how "vector clock" is a vector of logical clocks and how "version
/// vector" is a vector of version numbers.
///
/// The data is tagged with a monotonically increasing version number, which is guaranteed to be
/// unique because there is only a single writer.
///
/// When this type is merged, the vector is merged entry-by-entry, taking the entry with the higher
/// version number. Since the local version number is always incremented on data change, an equal
/// version number must imply that the data `V` is also equal.
///
/// # Implementations
/// * See [`VectorDataRead`] for methods on read-only references of [`VectorData`].
/// * See [`VectorDataWrite`] for methods on mutable references of [`VectorData`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
// NOTE(review): `derive_where(Default)` presumably derives `Default` without adding `Default`
// bounds on `N` and `V` — confirm against the derive-where documentation.
#[derive_where(Default)]
pub struct VectorData<N: Ord, V> {
/// The versioned entries, keyed by the node `N` that owns each entry.
elements: BTreeMap<N, VecEntry<V>>,
}
/// Error applying a change to [`VectorData`] because the version number will overflow.
///
/// Versions are stored as `u32`; this error is returned when incrementing the version of an entry
/// would exceed `u32::MAX`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VersionOverflow;

impl std::fmt::Display for VersionOverflow {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("vector data version number overflow")
    }
}

// Implemented so the error can be boxed as `dyn Error` or wrapped by generic error-handling code.
impl std::error::Error for VersionOverflow {}
/// Read-only operations for this vector data CRDT.
///
/// ## See also
/// * See [`VectorData`] for a description of this vector data type.
/// * See [`VectorDataWrite`] for mutating operations on this vector data type.
pub trait VectorDataRead<N: Ord, V>: AsDeltaRef<VectorData<N, V>> {
/// Get all values from the vector.
///
/// # Returns
/// A map with values from every node. Clients can consider using functions like `fold` to
/// compute an aggregate value from this.
fn entries(&self) -> BTreeMap<&N, &V>
where
N: 'static,
V: 'static,
{
let iter_opt = match (self.base(), self.delta()) {
(None, None) => None,
(None, Some(v)) | (Some(v), None) => Some(Either::Left(v.elements.iter())),
(Some(base), Some(delta)) => {
let vec = zip_btree_map(&base.elements, &delta.elements)
.map(|(k, item)| match item {
ZipItem::Left(l) => (k, l),
ZipItem::Right(r) => (k, r),
ZipItem::Both(_, r) => (k, r),
})
.collect::<Vec<_>>()
.into_iter();
Some(Either::Right(vec))
}
};
iter_opt
.into_iter()
.flat_map(|e| e.into_iter())
.filter_map(|(k, v)| Some((k, v.data.as_ref()?)))
.collect()
}
/// Copies the data without the associated metadata and returns the plain type.
///
/// See also: [`crate::HasPlainRepresentation`].
fn to_plain(&self) -> BTreeMap<N, V>
where
N: Clone + 'static,
V: Clone + 'static,
{
self.entries()
.into_iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
}
/// Mutating operations for this vector data CRDT.
///
/// All mutations are written into the delta returned by [`AsDeltaMut`]; the base is only read.
///
/// ## See also
/// * See [`VectorData`] for a description of this vector data type.
/// * See [`VectorDataRead`] for read-only operations of this vector data type.
pub trait VectorDataWrite<N: Ord, V>: VectorDataRead<N, V> + AsDeltaMut<VectorData<N, V>> {
    /// Set the value to `value` at the given `id`.
    ///
    /// The new entry is stored with a version one higher than the newest existing entry for `id`
    /// (looking at both base and delta), or with version `1` if there is no entry yet.
    ///
    /// # Returns
    /// `true` if the value was inserted or updated, or `false` if `value` is already equal to the
    /// entry in the vector.
    ///
    /// # Errors
    /// Returns [`VersionOverflow`] if the version number of the entry cannot be incremented
    /// without overflowing. The vector is left unmodified in that case.
    fn set(&mut self, id: N, value: Option<V>) -> Result<bool, VersionOverflow>
    where
        N: 'static,
        V: PartialEq + 'static,
    {
        let current_delta_entry = self.delta().and_then(|delta| delta.elements.get(&id));
        let current_base_entry = self.base().and_then(|b| b.elements.get(&id));
        // The newest known entry for `id`; a missing entry (`None`) sorts below any version.
        let current_entry = max_by_key(current_base_entry, current_delta_entry, |entry_opt| {
            entry_opt.map(|e| e.version)
        });
        let new_version = match current_entry {
            Some(entry) => {
                if value == entry.data {
                    // Nothing changes, so the version must not be bumped either.
                    return Ok(false);
                }
                entry.version.checked_add(1).ok_or(VersionOverflow)?
            }
            None => 1,
        };
        let _ = self.delta_mut().elements.insert(
            id,
            VecEntry {
                version: new_version,
                data: value,
            },
        );
        Ok(true)
    }

    /// Sets the value to `None`, and bumps the version number by `increment`.
    ///
    /// Intended for testing only, so tests can artificially create an entry with a large version
    /// number, and test the behavior when the version number overflows.
    ///
    /// # Errors
    /// Returns [`VersionOverflow`] if adding `increment` to the current version number overflows.
    #[cfg(any(test, feature = "testing"))]
    fn bump_version_number_for_testing(
        &mut self,
        id: N,
        increment: u32,
    ) -> Result<(), VersionOverflow> {
        let current_delta_entry = self.delta().and_then(|delta| delta.elements.get(&id));
        let current_base_entry = self.base().and_then(|b| b.elements.get(&id));
        let current_entry = max_by_key(current_base_entry, current_delta_entry, |entry_opt| {
            entry_opt.map(|e| e.version)
        });
        let new_version = match current_entry {
            Some(entry) => entry
                .version
                .checked_add(increment)
                .ok_or(VersionOverflow)?,
            // No entry yet: the version starts at the requested increment.
            None => increment,
        };
        let _ = self.delta_mut().elements.insert(
            id,
            VecEntry {
                version: new_version,
                data: None,
            },
        );
        Ok(())
    }

    /// Apply the changes in the plain representation into this CRDT. This assumes that all changes
    /// in the plain representation were made by the calling node, updating the associated CRDT
    /// metadata in the process.
    ///
    /// After applying the changes, [`to_plain`][VectorDataRead::to_plain] should return the same
    /// value as `plain`.
    ///
    /// # Returns
    ///
    /// `true` if applying the change results in an update in the CRDT state, or `false` if `plain`
    /// is the same as [`to_plain`][VectorDataRead::to_plain] to begin with. This can be used to
    /// avoid sending unnecessary update messages over the network, or writing changes to disk
    /// unnecessarily.
    ///
    /// # Errors
    ///
    /// * [`VectorDataError::MismatchedNode`] if `plain` adds, removes or changes the value of any
    ///   node other than the updater given by `ctx`.
    /// * [`VectorDataError::VersionOverflow`] if the updater's version number cannot be
    ///   incremented.
    fn apply_changes(
        &mut self,
        ctx: &impl UpdateContext<N>,
        mut plain: BTreeMap<N, V>,
    ) -> Result<bool, VectorDataError>
    where
        N: Clone + 'static,
        V: Clone + Eq + 'static,
    {
        let updater = ctx.updater();
        let entries = self.entries();
        // Validate the plain representation, making sure only the entry for the updater is changed.
        for (key, item) in zip_btree_map::<N, N, &N, V, &V>(&plain, &entries) {
            let differs = match item {
                // Present on only one side: the entry was added or removed.
                ZipItem::Left(_) | ZipItem::Right(_) => true,
                ZipItem::Both(new_value, current) => new_value != *current,
            };
            if differs && key != updater {
                return Err(VectorDataError::MismatchedNode);
            }
        }
        let plain_entry = plain.remove(updater);
        // Nothing to do if the updater's own value is unchanged (including "still absent").
        if entries.get(updater).copied() == plain_entry.as_ref() {
            return Ok(false);
        }
        Ok(self.set(updater.clone(), plain_entry)?)
    }
}
impl<N, V> CrdtState for VectorData<N, V>
where
N: Ord + Clone,
V: Clone + Eq,
{
fn merge(a: &Self, b: &Self) -> Self {
Self {
elements: zip_btree_map(&a.elements, &b.elements)
.map(|(key, item)| match item {
ZipItem::Left(value) | ZipItem::Right(value) => (key.clone(), value.clone()),
ZipItem::Both(value_a, value_b) => (
key.clone(),
max_by_key(value_a, value_b, |v| v.version).clone(),
),
})
.collect(),
}
}
#[cfg(any(test, feature = "checker"))]
fn is_valid_collection<'a>(collection: impl IntoIterator<Item = &'a Self>) -> bool
where
Self: 'a,
{
use crate::checker::utils::IterExt as _;
// Convergence requirement: Entries from the same node that have the same version number
// must contain the same data. This is enforced for local modifications by always
// incrementing the version number when new data is `set`. Concurrent modifications are not
// allowed because each node can only write to its own entry in the vector.
collection
.into_iter()
.iter_pairs_unordered()
.all(|(s1, s2)| {
s1.elements.keys().chain(s2.elements.keys()).all(|k| {
match (s1.elements.get(k), s2.elements.get(k)) {
(Some(v1), Some(v2)) => v1 == v2 || v1.version != v2.version,
_ => true,
}
})
})
}
}
impl<N, V> ContentEq for VectorData<N, V>
where
    N: Ord,
    V: PartialEq,
{
    /// Two vector data are content-equal exactly when all their entries (versions and data) are
    /// equal.
    fn content_eq(&self, other: &Self) -> bool {
        self.elements == other.elements
    }
}
// Every type that can be viewed as a read-only delta of `VectorData` gets the read operations.
impl<N, V, T> VectorDataRead<N, V> for T
where
    N: Ord,
    T: AsDeltaRef<VectorData<N, V>>,
{
}
// Every type that can be viewed as a mutable delta of `VectorData` gets the write operations.
impl<N, V, T> VectorDataWrite<N, V> for T
where
    N: Ord,
    T: AsDeltaMut<VectorData<N, V>>,
{
}
/// Error returned from [`VectorDataWrite::apply_changes`].
#[derive(Debug, PartialEq, Eq)]
pub enum VectorDataError {
    /// Changes cannot be applied because data corresponding to a node other than the local node has
    /// been modified.
    MismatchedNode,
    /// Error applying a change to [`VectorData`] because the version number will overflow.
    VersionOverflow,
}

impl std::fmt::Display for VectorDataError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::MismatchedNode => "data of a node other than the local node has been modified",
            Self::VersionOverflow => "vector data version number overflow",
        };
        f.write_str(message)
    }
}

// Implemented so the error can be boxed as `dyn Error` or wrapped by generic error-handling code.
impl std::error::Error for VectorDataError {}
impl From<VersionOverflow> for VectorDataError {
fn from(_: VersionOverflow) -> Self {
VectorDataError::VersionOverflow
}
}
/// Checker to help implement invariant tests over arbitrary operations on a vector data.
///
/// Requires the feature _`checker`_.
#[cfg(any(test, feature = "checker"))]
pub mod checker {
    use std::{collections::BTreeMap, fmt::Debug};

    use arbitrary::Arbitrary;

    use crate::{
        checker::simulation::{Operation, SimulationContext},
        delta::{AsDeltaRef, DeltaMut},
        vector_data::VectorDataRead,
    };

    use super::{VectorData, VectorDataWrite};

    /// Mutation operations on a [`VectorDataWrite`].
    ///
    /// Can be used with [`Arbitrary`] to generate arbitrary operations to be applied on the vector
    /// data.
    #[derive(Debug, Clone, Arbitrary)]
    #[allow(missing_docs)]
    pub enum VecDataOp<N: Ord, V> {
        Set { node: N, value: V },
        ApplyChanges { node: N, plain: BTreeMap<N, V> },
    }

    impl<N, V> Operation<VectorData<N, V>, N> for VecDataOp<N, V>
    where
        N: Debug + Ord + Clone + 'static,
        V: Debug + PartialEq + Eq + Clone + 'static,
    {
        /// Applies the operation to `state` and asserts its invariants: a successful operation is
        /// visible afterwards, and entries of nodes other than `node` are never touched.
        fn apply(self, mut state: DeltaMut<VectorData<N, V>>, ctx: &SimulationContext<N>) {
            // Snapshot of the state before the operation, used for the "unchanged" check below.
            let snapshot = state.as_ref().merge();
            let entries_before = snapshot.entries();
            let updater = match self {
                VecDataOp::Set { node, value } => {
                    if state.set(node.clone(), Some(value.clone())).is_ok() {
                        assert_eq!(state.entries().get(&node), Some(&&value));
                    }
                    node
                }
                VecDataOp::ApplyChanges { node, plain } => {
                    if state
                        .apply_changes(&ctx.context(&node), plain.clone())
                        .is_ok()
                    {
                        assert_eq!(plain, state.to_plain());
                    }
                    node
                }
            };
            assert_non_updater_entries_unchanged(updater, entries_before, state.entries());
        }
    }

    /// Asserts that the two entry maps are equal once the `updater`'s own entry is ignored.
    fn assert_non_updater_entries_unchanged<N, V>(
        updater: N,
        mut before_entries: BTreeMap<&N, &V>,
        mut after_entries: BTreeMap<&N, &V>,
    ) where
        N: Ord + Eq + Clone + Debug + 'static,
        V: Clone + Eq + Debug + 'static,
    {
        for entries in [&mut before_entries, &mut after_entries] {
            let _ = entries.remove(&updater);
        }
        assert_eq!(before_entries, after_entries);
    }
}
/// Conversions between [`VectorData`] and its protobuf representation.
#[cfg(feature = "proto")]
mod proto {
    use submerge_internal_proto::{
        protos::submerge::{submerge_vector_data::VecElement, SubmergeVectorData},
        FromProto, FromProtoError, NodeMapping, ToProto,
    };

    use super::{VecEntry, VectorData};

    impl FromProto for VectorData<String, Vec<u8>> {
        type Proto = SubmergeVectorData;

        /// Rebuilds the vector data from `proto`, resolving each element's node index through
        /// `node_ids`. Fails with [`FromProtoError::MissingNodeId`] if an index is out of range.
        fn from_proto(proto: &Self::Proto, node_ids: &[String]) -> Result<Self, FromProtoError> {
            Ok(Self {
                elements: proto
                    .elements
                    .iter()
                    .map(|elem| {
                        let node_id = node_ids
                            .get(elem.node as usize)
                            .ok_or(FromProtoError::MissingNodeId)?;
                        let entry = VecEntry {
                            version: elem.version,
                            data: elem.value.clone(),
                        };
                        Ok((node_id.clone(), entry))
                    })
                    .collect::<Result<_, _>>()?,
            })
        }
    }

    impl ToProto for VectorData<String, Vec<u8>> {
        type Proto = SubmergeVectorData;

        /// Serializes every entry, replacing each node ID by its index in `node_ids`.
        fn to_proto(&self, node_ids: &mut NodeMapping<String>) -> Self::Proto {
            let elements = self
                .elements
                .iter()
                .map(|(node, entry)| VecElement {
                    node: node_ids.get_index(node),
                    value: entry.data.clone(),
                    version: entry.version,
                    ..Default::default()
                })
                .collect();
            SubmergeVectorData {
                elements,
                ..Default::default()
            }
        }
    }

    /// Converting to proto and back must yield the original vector data.
    #[cfg(test)]
    #[derive_fuzztest::proptest]
    fn vector_data_roundtrip(vector_data: VectorData<String, Vec<u8>>) {
        let mut node_ids = NodeMapping::default();
        let proto = vector_data.to_proto(&mut node_ids);
        let decoded = VectorData::from_proto(&proto, &node_ids.into_vec()).unwrap();
        assert_eq!(decoded, vector_data);
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use crate::{
        checker::test_fakes::FakeContext,
        vector_data::{VectorDataError, VectorDataRead, VectorDataWrite},
        CrdtState,
    };

    use super::VectorData;

    /// Values set for different nodes are all reported by `entries`.
    #[test]
    fn set_value() {
        let mut data = VectorData::<u8, u8>::default();
        let _ = data.set(0, Some(66)).unwrap();
        let _ = data.set(1, Some(88)).unwrap();

        let expected = BTreeMap::from_iter([(&0, &66), (&1, &88)]);
        assert_eq!(expected, data.entries());
    }

    /// An entry already at the maximum version rejects further writes, other entries still work.
    #[test]
    fn version_overflow() {
        let mut data = VectorData::<u8, u8>::default();
        data.bump_version_number_for_testing(0, u32::MAX).unwrap();

        let _ = data.set(0, Some(66)).unwrap_err();
        let _ = data.set(1, Some(88)).unwrap();

        let expected = BTreeMap::from_iter([(&1, &88)]);
        assert_eq!(expected, data.entries());
    }

    /// Changing the updater's own value through the plain representation is applied.
    #[test]
    fn apply_changes() {
        let mut data = VectorData::<u8, u8>::default();
        let _ = data.set(0, Some(66)).unwrap();
        let _ = data.set(1, Some(88)).unwrap();

        let mut plain = data.to_plain();
        let _ = plain.entry(0).and_modify(|value| *value = 99);
        let changed = data
            .apply_changes(&FakeContext::new(0, 0_u8), plain)
            .unwrap();

        assert!(changed);
        let expected = BTreeMap::from_iter([(&0, &99), (&1, &88)]);
        assert_eq!(expected, data.entries());
    }

    /// Changing another node's value through the plain representation is rejected.
    #[test]
    fn apply_changes_mismatched() {
        let mut data = VectorData::<u8, u8>::default();
        let _ = data.set(0, Some(66)).unwrap();
        let _ = data.set(1, Some(88)).unwrap();

        let mut plain = data.to_plain();
        let _ = plain.entry(1).and_modify(|value| *value = 99);
        let outcome = data.apply_changes(&FakeContext::new(0, 0_u8), plain);

        assert_eq!(VectorDataError::MismatchedNode, outcome.unwrap_err());
    }

    /// Merging two diverged copies keeps the newest value of every node.
    #[test]
    fn merge() {
        let mut parent = VectorData::<u8, u8>::default();
        let _ = parent.set(0, Some(66)).unwrap();

        // The first child forks before the parent updates node 0.
        let mut first_child = parent.clone();
        let _ = first_child.set(1, Some(44)).unwrap();

        // The second child forks after the update.
        let _ = parent.set(0, Some(88)).unwrap();
        let mut second_child = parent.clone();
        let _ = second_child.set(2, Some(22)).unwrap();

        let merged = VectorData::merge(&first_child, &second_child);
        let expected = BTreeMap::from_iter([(&0, &88), (&1, &44), (&2, &22)]);
        assert_eq!(expected, merged.entries());
    }

    /// Merging states that wrote different data under the same version must at least not panic.
    #[test]
    fn merge_undefined_behavior() {
        let mut first = VectorData::<u8, u8>::default();
        let _ = first.set(1, Some(88)).unwrap();
        let mut second = VectorData::<u8, u8>::default();
        let _ = second.set(1, Some(44)).unwrap();

        let _ = VectorData::merge(&first, &second);
        // This is undefined behavior (in the generic sense of the word), it should never happen
        // since set(1, 44) should always happen after observing set(1, 88) and thus increment the
        // version number
    }

    /// Applying a plain map containing only the updater's entry to an empty vector succeeds.
    #[test]
    fn test_apply_changes() {
        let mut data = VectorData::<u8, u8>::default();
        let plain = BTreeMap::from_iter([(1, 1)]);

        let _ = data
            .apply_changes(&FakeContext::new(1, 0_u8), plain.clone())
            .unwrap();

        assert_eq!(plain, data.to_plain());
    }

    /// Applying a plain map that adds another node's entry is rejected.
    #[test]
    fn test_apply_changes_mismatch() {
        let mut data = VectorData::<u8, u8>::default();
        let plain = BTreeMap::from_iter([(1, 1)]);

        // Apply changes from Node 2, but trying to change data on Node 1
        assert_eq!(
            Err(VectorDataError::MismatchedNode),
            data.apply_changes(&FakeContext::new(2, 0_u8), plain.clone())
        );
    }

    /// A default vector can be initialized from a plain map owned by the updater.
    #[test]
    fn test_init_from_plain() {
        let plain = BTreeMap::from_iter([(1, 1)]);
        let mut data = VectorData::default();

        let _ = data
            .apply_changes(&FakeContext::new(1, 0_u8), plain.clone())
            .unwrap();

        assert_eq!(plain, data.to_plain());
    }

    /// Initializing from an empty plain map leaves the vector empty.
    #[test]
    fn test_init_from_plain_empty() {
        let plain: BTreeMap<u8, u8> = BTreeMap::new();
        let mut data = VectorData::default();

        let _ = data
            .apply_changes(&FakeContext::new(1, 0_u8), plain.clone())
            .unwrap();

        assert_eq!(plain, data.to_plain());
    }

    /// Initializing from a plain map that contains another node's entry is rejected.
    #[test]
    fn test_init_from_plain_mismatch() {
        let plain = BTreeMap::from_iter([(1, 1)]);
        let mut data = VectorData::default();

        assert_eq!(
            Err(VectorDataError::MismatchedNode),
            data.apply_changes(&FakeContext::new(2, 0_u8), plain.clone()),
        );
    }
}