Fault-Tolerant Distributed Systems with Replicated State Machines in Go

Sunday 17 March 2024 · 2 hours 58 mins read

Introduction

I've just read Travis Jeffery's book "Distributed Services with Go", and I wanted to share a simple example of a distributed system in Go.

In his book, Travis Jeffery doesn't really define what a distributed system is, but focuses on implementing a "reliable, scalable, and maintainable system" as per the subtitle of the book, which is a fine and practical approach if you really want to learn from a production-ready example. However, he doesn't explain his choice of tools and libraries, which is a bit frustrating for a book about "implementing".

Thus, he builds a Go service with gRPC, OpenTelemetry, load balancing, and other tools to create a reliable distributed system.

The objective of this article is to create a simple example of a distributed system and focus on the distributed aspect of the system. Whether you implement gRPC as transport or OpenTelemetry for observability is up to you. I made the choice to use different technologies than Travis Jeffery to give you a different perspective.

Before starting, a few words about what has changed since the writing of Travis Jeffery's book:

  • Go 1.22 is out. Several standard library packages have deprecated or moved functions (io/ioutil is deprecated, for example), and the ecosystem has moved on: boltdb is less efficient than pebble, etc.
  • The book uses some non-standard libraries like go-mmap, while he could have used the syscall package.
  • He forked hashicorp/raft-boltdb to use etcd-io/bbolt instead of boltdb/bolt.

This article will try to stick to the standard library as much as possible, though I cannot guarantee that it won't be deprecated in the future.

We will also fork hashicorp/raft to implement a custom RPC and fix the network layer to allow mutual TLS (which, by the way, Travis Jeffery didn't do). Forking this library is almost necessary since hashicorp/raft sticks "too" closely to the paper and doesn't leave much room for extensions.

Let's start with a few definitions.

What is a distributed system?

A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions by passing messages to one another to achieve a common goal.

One style of distributed system is the "fault-tolerant distributed system": replicated services that communicate with each other to ensure that the system is always available and that the data stays consistent.

As you may know, there are two types of services:

  • Stateless services: They don't store any state alongside the process. They are easy to scale and are fault-tolerant by design. They are also easy to implement and maintain.

    [Diagram: User → Load Balancers (lb1, lb2) → Services (service1, service2) → Databases (db1, db2)]

    Stateless services don't really need to communicate with each other. They can rely on an already-distributed database or event broker to coordinate.

    Often, this is the architecture you want to follow when you can delegate the state to a database, or the commands to an event broker.

  • Stateful services: The service has some kind of state, like persistent storage.

    [Diagram: User → Load Balancers (lb1, lb2) → Services (service1, service2)]

    This time, the services need to communicate with each other to ensure that the state is consistent across the cluster.

    This architecture is used when your "state" is a lot more complex and cannot be delegated to a database or an event broker. It can also be a matter of latency with the database or the event broker.

This article aims to implement a simple example of a distributed stateful service. There are already plenty of guides (and even reference architectures) about creating distributed stateless services with an event broker or a distributed database/lock.

Instead, let's look at examples of existing distributed systems.

Examples of stateful distributed systems

ETCD

Let's start with etcd, a distributed key-value store. ETCD is often the "base" of many distributed systems like Kubernetes, ArgoCD, etc... because, as a key-value store, it is observable, scalable, and fault-tolerant. It serves as a good distributed lock and a good distributed configuration store.

In distributed systems, replication is a key feature for scaling up. Being able to automatically scale up a stateful system is one of the main objectives for any Site Reliability Engineer. If you look at MySQL or PostgreSQL, it is very easy to deploy "read-only" replicas. However, it's a lot more complex to deploy "write" replicas. MySQL and PostgreSQL have a "source" and "replicas" architecture, where the "source" is the only one that can write and the "replicas" can only read. The problem with this architecture is that if the "source" goes down, the "replicas" cannot write, and you will need to manually promote a "replica" into a new "source".

However, instead of using a "source-replicas" architecture, ETCD uses a "leader-elected" architecture through the Raft consensus algorithm. A consensus algorithm elects a "leader" that can write, and the "followers" can only read. If the "leader" goes down, a new "leader" is elected through some kind of voting. This is an automated process and doesn't require any manual intervention.

Bitcoin

Bitcoin is a decentralized cryptocurrency, though what I want to talk about is its blockchain. If you don't already know, a blockchain is simply a linked list (or a queue) of blocks. A block contains data such as the hash of the previous block and the list of transactions.

What's most interesting is how the blockchain is replicated across the network. The blockchain is very similar to etcd, but instead of replicating a key-value store, it replicates a linked list. The blockchain also uses a different consensus algorithm called "Proof of Work" (PoW) to elect a "leader" that can write a new block. This algorithm doesn't elect through voting, but via some kind of competition: the first one to solve a complex mathematical problem can write a new block.

There is another major difference between Bitcoin and etcd: the node discovery strategy. Because Bitcoin is a public cryptocurrency, it uses hard-coded bootstrap nodes to discover the "actual" nodes. By comparison, etcd is configured either statically or through a discovery service.

NOTE

goquorum, a fork of go-ethereum, uses the same consensus algorithm as etcd for private blockchains. As you can see, etcd is quite similar to a blockchain.

IPFS

The InterPlanetary File System (IPFS) is a content delivery network (CDN) built to distribute data around the world. IPFS is a lot more complex than etcd and Bitcoin:

  • Because it is a distributed file system, node discovery is different and not all nodes are equal. IPFS stores files in blocks, each identified by an ID. Each IPFS node uses a "want list" to exchange data with other nodes: the want list is a list of blocks that the node wants to retrieve from other nodes, and it is also used to discover new nodes. This is called the "BitSwap" protocol.
  • IPFS uses a distributed hash table (DHT) containing keys mapped to values. There are three types of keys: one to search for blocks, one to resolve identities/names, and one to search for peers that may be reached. As said in the article: "it's a catalog and a navigation system to search for blocks". Bitcoin and etcd don't have this kind of system. To spread information, Bitcoin uses the "gossip" protocol and etcd uses the RPCs of the "Raft" consensus algorithm.

Overall, IPFS is drastically different from etcd and Bitcoin. IPFS aims to distribute files across part of the network and follows an approach similar to BitTorrent. P2P and BitSwap are used to exchange data, while the DHT is used to route requests for it. No consensus algorithm is used since the "writer" is not replicated: we do not care about fault-tolerance on the writer.

However, if you want to replicate the writer, you can use IPFS Cluster, which can use CRDTs (Conflict-free Replicated Data Types) or Raft (log-based replication) as its consensus mechanism.

Summary

As you can see, the "distributed" aspect of the fault-tolerant system can be summarized in three points:

  • Node discovery: How do nodes discover each other? Are all nodes equal? How do they exchange data?
  • Consensus algorithm/Writer election: How do nodes elect a leader? How do they ensure that the data is consistent?
  • Data exchange/spreading: How do nodes exchange data? How do they ensure that the data is consistent?

This article will simply focus on Raft and its paper. We will answer each of these questions while implementing the example.

The example: A distributed key-value store with Raft

The objective

The objective of the example is to create a simple distributed key-value store with Raft. Not only is this the most popular example, but it also answers the three questions (node discovery, writer election, data exchange).

Before even starting, I want to say that Raft solves the very specific problem of "replicated state machines": systems that are stateful because they have a finite state machine. If you are looking to "distribute" a service with a nondeterministic state machine, I suggest you stop reading this article and look for a different solution.

Understanding Raft at its core

Please do read the paper (section 5) and visit the official website to understand the consensus algorithm. Still, I will try to summarize it in a few words. Note that this article doesn't aim to implement Raft from scratch, but to use it as a library.

A stateful service can be defined as a state machine. A state machine is a system that can be in a finite number of states and can change from one state to another. A state machine can be deterministic or nondeterministic. A deterministic state machine can only be in one state at a time and, for a given command, can only transition to one next state. A nondeterministic state machine can be in multiple states at a time and can transition to multiple states at a time.

Replicating deterministic state machines results in fault-tolerance: because the state machine is deterministic, the state can be "rebuilt" simply by running the same commands in the same order.

Logs and state machines

Replicated state machines use a log (an ordered list of commands) which each state machine executes. A state machine can be replicated efficiently across the network simply by replicating and running the log.

Example: If a node goes down, the logs can be used to "replay" the state machine and bring the node back to the same state as the other nodes just by executing the commands written in the logs.
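
To make this concrete, here is a minimal, self-contained sketch (my own, not taken from the book or the repository) of replaying an ordered log of commands to rebuild a key-value state:

  package main

  import "fmt"

  // command is one entry of the log: an operation on a single key.
  type command struct {
    op    string // "SET" or "DELETE"
    key   string
    value string
  }

  // replay rebuilds the state deterministically by applying the log in order.
  func replay(log []command) map[string]string {
    state := make(map[string]string)
    for _, c := range log {
      switch c.op {
      case "SET":
        state[c.key] = c.value
      case "DELETE":
        delete(state, c.key)
      }
    }
    return state
  }

  func main() {
    log := []command{
      {op: "SET", key: "a", value: "1"},
      {op: "SET", key: "b", value: "2"},
      {op: "DELETE", key: "a"},
    }
    fmt.Println(replay(log)) // map[b:2]
  }

Two nodes that execute the same log in the same order end up with the same state, which is exactly what Raft replicates.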

The benefits are immediate. Quoting the paper:

  • They ensure safety (never returning an incorrect result) under all non-Byzantine conditions, including network delays, partitions, and packet loss, duplication, and reordering.
  • They are fully functional (available) as long as any majority of the servers are operational and can communicate with each other and with clients. Thus, a typical cluster of five servers can tolerate the failure of any two servers. Servers are assumed to fail by stopping; they may later recover from state on stable storage and rejoin the cluster.
  • They do not depend on timing to ensure the consistency of the logs: faulty clocks and extreme message delays can, at worst, cause availability problems.
  • In the common case, a command can complete as soon as a majority of the cluster has responded to a single round of remote procedure calls; a minority of slow servers need not impact overall system performance.
NOTE

By the way, I hated the fact that Travis Jeffery's book uses logs as an example, which makes things more confusing... He even reused his "homemade" store as the logs store of Raft. I'd have preferred he used something other than Raft's own data structure as the example. Here, we will use a simple key-value store instead of a logs store.

Logs are already implemented in the Raft library, but we still need to define our state machine.

But before that, let's bootstrap the project. We will come back to the state machine later.

Bootstrapping the project

Git clone the following repository:

1git clone -b bootstrap https://github.com/Darkness4/distributed-kv.git

The repository is already set up with:

  • GitHub Actions.
  • Main functions stored in cmd/dkv and cmd/dkvctl.
  • A simple Makefile. (Please read it to understand the commands.)
  • A Dockerfile.
  • Tools that will be used later.

Commands available are:

  • make all: compiles the client and server.
  • make unit: runs unit tests.
  • make integration: runs integration tests.
  • make lint: lints the code.
  • make fmt: formats the code.
  • make protos: compiles the protocol buffers into generated Go code.
  • make certs: generates the certificates for the server, client and CA.
  • make clean: cleans the project.

Now, let's implement the key-value store.

Implementing the key-value store

The state machine

A state machine is defined by state and commands. To define the state machine, start thinking about the commands that can mutate a state. In our case, the commands are:

  • SET(value): Set a key to a value.
  • DELETE: Delete a key.

The state machine of the KV store is actually a composition of state machines, where each key has its own state machine. Its state is the value of the key. The state of the KV store is the state of all the keys.

[State diagram: start → empty; SET(value) moves the key to a "set" state; DELETE moves it back to empty; SET(value) and DELETE loop on their respective states.]

The implementation

We will use pebble as the persisted KV store. Feel free to use SQLite or something else. Since we will also be using pebble to store Raft's data (the logs of the state machines), we kill two birds with one stone.

Create the file internal/store/persisted/store.go:

 1package persisted
 2
 3import (
 4  "path/filepath"
 5
 6  "github.com/cockroachdb/pebble"
 7)
 8
 9type Store struct {
10  *pebble.DB
11}
12
13func New(path string) *Store {
14  path = filepath.Join(path, "pebble")
15  db, err := pebble.Open(path, &pebble.Options{})
16  if err != nil {
17    panic(err)
18  }
19  return &Store{db}
20}
21
22func (s *Store) Get(key string) (string, error) {
23  v, closer, err := s.DB.Get([]byte(key))
24  if err != nil {
25    return "", err
26  }
27  defer closer.Close()
28  return string(v), nil
29}
30
31func (s *Store) Set(key string, value string) error {
32  return s.DB.Set([]byte(key), []byte(value), pebble.Sync)
33}
34
35func (s *Store) Delete(key string) error {
36  return s.DB.Delete([]byte(key), pebble.Sync)
37}
38
39func (s *Store) Close() error {
40  return s.DB.Close()
41}
42

Write the tests if you want to. Tests are stored in the repository.
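
If you want a starting point, here is a minimal sketch of what such a test could look like (the repository's tests are the authoritative version). It assumes the distributed-kv module path used throughout this article and relies on t.TempDir so pebble writes to a throwaway directory:

  package persisted_test

  import (
    "testing"

    "distributed-kv/internal/store/persisted"
  )

  func TestStoreSetGetDelete(t *testing.T) {
    // Open the pebble-backed store in a temporary directory.
    s := persisted.New(t.TempDir())
    defer s.Close()

    if err := s.Set("hello", "world"); err != nil {
      t.Fatal(err)
    }
    v, err := s.Get("hello")
    if err != nil {
      t.Fatal(err)
    }
    if v != "world" {
      t.Fatalf("expected %q, got %q", "world", v)
    }
    if err := s.Delete("hello"); err != nil {
      t.Fatal(err)
    }
    // pebble returns an error (pebble.ErrNotFound) for missing keys.
    if _, err := s.Get("hello"); err == nil {
      t.Fatal("expected an error after delete")
    }
  }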

Using Raft to distribute commands

Understanding Raft's lifecycle

Now that we've implemented the store, we need to use Raft to distribute the commands across the network. As we said in the previous sections, Raft uses elections to elect a leader that can write to the logs (the list of commands). In Raft, nodes can be in three states:

  • Follower: The node is waiting for a leader to send commands.
  • Candidate: The node is trying to become a leader.
  • Leader: The node can send commands to the followers.

At the very beginning, all nodes are followers. Only after a timeout (ElectionTimeout) without receiving an AppendEntries RPC does a follower become a candidate. The candidate sends RequestVote RPCs to the other nodes while voting for itself. If the candidate receives a majority of votes, it becomes the leader. If it doesn't, it becomes a follower again.

Upon election, the leader sends AppendEntries RPCs (heartbeats) to the followers to prevent the election timeout.

[State diagram: start → follower; ElectionTimeout → candidate (sends RequestVote); majority of votes → leader; discovering the current leader or a new term → follower; a leader that discovers a server with a higher term steps down to follower.]
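
As a quick illustration of where these timeouts live in hashicorp/raft, here is a small sketch; the values are purely illustrative, not recommendations:

  package raftconfig

  import (
    "time"

    "github.com/hashicorp/raft"
  )

  // exampleConfig shows the knobs driving the follower/candidate/leader lifecycle.
  func exampleConfig(nodeID string) *raft.Config {
    config := raft.DefaultConfig()
    config.LocalID = raft.ServerID(nodeID)
    // A follower that hears no AppendEntries within ElectionTimeout becomes a candidate.
    config.ElectionTimeout = 1 * time.Second
    // HeartbeatTimeout bounds how long a follower waits for the leader's heartbeats.
    config.HeartbeatTimeout = 1 * time.Second
    // LeaderLeaseTimeout must be less than or equal to HeartbeatTimeout.
    config.LeaderLeaseTimeout = 500 * time.Millisecond
    return config
  }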

Understanding Raft's RPCs and Term

At its core, Raft has only 2 RPCs:

  • RequestVote: Sent by candidates to gather votes.
  • AppendEntries: Sent by leaders to replicate logs. It is also used to send heartbeats.

Since we won't be implementing the internals of Raft (we will use hashicorp/raft), I recommend reading Figure 2 of the paper to see the parameters and results of each RPC.

The "term" is a number that is incremented every time a new leader is elected. It is used to prevent "old" leaders to send commands to the followers.

Also note that hashicorp/raft implements other RPCs to optimize the consensus algorithm. hashicorp/raft also implements the InstallSnapshot RPC to compact the logs and restore them after a crash, as per the paper.

Implementing the Finite State Machine for Raft

Define commands for Raft

We will use protocol buffers to define the commands for Raft. Do note that these are the commands for peer-to-peer replication, not the commands for client-to-server communication. Transport is already handled by Raft, so you don't need to write a service. Heck, you could even use JSON, or prefix a byte to indicate the command type (Travis Jeffery's method, which is simply too ambiguous 🤦).

  1. Create the file protos/dkv/v1/dkv.proto:

     1syntax = "proto3";
     2
     3package dkv.v1;
     4
     5// Command is a message used in Raft to replicate log entries.
     6message Command {
     7  oneof command {
     8    Set set = 1;
     9    Delete delete = 2;
    10  }
    11}
    12
    13message Set {
    14  string key = 1;
    15  string value = 2;
    16}
    17
    18message Delete { string key = 1; }
    19
    
  2. We use Buf to standardize the protocol buffers layout. Create this file protos/buf.yaml to standardize the layout:

    1version: v1
    2breaking:
    3  use:
    4    - FILE
    5lint:
    6  use:
    7    - DEFAULT
    8  service_suffix: API
    
  3. Now, simply run:

    1make protos
    

     The directory gen should be created. It contains the generated Go code. Feel free to look at its implementation, as it shows how data is packed and unpacked.
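
As a quick sanity check of the generated code, here is a minimal sketch (my own, not part of the repository) that packs and unpacks a Command the same way the Raft FSM will later:

  package main

  import (
    "fmt"
    "log"

    dkvv1 "distributed-kv/gen/dkv/v1"
    "google.golang.org/protobuf/proto"
  )

  func main() {
    // Build a SET command and pack it into bytes (what we will hand to Raft).
    cmd := &dkvv1.Command{
      Command: &dkvv1.Command_Set{
        Set: &dkvv1.Set{Key: "hello", Value: "world"},
      },
    }
    b, err := proto.Marshal(cmd)
    if err != nil {
      log.Fatal(err)
    }

    // Unpack it again (what the FSM will do on every node).
    var decoded dkvv1.Command
    if err := proto.Unmarshal(b, &decoded); err != nil {
      log.Fatal(err)
    }
    fmt.Println(decoded.GetSet().GetKey(), decoded.GetSet().GetValue()) // hello world
  }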

Implementing the Finite State Machine

We will use hashicorp/raft and not etcd-io/raft, because it is more modular and easier to use. hashicorp/raft requires us to implement the raft.FSM interface.

  1. Create the file internal/store/distributed/fsm.go:

     1package distributed
     2
     3import (
     4  dkvv1 "distributed-kv/gen/dkv/v1"
     5  "encoding/csv"
     6  "errors"
     7  "io"
     8
     9  "github.com/hashicorp/raft"
    10  "google.golang.org/protobuf/proto"
    11)
    12
    13var _ raft.FSM = (*FSM)(nil)
    14
    15type Storer interface {
    16  Delete(key string) error
    17  Set(key, value string) error
    18  Get(key string) (string, error)
    19}
    20
    21type FSM struct {
    22  storer Storer
    23}
    24
    25func NewFSM(storer Storer) *FSM {
    26  return &FSM{storer: storer}
    27}
    28
    29// Apply execute the command from the Raft log entry.
    30func (f *FSM) Apply(l *raft.Log) interface{} {
    31  panic("unimplemented")
    32}
    33
    34// Restore restores the state of the FSM from a snapshot.
    35func (f *FSM) Restore(snapshot io.ReadCloser) error {
    36  panic("unimplemented")
    37}
    38
    39// Snapshot dumps the state of the FSM to a snapshot.
    40func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
    41  panic("unimplemented")
    42}
    

    We must implement the raft.FSM interface. Let's just implement the Apply method for now, as this is the most important method.

  2. Implement the Apply method:

     1// Apply execute the command from the Raft log entry.
     2func (f *FSM) Apply(l *raft.Log) interface{} {
     3  // Unpack the data
     4  var cmd dkvv1.Command
     5  if err := proto.Unmarshal(l.Data, &cmd); err != nil {
     6    return err
     7  }
     8
     9  // Apply the command
    10  switch c := cmd.Command.(type) {
    11  case *dkvv1.Command_Set:
    12    return f.storer.Set(c.Set.Key, c.Set.Value)
    13  case *dkvv1.Command_Delete:
    14    return f.storer.Delete(c.Delete.Key)
    15  }
    16
    17  return errors.New("unknown command")
    18}
    

    As you can see, using Protocol Buffers is not only efficient, but also readable.

    Feel free to implement tests for the Apply method. The repository contains the tests and mocks.
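
For reference, here is a rough sketch of such a test with a hand-rolled fake instead of the generated mocks, assuming the Storer interface as defined so far:

  package distributed_test

  import (
    "testing"

    dkvv1 "distributed-kv/gen/dkv/v1"
    "distributed-kv/internal/store/distributed"

    "github.com/hashicorp/raft"
    "google.golang.org/protobuf/proto"
  )

  // fakeStorer is a hand-rolled, in-memory fake of the Storer interface.
  type fakeStorer struct {
    data map[string]string
  }

  func (f *fakeStorer) Set(key, value string) error    { f.data[key] = value; return nil }
  func (f *fakeStorer) Delete(key string) error        { delete(f.data, key); return nil }
  func (f *fakeStorer) Get(key string) (string, error) { return f.data[key], nil }

  func TestFSMApplySet(t *testing.T) {
    storer := &fakeStorer{data: map[string]string{}}
    fsm := distributed.NewFSM(storer)

    // Pack a SET command exactly as the leader would before calling raft.Apply.
    cmd := &dkvv1.Command{
      Command: &dkvv1.Command_Set{Set: &dkvv1.Set{Key: "k", Value: "v"}},
    }
    b, err := proto.Marshal(cmd)
    if err != nil {
      t.Fatal(err)
    }

    if res := fsm.Apply(&raft.Log{Data: b}); res != nil {
      t.Fatalf("unexpected apply result: %v", res)
    }
    if storer.data["k"] != "v" {
      t.Fatalf("expected k=v, got %q", storer.data["k"])
    }
  }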

The "crash" recovery: snapshots and restoring logs

Raft can use an additional store to compact the logs and restore the logs after a crash. The store is called the file snapshot store (fss).

Snapshots are used to quickly restore the state after a crash before fetching the remaining logs from peers, and to compact the logs. Snapshots are taken when the logs reach a certain size.

Internally, this adds another RPC called InstallSnapshot. For us, we simply have to implement the Snapshot and Restore methods of the FSM and a type satisfying the raft.FSMSnapshot interface. Snapshot and Restore require us to use io.ReadCloser and io.WriteCloser (raft.SnapshotSink) to read and write the snapshots. To optimize reading and writing the snapshot, we will extend the Store to allow an instant "dump" and "restore" of the store.

  1. Extend the interface to "dump" the store:

    1type Storer interface {
    2  Delete(key string) error
    3  Set(key, value string) error
    4  Get(key string) (string, error)
    5  Dump() map[string]string
    6  Clear()
    7}
    
  2. Implement the Snapshot and Restore methods of the FSM. We will use the csv library to pack the snapshot since:

    • Our data is a list of strings (basically)
    • It's easy to read and write sequentially, especially for key-value stores.
    • It's faster than JSON and protocol buffers.
    • csv.NewWriter and csv.NewReader wrap an io.Writer and an io.Reader, which is exactly what raft.SnapshotSink and io.ReadCloser give us.
     1// Restore restores the state of the FSM from a snapshot.
     2func (f *FSM) Restore(snapshot io.ReadCloser) error {
     3  f.storer.Clear()
     4  r := csv.NewReader(snapshot)
     5  for {
     6    record, err := r.Read()
     7    if err == io.EOF {
     8      break
     9    }
    10    if err != nil {
    11      return err
    12    }
    13    if err := f.storer.Set(record[0], record[1]); err != nil {
    14      return err
    15    }
    16  }
    17  return nil
    18}
    19
    20// Snapshot dumps the state of the FSM to a snapshot.
    21//
    22// nolint: ireturn
    23func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
    24  return &fsmSnapshot{store: f.storer.Dump()}, nil
    25}
    26
    27var _ raft.FSMSnapshot = (*fsmSnapshot)(nil)
    28
    29type fsmSnapshot struct {
    30  store map[string]string
    31}
    32
    33// Persist should dump all necessary state to the WriteCloser 'sink',
    34// and call sink.Close() when finished or call sink.Cancel() on error.
    35func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
    36  err := func() error {
    37    csvWriter := csv.NewWriter(sink)
    38    for k, v := range f.store {
    39      if err := csvWriter.Write([]string{k, v}); err != nil {
    40        return err
    41      }
    42    }
    43    csvWriter.Flush()
    44    if err := csvWriter.Error(); err != nil {
    45      return err
    46    }
    47    return sink.Close()
    48  }()
    49
    50  if err != nil {
    51    if err = sink.Cancel(); err != nil {
    52      panic(err)
    53    }
    54  }
    55
    56  return err
    57}
    58
    59// Release is invoked when we are finished with the snapshot.
    60func (f *fsmSnapshot) Release() {}
    

     Feel free to implement tests for the Snapshot and Restore methods; a rough sketch follows this list, and the Git repository contains the tests and mocks.

  3. Extend the Store struct from internal/store/persisted to implement the Storer interface:

     1func (s *Store) Dump() map[string]string {
     2  s.mu.RLock()
     3  defer s.mu.RUnlock()
     4  data := make(map[string]string, len(s.data))
     5  for k, v := range s.data {
     6    data[k] = v
     7  }
     8  return data
     9}
    10
    11func (s *Store) Clear() {
    12  s.mu.Lock()
    13  defer s.mu.Unlock()
    14  s.data = make(map[string]string)
    15}
    

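As announced above, here is a rough sketch of a snapshot/restore round-trip test with a hand-rolled fake store and sink (the fake includes the Dump and Clear methods added in step 1; the repository's tests and mocks are the authoritative version):

  package distributed_test

  import (
    "bytes"
    "io"
    "testing"

    "distributed-kv/internal/store/distributed"
  )

  // mapStorer is an in-memory fake of the extended Storer interface.
  type mapStorer struct{ data map[string]string }

  func (m *mapStorer) Set(key, value string) error    { m.data[key] = value; return nil }
  func (m *mapStorer) Delete(key string) error        { delete(m.data, key); return nil }
  func (m *mapStorer) Get(key string) (string, error) { return m.data[key], nil }
  func (m *mapStorer) Dump() map[string]string        { return m.data }
  func (m *mapStorer) Clear()                         { m.data = map[string]string{} }

  // fakeSink satisfies raft.SnapshotSink by writing into an in-memory buffer.
  type fakeSink struct{ bytes.Buffer }

  func (f *fakeSink) Close() error  { return nil }
  func (f *fakeSink) ID() string    { return "fake" }
  func (f *fakeSink) Cancel() error { return nil }

  func TestSnapshotRestoreRoundTrip(t *testing.T) {
    src := &mapStorer{data: map[string]string{"a": "1", "b": "2"}}
    dst := &mapStorer{data: map[string]string{}}

    // Snapshot the source FSM and persist it into the fake sink.
    snap, err := distributed.NewFSM(src).Snapshot()
    if err != nil {
      t.Fatal(err)
    }
    sink := &fakeSink{}
    if err := snap.Persist(sink); err != nil {
      t.Fatal(err)
    }

    // Restore the snapshot into a fresh FSM and compare states.
    if err := distributed.NewFSM(dst).Restore(io.NopCloser(&sink.Buffer)); err != nil {
      t.Fatal(err)
    }
    if dst.data["a"] != "1" || dst.data["b"] != "2" {
      t.Fatalf("unexpected restored state: %v", dst.data)
    }
  }
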
Preparing the storage for Raft

Raft uses two stores to persist its data: the logs store (ldb) and the stable store (sdb). hashicorp/raft normally uses hashicorp/raft-boltdb for them. However, boltdb has fallen out of favor compared to pebble (especially in the blockchain ecosystem), and therefore it is more appropriate to use pebble instead of boltdb.

However, we still need to implement the raft.LogStore and raft.StableStore interfaces for pebble. You can use weedge's pebble library to implement the interfaces, but it is preferable to copy the files instead of importing the library, so that you can fix breaking changes between pebble versions yourself. You can also copy my implementation from the repository, or fork weedge's pebble library and fix the breaking changes there.
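
For reference, these are the two method sets from hashicorp/raft that the pebble-backed store must satisfy (reproduced here for convenience; check the library source for the authoritative definitions and documentation):

  // LogStore stores and retrieves Raft log entries durably.
  type LogStore interface {
    FirstIndex() (uint64, error)
    LastIndex() (uint64, error)
    GetLog(index uint64, log *Log) error
    StoreLog(log *Log) error
    StoreLogs(logs []*Log) error
    DeleteRange(min, max uint64) error
  }

  // StableStore durably stores key configuration values (current term, votes).
  type StableStore interface {
    Set(key []byte, val []byte) error
    Get(key []byte) ([]byte, error)
    SetUint64(key []byte, val uint64) error
    GetUint64(key []byte) (uint64, error)
  }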

  1. Import the "raftpebble" files in the internal/raftpebble directory.

  2. Create a file internal/store/distributed/store.go:

     1package distributed
     2
     3import "github.com/hashicorp/raft"
     4
     5const (
     6  retainSnapshotCount = 2
     7)
     8
     9type Store struct {
    10  // RaftDir is the directory where the Stable and Logs data is stored.
    11  RaftDir string
    12  // RaftBind is the address to bind the Raft server.
    13  RaftBind string
    14  // RaftID is the ID of the local node.
    15  RaftID string
    16  // RaftAdvertisedAddr is the address other nodes should use to communicate with this node.
    17  RaftAdvertisedAddr raft.ServerAddress
    18
    19  fsm  *FSM
    20  raft *raft.Raft
    21}
    22
    23func NewStore(
    24  raftDir, raftBind, raftID string,
    25  raftAdvertisedAddr raft.ServerAddress,
    26  storer Storer,
    27) *Store {
    28  return &Store{
    29    RaftDir:            raftDir,
    30    RaftBind:           raftBind,
    31    RaftID:             raftID,
    32    RaftAdvertisedAddr: raftAdvertisedAddr,
    33    fsm:                NewFSM(storer),
    34  }
    35}
    36
    
  3. Add the Open method, which is used to open the store and the Raft consensus communication:

     1// ...
     2
     3func (s *Store) Open(bootstrap bool) error {
     4  // Setup Raft configuration.
     5  config := raft.DefaultConfig()
     6  config.LocalID = raft.ServerID(s.RaftID)
     7
     8  // Setup Raft communication.
     9  addr, err := net.ResolveTCPAddr("tcp", s.RaftBind)
    10  if err != nil {
    11    return err
    12  }
    13  // Create the snapshot store. This allows the Raft to truncate the log.
    14  fss, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
    15  if err != nil {
    16    return fmt.Errorf("file snapshot store: %s", err)
    17  }
    18
    19  // Create the log store and stable store.
    20  ldb, err := raftpebble.New(raftpebble.WithDbDirPath(filepath.Join(s.RaftDir, "logs.dat")))
    21  if err != nil {
    22    return fmt.Errorf("new pebble: %s", err)
    23  }
    24  sdb, err := raftpebble.New(raftpebble.WithDbDirPath(filepath.Join(s.RaftDir, "stable.dat")))
    25  if err != nil {
    26    return fmt.Errorf("new pebble: %s", err)
    27  }
    28
    29  // Instantiate the transport.
    30  transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
    31  if err != nil {
    32    return err
    33  }
    34
    35  // Instantiate the Raft systems.
    36  ra, err := raft.NewRaft(config, s.fsm, ldb, sdb, fss, transport)
    37  if err != nil {
    38    return fmt.Errorf("new raft: %s", err)
    39  }
    40  s.raft = ra
    41
    42  // Check if there is an existing state, if not bootstrap.
    43  hasState, err := raft.HasExistingState(
    44    ldb,
    45    sdb,
    46    fss,
    47  )
    48  if err != nil {
    49    return err
    50  }
    51  if bootstrap && !hasState {
    52    slog.Info(
    53      "bootstrapping new raft node",
    54      "id",
    55      config.LocalID,
    56      "addr",
    57      transport.LocalAddr(),
    58    )
    59    config := raft.Configuration{
    60      Servers: []raft.Server{
    61        {
    62          ID:      config.LocalID,
    63          Address: transport.LocalAddr(),
    64        },
    65      },
    66    }
    67    err = s.raft.BootstrapCluster(config).Error()
    68  }
    69  return err
    70}
    

    Feel free to implement tests for the Open method. The Git repository contains the tests.

    If you read the code, there isn't anything special, but we haven't talked about the network layer of hashicorp/raft. Right now, we are using an insecure "raw" network layer: just ye olde TCP. In Travis Jeffery's book, he replaced the StreamLayer of hashicorp/raft and added TLS for peer-to-peer communication. Since this is best practice, we will do the same.

Replacing the network layer with a mutual TLS transport

Mutual TLS is based on one private CA that signs peer certificates. Since TLS uses asymmetric cryptography, the private key of the CA is used to sign the peer certificates. Peers use the public key of the CA (the certificate of the CA contains the public key) to verify the peer certificates.

This is called "public key cryptography". The public key is used to verify the signature of the peer certificate. The private key is used to sign the peer certificate.

[Diagram: signing and verification flow. A certificate request is signed with the CA's private key, producing a signed certificate; peers verify that signed certificate with the CA's public key.]

Peers verify membership simply by checking the signature of the peer certificate: if the signature is valid, the peer is a member of the network; if it is invalid, the peer is not.

To implement this, the idea is to "upgrade" the TCP connection to a TLS connection. The crypto/tls standard library provides the tls.Server and tls.Client functions to do exactly that: they wrap a net.Conn into a *tls.Conn.
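
Building those tls.Config values is plain standard library work. Here is a minimal sketch, assuming the CA, certificate, and key are PEM files on disk; the package and function names are mine, not necessarily the repository's:

  package internaltls

  import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "os"
  )

  // SetupServerTLSConfig builds the config used by the peer accepting connections.
  func SetupServerTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
      return nil, err
    }
    pool, err := loadCA(caFile)
    if err != nil {
      return nil, err
    }
    return &tls.Config{
      Certificates: []tls.Certificate{cert},
      ClientCAs:    pool,
      // Requiring and verifying the client certificate is what makes the TLS "mutual".
      ClientAuth: tls.RequireAndVerifyClientCert,
    }, nil
  }

  // SetupClientTLSConfig builds the config used by the peer dialing out.
  func SetupClientTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
      return nil, err
    }
    pool, err := loadCA(caFile)
    if err != nil {
      return nil, err
    }
    return &tls.Config{
      Certificates: []tls.Certificate{cert},
      RootCAs:      pool,
    }, nil
  }

  // loadCA parses the CA certificate into a pool used for verification.
  func loadCA(caFile string) (*x509.CertPool, error) {
    ca, err := os.ReadFile(caFile)
    if err != nil {
      return nil, err
    }
    pool := x509.NewCertPool()
    if !pool.AppendCertsFromPEM(ca) {
      return nil, errors.New("failed to parse CA certificate")
    }
    return pool, nil
  }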

  1. Create a file internal/store/distributed/stream_layer.go and implement the raft.StreamLayer interface:

     1package distributed
     2
     3import (
     4  "crypto/tls"
     5  "net"
     6  "time"
     7
     8  "github.com/hashicorp/raft"
     9)
    10
    11var _ raft.StreamLayer = (*TLSStreamLayer)(nil)
    12
    13type TLSStreamLayer struct {
    14  net.Listener
    15  AdvertizedAddress raft.ServerAddress
    16  ServerTLSConfig   *tls.Config
    17  ClientTLSConfig   *tls.Config
    18}
    19
    20func (s *TLSStreamLayer) Accept() (net.Conn, error) {
    21  conn, err := s.Listener.Accept()
    22  if err != nil {
    23    return nil, err
    24  }
    25  if s.ServerTLSConfig != nil {
    26    return tls.Server(conn, s.ServerTLSConfig), nil
    27  }
    28  return conn, nil
    29}
    30
    31func (s *TLSStreamLayer) PublicAddress() raft.ServerAddress {
    32  return s.AdvertizedAddress
    33}
    34
    35func (s *TLSStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
    36  dialer := &net.Dialer{Timeout: timeout}
    37  conn, err := dialer.Dial("tcp", string(address))
    38  if s.ClientTLSConfig != nil {
    39    serverName, _, serr := net.SplitHostPort(string(address))
    40    if serr != nil {
    41      serverName = string(address)
    42    }
    43    tlsConfig := s.ClientTLSConfig.Clone()
    44    tlsConfig.ServerName = serverName
    45    return tls.Client(conn, tlsConfig), err
    46  }
    47  return conn, err
    48}
    49
    
    WARNING

    Since we are using mutual TLS, we need to set the ServerName of the tls.Config to the address of the peer. The ServerName is used to verify the certificate of the peer.

     However, there is an issue with the transport using an IP instead of the address: the listener resolves the address and only stores the IP.

     The implementation in hashicorp/raft wrongfully uses listener.Addr() as the advertised address, which yields an IP instead of a hostname. Instead, please use the fork darkness4/raft, which adds a PublicAddress() method to the stream layer. This fork is backward compatible with hashicorp/raft.

    Just use the following in your go.mod:

    1replace github.com/hashicorp/raft => github.com/darkness4/raft v1.6.3
    

     If you're worried about being non-standard, feel free to fork hashicorp/raft yourself and implement your own network layer. hashicorp/raft is mostly a minimal implementation of Raft based on the paper.

     We'll fork hashicorp/raft anyway to implement an RPC that forwards Apply requests to the leader.

  2. Now replace the raft.NewTCPTransport with raft.NewNetworkTransport and extend the Store to accept the TLS configurations. We use the functional options pattern:

     1 type Store struct {
     2   // RaftDir is the directory where the Stable and Logs data is stored.
     3   RaftDir string
     4   // RaftBind is the address to bind the Raft server.
     5   RaftBind string
     6   // RaftID is the ID of the local node.
     7   RaftID string
     8   // RaftAdvertisedAddr is the address other nodes should use to communicate with this node.
     9   RaftAdvertisedAddr raft.ServerAddress
    10
    11   fsm  *FSM
    12   raft *raft.Raft
    13+
    14+  StoreOptions
    15+}
    16+
    17+type StoreOptions struct {
    18+  serverTLSConfig *tls.Config
    19+  clientTLSConfig *tls.Config
    20+}
    21+
    22+type StoreOption func(*StoreOptions)
    23+
    24+func WithServerTLSConfig(config *tls.Config) StoreOption {
    25+  return func(o *StoreOptions) {
    26+    o.serverTLSConfig = config
    27+  }
    28+}
    29+
    30+func WithClientTLSConfig(config *tls.Config) StoreOption {
    31+  return func(o *StoreOptions) {
    32+    o.clientTLSConfig = config
    33+  }
    34+}
    35+
    36+func applyStoreOptions(opts []StoreOption) StoreOptions {
    37+  var options StoreOptions
    38+  for _, o := range opts {
    39+    o(&options)
    40+  }
    41+  return options
    42 }
    43
    44 func NewStore(
    45   raftDir, raftBind, raftID string,
    46   raftAdvertisedAddr raft.ServerAddress,
    47   storer Storer,
    48+  opts ...StoreOption,
    49 ) *Store {
    50+  o := applyStoreOptions(opts)
    51   return &Store{
    52     RaftDir:            raftDir,
    53     RaftBind:           raftBind,
    54     RaftID:             raftID,
    55     RaftAdvertisedAddr: raftAdvertisedAddr,
    56     fsm:                NewFSM(storer),
    57+    StoreOptions: o,
    58   }
    59 }
    60
    61
    62 func (s *Store) Open(localID string, bootstrap bool) error {
    63   // Setup Raft configuration.
    64   config := raft.DefaultConfig()
    65   config.LocalID = raft.ServerID(localID)
    66
    67-  // Setup Raft communication.
    68-  addr, err := net.ResolveTCPAddr("tcp", s.RaftBind)
    69-  if err != nil {
    70-    return err
    71-  }
    72   // Create the snapshot store. This allows the Raft to truncate the log.
    73   fss, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
    74   if err != nil {
    75     return fmt.Errorf("file snapshot store: %s", err)
    76   }
    77
    78   // ...
    79
    80   // Instantiate the transport.
    81-  transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
    82+  lis, err := net.Listen("tcp", s.RaftBind)
    83   if err != nil {
    84     return err
    85   }
    86+  transport := raft.NewNetworkTransport(&TLSStreamLayer{
    87+    Listener:          lis,
    88+    AdvertizedAddress: raft.ServerAddress(s.RaftAdvertisedAddr),
    89+    ServerTLSConfig:   s.serverTLSConfig,
    90+    ClientTLSConfig:   s.clientTLSConfig,
    91+  }, 3, 10*time.Second, os.Stderr)
    92
    
    NOTE

    The lifecycle of the listener lis is handled by the NetworkTransport. Calling s.raft.Shutdown will close the listener. Therefore, there is no need to close the listener manually.

Adding the "Join" and "Leave" methods

Right now, our store only works in single-node mode. We need to add the "Join" and "Leave" methods to request a node to join or leave the cluster.

These methods are pretty standard and can be found in many examples, so I'm also just copying them:

 1func (s *Store) Join(id raft.ServerID, addr raft.ServerAddress) error {
 2  slog.Info("request node to join", "id", id, "addr", addr)
 3
 4  configFuture := s.raft.GetConfiguration()
 5  if err := configFuture.Error(); err != nil {
 6    slog.Error("failed to get raft configuration", "error", err)
 7    return err
 8  }
 9  // Check if the server has already joined
10  for _, srv := range configFuture.Configuration().Servers {
11    // If a node already exists with either the joining node's ID or address,
12    // that node may need to be removed from the config first.
13    if srv.ID == id || srv.Address == addr {
14      // However if *both* the ID and the address are the same, then nothing -- not even
15      // a join operation -- is needed.
16      if srv.Address == addr && srv.ID == id {
17        slog.Info(
18          "node already member of cluster, ignoring join request",
19          "id",
20          id,
21          "addr",
22          addr,
23        )
24        return nil
25      }
26
27      if err := s.raft.RemoveServer(id, 0, 0).Error(); err != nil {
28        return fmt.Errorf("error removing existing node %s at %s: %s", id, addr, err)
29      }
30    }
31  }
32
33  // Add the new server
34  return s.raft.AddVoter(id, addr, 0, 0).Error()
35}
36
37func (s *Store) Leave(id raft.ServerID) error {
38  slog.Info("request node to leave", "id", id)
39  return s.raft.RemoveServer(id, 0, 0).Error()
40}
NOTE

As you can see, Raft uses futures for asynchronous operations. Calling .Error() waits for the operation to complete.

We can also add "Shutdown", "WaitForLeader", "GetLeader" and "GetServers" methods to help us with the tests and the main function:

 1type Store struct {
 2  // ...
 3
 4  shutdownCh chan struct{}
 5}
 6
 7func NewStore(raftDir, raftBind, raftID string, raftAdvertisedAddr raft.ServerAddress, storer Storer, opts ...StoreOption) *Store {
 8  o := applyStoreOptions(opts)
 9  return &Store{
10    // ...
11    shutdownCh:   make(chan struct{}),
12  }
13}
14
15func (s *Store) WaitForLeader(timeout time.Duration) (raft.ServerID, error) {
16  slog.Info("waiting for leader", "timeout", timeout)
17  timeoutCh := time.After(timeout)
18  ticker := time.NewTicker(time.Second)
19  defer ticker.Stop()
20  for {
21    select {
22    case <-s.shutdownCh:
23      return "", errors.New("shutdown")
24    case <-timeoutCh:
25      return "", errors.New("timed out waiting for leader")
26    case <-ticker.C:
27      addr, id := s.raft.LeaderWithID()
28      if addr != "" {
29        slog.Info("leader found", "addr", addr, "id", id)
30        return id, nil
31      }
32    }
33  }
34}
35
36func (s *Store) Shutdown() error {
37  slog.Warn("shutting down store")
38  select {
39  case s.shutdownCh <- struct{}{}:
40  default:
41  }
42  if s.raft != nil {
43    if err := s.raft.Shutdown().Error(); err != nil {
44      return err
45    }
46    s.raft = nil
47  }
48  s.fsm.storer.Clear()
49  return nil
50}
51
52func (s *Store) ShutdownCh() <-chan struct{} {
53  return s.shutdownCh
54}
55
56func (s *Store) GetLeader() (raft.ServerAddress, raft.ServerID) {
57  return s.raft.LeaderWithID()
58}
59
60func (s *Store) GetServers() ([]raft.Server, error) {
61  configFuture := s.raft.GetConfiguration()
62  if err := configFuture.Error(); err != nil {
63    return nil, err
64  }
65  return configFuture.Configuration().Servers, nil
66}

I highly recommend implementing tests for the Join and Leave methods to test the consensus. We are almost done. The Git repository contains the tests.
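
If you want a starting point, here is a rough sketch of a two-node consensus test on the loopback interface without TLS. It assumes the NewStore and Open(bootstrap bool) signatures from the first version of store.go above, and reuses an in-memory fake Storer; the repository's tests remain the authoritative version:

  package distributed_test

  import (
    "testing"
    "time"

    "distributed-kv/internal/store/distributed"

    "github.com/hashicorp/raft"
  )

  // kvStorer is an in-memory fake of the Storer interface.
  type kvStorer struct{ data map[string]string }

  func (s *kvStorer) Set(key, value string) error    { s.data[key] = value; return nil }
  func (s *kvStorer) Delete(key string) error        { delete(s.data, key); return nil }
  func (s *kvStorer) Get(key string) (string, error) { return s.data[key], nil }
  func (s *kvStorer) Dump() map[string]string        { return s.data }
  func (s *kvStorer) Clear()                         { s.data = map[string]string{} }

  func TestJoin(t *testing.T) {
    store0 := distributed.NewStore(t.TempDir(), "127.0.0.1:12380", "node-0",
      raft.ServerAddress("127.0.0.1:12380"), &kvStorer{data: map[string]string{}})
    store1 := distributed.NewStore(t.TempDir(), "127.0.0.1:12381", "node-1",
      raft.ServerAddress("127.0.0.1:12381"), &kvStorer{data: map[string]string{}})

    // Bootstrap the first node; the second one starts empty and waits to be joined.
    if err := store0.Open(true); err != nil {
      t.Fatal(err)
    }
    defer store0.Shutdown()
    if err := store1.Open(false); err != nil {
      t.Fatal(err)
    }
    defer store1.Shutdown()

    if _, err := store0.WaitForLeader(10 * time.Second); err != nil {
      t.Fatal(err)
    }
    // Ask the leader to add the second node as a voter.
    if err := store0.Join("node-1", "127.0.0.1:12381"); err != nil {
      t.Fatal(err)
    }
    servers, err := store0.GetServers()
    if err != nil {
      t.Fatal(err)
    }
    if len(servers) != 2 {
      t.Fatalf("expected 2 servers, got %d", len(servers))
    }
  }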

Sending commands

We are finally at the last step: sending commands to the Raft cluster. Our cluster is already able to replicate logs, but we still need to mutate the state of the store (otherwise, we wouldn't be able to play with it).

We can do this by sending commands using raft.Apply. Add the following methods to the Store struct:

 1func (s *Store) apply(req *dkvv1.Command) (any, error) {
 2  b, err := proto.Marshal(req)
 3  if err != nil {
 4    return nil, err
 5  }
 6  timeout := 10 * time.Second
 7  future := s.raft.Apply(b, timeout)
 8  if err := future.Error(); err != nil {
 9    return nil, err
10  }
11  res := future.Response()
12  if err, ok := res.(error); ok {
13    return nil, err
14  }
15  return res, nil
16}
17
18func (s *Store) Set(key string, value string) error {
19  _, err := s.apply(&dkvv1.Command{
20    Command: &dkvv1.Command_Set{
21      Set: &dkvv1.Set{
22        Key:   key,
23        Value: value,
24      },
25    },
26  })
27  return err
28}
29
30func (s *Store) Delete(key string) error {
31  _, err := s.apply(&dkvv1.Command{
32    Command: &dkvv1.Command_Delete{
33      Delete: &dkvv1.Delete{
34        Key: key,
35      },
36    },
37  })
38  return err
39}
40

As you can see, Protobuf makes things explicit and type-safe. Let's also add the getter:

1func (s *Store) Get(key string) (string, error) {
2  return s.fsm.storer.Get(key)
3}

Please add the tests for the Set, Delete and Get methods. See the Git repository for the tests.

We are done! We now have a fully functional fault-tolerant distributed key-value store. We've implemented:

  • The consensus/writer election using Raft.
  • The data exchange/spreading using the logs and RPCs of Raft.

Now, we need to actually implement an API to interact with the store. We also need to think about the node discovery, load-balancing and the client-to-server communication.

Adding an API for interactivity

We are going to use ConnectRPC, a lightweight alternative to gRPC. ConnectRPC allows both JSON and gRPC clients, and is more modular than gRPC, much closer to the standard library net/http.

This allows clients to use the API without having to deploy a gRPC Gateway or an Envoy proxy for Web clients.

  1. Edit the protos/dkv/v1/dkv.proto and add a service:

     1// ...
     2
     3service DkvAPI {
     4  rpc Get(GetRequest) returns (GetResponse);
     5  rpc Set(SetRequest) returns (SetResponse);
     6  rpc Delete(DeleteRequest) returns (DeleteResponse);
     7}
     8
     9message GetRequest { string key = 1; }
    10message GetResponse { string value = 1; }
    11
    12message SetRequest { string key = 1; string value = 2; }
    13message SetResponse {}
    14
    15message DeleteRequest { string key = 1; }
    16message DeleteResponse {}
    

    To avoid repeating the same messages, rename the Set and Delete messages to SetRequest and DeleteRequest. Apply the refactor accordingly across the code.

    When running make protos, the generated code will use ConnectRPC to translate the service into a stub for the server, and a client to interact with the server:

     1// Extract of gen/dkv/v1/dkvv1connect/dkv.connect.go
     2// NewDkvAPIClient constructs a client for the dkv.v1.DkvAPI service. By default, it uses the
     3// Connect protocol with the binary Protobuf Codec, asks for gzipped responses, and sends
     4// uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the connect.WithGRPC() or
     5// connect.WithGRPCWeb() options.
     6//
     7// The URL supplied here should be the base URL for the Connect or gRPC server (for example,
     8// http://api.acme.com or https://acme.com/grpc).
     9func NewDkvAPIClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) DkvAPIClient {
    10  baseURL = strings.TrimRight(baseURL, "/")
    11  return &dkvAPIClient{
    12    get: connect.NewClient[v1.GetRequest, v1.GetResponse](
    13      httpClient,
    14      baseURL+DkvAPIGetProcedure,
    15      connect.WithSchema(dkvAPIGetMethodDescriptor),
    16      connect.WithClientOptions(opts...),
    17    ),
    18    set: connect.NewClient[v1.SetRequest, v1.SetResponse](
    19      httpClient,
    20      baseURL+DkvAPISetProcedure,
    21      connect.WithSchema(dkvAPISetMethodDescriptor),
    22      connect.WithClientOptions(opts...),
    23    ),
    24    delete: connect.NewClient[v1.DeleteRequest, v1.DeleteResponse](
    25      httpClient,
    26      baseURL+DkvAPIDeleteProcedure,
    27      connect.WithSchema(dkvAPIDeleteMethodDescriptor),
    28      connect.WithClientOptions(opts...),
    29    ),
    30  }
    31}
    32
    33
    34// NewDkvAPIHandler builds an HTTP handler from the service implementation. It returns the path on
    35// which to mount the handler and the handler itself.
    36//
    37// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf
    38// and JSON codecs. They also support gzip compression.
    39func NewDkvAPIHandler(svc DkvAPIHandler, opts ...connect.HandlerOption) (string, http.Handler) {
    40  dkvAPIGetHandler := connect.NewUnaryHandler(
    41    DkvAPIGetProcedure,
    42    svc.Get,
    43    connect.WithSchema(dkvAPIGetMethodDescriptor),
    44    connect.WithHandlerOptions(opts...),
    45  )
    46  dkvAPISetHandler := connect.NewUnaryHandler(
    47    DkvAPISetProcedure,
    48    svc.Set,
    49    connect.WithSchema(dkvAPISetMethodDescriptor),
    50    connect.WithHandlerOptions(opts...),
    51  )
    52  dkvAPIDeleteHandler := connect.NewUnaryHandler(
    53    DkvAPIDeleteProcedure,
    54    svc.Delete,
    55    connect.WithSchema(dkvAPIDeleteMethodDescriptor),
    56    connect.WithHandlerOptions(opts...),
    57  )
    58  return "/dkv.v1.DkvAPI/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    59    switch r.URL.Path {
    60    case DkvAPIGetProcedure:
    61      dkvAPIGetHandler.ServeHTTP(w, r)
    62    case DkvAPISetProcedure:
    63      dkvAPISetHandler.ServeHTTP(w, r)
    64    case DkvAPIDeleteProcedure:
    65      dkvAPIDeleteHandler.ServeHTTP(w, r)
    66    default:
    67      http.NotFound(w, r)
    68    }
    69  })
    70}
    71
    72// DkvAPIHandler is an implementation of the dkv.v1.DkvAPI service.
    73type DkvAPIHandler interface {
    74  Get(context.Context, *connect.Request[v1.GetRequest]) (*connect.Response[v1.GetResponse], error)
    75  Set(context.Context, *connect.Request[v1.SetRequest]) (*connect.Response[v1.SetResponse], error)
    76  Delete(context.Context, *connect.Request[v1.DeleteRequest]) (*connect.Response[v1.DeleteResponse], error)
    77}
    
  2. Let's implement the handler. Create the file internal/api/dkv_handler.go and add:

     1package api
     2
     3import (
     4  "context"
     5  dkvv1 "distributed-kv/gen/dkv/v1"
     6  "distributed-kv/gen/dkv/v1/dkvv1connect"
     7  "distributed-kv/internal/store/distributed"
     8
     9  "connectrpc.com/connect"
    10)
    11
    12var _ dkvv1connect.DkvAPIHandler = (*DkvAPIHandler)(nil)
    13
    14type DkvAPIHandler struct {
    15  *distributed.Store
    16}
    17
    18func (d *DkvAPIHandler) Delete(
    19  _ context.Context,
    20  req *connect.Request[dkvv1.DeleteRequest],
    21) (*connect.Response[dkvv1.DeleteResponse], error) {
    22  return &connect.Response[dkvv1.DeleteResponse]{}, d.Store.Delete(req.Msg.Key)
    23}
    24
    25func (d *DkvAPIHandler) Get(
    26  _ context.Context,
    27  req *connect.Request[dkvv1.GetRequest],
    28) (*connect.Response[dkvv1.GetResponse], error) {
    29  res, err := d.Store.Get(req.Msg.Key)
    30  if err != nil {
    31    return nil, err
    32  }
    33  return &connect.Response[dkvv1.GetResponse]{Msg: &dkvv1.GetResponse{Value: res}}, nil
    34}
    35
    36func (d *DkvAPIHandler) Set(
    37  _ context.Context,
    38  req *connect.Request[dkvv1.SetRequest],
    39) (*connect.Response[dkvv1.SetResponse], error) {
    40  return &connect.Response[dkvv1.SetResponse]{}, d.Store.Set(req.Msg.Key, req.Msg.Value)
    41}
    

     As you can see, it is pretty straightforward. Actually, you could even accept a Store interface instead of the concrete *distributed.Store to make it more modular.

  3. Therefore, let's finally add the Store interface to internal/store/store.go:

    1package store
    2
    3type Store interface {
    4  Delete(key string) error
    5  Set(key, value string) error
    6  Get(key string) (string, error)
    7}
    

    And use it in internal/api/dkv_handler.go instead of *distributed.Store:

    1type DkvAPIHandler struct {
    2  store.Store
    3}
    

    Now, you can add tests for the DkvAPIHandler. The Git repository contains the tests.

     Since we are using ConnectRPC, we can use httptest to implement unit tests, as sketched below.
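
Here is a rough sketch of such a test, assuming the generated dkvv1connect package shown above and an in-memory fake of the Store interface (the repository's tests are the authoritative version):

  package api_test

  import (
    "context"
    "net/http"
    "net/http/httptest"
    "testing"

    dkvv1 "distributed-kv/gen/dkv/v1"
    "distributed-kv/gen/dkv/v1/dkvv1connect"
    "distributed-kv/internal/api"

    "connectrpc.com/connect"
  )

  // fakeStore is an in-memory implementation of the Store interface.
  type fakeStore struct{ data map[string]string }

  func (f *fakeStore) Set(key, value string) error    { f.data[key] = value; return nil }
  func (f *fakeStore) Delete(key string) error        { delete(f.data, key); return nil }
  func (f *fakeStore) Get(key string) (string, error) { return f.data[key], nil }

  func TestDkvAPIHandler(t *testing.T) {
    // Mount the generated handler on a test HTTP server.
    mux := http.NewServeMux()
    mux.Handle(dkvv1connect.NewDkvAPIHandler(&api.DkvAPIHandler{Store: &fakeStore{data: map[string]string{}}}))
    srv := httptest.NewServer(mux)
    defer srv.Close()

    // Use the generated client against the test server.
    client := dkvv1connect.NewDkvAPIClient(srv.Client(), srv.URL)
    ctx := context.Background()

    if _, err := client.Set(ctx, connect.NewRequest(&dkvv1.SetRequest{Key: "k", Value: "v"})); err != nil {
      t.Fatal(err)
    }
    res, err := client.Get(ctx, connect.NewRequest(&dkvv1.GetRequest{Key: "k"}))
    if err != nil {
      t.Fatal(err)
    }
    if res.Msg.Value != "v" {
      t.Fatalf("expected %q, got %q", "v", res.Msg.Value)
    }
  }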

Implementing the "main" function of the server

Usage

Let's think a little about the usage of our server. Let's see how etcd is deployed. We will only use a static discovery strategy.

ETCD is configured by setting:

 1# infra0
 2etcd --name infra0 --initial-advertise-peer-urls http://10.0.1.10:2380 \
 3  --listen-peer-urls http://10.0.1.10:2380 \
 4  --listen-client-urls http://10.0.1.10:2379,http://127.0.0.1:2379 \
 5  --advertise-client-urls http://10.0.1.10:2379 \
 6  --initial-cluster-token etcd-cluster-1 \
 7  --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
 8  --initial-cluster-state new
 9
10# infra1
11etcd --name infra1 --initial-advertise-peer-urls http://10.0.1.11:2380 \
12  --listen-peer-urls http://10.0.1.11:2380 \
13  --listen-client-urls http://10.0.1.11:2379,http://127.0.0.1:2379 \
14  --advertise-client-urls http://10.0.1.11:2379 \
15  --initial-cluster-token etcd-cluster-1 \
16  --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
17  --initial-cluster-state new
18
19# infra2
20etcd --name infra2 --initial-advertise-peer-urls http://10.0.1.12:2380 \
21  --listen-peer-urls http://10.0.1.12:2380 \
22  --listen-client-urls http://10.0.1.12:2379,http://127.0.0.1:2379 \
23  --advertise-client-urls http://10.0.1.12:2379 \
24  --initial-cluster-token etcd-cluster-1 \
25  --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
26  --initial-cluster-state new
  • Port 2380 is used for the peer-to-peer communication.
  • Port 2379 is used for the client-to-server communication.
  • The --initial-cluster flag is used to statically define the cluster members.
  • The --initial-cluster-state flag is used to define the state of the cluster. It can be new or existing.
  • The --initial-cluster-token flag is used to define the token of the cluster.
  • The --name flag is used to define the name of the node.
  • The --initial-advertise-peer-urls flag is used to advertise peer URLs to the rest of the cluster.
  • The --listen-peer-urls flag is used to define the listen address for Raft.
  • The --listen-client-urls flag is used to define the listen address for the client.
  • The --advertise-client-urls flag is used to advertise the client listen addresses to the rest of the cluster, for client-side load-balancing.

We will use a similar strategy, but we won't be handling the "advertisement" of the peers, nor the "initial cluster token". The reason we want this approach instead of manually setting "bootstrap" and "join" flags is that we want the flags to be almost identical across nodes, so that replicating the service is easy to handle on Kubernetes.

For example, on Kubernetes, one manifest is called StatefulSet, and it is used to deploy replicated systems like the one we are developing. However, the "templating" of the manifest is quite limited, and you may need to implement a shell script to handle the templating (which is Travis Jeffery's method).

Taking the etcd approach, we can pass the Pod name (the container hostname) to the name parameter. Kubernetes also automatically handles DNS resolution of the Pod names. This means that, knowing the number of replicas, we can easily generate the --initial-cluster flag.

For example, on Kubernetes the command-line flags would be:

1etcd --name "$(POD_NAME)" \
2  --listen-peer-urls "${PROTOCOL}://0.0.0.0:${PEER_PORT}" \
3  --listen-client-urls "${PROTOCOL}://0.0.0.0:${CLIENT_PORT}" \
4  --initial-cluster "${INITIAL_CLUSTER}" \
5  --initial-cluster-state "${INITIAL_CLUSTER_STATE}"

Each variable would be set in a configuration file:

1# .env file
2PROTOCOL=https
3PEER_PORT=2380
4CLIENT_PORT=2379
5INITIAL_CLUSTER=infra0=https://infra0.dkv.svc.cluster.local:2380,infra1=https://infra1.dkv.svc.cluster.local:2380,infra2=https://infra2.dkv.svc.cluster.local:2380
6INITIAL_CLUSTER_STATE=new

Now, about the bootstrapping: it's very simple. We use the first node from the initial-cluster flag to bootstrap the cluster. The first node knows it will be bootstrapping because its name matches the first entry in the initial-cluster flag (see the sketch after this paragraph). The other nodes will join the cluster.
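
A minimal sketch of that decision (the helper name is mine; the bootstrapDStore function below does the same parsing with strings.Cut):

  package main

  import (
    "fmt"
    "strings"
  )

  // shouldBootstrap returns true only for the node whose --name matches the
  // first "name=address" entry of --initial-cluster.
  func shouldBootstrap(name string, initialCluster []string) bool {
    if len(initialCluster) == 0 {
      return false
    }
    firstName, _, ok := strings.Cut(initialCluster[0], "=")
    return ok && firstName == name
  }

  func main() {
    cluster := []string{"dkv-0=dkv-0.dkv:2380", "dkv-1=dkv-1.dkv:2380", "dkv-2=dkv-2.dkv:2380"}
    fmt.Println(shouldBootstrap("dkv-0", cluster)) // true: this node bootstraps
    fmt.Println(shouldBootstrap("dkv-1", cluster)) // false: this node joins
  }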

We also need to implement mutual TLS here.

To summarize, these are the flags we will use:

 1   --name value                                         Unique name for this node [$DKV_NAME]
 2   --advertise-nodes value [ --advertise-nodes value ]  List of nodes to advertise [$DKV_ADVERTISE_NODES]
 3   --listen-peer-address value                          Address to listen on for peer traffic (default: ":2380") [$DKV_LISTEN_PEER_ADDRESS]
 4   --listen-client-address value                        Address listen on for client traffic (default: ":3000") [$DKV_LISTEN_CLIENT_ADDRESS]
 5   --initial-cluster value [ --initial-cluster value ]  Initial cluster configuration for bootstrapping [$DKV_INITIAL_CLUSTER]
 6   --initial-cluster-state value                        Initial cluster state (new, existing) [$DKV_INITIAL_CLUSTER_STATE]
 7   --peer-cert-file value                               Path to the peer server TLS certificate file [$DKV_PEER_CERT_FILE]
 8   --peer-key-file value                                Path to the peer server TLS key file [$DKV_PEER_KEY_FILE]
 9   --peer-trusted-ca-file value                         Path to the peer server TLS trusted CA certificate file [$DKV_PEER_TRUSTED_CA_FILE]
10   --cert-file value                                    Path to the client server TLS certificate file [$DKV_CERT_FILE]
11   --key-file value                                     Path to the client server TLS key file [$DKV_KEY_FILE]
12   --trusted-ca-file value                              Path to the client server TLS trusted CA certificate file [$DKV_TRUSTED_CA_FILE]
13   --data-dir value                                     Path to the data directory (default: "data") [$DKV_DATA_DIR]

Usage would be:

1# No mutual TLS
2dkv --name dkv-0 \
3  --initial-cluster=dkv-0=dkv-0.dkv.default.svc.cluster.local:2380,dkv-1=dkv-1.dkv.default.svc.cluster.local:2380,dkv-2=dkv-2.dkv.default.svc.cluster.local:2380 \
4  --initial-cluster-state=new \
5  --data-dir=/var/lib/dkv

Implementing the bootstrap function

In cmd/dkv/main.go, we will use urfave/cli to implement the command line flags. Add the following code:

  1package main
  2
  3import (
  4  "context"
  5  "crypto/tls"
  6  "distributed-kv/gen/dkv/v1/dkvv1connect"
  7  "distributed-kv/internal/api"
  8  "distributed-kv/internal/store/distributed"
  9  "distributed-kv/internal/store/persisted"
 10  internaltls "distributed-kv/internal/tls"
 11  "fmt"
 12  "log"
 13  "log/slog"
 14  "net"
 15  "net/http"
 16  "os"
 17  "strings"
 18  "time"
 19
 20  "github.com/hashicorp/raft"
 21  "github.com/joho/godotenv"
 22  "github.com/urfave/cli/v2"
 23  "golang.org/x/net/http2"
 24  "golang.org/x/net/http2/h2c"
 25)
 26
 27var (
 28  version string
 29
 30  name                string
 31  listenPeerAddress   string
 32  listenClientAddress string
 33  initialCluster      cli.StringSlice
 34  initialClusterState string
 35  advertiseNodes      cli.StringSlice
 36
 37  peerCertFile      string
 38  peerKeyFile       string
 39  peerTrustedCAFile string
 40
 41  certFile      string
 42  keyFile       string
 43  trustedCAFile string
 44
 45  dataDir string
 46)
 47
 48var app = &cli.App{
 49  Name:                 "dkv",
 50  Version:              version,
 51  Usage:                "Distributed Key-Value Store",
 52  Suggest:              true,
 53  EnableBashCompletion: true,
 54  Flags: []cli.Flag{
 55    &cli.StringFlag{
 56      Name:        "name",
 57      Usage:       "Unique name for this node",
 58      EnvVars:     []string{"DKV_NAME"},
 59      Destination: &name,
 60      Required:    true,
 61    },
 62    &cli.StringSliceFlag{
 63      Name:        "advertise-nodes",
 64      Usage:       "List of nodes to advertise",
 65      EnvVars:     []string{"DKV_ADVERTISE_NODES"},
 66      Destination: &advertiseNodes,
 67    },
 68    &cli.StringFlag{
 69      Name:        "listen-peer-address",
 70      Usage:       "Address to listen on for peer traffic",
 71      EnvVars:     []string{"DKV_LISTEN_PEER_ADDRESS"},
 72      Value:       ":2380",
 73      Destination: &listenPeerAddress,
 74    },
 75    &cli.StringFlag{
 76      Name:        "listen-client-address",
 77      Usage:       "Address to listen on for client traffic",
 78      EnvVars:     []string{"DKV_LISTEN_CLIENT_ADDRESS"},
 79      Value:       ":3000",
 80      Destination: &listenClientAddress,
 81    },
 82    &cli.StringSliceFlag{
 83      Name:        "initial-cluster",
 84      Usage:       "Initial cluster configuration for bootstrapping",
 85      EnvVars:     []string{"DKV_INITIAL_CLUSTER"},
 86      Required:    true,
 87      Destination: &initialCluster,
 88    },
 89    &cli.StringFlag{
 90      Name:        "initial-cluster-state",
 91      Usage:       "Initial cluster state (new, existing)",
 92      EnvVars:     []string{"DKV_INITIAL_CLUSTER_STATE"},
 93      Required:    true,
 94      Destination: &initialClusterState,
 95    },
 96    &cli.StringFlag{
 97      Name:        "peer-cert-file",
 98      Usage:       "Path to the peer server TLS certificate file",
 99      EnvVars:     []string{"DKV_PEER_CERT_FILE"},
100      Destination: &peerCertFile,
101    },
102    &cli.StringFlag{
103      Name:        "peer-key-file",
104      Usage:       "Path to the peer server TLS key file",
105      EnvVars:     []string{"DKV_PEER_KEY_FILE"},
106      Destination: &peerKeyFile,
107    },
108    &cli.StringFlag{
109      Name:        "peer-trusted-ca-file",
110      Usage:       "Path to the peer server TLS trusted CA certificate file",
111      EnvVars:     []string{"DKV_PEER_TRUSTED_CA_FILE"},
112      Destination: &peerTrustedCAFile,
113    },
114    &cli.StringFlag{
115      Name:        "cert-file",
116      Usage:       "Path to the client server TLS certificate file",
117      EnvVars:     []string{"DKV_CERT_FILE"},
118      Destination: &certFile,
119    },
120    &cli.StringFlag{
121      Name:        "key-file",
122      Usage:       "Path to the client server TLS key file",
123      EnvVars:     []string{"DKV_KEY_FILE"},
124      Destination: &keyFile,
125    },
126    &cli.StringFlag{
127      Name:        "trusted-ca-file",
128      Usage:       "Path to the client server TLS trusted CA certificate file",
129      EnvVars:     []string{"DKV_TRUSTED_CA_FILE"},
130      Destination: &trustedCAFile,
131    },
132    &cli.StringFlag{
133      Name:        "data-dir",
134      Usage:       "Path to the data directory",
135      EnvVars:     []string{"DKV_DATA_DIR"},
136      Value:       "data",
137      Destination: &dataDir,
138    },
139  },
140  Action: func(c *cli.Context) error {
141    ctx := c.Context
142    // TODO
143    return nil
144  },
145}
146
147func main() {
148  _ = godotenv.Load(".env.local")
149  _ = godotenv.Load(".env")
150  if err := app.Run(os.Args); err != nil {
151    log.Fatal(err)
152  }
153}

Now, let's add the bootstrap function:

 1func bootstrapDStore(
 2  storer distributed.Storer,
 3  storeOpts []distributed.StoreOption,
 4) (dstore *distributed.Store, err error) {
 5  // Bootstrap
 6  nodes := initialCluster.Value()
 7  if len(nodes) == 0 {
 8    return nil, fmt.Errorf("invalid initial cluster configuration (no nodes): %s", nodes)
 9  }
10  bootstrapNode, _, ok := strings.Cut(nodes[0], "=")
11  if !ok {
12    return nil, fmt.Errorf("invalid initial cluster configuration: %s", nodes)
13  }
14  advertizedPeers := make(map[raft.ServerID]raft.ServerAddress)
15  for _, node := range nodes {
16    id, addr, ok := strings.Cut(node, "=")
17    if !ok {
18      return nil, fmt.Errorf("invalid initial cluster configuration: %s", node)
19    }
20    advertizedPeers[raft.ServerID(id)] = raft.ServerAddress(addr)
21  }
22
23  dstore = distributed.NewStore(
24    dataDir,
25    listenPeerAddress,
26    name,
27    advertizedPeers[raft.ServerID(name)],
28    storer,
29    storeOpts...,
30  )
31
32  bootstrap := initialClusterState == "new" && bootstrapNode == name
33  if err := dstore.Open(bootstrap); err != nil {
34    return nil, err
35  }
36  // Periodically try to join the peers
37  go func() {
38    ticker := time.NewTicker(5 * time.Second)
39    defer ticker.Stop()
40    for {
41      select {
42      case <-dstore.ShutdownCh():
43        slog.Error("stopped joining peers due to store shutdown")
44        return
45      case <-ticker.C:
46        leaderAddr, leaderID := dstore.GetLeader()
47        if leaderAddr == "" {
48          slog.Error("no leader")
49          continue
50        }
51        // Not leader
52        if leaderID != raft.ServerID(name) {
53          continue
54        }
55        members, err := dstore.GetServers()
56        if err != nil {
57          slog.Error("failed to get servers", "error", err)
58          continue
59        }
60      peers:
61        for id, addr := range advertizedPeers {
62          // Ignore self
63          if id == raft.ServerID(name) {
64            continue
65          }
66          // Ignore if already member
67          for _, member := range members {
68            if member.ID == id {
69              continue peers
70            }
71          }
72          slog.Info("request peer to join", "id", id, "addr", addr)
73          if err := dstore.Join(id, addr); err != nil {
74            slog.Error("failed to join peer", "id", id, "addr", addr, "error", err)
75          }
76        }
77      }
78    }
79  }()
80  return dstore, nil
81}

We've added a "join" loop to automatically join the peers. Otherwise, we would need to either use a "membership" service or manually join the peers.

Implementing helper functions for TLS 🔗

Now, let's add some helper functions to generate the TLS configurations (they use the crypto/x509 package, so make sure it is added to the imports of cmd/dkv/main.go alongside crypto/tls):

 1func setupServerTLSConfig(crt, key, ca string) (*tls.Config, error) {
 2  var cfg tls.Config
 3  if crt != "" && key != "" {
 4    certificate, err := tls.LoadX509KeyPair(crt, key)
 5    if err != nil {
 6      return nil, err
 7    }
 8    cfg.Certificates = []tls.Certificate{certificate}
 9  }
10  if ca != "" {
11    caCert, err := os.ReadFile(ca)
12    if err != nil {
13      return nil, err
14    }
15    cfg.ClientCAs = x509.NewCertPool()
16    cfg.ClientCAs.AppendCertsFromPEM(caCert)
17    cfg.ClientAuth = tls.RequireAndVerifyClientCert
18  }
19
20  return &cfg, nil
21}
22
23func setupClientTLSConfig(crt, key, ca string) (*tls.Config, error) {
24  var cfg tls.Config
25  if crt != "" && key != "" {
26    certificate, err := tls.LoadX509KeyPair(crt, key)
27    if err != nil {
28      return nil, err
29    }
30    cfg.Certificates = []tls.Certificate{certificate}
31  }
32  if ca != "" {
33    caCert, err := os.ReadFile(ca)
34    if err != nil {
35      return nil, err
36    }
37    if cfg.RootCAs == nil {
38      cas, err := x509.SystemCertPool()
39      if err != nil {
40        cas = x509.NewCertPool() // fall back to an empty pool if the system pool is unavailable
41      }
42      cfg.RootCAs = cas
43    }
44    cfg.RootCAs.AppendCertsFromPEM(caCert)
45  }
46  return &cfg, nil
47}

Assembling everything 🔗

Now, let's assemble everything and complete the Action function:

 1  Action: func(c *cli.Context) (err error) {
 2    ctx := c.Context
 3    // TLS configurations
 4    storeOpts := []distributed.StoreOption{}
 5    if peerCertFile != "" && peerKeyFile != "" {
 6      peerTLSConfig, err := setupServerTLSConfig(
 7        peerCertFile,
 8        peerKeyFile,
 9        peerTrustedCAFile,
10      )
11      if err != nil {
12        return err
13      }
14      storeOpts = append(storeOpts, distributed.WithServerTLSConfig(peerTLSConfig))
15    }
16
17    if (peerCertFile != "" && peerKeyFile != "") || peerTrustedCAFile != "" {
18      peerClientTLSConfig, err := setupClientTLSConfig(
19        peerCertFile,
20        peerKeyFile,
21        peerTrustedCAFile,
22      )
23      if err != nil {
24        return err
25      }
26      storeOpts = append(storeOpts, distributed.WithClientTLSConfig(peerClientTLSConfig))
27    }
28
29    var tlsConfig *tls.Config
30    if certFile != "" && keyFile != "" {
31      tlsConfig, err = setupServerTLSConfig(certFile, keyFile, trustedCAFile)
32      if err != nil {
33        return err
34      }
35    }
36
37    // Store configuration
38    store := persisted.New(dataDir)
39    defer func() {
40      _ = store.Close()
41    }()
42    dstore, err := bootstrapDStore(store, storeOpts)
43    if err != nil {
44      return err
45    }
46    defer func() {
47      err := dstore.Shutdown()
48      if err != nil {
49        slog.Error("failed to shutdown store", "error", err)
50      }
51      slog.Warn("store shutdown")
52    }()
53
54    // Routes
55    r := http.NewServeMux()
56    r.Handle(dkvv1connect.NewDkvAPIHandler(&api.DkvAPIHandler{
57      Store: dstore,
58    }))
59
60    // Start the server
61    l, err := net.Listen("tcp", listenClientAddress)
62    if err != nil {
63      return err
64    }
65    if tlsConfig != nil {
66      l = tls.NewListener(l, tlsConfig)
67    }
68    slog.Info("server listening", "address", listenClientAddress)
69    srv := &http.Server{
70      BaseContext: func(_ net.Listener) context.Context { return ctx },
71      Handler:     h2c.NewHandler(r, &http2.Server{}),
72    }
73    defer func() {
74      _ = srv.Shutdown(ctx)
75      _ = l.Close()
76      slog.Warn("server shutdown")
77    }()
78    return srv.Serve(l)
79  },

Smoke tests with Vagrant and K0s 🔗

While we could technically use MiniKube, I prefer Vagrant and K0s since they are much closer to a production environment. K0s is a lightweight Kubernetes distribution that is very easy to operate, and Vagrant makes it straightforward to spin up the VMs for a K0s cluster.

  1. Install Vagrant and K0sctl.

  2. Create a directory smoke-tests and generate an SSH key pair for smoke tests:

    1mkdir -p smoke-tests
    2ssh-keygen -f smoke-tests/id_rsa_smoke_test -N ""
    3chmod 600 smoke-tests/id_rsa_smoke_test*
    
  3. Add a smoke-tests/Vagrantfile. As I'm on Linux, I will be using the libvirt provider. If you are on Windows or macOS, you can use the virtualbox provider.

     1# -*- mode: ruby -*-
     2# vi: set ft=ruby :
     3
     4hosts = {
     5  "manager0" => "192.168.77.10",
     6  "worker0" => "192.168.77.11",
     7  "worker1" => "192.168.77.12",
     8}
     9
    10Vagrant.configure("2") do |config|
    11  config.vm.provider "libvirt"
    12  ssh_pub_key = File.readlines("./id_rsa_smoke_test.pub").first.strip
    13
    14  hosts.each do |name, ip|
    15    config.vm.define name do |machine|
    16      machine.vm.box = "generic/rocky9"
    17      machine.vm.network "private_network", ip: ip
    18      machine.vm.provision "shell" do |s|
    19        s.inline = <<-SHELL
    20          mkdir -p ~/.ssh && touch ~/.ssh/authorized_keys
    21          echo #{ssh_pub_key} >> /home/vagrant/.ssh/authorized_keys
    22          echo #{ssh_pub_key} >> /root/.ssh/authorized_keys
    23          systemctl stop firewalld
    24          systemctl disable firewalld
    25        SHELL
    26      end
    27    end
    28  end
    29end
    

    This Vagrantfile deploys 3 nodes: 1 manager and 2 workers. The nodes will be accessible via SSH using the id_rsa_smoke_test key pair.

  4. Run vagrant up to deploy the cluster. If something fails, you can run vagrant destroy -f to destroy the cluster.

  5. Create a smoke-tests/k0sctl.yaml:

      1apiVersion: k0sctl.k0sproject.io/v1beta1
      2kind: Cluster
      3metadata:
      4  name: k8s.example.com-cluster
      5spec:
      6  hosts:
      7    - ssh:
      8        address: 192.168.77.10
      9        user: root
     10        port: 22
     11        keyPath: id_rsa_smoke_test
     12      role: controller+worker
     13      hostname: manager0
     14      noTaints: true
     15      privateInterface: eth1
     16      privateAddress: 192.168.77.10
     17      installFlags:
     18        - --debug
     19      hooks:
     20        apply:
     21          before:
     22            # Set SELinux Permissive
     23            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi'
     24            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi'
     25
     26    - ssh:
     27        address: 192.168.77.11
     28        user: root
     29        port: 22
     30        keyPath: id_rsa_smoke_test
     31      role: worker
     32      hostname: worker0
     33      privateInterface: eth1
     34      privateAddress: 192.168.77.11
     35      installFlags:
     36        - --debug
     37      hooks:
     38        apply:
     39          before:
     40            # Set SELinux Permissive
     41            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi'
     42            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi'
     43
     44    - ssh:
     45        address: 192.168.77.12
     46        user: root
     47        port: 22
     48        keyPath: id_rsa_smoke_test
     49      role: worker
     50      hostname: worker1
     51      privateInterface: eth1
     52      privateAddress: 192.168.77.12
     53      installFlags:
     54        - --debug
     55      hooks:
     56        apply:
     57          before:
     58            # Set SELinux Permissive
     59            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi'
     60            - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi'
     61
     62  k0s:
     63    version: '1.29.2+k0s.0'
     64    dynamicConfig: false
     65    config:
     66      apiVersion: k0s.k0sproject.io/v1beta1
     67      kind: ClusterConfig
     68      metadata:
     69        name: k8s.example.com
     70      spec:
     71        api:
     72          k0sApiPort: 9443
     73          port: 6443
     74        installConfig:
     75          users:
     76            etcdUser: etcd
     77            kineUser: kube-apiserver
     78            konnectivityUser: konnectivity-server
     79            kubeAPIserverUser: kube-apiserver
     80            kubeSchedulerUser: kube-scheduler
     81        konnectivity:
     82          adminPort: 8133
     83          agentPort: 8132
     84        network:
     85          calico:
     86            mode: 'vxlan'
     87            overlay: Always
     88            mtu: 0
     89            wireguard: false
     90          kubeProxy:
     91            disabled: false
     92            mode: iptables
     93          kuberouter:
     94            autoMTU: true
     95            mtu: 0
     96            peerRouterASNs: ''
     97            peerRouterIPs: ''
     98          podCIDR: 10.244.0.0/16
     99          provider: calico
    100          serviceCIDR: 10.96.0.0/12
    101        podSecurityPolicy:
    102          defaultPolicy: 00-k0s-privileged
    103        storage:
    104          type: kine
    105        telemetry:
    106          enabled: false
    
  6. Run k0sctl apply --debug to deploy K0s.

  7. Fetch the kubeconfig and load it into the environment:

    1# Inside the smoke-tests directory
    2k0sctl kubeconfig > kubeconfig
    3chmod 600 kubeconfig
    4export KUBECONFIG=$(pwd)/kubeconfig
    
  8. Deploy cert-manager:

    1kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.14.4/cert-manager.yaml
    

    cert-manager permits us to automatically generate TLS certificates for our services.

  9. Deploy a self-signed certificate cluster issuer. Create a smoke-tests/certificates/selfsigned-cluster-issuer.yaml:

    1apiVersion: cert-manager.io/v1
    2kind: ClusterIssuer
    3metadata:
    4  name: selfsigned-cluster-issuer
    5spec:
    6  selfSigned: {}
    

    And kubectl apply -f certificates/selfsigned-cluster-issuer.yaml.

  10. Deploy the CA. Create a smoke-tests/certificates/root-ca.yaml:

     1apiVersion: cert-manager.io/v1
     2kind: Certificate
     3metadata:
     4  name: root-ca
     5  namespace: cert-manager
     6spec:
     7  secretName: root-ca-secret
     8  duration: 2160h # 90d
     9  renewBefore: 360h # 15d
    10  subject:
    11    organizations: [My Awesome Company]
    12    countries: [FR]
    13    organizationalUnits: [IT]
    14    localities: [Paris]
    15  commonName: My Root CA
    16  isCA: true
    17  privateKey:
    18    algorithm: RSA
    19    encoding: PKCS1
    20    size: 2048
    21  issuerRef:
    22    name: selfsigned-cluster-issuer
    23    kind: ClusterIssuer
    

    And kubectl apply -f certificates/root-ca.yaml.

  11. Deploy the CA Cluster Issuer. Create a smoke-tests/certificates/private-cluster-issuer.yaml:

    1apiVersion: cert-manager.io/v1
    2kind: ClusterIssuer
    3metadata:
    4  name: private-cluster-issuer
    5spec:
    6  ca:
    7    secretName: root-ca-secret
    

    And kubectl apply -f certificates/private-cluster-issuer.yaml.

  12. Deploy the peer server certificate. Create a smoke-tests/certificates/peer-certificate.yaml:

     1apiVersion: cert-manager.io/v1
     2kind: Certificate
     3metadata:
     4  name: dkv-peer-cert
     5  namespace: default
     6spec:
     7  secretName: dkv-peer-cert-secret
     8  duration: 2160h # 90d
     9  renewBefore: 360h # 15d
    10  subject:
    11    organizations: [My Awesome Company]
    12    countries: [FR]
    13    organizationalUnits: [IT]
    14    localities: [Paris]
    15  commonName: dkv.default.svc.cluster.local
    16  dnsNames:
    17    - dkv.default.svc.cluster.local
    18    - dkv-0.dkv.default.svc.cluster.local
    19    - dkv-1.dkv.default.svc.cluster.local
    20    - dkv-2.dkv.default.svc.cluster.local
    21  issuerRef:
    22    name: private-cluster-issuer
    23    kind: ClusterIssuer
    24  usages:
    25    - server auth
    26    - client auth
    27    - key encipherment
    28    - digital signature
    

    We will deploy our StatefulSet inside the default namespace with the headless service dkv, thus the name dkv.default.svc.cluster.local. The dkv-0, dkv-1 and dkv-2 are the StatefulSet pods (replicas).

    Run kubectl apply -f certificates/peer-certificate.yaml.

  13. Deploy a local storage provider:

    1kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.26/deploy/local-path-storage.yaml
    

    Volumes can be created using the local-path storage class. Local path volumes are bound to one node, and are not replicated. Trying to schedule a pod where the volume is not available will result in a Pending state.

    To avoid this, our StatefulSet will set the topologySpreadConstraints. This will ensure that the pods are scheduled on different nodes.

  14. (Optional, for local/private development) Either deploy a registry, or use a public registry:

     1# smoke-tests/manifests/registry/deployment.yaml
     2apiVersion: apps/v1
     3kind: Deployment
     4metadata:
     5  name: registry
     6spec:
     7  selector:
     8    matchLabels:
     9      app: registry
    10  template:
    11    metadata:
    12      labels:
    13        app: registry
    14    spec:
    15      containers:
    16        - name: registry
    17          image: registry:2
    18          resources:
    19            limits:
    20              memory: '128Mi'
    21              cpu: '500m'
    22          ports:
    23            - containerPort: 5000
    24              name: http
    
     1# smoke-tests/manifests/registry/service.yaml
     2apiVersion: v1
     3kind: Service
     4metadata:
     5  name: registry
     6spec:
     7  type: NodePort
     8  selector:
     9    app: registry
    10  ports:
    11    - port: 5000
    12      nodePort: 30000
    13      name: http
    14      targetPort: http
    
    1kubectl apply -f manifests/registry/
    

    We use a NodePort service to expose the registry, which will be accessible on port 30000 of the nodes. Note that the registry has no persistent volume, so its images are lost whenever the pod restarts; this is not recommended for production.

  15. (Optional) Build the image and push it to the registry:

    1# In the project directory, not smoke-tests
    2docker build -t 192.168.77.10:30000/library/dkv:v1 .
    3docker push 192.168.77.10:30000/library/dkv:v1
    

    Note that you may need to configure your Docker or Podman to allow insecure registries.

  16. Deploy the StatefulSet and the headless service:

     1# smoke-tests/manifests/dkv/statefulset.yaml
     2apiVersion: apps/v1
     3kind: StatefulSet
     4metadata:
     5  name: dkv
     6spec:
     7  selector:
     8    matchLabels:
     9      app: dkv
    10  serviceName: dkv
    11  replicas: 3
    12  template:
    13    metadata:
    14      labels:
    15        app: dkv
    16    spec:
    17      topologySpreadConstraints:
    18        - maxSkew: 1
    19          topologyKey: kubernetes.io/hostname
    20          labelSelector:
    21            matchLabels:
    22              app: dkv
    23          whenUnsatisfiable: DoNotSchedule
    24      securityContext:
    25        runAsUser: 1000
    26        runAsGroup: 1000
    27        fsGroup: 1000
    28      containers:
    29        - name: dkv
    30          # The URL is localhost since the registry is exposed on the nodes.
    31          image: localhost:30000/library/dkv:v1
    32          # Use this image if you don't want to build the image and deploy on a local registry.
    33          # image: ghcr.io/darkness4/dkv:dev
    34          imagePullPolicy: Always
    35          securityContext:
    36            runAsUser: 1000
    37            runAsGroup: 1000
    38          ports:
    39            - containerPort: 2380
    40              name: raft
    41            - containerPort: 3000
    42              name: http
    43          env:
    44            # DKV_NAME uses the pod name
    45            - name: DKV_NAME
    46              valueFrom:
    47                fieldRef:
    48                  fieldPath: metadata.name
    49            # DKV_INITIAL_CLUSTER does not need to set the protocols since we are using TCP over TLS
    50            - name: DKV_INITIAL_CLUSTER
    51              value: dkv-0=dkv-0.dkv.default.svc.cluster.local:2380,dkv-1=dkv-1.dkv.default.svc.cluster.local:2380,dkv-2=dkv-2.dkv.default.svc.cluster.local:2380
    52            # DKV_INITIAL_CLUSTER_STATE is set to new. After deploying successfully, change it to existing.
    53            - name: DKV_INITIAL_CLUSTER_STATE
    54              value: new
    55            - name: DKV_PEER_CERT_FILE
    56              value: /etc/dkv/peer-certs/tls.crt
    57            - name: DKV_PEER_KEY_FILE
    58              value: /etc/dkv/peer-certs/tls.key
    59            - name: DKV_PEER_TRUSTED_CA_FILE
    60              value: /etc/dkv/peer-certs/ca.crt
    61            # For now, the client-side will be insecure. We will change it later.
    62            # - name: DKV_CERT_FILE
    63            #   value: ""
    64            # - name: DKV_KEY_FILE
    65            #   value: ""
    66            # - name: DKV_TRUSTED_CA_FILE
    67            #   value: ""
    68            - name: DKV_DATA_DIR
    69              value: /data
    70          volumeMounts:
    71            - name: data
    72              mountPath: /data
    73            - name: peer-certs
    74              mountPath: /etc/dkv/peer-certs
    75      volumes:
    76        - name: peer-certs
    77          secret:
    78            secretName: dkv-peer-cert-secret
    79  volumeClaimTemplates: # This adds one volume per pod.
    80    - metadata:
    81        name: data
    82      spec:
    83        accessModes: ['ReadWriteOnce']
    84        storageClassName: local-path
    85        resources:
    86          requests:
    87            storage: 1Gi
    
     1apiVersion: v1
     2kind: Service
     3metadata:
     4  name: dkv
     5spec:
     6  clusterIP: None # This defines that the service is headless
     7  selector:
     8    app: dkv
     9  ports:
    10    - port: 2380
    11      name: raft
    12      targetPort: raft
    13    - port: 3000
    14      name: http
    15      targetPort: http
    

    Now run:

    1kubectl apply -f manifests/dkv/
    
  17. Now, our DKV cluster should be running! Look at Lens/K9s/kubectl to see the pods and services.


Implementing the client 🔗

Usage 🔗

Time to do a CLI client. We expect the usage to be something like:

1dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> get <key>
2dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> set <key> <value>
3dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> delete <key>

We can also add Raft management commands:

1dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-join <name> <address>
2dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-leave <name>
3dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-list

The member-* commands will require a new service, though. For now, let's focus on the get, set and delete commands.

About client-side load balancing 🔗

Before implementing the client, we need to think about client-side load balancing. Right now, only the leader is able to "write" to the store. The other nodes are only able to "read" from the store. This is because we are using the Raft consensus algorithm, and the leader is the only one able to commit the logs. The other nodes are only able to replicate the logs.
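
To make this concrete, here is a minimal sketch of the guard a follower could return before any forwarding or load balancing is in place. The Store shown here is illustrative, not the exact type built in this article:

  // Minimal sketch: a follower rejects writes until request forwarding exists.
  // The struct and field names are illustrative, not this article's exact Store.
  package store

  import (
    "errors"

    "github.com/hashicorp/raft"
  )

  var ErrNotLeader = errors.New("not the leader: send writes to the leader")

  type Store struct {
    RaftID string     // this node's Raft ID
    raft   *raft.Raft // underlying Raft node
  }

  // checkLeader returns an error when this node cannot accept writes.
  func (s *Store) checkLeader() error {
    _, leaderID := s.raft.LeaderWithID()
    if leaderID == "" {
      return errors.New("no leader elected yet")
    }
    if leaderID != raft.ServerID(s.RaftID) {
      return ErrNotLeader
    }
    return nil
  }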

This issue is covered in a blog article from the gRPC team. This article presents 3 load balancing strategies:

  • Thick client-side load balancing: the client keeps track of the leader and sends the "write" requests to the leader. "read" requests are sent to any node following a load balancing strategy like round-robin.
  • Proxy load balancing: the client sends the requests to a proxy, and the proxy forwards the "write" requests to the leader. "read" requests are forwarded to any node following a load balancing strategy like round-robin.
  • Look-aside load balancing: the client keeps track of the leader by querying a service discovery system. The client sends the "write" requests to the leader. "read" requests are sent to any node following a load balancing strategy like round-robin.

In Travis's book, he uses the "thick client-side load balancing" strategy: he uses the pre-integrated load balancing feature in gRPC by adding a loadBalancingConfig in the serviceConfig. More precisely:

  • He creates a custom resolver that resolves the addresses of the nodes using a getter in the gRPC service. (Another method would be to resolve from DNS.)
  • He implemented a custom load balancer with a custom picker that picks the leader for "write" requests and any node for "read" requests.

Another solution, by Jillie, uses gRPC health checking to load-balance the requests, though it doesn't implement a custom load balancer.

So what do we do?

As you know, I'm using ConnectRPC, which doesn't include these "embedded" features. However, we can still try to implement client-side load balancing by doing something similar to etcd:

  1. Nodes can advertise specific nodes using the --advertise-nodes flag (with a small modification to include the ID of the node).
  2. We retrieve the list of nodes from a node using a gRPC service. This is the list of addresses with their roles. The list may or may not contain the leader.
  3. We balance the load of "read" requests on all nodes, either by round-robin, random, or any other strategy. Preferably, we send the "write" request to the leader, otherwise we send it to any node.
  4. We implement server-side request forwarding. Followers forward write requests to the leader.
[Diagram: client-side load balancing. The client asks a node for the list of nodes, then spreads "read" requests across peer1, peer2 and peer3.]
[Diagram: server-side forwarding. The client sends a "write" to any peer; a follower forwards it to the leader (peer1), which applies it and responds.]

This ensures that the client is able to "write" even if it hits a follower, while keeping a minimum of load balancing and efficiency.
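
For illustration, the read-side balancing of point 3 can be as small as this round-robin picker sketch (the names are hypothetical; the client built below ends up using a simpler leader-or-random selection):

  // Sketch of a tiny round-robin picker for "read" endpoints.
  package main

  import (
    "fmt"
    "sync/atomic"
  )

  type roundRobin struct {
    endpoints []string
    next      atomic.Uint64
  }

  // pick returns the next endpoint, cycling through the list.
  func (r *roundRobin) pick() string {
    n := r.next.Add(1) - 1
    return r.endpoints[n%uint64(len(r.endpoints))]
  }

  func main() {
    rr := &roundRobin{endpoints: []string{"dkv-0:3000", "dkv-1:3000", "dkv-2:3000"}}
    for i := 0; i < 4; i++ {
      fmt.Println(rr.pick()) // dkv-0:3000, dkv-1:3000, dkv-2:3000, dkv-0:3000
    }
  }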

Implementing server-side request forwarding 🔗

WARNING

This part can be quite complex. The idea is to create a custom RPC, which means we need to fork hashicorp/raft to handle it. While this looks like bad practice, it is actually common, as hashicorp/raft is tightly coupled to the paper's implementation.

Actually, Jillie's or Travis's method might be much simpler. I personally chose to implement server-side request forwarding to get a high-performance load-balancing strategy. You may also be interested in go-libp2p-raft, a fork of hashicorp/raft that uses libp2p for the transport layer; this project is also used by IPFS. rqlite (a distributed SQLite database) forwards writes via HTTP.

Anyway, this issue is still open to this day. Feel free to follow the discussion.

hashicorp/raft packs its RPCs following this layout (a small framing sketch follows the list):

  • 1 uint8 declaring the RPC type. Right now, the RPC types are:
    • AppendEntries: used to replicate logs.
    • RequestVote: used to request votes.
    • InstallSnapshot: used to install snapshots.
    • TimeoutNow: used to request a new election.
  • Encoded data using github.com/hashicorp/go-msgpack/v2/codec.
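
To make the framing concrete, here is a rough sketch of how such a frame could be written, assuming the msgpack codec from github.com/hashicorp/go-msgpack/v2/codec; the RPC type constant and the request struct are illustrative, since the library's own ones are unexported:

  // Sketch: framing a hashicorp/raft-style RPC as 1 type byte + msgpack body.
  package main

  import (
    "bufio"
    "bytes"
    "fmt"

    "github.com/hashicorp/go-msgpack/v2/codec"
  )

  // Illustrative constant; the real RPC type constants are unexported.
  const rpcAppendEntries uint8 = 0

  // Illustrative stand-in for raft.AppendEntriesRequest.
  type appendEntriesRequest struct {
    Term   uint64
    Leader []byte
  }

  func writeRPC(w *bufio.Writer, rpcType uint8, req any) error {
    // One byte declaring the RPC type...
    if err := w.WriteByte(rpcType); err != nil {
      return err
    }
    // ...followed by the msgpack-encoded request.
    if err := codec.NewEncoder(w, &codec.MsgpackHandle{}).Encode(req); err != nil {
      return err
    }
    return w.Flush()
  }

  func main() {
    var buf bytes.Buffer
    w := bufio.NewWriter(&buf)
    if err := writeRPC(w, rpcAppendEntries, appendEntriesRequest{Term: 1, Leader: []byte("dkv-0")}); err != nil {
      panic(err)
    }
    fmt.Printf("framed %d bytes\n", buf.Len())
  }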

Sadly, there is no simple way to inject a new RPC into hashicorp/raft, so we need to fork it and add the new RPC. Technically, we could instead add a connection multiplexer on the listener of the StreamLayer that decides whether an incoming request is a Raft RPC or a custom RPC (Travis Jeffery's method). However, this approach is also complex.

Therefore, we won't take this approach, and instead we edit raft.go, transport.go and net_transport.go to support our new RPC. The fork is available at github.com/Darkness4/raft@v1.6.3. Point your go.mod at it with a replace directive:

1require (
2  // ...
3)
4
5replace github.com/hashicorp/raft => github.com/darkness4/raft v1.6.3

You can compare the difference on github.com/hashicorp/raft/compare/main...Darkness4:raft:main (look for ForwardApply).

Now, the Raft struct has a new method ForwardApply. Use this method in the apply method of the Store:

 1 func (s *Store) apply(req *dkvv1.Command) (any, error) {
 2   b, err := proto.Marshal(req)
 3   if err != nil {
 4     return nil, err
 5   }
 6+  addr, id := s.raft.LeaderWithID()
 7+  if addr == "" || id == "" {
 8+    return nil, errors.New("no leader")
 9+  }
10   timeout := 10 * time.Second
11
12+  if id != raft.ServerID(s.RaftID) {
13+    return nil, s.raft.ForwardApply(id, addr, b, timeout)
14+  }
15
16   future := s.raft.Apply(b, timeout)
17   if err := future.Error(); err != nil {
18     return nil, err
19   }
20   res := future.Response()
21   if err, ok := res.(error); ok {
22     return nil, err
23   }
24   return res, nil
25 }

Now that we have server-side request forwarding, we can add the discovery service.

Implementing the membership service 🔗

The membership service will help us manage the membership of the cluster. We will also be able to retrieve the list of nodes and their roles.

Technically, you would also implement RBAC here (using Casbin, for example). RBAC is not the subject of this article, but Travis Jeffery covers it in his book.

  1. In the protobuf file, add the MembershipAPI service:

     1service MembershipAPI {
     2  rpc GetServers(GetServersRequest) returns (GetServersResponse);
     3  rpc JoinServer(JoinServerRequest) returns (JoinServerResponse);
     4  rpc LeaveServer(LeaveServerRequest) returns (LeaveServerResponse);
     5}
     6
     7message Server {
     8  string id = 1;
     9  string raft_address = 2;
    10  string rpc_address = 3;
    11  bool is_leader = 4;
    12}
    13
    14message GetServersRequest {}
    15message GetServersResponse { repeated Server servers = 1; }
    16
    17message JoinServerRequest {
    18  string id = 1;
    19  string address = 2;
    20}
    21message JoinServerResponse {}
    22
    23message LeaveServerRequest { string id = 1; }
    24message LeaveServerResponse {}
    

    Run make protos to generate the Go code.

  2. Create a file internal/api/membership_handler.go. Remember that we want the service to output the list of nodes based on the --advertise-nodes flag.

     1package api
     2
     3import (
     4  "context"
     5  dkvv1 "distributed-kv/gen/dkv/v1"
     6  "distributed-kv/gen/dkv/v1/dkvv1connect"
     7  "distributed-kv/internal/store/distributed"
     8
     9  "connectrpc.com/connect"
    10  "github.com/hashicorp/raft"
    11)
    12
    13var _ dkvv1connect.MembershipAPIHandler = (*MembershipAPIHandler)(nil)
    14
    15type MembershipAPIHandler struct {
    16  AdvertiseNodes map[raft.ServerID]string
    17  Store          *distributed.Store
    18}
    19
    20func (m *MembershipAPIHandler) GetServers(
    21  context.Context,
    22  *connect.Request[dkvv1.GetServersRequest],
    23) (*connect.Response[dkvv1.GetServersResponse], error) {
    24  srvs, err := m.Store.GetServers()
    25  if err != nil {
    26    return nil, err
    27  }
    28  protoServers := make([]*dkvv1.Server, 0, len(srvs))
    29  leaderAddr, leaderID := m.Store.GetLeader()
    30  for _, node := range srvs {
    31    protoServers = append(protoServers, &dkvv1.Server{
    32      Id:          string(node.ID),
    33      RaftAddress: string(node.Address),
    34      RpcAddress:  m.AdvertiseNodes[node.ID],
    35      IsLeader:    node.ID == leaderID && node.Address == leaderAddr,
    36    })
    37  }
    38
    39  return &connect.Response[dkvv1.GetServersResponse]{
    40    Msg: &dkvv1.GetServersResponse{
    41      Servers: protoServers,
    42    },
    43  }, nil
    44}
    45
    46func (m *MembershipAPIHandler) JoinServer(
    47  _ context.Context,
    48  req *connect.Request[dkvv1.JoinServerRequest],
    49) (*connect.Response[dkvv1.JoinServerResponse], error) {
    50  return &connect.Response[dkvv1.JoinServerResponse]{}, m.Store.Join(
    51    raft.ServerID(req.Msg.GetId()),
    52    raft.ServerAddress(req.Msg.GetAddress()),
    53  )
    54}
    55
    56func (m *MembershipAPIHandler) LeaveServer(
    57  _ context.Context,
    58  req *connect.Request[dkvv1.LeaveServerRequest],
    59) (*connect.Response[dkvv1.LeaveServerResponse], error) {
    60  return &connect.Response[dkvv1.LeaveServerResponse]{}, m.Store.Leave(
    61    raft.ServerID(req.Msg.GetId()),
    62  )
    63}
    
  3. In cmd/dkv/main.go, add the MembershipAPIHandler to the router:

     1 var (
     2+  advertiseNodes      cli.StringSlice
     3   // ...
     4 )
     5
     6 var app = &cli.App{
     7   // ...
     8   Flags: []cli.Flag{
     9     // ...
    10+    &cli.StringSliceFlag{
    11+      Name:        "advertise-nodes",
    12+      Usage:       "List of nodes to advertise",
    13+      EnvVars:     []string{"DKV_ADVERTISE_NODES"},
    14+      Required:    true,
    15+      Destination: &advertiseNodes,
    16+    },
    17   // ...
    18   },
    19   Action: func(c *cli.Context) (err error) {
    20     // ...
    21     r := http.NewServeMux()
    22     r.Handle(dkvv1connect.NewDkvAPIHandler(api.NewDkvAPIHandler(dstore)))
    23
    24+    nodes := make(map[raft.ServerID]string)
    25+    for _, node := range advertiseNodes.Value() {
    26+      id, addr, ok := strings.Cut(node, "=")
    27+      if !ok {
    28+        slog.Error("invalid initial cluster configuration", "node", node)
    29+        continue
    30+      }
    31+      nodes[raft.ServerID(id)] = addr
    32+    }
    33+    r.Handle(dkvv1connect.NewMembershipAPIHandler(&api.MembershipAPIHandler{
    34+      AdvertiseNodes: nodes,
    35+      Store:          dstore,
    36+    }))
    

Implementing the "main" function of the client ๐Ÿ”—

Finally! We can implement the "main" function of the client.

Add the following code to cmd/dkvctl/main.go (note that setupClientTLSConfig uses crypto/x509, so that package must be added to the imports below):

  1package main
  2
  3import (
  4  "context"
  5  "crypto/tls"
  6  "fmt"
  7  "log"
  8  "math/rand"
  9  "net"
 10  "net/http"
 11  "os"
 12  "strconv"
 13
 14  dkvv1 "distributed-kv/gen/dkv/v1"
 15  "distributed-kv/gen/dkv/v1/dkvv1connect"
 16
 17  "connectrpc.com/connect"
 18  "github.com/joho/godotenv"
 19  "github.com/urfave/cli/v2"
 20  "golang.org/x/net/http2"
 21)
 22
 23var (
 24  version string
 25
 26  certFile      string
 27  keyFile       string
 28  trustedCAFile string
 29  endpoint      string
 30)
 31
 32var (
 33  dkvClient              dkvv1connect.DkvAPIClient
 34  leaderDkvClient        dkvv1connect.DkvAPIClient
 35  membershipClient       dkvv1connect.MembershipAPIClient
 36  leaderMembershipClient dkvv1connect.MembershipAPIClient
 37)
 38
 39var app = &cli.App{
 40  Name:                 "dkvctl",
 41  Version:              version,
 42  Usage:                "Distributed Key-Value Store Client",
 43  Suggest:              true,
 44  EnableBashCompletion: true,
 45  Flags: []cli.Flag{
 46    &cli.StringFlag{
 47      Name:        "cert",
 48      Usage:       "Client certificate file",
 49      EnvVars:     []string{"DKVCTL_CERT"},
 50      Destination: &certFile,
 51    },
 52    &cli.StringFlag{
 53      Name:        "key",
 54      Usage:       "Client key file",
 55      EnvVars:     []string{"DKVCTL_KEY"},
 56      Destination: &keyFile,
 57    },
 58    &cli.StringFlag{
 59      Name:        "cacert",
 60      Usage:       "Trusted CA certificate file",
 61      EnvVars:     []string{"DKVCTL_CACERT"},
 62      Destination: &trustedCAFile,
 63    },
 64    &cli.StringFlag{
 65      Name:        "endpoint",
 66      Usage:       "Server endpoint",
 67      EnvVars:     []string{"DKVCTL_ENDPOINT"},
 68      Destination: &endpoint,
 69      Required:    true,
 70    },
 71  },
 72  Before: func(c *cli.Context) (err error) {
 73    // TLS configuration
 74    var tlsConfig *tls.Config = nil
 75    if (certFile != "" && keyFile != "") || trustedCAFile != "" {
 76      tlsConfig, err = setupClientTLSConfig(certFile, keyFile, trustedCAFile)
 77      if err != nil {
 78        return err
 79      }
 80    }
 81
 82    http := &http.Client{
 83      Transport: &http2.Transport{
 84        AllowHTTP: true,
 85        DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
 86          var d net.Dialer
 87          conn, err := d.DialContext(ctx, network, addr)
 88          if tlsConfig != nil {
 89            serverName, _, serr := net.SplitHostPort(addr)
 90            if serr != nil {
 91              serverName = addr
 92            }
 93            tlsConfig := tlsConfig.Clone()
 94            tlsConfig.ServerName = serverName
 95            return tls.Client(conn, tlsConfig), err
 96          }
 97          return conn, err
 98        },
 99      },
100    }
101    scheme := "http://"
102    if tlsConfig != nil {
103      scheme = "https://"
104    }
105    dkvClient = dkvv1connect.NewDkvAPIClient(http, scheme+endpoint, connect.WithGRPC())
106    membershipClient = dkvv1connect.NewMembershipAPIClient(
107      http,
108      scheme+endpoint,
109      connect.WithGRPC(),
110    )
111    leaderEndpoint := findEndpoint(c.Context)
112    if leaderEndpoint == "" {
113      leaderEndpoint = endpoint
114    }
115    leaderDkvClient = dkvv1connect.NewDkvAPIClient(
116      http,
117      scheme+leaderEndpoint,
118      connect.WithGRPC(),
119    )
120    leaderMembershipClient = dkvv1connect.NewMembershipAPIClient(
121      http,
122      scheme+leaderEndpoint,
123      connect.WithGRPC(),
124    )
125    return nil
126  },
127  // get, set, delete, member-join, member-leave, member-list
128  Commands: []*cli.Command{
129    {
130      Name:      "get",
131      Usage:     "Get the value of a key",
132      ArgsUsage: "KEY",
133      Action: func(c *cli.Context) error {
134        ctx := c.Context
135        key := c.Args().First()
136        if key == "" {
137          return cli.ShowCommandHelp(c, "get")
138        }
139        resp, err := dkvClient.Get(ctx, &connect.Request[dkvv1.GetRequest]{
140          Msg: &dkvv1.GetRequest{
141            Key: key,
142          },
143        })
144        if err != nil {
145          return err
146        }
147        fmt.Println(resp.Msg.GetValue())
148        return nil
149      },
150    },
151    {
152      Name:      "set",
153      Usage:     "Set the value of a key",
154      ArgsUsage: "KEY VALUE",
155      Action: func(c *cli.Context) error {
156        ctx := c.Context
157        key := c.Args().Get(0)
158        value := c.Args().Get(1)
159        if key == "" || value == "" {
160          return cli.ShowCommandHelp(c, "set")
161        }
162        _, err := leaderDkvClient.Set(ctx, &connect.Request[dkvv1.SetRequest]{
163          Msg: &dkvv1.SetRequest{
164            Key:   key,
165            Value: value,
166          },
167        })
168        return err
169      },
170    },
171    {
172      Name:      "delete",
173      Usage:     "Delete a key",
174      ArgsUsage: "KEY",
175      Action: func(c *cli.Context) error {
176        ctx := c.Context
177        key := c.Args().First()
178        if key == "" {
179          return cli.ShowCommandHelp(c, "delete")
180        }
181        _, err := leaderDkvClient.Delete(ctx, &connect.Request[dkvv1.DeleteRequest]{
182          Msg: &dkvv1.DeleteRequest{
183            Key: key,
184          },
185        })
186        return err
187      },
188    },
189    {
190      Name:      "member-join",
191      Usage:     "Join the cluster",
192      ArgsUsage: "ID ADDRESS",
193      Action: func(c *cli.Context) error {
194        ctx := c.Context
195        id := c.Args().Get(0)
196        address := c.Args().Get(1)
197        if id == "" || address == "" {
198          return cli.ShowCommandHelp(c, "member-join")
199        }
200        _, err := leaderMembershipClient.JoinServer(
201          ctx,
202          &connect.Request[dkvv1.JoinServerRequest]{
203            Msg: &dkvv1.JoinServerRequest{
204              Id:      id,
205              Address: address,
206            },
207          },
208        )
209        return err
210      },
211    },
212    {
213      Name:      "member-leave",
214      Usage:     "Leave the cluster",
215      ArgsUsage: "ID",
216      Action: func(c *cli.Context) error {
217        ctx := c.Context
218        id := c.Args().First()
219        if id == "" {
220          return cli.ShowCommandHelp(c, "member-leave")
221        }
222        _, err := leaderMembershipClient.LeaveServer(
223          ctx,
224          &connect.Request[dkvv1.LeaveServerRequest]{
225            Msg: &dkvv1.LeaveServerRequest{
226              Id: id,
227            },
228          },
229        )
230        return err
231      },
232    },
233    {
234      Name:  "member-list",
235      Usage: "List the cluster members",
236      Action: func(c *cli.Context) error {
237        ctx := c.Context
238        resp, err := membershipClient.GetServers(
239          ctx,
240          &connect.Request[dkvv1.GetServersRequest]{
241            Msg: &dkvv1.GetServersRequest{},
242          },
243        )
244        if err != nil {
245          return err
246        }
247        fmt.Println("ID\t| Raft Address\t| RPC Address\t| Leader")
248        for _, server := range resp.Msg.GetServers() {
249          fmt.Printf(
250            "%s\t| %s\t| %s\t| %s\n",
251            server.GetId(),
252            server.GetRaftAddress(),
253            server.GetRpcAddress(),
254            strconv.FormatBool(server.GetIsLeader()),
255          )
256        }
257        return nil
258      },
259    },
260  },
261}
262
263func findEndpoint(ctx context.Context) (addr string) {
264  servers, err := membershipClient.GetServers(ctx, &connect.Request[dkvv1.GetServersRequest]{
265    Msg: &dkvv1.GetServersRequest{},
266  })
267  if err != nil {
268    return ""
269  }
270  // No server? Use the RPC that was provided.
271  if len(servers.Msg.GetServers()) == 0 {
272    return ""
273  }
274  // Filter the server and only get the servers with RPC address
275  advertisedServers := make([]*dkvv1.Server, 0, len(servers.Msg.GetServers()))
276  for _, server := range servers.Msg.GetServers() {
277    if server.GetRpcAddress() != "" {
278      advertisedServers = append(advertisedServers, server)
279    }
280  }
281  // No advertised server? Use the RPC that was provided.
282  if len(advertisedServers) == 0 {
283    return ""
284  }
285  // Find the leader
286  for _, server := range advertisedServers {
287    // Request the first leader.
288    if server.GetIsLeader() {
289      return server.GetRpcAddress()
290    }
291  }
292
293  // No leader? Request random server.
294  idx := rand.Intn(len(advertisedServers))
295  return advertisedServers[idx].GetRpcAddress()
296}
297
298func setupClientTLSConfig(crt, key, ca string) (*tls.Config, error) {
299  var cfg tls.Config
300  if crt != "" && key != "" {
301    certificate, err := tls.LoadX509KeyPair(crt, key)
302    if err != nil {
303      return nil, err
304    }
305    cfg.Certificates = []tls.Certificate{certificate}
306  }
307  if ca != "" {
308    caCert, err := os.ReadFile(ca)
309    if err != nil {
310      return nil, err
311    }
312    if cfg.RootCAs == nil {
313      cas, err := x509.SystemCertPool()
314      if err != nil {
315        cas = x509.NewCertPool() // fall back to an empty pool if the system pool is unavailable
316      }
317      cfg.RootCAs = cas
318    }
319    cfg.RootCAs.AppendCertsFromPEM(caCert)
320  }
321  return &cfg, nil
322}
323
324func main() {
325  _ = godotenv.Load(".env.local")
326  _ = godotenv.Load(".env")
327  if err := app.Run(os.Args); err != nil {
328    log.Fatal(err)
329  }
330}

Code dump. But technically, this is quite simple.

We invoke the dkvClient and leaderDkvClient to send the requests. We also invoke the membershipClient and leaderMembershipClient to send the membership requests.

Write commands try to send the request to the leader. If the leader is not available, the request is sent to any node (see findEndpoint).

Read commands are immediately sent to the indicated node (endpoint).

Exposing the nodes to the external network 🔗

Edit the smoke-tests/manifests/dkv/service.yaml and append:

 1---
 2apiVersion: v1
 3kind: Service
 4metadata:
 5  name: dkv-nodeport
 6spec:
 7  type: NodePort
 8  selector:
 9    app: dkv
10  ports:
11    - port: 3000
12      name: http
13      nodePort: 30001
14      targetPort: http

This makes the service available on port 30001 of every node. The service automatically load-balances the requests to the pods using a random strategy.

Run kubectl apply -f manifests/dkv/service.yaml.

To healthcheck the cluster, run dkvctl --endpoint 192.168.77.10:30001 member-list.

Setting mutual TLS on the client-side 🔗

Setting TLS for dkvctl

Add one more certificate smoke-tests/certificates/dkvctl-certificate.yaml for the client:

 1apiVersion: cert-manager.io/v1
 2kind: Certificate
 3metadata:
 4  name: dkvctl-cert
 5  namespace: default
 6spec:
 7  secretName: dkvctl-cert-secret
 8  duration: 2160h # 90d
 9  renewBefore: 360h # 15d
10  subject:
11    organizations: [My Awesome Company]
12    countries: [FR]
13    organizationalUnits: [IT]
14    localities: [Paris]
15  commonName: dkvctl
16  issuerRef:
17    name: private-cluster-issuer
18    kind: ClusterIssuer
19  usages:
20    - client auth
21    - key encipherment
22    - digital signature

And run kubectl apply -f certificates/dkvctl-certificate.yaml.

Get the certificates:

1kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.tls\.crt}' | base64 -d > dkvctl.crt
2kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.tls\.key}' | base64 -d > dkvctl.key
3kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.ca\.crt}' | base64 -d > dkvctl-ca.crt

Setting TLS for the server

Add one more certificate for the server's client-facing endpoint, smoke-tests/certificates/public-certificate.yaml:

 1apiVersion: cert-manager.io/v1
 2kind: Certificate
 3metadata:
 4  name: dkv.example.com-cert
 5  namespace: default
 6spec:
 7  secretName: dkv.example.com-cert-secret
 8  duration: 2160h # 90d
 9  renewBefore: 360h # 15d
10  subject:
11    organizations: [My Awesome Company]
12    countries: [FR]
13    organizationalUnits: [IT]
14    localities: [Paris]
15  commonName: dkv.example.com
16  dnsNames:
17    - dkv.example.com
18  issuerRef:
19    name: private-cluster-issuer
20    kind: ClusterIssuer
21  usages:
22    - server auth
23    - key encipherment
24    - digital signature

And run kubectl apply -f certificates/public-certificate.yaml.

Edit the smoke-tests/manifests/dkv/statefulset.yaml:

 1     - name: dkv
 2       # ...
 3       env:
 4         # ...
 5+        - name: DKV_CERT_FILE
 6+          value: /etc/dkv/certs/tls.crt
 7+        - name: DKV_KEY_FILE
 8+          value: /etc/dkv/certs/tls.key
 9+        - name: DKV_TRUSTED_CA_FILE
10+          value: /etc/dkv/certs/ca.crt
11       volumeMounts:
12         # ...
13+        - name: certs
14+          mountPath: /etc/dkv/certs
15   volumes:
16     # ...
17+    - name: certs
18+      secret:
19+        secretName: dkv.example.com-cert-secret

Run kubectl apply -f manifests/dkv/statefulset.yaml.

Testing the client

Edit your DNS or your /etc/hosts to resolve dkv.example.com to 192.168.77.10:

1sudo su
2echo "192.168.77.10 dkv.example.com" >> /etc/hosts

Now, you can test your cluster simply by running:

1dkvctl --cacert dkvctl-ca.crt --cert dkvctl.crt --key dkvctl.key --endpoint dkv.example.com:30001 set key1 value1

Testing failures 🔗

If you try to kill the leader, you will see that the client is still able to write to the store and the leader has changed:

 1kubectl delete pod dkv-0
 2./bin/dkvctl \
 3  --cacert dkvctl-ca.crt \
 4  --cert dkvctl.crt \
 5  --key dkvctl.key \
 6  --endpoint dkv.example.com:30001 \
 7  member-list
 8
 9# ID      | Raft Address  | RPC Address   | Leader
10# dkv-0   | dkv-0.dkv.default.svc.cluster.local:2380      |       | false
11# dkv-1   | dkv-1.dkv.default.svc.cluster.local:2380      |       | false
12# dkv-2   | dkv-2.dkv.default.svc.cluster.local:2380      |       | true

If a pod stays down for a while and is then restarted, you will see that the state is preserved:

 1kubectl scale --replicas=2 statefulset dkv
 2# member-list doesn't show the health of the node, though, you will see the leader moves
 3./bin/dkvctl \
 4  --cacert dkvctl-ca.crt \
 5  --cert dkvctl.crt \
 6  --key dkvctl.key \
 7  --endpoint dkv.example.com:30001 \
 8  set key2 value2
 9./bin/dkvctl \
10  --cacert dkvctl-ca.crt \
11  --cert dkvctl.crt \
12  --key dkvctl.key \
13  --endpoint dkv.example.com:30001 \
14  set key3 value3
15# You can verify the values with `get <key>`
16
17kubectl scale --replicas=3 statefulset dkv
18./bin/dkvctl \
19  --cacert dkvctl-ca.crt \
20  --cert dkvctl.crt \
21  --key dkvctl.key \
22  --endpoint dkv.example.com:30001 \
23  get key2
24# value2
25./bin/dkvctl \
26  --cacert dkvctl-ca.crt \
27  --cert dkvctl.crt \
28  --key dkvctl.key \
29  --endpoint dkv.example.com:30001 \
30  get key3
31# value3

Do note that the Kubernetes service load-balances requests across the pods at random (kube-proxy's random load balancing). Each request therefore has a 1/3 probability of hitting any given node, so spam a few requests to check whether one of the nodes responds badly.

Normally, there is no failure.

About Serf, the service discovery system 🔗

As you know, we've implemented "static" service discovery. Travis Jeffery uses hashicorp/serf, a library that implements a membership service with the gossip protocol. Basically, hashicorp/serf would replace the "join" loop inside the bootstrapDStore function.

hashicorp/serf uses TCP and UDP to discover nodes, handle failures, and handle the membership of the cluster. It also has a built-in event system to handle the membership changes.

In practice, nodes would contact each other using Serf to join the cluster (on a different port from Raft). A membership event would then trigger the leader to call the Join method of Raft. Moreover, Serf can be configured to use a shared key to authenticate the nodes.
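
As a rough sketch of what that would have looked like (names and configuration are illustrative, and this is not what the final code does), the leader would watch Serf events and add joining members as Raft voters:

  // Sketch of the Serf-based membership approach that this article ends up
  // scrapping. Configuration, ports and error handling are illustrative.
  package membership

  import (
    "log/slog"

    "github.com/hashicorp/raft"
    "github.com/hashicorp/serf/serf"
  )

  // watchSerfEvents adds members to the Raft cluster as they join via Serf.
  func watchSerfEvents(events <-chan serf.Event, r *raft.Raft, raftPort string) {
    for ev := range events {
      me, ok := ev.(serf.MemberEvent)
      if !ok || me.EventType() != serf.EventMemberJoin {
        continue
      }
      // Only the leader is allowed to change the Raft configuration.
      if r.State() != raft.Leader {
        continue
      }
      for _, m := range me.Members {
        addr := raft.ServerAddress(m.Addr.String() + ":" + raftPort)
        if err := r.AddVoter(raft.ServerID(m.Name), addr, 0, 0).Error(); err != nil {
          slog.Error("failed to add voter", "name", m.Name, "error", err)
        }
      }
    }
  }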

During my implementation, I have found many issues with hashicorp/serf:

  • hashicorp/serf resolves the domain names of the nodes and prefers to use IPs. Because of this, when a pod is deleted, hashicorp/serf is unable to resolve the new IP address of the pod. A way to fix this is to use a Service per pod, which is a waste of resources.
  • hashicorp/serf uses additional ports to communicate with the nodes. We already have two ports (RPC and Raft), and adding a third one (TCP and UDP) is simply too much. etcd doesn't use that many ports.
  • hashicorp/serf is redundant with the integrated "membership" system of Raft. Raft already handles failures and membership changes.

This is why I decided to scrap hashicorp/serf and simply use a "join" loop, which works perfectly.

Conclusion 🔗

This is my longest article to date; it is mostly my report on how I implemented a distributed key-value store using Raft and ConnectRPC. It's also a project to review my skills and my understanding of Travis Jeffery's book.

Most of the time, if you need a distributed system, you would either use a distributed key-value store like etcd, or a distributed event broker like NATS or Kafka. If you need to implement a distributed system from scratch, you probably have a complex state machine to replicate, and you would use Raft.

If you feel lost, it may be worth checking Travis Jeffery's book. Though there are some points that are ambiguous and some technical choices that can be debated, it is in fact a great book for building a reliable, fault-tolerant distributed system. I would just recommend NOT using logs as the main "data structure" to replicate, and avoiding hashicorp/serf. Either use a linked list, "à la blockchain" style, or a simple state machine (the position of a robot, for example).

This article doesn't show how to implement Observability and Role-Based Access Control as the implementation varies a lot depending on the use case. ConnectRPC handles OpenTelemetry differently, and RBAC is simply not the subject of this article.

I've also separated the HTTP API from the Raft API. Travis Jeffery combines both by using the connection multiplexer cmux. If I were to implement a production-ready distributed system, I would replace the whole network layer of Raft with ConnectRPC, which would allow the same network layer to serve both the HTTP API and the Raft API.
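
For reference, swapping the network layer boils down to providing Raft with a raft.StreamLayer. A minimal sketch looks like this (it dials plain TCP here, whereas a real implementation would share its listener and dialer with ConnectRPC):

  // Sketch of a custom raft.StreamLayer. A real implementation would share the
  // listener and dialer with the ConnectRPC server instead of using plain TCP.
  package transport

  import (
    "net"
    "time"

    "github.com/hashicorp/raft"
  )

  // Layer satisfies raft.StreamLayer: a net.Listener plus a Dial method.
  type Layer struct {
    net.Listener
  }

  var _ raft.StreamLayer = (*Layer)(nil)

  // Dial opens an outgoing connection to another Raft node.
  func (l *Layer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
    d := net.Dialer{Timeout: timeout}
    return d.Dial("tcp", string(address))
  }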

References 🔗