At work, we use a pattern called feeds that gets an incredible amount of work done. I've been wanting to describe it here for quite a while, and now seems as good a time as any.

The basic premise is: you have a service A with some data that other "consuming" services B, C, and D want to find out about. Maybe the data is payments, maybe it's support cases, maybe it's password changes... whatever. The consuming services might include your data warehouse, event listeners, and so on.

Crazybob described it like this:

Lots of messages to pass around; i.e., payment processing needs to tell a capture system, a settlement system, risk systems, etc. about things that happen. The standard approach would be to use a messaging server for that, with reliability implemented via additional messaging servers. The producer and consumer are probably already HA systems, so this adds another cluster to deploy and maintain.

Instead of messaging, they’re using a message feed based system.

  1. Client asks for all records
  2. Server responds with the records and current version
  3. Later, Client asks for the delta since the last version it saw
  4. Server responds with delta

The use case here is similar to Kafka, but with a few differences in implementation and implications:

  1. backed by MySQL: Most of our applications are built on a MySQL datastore. The feeds framework keeps application code mostly as-is, so the message feed comes for (almost) free with already existing data models.
  2. serve messages over standard HTTP / JSON (or, mostly, another canonical RPC / serialization format), so it's easy to use standard tools to inspect the stream of messages.
  3. there is no broker; the application code is also responsible for serving the feed fetch requests.
  4. consumers maintain their own cursors.

We do have to make a few concessions, though:

  1. permit losing intermediate updates, but guarantee transmission of the latest version.

Let's start with a super-simplified data model and walk through how it all works.

Suppose we have a table storing a mapping of keys (k) to values (v):

CREATE TABLE `kv` (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `k` varchar(255) NOT NULL,
  `v` longblob NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_k` (`k`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Our application code does things like:

INSERT INTO kv (k, v)
VALUES ('fred', 'bob');

publishing

To make this table feed published, we would add a new feed_sync_id column:

ALTER TABLE `kv`
ADD COLUMN `feed_sync_id` BIGINT(22) DEFAULT NULL,
ADD INDEX `k_fsi` (`feed_sync_id`);

Then add a background process that does something like

next_id = sql("SELECT id FROM kv WHERE feed_sync_id IS NULL ORDER BY id LIMIT 1");
next_fsi = sql("SELECT COALESCE(MAX(feed_sync_id), 0) + 1 FROM kv");
sql("UPDATE kv SET feed_sync_id = :fsi WHERE id = :id LIMIT 1",
    id=next_id,
    fsi=next_fsi,
    )
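To make the loop concrete, here's a minimal runnable sketch of that background publisher in Python with sqlite3 (the table and column names match the example above; the function name and loop structure are my own assumptions, and a real MySQL implementation would need locking around the id assignment):

```python
import sqlite3

def assign_feed_sync_ids(conn):
    """Assign a feed_sync_id to each unpublished row, lowest id first."""
    while True:
        row = conn.execute(
            "SELECT id FROM kv WHERE feed_sync_id IS NULL ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            break  # nothing left to publish
        # COALESCE handles the very first assignment, when no row has an id yet.
        next_fsi = conn.execute(
            "SELECT COALESCE(MAX(feed_sync_id), 0) + 1 FROM kv"
        ).fetchone()[0]
        conn.execute(
            "UPDATE kv SET feed_sync_id = ? WHERE id = ?",
            (next_fsi, row[0]),
        )
        conn.commit()
```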

Of course, the production implementation is considerably more robust and performant, at the cost of considerable extra complexity. A few improvements and optimizations you'd probably need to make:

  1. store the next_fsi in a dedicated table to avoid reassigning the same feed sync id in the case of republishes.
  2. assign feed_sync_id in batch.
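Those two improvements can be sketched together: a dedicated table holds the next id to hand out, and a whole batch of rows is stamped in one pass. This is an illustrative sqlite3 sketch; the feed_sync_state table name and schema are hypothetical, not the production design:

```python
import sqlite3

BATCH_SIZE = 500  # assumed batch size, tune for your write rate

def assign_batch(conn):
    """Assign feed_sync_ids to up to BATCH_SIZE unpublished rows in one pass."""
    # Read the high-water mark from a dedicated table so republished rows
    # never reuse an old feed_sync_id (hypothetical feed_sync_state schema).
    next_fsi = conn.execute(
        "SELECT next_fsi FROM feed_sync_state WHERE feed = 'kv'"
    ).fetchone()[0]
    rows = conn.execute(
        "SELECT id FROM kv WHERE feed_sync_id IS NULL ORDER BY id LIMIT ?",
        (BATCH_SIZE,),
    ).fetchall()
    for (row_id,) in rows:
        conn.execute(
            "UPDATE kv SET feed_sync_id = ? WHERE id = ?", (next_fsi, row_id)
        )
        next_fsi += 1
    # Persist the new high-water mark for the next batch.
    conn.execute(
        "UPDATE feed_sync_state SET next_fsi = ? WHERE feed = 'kv'", (next_fsi,)
    )
    conn.commit()
    return len(rows)
```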

serving

Now that we're assigning feed_sync_ids to our records, we can use those feed_sync_ids to paginate over the records.

SELECT *
FROM kv
WHERE kv.feed_sync_id > :fsi
ORDER BY kv.feed_sync_id
LIMIT :limit;

A few things to note here:

  1. we apply a strict inequality from the previous feed sync id. This is provided by the client.
  2. we apply a limit. This is generally client-provided, but the producing app can apply an upper bound to avoid doing too much work.

Now we can just hook up an HTTP endpoint wired up to return the results of that query; maybe

/_feeds/fetch/kv?after=3&limit=5

could return

{
  "entries": [
    {
      "k": "fred",
      "v": "bob",
      "feed_sync_id": 4
    },
    {
      "k": "pi",
      "v": "3.14159",
      "feed_sync_id": 5
    }
  ]
}

This works, but now we've "leaked" an implementation detail (that we're using feeds behind the scenes) into our API responses, and the client would need to know how to derive the cursor for its next fetch from the per-entry feed_sync_ids. Neither of these properties is great; to fix them, the endpoint can do a bit of extra work, transforming the response into:

{
  "entries": [
    {
      "k": "fred",
      "v": "bob"
    },
    {
      "k": "pi",
      "v": "3.14159"
    }
  ],
  "cursor": 5
}
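The whole serving side -- the strict-inequality query, the server-side cap on the limit, and the cursor transformation -- fits in a small handler. A minimal sqlite3 sketch (the function name and MAX_LIMIT cap are assumptions for illustration):

```python
import sqlite3

MAX_LIMIT = 1000  # server-side upper bound, so clients can't demand too much work

def fetch_feed(conn, after, limit):
    """Serve one page of the kv feed: entries strictly after the given cursor."""
    rows = conn.execute(
        "SELECT k, v, feed_sync_id FROM kv "
        "WHERE feed_sync_id > ? ORDER BY feed_sync_id LIMIT ?",
        (after, min(limit, MAX_LIMIT)),
    ).fetchall()
    # Strip feed_sync_id from individual entries and surface only the
    # aggregate cursor, so the implementation detail doesn't leak.
    return {
        "entries": [{"k": k, "v": v} for k, v, _ in rows],
        "cursor": rows[-1][2] if rows else after,
    }
```

Note that an empty page echoes the client's cursor back, so a caught-up consumer can keep polling with the same value.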

consuming

Now we're in great shape! We're ready for a consumer to come along and hit our endpoint.

We need to define some contract for starting to consume the feed; for everything I've laid out so far, we can just start with after=0 for the initial fetch. Therefore, the consumer library will do an API call like

/_feeds/fetch/kv?after=0&limit=5

pass off the list of entries to some application code -- whatever the consuming app needs these entries for! Assuming there were no problems processing the entries, the consumer library can then store the cursor value in some persistence layer. Naturally, we'll read that persistence layer before the next feed fetch, so we use the last cursor the serving API gave us.

A simple choice for the persistence layer is

CREATE TABLE `cursors` (
  `id` BIGINT(22) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `cursor` VARCHAR(255) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
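Putting the consumer side together -- read the stored cursor, fetch, process, then checkpoint -- one round of the loop might look like this sketch (the consume_once name and its arguments are illustrative; fetch stands in for the HTTP call to the feed endpoint):

```python
import sqlite3

def consume_once(cursors_conn, fetch, process, name="kv", limit=5):
    """One fetch/process/checkpoint round of the consumer loop."""
    row = cursors_conn.execute(
        "SELECT cursor FROM cursors WHERE name = ?", (name,)
    ).fetchone()
    after = int(row[0]) if row else 0   # no stored cursor: start from 0
    page = fetch(after, limit)          # e.g. GET /_feeds/fetch/kv?after=...&limit=...
    process(page["entries"])            # hand entries to application code
    # Only checkpoint after processing succeeds, so a crash replays the page.
    cursors_conn.execute(
        "INSERT INTO cursors (name, cursor) VALUES (?, ?) "
        "ON CONFLICT(name) DO UPDATE SET cursor = excluded.cursor",
        (name, str(page["cursor"])),
    )
    cursors_conn.commit()
```

Checkpointing after processing gives at-least-once delivery: a crash between process and commit means the same page is handed to the application again, which fits the "lose intermediate updates, guarantee the latest version" concession above.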