delta_state_mv_register.cljc

4 min read Original article ↗
(ns delta-state-mv-register
  "A fully working implementation of a δ-CRDT Multi-Value Register data type.
  It supports both Clojure and ClojureScript.

  This namespace includes code to replicate the data type using core.async.
  The data type requires a *causal* delivery abstraction to ensure strong
  eventual consistency.

  The cool thing about δ-CRDTs is that they combine the best properties of
  both state-based and op-based CRDTs.

  With state-based CRDTs, it's OK if messages are dropped in between -- the
  full history is not needed for replication. The problem is with state-based
  CRDTs, *every* message grows unbounded with the size of the dataset.

  With op-based CRDTs, no messages can be dropped or reordered. In return the
  message size is bounded, but they suffer another problem: each peer must
  receive the full history for the initial sync.

  δ-CRDTs use a hybrid approach: the size of the messages can be variable and
  bounded because δ-CRDT implementations can choose to either send partial
  *or* full state in the message. The history is compressed using vector
  clocks so the initial sync can be fast.

  Reference: Delta State Replicated Data Types
  https://arxiv.org/abs/1603.01529"
  (:refer-clojure :exclude [read])
  (:require [clojure.set :as set]
            [clojure.core.async :as async :refer [go go-loop <! >!]]))

;; MV Register ADT
;;
;; A register is a 3-element vector [i m c]:
;;   i - the id of the peer owning this replica
;;   m - map of vector clock -> value (the currently visible values)
;;   c - vector clock (map of peer id -> counter) summarising what was seen

(defn- clock-next
  "Returns clock `c` with the counter of peer `i` incremented.
  A missing counter is treated as 0, so the result holds 1 for `i`."
  [i c]
  (update c i (fnil inc 0)))

(defn- clock-union
  "Merges any number of clocks, keeping the largest counter per peer id."
  [& cs]
  (apply merge-with max cs))

(defn- clock-contains?
  "Returns true when every counter in clock `other` is <= the matching
  counter in clock `x`.

  A peer id missing from either clock counts as 0. Without that default,
  comparing clocks with different key sets raises a NullPointerException on
  the JVM (`(<= nil n)`); ClojureScript already coerced the missing value
  to 0, so this keeps both platforms in agreement."
  [x other]
  (and (every? (fn [[i n]] (<= (get other i 0) n)) x)
       (every? (fn [[i n]] (<= n (get x i 0))) other)))

(defn mv-register
  "Creates an empty register owned by peer `i`: no values, own counter at 0."
  [i]
  [i {} {i 0}])

(defn write
  "Returns the register `[i m c]` after peer `i` writes value `v`.

  The new value map holds only `v`, keyed by the advanced clock, so every
  previously visible value is replaced. The new clock is the union of the
  advanced clock and the clocks of the replaced entries."
  [v [i m c]]
  (let [d  (clock-next i c)
        m' {d v}
        c' (apply clock-union d (keys m))]
    [i m' c']))

(defn read
  "Returns the set of values currently held by the register."
  [[_ m _]]
  (set (vals m)))

(defn join
  "Merges two registers and returns the result under the first one's id.

  An entry survives when it is present on both sides under the same clock,
  or when it is present on one side and its clock is not contained in the
  other side's clock (i.e. the other side has not seen it yet). The resulting
  clock is the union of both clocks."
  [[i m1 c1] [_ m2 c2]]
  (let [intersection (set/intersection (set (keys m1)) (set (keys m2)))
        ;; entries known to both sides
        s1 (set (select-keys m1 intersection))
        ;; entries of m1 that the other side has not seen
        s2 (set (filter (fn [[d _]] (not (clock-contains? c2 d))) m1))
        ;; entries of m2 that this side has not seen
        s3 (set (filter (fn [[d _]] (not (clock-contains? c1 d))) m2))
        m3 (into {} (set/union s1 s2 s3))
        c3 (clock-union c1 c2)]
    [i m3 c3]))

;; Peer Process Using Gossiping Broadcast

(defn start
  "Starts a peer and returns `{:state <atom>}`.

  Config keys:
    :pid       - id of this peer
    :in        - channel of incoming `{:payload register}` messages
    :broadcast - channel on which this peer publishes its own register
    :on-update - optional fn called with the register after each merge

  Two go-loops are started: one joins every incoming payload into the local
  register, the other publishes `{:pid pid, :payload register}` and then
  waits between 3 and 10 seconds before publishing again. Both loops check
  the `:stopped?` flag at the top of each iteration."
  [{:keys [pid in broadcast on-update] :as config}]
  (let [state (atom {:payload  (mv-register pid)
                     :stopped? false})]
    (go-loop []
      (when (not (:stopped? @state))
        ;; a closed `in` channel yields nil, which ends this loop
        (when-some [{:keys [payload]} (<! in)]
          (let [new-val (swap! state update :payload #(join % payload))]
            (when on-update
              (on-update (:payload new-val)))
            (recur)))))
    (go-loop []
      (when-not (:stopped? @state)
        (>! broadcast {:pid pid, :payload (:payload @state)})
        (<! (async/timeout (+ 3000 (rand-int 7000))))
        (recur)))
    {:state state}))

(defn payload
  "Returns the current register of `peer`."
  [{:keys [state] :as peer}]
  (:payload @state))

(defn stop!
  "Flags one or more peers as stopped so their go-loops exit on their next
  iteration.

  Accepts a single peer (a map holding `:state`), a sequential collection of
  peers, or a map of pid -> peer such as the one returned by `simulate-all`.
  A map that contains a `:state` key is always treated as a single peer."
  [peer-or-peers]
  (cond
    (sequential? peer-or-peers)
    (run! stop! peer-or-peers)

    (and (map? peer-or-peers) (not (contains? peer-or-peers :state)))
    (run! stop! (vals peer-or-peers))

    :else
    (swap! (:state peer-or-peers) assoc :stopped? true)))

(defn write!
  "Writes `value` into the local register of `peer`."
  [{:keys [state] :as peer} value]
  (swap! state update :payload #(write value %)))

;; Network Simulation

(defn- log-payload
  "Prints the peer id followed by the values visible in `payload`."
  [pid payload]
  (println (str pid " " (pr-str (read payload)))))

(defn simulate-one
  "Starts peer `pid` wired to the shared broadcast channel.

  The peer's input channel taps `broadcast-mult` and drops messages whose
  `:pid` is the peer's own, so a peer never merges its own broadcasts."
  [pid broadcast-mult broadcast-src]
  (let [process-in (async/chan 100 (remove (comp #{pid} :pid)))]
    (async/tap broadcast-mult process-in)
    (start {:pid       pid
            :in        process-in
            :broadcast broadcast-src
            :on-update (partial log-payload pid)})))

(defn simulate-all
  "Starts one peer per id in `process-ids`, all sharing one broadcast channel.
  Returns a map of pid -> peer."
  [process-ids]
  (let [broadcast-src  (async/chan 100)
        broadcast-mult (async/mult broadcast-src)]
    (->> process-ids
         (map (fn [pid] [pid (simulate-one pid broadcast-mult broadcast-src)]))
         (doall)
         (into {}))))

(comment
  ;;;; Try it yourself!

  ;;; Start the network
  (def peers (simulate-all [:a :b :c]))

  ;;; Create a conflict
  ;; If you execute both of these commands in the REPL at the same time,
  ;; the value will converge to #{"bar" "foo"}.
  (write! (:a peers) "foo")
  (write! (:b peers) "bar")
  ; =>
  ; :a #{"bar" "foo"}
  ; :b #{"bar" "foo"}
  ; :c #{"bar" "foo"}

  ;;; Resolve a conflict
  ;; If you execute one of these commands in the REPL, wait, then execute the
  ;; second, the value will converge to #{"bar"}. This is because the last write
  ;; will resolve all conflicts if the peer doing the writing has seen
  ;; all other previous writes.
  (write! (:a peers) "foo")
  ; now wait 10 seconds
  (write! (:b peers) "bar")
  ; =>
  ; :a #{"bar"}
  ; :b #{"bar"}
  ; :c #{"bar"}

  ;;; Shut down the network
  (stop! peers)
  )