
sharded feeds

Oct 29, 2018

Suppose that:

  1. our humble KV feed sees a lot of traffic.
  2. someone needs to consume our KV feed with multiple threads.

data

The first step is to introduce a notion of "shards" into our data model:

ALTER TABLE `kv`
  ADD COLUMN `shard` INT(11) DEFAULT '0',
  ADD INDEX `k_fsi_s` (`feed_sync_id`, `shard`);

publishing

We don't need to alter publishing until publishing itself becomes too slow for a single thread, and changing it introduces a lot of complications, so let's just hold off for now.

serving

SELECT *
FROM kv
WHERE kv.feed_sync_id > :fsi
  AND kv.shard = :shard
ORDER BY kv.feed_sync_id
LIMIT :limit

which is supported by an API like

/_feeds/fetch/kv?after=3&limit=5&shard=3

We can also support client-side shard configurations by accepting a shard_count argument:

/_feeds/fetch/kv?after=3&limit=5&shard=3&shard_count=4

consumption

We need to update our cursors table to include a shard column as well:

ALTER TABLE cursors
  ADD COLUMN `shard` INT(11) DEFAULT '0',
  ADD COLUMN `shard_count` INT(11) DEFAULT '1',
  ADD UNIQUE KEY `u_name_shard` (`name`,`shard`),
  DROP KEY `u_name`;

data vs consumer shards

We haven't discussed the relationship between the number of threads a particular consumer uses (the shard_count value it provides to the API) and the range of shard values in the data model. Coupling the two is completely sufficient for a prototype, and can even hold up surprisingly well under a lot of traffic!

But at some point, when a system has grown to several consumers, and especially when those consumers need to do different amounts of work or have different locking requirements or performance characteristics, it can become useful to decouple the two.

The basic ideas are to

  1. write the data with a relatively large number of data shards DS (distinct shard values; between 2**9 and 2**12 are common in examples I've worked with),
  2. consume with relatively fewer consumer shards CS (e.g. <=2**5),
  3. query by mapping a given consumer shard cs_i to a range of [DS_min, DS_max) data shard values (a sketch follows below).
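
For concreteness, here's a minimal sketch of that mapping. The DS and CS values are just examples from the ranges above, and the function name is mine for illustration, not anything from the real framework:

DS = 2 ** 10   # data shards: distinct values written into the shard column
CS = 2 ** 4    # consumer shards: threads a particular consumer runs

def shard_range(cs_i, cs=CS, ds=DS):
    # map consumer shard cs_i to a half-open range [ds_min, ds_max) of data shards;
    # assumes ds is a multiple of cs
    per_consumer = ds // cs
    return cs_i * per_consumer, (cs_i + 1) * per_consumer

# e.g. shard_range(3) == (192, 256), so consumer shard 3 would query
# WHERE kv.shard >= 192 AND kv.shard < 256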

I've still pulled one punch here: we haven't said anything about what to actually set that new shard field to when writing new records! That'll be in the next post.

moar stupid jquery tricks

Oct 23, 2018

Sometimes it's useful to use Tampermonkey on a site without nice ids for the elements you want to edit. It turns out it's still pretty easy!

Take my own website; I have <h2> elements for the headers, but none of them have any ids associated. But as discussed in the jquery selectors post, we can still select by just the tag, with a selector like

$("h2")

This returns a jQuery object, which behaves like an array. If we want to select a particular one -- like "Research Interests" -- we can try accessing different elements until we get the one we want:

> $("h2")[0]
< <h2>​Work​</h2>​
> $("h2")[1]
< <h2>​Education​</h2>​
> $("h2")[2]
< <h2>​Research Interests​</h2>​

One tricky bit here, though: the array access returns the DOM element, not another jQuery object! The most direct API here is just using the textContent DOM API to set the "text content" of that element, like

$("h2")[2].textContent = "RESEARCH INTERESTS"

But since we have been using jQuery so far, we can also pass the DOM element back to $, like:

$($("h2")[2])

and now we can call usual jQuery object methods, like #html

$($("h2")[2]).html("RESEARCH INTERESTS")

killing all timeouts in js

Oct 17, 2018

This article has a nice trick for killing all javascript timeouts:

function stopAllTimeouts() {
    var id = window.setTimeout(null, 0);
    while (id--) {
        window.clearTimeout(id);
    }
}

which can be entered on the javascript console and then run with

stopAllTimeouts()

This is an effective way to prevent javascript timeouts from doing a whole variety of things, like

  • carousels
  • endless scrolls
  • any other kind of animation

jquery selectors

Oct 17, 2018

This is silly, but I always forget this:

attribute        selector    jquery
<tag id=x>       #x          $('#x')
<tag class=y>    .y          $('.y')
<tag>            tag         $('tag')

I also learned you can specify things like

// @require https://code.jquery.com/jquery-2.1.4.min.js

in a Tampermonkey script, even when a given site doesn't already have jquery.

Naming is hard (so don't)

Oct 9, 2018

A lot of times I just want to record something. This should be one of those things computers are good at. Turns out this is a bit harder than it seems: many editors make you name a file to save it.

One easy-sounding way to record something is:

  1. open up a text editor
  2. mash the keyboard until the thought is out of your head and into the text editor
  3. (hard part starts) what do we name that file?
  4. where do we put that file?
  5. do we remember to save that file, given the difficulty of (3) & (4)?

I think picking names too early is at best a minor annoyance, and at worst a pretty major distraction.

  • You have to use that early name to open that file later to make changes.
  • The process of assigning a name and referencing it repeatedly puts a damper on changing the idea "too far" from the original idea and original name.
  • If that name is used in a URL, shared with anyone, or (worst of all!) linked from other documents, changing it means changing everywhere it might be linked from. That's hopeless if our friends and coworkers keep private documents/bookmarks/etc. of their own. Even within shared workspaces, it frequently isn't even possible to enumerate such places. (I'm looking at you, Google Docs.)

One alternative is:

cat >> ~/to_file/$(uuidgen) << EOF
my super clever thought
EOF

This is slightly unsatisfying, too: MacOS tracks the update timestamp, not the creation timestamp, by default. So we may want a separate "meta" file here, if creation timestamps are important. But we can tackle it with either discipline (never update!) or a small bit of extra complexity, like

U=~/to_file/$(uuidgen)
date +"%s" > ${U}.created_at
cat >> ${U} << EOF
my even more clever thought
EOF

With even that tiny bit of infrastructure in place, we can generate a timestamped log of "recordings" of whatever is useful to us!

The access pattern is kinda interesting here. Obviously, we don't want to -- and won't, or at least shouldn't -- remember the raw UUIDs. So we need to search, instead. We can use something like ag:

~/to_file ➜ ag clever
D5FF7ADF-347B-4F12-BEEF-57A15725BE88
1:my even more clever thought

8319D617-59AE-4369-85C1-D5A738C91ABD
1:my super clever thought

Why bother with uuidgen?

  1. I have built some very small systems on similar ideas but with much shorter "token generation" schemes; the problem here is eventually they conflict, and you either lose data (perhaps unknowingly), concatenate entries (perhaps unknowingly), or need to implement an existence check.
  2. In the case of a shared folder (like Dropbox), the existence check can't check whether records already exist on a "replica" (ie your personal laptop) that hasn't been synced yet.
  3. It's built into MacOS, so there's no custom token generation code to write.

This is conceptually kinda similar to the technique of Write-ahead logging: The ~/to_file directory functions as our "log" (of "thoughts" or "recordings" or whatever), which is to be reconciled later by moving the file into a more appropriate place.

Feeds as cache invalidation mechanism

Sep 30, 2018

One really cool use of feeds we've realized is that they give a very efficient mechanism for application code to keep the most recent version of a table loaded in memory. The basic idea is:

  1. Set it up as a usual feed published table with an appropriate index on feed_sync_id.
  2. Either alongside or within the cache, represent the latest loaded feed_sync_id.
  3. Set up a cronjob/etc that reads the latest feed_sync_id and compares it to the cache's feed_sync_id.
  4. If they differ, reload the cache.
  5. Ensure that all changes set feed_sync_id to null!

This works really well because the feed_sync_id in the database only gets updated on changes, so the reload cronjob is mostly a no-op. This means we can reload very frequently!
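
As a rough sketch of steps 2-4 -- sql() is a pseudocode query helper, and the cache object's API is invented for illustration:

def maybe_reload(cache):
    # runs from a frequent cronjob; almost always a cheap no-op
    latest = sql("SELECT MAX(feed_sync_id) FROM kv") or 0
    if latest != cache.loaded_fsi:
        rows = sql("SELECT * FROM kv")
        cache.replace(rows, loaded_fsi=latest)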

robustness principle and mocks

Jul 11, 2018

The Robustness Principle (or Postel's Law) states

Be conservative in what you do, be liberal in what you accept from others (often reworded as "Be conservative in what you send, be liberal in what you accept").

This principle has drawn some criticism.

I realized this has interesting implications for mocks. Suppose you have

public class MyObj {
  private final Integer x;

  public MyObj(Integer x) {
    if (x < 0) {
      throw new IllegalArgumentException("negative x!");
    }
    this.x = x;
  }

  public int getX() {
    return x;
  }
}

In a unit test, we can have

MyObj myObj = mock(MyObj.class);
when(myObj.getX()).thenReturn(-1);

which bypasses our sanity checks!

My takeaways from this are:

Don't use mocks for POJO/data-ish objects!

If this feels painful, create a fixtures class:

public class MyObjFixtures {
  @Inject Random random;

  public MyObj create() {
    // Math.abs(Integer.MIN_VALUE) is still negative, so bound nextInt instead
    return new MyObj(random.nextInt(Integer.MAX_VALUE));
  }
}

Create "fake" implementations when possible/appropriate.

Usually, it's quite a bit easier to create a fake implementation than a real implementation:

  1. use maps/lists instead of a real database
  2. take shortcuts like avoiding concurrency where possible
  3. depend on fakes of whatever API calls you need

There's some danger in this -- you should definitely consider wiring things up in a way that your tests can run against "real" implementations or sandboxes of them.

Another major concern is having a way to "reset" the fake implementations between tests to reduce flakes, or coming up with a way to test that avoids the need to reset.

cross-dc sync with feed published KV

Jul 10, 2018

It's been fun describing the feeds framework we use at Square. Today we'll dive into a concrete problem:

  1. We'll stick with the feed-published kv table again.
  2. We want two instances of some application code to bidirectionally synchronize the writes that happened on their instance to the other.
  3. Eventual consistency is OK.

First, a bit of justification, though: I use this KV table to remember things I might have to look up without usual context, like my motorcycle's license plate number, or that one weird python snippet I can never remember. I also have a whole slew of them at work -- a bunch of random representative IDs for a bunch of things in our systems that I use from time to time. I also use a bunch of these as todo items at work, but that happens to work differently and is a topic for a future blog post :-)

The end goal here is that I should be able to partition my keys into

  1. work-only things (which don't get replicated to my work machine at all)
  2. work-appropriate personal things (which are bidirectionally synchronized between my personal machines and my work laptop)
  3. personal things that I don't want on my work machine

In practice, I don't really have any of item (3). But the work vs personal laptop is a good case study for a system that has frequent network partitions.

The basic approach here is to have several kv instances, each served by feeds,

WORK  <---->  PERSONAL

that each consume each other.

The only other thing we need is some method to resolve conflicts when a certain k already exists but the associated v values differ. One easy approach is just to insert the conflicting value under a new key, like

def safe_k(kv, source):
  return kv.k + "-Conflicted-" + source + "-" + str(kv.feed_sync_id)
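
Putting that together, each side's consumer might apply an entry from the other side roughly like this. This is a sketch only: sql() is a pseudocode query helper, and it just reuses the safe_k idea above.

def apply_entry(kv, source):
  existing = sql("SELECT v FROM kv WHERE k = :k", k=kv.k)
  if existing is None:
    sql("INSERT INTO kv (k, v) VALUES (:k, :v)", k=kv.k, v=kv.v)
  elif existing.v != kv.v:
    # conflict: keep both values, parking the remote one under a new key
    sql("INSERT INTO kv (k, v) VALUES (:k, :v)", k=safe_k(kv, source), v=kv.v)
  # otherwise the values already agree and there's nothing to do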

history preserving data models

Jul 2, 2018

Start with a super simple data model:

CREATE TABLE kv (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY u_k (`k`)
) Engine=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

Suppose we want to audit "changes" to this data model.

Approach 1: kv_log

add a table like

CREATE TABLE `kv_log` (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  changed_at TIMESTAMP NOT NULL,
  k VARCHAR(255) NOT NULL,
  old_v LONGBLOB NOT NULL,
  new_v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  KEY `k_k` (`k`)
)

current value query: unchanged

log query:

SELECT *
FROM kv_log
WHERE k = :key;

  • PRO: straightforward
  • PRO: separation of concerns
  • CON: might forget to write the kv_log record when updating records via a ROLL plan, or even in regular application code paths

Approach 2: kv_events

replace kv with

CREATE TABLE `kv_events` (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  update_type ENUM("INSERT", "UPDATE", "DELETE") NOT NULL,
  changed_at TIMESTAMP NOT NULL,
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  KEY k__key__changed_at (`k`, `changed_at`)
)

current value query (NOTE: this is a bit incorrect as it stands, because the most recent record might be a DELETE...)

SELECT *
FROM kv_events
WHERE k = :key
ORDER BY changed_at DESC
LIMIT 1;
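
One way to handle the DELETE caveat, as a sketch (sql() is a pseudocode query helper): run the same query, and have the application treat a trailing DELETE as "no current value".

def current_value(key):
    row = sql("SELECT * FROM kv_events WHERE k = :key "
              "ORDER BY changed_at DESC LIMIT 1", key=key)
    if row is None or row.update_type == "DELETE":
        return None
    return row.v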

log query:

SELECT *
FROM kv_events
WHERE k = :key;

Approach 3: kv2

replace kv with:

CREATE TABLE kv2 (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  effective_at TIMESTAMP NOT NULL,
  effective_until TIMESTAMP NULL,
  active TINYINT(1),
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_k` (`k`, `active`)
) Engine=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

current value query:

SELECT *
FROM kv2
WHERE active = 1
AND k = :key;

log query:

SELECT *
FROM kv2
WHERE k = :key;
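
For completeness, here's what an update might look like under this approach -- a sketch only, assuming the usual trick where historical rows get active = NULL so the (k, active) unique key only constrains the live row; sql() and transaction() are pseudocode helpers:

def put(key, value):
    with transaction():
        # close out the currently-active row, if any
        sql("UPDATE kv2 SET active = NULL, effective_until = NOW() "
            "WHERE k = :k AND active = 1", k=key)
        # insert the new active row
        sql("INSERT INTO kv2 (effective_at, active, k, v) "
            "VALUES (NOW(), 1, :k, :v)", k=key, v=value)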

mysql feeds

Jun 29, 2018

At work, we use a pattern called feeds that gets an incredible amount of work done. I've been wanting to describe it here for quite a while, and now seems as good a time as any.

The basic premise is: You have a service A with some data that other "consuming" services B, C, and D want to find out about. Maybe the data is payments, maybe it's support cases, maybe it's password changes... whatever. The other services might include your data warehouse, some event listeners, whatever.

Crazybob described it like this:

Lots of messages to pass around; i.e., payment processing needs to tell a capture system, a settlement system, risk systems, etc about things that happen. The standard approach would be to use a messaging server for that, with reliability implemented with additional messaging servers. The producer and consumer are probably already HA systems, so this adds another cluster to deploy and maintain.

Instead of messaging, they’re using a message feed based system.

  1. Client asks for all records
  2. Server responds with the records and current version
  3. Later, Client asks for the delta since the last version it saw
  4. Server responds with delta

The use case here is similar to Kafka, but with a few differences in implementation and implications:

  1. backed by MySQL: Most of our applications are built on a MySQL datastore. The feeds framework keeps application code mostly as-is, so the message feed comes (almost) for free with already existing data models.
  2. messages are served over standard HTTP / JSON (or -- mostly -- another canonical RPC / serialization format), so it's easy to use standard tools to inspect the stream of messages.
  3. there is no broker; the application code is also responsible for serving feed fetch requests.
  4. consumers maintain their own cursors.

We do have to make a few concessions, though:

  1. permit losing intermediate updates, but guarantee delivery of the latest version.

Let's start with a super-simplified data model and walk through how it all works.

Suppose we have a table storing a mapping of keys (k) to values (v):

CREATE TABLE `kv` (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `k` varchar(255) NOT NULL,
  `v` longblob NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_k` (`k`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Our application code does things like:

INSERT INTO kv (k, v)
VALUES ("fred", "bob");

publishing

To make this table feed published, we would add a new feed_sync_id column:

ALTER TABLE `kv`
ADD COLUMN `feed_sync_id` BIGINT(22) DEFAULT NULL,
ADD INDEX `k_fsi` (`feed_sync_id`);

Then add a background process that does something like

next_id = sql("SELECT id FROM kv WHERE feed_sync_id IS NULL LIMIT 1");
next_fsi = sql("SELECT MAX(feed_sync_id)+1 FROM kv")
sql("UPDATE kv SET feed_sync_id = :fsi WHERE id = :id LIMIT 1", 
    id=next_id, 
    fsi=next_fsi,
    )

Of course, the production implementation is significantly more correct and performant, at the cost of considerable extra complexity. A few improvements and optimizations you'd probably need to make, though:

  1. store the next_fsi in a dedicated table to avoid reassigning the same feed sync id in the case of republishes.
  2. assign feed_sync_ids in batches (a rough sketch of both follows).
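
A rough sketch of both changes together; the fsi_counters table, the batch size, and transaction() are made up for illustration, not the production implementation:

BATCH = 500

def publish_batch():
    with transaction():
        # a dedicated counter avoids handing out the same fsi twice after republishes
        counter = sql("SELECT next_fsi FROM fsi_counters WHERE feed = 'kv' FOR UPDATE")
        rows = sql("SELECT id FROM kv WHERE feed_sync_id IS NULL LIMIT :n", n=BATCH)
        for offset, row in enumerate(rows):
            sql("UPDATE kv SET feed_sync_id = :fsi WHERE id = :id",
                fsi=counter.next_fsi + offset, id=row.id)
        sql("UPDATE fsi_counters SET next_fsi = :next WHERE feed = 'kv'",
            next=counter.next_fsi + len(rows))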

serving

Now that we're assigning feed_sync_ids to our records, we can use them to paginate over the records.

SELECT *
FROM kv
WHERE kv.feed_sync_id > :fsi
ORDER BY kv.feed_sync_id
LIMIT :limit

A few things to note here:

  1. we apply a strict inequality from the previous feed sync id. This is provided by the client.
  2. we apply a limit. This is generally client-provided, but the producing app can apply an upper bound to avoid doing too much work.

Now we can hook up an HTTP endpoint that returns the results of that query; maybe

/_feeds/fetch/kv?after=3&limit=5

could return

{
  "entries": [
    {
      "k": "fred",
      "v": "bob",
      "feed_sync_id": 4
    },
    {
      "k": "pi",
      "v": "3.14159",
      "feed_sync_id": 5
    }
  ]
}

This works, but now we've "leaked" an implementation detail (that we're using feeds behind the scenes) into our API responses. The client would also need to know how to aggregate the feed_sync_ids into the value to use on the next fetch request. Neither of these properties is great; to fix them, the endpoint can do a bit of extra work, transforming the response instead into:

{
  "entries": [
    {
      "k": "fred",
      "v": "bob"
    },
    {
      "k": "pi",
      "v": "3.14159"
    }
  ],
  "cursor": 5
}
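
That extra work is small; roughly this, as a sketch with sql() as a pseudocode query helper:

def fetch_kv(after, limit):
    rows = sql("SELECT * FROM kv WHERE feed_sync_id > :fsi "
               "ORDER BY feed_sync_id LIMIT :limit", fsi=after, limit=limit)
    return {
        "entries": [{"k": r.k, "v": r.v} for r in rows],           # fsi stripped out
        "cursor": max((r.feed_sync_id for r in rows), default=after),
    }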

consuming

Now we're in great shape! We're ready for a consumer to come along and hit our endpoint.

We need to define some contract for starting to consume the feed; for everything I've laid out so far, we can just start with after=0 for the initial fetch. Therefore, the consumer library will do an API call like

/_feeds/fetch/kv?after=0&limit=5

then pass off the list of entries to some application code -- whatever the consuming app needs these entries for! Assuming there were no problems processing the entries, the consumer library then stores the cursor value to some persistence layer. Naturally, we'll read that persistence layer before the next feed fetch to use the last cursor the serving API gave us.

A simple choice for the persistence layer is

CREATE TABLE `cursors` (
  `id` BIGINT(22) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `cursor` VARCHAR(255) NOT NULL DEFAULT "",
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
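
Tying it together, one iteration of a consumer might look roughly like this; fetch_feed(), process_entries(), the consumer name, and sql() are placeholders of mine, not the real library:

def consume_once():
    row = sql("SELECT cursor FROM cursors WHERE name = 'kv-consumer'")
    after = int(row.cursor) if row and row.cursor else 0

    resp = fetch_feed("kv", after=after, limit=5)   # GET /_feeds/fetch/kv?after=...&limit=5
    process_entries(resp["entries"])                # whatever the consuming app does with them

    # only advance the cursor once the entries were handled successfully
    sql("INSERT INTO cursors (name, cursor) VALUES ('kv-consumer', :c) "
        "ON DUPLICATE KEY UPDATE cursor = :c", c=str(resp["cursor"]))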
