|
(ns delta-state-mv-register |
|
"A fully working implementation of a δ-CRDT Multi-Value Register data type. |
|
It supports both Clojure and ClojureScript. |
|
|
|
This namespace includes code to replicate the data type using core.async. |
|
The data type requires a *causal* delivery abstraction to ensure strong |
|
eventual consistency. |
|
|
|
The cool thing about δ-CRDTs is that they combine the best properties of both |
|
state-based and op-based CRDTs. |
|
|
|
With state-based CRDTs, it's OK if messages are dropped in between -- the |
|
full history is not needed for replication. The problem with state-based

CRDTs is that *every* message grows with the size of the dataset, without bound.
|
|
|
With op-based CRDTs, no messages can be dropped or reordered. In return, the
|
message size is bounded, but they suffer another problem: each peer must |
|
receive the full history for the initial sync. |
|
|
|
δ-CRDTs use a hybrid approach: the size of the messages can be variable and |
|
bounded because δ-CRDT implementations can choose to either send partial *or* |
|
full state in the message. The history is compressed using vector clocks so |
|
the initial sync can be fast. |
|
|
|
Reference: Delta State Replicated Data Types |
|
https://arxiv.org/abs/1603.01529" |
|
(:refer-clojure :exclude [read]) |
|
(:require [clojure.set :as set] |
|
[clojure.core.async :as async :refer [go go-loop <! >!]])) |
|
|
|
;; MV Register ADT |
|
|
|
(defn- clock-next
  "Returns vector clock `c` with the counter of process `i` incremented.
  A missing entry for `i` is treated as 0, consistent with how the other
  clock helpers treat absent entries."
  [i c]
  ;; fnil: a plain `inc` would throw on a nil (absent) counter.
  (update c i (fnil inc 0)))
|
|
|
(defn- clock-union
  "Pointwise maximum of any number of vector clocks (maps of process id to
  counter). An entry present in only some of the clocks is carried over
  unchanged; with no (or only nil) clocks the result is nil."
  [& clocks]
  (apply merge-with max clocks))
|
|
|
(defn- clock-contains?
  "Returns true when vector clock `x` dominates or equals `other`, i.e. for
  every process id appearing in either clock, the counter in `other` is <=
  the counter in `x`. A process id missing from a clock counts as 0."
  [x other]
  ;; The explicit 0 default matters: in Clojure `(<= nil n)` throws a
  ;; NullPointerException whenever the two clocks have different key sets
  ;; (e.g. {:b 0} vs {:a 1}), which happens as soon as peers exchange writes.
  (and
   (every? (fn [[i n]] (<= (get other i 0) n)) x)
   (every? (fn [[i n]] (<= n (get x i 0))) other)))
|
|
|
(defn mv-register
  "Creates an empty register owned by process `i`.

  A register is the triple [owner-id values-by-clock causal-context]: here
  no values are held yet and the causal context is a clock with a zero
  counter for `i`."
  [i]
  (let [initial-clock {i 0}
        no-values     {}]
    [i no-values initial-clock]))
|
|
|
(defn write
  "Writes `v` into register [i m c] and returns the new register.

  All values currently held are discarded and replaced by `v`, keyed by
  the owner's next clock (`c` with `i` incremented). The new causal context
  is the union of that clock and the clocks of the discarded entries."
  [v [i m c]]
  (let [next-clock (clock-next i c)]
    [i
     {next-clock v}
     (apply clock-union next-clock (keys m))]))
|
|
|
(defn read
  "Returns the set of distinct values held by the register. More than one
  value means the register holds entries from several writes."
  [[_ values-by-clock _]]
  (into #{} (vals values-by-clock)))
|
|
|
(defn join
  "Merges register [_ m2 c2] into [i m1 c1], keeping the owner id `i`.

  An entry survives when it is held by both sides, or when the other
  side's causal context does not already contain its clock (so the other
  side has not seen, and therefore has not overwritten, that write). The
  resulting causal context is the pointwise union of both contexts."
  [[i m1 c1] [_ m2 c2]]
  (let [shared-clocks (set/intersection (set (keys m1)) (set (keys m2)))
        unseen-by     (fn [ctx entries]
                        ;; entries whose clock `ctx` does not contain
                        (set (remove (fn [[clock _]] (clock-contains? ctx clock))
                                     entries)))
        in-both       (set (select-keys m1 shared-clocks))
        surviving     (set/union in-both (unseen-by c2 m1) (unseen-by c1 m2))]
    [i (into {} surviving) (clock-union c1 c2)]))
|
|
|
;; Peer Process Using Gossiping Broadcast |
|
|
|
(defn start
  "Starts a peer process for a register owned by `pid` and returns
  `{:state state}`, where `state` is an atom holding
  `{:payload register, :stopped? boolean}`.

  Config keys:
    :pid       - id of this replica; the register is created with it
    :in        - channel of incoming messages `{:pid .., :payload ..}`; each
                 payload is joined into the local register
    :broadcast - channel this peer puts `{:pid pid, :payload register}` onto,
                 immediately and then after a random 3000-9999 ms pause
    :on-update - called with the joined register after each received message

  Both loops end once `:stopped?` is true; the receive loop also ends when
  `in` is closed. A message taken from `in` after the peer was stopped is
  dropped rather than joined. Neither `in` nor `broadcast` is closed here."
  [{:keys [pid in broadcast on-update]}]
  (let [state    (atom {:payload  (mv-register pid)
                        :stopped? false})
        stopped? #(:stopped? @state)]
    ;; Receive loop: join every incoming payload into the local register.
    (go-loop []
      (when-not (stopped?)
        (when-some [{:keys [payload]} (<! in)]
          ;; Re-check: the peer may have been stopped while parked on `in`;
          ;; without this, one more message would be joined after stop.
          (when-not (stopped?)
            (let [new-val (swap! state update :payload #(join % payload))]
              (on-update (:payload new-val))
              (recur))))))
    ;; Gossip loop: periodically broadcast the full local register.
    (go-loop []
      (when-not (stopped?)
        (>! broadcast {:pid pid, :payload (:payload @state)})
        (<! (async/timeout (+ 3000 (rand-int 7000))))
        (recur)))
    {:state state}))
|
|
|
(defn payload
  "Returns the register currently held by `peer`, a map whose `:state` atom
  holds the register under `:payload` (as returned by `start`)."
  [peer]
  (-> peer :state deref :payload))
|
|
|
(defn stop!
  "Marks a peer (a map with a `:state` atom, as returned by `start`) as
  stopped, or recursively stops every peer in a collection of peers.

  Accepts a single peer, any seqable collection of peers, or a map of
  id -> peer (such as the one returned by `simulate-all`). For a single
  peer, returns the updated state map; for collections, returns nil."
  [peer-or-peers]
  (cond
    ;; A single peer: it is itself a map, so check for it before the
    ;; map-of-peers case.
    (and (map? peer-or-peers) (contains? peer-or-peers :state))
    (swap! (:state peer-or-peers) assoc :stopped? true)

    ;; A map of id -> peer; previously this fell through to the single-peer
    ;; branch and threw on `(swap! nil ...)`.
    (map? peer-or-peers)
    (run! stop! (vals peer-or-peers))

    :else
    (run! stop! peer-or-peers)))
|
|
|
(defn write!
  "Writes `value` into `peer`'s local register, replacing the values it
  currently holds, and returns the peer's updated state map."
  [peer value]
  (swap! (:state peer) update :payload (partial write value)))
|
|
|
;; Network Simulation |
|
|
|
(defn- log-payload
  "Prints `pid`, a space, and the printed set of values held by `payload`."
  [pid payload]
  (->> (read payload)
       pr-str
       (str pid " ")
       println))
|
|
|
(defn simulate-one
  "Starts simulated peer `pid` on a shared broadcast network and returns
  the peer from `start`.

  The peer listens on a new tap of `broadcast-mult` (buffer 100, with its
  own messages filtered out), publishes to `broadcast-src`, and logs its
  values after every update."
  [pid broadcast-mult broadcast-src]
  ;; NOTE(review): the tap is never removed. A core.async mult waits for
  ;; every tap to accept each item, so once a stopped peer's buffer fills up
  ;; it may hold up delivery to the remaining peers — confirm if peers are
  ;; stopped individually.
  (let [own-message? (comp #{pid} :pid)
        inbox        (async/chan 100 (remove own-message?))]
    (async/tap broadcast-mult inbox)
    (start {:pid       pid
            :in        inbox
            :broadcast broadcast-src
            :on-update (partial log-payload pid)})))
|
|
|
(defn simulate-all
  "Creates a broadcast channel (buffer 100) fanned out through a mult,
  starts one simulated peer per id in `process-ids`, and returns a map of
  id -> peer."
  [process-ids]
  (let [src     (async/chan 100)
        fan-out (async/mult src)]
    (into {}
          (for [pid process-ids]
            [pid (simulate-one pid fan-out src)]))))
|
|
|
(comment

  ;;;; Try it yourself!

  ;;; Start the network

  (def peers (simulate-all [:a :b :c]))

  ;;; Create a conflict

  ;; If you execute both of these commands in the REPL at the same time
  ;; (before either peer has received the other's write), the value will
  ;; converge to #{"bar" "foo"}.

  (write! (:a peers) "foo")
  (write! (:b peers) "bar")
  ; =>
  ; :a #{"bar" "foo"}
  ; :b #{"bar" "foo"}
  ; :c #{"bar" "foo"}

  ;;; Resolve a conflict

  ;; If you execute the first of these commands, wait, and then execute the
  ;; second, the value will converge to #{"bar"}. A write replaces every
  ;; value the writing peer currently holds, so it resolves all conflicts
  ;; once that peer has seen all other previous writes. Peers re-broadcast
  ;; their state every 3-10 seconds, hence the wait.

  (write! (:a peers) "foo") ; now wait 10 seconds
  (write! (:b peers) "bar")
  ; =>
  ; :a #{"bar"}
  ; :b #{"bar"}
  ; :c #{"bar"}

  ;;; Shut down the network

  ;; `peers` is a map of pid -> peer, so stop each of its values.
  (stop! (vals peers))
  )