Fault-Tolerant Distributed Systems with Replicated State Machines in Go
Sunday 17 March 2024 · 2 hours 58 mins read

Table of contents
- Table of contents
- Introduction
- What is a distributed system?
- Examples of stateful distributed systems
- The example: A distributed key-value store with Raft
- The objective
- Understanding Raft at its core
- Logs and state machines
- Bootstrapping the project
- Implementing the key-value store
- Using Raft to distribute commands
- Understanding Raft's lifecycle
- Understanding Raft's RPCs and Term
- Implementing the Finite State Machine for Raft
- The "crash" recovery: snapshots and restoring logs
- Preparing the storage for Raft
- Replacing the network layer with a mutual TLS transport
- Adding the "Join" and "Leave" methods
- Sending commands
- Adding an API for interactivity
- Implementing the "main" function of the server
- Implementing the client
- Testing failures
- About Serf, the service discovery system
- Conclusion
- References
Introduction
I've just read the book by Travis Jeffery, "Distributed Services with Go" and I wanted to share a simple example of a distributed system in Go.
In his book, Travis Jeffery doesn't really define what a distributed system is, but focuses on implementing a "reliable, scalable, and maintainable system" as per the subtitle of the book, which is a fine and practical approach if you wish to really learn a production-ready example. However, he doesn't explain his choice of tools and libraries, which is a bit frustrating when it's a book about "implementing".
Thus, he builds a Go service with gRPC, OpenTelemetry, load balancing, and other tools to create a reliable distributed system.
The objective of this article is to create a simple example of a distributed system and focus on the distributed aspect of the system. Whether you implement gRPC as the transport or OpenTelemetry for observability is up to you. I chose different technologies than Travis Jeffery to give you a different perspective.
Before starting, a few words about what has changed since the writing of Travis Jeffery's book:
- Go 1.22 is out. Many standard libraries have deprecated/moved some functions: ioutil has been deprecated, boltdb is less efficient than pebble, etc.
- The book uses some libraries that are simply non-standard, like go-mmap, while he could have used the syscall library.
- He forked hashicorp/raft-boltdb to use etcd-io/bbolt instead of boltdb/bolt.
This article will try to stick to the standard library as much as possible, though I cannot guarantee that nothing used here will be deprecated in the future.
We will also fork hashicorp/raft to implement a custom RPC and fix the network layer to allow mutual TLS (which, by the way, Travis Jeffery didn't do). Forking this library is almost necessary since hashicorp/raft sticks "too" closely to the paper and leaves little room for extensions.
Let's start with a few definitions.
What is a distributed system?
A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions by passing messages to one another to achieve a common goal.
One style of distributed system is called "fault-tolerant distributed system", or basically replicated services which communicate with each other to ensure that the system is always available and that the data is always consistent.
As you may know, there are two types of services:
- Stateless services: They don't store any state alongside the process. They are easy to scale and are fault-tolerant by design. They are also easy to implement and maintain.
  Stateless services don't really need to communicate with each other: they can rely on an already-distributed database or an event broker instead.
  Often, this is the architecture you want to follow when you can delegate the state to a database, or the commands to an event broker.
- Stateful services: The service holds some kind of state, like persistent storage.
  This time, the services need to communicate with each other to ensure that the state is consistent across the cluster.
  This architecture is used when your "state" is a lot more complex and cannot be delegated to a database or an event broker, or when latency constraints rule them out.
This article aims to implement a simple example of a distributed stateful service. There are already plenty of guides (and even reference architectures) about creating distributed stateless services with an event broker or a distributed database/lock.
Instead, let's look at examples of existing distributed systems.
Examples of stateful distributed systems
ETCD
Let's start with etcd, a distributed key-value store. ETCD is often the "base" of many distributed systems like Kubernetes, ArgoCD, etc... because, as a key-value store, it is observable, scalable, and fault-tolerant. It serves as a good distributed lock and a good distributed configuration store.
In distributed systems, replication is a key feature to scale up. Being able to automatically scale up a stateful system is one of the main objectives of any Site Reliability Engineer. If you look at MySQL or PostgreSQL, it is very easy to deploy "read-only" replicas. However, it's a lot more complex to deploy "write" replicas. MySQL and PostgreSQL have a "source" and "replicas" architecture, where the "source" is the only one that can write and the "replicas" can only read. The problem with this architecture is that if the "source" goes down, the "replicas" cannot write, and you will need to manually promote a "replica" into a new "source".
However, instead of using a "source-replicas" architecture, ETCD uses a "leader-elected" architecture through the Raft consensus algorithm. A consensus algorithm elects a "leader" that can write, and the "followers" can only read. If the "leader" goes down, a new "leader" is elected through some kind of voting. This is an automated process and doesn't require any manual intervention.
References:
Bitcoin
Bitcoin is a decentralized cryptocurrency, but here I want to talk about its blockchain. If you don't already know, a blockchain is simply a linked list (or a queue) of blocks. A block contains diverse data, like the hash of the previous block or the list of transactions.
What is most interesting is how the blockchain is replicated across the network. The blockchain is very similar to etcd, but instead of replicating a key-value store, it replicates a linked list. The blockchain also uses a different consensus algorithm called "Proof of Work" (PoW) to elect a "leader" that can write a new block. This algorithm doesn't elect through voting, but via some kind of competition: the first one to solve a complex mathematical problem can write a new block.
There is also another major difference between Bitcoin and etcd: the node discovery strategy. Because Bitcoin is a public cryptocurrency, it uses hard-coded bootstrap nodes to discover the "actual" nodes. In comparison, etcd is either configured statically or through a discovery service.
goquorum, a fork of go-ethereum, uses the same consensus algorithm as etcd for private blockchains. As you can see, etcd is quite similar to a blockchain.
References:
IPFS
The InterPlanetary File System (IPFS) is a content delivery network (CDN) built to distribute data around the world. IPFS is a lot more complex than etcd and Bitcoin:
- Because it is a distributed file system, the node discovery is different and not all nodes are equal. IPFS stores files in blocks, each identified by an ID. Each IPFS node uses a "want list" to exchange data with other nodes: the "want list" is the list of blocks that the node wants to retrieve from other nodes. It is also used to discover new nodes. This is called the "BitSwap" protocol.
- IPFS uses a distributed hash table (DHT) containing keys mapped to values. There are three types of keys: one to search for blocks, one to resolve identities/names, and one to search for peers that may be reached. As said in the referenced article, "it's a catalog and a navigation system to search for blocks". Bitcoin and etcd don't have this kind of system. To spread information, Bitcoin uses a "gossip" protocol and etcd uses the RPCs of the Raft consensus algorithm.
Overall, IPFS is drastically different from etcd and Bitcoin. IPFS aims to distribute files partially across the network and follows an approach similar to BitTorrent. P2P and BitSwap are used to exchange data, while the DHT is used to route requests to the data. No consensus algorithm is needed since the "writer" is not replicated: we do not care about fault tolerance on the writer.
However, if you want to replicate the writer, you can use IPFS Cluster, which can use CRDTs (Conflict-free Replicated Data Types) or Raft (log-based replication) as its consensus mechanism.
References:
- IPFS - BitSwap
- IPFS - DHT
- IPFS Cluster - Architecture Overview
- Bitcoin's P2P Network - Network Level Privacy
- etcd - Raft RPCs
- etcd - Raft Paper Page 4
Summary
As you can see, the "distributed" aspect of the fault-tolerant system can be summarized in three points:
- Node discovery: How do nodes discover each other? Are all nodes equal? How do they exchange data?
- Consensus algorithm/Writer election: How do nodes elect a leader? How do they ensure that the data is consistent?
- Data exchange/spreading: How do nodes exchange data? How do they ensure that the data is consistent?
This article will simply focus on Raft and its paper. We will answer each of these questions while implementing the example.
The example: A distributed key-value store with Raft
The objective
The objective of the example is to create a simple distributed key-value store with Raft. Not only is this the most popular example, but it also answers the three questions (node discovery, writer election, data exchange).
Before even starting, I want to say that Raft solves the very specific problem of "replicated state machines": systems that are stateful because they carry a finite state machine. If you are looking to "distribute" a service with a nondeterministic state machine, I suggest you stop reading this article and look for a different solution.
Understanding Raft at its core
Please do read the paper (part 5) and visit the official website to understand the consensus algorithm. Still, I will try to summarize it in a few words. Note that this article doesn't aim to implement Raft from scratch, but to use it as a library.
A stateful service could be defined as a state machine. A state machine is a system that can be in a finite number of states and can change from one state to another. A state machine can be deterministic or nondeterministic. A deterministic state machine is a state machine that can only be in one state at a time and can only change to one state at a time. A nondeterministic state machine is a state machine that can be in multiple states at a time and can change to multiple states at a time.
Replicating deterministic state machines results in fault tolerance: because every replica is deterministic and executes the same commands in the same order, the state can be "rebuilt" simply by running those commands again.
Logs and state machines
Replicated state machines use a log (an ordered list of commands) which each state machine executes. A state machine can thus be replicated efficiently across the network by simply replicating the log and running its commands.
Example: If a node goes down, the logs can be used to "replay" the state machine and bring the node back to the same state as the other nodes just by executing the commands written in the logs.
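To make this concrete, here is a minimal, hypothetical sketch (not code from the project) of replaying an ordered log of commands to rebuild a key-value state:

```go
package main

import "fmt"

// command is a hypothetical log entry: an ordered list of these is enough
// to rebuild the state of a deterministic state machine.
type command struct {
	op    string // "set" or "delete"
	key   string
	value string
}

// replay rebuilds the state from scratch by applying every command in order.
func replay(log []command) map[string]string {
	state := make(map[string]string)
	for _, c := range log {
		switch c.op {
		case "set":
			state[c.key] = c.value
		case "delete":
			delete(state, c.key)
		}
	}
	return state
}

func main() {
	log := []command{
		{op: "set", key: "a", value: "1"},
		{op: "set", key: "b", value: "2"},
		{op: "delete", key: "a"},
	}
	// A node that crashed and lost its in-memory state recovers it by
	// replaying the same log: map[b:2].
	fmt.Println(replay(log))
}
```

Determinism is what makes this work: the same log always produces the same state.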
The benefits are immediate. Quoting the paper:
- They ensure safety (never returning an incorrect result) under all non-Byzantine conditions, including network delays, partitions, and packet loss, duplication, and reordering.
- They are fully functional (available) as long as any majority of the servers are operational and can communicate with each other and with clients. Thus, a typical cluster of five servers can tolerate the failure of any two servers. Servers are assumed to fail by stopping; they may later recover from state on stable storage and rejoin the cluster.
- They do not depend on timing to ensure the consistency of the logs: faulty clocks and extreme message delays can, at worst, cause availability problems.
- In the common case, a command can complete as soon as a majority of the cluster has responded to a single round of remote procedure calls; a minority of slow servers need not impact overall system performance.
By the way, I hated the fact that Travis Jeffery's book uses a log as its example, which makes things more confusing... He even reused his "homemade" store as the log store of Raft. I'd have preferred he use something other than Raft's own data structure as the example. Here, we will use a simple key-value store instead of a log store.
Logs are already implemented in the Raft library, but we still need to define our state machine.
But before that, let's start by bootstrapping the project. We will come back to the state machine later.
Bootstrapping the project
Git clone the following repository:
```sh
git clone -b bootstrap https://github.com/Darkness4/distributed-kv.git
```
The repository is already set up with:
- GitHub Actions.
- Main functions stored in cmd/dkv and cmd/dkvctl.
- A simple Makefile. (Please read it to understand the commands.)
- A Dockerfile.
- Tools that will be used later.
Commands available are:
- make all: compiles the client and server.
- make unit: runs unit tests.
- make integration: runs integration tests.
- make lint: lints the code.
- make fmt: formats the code.
- make protos: compiles the protocol buffers into generated Go code.
- make certs: generates the certificates for the server, the client, and the CA.
- make clean: cleans the project.
Now, let's implement the key-value store.
Implementing the key-value store
The state machine
A state machine is defined by state and commands. To define the state machine, start thinking about the commands that can mutate a state. In our case, the commands are:
- SET(value): Set a key to a value.
- DELETE: Delete a key.
The state machine of the KV store is actually a composition of state machines, where a key has its own state machine. The state is the value of the key. The state of the KV store is the state of all the keys.
The implementation
We will use pebble as the persisted KV store. Feel free to use SQLite or something else. Since we will also be using pebble to store Raft's data (the logs of the state machines), we kill two birds with one stone.
Create the file internal/store/persisted/store.go:
1package persisted
2
3import (
4 "path/filepath"
5
6 "github.com/cockroachdb/pebble"
7)
8
9type Store struct {
10 *pebble.DB
11}
12
13func New(path string) *Store {
14 path = filepath.Join(path, "pebble")
15 db, err := pebble.Open(path, &pebble.Options{})
16 if err != nil {
17 panic(err)
18 }
19 return &Store{db}
20}
21
22func (s *Store) Get(key string) (string, error) {
23 v, closer, err := s.DB.Get([]byte(key))
24 if err != nil {
25 return "", err
26 }
27 defer closer.Close()
28 return string(v), nil
29}
30
31func (s *Store) Set(key string, value string) error {
32 return s.DB.Set([]byte(key), []byte(value), pebble.Sync)
33}
34
35func (s *Store) Delete(key string) error {
36 return s.DB.Delete([]byte(key), pebble.Sync)
37}
38
39func (s *Store) Close() error {
40 return s.DB.Close()
41}
42
Write the tests if you want to. Tests are stored in the repository.
Using Raft to distribute commands
Understanding Raft's lifecycle
Now that we've implemented the store, we need to use Raft to distribute the commands across the network. As we said in the previous sections, Raft uses elections to elect a leader that can write to the log (the list of commands). In Raft, nodes can be in three states:
- Follower: The node is waiting for a leader to send commands.
- Candidate: The node is trying to become a leader.
- Leader: The node can send commands to the followers.
At the very beginning, all nodes are followers. Only after a timeout (ElectionTimeout) without receiving an AppendEntries RPC do followers become candidates. A candidate sends RequestVote RPCs to the other nodes while voting for itself. If the candidate receives a majority of votes, it becomes the leader. If it doesn't receive a majority of votes, it becomes a follower again.
Upon election, the leader sends AppendEntries RPCs (heartbeats) to the followers to prevent their election timeout from firing.
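We won't implement this lifecycle ourselves, but it helps to know that hashicorp/raft, which we will use below, exposes these timings on its raft.Config. A minimal sketch, with illustrative values (these are the library defaults, not tuning advice):

```go
package main

import (
	"time"

	"github.com/hashicorp/raft"
)

func exampleConfig() *raft.Config {
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID("node-0")
	// Time spent in follower state without contact from a leader before
	// starting an election.
	config.HeartbeatTimeout = 1000 * time.Millisecond
	// Time spent in candidate state without winning or hearing from a
	// leader before starting a new election round.
	config.ElectionTimeout = 1000 * time.Millisecond
	// How long a leader keeps leadership without being able to contact a
	// quorum before stepping down.
	config.LeaderLeaseTimeout = 500 * time.Millisecond
	return config
}

func main() { _ = exampleConfig() }
```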
Understanding Raft's RPCs and Term
At its core, Raft has only 2 RPCs:
- RequestVote: Sent by candidates to gather votes.
- AppendEntries: Sent by leaders to replicate logs. It is also used to send heartbeats.
Since we won't be implementing the internals of Raft (we will use hashicorp/raft), I recommend reading Figure 2 of the paper to see the parameters and results of each RPC.
The "term" is a number that is incremented every time a new election starts. It is used to prevent "old" leaders from sending commands to the followers.
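As a rough illustration of how the term guards against stale leaders, here is a toy sketch of the rule from Figure 2 of the paper (purely illustrative; hashicorp/raft implements it for us):

```go
package main

type nodeState int

const (
	follower nodeState = iota
	candidate
	leader
)

type appendEntriesRequest struct {
	Term int
	// leader ID, entries, prevLogIndex, ... omitted
}

type appendEntriesResponse struct {
	Term    int
	Success bool
}

type node struct {
	currentTerm int
	state       nodeState
}

// handleAppendEntries sketches the term rule: reject requests whose term is
// older than ours, and adopt newer terms (stepping down to follower).
func (n *node) handleAppendEntries(req appendEntriesRequest) appendEntriesResponse {
	if req.Term < n.currentTerm {
		// Stale leader: reject so it learns the newer term and steps down.
		return appendEntriesResponse{Term: n.currentTerm, Success: false}
	}
	if req.Term > n.currentTerm {
		n.currentTerm = req.Term
		n.state = follower
	}
	// ...log consistency checks and entry appends omitted...
	return appendEntriesResponse{Term: n.currentTerm, Success: true}
}

func main() {}
```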
Do also note that hashicorp/raft implements other RPCs to optimize the consensus algorithm. hashicorp/raft also implements the InstallSnapshot RPC to compact the logs and restore them after a crash, as per the paper.
Implementing the Finite State Machine for Raft
Define commands for Raft
We will use protocol buffers to define the commands for Raft. Do note that these are the commands for peer-to-peer replication, not the commands for server-to-client communication. Transport is already handled by Raft, so you don't need to write a service. Heck, you could even use JSON, or prefix a byte to indicate different commands (Travis Jeffery's method, which is simply too ambiguous).
- Create the file protos/dkv/v1/dkv.proto:

```proto
syntax = "proto3";

package dkv.v1;

// Command is a message used in Raft to replicate log entries.
message Command {
  oneof command {
    Set set = 1;
    Delete delete = 2;
  }
}

message Set {
  string key = 1;
  string value = 2;
}

message Delete { string key = 1; }
```
- We use Buf to standardize the protocol buffers layout. Create the file protos/buf.yaml:

```yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
  service_suffix: API
```
- Now, simply run:

```sh
make protos
```

The directory gen should be created. It contains the generated Go code. Feel free to look at its implementation, as it shows how the data is packed and unpacked.
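For illustration, here is a short sketch of what packing and unpacking a Command looks like with the generated types and google.golang.org/protobuf (assuming the module path distributed-kv used throughout this article):

```go
package main

import (
	"fmt"

	dkvv1 "distributed-kv/gen/dkv/v1"

	"google.golang.org/protobuf/proto"
)

func main() {
	// Pack a SET command into bytes, as it will be stored in the Raft log.
	cmd := &dkvv1.Command{
		Command: &dkvv1.Command_Set{
			Set: &dkvv1.Set{Key: "hello", Value: "world"},
		},
	}
	b, err := proto.Marshal(cmd)
	if err != nil {
		panic(err)
	}

	// Unpack it again, as the FSM will do when applying the log entry.
	var decoded dkvv1.Command
	if err := proto.Unmarshal(b, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.GetSet().GetKey(), decoded.GetSet().GetValue())
}
```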
Implementing the Finite State Machine
We will use hashicorp/raft and not etcd-io/raft, because it is more modular and easier to use. hashicorp/raft requires us to implement the raft.FSM interface.
- Create the file internal/store/distributed/fsm.go:

```go
package distributed

import (
	dkvv1 "distributed-kv/gen/dkv/v1"
	"encoding/csv"
	"errors"
	"io"

	"github.com/hashicorp/raft"
	"google.golang.org/protobuf/proto"
)

var _ raft.FSM = (*FSM)(nil)

type Storer interface {
	Delete(key string) error
	Set(key, value string) error
	Get(key string) (string, error)
}

type FSM struct {
	storer Storer
}

func NewFSM(storer Storer) *FSM {
	return &FSM{storer: storer}
}

// Apply executes the command from the Raft log entry.
func (f *FSM) Apply(l *raft.Log) interface{} {
	panic("unimplemented")
}

// Restore restores the state of the FSM from a snapshot.
func (f *FSM) Restore(snapshot io.ReadCloser) error {
	panic("unimplemented")
}

// Snapshot dumps the state of the FSM to a snapshot.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
	panic("unimplemented")
}
```

We must implement the raft.FSM interface. Let's just implement the Apply method for now, as it is the most important one.
- Implement the Apply method:

```go
// Apply executes the command from the Raft log entry.
func (f *FSM) Apply(l *raft.Log) interface{} {
	// Unpack the data
	var cmd dkvv1.Command
	if err := proto.Unmarshal(l.Data, &cmd); err != nil {
		return err
	}

	// Apply the command
	switch c := cmd.Command.(type) {
	case *dkvv1.Command_Set:
		return f.storer.Set(c.Set.Key, c.Set.Value)
	case *dkvv1.Command_Delete:
		return f.storer.Delete(c.Delete.Key)
	}

	return errors.New("unknown command")
}
```
As you can see, using Protocol Buffers is not only efficient, but also readable.
Feel free to implement tests for the Apply method. The repository contains the tests and mocks.
The "crash" recovery: snapshots and restoring logs
Raft can use an additional store to compact the logs and to restore them after a crash. This store is called the file snapshot store (fss).
Snapshots are used to quickly restore logs before fetching the rest of the logs from peers after a crash. It is used to compact the logs. Snapshots are taken when the logs reach a certain size.
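With hashicorp/raft, "a certain size" is controlled by fields on raft.Config. A hedged sketch, using the library's default values for illustration:

```go
package main

import (
	"time"

	"github.com/hashicorp/raft"
)

func snapshotTuning() *raft.Config {
	config := raft.DefaultConfig()
	// Check at this interval (with some jitter) whether a snapshot
	// should be taken.
	config.SnapshotInterval = 120 * time.Second
	// Only snapshot if at least this many log entries have been committed
	// since the last snapshot: this is the "certain size" in practice.
	config.SnapshotThreshold = 8192
	// Keep this many trailing logs after a snapshot so slow followers can
	// still catch up from the log instead of a full snapshot.
	config.TrailingLogs = 10240
	return config
}

func main() { _ = snapshotTuning() }
```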
Internally, this adds another RPC called InstallSnapshot. For us, we simply have to implement the Snapshot and Restore methods of the FSM and implement the raft.FSMSnapshot interface. Snapshot and Restore require us to use io.ReadCloser and io.WriteCloser (raft.SnapshotSink) to read and write the snapshots. To make reading and writing the snapshot efficient, we will extend the Store to allow an instant "dump" and "restore" of the store.
- Extend the interface to "dump" the store:

```go
type Storer interface {
	Delete(key string) error
	Set(key, value string) error
	Get(key string) (string, error)
	Dump() map[string]string
	Clear()
}
```
- Implement the Snapshot and Restore methods of the FSM. We will use the csv package to pack the snapshot since:
  - Our data is (basically) a list of string pairs.
  - It's easy to read and write sequentially, especially for key-value stores.
  - It's faster than JSON and protocol buffers.

  csv.NewWriter and csv.NewReader work on top of any io.Writer and io.Reader, which is exactly what raft.SnapshotSink and io.ReadCloser give us.
1// Restore restores the state of the FSM from a snapshot. 2func (f *FSM) Restore(snapshot io.ReadCloser) error { 3 f.storer.Clear() 4 r := csv.NewReader(snapshot) 5 for { 6 record, err := r.Read() 7 if err == io.EOF { 8 break 9 } 10 if err != nil { 11 return err 12 } 13 if err := f.storer.Set(record[0], record[1]); err != nil { 14 return err 15 } 16 } 17 return nil 18} 19 20// Snapshot dumps the state of the FSM to a snapshot. 21// 22// nolint: ireturn 23func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { 24 return &fsmSnapshot{store: f.storer.Dump()}, nil 25} 26 27var _ raft.FSMSnapshot = (*fsmSnapshot)(nil) 28 29type fsmSnapshot struct { 30 store map[string]string 31} 32 33// Persist should dump all necessary state to the WriteCloser 'sink', 34// and call sink.Close() when finished or call sink.Cancel() on error. 35func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { 36 err := func() error { 37 csvWriter := csv.NewWriter(sink) 38 for k, v := range f.store { 39 if err := csvWriter.Write([]string{k, v}); err != nil { 40 return err 41 } 42 } 43 csvWriter.Flush() 44 if err := csvWriter.Error(); err != nil { 45 return err 46 } 47 return sink.Close() 48 }() 49 50 if err != nil { 51 if err = sink.Cancel(); err != nil { 52 panic(err) 53 } 54 } 55 56 return err 57} 58 59// Release is invoked when we are finished with the snapshot. 60func (f *fsmSnapshot) Release() {}
Feel free to implement tests for the Snapshot and Restore methods. The Git repository contains the tests and mocks.
- Extend the Store struct from internal/store/persisted to implement the Storer interface:

```go
func (s *Store) Dump() map[string]string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	data := make(map[string]string, len(s.data))
	for k, v := range s.data {
		data[k] = v
	}
	return data
}

func (s *Store) Clear() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = make(map[string]string)
}
```
Preparing the storage for Raft
Raft needs two additional stores: the log store (ldb), which holds the log entries, and the stable store (sdb), which holds cluster metadata such as the current term and the last vote. hashicorp/raft normally uses hashicorp/raft-boltdb for these. However, boltdb has fallen out of favor compared to pebble (especially in the blockchain ecosystem), and therefore it is more appropriate to use pebble instead of boltdb.
However, we still need to implement the raft.LogStore and raft.StableStore interfaces for pebble. You can use weedge's pebble library to implement the interfaces, but it is preferable to copy the files instead of importing the library, so you can fix breaking changes between pebble versions. You can also copy my implementation from the repository, or fork weedge's pebble library and fix the breaking changes yourself.
- Import the "raftpebble" files into the internal/raftpebble directory.
- Create a file internal/store/distributed/store.go:

```go
package distributed

import "github.com/hashicorp/raft"

const (
	retainSnapshotCount = 2
)

type Store struct {
	// RaftDir is the directory where the Stable and Logs data is stored.
	RaftDir string
	// RaftBind is the address to bind the Raft server.
	RaftBind string
	// RaftID is the ID of the local node.
	RaftID string
	// RaftAdvertisedAddr is the address other nodes should use to communicate with this node.
	RaftAdvertisedAddr raft.ServerAddress

	fsm  *FSM
	raft *raft.Raft
}

func NewStore(
	raftDir, raftBind, raftID string,
	raftAdvertisedAddr raft.ServerAddress,
	storer Storer,
) *Store {
	return &Store{
		RaftDir:            raftDir,
		RaftBind:           raftBind,
		RaftID:             raftID,
		RaftAdvertisedAddr: raftAdvertisedAddr,
		fsm:                NewFSM(storer),
	}
}
```
-
Add the
Open
method, which is used to open the store and the Raft consensus communication:1// ... 2 3func (s *Store) Open(bootstrap bool) error { 4 // Setup Raft configuration. 5 config := raft.DefaultConfig() 6 config.LocalID = raft.ServerID(s.RaftID) 7 8 // Setup Raft communication. 9 addr, err := net.ResolveTCPAddr("tcp", s.RaftBind) 10 if err != nil { 11 return err 12 } 13 // Create the snapshot store. This allows the Raft to truncate the log. 14 fss, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr) 15 if err != nil { 16 return fmt.Errorf("file snapshot store: %s", err) 17 } 18 19 // Create the log store and stable store. 20 ldb, err := raftpebble.New(raftpebble.WithDbDirPath(filepath.Join(s.RaftDir, "logs.dat"))) 21 if err != nil { 22 return fmt.Errorf("new pebble: %s", err) 23 } 24 sdb, err := raftpebble.New(raftpebble.WithDbDirPath(filepath.Join(s.RaftDir, "stable.dat"))) 25 if err != nil { 26 return fmt.Errorf("new pebble: %s", err) 27 } 28 29 // Instantiate the transport. 30 transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr) 31 if err != nil { 32 return err 33 } 34 35 // Instantiate the Raft systems. 36 ra, err := raft.NewRaft(config, s.fsm, ldb, sdb, fss, transport) 37 if err != nil { 38 return fmt.Errorf("new raft: %s", err) 39 } 40 s.raft = ra 41 42 // Check if there is an existing state, if not bootstrap. 43 hasState, err := raft.HasExistingState( 44 ldb, 45 sdb, 46 fss, 47 ) 48 if err != nil { 49 return err 50 } 51 if bootstrap && !hasState { 52 slog.Info( 53 "bootstrapping new raft node", 54 "id", 55 config.LocalID, 56 "addr", 57 transport.LocalAddr(), 58 ) 59 config := raft.Configuration{ 60 Servers: []raft.Server{ 61 { 62 ID: config.LocalID, 63 Address: transport.LocalAddr(), 64 }, 65 }, 66 } 67 err = s.raft.BootstrapCluster(config).Error() 68 } 69 return err 70}
Feel free to implement tests for the Open method. The Git repository contains the tests.
If you read the code, there isn't anything special, but we haven't talked about the network layer of hashicorp/raft yet. Right now, we are using an insecure "raw" network layer: just ye olde TCP. In Travis Jeffery's book, he replaced the StreamLayer of hashicorp/raft and added TLS for peer-to-peer communication. Since this is best practice, we will do the same.
Replacing the network layer with a mutual TLS transport
Mutual TLS is based on one private CA that signs peer certificates. Since TLS uses asymmetric cryptography, the private key of the CA is used to sign the peer certificates. Peers use the public key of the CA (the certificate of the CA contains the public key) to verify the peer certificates.
This is called "public key cryptography". The public key is used to verify the signature of the peer certificate. The private key is used to sign the peer certificate.
Peers verify membership simply by checking the signature of the peer certificate. If the signature is valid, the peer is a member of the network. If the signature is invalid, the peer is not a member of the network.
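Conceptually, this membership check is just the standard X.509 chain verification against the private CA. A minimal sketch with crypto/x509, only to illustrate the idea (the TLS handshake performs this for us once we configure ClientCAs/RootCAs later):

```go
package main

import (
	"crypto/x509"
	"errors"
)

// verifyPeer checks that peerCert was signed by (a chain up to) our private
// CA. This is what the TLS handshake does internally when ClientCAs/RootCAs
// are set on the tls.Config.
func verifyPeer(peerCert *x509.Certificate, caPEM []byte) error {
	roots := x509.NewCertPool()
	if !roots.AppendCertsFromPEM(caPEM) {
		return errors.New("invalid CA certificate")
	}
	_, err := peerCert.Verify(x509.VerifyOptions{
		Roots: roots,
		// Accept any extended key usage for this conceptual check.
		KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny},
	})
	return err
}

func main() {}
```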
To implement this, the idea is to "upgrade" the TCP connection to a TLS connection. The crypto/tls standard library provides the tls.Server and tls.Client functions to do exactly that: they wrap a net.Conn into a tls.Conn.
-
Create a file
internal/store/distributed/stream_layer.go
and implement theraft.StreamLayer
interface:1package distributed 2 3import ( 4 "crypto/tls" 5 "net" 6 "time" 7 8 "github.com/hashicorp/raft" 9) 10 11var _ raft.StreamLayer = (*TLSStreamLayer)(nil) 12 13type TLSStreamLayer struct { 14 net.Listener 15 AdvertizedAddress raft.ServerAddress 16 ServerTLSConfig *tls.Config 17 ClientTLSConfig *tls.Config 18} 19 20func (s *TLSStreamLayer) Accept() (net.Conn, error) { 21 conn, err := s.Listener.Accept() 22 if err != nil { 23 return nil, err 24 } 25 if s.ServerTLSConfig != nil { 26 return tls.Server(conn, s.ServerTLSConfig), nil 27 } 28 return conn, nil 29} 30 31func (s *TLSStreamLayer) PublicAddress() raft.ServerAddress { 32 return s.AdvertizedAddress 33} 34 35func (s *TLSStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) { 36 dialer := &net.Dialer{Timeout: timeout} 37 conn, err := dialer.Dial("tcp", string(address)) 38 if s.ClientTLSConfig != nil { 39 serverName, _, serr := net.SplitHostPort(string(address)) 40 if serr != nil { 41 serverName = string(address) 42 } 43 tlsConfig := s.ClientTLSConfig.Clone() 44 tlsConfig.ServerName = serverName 45 return tls.Client(conn, tlsConfig), err 46 } 47 return conn, err 48} 49
WARNING: Since we are using mutual TLS, we need to set the ServerName of the tls.Config to the hostname of the peer. The ServerName is used to verify the certificate of the peer.
However, there is an issue: the transport uses an IP instead of the hostname, because the listener resolves the address and only stores the IP.
The implementation in hashicorp/raft wrongfully uses listener.Addr() as the advertised address, which yields an IP instead of a hostname. Instead, please use the fork darkness4/raft, which adds a PublicAddress() method to the stream layer. This fork is backward compatible with hashicorp/raft.
Just use the following in your go.mod:

```
replace github.com/hashicorp/raft => github.com/darkness4/raft v1.6.3
```
If you are worried about being non-standard, feel free to fork hashicorp/raft and implement your own network layer. hashicorp/raft is mostly a minimal implementation of Raft based on the paper. We will fork hashicorp/raft anyway, to implement an RPC that forwards Apply requests to the leader.
-
Now replace the
raft.NewTCPTransport
withraft.NewNetworkTransport
and extend theStore
to accept the TLS configurations. We use the optional function pattern:1 type Store struct { 2 // RaftDir is the directory where the Stable and Logs data is stored. 3 RaftDir string 4 // RaftBind is the address to bind the Raft server. 5 RaftBind string 6 // RaftID is the ID of the local node. 7 RaftID string 8 // RaftAdvertisedAddr is the address other nodes should use to communicate with this node. 9 RaftAdvertisedAddr raft.ServerAddress 10 11 fsm *FSM 12 raft *raft.Raft 13+ 14+ StoreOptions 15+} 16+ 17+type StoreOptions struct { 18+ serverTLSConfig *tls.Config 19+ clientTLSConfig *tls.Config 20+} 21+ 22+type StoreOption func(*StoreOptions) 23+ 24+func WithServerTLSConfig(config *tls.Config) StoreOption { 25+ return func(o *StoreOptions) { 26+ o.serverTLSConfig = config 27+ } 28+} 29+ 30+func WithClientTLSConfig(config *tls.Config) StoreOption { 31+ return func(o *StoreOptions) { 32+ o.clientTLSConfig = config 33+ } 34+} 35+ 36+func applyStoreOptions(opts []StoreOption) StoreOptions { 37+ var options StoreOptions 38+ for _, o := range opts { 39+ o(&options) 40+ } 41+ return options 42 } 43 44 func NewStore( 45 raftDir, raftBind, raftID string, 46 raftAdvertisedAddr raft.ServerAddress, 47 storer Storer, 48+ opts ...StoreOption, 49 ) *Store { 50+ o := applyStoreOptions(opts) 51 return &Store{ 52 RaftDir: raftDir, 53 RaftBind: raftBind, 54 RaftID: raftID, 55 RaftAdvertisedAddr: raftAdvertisedAddr, 56 fsm: NewFSM(storer), 57+ StoreOptions: o, 58 } 59 } 60 61 62 func (s *Store) Open(localID string, bootstrap bool) error { 63 // Setup Raft configuration. 64 config := raft.DefaultConfig() 65 config.LocalID = raft.ServerID(localID) 66 67- // Setup Raft communication. 68- addr, err := net.ResolveTCPAddr("tcp", s.RaftBind) 69- if err != nil { 70- return err 71- } 72 // Create the snapshot store. This allows the Raft to truncate the log. 73 fss, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr) 74 if err != nil { 75 return fmt.Errorf("file snapshot store: %s", err) 76 } 77 78 // ... 79 80 // Instantiate the transport. 81- transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr) 82+ lis, err := net.Listen("tcp", s.RaftBind) 83 if err != nil { 84 return err 85 } 86+ transport := raft.NewNetworkTransport(&TLSStreamLayer{ 87+ Listener: lis, 88+ AdvertizedAddress: raft.ServerAddress(s.RaftAdvertisedAddr), 89+ ServerTLSConfig: s.serverTLSConfig, 90+ ClientTLSConfig: s.clientTLSConfig, 91+ }, 3, 10*time.Second, os.Stderr) 92
NOTE: The lifecycle of the listener lis is handled by the NetworkTransport. Calling s.raft.Shutdown will close the listener, so there is no need to close it manually.
Adding the "Join" and "Leave" methods
Right now, our store only works in single node mode. We need to add the "Join" and "Leave" methods to request a node to join or leave the cluster.
These methods are pretty standard and can be found in many examples, so I'm just copying them:
1func (s *Store) Join(id raft.ServerID, addr raft.ServerAddress) error {
2 slog.Info("request node to join", "id", id, "addr", addr)
3
4 configFuture := s.raft.GetConfiguration()
5 if err := configFuture.Error(); err != nil {
6 slog.Error("failed to get raft configuration", "error", err)
7 return err
8 }
9 // Check if the server has already joined
10 for _, srv := range configFuture.Configuration().Servers {
11 // If a node already exists with either the joining node's ID or address,
12 // that node may need to be removed from the config first.
13 if srv.ID == id || srv.Address == addr {
14 // However if *both* the ID and the address are the same, then nothing -- not even
15 // a join operation -- is needed.
16 if srv.Address == addr && srv.ID == id {
17 slog.Info(
18 "node already member of cluster, ignoring join request",
19 "id",
20 id,
21 "addr",
22 addr,
23 )
24 return nil
25 }
26
27 if err := s.raft.RemoveServer(id, 0, 0).Error(); err != nil {
28 return fmt.Errorf("error removing existing node %s at %s: %s", id, addr, err)
29 }
30 }
31 }
32
33 // Add the new server
34 return s.raft.AddVoter(id, addr, 0, 0).Error()
35}
36
37func (s *Store) Leave(id raft.ServerID) error {
38 slog.Info("request node to leave", "id", id)
39 return s.raft.RemoveServer(id, 0, 0).Error()
40}
As you can see, Raft uses futures for its asynchronous operations. Calling .Error() waits for the operation to complete.
We can also add a "Shutdown", "WaitForLeader", "GetLeader" and "GetServers" method to help us with the tests and main function:
1type Store struct {
2 // ...
3
4 shutdownCh chan struct{}
5}
6
7func NewStore(raftDir, raftBind, raftID string, storer Storer, opts ...StoreOption) *Store {
8 o := applyStoreOptions(opts)
9 return &Store{
10 // ...
11 shutdownCh: make(chan struct{}),
12 }
13}
14
15func (s *Store) WaitForLeader(timeout time.Duration) (raft.ServerID, error) {
16 slog.Info("waiting for leader", "timeout", timeout)
17 timeoutCh := time.After(timeout)
18 ticker := time.NewTicker(time.Second)
19 defer ticker.Stop()
20 for {
21 select {
22 case <-s.shutdownCh:
23 return "", errors.New("shutdown")
24 case <-timeoutCh:
25 return "", errors.New("timed out waiting for leader")
26 case <-ticker.C:
27 addr, id := s.raft.LeaderWithID()
28 if addr != "" {
29 slog.Info("leader found", "addr", addr, "id", id)
30 return id, nil
31 }
32 }
33 }
34}
35
36func (s *Store) Shutdown() error {
37 slog.Warn("shutting down store")
38 select {
39 case s.shutdownCh <- struct{}{}:
40 default:
41 }
42 if s.raft != nil {
43 if err := s.raft.Shutdown().Error(); err != nil {
44 return err
45 }
46 s.raft = nil
47 }
48 s.fsm.storer.Clear()
49 return nil
50}
51
52func (s *Store) ShutdownCh() <-chan struct{} {
53 return s.shutdownCh
54}
55
56func (s *Store) GetLeader() (raft.ServerAddress, raft.ServerID) {
57 return s.raft.LeaderWithID()
58}
59
60func (s *Store) GetServers() ([]raft.Server, error) {
61 configFuture := s.raft.GetConfiguration()
62 if err := configFuture.Error(); err != nil {
63 return nil, err
64 }
65 return configFuture.Configuration().Servers, nil
66}
I highly recommend implementing tests for the Join and Leave methods to test the consensus; the Git repository contains them. We are almost done.
Sending commands
We are finally at the last step: sending commands to the Raft cluster. Our cluster is already able to replicate logs, but we still need to mutate the state of the store (otherwise, we wouldn't be able to play with it).
We can do this by sending commands using raft.Apply. Add the following methods to the Store struct:
1func (s *Store) apply(req *dkvv1.Command) (any, error) {
2 b, err := proto.Marshal(req)
3 if err != nil {
4 return nil, err
5 }
6 timeout := 10 * time.Second
7 future := s.raft.Apply(b, timeout)
8 if err := future.Error(); err != nil {
9 return nil, err
10 }
11 res := future.Response()
12 if err, ok := res.(error); ok {
13 return nil, err
14 }
15 return res, nil
16}
17
18func (s *Store) Set(key string, value string) error {
19 _, err := s.apply(&dkvv1.Command{
20 Command: &dkvv1.Command_Set{
21 Set: &dkvv1.Set{
22 Key: key,
23 Value: value,
24 },
25 },
26 })
27 return err
28}
29
30func (s *Store) Delete(key string) error {
31 _, err := s.apply(&dkvv1.Command{
32 Command: &dkvv1.Command_Delete{
33 Delete: &dkvv1.Delete{
34 Key: key,
35 },
36 },
37 })
38 return err
39}
40
As you can see, Protobuf makes things explicit and type-safe. Let's also add the getter:
1func (s *Store) Get(key string) (string, error) {
2 return s.fsm.storer.Get(key)
3}
Please add the tests for the Set, Delete, and Get methods. See the Git repository for the tests.
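As a starting point, here is a hedged sketch of what such a test could look like, using a single-node cluster and a tiny in-memory Storer stub; the names and the hardcoded port are mine and not necessarily those used in the repository:

```go
package distributed_test

import (
	"testing"
	"time"

	"distributed-kv/internal/store/distributed"

	"github.com/hashicorp/raft"
)

// memStorer is a tiny in-memory Storer stub for the test.
type memStorer struct{ m map[string]string }

func newMemStorer() *memStorer { return &memStorer{m: map[string]string{}} }

func (s *memStorer) Set(k, v string) error        { s.m[k] = v; return nil }
func (s *memStorer) Delete(k string) error        { delete(s.m, k); return nil }
func (s *memStorer) Get(k string) (string, error) { return s.m[k], nil }
func (s *memStorer) Dump() map[string]string      { return s.m }
func (s *memStorer) Clear()                       { s.m = map[string]string{} }

func TestSetGet(t *testing.T) {
	dir := t.TempDir()
	addr := "127.0.0.1:22380" // arbitrary free port for the test
	store := distributed.NewStore(dir, addr, "node-0", raft.ServerAddress(addr), newMemStorer())

	// Bootstrap a single-node cluster so this node elects itself leader.
	if err := store.Open(true); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = store.Shutdown() }()

	if _, err := store.WaitForLeader(10 * time.Second); err != nil {
		t.Fatal(err)
	}

	if err := store.Set("hello", "world"); err != nil {
		t.Fatal(err)
	}
	got, err := store.Get("hello")
	if err != nil || got != "world" {
		t.Fatalf("got %q, %v", got, err)
	}
}
```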
We are done! We now have a fully functional fault-tolerant distributed key-value store. We've implemented:
- The consensus/writer election using Raft.
- The data exchange/spreading using the logs and RPCs of Raft.
Now, we need to actually implement an API to interact with the store. We also need to think about the node discovery, load-balancing and the client-to-server communication.
Adding an API for interactivity
We are going to use ConnectRPC, a lightweight alternative to gRPC. ConnectRPC allows both JSON and gRPC clients, and is more modular than gRPC, much closer to the standard library net/http.
This lets us use the API without having to deploy a gRPC Gateway or an Envoy proxy for Web clients.
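For example, once the DkvAPI service defined in the steps below is wired into the server, a plain HTTP/JSON call is enough to talk to it, with no gRPC tooling involved. A minimal sketch, assuming the server from the later sections is running on localhost:3000 without TLS:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// The Connect protocol accepts unary calls as a simple POST of a JSON
	// body to /<package>.<Service>/<Method>.
	body := bytes.NewBufferString(`{"key": "hello", "value": "world"}`)
	resp, err := http.Post(
		"http://localhost:3000/dkv.v1.DkvAPI/Set",
		"application/json",
		body,
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	b, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(b))
}
```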
- Edit protos/dkv/v1/dkv.proto and add a service:

```proto
// ...

service DkvAPI {
  rpc Get(GetRequest) returns (GetResponse);
  rpc Set(SetRequest) returns (SetResponse);
  rpc Delete(DeleteRequest) returns (DeleteResponse);
}

message GetRequest { string key = 1; }
message GetResponse { string value = 1; }

message SetRequest { string key = 1; string value = 2; }
message SetResponse {}

message DeleteRequest { string key = 1; }
message DeleteResponse {}
```
To avoid repeating the same messages, rename the Set and Delete messages to SetRequest and DeleteRequest. Apply the refactor accordingly across the code.
When running
make protos
, the generated code will use ConnectRPC to translate the service into a stub for the server, and a client to interact with the server:1// Extract of gen/dkv/v1/dkvv1connect/dkv.connect.go 2// NewDkvAPIClient constructs a client for the dkv.v1.DkvAPI service. By default, it uses the 3// Connect protocol with the binary Protobuf Codec, asks for gzipped responses, and sends 4// uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the connect.WithGRPC() or 5// connect.WithGRPCWeb() options. 6// 7// The URL supplied here should be the base URL for the Connect or gRPC server (for example, 8// http://api.acme.com or https://acme.com/grpc). 9func NewDkvAPIClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) DkvAPIClient { 10 baseURL = strings.TrimRight(baseURL, "/") 11 return &dkvAPIClient{ 12 get: connect.NewClient[v1.GetRequest, v1.GetResponse]( 13 httpClient, 14 baseURL+DkvAPIGetProcedure, 15 connect.WithSchema(dkvAPIGetMethodDescriptor), 16 connect.WithClientOptions(opts...), 17 ), 18 set: connect.NewClient[v1.SetRequest, v1.SetResponse]( 19 httpClient, 20 baseURL+DkvAPISetProcedure, 21 connect.WithSchema(dkvAPISetMethodDescriptor), 22 connect.WithClientOptions(opts...), 23 ), 24 delete: connect.NewClient[v1.DeleteRequest, v1.DeleteResponse]( 25 httpClient, 26 baseURL+DkvAPIDeleteProcedure, 27 connect.WithSchema(dkvAPIDeleteMethodDescriptor), 28 connect.WithClientOptions(opts...), 29 ), 30 } 31} 32 33 34// NewDkvAPIHandler builds an HTTP handler from the service implementation. It returns the path on 35// which to mount the handler and the handler itself. 36// 37// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf 38// and JSON codecs. They also support gzip compression. 39func NewDkvAPIHandler(svc DkvAPIHandler, opts ...connect.HandlerOption) (string, http.Handler) { 40 dkvAPIGetHandler := connect.NewUnaryHandler( 41 DkvAPIGetProcedure, 42 svc.Get, 43 connect.WithSchema(dkvAPIGetMethodDescriptor), 44 connect.WithHandlerOptions(opts...), 45 ) 46 dkvAPISetHandler := connect.NewUnaryHandler( 47 DkvAPISetProcedure, 48 svc.Set, 49 connect.WithSchema(dkvAPISetMethodDescriptor), 50 connect.WithHandlerOptions(opts...), 51 ) 52 dkvAPIDeleteHandler := connect.NewUnaryHandler( 53 DkvAPIDeleteProcedure, 54 svc.Delete, 55 connect.WithSchema(dkvAPIDeleteMethodDescriptor), 56 connect.WithHandlerOptions(opts...), 57 ) 58 return "/dkv.v1.DkvAPI/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 59 switch r.URL.Path { 60 case DkvAPIGetProcedure: 61 dkvAPIGetHandler.ServeHTTP(w, r) 62 case DkvAPISetProcedure: 63 dkvAPISetHandler.ServeHTTP(w, r) 64 case DkvAPIDeleteProcedure: 65 dkvAPIDeleteHandler.ServeHTTP(w, r) 66 default: 67 http.NotFound(w, r) 68 } 69 }) 70} 71 72// DkvAPIHandler is an implementation of the dkv.v1.DkvAPI service. 73type DkvAPIHandler interface { 74 Get(context.Context, *connect.Request[v1.GetRequest]) (*connect.Response[v1.GetResponse], error) 75 Set(context.Context, *connect.Request[v1.SetRequest]) (*connect.Response[v1.SetResponse], error) 76 Delete(context.Context, *connect.Request[v1.DeleteRequest]) (*connect.Response[v1.DeleteResponse], error) 77}
-
Let's implement the handler. Create the file
internal/api/dkv_handler.go
and add:1package api 2 3import ( 4 "context" 5 dkvv1 "distributed-kv/gen/dkv/v1" 6 "distributed-kv/gen/dkv/v1/dkvv1connect" 7 "distributed-kv/internal/store/distributed" 8 9 "connectrpc.com/connect" 10) 11 12var _ dkvv1connect.DkvAPIHandler = (*DkvAPIHandler)(nil) 13 14type DkvAPIHandler struct { 15 *distributed.Store 16} 17 18func (d *DkvAPIHandler) Delete( 19 _ context.Context, 20 req *connect.Request[dkvv1.DeleteRequest], 21) (*connect.Response[dkvv1.DeleteResponse], error) { 22 return &connect.Response[dkvv1.DeleteResponse]{}, d.Store.Delete(req.Msg.Key) 23} 24 25func (d *DkvAPIHandler) Get( 26 _ context.Context, 27 req *connect.Request[dkvv1.GetRequest], 28) (*connect.Response[dkvv1.GetResponse], error) { 29 res, err := d.Store.Get(req.Msg.Key) 30 if err != nil { 31 return nil, err 32 } 33 return &connect.Response[dkvv1.GetResponse]{Msg: &dkvv1.GetResponse{Value: res}}, nil 34} 35 36func (d *DkvAPIHandler) Set( 37 _ context.Context, 38 req *connect.Request[dkvv1.SetRequest], 39) (*connect.Response[dkvv1.SetResponse], error) { 40 return &connect.Response[dkvv1.SetResponse]{}, d.Store.Set(req.Msg.Key, req.Msg.Value) 41}
As you can see, it is pretty straightforward. Actually, you could even depend on a Store interface instead of the concrete *distributed.Store to make it more modular.
- Therefore, let's finally add the Store interface to internal/store/store.go:

```go
package store

type Store interface {
	Delete(key string) error
	Set(key, value string) error
	Get(key string) (string, error)
}
```
And use it in internal/api/dkv_handler.go instead of *distributed.Store:

```go
type DkvAPIHandler struct {
	store.Store
}
```
Now, you can add tests for the DkvAPIHandler; the Git repository contains them. Since we are using ConnectRPC, we can use httptest to implement unit tests, as sketched below.
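Here is a hedged sketch of such a unit test, backing the handler with a plain in-memory map instead of the distributed store (the names of the fake are mine):

```go
package api_test

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"

	dkvv1 "distributed-kv/gen/dkv/v1"
	"distributed-kv/gen/dkv/v1/dkvv1connect"
	"distributed-kv/internal/api"

	"connectrpc.com/connect"
)

// fakeStore implements the Store interface with a plain map.
type fakeStore struct{ m map[string]string }

func (f *fakeStore) Set(k, v string) error        { f.m[k] = v; return nil }
func (f *fakeStore) Delete(k string) error        { delete(f.m, k); return nil }
func (f *fakeStore) Get(k string) (string, error) { return f.m[k], nil }

func TestSetGet(t *testing.T) {
	mux := http.NewServeMux()
	mux.Handle(dkvv1connect.NewDkvAPIHandler(&api.DkvAPIHandler{
		Store: &fakeStore{m: map[string]string{}},
	}))

	srv := httptest.NewServer(mux)
	defer srv.Close()

	client := dkvv1connect.NewDkvAPIClient(http.DefaultClient, srv.URL)
	ctx := context.Background()

	if _, err := client.Set(ctx, connect.NewRequest(&dkvv1.SetRequest{Key: "hello", Value: "world"})); err != nil {
		t.Fatal(err)
	}
	res, err := client.Get(ctx, connect.NewRequest(&dkvv1.GetRequest{Key: "hello"}))
	if err != nil {
		t.Fatal(err)
	}
	if res.Msg.Value != "world" {
		t.Fatalf("got %q", res.Msg.Value)
	}
}
```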
Implementing the "main" function of the server
Usage
Let's think a little about the usage of our server. Let's see how etcd is deployed. We will only use a static discovery strategy.
ETCD is configured by setting:
1# infra0
2etcd --name infra0 --initial-advertise-peer-urls http://10.0.1.10:2380 \
3 --listen-peer-urls http://10.0.1.10:2380 \
4 --listen-client-urls http://10.0.1.10:2379,http://127.0.0.1:2379 \
5 --advertise-client-urls http://10.0.1.10:2379 \
6 --initial-cluster-token etcd-cluster-1 \
7 --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
8 --initial-cluster-state new
9
10# infra1
11etcd --name infra1 --initial-advertise-peer-urls http://10.0.1.11:2380 \
12 --listen-peer-urls http://10.0.1.11:2380 \
13 --listen-client-urls http://10.0.1.11:2379,http://127.0.0.1:2379 \
14 --advertise-client-urls http://10.0.1.11:2379 \
15 --initial-cluster-token etcd-cluster-1 \
16 --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
17 --initial-cluster-state new
18
19# infra2
20etcd --name infra2 --initial-advertise-peer-urls http://10.0.1.12:2380 \
21 --listen-peer-urls http://10.0.1.12:2380 \
22 --listen-client-urls http://10.0.1.12:2379,http://127.0.0.1:2379 \
23 --advertise-client-urls http://10.0.1.12:2379 \
24 --initial-cluster-token etcd-cluster-1 \
25 --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
26 --initial-cluster-state new
- Port 2380 is used for the peer-to-peer communication.
- Port 2379 is used for the client-to-server communication.
- The --initial-cluster flag statically defines the cluster members.
- The --initial-cluster-state flag defines the state of the cluster. It can be new or existing.
- The --initial-cluster-token flag defines the token of the cluster.
- The --name flag defines the name of the node.
- The --initial-advertise-peer-urls flag advertises the peer URLs to the rest of the cluster.
- The --listen-peer-urls flag defines the listen address for peer (Raft) traffic.
- The --listen-client-urls flag defines the listen address for clients.
- The --advertise-client-urls flag advertises the client listen addresses, for client-side load balancing.
We will use a similar strategy, but we won't be handling the "advertisement" of the peers, nor the "initial cluster token". The reason we want this approach instead of manually setting "bootstrap" and "join" flags is because we want the flags to be almost identical so that the replication of services is easy to handle on Kubernetes.
For example, on Kubernetes, one kind of manifest is called StatefulSet, and it is used to deploy replicated systems like the one we are developing. However, the "templating" of the manifest is quite limited, and you may need a shell script to handle the templating (which is Travis Jeffery's method).
Taking the etcd approach, we can pass the Pod name (the container hostname) to the name parameter. Kubernetes also automatically handles the DNS resolution of Pod names. This means that, knowing the number of replicas, we can easily generate the --initial-cluster flag.
For example, on Kubernetes, the command line flags would be:
1etcd --name "$(POD_NAME)" \
2 --listen-peer-urls "${PROTOCOL}://0.0.0.0:${PEER_PORT}" \
3 --listen-client-urls "${PROTOCOL}://0.0.0.0:${CLIENT_PORT}" \
4 --initial-cluster "${INITIAL_CLUSTER}" \
5 --initial-cluster-state "${INITIAL_CLUSTER_STATE}"
Each variable would be set in a configuration file:
1# .env file
2PROTOCOL=https
3PEER_PORT=2380
4CLIENT_PORT=2379
5INITIAL_CLUSTER=infra0=https://infra0.dkv.svc.cluster.local:2380,infra1=https://infra1.dkv.svc.cluster.local:2380,infra2=https://infra2.dkv.svc.cluster.local:2380
6INITIAL_CLUSTER_STATE=new
Now, about the bootstrapping, it's very simple: we use the first node from the initial-cluster flag to bootstrap the cluster. That node knows it will be bootstrapping because its name matches the first entry in the initial-cluster flag. The other nodes will join the cluster.
We also need to implement mutual TLS here.
To summarize, these are the flags we will use:
1 --name value Unique name for this node [$DKV_NAME]
2 --advertise-nodes value [ --advertise-nodes value ] List of nodes to advertise [$DKV_ADVERTISE_NODES]
3 --listen-peer-address value Address to listen on for peer traffic (default: ":2380") [$DKV_LISTEN_PEER_ADDRESS]
4 --listen-client-address value Address listen on for client traffic (default: ":3000") [$DKV_LISTEN_CLIENT_ADDRESS]
5 --initial-cluster value [ --initial-cluster value ] Initial cluster configuration for bootstrapping [$DKV_INITIAL_CLUSTER]
6 --initial-cluster-state value Initial cluster state (new, existing) [$DKV_INITIAL_CLUSTER_STATE]
7 --peer-cert-file value Path to the peer server TLS certificate file [$DKV_PEER_CERT_FILE]
8 --peer-key-file value Path to the peer server TLS key file [$DKV_PEER_KEY_FILE]
9 --peer-trusted-ca-file value Path to the peer server TLS trusted CA certificate file [$DKV_PEER_TRUSTED_CA_FILE]
10 --cert-file value Path to the client server TLS certificate file [$DKV_CERT_FILE]
11 --key-file value Path to the client server TLS key file [$DKV_KEY_FILE]
12 --trusted-ca-file value Path to the client server TLS trusted CA certificate file [$DKV_TRUSTED_CA_FILE]
13 --data-dir value Path to the data directory (default: "data") [$DKV_DATA_DIR]
Usage would be:
1# No mutual TLS
2dkv --name dkv-0 \
3 --initial-cluster=dkv-0=dkv-0.dkv.default.svc.cluster.local:2380,dkv-1=dkv-1.dkv.default.svc.cluster.local:2380,dkv-2=dkv-2.dkv.default.svc.cluster.local:2380 \
4 --initial-cluster-state=new \
5 --data-dir=/var/lib/dkv
Implementing the bootstrap function
In cmd/dkv/main.go, we will use urfave/cli to implement the command line flags. Add the following code:
1package main
2
3import (
4 "context"
5 "crypto/tls"
6 "distributed-kv/gen/dkv/v1/dkvv1connect"
7 "distributed-kv/internal/api"
8 "distributed-kv/internal/store/distributed"
9 "distributed-kv/internal/store/persisted"
10 internaltls "distributed-kv/internal/tls"
11 "fmt"
12 "log"
13 "log/slog"
14 "net"
15 "net/http"
16 "os"
17 "strings"
18 "time"
19
20 "github.com/hashicorp/raft"
21 "github.com/joho/godotenv"
22 "github.com/urfave/cli/v2"
23 "golang.org/x/net/http2"
24 "golang.org/x/net/http2/h2c"
25)
26
27var (
28 version string
29
30 name string
31 listenPeerAddress string
32 listenClientAddress string
33 initialCluster cli.StringSlice
34 initialClusterState string
35 advertiseNodes cli.StringSlice
36
37 peerCertFile string
38 peerKeyFile string
39 peerTrustedCAFile string
40
41 certFile string
42 keyFile string
43 trustedCAFile string
44
45 dataDir string
46)
47
48var app = &cli.App{
49 Name: "dkv",
50 Version: version,
51 Usage: "Distributed Key-Value Store",
52 Suggest: true,
53 EnableBashCompletion: true,
54 Flags: []cli.Flag{
55 &cli.StringFlag{
56 Name: "name",
57 Usage: "Unique name for this node",
58 EnvVars: []string{"DKV_NAME"},
59 Destination: &name,
60 Required: true,
61 },
62 &cli.StringSliceFlag{
63 Name: "advertise-nodes",
64 Usage: "List of nodes to advertise",
65 EnvVars: []string{"DKV_ADVERTISE_NODES"},
66 Destination: &advertiseNodes,
67 },
68 &cli.StringFlag{
69 Name: "listen-peer-address",
70 Usage: "Address to listen on for peer traffic",
71 EnvVars: []string{"DKV_LISTEN_PEER_ADDRESS"},
72 Value: ":2380",
73 Destination: &listenPeerAddress,
74 },
75 &cli.StringFlag{
76 Name: "listen-client-address",
77 Usage: "Address listen on for client traffic",
78 EnvVars: []string{"DKV_LISTEN_CLIENT_ADDRESS"},
79 Value: ":3000",
80 Destination: &listenClientAddress,
81 },
82 &cli.StringSliceFlag{
83 Name: "initial-cluster",
84 Usage: "Initial cluster configuration for bootstrapping",
85 EnvVars: []string{"DKV_INITIAL_CLUSTER"},
86 Required: true,
87 Destination: &initialCluster,
88 },
89 &cli.StringFlag{
90 Name: "initial-cluster-state",
91 Usage: "Initial cluster state (new, existing)",
92 EnvVars: []string{"DKV_INITIAL_CLUSTER_STATE"},
93 Required: true,
94 Destination: &initialClusterState,
95 },
96 &cli.StringFlag{
97 Name: "peer-cert-file",
98 Usage: "Path to the peer server TLS certificate file",
99 EnvVars: []string{"DKV_PEER_CERT_FILE"},
100 Destination: &peerCertFile,
101 },
102 &cli.StringFlag{
103 Name: "peer-key-file",
104 Usage: "Path to the peer server TLS key file",
105 EnvVars: []string{"DKV_PEER_KEY_FILE"},
106 Destination: &peerKeyFile,
107 },
108 &cli.StringFlag{
109 Name: "peer-trusted-ca-file",
110 Usage: "Path to the peer server TLS trusted CA certificate file",
111 EnvVars: []string{"DKV_PEER_TRUSTED_CA_FILE"},
112 Destination: &peerTrustedCAFile,
113 },
114 &cli.StringFlag{
115 Name: "cert-file",
116 Usage: "Path to the client server TLS certificate file",
117 EnvVars: []string{"DKV_CERT_FILE"},
118 Destination: &certFile,
119 },
120 &cli.StringFlag{
121 Name: "key-file",
122 Usage: "Path to the client server TLS key file",
123 EnvVars: []string{"DKV_KEY_FILE"},
124 Destination: &keyFile,
125 },
126 &cli.StringFlag{
127 Name: "trusted-ca-file",
128 Usage: "Path to the client server TLS trusted CA certificate file",
129 EnvVars: []string{"DKV_TRUSTED_CA_FILE"},
130 Destination: &trustedCAFile,
131 },
132 &cli.StringFlag{
133 Name: "data-dir",
134 Usage: "Path to the data directory",
135 EnvVars: []string{"DKV_DATA_DIR"},
136 Value: "data",
137 Destination: &dataDir,
138 },
139 },
140 Action: func(c *cli.Context) error {
141 ctx := c.Context
142 // TODO
143 return nil
144 },
145}
146
147func main() {
148 _ = godotenv.Load(".env.local")
149 _ = godotenv.Load(".env")
150 if err := app.Run(os.Args); err != nil {
151 log.Fatal(err)
152 }
153}
Now, let's add the bootstrap function:
1func bootstrapDStore(
2 storer distributed.Storer,
3 storeOpts []distributed.StoreOption,
4) (dstore *distributed.Store, err error) {
5 // Bootstrap
6 nodes := initialCluster.Value()
7 if len(nodes) == 0 {
8 return nil, fmt.Errorf("invalid initial cluster configuration (no nodes): %s", nodes)
9 }
10 bootstrapNode, _, ok := strings.Cut(nodes[0], "=")
11 if !ok {
12 return nil, fmt.Errorf("invalid initial cluster configuration: %s", nodes)
13 }
14 advertizedPeers := make(map[raft.ServerID]raft.ServerAddress)
15 for _, node := range nodes {
16 id, addr, ok := strings.Cut(node, "=")
17 if !ok {
18 return nil, fmt.Errorf("invalid initial cluster configuration: %s", node)
19 }
20 advertizedPeers[raft.ServerID(id)] = raft.ServerAddress(addr)
21 }
22
23 dstore = distributed.NewStore(
24 dataDir,
25 listenPeerAddress,
26 name,
27 advertizedPeers[raft.ServerID(name)],
28 storer,
29 storeOpts...,
30 )
31
32 bootstrap := initialClusterState == "new" && bootstrapNode == name
33 if err := dstore.Open(bootstrap); err != nil {
34 return nil, err
35 }
36 // Periodically try to join the peers
37 go func() {
38 ticker := time.NewTicker(5 * time.Second)
39 defer ticker.Stop()
40 for {
41 select {
42 case <-dstore.ShutdownCh():
43 slog.Error("stopped joining peers due to store shutdown")
44 return
45 case <-ticker.C:
46 leaderAddr, leaderID := dstore.GetLeader()
47 if leaderAddr == "" {
48 slog.Error("no leader")
49 continue
50 }
51 // Not leader
52 if leaderID != raft.ServerID(name) {
53 continue
54 }
55 members, err := dstore.GetServers()
56 if err != nil {
57 slog.Error("failed to get servers", "error", err)
58 continue
59 }
60 peers:
61 for id, addr := range advertizedPeers {
62 // Ignore self
63 if id == raft.ServerID(name) {
64 continue
65 }
66 // Ignore if already member
67 for _, member := range members {
68 if member.ID == id {
69 continue peers
70 }
71 }
72 slog.Info("request peer to join", "id", id, "addr", addr)
73 if err := dstore.Join(id, addr); err != nil {
74 slog.Error("failed to join peer", "id", id, "addr", addr, "error", err)
75 }
76 }
77 }
78 }
79 }()
80 return dstore, nil
81}
We've added a "join" loop to automatically join the peers. Otherwise, we would need to either use a "membership" service or manually join the peers.
Implementing helper functions for TLS ๐
Now, let's add some helper functions to generate the TLS configurations:
1func setupServerTLSConfig(crt, key, ca string) (*tls.Config, error) {
2 var cfg tls.Config
3 if crt != "" && key != "" {
4 certificate, err := tls.LoadX509KeyPair(crt, key)
5 if err != nil {
6 return nil, err
7 }
8 cfg.Certificates = []tls.Certificate{certificate}
9 }
10 if ca != "" {
11 caCert, err := os.ReadFile(ca)
12 if err != nil {
13 return nil, err
14 }
15 cfg.ClientCAs = x509.NewCertPool()
16 cfg.ClientCAs.AppendCertsFromPEM(caCert)
17 cfg.ClientAuth = tls.RequireAndVerifyClientCert
18 }
19
20 return &cfg, nil
21}
22
23func setupClientTLSConfig(crt, key, ca string) (*tls.Config, error) {
24 var cfg tls.Config
25 if crt != "" && key != "" {
26 certificate, err := tls.LoadX509KeyPair(crt, key)
27 if err != nil {
28 return nil, err
29 }
30 cfg.Certificates = []tls.Certificate{certificate}
31 }
32 if ca != "" {
33 caCert, err := os.ReadFile(ca)
34 if err != nil {
35 return nil, err
36 }
37 if cfg.RootCAs == nil {
38 cas, err := x509.SystemCertPool()
39 if err != nil {
40 cas = x509.NewCertPool()
41 }
42 cfg.RootCAs = cas
43 }
44 cfg.RootCAs.AppendCertsFromPEM(caCert)
45 }
46 return &cfg, nil
47}
Assembling everything ๐
Now, let's assemble everything and complete the `Action` function:
1 Action: func(c *cli.Context) (err error) {
2 ctx := c.Context
3 // TLS configurations
4 storeOpts := []distributed.StoreOption{}
5 if peerCertFile != "" && peerKeyFile != "" {
6 peerTLSConfig, err := setupServerTLSConfig(
7 peerCertFile,
8 peerKeyFile,
9 peerTrustedCAFile,
10 )
11 if err != nil {
12 return err
13 }
14 storeOpts = append(storeOpts, distributed.WithServerTLSConfig(peerTLSConfig))
15 }
16
17 if (peerCertFile != "" && peerKeyFile != "") || peerTrustedCAFile != "" {
18 peerClientTLSConfig, err := setupClientTLSConfig(
19 peerCertFile,
20 peerKeyFile,
21 peerTrustedCAFile,
22 )
23 if err != nil {
24 return err
25 }
26 storeOpts = append(storeOpts, distributed.WithClientTLSConfig(peerClientTLSConfig))
27 }
28
29 var tlsConfig *tls.Config
30 if certFile != "" && keyFile != "" {
31 tlsConfig, err = setupServerTLSConfig(certFile, keyFile, trustedCAFile)
32 if err != nil {
33 return err
34 }
35 }
36
37 // Store configuration
38 store := persisted.New(dataDir)
39 defer func() {
40 _ = store.Close()
41 }()
42 dstore, err := bootstrapDStore(store, storeOpts)
43 if err != nil {
44 return err
45 }
46 defer func() {
47 err := dstore.Shutdown()
48 if err != nil {
49 slog.Error("failed to shutdown store", "error", err)
50 }
51 slog.Warn("store shutdown")
52 }()
53
54 // Routes
55 r := http.NewServeMux()
56 r.Handle(dkvv1connect.NewDkvAPIHandler(&api.DkvAPIHandler{
57 Store: dstore,
58 }))
59
60 // Start the server
61 l, err := net.Listen("tcp", listenClientAddress)
62 if err != nil {
63 return err
64 }
65 if tlsConfig != nil {
66 l = tls.NewListener(l, tlsConfig)
67 }
68 slog.Info("server listening", "address", listenClientAddress)
69 srv := &http.Server{
70 BaseContext: func(_ net.Listener) context.Context { return ctx },
71 Handler: h2c.NewHandler(r, &http2.Server{}),
72 }
73 defer func() {
74 _ = srv.Shutdown(ctx)
75 _ = l.Close()
76 slog.Warn("server shutdown")
77 }()
78 return srv.Serve(l)
79 },
Smoke tests with Vagrant and K0s ๐
While we could technically use Minikube, I prefer Vagrant and k0s because the resulting setup is very close to a production environment. k0s is a lightweight Kubernetes distribution that is easy to operate, and Vagrant makes it easy to deploy a small k0s cluster.
-
Create a directory
smoke-tests
and generate an SSH key pair for smoke tests:1mkdir -p smoke-tests 2ssh-keygen -f smoke-tests/id_rsa_smoke_test -N "" 3chmod 600 smoke-tests/id_rsa_smoke_test*
-
Add a
smoke-tests/Vagrantfile
. As I'm on Linux, I will be using thelibvirt
provider. If you are on Windows or macOS, you can use thevirtualbox
provider.1# -*- mode: ruby -*- 2# vi: set ft=ruby : 3 4hosts = { 5 "manager0" => "192.168.77.10", 6 "worker0" => "192.168.77.11", 7 "worker1" => "192.168.77.12", 8} 9 10Vagrant.configure("2") do |config| 11 config.vm.provider "libvirt" 12 ssh_pub_key = File.readlines("./id_rsa_smoke_test.pub").first.strip 13 14 hosts.each do |name, ip| 15 config.vm.define name do |machine| 16 machine.vm.box = "generic/rocky9" 17 machine.vm.network "private_network", ip: ip 18 config.vm.provision "shell" do |s| 19 s.inline = <<-SHELL 20 mkdir -p ~/.ssh && touch authorized_keys 21 echo #{ssh_pub_key} >> /home/vagrant/.ssh/authorized_keys 22 echo #{ssh_pub_key} >> /root/.ssh/authorized_keys 23 systemctl stop firewalld 24 systemctl disable firewalld 25 SHELL 26 end 27 end 28 end 29end
This vagrant file will deploy 3 nodes: 1 manager and 2 workers. The nodes will be accessible via SSH using the
id_rsa_smoke_test
key pair. -
Run
vagrant up
to deploy the cluster. If something fails, you can runvagrant destroy -f
to destroy the cluster. -
Create a
smoke-tests/k0sctl.yaml
:1apiVersion: k0sctl.k0sproject.io/v1beta1 2kind: Cluster 3metadata: 4 name: k8s.example.com-cluster 5spec: 6 hosts: 7 - ssh: 8 address: 192.168.77.10 9 user: root 10 port: 22 11 keyPath: id_rsa_smoke_test 12 role: controller+worker 13 hostname: manager0 14 noTaints: true 15 privateInterface: eth1 16 privateAddress: 192.168.77.10 17 installFlags: 18 - --debug 19 hooks: 20 apply: 21 before: 22 # Set SELinux Permissive 23 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi' 24 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi' 25 26 - ssh: 27 address: 192.168.77.11 28 user: root 29 port: 22 30 keyPath: id_rsa_smoke_test 31 role: worker 32 hostname: worker0 33 privateInterface: eth1 34 privateAddress: 192.168.77.11 35 installFlags: 36 - --debug 37 hooks: 38 apply: 39 before: 40 # Set SELinux Permissive 41 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi' 42 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi' 43 44 - ssh: 45 address: 192.168.77.12 46 user: root 47 port: 22 48 keyPath: id_rsa_smoke_test 49 role: worker 50 hostname: worker1 51 privateInterface: eth1 52 privateAddress: 192.168.77.12 53 installFlags: 54 - --debug 55 hooks: 56 apply: 57 before: 58 # Set SELinux Permissive 59 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then sed -i s/^SELINUX=.*$/SELINUX=permissive/ /etc/selinux/config; fi' 60 - sh -c 'if [ "$(getenforce)" != "Permissive" ] && [ "$(getenforce)" != "Disabled" ]; then setenforce 0; fi' 61 62 k0s: 63 version: '1.29.2+k0s.0' 64 dynamicConfig: false 65 config: 66 apiVersion: k0s.k0sproject.io/v1beta1 67 kind: ClusterConfig 68 metadata: 69 name: k8s.example.com 70 spec: 71 api: 72 k0sApiPort: 9443 73 port: 6443 74 installConfig: 75 users: 76 etcdUser: etcd 77 kineUser: kube-apiserver 78 konnectivityUser: konnectivity-server 79 kubeAPIserverUser: kube-apiserver 80 kubeSchedulerUser: kube-scheduler 81 konnectivity: 82 adminPort: 8133 83 agentPort: 8132 84 network: 85 calico: 86 mode: 'vxlan' 87 overlay: Always 88 mtu: 0 89 wireguard: false 90 kubeProxy: 91 disabled: false 92 mode: iptables 93 kuberouter: 94 autoMTU: true 95 mtu: 0 96 peerRouterASNs: '' 97 peerRouterIPs: '' 98 podCIDR: 10.244.0.0/16 99 provider: calico 100 serviceCIDR: 10.96.0.0/12 101 podSecurityPolicy: 102 defaultPolicy: 00-k0s-privileged 103 storage: 104 type: kine 105 telemetry: 106 enabled: false
-
Run
k0sctl apply --debug
to deploy K0s. -
Fetch the
kubeconfig
and load it into the environment:1# Inside the smoke-tests directory 2k0sctl kubeconfig > kubeconfig 3chmod 600 kubeconfig 4export KUBECONFIG=$(pwd)/kubeconfig
-
Deploy
cert-manager
:1kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.14.4/cert-manager.yaml
cert-manager
permits us to automatically generate TLS certificates for our services. -
Deploy a self-signed certificate cluster issuer. Create a
smoke-tests/certificates/selfsigned-cluster-issuer.yaml
:1apiVersion: cert-manager.io/v1 2kind: ClusterIssuer 3metadata: 4 name: selfsigned-cluster-issuer 5spec: 6 selfSigned: {}
And
kubectl apply -f certificates/selfsigned-cluster-issuer.yaml
. -
Deploy the CA. Create a
smoke-tests/certificates/root-ca.yaml
:1apiVersion: cert-manager.io/v1 2kind: Certificate 3metadata: 4 name: root-ca 5 namespace: cert-manager 6spec: 7 secretName: root-ca-secret 8 duration: 2160h # 90d 9 renewBefore: 360h # 15d 10 subject: 11 organizations: [My Awesome Company] 12 countries: [FR] 13 organizationalUnits: [IT] 14 localities: [Paris] 15 commonName: My Root CA 16 isCA: true 17 privateKey: 18 algorithm: RSA 19 encoding: PKCS1 20 size: 2048 21 issuerRef: 22 name: selfsigned-cluster-issuer 23 kind: ClusterIssuer
And
kubectl apply -f certificates/root-ca.yaml
. -
Deploy the CA Cluster Issuer. Create a
smoke-tests/certificates/private-cluster-issuer.yaml
:1apiVersion: cert-manager.io/v1 2kind: ClusterIssuer 3metadata: 4 name: private-cluster-issuer 5spec: 6 ca: 7 secretName: root-ca-secret
And
kubectl apply -f certificates/private-cluster-issuer.yaml
. -
Deploy the peer server certificate. Create a
smoke-tests/certificates/peer-certificate.yaml
:1apiVersion: cert-manager.io/v1 2kind: Certificate 3metadata: 4 name: dkv-peer-cert 5 namespace: default 6spec: 7 secretName: dkv-peer-cert-secret 8 duration: 2160h # 90d 9 renewBefore: 360h # 15d 10 subject: 11 organizations: [My Awesome Company] 12 countries: [FR] 13 organizationalUnits: [IT] 14 localities: [Paris] 15 commonName: dkv.default.svc.cluster.local 16 dnsNames: 17 - dkv.default.svc.cluster.local 18 - dkv-0.dkv.default.svc.cluster.local 19 - dkv-1.dkv.default.svc.cluster.local 20 - dkv-2.dkv.default.svc.cluster.local 21 issuerRef: 22 name: private-cluster-issuer 23 kind: ClusterIssuer 24 usages: 25 - server auth 26 - client auth 27 - key encipherment 28 - digital signature
We will deploy our
StatefulSet
inside thedefault
namespace with the headless servicedkv
, thus the namedkv.default.svc.cluster.local
. Thedkv-0
,dkv-1
anddkv-2
are theStatefulSet
pods (replicas).Run
kubectl apply -f certificates/peer-certificate.yaml
. -
Deploy a local storage provider:
1kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.26/deploy/local-path-storage.yaml
Volumes can be created using the
local-path
storage class. Local path volumes are bound to one node, and are not replicated. Trying to schedule a pod where the volume is not available will result in aPending
state.To avoid this, our
StatefulSet
will set thetopologySpreadConstraints
. This will ensure that the pods are scheduled on different nodes. -
(Optional, for local/private development) Either deploy a registry, or use a public registry:
1# smoke-tests/manifests/registry/deployment.yaml 2apiVersion: apps/v1 3kind: Deployment 4metadata: 5 name: registry 6spec: 7 selector: 8 matchLabels: 9 app: registry 10 template: 11 metadata: 12 labels: 13 app: registry 14 spec: 15 containers: 16 - name: registry 17 image: registry:2 18 resources: 19 limits: 20 memory: '128Mi' 21 cpu: '500m' 22 ports: 23 - containerPort: 5000 24 name: http
1# smoke-tests/manifests/registry/service.yaml 2apiVersion: v1 3kind: Service 4metadata: 5 name: registry 6spec: 7 type: NodePort 8 selector: 9 app: registry 10 ports: 11 - port: 5000 12 nodePort: 30000 13 name: http 14 targetPort: http
1kubectl apply -f manifests/registry/
We use a
NodePort
service to expose the registry. The registry will be accessible on the port30000
of the nodes. Note that the registry is also using the memory as a storage backend. This is not recommended for production. -
(Optional) Build the image and push it to the registry:
1# In the project directory, not smoke-tests 2docker build -t 192.168.77.10:30000/library/dkv:v1 . 3docker push 192.168.77.10:30000/library/dkv:v1
Note that you may need to configure your Docker or Podman to allow insecure registries.
-
Deploy the
StatefulSet
and the headless service:1# smoke-tests/manifests/dkv/statefulset.yaml 2apiVersion: apps/v1 3kind: StatefulSet 4metadata: 5 name: dkv 6spec: 7 selector: 8 matchLabels: 9 app: dkv 10 serviceName: dkv 11 replicas: 3 12 template: 13 metadata: 14 labels: 15 app: dkv 16 spec: 17 topologySpreadConstraints: 18 - maxSkew: 1 19 topologyKey: kubernetes.io/hostname 20 labelSelector: 21 matchLabels: 22 app: dkv 23 whenUnsatisfiable: DoNotSchedule 24 securityContext: 25 runAsUser: 1000 26 runAsGroup: 1000 27 fsGroup: 1000 28 containers: 29 - name: dkv 30 # The URL is localhost since the registry is exposed on the nodes. 31 image: localhost:30000/library/dkv:latest 32 # Use this image if you don't want to build the image and deploy on a local registry. 33 # image: ghcr.io/darkness4/dkv:dev 34 imagePullPolicy: Always 35 securityContext: 36 runAsUser: 1000 37 runAsGroup: 1000 38 ports: 39 - containerPort: 2380 40 name: raft 41 - containerPort: 3000 42 name: http 43 env: 44 # DKV_NAME uses the pod name 45 - name: DKV_NAME 46 valueFrom: 47 fieldRef: 48 fieldPath: metadata.name 49 # DKV_INITIAL_CLUSTER does not need to set the protocols since we are using TCP over TLS 50 - name: DKV_INITIAL_CLUSTER 51 value: dkv-0=dkv-0.dkv.default.svc.cluster.local:2380,dkv-1=dkv-1.dkv.default.svc.cluster.local:2380,dkv-2=dkv-2.dkv.default.svc.cluster.local:2380 52 # DKV_INITIAL_CLUSTER_STATE is set to new. After deploying successfully, change it to existing. 53 - name: DKV_INITIAL_CLUSTER_STATE 54 value: new 55 - name: DKV_PEER_CERT_FILE 56 value: /etc/dkv/peer-certs/tls.crt 57 - name: DKV_PEER_KEY_FILE 58 value: /etc/dkv/peer-certs/tls.key 59 - name: DKV_PEER_TRUSTED_CA_FILE 60 value: /etc/dkv/peer-certs/ca.crt 61 # For now, the client-side will be insecure. We will change it later. 62 # - name: DKV_CERT_FILE 63 # value: "" 64 # - name: DKV_KEY_FILE 65 # value: "" 66 # - name: DKV_TRUSTED_CA_FILE 67 # value: "" 68 - name: DKV_DATA_DIR 69 value: /data 70 volumeMounts: 71 - name: data 72 mountPath: /data 73 - name: peer-certs 74 mountPath: /etc/dkv/peer-certs 75 volumes: 76 - name: peer-certs 77 secret: 78 secretName: dkv-peer-cert-secret 79 volumeClaimTemplates: # This adds one volume per pod. 80 - metadata: 81 name: data 82 spec: 83 accessModes: ['ReadWriteOnce'] 84 storageClassName: local-path 85 resources: 86 requests: 87 storage: 1Gi
1apiVersion: v1 2kind: Service 3metadata: 4 name: dkv 5spec: 6 clusterIP: None # This defines that the service is headless 7 selector: 8 app: dkv 9 ports: 10 - port: 2380 11 name: raft 12 targetPort: raft 13 - port: 3000 14 name: http 15 targetPort: http
Now run:
1kubectl apply -f manifests/dkv/
-
Now, our DKV cluster should be running! Look at Lens/K9s/kubectl to see the pods and services.
Implementing the client ๐
Usage ๐
Time to build a CLI client. We expect the usage to look something like this:
1dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> get <key>
2dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> set <key> <value>
3dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> delete <key>
We can also add Raft management commands:
1dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-join <name> <address>
2dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-leave <name>
3dkvctl --cert <path> --key <path> --cacert <path> --endpoint <endpoint> member-list
The membership commands will require a new service, though. For now, let's focus on the `get`, `set`, and `delete` commands.
About client-side load balancing ๐
Before implementing the client, we need to think about client-side load balancing. Right now, only the leader is able to "write" to the store, while the other nodes can only "read" from it: with the Raft consensus algorithm, the leader is the only node that can commit logs, and the followers merely replicate them.
This issue is covered in a blog article from the gRPC team. This article presents 3 load balancing strategies:
- Thick client-side load balancing: the client keeps track of the leader and sends the "write" requests to the leader. "read" requests are sent to any node following a load balancing strategy like round-robin.
- Proxy load balancing: the client sends the requests to a proxy, and the proxy forwards the "write" requests to the leader. "read" requests are forwarded to any node following a load balancing strategy like round-robin.
- Look-aside load balancing: the client keeps track of the leader by querying a service discovery system. The client sends the "write" requests to the leader. "read" requests are sent to any node following a load balancing strategy like round-robin.
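To make the first strategy more concrete, here is a minimal sketch of what a "thick client" picker boils down to (hypothetical names, not the article's code): writes always target the leader, while reads are spread across every node.

```go
package lb

import "sync"

// Picker is a toy "thick client" picker: the endpoint list and the current
// leader are assumed to be refreshed out of band (e.g. via a membership API).
type Picker struct {
	mu     sync.Mutex
	leader string   // current leader endpoint
	others []string // follower endpoints
	next   int
}

// PickWrite always returns the leader, since only the leader can commit.
func (p *Picker) PickWrite() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.leader
}

// PickRead round-robins over every known node, leader included.
func (p *Picker) PickRead() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	all := append([]string{p.leader}, p.others...)
	endpoint := all[p.next%len(all)]
	p.next++
	return endpoint
}
```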
In Travis's book, he uses the "thick client-side load balancing" strategy: he relies on gRPC's built-in load balancing by adding a `loadBalancingConfig` to the `serviceConfig`. More precisely:
- He creates a custom resolver that resolves the addresses of the nodes using a getter in the gRPC service. (Another method would be to resolve from DNS.)
- He implements a custom load balancer with a custom picker that selects the leader for "write" requests and any node for "read" requests.
Another solution, by Jillie, uses gRPC health checking to load-balance the requests without implementing a custom load balancer.
So what do we do?
As you know, I'm using ConnectRPC, which doesn't ship these load-balancing features out of the box. However, we can still implement client-side load balancing by doing something similar to etcd:
- Nodes can advertise the client-facing (RPC) addresses of the cluster's nodes using the `--advertise-nodes` flag (with a small modification to include the ID of each node).
- We retrieve the list of nodes from any node using a gRPC service. The list contains the addresses with their roles, and may or may not contain the leader.
- We balance the load of "read" requests on all nodes, either by round-robin, random, or any other strategy. Preferably, we send the "write" request to the leader, otherwise we send it to any node.
- We implement server-side request forwarding. Followers forward write requests to the leader.
This ensures the client can "write" even when it hits a follower, while still getting a minimum of load balancing and efficiency.
Implementing server-side request forwarding ๐
This part can be quite complex. The idea is to create a custom RPC, which means we need to fork `hashicorp/raft` to handle it. While this looks like bad practice, it is actually common, since `hashicorp/raft` is tightly coupled to the paper's implementation.
Actually, Jillie's or Travis's method might be much simpler; I personally chose server-side request forwarding to get a high-performance load-balancing strategy. You may also be interested in go-libp2p-raft, a fork of `hashicorp/raft` that uses libp2p for the transport layer (this project is also used by IPFS), while `rqlite` (a distributed SQLite database) forwards requests over HTTP.
Anyway, this issue is still open to this day. Feel free to follow the discussion.
`hashicorp/raft` packs its RPCs following this layout:
- One `uint8` declaring the RPC type. Right now, the RPC types are:
  - `AppendEntries`: used to replicate logs.
  - `RequestVote`: used to request votes.
  - `InstallSnapshot`: used to install snapshots.
  - `TimeoutNow`: used to request a new election.
- The request payload, encoded using `github.com/hashicorp/go-msgpack/v2/codec`.
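For illustration, writing such a frame boils down to something like the sketch below. This is not the library's code: the constants mirror `hashicorp/raft`'s unexported `rpc*` values, and `sendRPC` is a hypothetical helper (see `net_transport.go` for the real implementation).

```go
package raftwire

import (
	"bufio"
	"net"

	"github.com/hashicorp/go-msgpack/v2/codec"
)

// These values mirror hashicorp/raft's unexported rpc* constants and are
// shown here only for illustration.
const (
	rpcAppendEntries uint8 = iota
	rpcRequestVote
	rpcInstallSnapshot
	rpcTimeoutNow
	// A custom RPC (such as "forward apply") needs a new value here, which is
	// exactly why the library has to be forked.
)

// sendRPC writes the RPC type byte, then the msgpack-encoded request.
func sendRPC(conn net.Conn, rpcType uint8, req any) error {
	w := bufio.NewWriter(conn)
	if err := w.WriteByte(rpcType); err != nil {
		return err
	}
	if err := codec.NewEncoder(w, &codec.MsgpackHandle{}).Encode(req); err != nil {
		return err
	}
	return w.Flush()
}
```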
Sadly, there is no simple way to inject a new RPC into `hashicorp/raft`. Therefore, we need to fork `hashicorp/raft` and add the new RPC. Technically, we could instead add a connection multiplexer on the listener of the `StreamLayer`, which would decide whether an incoming connection carries a Raft RPC or a custom RPC (Travis Jeffery's method). However, this approach is also complex.
Therefore, we won't take that approach; instead, we edit `raft.go`, `transport.go`, and `net_transport.go` to support our new RPC. The fork is available at `github.com/Darkness4/raft@v1.6.3`. Reference it in your `go.mod` file with a `replace` directive:
1require (
2 // ...
3)
4
5replace github.com/hashicorp/raft => github.com/darkness4/raft v1.6.3
You can compare the differences on github.com/hashicorp/raft/compare/main...Darkness4:raft:main (look for `ForwardApply`).
Now, the `Raft` struct has a new `ForwardApply` method. Use this method in the `apply` method of the `Store`:
1 func (s *Store) apply(req *dkvv1.Command) (any, error) {
2 b, err := proto.Marshal(req)
3 if err != nil {
4 return nil, err
5 }
6+ addr, id := s.raft.LeaderWithID()
7+ if addr == "" || id == "" {
8+ return nil, errors.New("no leader")
9+ }
10 timeout := 10 * time.Second
11
12+ if id != raft.ServerID(s.RaftID) {
13+ return nil, s.raft.ForwardApply(id, addr, b, timeout)
14+ }
15
16 future := s.raft.Apply(b, timeout)
17 if err := future.Error(); err != nil {
18 return nil, err
19 }
20 res := future.Response()
21 if err, ok := res.(error); ok {
22 return nil, err
23 }
24 return res, nil
25 }
Now that we have server-side request forwarding, we can add the membership service.
Implementing the membership service ๐
The membership service will help us manage the membership of the cluster. We will also be able to retrieve the list of nodes and their roles.
Technically, you would also implement RBAC here (using Casbin, for example). RBAC is not the subject of this article, but Travis Jeffery covers it in his book.
-
In the protobuf file, add the
MembershipAPI
service:1service MembershipAPI { 2 rpc GetServers(GetServersRequest) returns (GetServersResponse); 3 rpc JoinServer(JoinServerRequest) returns (JoinServerResponse); 4 rpc LeaveServer(LeaveServerRequest) returns (LeaveServerResponse); 5} 6 7message Server { 8 string id = 1; 9 string raft_address = 2; 10 string rpc_address = 3; 11 bool is_leader = 4; 12} 13 14message GetServersRequest {} 15message GetServersResponse { repeated Server servers = 1; } 16 17message JoinServerRequest { 18 string id = 1; 19 string address = 2; 20} 21message JoinServerResponse {} 22 23message LeaveServerRequest { string id = 1; } 24message LeaveServerResponse {}
Run
make protos
to generate the Go code. -
Create a file
internal/api/membership_handler.go
. Remember that we want the service to output the list of nodes based on the--advertise-nodes
flag.1package api 2 3import ( 4 "context" 5 dkvv1 "distributed-kv/gen/dkv/v1" 6 "distributed-kv/gen/dkv/v1/dkvv1connect" 7 "distributed-kv/internal/store/distributed" 8 9 "connectrpc.com/connect" 10 "github.com/hashicorp/raft" 11) 12 13var _ dkvv1connect.MembershipAPIHandler = (*MembershipAPIHandler)(nil) 14 15type MembershipAPIHandler struct { 16 AdvertiseNodes map[raft.ServerID]string 17 Store *distributed.Store 18} 19 20func (m *MembershipAPIHandler) GetServers( 21 context.Context, 22 *connect.Request[dkvv1.GetServersRequest], 23) (*connect.Response[dkvv1.GetServersResponse], error) { 24 srvs, err := m.Store.GetServers() 25 if err != nil { 26 return nil, err 27 } 28 protoServers := make([]*dkvv1.Server, 0, len(srvs)) 29 leaderAddr, leaderID := m.Store.GetLeader() 30 for _, node := range srvs { 31 protoServers = append(protoServers, &dkvv1.Server{ 32 Id: string(node.ID), 33 RaftAddress: string(node.Address), 34 RpcAddress: m.AdvertiseNodes[node.ID], 35 IsLeader: node.ID == leaderID && node.Address == leaderAddr, 36 }) 37 } 38 39 return &connect.Response[dkvv1.GetServersResponse]{ 40 Msg: &dkvv1.GetServersResponse{ 41 Servers: protoServers, 42 }, 43 }, nil 44} 45 46func (m *MembershipAPIHandler) JoinServer( 47 _ context.Context, 48 req *connect.Request[dkvv1.JoinServerRequest], 49) (*connect.Response[dkvv1.JoinServerResponse], error) { 50 return &connect.Response[dkvv1.JoinServerResponse]{}, m.Store.Join( 51 raft.ServerID(req.Msg.GetId()), 52 raft.ServerAddress(req.Msg.GetAddress()), 53 ) 54} 55 56func (m *MembershipAPIHandler) LeaveServer( 57 _ context.Context, 58 req *connect.Request[dkvv1.LeaveServerRequest], 59) (*connect.Response[dkvv1.LeaveServerResponse], error) { 60 return &connect.Response[dkvv1.LeaveServerResponse]{}, m.Store.Leave( 61 raft.ServerID(req.Msg.GetId()), 62 ) 63}
-
In
cmd/dkv/main.go
, add theMembershipAPIHandler
to the router:1 var ( 2+ advertiseNodes cli.StringSlice 3 // ... 4 ) 5 6 var app = &cli.App{ 7 // ... 8 Flags: []cli.Flag{ 9 // ... 10+ &cli.StringSliceFlag{ 11+ Name: "advertise-nodes", 12+ Usage: "List of nodes to advertise", 13+ EnvVars: []string{"DKV_ADVERTISE_NODES"}, 14+ Required: true, 15+ Destination: &advertiseNodes, 16+ }, 17 // ... 18 }, 19 Action: func(c *cli.Context) (err error) { 20 // ... 21 r := http.NewServeMux() 22 r.Handle(dkvv1connect.NewDkvAPIHandler(api.NewDkvAPIHandler(dstore))) 23 24+ nodes := make(map[raft.ServerID]string) 25+ for _, node := range advertiseNodes.Value() { 26+ id, addr, ok := strings.Cut(node, "=") 27+ if !ok { 28+ slog.Error("invalid initial cluster configuration", "node", node) 29+ continue 30+ } 31+ nodes[raft.ServerID(id)] = addr 32+ } 33+ r.Handle(dkvv1connect.NewMembershipAPIHandler(&api.MembershipAPIHandler{ 34+ AdvertiseNodes: nodes, 35+ Store: dstore, 36+ }))
Implementing the "main" function of the client ๐
Finally! We can implement the "main" function of the client.
Add the following code to `cmd/dkvctl/main.go`:
1package main
2
3import (
4 "context"
5 "crypto/tls"
6 "fmt"
7 "log"
8 "math/rand"
9 "net"
10 "net/http"
11 "os"
12 "strconv"
13
14 dkvv1 "distributed-kv/gen/dkv/v1"
15 "distributed-kv/gen/dkv/v1/dkvv1connect"
16
17 "connectrpc.com/connect"
18 "github.com/joho/godotenv"
19 "github.com/urfave/cli/v2"
20 "golang.org/x/net/http2"
21)
22
23var (
24 version string
25
26 certFile string
27 keyFile string
28 trustedCAFile string
29 endpoint string
30)
31
32var (
33 dkvClient dkvv1connect.DkvAPIClient
34 leaderDkvClient dkvv1connect.DkvAPIClient
35 membershipClient dkvv1connect.MembershipAPIClient
36 leaderMembershipClient dkvv1connect.MembershipAPIClient
37)
38
39var app = &cli.App{
40 Name: "dkvctl",
41 Version: version,
42 Usage: "Distributed Key-Value Store Client",
43 Suggest: true,
44 EnableBashCompletion: true,
45 Flags: []cli.Flag{
46 &cli.StringFlag{
47 Name: "cert",
48 Usage: "Client certificate file",
49 EnvVars: []string{"DKVCTL_CERT"},
50 Destination: &certFile,
51 },
52 &cli.StringFlag{
53 Name: "key",
54 Usage: "Client key file",
55 EnvVars: []string{"DKVCTL_KEY"},
56 Destination: &keyFile,
57 },
58 &cli.StringFlag{
59 Name: "cacert",
60 Usage: "Trusted CA certificate file",
61 EnvVars: []string{"DKVCTL_CACERT"},
62 Destination: &trustedCAFile,
63 },
64 &cli.StringFlag{
65 Name: "endpoint",
66 Usage: "Server endpoint",
67 EnvVars: []string{"DKVCTL_ENDPOINT"},
68 Destination: &endpoint,
69 Required: true,
70 },
71 },
72 Before: func(c *cli.Context) (err error) {
73 // TLS configuration
74 var tlsConfig *tls.Config = nil
75 if (certFile != "" && keyFile != "") || trustedCAFile != "" {
76 tlsConfig, err = setupClientTLSConfig(certFile, keyFile, trustedCAFile)
77 if err != nil {
78 return err
79 }
80 }
81
82 http := &http.Client{
83 Transport: &http2.Transport{
84 AllowHTTP: true,
85 DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
86 var d net.Dialer
87 conn, err := d.DialContext(ctx, network, addr)
88 if tlsConfig != nil {
89 serverName, _, serr := net.SplitHostPort(addr)
90 if serr != nil {
91 serverName = addr
92 }
93 tlsConfig := tlsConfig.Clone()
94 tlsConfig.ServerName = serverName
95 return tls.Client(conn, tlsConfig), err
96 }
97 return conn, err
98 },
99 },
100 }
101 scheme := "http://"
102 if tlsConfig != nil {
103 scheme = "https://"
104 }
105 dkvClient = dkvv1connect.NewDkvAPIClient(http, scheme+endpoint, connect.WithGRPC())
106 membershipClient = dkvv1connect.NewMembershipAPIClient(
107 http,
108 scheme+endpoint,
109 connect.WithGRPC(),
110 )
111 leaderEndpoint := findEndpoint(c.Context)
112 if leaderEndpoint == "" {
113 leaderEndpoint = endpoint
114 }
115 leaderDkvClient = dkvv1connect.NewDkvAPIClient(
116 http,
117 scheme+leaderEndpoint,
118 connect.WithGRPC(),
119 )
120 leaderMembershipClient = dkvv1connect.NewMembershipAPIClient(
121 http,
122 scheme+leaderEndpoint,
123 connect.WithGRPC(),
124 )
125 return nil
126 },
127 // get, set, delete, member-join, member-leave, member-list
128 Commands: []*cli.Command{
129 {
130 Name: "get",
131 Usage: "Get the value of a key",
132 ArgsUsage: "KEY",
133 Action: func(c *cli.Context) error {
134 ctx := c.Context
135 key := c.Args().First()
136 if key == "" {
137 return cli.ShowCommandHelp(c, "get")
138 }
139 resp, err := dkvClient.Get(ctx, &connect.Request[dkvv1.GetRequest]{
140 Msg: &dkvv1.GetRequest{
141 Key: key,
142 },
143 })
144 if err != nil {
145 return err
146 }
147 fmt.Println(resp.Msg.GetValue())
148 return nil
149 },
150 },
151 {
152 Name: "set",
153 Usage: "Set the value of a key",
154 ArgsUsage: "KEY VALUE",
155 Action: func(c *cli.Context) error {
156 ctx := c.Context
157 key := c.Args().Get(0)
158 value := c.Args().Get(1)
159 if key == "" || value == "" {
160 return cli.ShowCommandHelp(c, "set")
161 }
162 _, err := leaderDkvClient.Set(ctx, &connect.Request[dkvv1.SetRequest]{
163 Msg: &dkvv1.SetRequest{
164 Key: key,
165 Value: value,
166 },
167 })
168 return err
169 },
170 },
171 {
172 Name: "delete",
173 Usage: "Delete a key",
174 ArgsUsage: "KEY",
175 Action: func(c *cli.Context) error {
176 ctx := c.Context
177 key := c.Args().First()
178 if key == "" {
179 return cli.ShowCommandHelp(c, "delete")
180 }
181 _, err := leaderDkvClient.Delete(ctx, &connect.Request[dkvv1.DeleteRequest]{
182 Msg: &dkvv1.DeleteRequest{
183 Key: key,
184 },
185 })
186 return err
187 },
188 },
189 {
190 Name: "member-join",
191 Usage: "Join the cluster",
192 ArgsUsage: "ID ADDRESS",
193 Action: func(c *cli.Context) error {
194 ctx := c.Context
195 id := c.Args().Get(0)
196 address := c.Args().Get(1)
197 if id == "" || address == "" {
198 return cli.ShowCommandHelp(c, "member-join")
199 }
200 _, err := leaderMembershipClient.JoinServer(
201 ctx,
202 &connect.Request[dkvv1.JoinServerRequest]{
203 Msg: &dkvv1.JoinServerRequest{
204 Id: id,
205 Address: address,
206 },
207 },
208 )
209 return err
210 },
211 },
212 {
213 Name: "member-leave",
214 Usage: "Leave the cluster",
215 ArgsUsage: "ID",
216 Action: func(c *cli.Context) error {
217 ctx := c.Context
218 id := c.Args().First()
219 if id == "" {
220 return cli.ShowCommandHelp(c, "member-leave")
221 }
222 _, err := leaderMembershipClient.LeaveServer(
223 ctx,
224 &connect.Request[dkvv1.LeaveServerRequest]{
225 Msg: &dkvv1.LeaveServerRequest{
226 Id: id,
227 },
228 },
229 )
230 return err
231 },
232 },
233 {
234 Name: "member-list",
235 Usage: "List the cluster members",
236 Action: func(c *cli.Context) error {
237 ctx := c.Context
238 resp, err := membershipClient.GetServers(
239 ctx,
240 &connect.Request[dkvv1.GetServersRequest]{
241 Msg: &dkvv1.GetServersRequest{},
242 },
243 )
244 if err != nil {
245 return err
246 }
247 fmt.Println("ID\t| Raft Address\t| RPC Address\t| Leader")
248 for _, server := range resp.Msg.GetServers() {
249 fmt.Printf(
250 "%s\t| %s\t| %s\t| %s\n",
251 server.GetId(),
252 server.GetRaftAddress(),
253 server.GetRpcAddress(),
254 strconv.FormatBool(server.GetIsLeader()),
255 )
256 }
257 return nil
258 },
259 },
260 },
261}
262
263func findEndpoint(ctx context.Context) (addr string) {
264 servers, err := membershipClient.GetServers(ctx, &connect.Request[dkvv1.GetServersRequest]{
265 Msg: &dkvv1.GetServersRequest{},
266 })
267 if err != nil {
268 return ""
269 }
270 // No server? Use the RPC that was provided.
271 if len(servers.Msg.GetServers()) == 0 {
272 return ""
273 }
274 // Filter the server and only get the servers with RPC address
275 advertisedServers := make([]*dkvv1.Server, 0, len(servers.Msg.GetServers()))
276 for _, server := range servers.Msg.GetServers() {
277 if server.GetRpcAddress() != "" {
278 advertisedServers = append(advertisedServers, server)
279 }
280 }
281 // No advertised server? Use the RPC that was provided.
282 if len(advertisedServers) == 0 {
283 return ""
284 }
285 // Find the leader
286 for _, server := range advertisedServers {
287 // Request the first leader.
288 if server.GetIsLeader() {
289 return server.GetRpcAddress()
290 }
291 }
292
293 // No leader? Request random server.
294 idx := rand.Intn(len(advertisedServers))
295 return advertisedServers[idx].GetRpcAddress()
296}
297
298func setupClientTLSConfig(crt, key, ca string) (*tls.Config, error) {
299 var cfg tls.Config
300 if crt != "" && key != "" {
301 certificate, err := tls.LoadX509KeyPair(crt, key)
302 if err != nil {
303 return nil, err
304 }
305 cfg.Certificates = []tls.Certificate{certificate}
306 }
307 if ca != "" {
308 caCert, err := os.ReadFile(ca)
309 if err != nil {
310 return nil, err
311 }
312 if cfg.RootCAs == nil {
313 cas, err := x509.SystemCertPool()
314 if err != nil {
315 cas = x509.NewCertPool()
316 }
317 cfg.RootCAs = cas
318 }
319 cfg.RootCAs.AppendCertsFromPEM(caCert)
320 }
321 return &cfg, nil
322}
323
324func main() {
325 _ = godotenv.Load(".env.local")
326 _ = godotenv.Load(".env")
327 if err := app.Run(os.Args); err != nil {
328 log.Fatal(err)
329 }
330}
That was a big code dump, but it is actually quite simple.
We invoke `dkvClient` and `leaderDkvClient` to send the key-value requests, and `membershipClient` and `leaderMembershipClient` to send the membership requests.
Write commands try to reach the leader; if no leader can be found, the request is sent to any node (see `findEndpoint`).
Read commands are sent directly to the indicated node (`endpoint`).
Exposing the nodes to the external network ๐
Edit the smoke-tests/manifests/dkv/service.yaml
and append:
1---
2apiVersion: v1
3kind: Service
4metadata:
5 name: dkv-nodeport
6spec:
7 type: NodePort
8 selector:
9 app: dkv
10 ports:
11 - port: 3000
12 name: http
13 nodePort: 30001
14 targetPort: http
This will make the service available on the port 30001
of the nodes. The service automatically load-balances the requests to the pods using a random strategy.
Run kubectl apply -f manifests/dkv/service.yaml
.
To healthcheck the cluster, run dkvctl --endpoint 192.168.77.10:30001 member-list
.
Setting mutual TLS on the client-side ๐
Setting TLS for dkvctl
Add one more certificate, `smoke-tests/certificates/dkvctl-certificate.yaml`, for the client:
1apiVersion: cert-manager.io/v1
2kind: Certificate
3metadata:
4 name: dkvctl-cert
5 namespace: default
6spec:
7 secretName: dkvctl-cert-secret
8 duration: 2160h # 90d
9 renewBefore: 360h # 15d
10 subject:
11 organizations: [My Awesome Company]
12 countries: [FR]
13 organizationalUnits: [IT]
14 localities: [Paris]
15 commonName: dkvctl
16 issuerRef:
17 name: private-cluster-issuer
18 kind: ClusterIssuer
19 usages:
20 - client auth
21 - key encipherment
22 - digital signature
And run kubectl apply -f certificates/dkvctl-certificate.yaml
.
Get the certificates:
1kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.tls\.crt}' | base64 -d > dkvctl.crt
2kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.tls\.key}' | base64 -d > dkvctl.key
3kubectl get secret dkvctl-cert-secret -o jsonpath='{.data.ca\.crt}' | base64 -d > dkvctl-ca.crt
Setting TLS for the server
Add one more certificate for the server's client-facing endpoint, `smoke-tests/certificates/public-certificate.yaml`:
1apiVersion: cert-manager.io/v1
2kind: Certificate
3metadata:
4 name: dkv.example.com-cert
5 namespace: default
6spec:
7 secretName: dkv.example.com-cert-secret
8 duration: 2160h # 90d
9 renewBefore: 360h # 15d
10 subject:
11 organizations: [My Awesome Company]
12 countries: [FR]
13 organizationalUnits: [IT]
14 localities: [Paris]
15 commonName: dkv.example.com
16 dnsNames:
17 - dkv.example.com
18 issuerRef:
19 name: private-cluster-issuer
20 kind: ClusterIssuer
21 usages:
22 - server auth
23 - key encipherment
24 - digital signature
And run kubectl apply -f certificates/public-certificate.yaml
.
Edit the `smoke-tests/manifests/dkv/statefulset.yaml`:
1 - name: dkv
2 # ...
3 env:
4 # ...
5+ - name: DKV_CERT_FILE
6+ value: /etc/dkv/certs/tls.crt
7+ - name: DKV_KEY_FILE
8+ value: /etc/dkv/certs/tls.key
9+ - name: DKV_TRUSTED_CA_FILE
10+ value: /etc/dkv/certs/ca.crt
11 volumeMounts:
12 # ...
13+ - name: certs
14+ mountPath: /etc/dkv/certs
15 volumes:
16 # ...
17+ - name: certs
18+ secret:
19+ secretName: dkv.example.com-cert-secret
Run kubectl apply -f manifests/dkv/statefulset.yaml
.
Testing the client
Edit your DNS or your `/etc/hosts` file so that `dkv.example.com` resolves to `192.168.77.10`:
1sudo su
2echo "192.168.77.10 dkv.example.com" >> /etc/hosts
Now, you can test your cluster simply by running:
1dkvctl --cacert dkvctl-ca.crt --cert dkvctl.crt --key dkvctl.key --endpoint dkv.example.com:30001 set key1 value1
Testing failures ๐
If you kill the leader, you will see that the client is still able to write to the store and that a new leader has been elected:
1kubectl delete pod dkv-0
2./bin/dkvctl \
3 --cacert dkvctl-ca.crt \
4 --cert dkvctl.crt \
5 --key dkvctl.key \
6 --endpoint dkv.example.com:30001 \
7 member-list
8
9# ID | Raft Address | RPC Address | Leader
10# dkv-0 | dkv-0.dkv.default.svc.cluster.local:2380 | | false
11# dkv-1 | dkv-1.dkv.default.svc.cluster.local:2380 | | false
12# dkv-2 | dkv-2.dkv.default.svc.cluster.local:2380 | | true
If a pod stays down for a while and is then restarted, you will see that the state is preserved:
1kubectl scale --replicas=2 statefulset dkv
2# member-list doesn't show the health of the node, though, you will see the leader moves
3./bin/dkvctl \
4 --cacert dkvctl-ca.crt \
5 --cert dkvctl.crt \
6 --key dkvctl.key \
7 --endpoint dkv.example.com:30001 \
8 set key2 value2
9./bin/dkvctl \
10 --cacert dkvctl-ca.crt \
11 --cert dkvctl.crt \
12 --key dkvctl.key \
13 --endpoint dkv.example.com:30001 \
14 set key3 value3
15# You can verify the values with `get <key>`
16
17kubectl scale --replicas=3 statefulset dkv
18./bin/dkvctl \
19 --cacert dkvctl-ca.crt \
20 --cert dkvctl.crt \
21 --key dkvctl.key \
22 --endpoint dkv.example.com:30001 \
23 get key2
24# value2
25./bin/dkvctl \
26 --cacert dkvctl-ca.crt \
27 --cert dkvctl.crt \
28 --key dkvctl.key \
29 --endpoint dkv.example.com:30001 \
30 get key3
31# value3
Do note that the Kubernetes service automatically load-balances the requests across the pods (random, connection-level load balancing). Therefore, try spamming requests and check whether any node responds badly (each request has a 1/3 chance of hitting a given node).
Normally, there is no failure.
About Serf, the service discovery system ๐
As you know, we've implemented "static" service discovery. Travis Jeffery uses `hashicorp/serf`, a library that implements a membership service on top of a gossip protocol. Basically, `hashicorp/serf` would replace the "join" loop inside the `bootstrapDStore` function.
`hashicorp/serf` uses TCP and UDP to discover nodes, detect failures, and manage cluster membership. It also has a built-in event system to react to membership changes.
In practice, nodes would contact each other using Serf to join the cluster (on a different port from Raft). A membership event would then trigger the leader to call Raft's `Join` method. Moreover, Serf can be configured with a shared key to authenticate the nodes.
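For illustration only, a Serf-based membership loop could look roughly like the sketch below. This is not the article's code: event and field names follow `github.com/hashicorp/serf/serf`, and the `raft_addr` tag is a hypothetical convention.

```go
package membership

import (
	"net"
	"strconv"

	"distributed-kv/internal/store/distributed"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

// watchSerfEvents reacts to Serf membership events and mutates the Raft
// configuration accordingly. Error handling is elided for brevity.
func watchSerfEvents(events <-chan serf.Event, dstore *distributed.Store, selfID string) {
	for e := range events {
		me, ok := e.(serf.MemberEvent)
		if !ok {
			continue
		}
		// Only the leader is allowed to change the Raft configuration.
		if _, leaderID := dstore.GetLeader(); leaderID != raft.ServerID(selfID) {
			continue
		}
		switch e.EventType() {
		case serf.EventMemberJoin:
			for _, m := range me.Members {
				if m.Name == selfID {
					continue
				}
				// In practice the Raft bind address would be advertised via a
				// Serf tag; "raft_addr" is a hypothetical tag name.
				addr := m.Tags["raft_addr"]
				if addr == "" {
					addr = net.JoinHostPort(m.Addr.String(), strconv.Itoa(int(m.Port)))
				}
				_ = dstore.Join(raft.ServerID(m.Name), raft.ServerAddress(addr))
			}
		case serf.EventMemberLeave, serf.EventMemberFailed:
			for _, m := range me.Members {
				_ = dstore.Leave(raft.ServerID(m.Name))
			}
		}
	}
}
```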
During my implementation, I found several issues with `hashicorp/serf`:
- `hashicorp/serf` resolves the domain names of the nodes and prefers to use IPs. Because of this, when a pod is deleted and recreated, `hashicorp/serf` is unable to resolve the pod's new IP address. A way to fix this is to use one Service per pod, which is a waste of resources.
- `hashicorp/serf` uses additional ports to communicate with the nodes. We already have two ports (RPC and Raft), and adding a third port (TCP and UDP) is simply too much; etcd doesn't use that many ports.
- `hashicorp/serf` is redundant with Raft's integrated membership system: Raft already handles failures and membership changes.
This is why I decided to scrap `hashicorp/serf` and simply use a "join" loop, which works perfectly.
Conclusion ๐
This is my longest article to date, as it is mostly my report on how I implemented a distributed key-value store using Raft and ConnectRPC. It's also a project to review my skills and my understanding of Travis Jeffery's book.
Most of the time, if you need a distributed system, you would either use a distributed key-value store like etcd, or a distributed event broker like NATS or Kafka. If you need to implement a distributed system from scratch, you probably have a complex state machine to replicate, and you would use Raft.
If you feel lost, it may be worth checking out Travis Jeffery's book. Though there are some points that are ambiguous and some technical choices that can be debated, it is in fact a great book for building a reliable, fault-tolerant distributed system. I would just recommend NOT using logs as the main "data structure" to replicate, and avoiding `hashicorp/serf`. Either use a linked list ("à la blockchain"-style), or use a simple state machine (the position of a robot, for example).
This article doesn't show how to implement observability or Role-Based Access Control, as their implementation varies a lot depending on the use case: ConnectRPC handles OpenTelemetry differently, and RBAC is simply not the subject of this article.
I've also separated the HTTP API from the Raft API. Travis Jeffery combines both by using the connection multiplexer `cmux`. If I were to implement a production-ready distributed system, I would replace the whole network layer of Raft with ConnectRPC, which would allow the HTTP API and the Raft API to share the same network layer.
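For reference, the `cmux` approach boils down to sharing a single TCP listener and routing each connection by sniffing its first bytes. The sketch below is only illustrative: the marker-byte convention and the `serveRaft`/`serveHTTP` helpers are assumptions, not this project's code.

```go
package main

import (
	"io"
	"log"
	"net"

	"github.com/soheilhy/cmux"
)

func main() {
	ln, err := net.Listen("tcp", ":8400")
	if err != nil {
		log.Fatal(err)
	}
	mux := cmux.New(ln)
	// Raft connections identify themselves by sending a marker byte first.
	raftLn := mux.Match(func(r io.Reader) bool {
		b := make([]byte, 1)
		if _, err := r.Read(b); err != nil {
			return false
		}
		return b[0] == 1
	})
	// Everything else is treated as HTTP traffic.
	httpLn := mux.Match(cmux.Any())
	go serveRaft(raftLn) // hypothetical: hand the listener to the Raft transport
	go serveHTTP(httpLn) // hypothetical: hand the listener to the HTTP server
	log.Fatal(mux.Serve())
}

func serveRaft(net.Listener) {}
func serveHTTP(net.Listener) {}
```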