Fault-Tolerant Clustering and Replication with PostgreSQL 10 and Zookeeper

Reference: https://github.com/Abraxos/vagrant-postgres-replication

Overview

The most recent version of PostgreSQL, version 10, has added logical replication, which makes certain desirable enterprise database features possible. Specifically, we can now set up a geographically distributed system of read replicas that all receive updates from a primary write server. However, any distributed system requires fault tolerance. This write-up focuses on introducing fault-tolerance features into a system of PostgreSQL 10 machines, using Zookeeper for distributed computing primitives such as leader election. It describes what I learned, what features we can gain from both systems, and which elements we need to script ourselves. The final result, in my opinion, shows that we can have a highly reliable, fault-tolerant system with very little custom code linking Zookeeper functionality to PostgreSQL logical replication. To demonstrate this, I wrote proof-of-concept code and a basic demonstration with Vagrant. The key takeaway, however, is how we can conceptually build a fault-tolerant distributed solution with relatively little code of our own, not the specific implementation I produced as a proof of concept.

Objective: To be able to run a geographically distributed PostgreSQL database with one node receiving all INSERT, UPDATE, and DELETE queries and publishing them to a set of subscriber database servers. This system must withstand network connectivity issues and the loss of all but one server (including the primary), and it must allow nodes to rejoin with minimal data loss while continuing to function.

The Software Involved

Logical Replication in PostgreSQL 10

This is the key feature. The general mechanism by which logical replication functions in PostgreSQL 10 is as follows:

  • A database can be configured to publish all INSERT, UPDATE, and DELETE queries (IUDs) for either a given set of tables or all tables. This is accomplished with the CREATE PUBLICATION <pub name> FOR TABLE <table list> and CREATE PUBLICATION <pub name> FOR ALL TABLES queries. More details can be found in the CREATE PUBLICATION documentation.
  • Please note that TRUNCATE queries are not replicated.
  • Another database server, the subscriber, makes a connection to the publisher using a user with replication permissions and creates a subscription that corresponds to an existing publication.
  • CREATE SUBSCRIPTION <sub name> CONNECTION '<dsn>' PUBLICATION <pub name>. See the CREATE SUBSCRIPTION documentation for details.
  • This creates a replication slot on the publisher (referred to as a "subscription slot" throughout this document). Changes to a replicated table are retained on the publisher on behalf of the slot until they have been sent to the subscriber node.
    • Subscription slots exist on the publisher, and if a subscriber cannot be reached, the publisher keeps accumulating the changes that subscriber has not yet received. Unless the subscriber associated with the slot comes back, this retained data may fill up the whole drive and cause the database server to crash. This possibility is explored further below.
  • Subscription/publication relationships can be constrained in several ways. For example, we can control the set of tables being replicated, or restrict which kinds of IUDs are published. See the "Parameters" section of the relevant command's documentation.
  • A subscription may be either synchronous or asynchronous. If it's synchronous, a transaction is considered complete only once all the subscribers have received the associated data. This means that IUD queries are slow, but they are guaranteed to make it onto every subscriber. Asynchronous replication, on the other hand, is much faster, but data may be lost if a node goes down before all of its data has been replicated, for example because of network delays.
  • We can have as many subscribers as we like, so long as each one has its own uniquely named subscription slot. Since subscription slots are named after their subscription by default, the easiest approach is to name each subscription something like sub_<hostname>, assuming that each server in your system has a unique hostname. Note that slot names may contain only lowercase letters, numbers, and underscores, so any hyphens in the hostname need to be replaced.
  • In order to avoid having to resolve conflicts, we are operating under the constraint that only the publisher node can be written to, and there is only one publisher node. The subscribers are for read-queries only. Other alternative setups with conflict resolution are possible, but they are not described here.
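
As a rough illustration of the publication and subscription statements described above, here is a minimal Python sketch that only builds the SQL text. The helper names (slot_safe_name, create_publication_sql, create_subscription_sql) and the example connection string are hypothetical and are not taken from the proof-of-concept repository. The sketch maps hostnames to names made of lowercase letters, digits, and underscores, since those are the only characters PostgreSQL accepts in a replication slot name.

```python
import re


def slot_safe_name(hostname, prefix="sub_"):
    """Derive a subscription name that is also a valid replication slot name.

    Every character other than a lowercase letter, digit, or underscore
    (for example '-' or '.') is replaced with '_'.
    """
    cleaned = re.sub(r"[^a-z0-9_]", "_", hostname.lower())
    return prefix + cleaned


def quote_literal(value):
    """Quote a string as a SQL literal by doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def create_publication_sql(pub_name, tables=None):
    """Build CREATE PUBLICATION for all tables, or only for the given tables.

    Names are assumed to be plain identifiers that need no quoting.
    """
    if not tables:
        return "CREATE PUBLICATION {} FOR ALL TABLES;".format(pub_name)
    return "CREATE PUBLICATION {} FOR TABLE {};".format(
        pub_name, ", ".join(tables))


def create_subscription_sql(sub_name, dsn, pub_name):
    """Build CREATE SUBSCRIPTION; the DSN is passed as a quoted string literal."""
    return "CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {};".format(
        sub_name, quote_literal(dsn), pub_name)
```

For example, slot_safe_name('pg-node-2') gives sub_pg_node_2, which can then be passed to create_subscription_sql together with a connection string that points at the publisher.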

More details can be found here: https://www.postgresql.org/docs/10/static/logical-replication.html

Zookeeper

Zookeeper is a service that provides distributed computing primitives which we can use to provide fault-tolerance to the system. As you can see above, aside from the concept of replication slots, which can cause problems of their own, PostgreSQL 10 logical replication lacks features that would provide fault tolerance. In order to add those, we need something like Zookeeper.

It's worth noting that it is not necessary to use Zookeeper specifically, but there is a need for something like it. A valid alternative to both Zookeeper and Tooz (described below) would be something like Atomix.

  • Zookeeper, amongst many of its features, provides leader election, and the ability to know who left/joined the cluster. This is the only thing we really need from it.
  • If we need to expand the system with more complex functionality, Zookeeper provides distributed data structures as well. This is not described in this document though.

Tooz

Tooz is a layer on top of Zookeeper that provides distributed computing primitives to a Python programming environment. It doesn't need to use Zookeeper and can use other back-ends, but Zookeeper is considered the standard: it is by far the most stable and best supported. There are multiple other systems that can be used (as mentioned above, Atomix can provide the functionality of both Zookeeper and Tooz).

These are, in general, all the primitives we really need to create a fault-tolerant cluster of PostgreSQL 10 databases.

Custom Clustering System

Basic Algorithm Overview

The basic operating mechanism of the clustering system is, in general, event-driven, and responds to the following conditions with the following actions:

  1. When a new node joins the group for the first time, as defined by Zookeeper, one of the following actions is taken:
    • If the node is the only node in the group, it is elected as the primary and configured as a write-node, more specifically, as a publisher with either all tables or some tables, depending on your desired configuration.
    • If there are other nodes already in the group, it executes a call to find out who the primary is, and then configures itself to subscribe to the primary node.
  2. When a node becomes unavailable, the following actions are taken depending on whether the node is the publisher or not:
    • If the node is the leader of the cluster, which means it is also the publisher, then the other nodes execute leader election automatically (this is an automatic function of Zookeeper/Tooz). When a node is elected leader, all other nodes are notified and must sync up their replicated tables to the new leader. This is described further below.
    • If the node is a subscriber, nothing happens immediately. However, the system must keep polling the size of the subscription slot associated with the node that went down. Once the node has been down for a configured timeout, or once the subscription slot takes up a pre-configured amount of space on the disk, the publisher deletes the subscription slot in question.
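
The decisions above can be sketched as a few pure functions, which makes them easy to test without a running cluster. This is only an illustration under my own assumptions: the function names, the action labels, and the thresholds are hypothetical. The two SQL strings use the pg_replication_slots view and the pg_wal_lsn_diff, pg_current_wal_lsn, and pg_drop_replication_slot functions that ship with PostgreSQL 10; the %s placeholder assumes a psycopg2-style driver.

```python
# Query a publisher could poll to see how many bytes of WAL each slot retains.
SLOT_RETAINED_BYTES_SQL = """
SELECT slot_name, active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
  FROM pg_replication_slots;
"""

# Drops a slot by name. PostgreSQL refuses to drop a slot that is still active.
DROP_SLOT_SQL = "SELECT pg_drop_replication_slot(%s);"


def action_on_join(me, members, leader):
    """Decide what this node does when it first joins the group."""
    if list(members) == [me]:
        # Alone in the group: this node becomes the publisher.
        return ("publish", None)
    if leader is not None and leader != me:
        # Someone else already leads: subscribe to them.
        return ("subscribe", leader)
    # Other members exist but no leader is known yet: wait for the election.
    return ("wait", None)


def action_on_leave(me, departed, leader):
    """Decide how this node reacts when another node becomes unavailable."""
    if departed == leader:
        return "await_election"   # Zookeeper/Tooz elects a new leader
    if me == leader:
        return "monitor_slot"     # publisher watches the departed node's slot
    return "ignore"               # other subscribers have nothing to do


def should_drop_slot(down_seconds, retained_bytes,
                     max_down_seconds, max_retained_bytes):
    """True once either the timeout or the disk-usage limit has been reached."""
    return (down_seconds >= max_down_seconds
            or retained_bytes >= max_retained_bytes)
```

Keeping the policy separate from the database calls means the timeout and size limits can be tuned and unit-tested independently of PostgreSQL and Zookeeper.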

Key Primitives

There are several primitives, from the perspective of this clustering logic, that need to be implemented in order for this to function properly. Many of them need to be implemented as transactions.

Configuring PostgreSQL 10

PostgreSQL should be installed essentially as recommended on the PostgreSQL website: https://www.postgresql.org/download/. In my case, I used the Ubuntu package manager. On Ubuntu 16.04, that looked like the following:

# Run these as root (for example, in a root shell or a provisioning script).
echo 'deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main' > /etc/apt/sources.list.d/pgdg.list
apt-get install -y wget ca-certificates
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
apt-get update; apt-get upgrade -y
apt-get install postgresql-10 -y

Then you will need to add your replication user (described below) and make sure that your database is remotely accessible to the appropriate users. This likely involves editing the /etc/postgresql/10/main/pg_hba.conf file, as well as executing the following two queries as the postgres user:

sudo -u postgres psql -d testdb

This launches the psql shell as the postgres user. Then run:

ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET listen_addresses = '*'; -- Or whatever you want to make sure that you can access remotely

Finally, restart postgres: service postgresql restart
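
For the pg_hba.conf edit mentioned above, the PostgreSQL logical replication quick setup uses an ordinary host record naming the database, the user, and the allowed address range. The following is a minimal sketch of formatting such a record; the function name pg_hba_entry and the 192.168.50.0/24 subnet in the usage note are hypothetical, so substitute whatever your network actually uses.

```python
import ipaddress


def pg_hba_entry(database, user, cidr, method="md5"):
    """Format one 'host' record for pg_hba.conf.

    ip_network() raises ValueError for a malformed CIDR or one with host
    bits set, which catches typos before they reach the config file.
    """
    network = ipaddress.ip_network(cidr)
    return "host    {}    {}    {}    {}".format(database, user, network, method)
```

Calling pg_hba_entry('testdb', 'vagrant', '192.168.50.0/24') produces a line that can be appended to /etc/postgresql/10/main/pg_hba.conf before restarting PostgreSQL.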

Configuring Zookeeper

This is all outlined in more detail here: https://zookeeper.apache.org/doc/r3.3.3/zookeeperStarted.html in the Running Replicated ZooKeeper section.

Installing Zookeeper on Ubuntu is fairly simple: apt-get install zookeeperd -y. Configuration, however, is non-trivial. You will need to make sure that ports 2888 and 3888 are open on each machine in your cluster (these are the default values and can be changed), and then edit the /etc/zookeeper/conf/zoo.cfg file to add information about every node in the cluster, including the current one:

server.1=pg-node-1:2888:3888
server.2=pg-node-2:2888:3888
server.3=pg-node-3:2888:3888
server.4=pg-node-4:2888:3888
server.5=pg-node-5:2888:3888

Note that the values on the left, like server.1 etc, need to be precisely like above, where the number after server. is the number of the node in the cluster. The value on the right is simply the hostname or IP address with two ports behind it. In my case, I used the default ports of 2888 and 3888 and each of my nodes had a hostname that corresponded with their ID number: pg-node-<num>. I also ended up editing each node's hosts file to make sure that they could all reach each other by said hostname.

Then you need to tell zookeeper precisely which node it is by executing the following command to set the ID of the node into the myid file:

echo '1' > /var/lib/zookeeper/myid

Finally, you need to restart Zookeeper:

service zookeeper restart
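
Since the server lines and the myid value follow a fixed pattern, they can be generated rather than typed by hand. Here is a small sketch of that idea; the function names are hypothetical, and it assumes node IDs are simply assigned in list order starting from 1, as in the example above.

```python
def zoo_cfg_server_lines(hostnames, peer_port=2888, election_port=3888):
    """Return one 'server.N=host:peer:election' line per node, N starting at 1."""
    return [
        "server.{}={}:{}:{}".format(node_id, host, peer_port, election_port)
        for node_id, host in enumerate(hostnames, start=1)
    ]


def myid_for(hostname, hostnames):
    """Return the myid file content for a node, matching its server.N line."""
    return str(hostnames.index(hostname) + 1)


nodes = ["pg-node-{}".format(n) for n in range(1, 6)]
print("\n".join(zoo_cfg_server_lines(nodes)))
```

The printed lines match the zoo.cfg fragment shown earlier, and myid_for('pg-node-3', nodes) returns '3', which is the value to write into /var/lib/zookeeper/myid on that node.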

Concerning the hosts file: There is a notable pitfall with the hosts file on Linux. You may run into difficulties if the IP address of the node in question is not configured properly. The short version is that you want to comment out the first line of Linux's default hosts file, and then append a list of all the nodes in your cluster, including the current one. If you want to, you can see how it's done in my Vagrantfile.
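
The hosts file change can be sketched as a small text transformation. This is an illustration only, not the code from my Vagrantfile: the function name rewrite_hosts is hypothetical, and any IP addresses you pass in are your own.

```python
def rewrite_hosts(original, cluster):
    """Comment out the first line of a hosts file and append cluster entries.

    `original` is the current file content; `cluster` maps hostname -> IP.
    """
    lines = original.splitlines()
    # Comment out the first line unless it is already a comment.
    if lines and not lines[0].startswith("#"):
        lines[0] = "# " + lines[0]
    # Append one 'IP<TAB>hostname' entry per node, including the current one.
    lines.extend("{}\t{}".format(ip, host) for host, ip in cluster.items())
    return "\n".join(lines) + "\n"
```

The function returns the new file content instead of writing it, so the result can be inspected before it replaces /etc/hosts.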

Creating a User with Replication Privileges

In order to create a user who has rights to replicate on a given database, we need to make sure that there are sufficient privileges for them to do so. The following command can be used to create a user on a vagrant testing box and in turn demonstrates how this can be done:

CREATE ROLE vagrant SUPERUSER LOGIN REPLICATION PASSWORD 'vagrant';

Using Tooz to React To Leader Election and Group Joins/Departures

This is where we have to actually write custom code.

from tooz import coordination
import time
import arrow
from database import *
from socket import gethostname

leader = None

def configure_db_publish():
    configure_publisher()


def configure_db_subscribe(node):
    configure_subscriber(PG_NODES[node])


def configure_db_standalone():
    configure_standalone()


def on_group_join(event):
    print('{}-EVENT: {} joined {}'.format(gethostname(),
                                          event.member_id.decode(),
                                          event.group_id.decode()))

def on_group_leave(event):
    print('{}-EVENT: {} left {}'.format(gethostname(),
                                        event.member_id.decode(),
                                        event.group_id.decode()))

def on_elected_leader(event):
    global leader
    leader = gethostname()
    print('{}-EVENT: I am now the leader of {}'.format(gethostname(),
                                                       event.group_id.decode()))
    print('{}-EVENT: Reconfiguring database to publish'.format(gethostname()))
    configure_db_publish()


def join(name):
    while True:
        try:
            return coordinator.join_group_create(name)
        except coordination.MemberAlreadyExist:
            print('{}-EVENT: Someone with the name {} already part of the cluster. Retrying...'\
                  .format(gethostname(), gethostname()))
            time.sleep(1)

start_local_postgres()

coordinator = coordination.get_coordinator('kazoo://', gethostname().encode())
coordinator.start()

group_name = 'pg-cluster-1'.encode()
request = join(group_name)

coordinator.watch_join_group(group_name, on_group_join)
coordinator.watch_leave_group(group_name, on_group_leave)
coordinator.watch_elected_as_leader(group_name, on_elected_leader)

configure_db_standalone()

start = time.time()
try:
    while True:
        req = coordinator.get_leader(group_name)
        current_leader = req.get()
        current_leader = None if not current_leader else current_leader.decode()
        print('{}-HEARTBEAT: {} is leader of {} at: {}'.format(gethostname(),
                                                               leader,
                                                               group_name.decode(),
                                                               arrow.utcnow()))
        if leader != current_leader:
            print('{}-EVENT: Leader change detected! {} -> {}'.format(gethostname(),
                                                                      leader,
                                                                      current_leader))
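            if not current_leader:
                print('{}-EVENT: No leader detected, stopping subscriptions'
                      .format(gethostname()))
                # Record that there is no leader so this branch runs once per
                # outage instead of on every heartbeat.
                leader = None
                configure_db_standalone()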
            else:
                leader = current_leader
                print('{}-EVENT: Subscribing database to: {}'.format(gethostname(),
                                                                     leader))
                configure_db_subscribe(leader)
        coordinator.heartbeat()
        coordinator.run_watchers()
        time.sleep(1)
except KeyboardInterrupt:
    print("Keyboard Interrupt. Stopping...")
    stop_local_postgres()
    coordinator.stop()
    print("Done!")
    exit(0)

Expect this section to be updated as I add more functionality. However, the code above demonstrates, in basic terms, how to recover from the loss of a publisher (or any node) in the network.
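
One detail worth illustrating is how a subscriber lets go of a publisher that has disappeared. A plain DROP SUBSCRIPTION also tries to drop the slot on the publisher, so it fails when the publisher is unreachable; the PostgreSQL documentation describes disabling the subscription and setting its slot_name to NONE first. The sketch below only builds the statement list. The function name is hypothetical, and it is not the implementation of the database module imported by the script above.

```python
def detach_subscription_sql(sub_name, publisher_reachable):
    """Return the statements needed to remove a subscription, in order.

    The statements are assumed to run in autocommit mode, since
    DROP SUBSCRIPTION cannot run inside a transaction block while the
    subscription is still associated with a replication slot.
    """
    if publisher_reachable:
        # The publisher can be contacted, so its slot is dropped as well.
        return ["DROP SUBSCRIPTION IF EXISTS {};".format(sub_name)]
    return [
        # Stop the apply worker before changing the slot association.
        "ALTER SUBSCRIPTION {} DISABLE;".format(sub_name),
        # Detach from the remote slot so the drop does not contact the publisher.
        "ALTER SUBSCRIPTION {} SET (slot_name = NONE);".format(sub_name),
        "DROP SUBSCRIPTION {};".format(sub_name),
    ]
```

In the unreachable case the slot is left behind on the old publisher, so it still has to be removed there if that node ever rejoins the cluster.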

Synchrony in an Asynchronous System

Coming soon.

Synchronizing Data with a New Primary

Truncate Approach
Metadata/Delta Approach
Clean Failovers