Read Along: Time, Clocks, and the Ordering of Events in a Distributed System

In my first few months at Amazon, I needed (or at least thought I needed) to synchronize writes to a Dynamo table. My deskmate and I started diagramming an implementation for a distributed locking mechanism. A recent grad school graduate, overhearing our conversation,[1] interrupted and said, "Oh, there's like, a whole paper about this stuff by a famous guy from Microsoft."

It took me a few years to get to it, but I finally read the famous guy's paper, "Time, Clocks, and the Ordering of Events in a Distributed System." Here's what I learned.

First, a couple of definitions.

System
a collection of processes
Process
a sequence of events
Distributed System
a collection of processes which exchange messages. The time it takes to transmit messages is non-negligible compared to the time between events.

Crucially, Lamport's definition of a distributed system is based on the transmission delay associated with sending a message, but it doesn't address the reliability of the transmission medium.

Also, note that nothing about this definition restricts it to a collection of computers. A distributed system could just as easily be comprised of the individual components of a single computer or two army generals standing on distant hilltops.

The first half of the paper describes a system of logical clocks that imposes a total ordering on all events. That ordering can be unintuitive because it need not correspond with the passage of physical time, so in the second half of the paper Lamport extends the system with physical clocks so that the ordering also agrees with events in physical space-time.

This post covers only the first half of Lamport's paper. Maybe one day I'll get around to posting part two.

1 Partial Ordering

Remember how the processes in a distributed system don't have to be computers? Let's make our processes people. Our messages aren't encrypted, so we'll dispense with Bob and Alice. Let's call our processes something more interesting, like Bonesaw and Andromeda.

Time to introduce our first relation: "happened-before," denoted \(\rightarrow\).

The happened-before relation must satisfy three properties:

  1. If events \(a\) and \(b\) belong to the same process, and \(a\) comes before \(b\), then \(a \rightarrow b\).
  2. If \(a\) corresponds to the sending of a message by one process, and \(b\) corresponds to the receipt of that message by another process, then \(a \rightarrow b\).
  3. The relation is transitive. That is, if \(a \rightarrow b\) and \(b \rightarrow c\) then \(a \rightarrow c\).

We call \(a\) and \(b\) "concurrent" if \(a \nrightarrow b\) and \(b \nrightarrow a\) - in other words, neither event happened first because their relative ordering is indeterminate.

Another way to define the happened-before relation is to say that \(a \rightarrow b\) when it is possible that \(a\) could have caused \(b\). Two events are concurrent if neither event could have possibly caused the other.

Notice that physical time appears nowhere in these definitions.
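To make the three rules concrete, here is a minimal sketch that builds the happened-before relation for a tiny, made-up history and then tests for concurrency. The event names (`a1`, `a2`, `b1`, `b2`) and the helper functions are assumptions for illustration; they do not come from the paper.

```python
from itertools import product

# Hypothetical history: a1, a2 belong to Andromeda; b1, b2 to Bonesaw.
process_order = [("a1", "a2"), ("b1", "b2")]  # rule 1: earlier event first
messages = [("a2", "b2")]                     # rule 2: (send, receive)


def happened_before(process_order, messages):
    """Return the set of pairs (x, y) such that x -> y."""
    relation = set(process_order) | set(messages)
    changed = True
    while changed:  # rule 3: keep adding pairs until the relation is transitive
        changed = False
        for (a, b), (c, d) in product(list(relation), repeat=2):
            if b == c and (a, d) not in relation:
                relation.add((a, d))
                changed = True
    return relation


def concurrent(x, y, relation):
    """Two events are concurrent when neither happened before the other."""
    return (x, y) not in relation and (y, x) not in relation


hb = happened_before(process_order, messages)
print(("a1", "b2") in hb)          # True, via a1 -> a2 -> b2
print(concurrent("a1", "b1", hb))  # True, nothing links them
```

No timestamps appear anywhere in the sketch; the relation is built purely from process order and message exchange.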

2 Logical Clocks

Now we're ready to bring in clocks. Unlike wall-clocks, which run continuously, our logical clocks are discrete and event-driven. That is, they only "tick" in response to events. Each process has its own clock which assigns, to each of its events, an integer representing the "time" at which the event occurred.

More formally, each process \(P_i\) has an associated clock \(C_i\) defined as a function which assigns a number \(C_i \langle a \rangle\) to event \(a\) in process \(P_i\). We define \(C\) as the entire system of clocks which assigns to any event \(b\) the number \(C \langle b \rangle\), such that \(C \langle b \rangle = C_j \langle b \rangle\) if \(b\) is an event in process \(P_j\).

For our clocks to be useful, they should ensure that if \(a\) happens before \(b\), then \(a\) happens at an earlier time than \(b\). This is called the Clock Condition.

Formally, it states that:

\begin{equation} \text{For any events } a, b, \text{if } a \rightarrow b \text{ then } C \langle a \rangle < C \langle b \rangle. \end{equation}

Lamport claims that "it is easy to see" that the converse does not hold. It wasn't easy for me, so I attempted an informal proof, which you can check in the appendix if you care to.

Applying the definition of the happened-before relation, we can restate the Clock Condition in the form of the following two conditions.

C1
If \(a\) and \(b\) are events in process \(P_i\), and \(a\) comes before \(b\), then \(C_i \langle a \rangle < C_i \langle b \rangle\).
C2
If \(a\) is the sending of a message by process \(P_i\) and \(b\) is the receipt of the message by process \(P_j\), then \(C_i \langle a \rangle < C_j \langle b \rangle\).

It's easy to define an algorithm that satisfies both conditions. We need only two implementation rules.

IR1
Each process increments its clock after each event. (Satisfies C1)
IR2
Every process timestamps outbound messages with its current clock value. The receiving process sets its clock forward if necessary to ensure that its clock value is greater than the timestamp in the message. (Satisfies C2)
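As a quick worked example of the two rules, the sketch below traces a single message between two clocks. The starting values (7 and 3) and the function names `tick` and `on_receive` are assumptions chosen for illustration.

```python
def tick(clock):
    """IR1: advance the clock between successive events."""
    return clock + 1


def on_receive(clock, msg_timestamp):
    """IR2: move forward, if necessary, to pass the message's timestamp."""
    return max(clock, msg_timestamp + 1)


bonesaw, andromeda = 7, 3  # hypothetical starting clock values

sent_at = bonesaw          # Bonesaw stamps the outbound message with 7
bonesaw = tick(bonesaw)    # sending was an event, so Bonesaw ticks to 8

andromeda = on_receive(andromeda, sent_at)  # Andromeda jumps from 3 to 8
print(sent_at < andromeda)  # True, so C2 holds for this message
print(bonesaw, andromeda)   # 8 8
```

Had Andromeda's clock already read 12, `on_receive` would have left it alone: the clock only ever moves forward.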

The only downside of this approach is that it still imposes only a partial ordering. For concurrent events the ordering is still non-deterministic.

The remedy is simple: just arbitrarily define a total ordering on the processes.

Under this scheme, if Bonesaw and Andromeda send concurrent messages, a total ordering in which Andromeda \(<\) Bonesaw would dictate that we always order Andromeda's message before Bonesaw's (Poor Bonesaw).
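One way to picture the tie-break is to treat each event as a (timestamp, process name) pair and sort the pairs. The timestamps below are made up for illustration.

```python
# Hypothetical (timestamp, process name) pairs for four events.
events = [(4, "Bonesaw"), (2, "Bonesaw"), (4, "Andromeda"), (3, "Andromeda")]

# Tuples compare element by element, so sorting orders by timestamp first
# and falls back to the process name only when the timestamps are equal.
ordered = sorted(events)
print(ordered)
# [(2, 'Bonesaw'), (3, 'Andromeda'), (4, 'Andromeda'), (4, 'Bonesaw')]
```

The two events stamped 4 are concurrent as far as the clocks can tell, and the name ordering settles the tie in Andromeda's favor.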

And that's it. We now have a system of logical clocks which imposes a total ordering on events. Why is this useful? Because now our collection of independent processes shares a consistent, ordered view of the system's events. We can use this as the basis for many distributed algorithms, including a mutual exclusion mechanism.

That is, I can finally implement that (still naïve) distributed lock. Let's go ahead and do that.

3 Implementation

You can find all of this code on GitHub.

Let's get some imports out of the way.

from random import randint
import logging

3.1 LogicalClock

A LogicalClock keeps track of the current time in a Process. The clock can tick in two ways:

increment
in which the time increases by 1 (i.e., a tick)
ensure_at_least
fast-forward if necessary to ensure the clock is set to at least the given time

The former is used upon sending a message (IR1). The latter is used upon receiving a message (IR2).

class LogicalClock(object):

    def __init__(self):
        self.time = 0

    def increment(self):
        self.time += 1

    def ensure_at_least(self, t):
        self.time = max(self.time, t)

3.2 Mutex

A Mutex can have at most one owner at a given time. It tracks its current owner and the number of times it has been claimed and released, respectively.

class Mutex(object):

    def __init__(self):
        self._owner = None
        self.num_claims = 0
        self.num_releases = 0

    def owner(self):
        return self._owner

We fail loudly if a Process attempts to claim the lock while it is owned by another Process. We also perform a sanity check that each claim has had a corresponding release.

    def claim(self, new_owner):
        assert self._owner is None, "Aaaaaaaaaaaagh. {} tried to claim the lock, but {} owns it.".format(new_owner, self._owner)
        logging.debug("{} claims the lock.".format(new_owner))
        self._owner = new_owner
        self.num_claims += 1
        assert self.num_claims == self.num_releases + 1

We also fail loudly if any Process other than the lock's owner attempts to release it. Upon release, we again assert that every claim has a corresponding release.

    def release(self, owner):
        assert self._owner is owner, "{} tried to release a lock that {} owns.".format(owner, self._owner)
        logging.debug("{} releases the lock.".format(owner))
        self._owner = None
        self.num_releases += 1
        assert self.num_claims == self.num_releases

3.3 Message

A Message has three components: a sender, a recipient, and a timestamp. The timestamp is the local time at which the message was sent from the perspective of the sending Process.

class Message(object):

    def __init__(self, sender, recipient, sent_at):
        self.sender = sender
        self.recipient = recipient
        self.sent_at = sent_at

    def __repr__(self):
        return "{}@{} -> {}: {}".format(self.sender.name, self.sent_at, self.recipient.name, self.content)

There are three classes of Messages: a Request for the mutex, a Release of the mutex, and an Acknowledgement of a Request by another Process.

    @classmethod
    def a_mutex_request(cls, sender, recipient, sent_at):
        m = Message(sender, recipient, sent_at)
        m.content = "REQUEST"
        return m

    @classmethod
    def a_mutex_release(cls, sender, recipient, sent_at):
        m = Message(sender, recipient, sent_at)
        m.content = "RELEASE"
        return m

    @classmethod
    def an_ack(cls, sender, recipient, sent_at):
        m = Message(sender, recipient, sent_at)
        m.content = "ACK"
        return m

3.4 Message Broker

The MsgBroker mediates communication between Processes. A Process sends messages by posting the message to the MsgBroker, which enqueues the message for delivery.

class MsgBroker(object):

    def __init__(self):
        self.queue = {}

    def send_message(self, sender, recipient, msg):
        queue_key = (sender, recipient)
        outbox = self.queue.get(queue_key, [])
        self.queue[queue_key] = outbox
        outbox.append(msg)

We add a random delay to the delivery of messages to simulate network latency. As Lamport points out, we are making the unrealistic assumption that all messages from a particular process are not only guaranteed to arrive, but are guaranteed to arrive in the same order in which they were sent. In the real world we would use a protocol that allows the receiver to detect when messages are lost or delivered out of order.

    def deliver(self):
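        # Iterate over a snapshot: receiving a message can enqueue new
        # messages, which may add keys to self.queue mid-loop.
        for (sender, recipient), outbox in list(self.queue.items()):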
            while outbox and randint(1, 20) == 1:
                msg = outbox.pop(0)
                logging.debug("[MSG]: {}".format(msg))
                recipient.receive_message(sender, msg)
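The simulation never needs to detect lost or reordered messages, but one common way a receiver can do so is with per-sender sequence numbers. The sketch below is an assumption-laden illustration, not part of the simulation or of Lamport's paper; `SequencedChannel` and `accept` are hypothetical names.

```python
class SequencedChannel(object):
    """Receiver-side bookkeeping for one (sender, recipient) pair."""

    def __init__(self):
        self.next_expected = 0

    def accept(self, seq):
        """Classify an incoming sequence number."""
        if seq == self.next_expected:
            self.next_expected += 1
            return "in order"
        if seq < self.next_expected:
            return "duplicate or late"
        # A number from the future means earlier messages are missing
        # (lost, or still in flight).
        return "gap: {} message(s) missing".format(seq - self.next_expected)


channel = SequencedChannel()
results = [channel.accept(s) for s in [0, 1, 3, 2]]
print(results)
# ['in order', 'in order', 'gap: 1 message(s) missing', 'in order']
```

A real protocol would also buffer or re-request the missing messages; this sketch only shows the detection step.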

3.5 Process

Each process is uniquely identified by a name. Processes coordinate access to a shared resource via the_lock and communicate via a msg_broker.

class Process(object):

    def __init__(self, name, the_lock, msg_broker, num_peers):
        self.name = name
        self.the_lock = the_lock
        self.msg_broker = msg_broker
        self.num_peers = num_peers

Every process maintains its own logical clock. Every claim on the lock is stored in the request queue. Upon receiving an acknowledgement from another process, we record the acknowledgement's timestamp and associate it with the sender in the latest_ack_from dict. We will refer to this dict to determine whether another process has acknowledged our request to claim the lock.

        self.clock = LogicalClock()
        self.request_queue = []
        self.latest_ack_from = {}

To request the lock we send a timestamped message to each of our peers.

    def request_lock(self, peers):
        for p in peers:
            msg = Message.a_mutex_request(self, p, self.time())
            self.send_message(p, msg)

We also add the timestamped request to our own request queue. Then we increment the clock. This clock tick corresponds to IR1, which dictates that the clock be incremented between successive events.

        self.request_queue.append(Message.a_mutex_request(self, self, self.time()))
        self.clock.increment()

A process issues a request for the lock about once every 10 cycles of our simulation provided that it has no requests pending. We can run the simulation under different degrees of lock contention by adjusting the probability that the process "wants" the lock.

    def wants_lock(self):
        return not self.has_request_pending() and randint(1, 10) == 1

    def has_request_pending(self):
        return any(x for x in self.request_queue if x.sender is self)

If a process owns the lock, its request is guaranteed to be at the front of the queue. Thus, to release the lock we pop the head of the queue and notify the other processes. Sending these messages corresponds to an event, so we increment the clock in accordance with IR1.

    def release_lock(self, peers):
        assert self.the_lock.owner() is self, "Tried to release a lock we don't own!"
        req = self.request_queue.pop(0)
        assert req.sender is self, "We somehow claimed the lock without being at the front of the queue!"
        self.the_lock.release(self)
        for p in peers:
            self.send_message(p, Message.a_mutex_release(self, p, self.time()))
        self.clock.increment()

We can adjust how long a process holds the lock here. Increasing the expected lock hold time while holding constant the probability that a process wants the lock will increase lock contention.

    def ready_to_release(self):
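        # randint(1, 2) alone is always truthy; compare it so that the
        # owner releases with probability 1/2 on each cycle.
        return randint(1, 2) == 1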

According to IR2, when a process receives a message it sets its clock to a value greater than or equal to its present value and greater than the timestamp on the incoming message.

    def receive_message(self, sender, msg):
        self.clock.ensure_at_least(msg.sent_at + 1)

The first type of message we must handle is a request for the lock. After we place the request on the request_queue, we send a timestamped acknowledgement to the process that requested the lock.

        if msg.content == "REQUEST":
            self.request_queue.append(msg)
            self.send_message(sender, Message.an_ack(self, sender, self.time()))

The second type of message is a release of the lock. The (now former) owner of the lock is guaranteed to be at the head of the queue, so we just pop the head of the queue.

        elif msg.content == "RELEASE":
            logging.debug("{} processing release by {}. Before removing:".format(self, sender))
            logging.debug(str(self.request_queue))
            # Sort before popping so that the head really is the earliest
            # outstanding request.
            self.get_request_queue().pop(0)
            logging.debug("After: {}:".format(self.request_queue))

The third and final type of message is an acknowledgement of our claim from another process. In this case we simply record the time of the acknowledgement and the process that sent it.

        elif msg.content == "ACK":
            self.latest_ack_from[sender.name] = msg.sent_at

We could potentially claim the lock in response to two types of events: an acknowledgement of our claim (if no other process owns the lock) or a notification of a release. When we receive either of these messages, we check whether we have the right to claim the lock. If we do, we take it.

        if msg.content in ["ACK", "RELEASE"] and self.can_claim_lock():
            self.the_lock.claim(self)

We can claim the lock if and only if the following two conditions hold:

  1. Our request for the lock is ordered before every other request.
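  2. We have received an acknowledgement of our claim from every other process which is timestamped after our request.

    def can_claim_lock(self):
        queue = self.get_request_queue()
        # Nothing is queued, or another process's request is ahead of ours.
        if not queue or queue[0].sender is not self:
            return False
        first_req = queue[0]
        # Collect the peers whose latest ack is timestamped after our request.
        acks = [sender for (sender, t) in self.latest_ack_from.items()
                    if t > first_req.sent_at]
        return len(acks) == self.num_peers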

Ordering events by their timestamp would produce a partial ordering, but we need a total ordering. Thus, we break ties using an alphabetical ordering based on the process's name. Note that this requires that process names be unique.

    @classmethod
    def total_ordering(cls, msg):
        return (msg.sent_at, msg.sender.name)

    def get_request_queue(self):
        self.request_queue.sort(key=Process.total_ordering)
        return self.request_queue

Processes delegate the delivery of messages to the Message Broker.

    def send_message(self, recipient, msg):
        self.msg_broker.send_message(self, recipient, msg)

Each Process maintains a local view of time.

    def time(self):
        return self.clock.time
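The two claim conditions checked by can_claim_lock can also be exercised in isolation with plain data. The following is a minimal sketch under that assumption; the standalone function `can_claim`, its parameters, and the timestamps are hypothetical and are not part of the Process class.

```python
def can_claim(my_name, my_request_time, requests, latest_ack_from, peers):
    """requests: locally known (timestamp, name) pairs, including our own."""
    # Condition 1: our request sorts first under (timestamp, name).
    if min(requests) != (my_request_time, my_name):
        return False
    # Condition 2: every peer has acked with a timestamp after our request.
    return all(latest_ack_from.get(p, -1) > my_request_time for p in peers)


requests = [(5, "Bonesaw"), (5, "Andromeda")]  # a tie on timestamp 5
acks = {"Bonesaw": 6}

# Charybda has not acknowledged yet, so Andromeda must wait.
print(can_claim("Andromeda", 5, requests, acks, ["Bonesaw", "Charybda"]))  # False

acks["Charybda"] = 8
print(can_claim("Andromeda", 5, requests, acks, ["Bonesaw", "Charybda"]))  # True

# Bonesaw has every ack it needs but loses the alphabetical tie-break.
bonesaw_acks = {"Andromeda": 6, "Charybda": 7}
print(can_claim("Bonesaw", 5, requests, bonesaw_acks, ["Andromeda", "Charybda"]))  # False
```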

3.6 Driver

Our driver program is simple. We instantiate 10 processes, a lock, and a message broker. In each time cycle, every process has the chance to (randomly) request the lock. If the lock is owned, the owner randomly releases the lock. The message broker then delivers pending messages.

When the program exits, we print some summary statistics to show how many times the lock was claimed and released. An exit status of zero indicates that none of our assertions failed and our distributed algorithm worked.

import argparse

if __name__ == "__main__":

    parser = argparse.ArgumentParser('Simulate Processes coordinating mutual exclusion.')
    parser.add_argument(
        '-v', '--verbose',
        help='print every message exchanged between processes',
        action='store_true')
    args = parser.parse_args()
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level)
    the_lock = Mutex()
    msg_broker = MsgBroker()

    num_processes = 10

    a = Process("Andromeda", the_lock, msg_broker, num_processes - 1)
    b = Process("Bonesaw", the_lock, msg_broker, num_processes - 1)
    c = Process("Charybda", the_lock, msg_broker, num_processes - 1)
    d = Process("Doofus", the_lock, msg_broker, num_processes - 1)
    e = Process("Egbertina", the_lock, msg_broker, num_processes - 1)
    f = Process("Fido", the_lock, msg_broker, num_processes - 1)
    g = Process("Gary", the_lock, msg_broker, num_processes - 1)
    h = Process("Hufflepuff", the_lock, msg_broker, num_processes - 1)
    i = Process("Iola", the_lock, msg_broker, num_processes - 1)
    j = Process("Jethro", the_lock, msg_broker, num_processes - 1)

    processes = [a, b, c, d, e, f, g, h, i, j]
    assert len(processes) == num_processes

    SIMULATION_NUM_CYCLES = 10000

    logging.info("Starting simulation with {} processes...".format(len(processes)))
    for t in range(SIMULATION_NUM_CYCLES):
        for p in processes:
            peers = [x for x in processes if x is not p]
            if p.wants_lock():
                p.request_lock(peers)
            if p is the_lock.owner() and p.ready_to_release():
                p.release_lock(peers)

        msg_broker.deliver()

    logging.info("Done. Simulation finished with no errors.")
    logging.info("The lock was claimed {} times and released {} times"
                .format(the_lock.num_claims, the_lock.num_releases))

Let's run it…

../code/tcoeds/main.py
INFO:root:Starting simulation with 10 processes...
INFO:root:Done. Simulation finished with no errors.
INFO:root:The lock was claimed 357 times and released 357 times

4 Appendix

4.1 Proof that converse of Clock Condition does not hold

|     |
|     |
a2    |
|     b1
a1    |
|     |
A     B

To see why, consider the above scenario and assume that both the Clock Condition and its converse are true.

That is, for any events \(a\) and \(b\), we have the Clock Condition:

\begin{equation} \label{eq:clock-condition} \text{if } a \rightarrow b \text{ then } C \langle a \rangle < C \langle b \rangle \end{equation}

And its converse:

\begin{equation} \label{eq:clock-condition-converse} \text{if } C \langle a \rangle < C \langle b \rangle \text{ then } a \rightarrow b \end{equation}

\(a_1\) and \(a_2\) occur in the same process, so by the first rule of the happened-before relation

\begin{equation} \label{eq:a1} a_1 \rightarrow a_2 \end{equation}

No messages are exchanged between processes \(A\) and \(B\), so by the second and third rules of the happened-before relation

\begin{equation} \label{eq:a2} a_1 \nrightarrow b_1 \end{equation} \begin{equation} \label{eq:a3} a_2 \nrightarrow b_1 \end{equation}

If \(C \langle a_1 \rangle < C \langle b_1 \rangle\), then \ref{eq:clock-condition-converse} implies that \(a_1 \rightarrow b_1\), which contradicts \ref{eq:a2}. Thus,

\begin{equation} \label{eq:a4} C \langle a_1 \rangle \ge C \langle b_1 \rangle \end{equation}

If \(C \langle a_1 \rangle > C \langle b_1 \rangle\), then \ref{eq:clock-condition-converse} implies that \(b_1 \rightarrow a_1\). This is also impossible: no messages are exchanged between \(A\) and \(B\), so \(b_1 \nrightarrow a_1\). Thus, by \ref{eq:a4}

\begin{equation} \label{eq:a5} C \langle a_1 \rangle = C \langle b_1 \rangle \end{equation}

By a similar argument, we have that

\begin{equation} \label{eq:a6} C \langle a_2 \rangle = C \langle b_1 \rangle \end{equation}

Combining \ref{eq:a5} and \ref{eq:a6}, we have that \(C \langle a_1 \rangle = C \langle a_2 \rangle = C \langle b_1 \rangle\). But we have in \ref{eq:a1} that \(a_1 \rightarrow a_2\), so to satisfy \ref{eq:clock-condition} requires that \(C \langle a_1 \rangle < C \langle a_2 \rangle\). This leads to a contradiction, so \ref{eq:clock-condition-converse} does not hold.
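A concrete assignment of timestamps makes the same point with less machinery. The sketch below assumes each process runs an independent counter that starts at 0 and ticks once per event; those specific values are an assumption for illustration.

```python
# Process A has events a1 and a2; process B has the single event b1.
# No messages are exchanged, so the only happened-before pair is a1 -> a2.
happened_before = {("a1", "a2")}

# Timestamps assigned by two independent counters that start at 0.
C = {"a1": 0, "a2": 1, "b1": 0}

# Clock Condition: every happened-before pair has increasing timestamps.
clock_condition = all(C[x] < C[y] for (x, y) in happened_before)

# Converse: every pair with increasing timestamps is a happened-before pair.
converse = all((x, y) in happened_before
               for x in C for y in C if C[x] < C[y])

print(clock_condition, converse)  # True False
```

The pair that breaks the converse is \(b_1\) and \(a_2\): the timestamps say 0 < 1, yet neither event could have caused the other.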

Footnotes:

1

Lest the corporate overlords responsible for high-density seating tout this as a success story, I should point out that said coworker was singing AC/DC's "Highway to Hell" and playing the air drums with No. 2 pencils just before joining our conversation. Odd guy. True story.