Suppose that:

  1. our humble KV feed sees a lot of traffic.
  2. someone needs to consume our KV feed with multiple threads.

data

The first step is to introduce a notion of "shards" into our data model:

ALTER TABLE `kv`
  ADD COLUMN `shard` INT(11) DEFAULT '0',
  ADD INDEX `k_fsi_s` (`feed_sync_id`, `shard`);

publishing

We don't need to alter publishing until publishing itself becomes too slow for a single thread, and since parallelizing it introduces a lot of complications, let's just hold off for now.

serving

SELECT *
FROM kv
WHERE kv.feed_sync_id > :fsi
  AND kv.shard = :shard
ORDER BY kv.feed_sync_id
LIMIT :limit

which backs an API like

/_feeds/fetch/kv?after=3&limit=5&shard=3

We can also support client-side shard configurations by additionally accepting a shard_count argument:

/_feeds/fetch/kv?after=3&limit=5&shard=3&shard_count=4
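In the simple "coupled" case, where the shard and shard_count a client sends line up one-to-one with the shard values stored in the data model, the handler behind that endpoint can be sketched like this. The function name is mine, and SQLite stands in for the real database:

```python
import sqlite3

def fetch_kv(conn, after, limit, shard, shard_count=1):
    """Serve /_feeds/fetch/kv: return up to `limit` rows from one shard,
    strictly after the caller's feed_sync_id cursor."""
    if not (0 <= shard < shard_count):
        raise ValueError("shard must be in [0, shard_count)")
    return conn.execute(
        "SELECT * FROM kv"
        " WHERE feed_sync_id > ? AND shard = ?"
        " ORDER BY feed_sync_id LIMIT ?",
        (after, shard, limit),
    ).fetchall()
```

Note the query is exactly the serving query above, parameterized; the only new logic is validating that the requested shard falls inside the client's declared shard_count.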

consumption

We need to update our cursors table to include a shard column as well:

ALTER TABLE cursors
  ADD COLUMN `shard` INT(11) DEFAULT '0',
  ADD COLUMN `shard_count` INT(11) DEFAULT '1',
  ADD UNIQUE KEY `u_name_shard` (`name`,`shard`),
  DROP KEY `u_name`;
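With that in place, one consumer thread's loop body might look like the sketch below. It assumes the cursors table tracks position in a feed_sync_id column (the pre-existing column names aren't shown here), omits the shard_count bookkeeping for brevity, and advances the cursor only after the whole batch has been handled:

```python
import sqlite3

def consume_once(conn, name, shard, handle, limit=100):
    """Fetch and process one batch for consumer `name` on one shard."""
    # Read this consumer shard's cursor (0 if it has never run).
    row = conn.execute(
        "SELECT feed_sync_id FROM cursors WHERE name = ? AND shard = ?",
        (name, shard),
    ).fetchone()
    after = row[0] if row else 0

    rows = conn.execute(
        "SELECT feed_sync_id, k, v FROM kv"
        " WHERE feed_sync_id > ? AND shard = ?"
        " ORDER BY feed_sync_id LIMIT ?",
        (after, shard, limit),
    ).fetchall()
    if not rows:
        return 0

    for r in rows:
        handle(r)

    # Advance the cursor only once the batch is fully processed, so a
    # crash mid-batch re-delivers rather than skips.
    conn.execute(
        "INSERT INTO cursors (name, shard, feed_sync_id) VALUES (?, ?, ?)"
        " ON CONFLICT(name, shard) DO UPDATE"
        " SET feed_sync_id = excluded.feed_sync_id",
        (name, shard, rows[-1][0]),
    )
    conn.commit()
    return len(rows)
```

Each thread calls this in a loop with its own shard value; the unique key on (name, shard) is what lets every thread upsert its cursor independently.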

data vs consumer shards

We haven't discussed the relationship between the number of threads a particular consumer uses (the shard_count value it provides to the API) and the range of shard values in the data model. Coupling the two together is completely sufficient for a prototype, and can even work well for a surprising amount of traffic!

But at some point, when a system has grown to several consumers, and especially when those consumers need to do different amounts of work or have different locking requirements or performance characteristics, it can become useful to decouple the two.

The basic ideas are to

  1. write the data with a relatively large number of data shards DS (distinct shard values; between 2**9 and 2**12 are common in examples I've worked with),
  2. consume with relatively fewer consumer shards CS (e.g. <=2**5),
  3. query by mapping a given cs_i to a range of [DS_min, DS_max) values.
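Step 3 can be sketched as a pure function mapping a consumer shard cs_i to its half-open range of data-shard values (the function name is mine; any deterministic even split works):

```python
def shard_range(cs_i, consumer_shards, data_shards):
    """Map consumer shard cs_i (0 <= cs_i < consumer_shards) to the
    half-open range [lo, hi) of data-shard values it owns, spreading
    any remainder across the lowest-numbered consumer shards."""
    base, extra = divmod(data_shards, consumer_shards)
    lo = cs_i * base + min(cs_i, extra)
    hi = lo + base + (1 if cs_i < extra else 0)
    return lo, hi
```

The serving query then swaps its equality filter for a range, e.g. AND kv.shard >= :lo AND kv.shard < :hi, which the (feed_sync_id, shard) index still supports. With DS = 1024 and CS = 32, consumer shard 31 owns data shards [992, 1024).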

I've still pulled one punch here: we haven't said anything about what to actually set that new shard field to when writing new records! That'll be in the next post.