Listening to Postgres: How LISTEN and NOTIFY Syntax Promote High Availability at the Application Layer

November 08, 2022

An often overlooked aspect of building a highly available platform is the role of the application layer itself. There is usually so much focus on keeping the database online, having standby nodes, maintaining backups of various description, deploying a cluster management utility like EFM or repmgr, and so on. But what good is a robust database cluster if it’s being fundamentally misused?

How “available” is Postgres? How do we quantify that? We can consider metrics like RPO or RTO, but that only guarantees the data itself is safe. What about latency? If every query requires five seconds to execute, does it really matter that the database is never offline? It’s possible to make crucial architectural mistakes that contribute to exactly this problem, and we’re here to provide a remedy.


The social phenomenon

Everyone is building some manner of Social Network stack these days. Buoyed by the endless proliferation of new social apps, APIs, services, development frameworks, libraries, and kitchen sinks, everyone wants to build the Next Big Thing. It also makes a great topic in classroom environments due to the straightforward data model, rife with relatable examples. So let’s build a very rudimentary database for just such an application.

  • We want it to have the following attributes:
  • Members are unique.
  • Members can follow other members.
  • Members can post content.
  • Followers are informed of new posts by someone they’re following.

Given those constraints, our model might look something like this:

CREATE TABLE member (
  member_id  INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
  handle     TEXT NOT NULL UNIQUE,
  join_date  TIMESTAMPTZ
);

CREATE TABLE post (
  Post_id    INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
  Member_id  INT NOT NULL REFERENCES member,
  Content    TEXT,
  post_date  TIMESTAMPTZ
);

CREATE TABLE follower (
  Member_id    INT REFERENCES member,
  follower_id  INT REFERENCES member,
  PRIMARY KEY (member_id, follower_id)
);

CREATE TABLE notification (
  notification_id  INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
  post_id          INT REFERENCES post,
  Content          TEXT NOT NULL,
  Notify_date      TIMESTAMPTZ
);

CREATE TABLE notify_member (
  Notification_id  INT NOT NULL REFERENCES notification,
  Member_id        INT REFERENCES member,
  PRIMARY KEY (notification_id, member_id)
);

There’s a table to hold members, a table to hold posts from those members, a table to track who is subscribed to who, a table to track the content of a notification, and a final table so we know which members received which notification. It’s not much, and it certainly won’t win any awards for functionality, but it gets the job done.

What about data? Well, it just so happens that this new platform is named Skeletor Social, for all devoted fans of Skeletor. Good ol’ Skeletor joined on the first day the app opened, and was promptly followed by a million screaming groupies. Let’s save that for posterity:

INSERT INTO member (handle, join_date) VALUES ('skeletor', '2010-03-17');

INSERT INTO member (handle, join_date)
SELECT 'skeletor_groupie' || a.id, '2010-03-17'::DATE + (INTERVAL '5m') * a.id
  FROM generate_series(1, 1000000) a(id);

INSERT INTO follower (member_id, follower_id)
SELECT 1, a.id + 1
  FROM generate_series(1, 1000000) a(id);

ANALYZE member;
ANALYZE follower;

A million followers isn’t much these days—poor Skeletor—but it’s still nice to feel loved.


Trigger me timbers

With the requisite data model in place, we’re faced with a dilemma: how to inform Skeletor’s rabid fanbase of his latest musings. A simple approach might be to use a trigger to capture each post and immediately notify all followers. It certainly simplifies application design and prevents accidents, so let’s do it!

All we’ll need is the trigger function and declaration itself. Here’s how that might look:

CREATE OR REPLACE FUNCTION f_notify_followers()
RETURNS TRIGGER AS $$
DECLARE
  poster TEXT;
  notify_string TEXT;
BEGIN
  SELECT INTO poster handle
    FROM member
   WHERE member_id = NEW.member_id;

  notify_string := format('%s just posted something pretty dope!', poster);

  WITH notif AS (
      INSERT INTO notification (post_id, content, notify_date)
      VALUES (NEW.post_id, notify_string, now())
      RETURNING notification_id
  )
  INSERT INTO notify_member (notification_id, member_id)
  SELECT notif.notification_id, follower_id
    FROM follower, notif
   WHERE member_id = NEW.member_id;

  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER t_notify_followers_ai
AFTER INSERT ON post
   FOR EACH ROW EXECUTE FUNCTION f_notify_followers();

There’s no reason to overcomplicate this. After each post is recorded, we just retrieve the member name to emit a nifty bit of flavor-text and create a corresponding notification. Then we link that notification to every follower of whomever made the post. Easy eh? Sure Skeletor is probably doing most of the posting, but fans can gab amongst themselves as well.

Now Skeletor wants to post his latest missive regarding his protracted series of disagreements with He-Man. Let’s see what happens:

INSERT INTO post (member_id, content, post_date)
VALUES (1, 'I''l get you He-Man!', now());


INSERT 0 1
Time: 10697.268 ms (00:10.697)

Oh. Oh dear! Did that post take 10 seconds to resolve?! We’re trying to build a viable Social Media platform here! If every post incurs latency based on how many followers a member has, not only will it be unusably slow, but we’d end up purchasing expensive server hardware to compensate.

How can we fix this?


Keeping up to date

The primary issue with our platform right now is that each post is glacially slow, and users won’t accept that for long. The quintessential difficulty is that transactions don’t resolve until each sequential operation completes. Thus no post will “post” until all followers are notified. If our business logic is too complicated, every subsequent operation will suffer additional delays. In this case, each post invokes a costly side-effect of potentially performing millions of other actions. We’re being faced by write amplification, but are suffering the effects immediately.

If there was some way we could asynchronously inform followers of posts, that would solve everything. Sure, there are still one million notifications happening in the background every time Skeletor posts, but not everyone needs to be informed immediately. And it’s not like anyone pays attention to their notifications anyway, so we could miss a few and still be fine. It’s the posts that actually matter.

As it turns out, Postgres provides NOTIFY and LISTEN syntax for just this scenario. What if, instead of performing the follower notifications directly, our trigger simply sent a notification to some helper daemon that records them instead? All we need to do is write a very simple program that sends a LISTEN command to Postgres, and monitors the channel for NOTIFY results. Every time a payload arrives from a Postgres NOTIFY, the program springs into action and starts sending post notifications. Easy!

Since this is just a proof of concept, let’s see that in Python:

import psycopg

conn = psycopg.connect("dbname=skelechat", autocommit=True)

channel = psycopg.connect("dbname=skelechat", autocommit=True)
channel.execute("LISTEN user_post")
gen = channel.notifies()

for notify in gen:
    notify_id = int(notify.payload)
    print("Informing followers of new post: %s" % notify_id)

    with conn.cursor() as cur:
      cur.execute("""
        INSERT INTO notify_member (notification_id, member_id)
        SELECT n.notification_id, f.follower_id
          FROM notification n
          JOIN post p USING (post_id)
          JOIN follower f USING (member_id)
         WHERE n.notification_id = %s
      """, (notify_id,))

gen.close()
print("there, I stopped")


Nothing complicated here. But do note that we opened two connections to the database. The first is for inserting the post notifications, and the second is to strictly listen for Postgres NOTIFY events. We could do both on the same connection, but if a NOTIFY is triggered while we’re in that INSERT block, we would miss it. Rather than risk that, a dedicated connection is a small price to pay to ensure we get most of the NOTIFY events.

This also requires a corresponding modification to the trigger, since it only needs to send a NOTIFY event rather than perform the inserts itself. Now the trigger function looks like this:

CREATE OR REPLACE FUNCTION f_notify_followers()
RETURNS TRIGGER AS $$
DECLARE
  poster TEXT;
  notify_string TEXT;
  id_payload TEXT;
BEGIN
  SELECT INTO poster handle
    FROM member
   WHERE member_id = NEW.member_id;

  notify_string := format('%s just posted something pretty dope!', poster);

  INSERT INTO notification (post_id, content, notify_date)
  VALUES (NEW.post_id, notify_string, now())
  RETURNING notification_id INTO id_payload;

  PERFORM pg_notify('user_post', id_payload);

  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Once again, Skeletor voices his displeasure upon once again being thwarted by He-Man, and now we see the fruits of our labor:

INSERT INTO post (member_id, content, post_date)
VALUES (1, 'I''l get you He-Man!', now());

INSERT 0 1
Time: 5.564 ms

A 5ms execution time is an improvement of nearly 2000x. Now Skeletor can rant at dozens of posts per second without anything to stall his diatribe. Progress!


A queued alternative

There’s still a kind of flaw with relying on NOTIFY events from Postgres: they’re ephemeral. If we stopped the listener app for any reason, nobody would receive notifications at all. If we later started the app again, notifications would resume, but any missed notifications would be lost forever. There’s also a small stampede problem if we want multiple iterations of the notification app to run simultaneously. We understand that post notifications are hardly critical data, but we can do better.

Consider the structure of our tables for a moment. We have a single record for every single notification, so why not add an attribute that says we handled the corresponding follower notifications? All we need to do is add a single new column:

ALTER TABLE notification
  ADD is_sent BOOLEAN NOT NULL DEFAULT FALSE;

So how do we actually use this column? Well, Postgres SELECT syntax allows a user to specify FOR UPDATE when interacting with a row. This means it’s possible to lock a specific notification so other applications can’t modify it. But the real magic happens by adding SKIP LOCKED to our lock request. If we have two background notification handlers running at the same time, and one has already locked a row for processing, we can skip it and simply jump to the next unsent notification. This gives us implied queue processing, essentially for free.

All we need to do is issue these two statements when handling post notifications:

-- Acquire a lock on a notification we want to process.

SELECT notification_id
  FROM notification
 WHERE NOT is_sent
   FOR UPDATE SKIP LOCKED
 LIMIT 1

-- Mark the notification as sent once we’re done.

UPDATE notification
   SET is_sent = TRUE
 WHERE notification_id = %s

With that in mind, the Python app no longer needs to monitor for Postgres NOTIFY events, nor does it require two channels to avoid missing notifications. Instead, it becomes a simple queue poller:

import psycopg
import time

conn = psycopg.connect("dbname=tickets")
cur = conn.cursor()

while True:
  cur.execute("""
      SELECT notification_id
        FROM notification
       WHERE NOT is_sent
         FOR UPDATE SKIP LOCKED
       LIMIT 1
  """)

  for (notify_id,) in cur:
    cur.execute("""
      INSERT INTO notify_member (notification_id, member_id)
      SELECT n.notification_id, f.follower_id
        FROM notification n
        JOIN post p USING (post_id)
        JOIN follower f USING (member_id)
       WHERE n.notification_id = %s
    """, (notify_id,))

    cur.execute("""
      UPDATE notification
         SET is_sent = true
       WHERE notification_id = %s
    """, (notify_id,))

  conn.commit()
  time.sleep(1)    

This program checks for any new unhandled notifications once per second. If there are none, it simply sleeps and waits for the next iteration. We can launch any number of these handlers and they’ll happily process all new post notifications without blocking other handlers. There’s no more risk of missed notifications, and it’s completely asynchronous with Skeletor’s posting activity and the rambling of his fans.

The benefit to this approach is that we could handle notification in batches or in parallel. Is it too risky to handle all one million notifications at once? Why not in smaller batches of 10,000? If we utilize threading, each handler could fork multiple threads for each necessary batch, and keep the notification lock until all of the child threads complete.


High availability redux

Building a sophisticated highly-scalable platform isn’t easy! It’s obviously important to account for database availability, and we also need to incorporate fundamental philosophies of asynchronous operation and parallelization. Without them, we could have a perfect magic box database that never goes down or loses data, and users would still have an awful experience.
Whether it’s LISTEN and NOTIFY, or FOR UPDATE SKIP LOCKED, Postgres helps facilitate best practice application design that encourages asynchronous and parallel-safe operation. Considering we’re only scratching the surface of tools Postgres provides to curious developers, don’t be afraid to peruse the Postgres documentation for more esoteric capabilities.

After all, Skeletor Social is a flourishing platform, and needs all the help it can get.
 

Share this