Posts

robustness principle and mocks

Jul 11, 2018

The Robustness Principle (or Postel's Law) states

Be conservative in what you do, be liberal in what you accept from others (often reworded as "Be conservative in what you send, be liberal in what you accept").

This principle has drawn its share of criticism.

I realized this has interesting implications for mocks. Suppose you have

public class MyObj {
  private final Integer x;

  public MyObj(Integer x) {
    if (x < 0) {
      throw new IllegalArgumentException("negative x!");
    }
    this.x = x;
  }

  public int getX() {
    return x;
  }
}

In a unit test, we can have

MyObj myObj = mock(MyObj.class);
when(myObj.getX()).thenReturn(-1);

which bypasses our sanity checks!

My takeaways from this are:

Don't use mocks for POJO/data-ish objects!

If this feels painful, create a fixtures class:

public class MyObjFixtures {
  @Inject Random random;

  public MyObj create() {
    // nextInt(bound) is always non-negative, avoiding the Math.abs(Integer.MIN_VALUE) edge case
    return new MyObj(random.nextInt(Integer.MAX_VALUE));
  }
}

Create "fake" implementations when possible/appropriate.

Usually, it's quite a bit easier to create a fake implementation than a real implementation:

  1. use maps/lists instead of a real database
  2. take shortcuts like avoiding concurrency where possible
  3. depend on fakes of whatever API calls you need

There's some danger in this -- you should definitely consider wiring things up in a way that your tests can run against "real" implementations or sandboxes of them.

Another major concern is having a way to "reset" the fake implementations between tests to reduce flakiness, or coming up with a way of testing that avoids the need to reset.
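
To make that concrete, here's what a hand-rolled fake might look like for a hypothetical KvStore interface (the interface and names are just for illustration): a map instead of a real database, plus a reset() hook for test isolation.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// A minimal fake: a map instead of a real database, plus reset() for test isolation.
// KvStore is a hypothetical interface, not from the post.
public class FakeKvStore implements KvStore {
  private final Map<String, String> data = new HashMap<>();

  @Override
  public void put(String k, String v) {
    data.put(k, v);
  }

  @Override
  public Optional<String> get(String k) {
    return Optional.ofNullable(data.get(k));
  }

  // Not part of the KvStore interface; call from test setup/teardown.
  public void reset() {
    data.clear();
  }
}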

cross-dc sync with feed published KV

Jul 10, 2018

It's been fun describing the feeds framework we use at Square. Today we'll dive into a concrete problem:

  1. We'll stick with the feed-published kv table again.
  2. We want two instances of the application to bidirectionally synchronize: writes that happen on one instance should show up on the other.
  3. Eventual consistency is OK.

First, a bit of justification, though: I use this KV table to remember things I might have to look up without the usual context, like my motorcycle's license plate number, or that one weird python snippet I can never remember. I also have a whole slew of them at work -- a bunch of random representative IDs for various things in our systems that I use from time to time. I also use a bunch of these as todo items at work, but that happens to work differently and is a topic for a future blog post :-)

The end goal here is that I should be able to partition my keys into

  1. work-only things (which don't get replicated to my work machine at all)
  2. work-appropriate personal things (which are bidirectionally synchronized between my personal machines and my work laptop)
  3. personal things that I don't want on my work machine

In practice, I don't really have any of item (3). But the work vs personal laptop is a good case study for a system that has frequent network partitions.

The basic approach here is to have several kv instances served by feeds

WORK     PERSONAL

that each consume the other's feed.

The only other thing we need is some method to resolve conflicts when a given k already exists on both sides but the associated v values differ. One easy approach is to insert the conflicting value under a new, conflict-marked key, like

def safe_k(kv, source):
  # e.g. "plate-Conflicted-WORK-1234"
  return kv.k + "-Conflicted-" + source + "-" + str(kv.feed_sync_id)
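
On the consuming side, that rule might get applied somewhere like this -- a rough Java sketch, where KvStore, Entry, and the source label are hypothetical stand-ins for the real consumer code:

// Apply one consumed entry; on conflict, keep the local value and park the
// remote one under a conflict-marked key.
void apply(KvStore store, Entry entry, String source) {
  Optional<String> existing = store.get(entry.k());
  if (existing.isPresent() && !existing.get().equals(entry.v())) {
    store.put(entry.k() + "-Conflicted-" + source + "-" + entry.feedSyncId(), entry.v());
  } else {
    store.put(entry.k(), entry.v());
  }
}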

history preserving data models

Jul 2, 2018

Start with a super simple data model:

CREATE TABLE kv (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY u_k (`k`)
) Engine=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

Suppose we want to audit "changes" to this data model.

Approach 1: kv_log

add a log table like

CREATE TABLE `kv_log` (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  changed_at TIMESTAMP NOT NULL,
  k VARCHAR(255) NOT NULL,
  old_v LONGBLOB NOT NULL,
  new_v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`)
)

current value query: unchanged

log query:

SELECT *
FROM kv_log
WHERE k = :key;

  • PRO: straightforward
  • PRO: separation of concerns
  • CON: might forget to write the kv_log record when updating records via a ROLL plan, or even in some overlooked application code path

Approach 2: kv_events

replace kv with

CREATE TABLE `kv_events` (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  update_type ENUM("INSERT", "UPDATE", "DELETE") NOT NULL,
  changed_at TIMESTAMP NOT NULL,
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  KEY k__key__changed_at (`k`, `changed_at`)
)

current value query (NOTE: this is a bit incorrect as it stands, because the most recent record might be a DELETE...)

SELECT *
FROM kv_events
WHERE k = :key
ORDER BY changed_at DESC
LIMIT 1;
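
One way to patch up that caveat is to keep the query as-is and treat a trailing DELETE as "no current value" in application code. A rough sketch, where KvEvent, UpdateType, and dao are hypothetical stand-ins for whatever the data access layer looks like:

// The latest event wins, but a trailing DELETE means "no current value".
Optional<byte[]> currentValue(String key) {
  KvEvent latest = dao.latestEventFor(key);  // the ORDER BY changed_at DESC LIMIT 1 query above
  if (latest == null || latest.updateType() == UpdateType.DELETE) {
    return Optional.empty();
  }
  return Optional.of(latest.v());
}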

log query:

SELECT *
FROM kv_events
WHERE k = :key;

Approach 3: kv2

replace kv with:

CREATE TABLE kv2 (
  id BIGINT(22) NOT NULL AUTO_INCREMENT,
  effective_at TIMESTAMP NOT NULL,
  effective_until TIMESTAMP,
  active TINYINT(1),
  k VARCHAR(255) NOT NULL,
  v LONGBLOB NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_k` (`k`, `active`)
) Engine=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

current value query:

SELECT *
FROM kv2
WHERE active = 1
AND k = :key;

log query:

SELECT *
FROM kv2
WHERE k = :key;
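
For completeness, here's roughly what a write against kv2 looks like -- a sketch in plain JDBC (java.sql), not the real data access layer. The trick is that retired rows get active = NULL, so the UNIQUE KEY on (k, active) permits any number of historical rows per key but only one row with active = 1:

// Retire the current row (if any) and insert the new active row, atomically.
void put(Connection conn, String key, byte[] value) throws SQLException {
  conn.setAutoCommit(false);
  try (PreparedStatement retire = conn.prepareStatement(
           "UPDATE kv2 SET active = NULL, effective_until = NOW() WHERE k = ? AND active = 1");
       PreparedStatement insert = conn.prepareStatement(
           "INSERT INTO kv2 (effective_at, active, k, v) VALUES (NOW(), 1, ?, ?)")) {
    retire.setString(1, key);   // close out the currently active row, if any
    retire.executeUpdate();
    insert.setString(1, key);   // write the new active row
    insert.setBytes(2, value);
    insert.executeUpdate();
    conn.commit();
  } catch (SQLException e) {
    conn.rollback();
    throw e;
  }
}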

mysql feeds

Jun 29, 2018

At work, we use a pattern called feeds that gets an incredible amount of work done. I've been wanting to describe it here for quite a while, and now seems as good a time as any.

The basic premise is: you have a service A with some data that other "consuming" services B, C, and D want to find out about. Maybe the data is payments, maybe it's support cases, maybe it's password changes... whatever. The consuming services might include your data warehouse, some event listeners, and so on.

Crazybob described it like this:

Lots of messages to pass around; i.e., payment processing needs to tell a capture system, a settlement system, risk systems, etc about things that happen. The standard approach would be to use a messaging server for that, with reliability implemented with additional messaging servers. The producer and consumer are probably already HA systems, so this adds another cluster to deploy and maintain.

Instead of messaging, they’re using a message feed based system.

  1. Client asks for all records
  2. Server responds with the records and current version
  3. Later, Client asks for the delta since the last version it saw
  4. Server responds with delta

The use case here is similar to Kafka, but with a few differences in implementation and implications:

  1. backed by MySQL: most of our applications are built on a MySQL datastore. The feeds framework keeps application code mostly as-is, so the message feed comes (almost) for free with the existing data models.
  2. messages are served over standard HTTP / JSON (or, mostly, whatever canonical RPC / serialization format the service already uses), so it's easy to use standard tools to inspect the stream of messages.
  3. there is no broker; the producing application is also responsible for serving feed fetch requests.
  4. consumers maintain their own cursors.

We do have to make a few concessions, though:

  1. permit losing intermediate updates, but guarantee delivery of the latest version.

Let's start with a super-simplified data model and walk through how it all works.

Suppose we have a table storing a mapping of keys (k) to values (v):

CREATE TABLE `kv` (
  `id` bigint(22) NOT NULL AUTO_INCREMENT,
  `k` varchar(255) NOT NULL,
  `v` longblob NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_k` (`k`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Our application code does things like:

INSERT INTO kv (k, v)
VALUES ("fred", "bob");

publishing

To make this table feed published, we would add a new feed_sync_id column:

ALTER TABLE `kv`
ADD COLUMN `feed_sync_id` BIGINT(22) DEFAULT NULL,
ADD INDEX `k_fsi` (`feed_sync_id`);

Then add a background process that does something like

next_id = sql("SELECT id FROM kv WHERE feed_sync_id IS NULL LIMIT 1");
next_fsi = sql("SELECT MAX(feed_sync_id)+1 FROM kv")
sql("UPDATE kv SET feed_sync_id = :fsi WHERE id = :id LIMIT 1", 
    id=next_id, 
    fsi=next_fsi,
    )

Of course, the production implementation is significantly more correct and performant, at the cost of considerable extra complexity. A few improvements and optimizations you'd probably need to make:

  1. store the next_fsi in a dedicated table to avoid reassigning the same feed sync id in the case of republishes.
  2. assign feed_sync_id in batch.
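
A rough sketch of what those two improvements might look like together, assuming a hypothetical feed_sync_counters table (one row per feed holding the next id to hand out) and plain JDBC; the real publisher is more involved:

void publishBatch(Connection conn, int batchSize) throws SQLException {
  conn.setAutoCommit(false);
  try {
    // Reserve the next block of ids by locking the counter row
    // (assumes the 'kv' counter row already exists).
    long nextFsi;
    try (Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery(
             "SELECT next_fsi FROM feed_sync_counters WHERE feed_name = 'kv' FOR UPDATE")) {
      rs.next();
      nextFsi = rs.getLong(1);
    }

    // Collect a batch of unpublished rows.
    List<Long> unpublished = new ArrayList<>();
    try (PreparedStatement ps = conn.prepareStatement(
        "SELECT id FROM kv WHERE feed_sync_id IS NULL ORDER BY id LIMIT ?")) {
      ps.setInt(1, batchSize);
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          unpublished.add(rs.getLong(1));
        }
      }
    }

    // Assign consecutive feed_sync_ids, then bump the counter.
    try (PreparedStatement ps = conn.prepareStatement(
        "UPDATE kv SET feed_sync_id = ? WHERE id = ?")) {
      for (long id : unpublished) {
        ps.setLong(1, nextFsi++);
        ps.setLong(2, id);
        ps.executeUpdate();
      }
    }
    try (PreparedStatement ps = conn.prepareStatement(
        "UPDATE feed_sync_counters SET next_fsi = ? WHERE feed_name = 'kv'")) {
      ps.setLong(1, nextFsi);
      ps.executeUpdate();
    }
    conn.commit();
  } catch (SQLException e) {
    conn.rollback();
    throw e;
  }
}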

serving

Now that we're assigning feed_sync_ids to our records, we can use those feed_sync_ids to paginate over the records.

SELECT *
FROM kv
WHERE kv.feed_sync_id > :fsi
ORDER BY kv.feed_sync_id
LIMIT :limit;

A few things to note here:

  1. we apply a strict inequality from the previous feed sync id. This is provided by the client.
  2. we apply a limit. This is generally client-provided, but the producing app can apply an upper bound to avoid doing too much work.

Now we can hook up an HTTP endpoint that returns the results of that query; maybe

/_feeds/fetch/kv?after=3&limit=5

could return

{
  "entries": [
    {
      "k": "fred",
      "v": "bob",
      "feed_sync_id": 4
    },
    {
      "k": "pi",
      "v": "3.14159",
      "feed_sync_id": 5
    }
  ]
}

This works, but now we've "leaked" an implementation detail (that we're using feeds behind the scenes) into our API responses. The client would also need to know to take the largest feed_sync_id it saw and use it on the next fetch request. Neither of these properties is great; to fix them, the endpoint can do a bit of extra work, transforming the response into:

{
  "entries": [
    {
      "k": "fred",
      "v": "bob"
    },
    {
      "k": "pi",
      "v": "3.14159"
    }
  ],
  "cursor": 5
}
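
Concretely, the endpoint can do that transformation as it walks the result set. A rough sketch in plain JDBC, with FetchResponse and Entry as hypothetical response types and the HTTP wiring omitted:

FetchResponse fetch(Connection conn, long after, int limit) throws SQLException {
  List<Entry> entries = new ArrayList<>();
  long cursor = after;
  try (PreparedStatement ps = conn.prepareStatement(
      "SELECT k, v, feed_sync_id FROM kv WHERE feed_sync_id > ? ORDER BY feed_sync_id LIMIT ?")) {
    ps.setLong(1, after);
    ps.setInt(2, Math.min(limit, 500));  // producer-side upper bound on requested work
    try (ResultSet rs = ps.executeQuery()) {
      while (rs.next()) {
        // feed_sync_id never leaves the producer; only the aggregated cursor does
        entries.add(new Entry(rs.getString("k"), rs.getBytes("v")));
        cursor = Math.max(cursor, rs.getLong("feed_sync_id"));
      }
    }
  }
  return new FetchResponse(entries, cursor);
}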

consuming

Now we're in great shape! We're ready for a consumer to come along and hit our endpoint.

We need to define some contract for starting to consume the feed; for everything I've laid out so far, we can just start with after=0 for the initial fetch. Therefore, the consumer library will do an API call like

/_feeds/fetch/kv?after=0&limit=5

and pass off the list of entries to some application code -- whatever the consuming app needs these entries for! Assuming there were no problems processing the entries, the consumer library can then store the cursor value in some persistence layer. Naturally, we'll read that persistence layer back before the next feed fetch, so we can send along the last cursor the serving API gave us.

A simple choice for the persistence layer is

CREATE TABLE `cursors` (
  `id` BIGINT(22) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `cursor` VARCHAR(255) NOT NULL DEFAULT "",
  PRIMARY KEY (`id`),
  UNIQUE KEY `u_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
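
Putting the pieces together, the consumer loop ends up looking something like this -- a sketch using java.net.http, where loadCursor/storeCursor, parseEntries/parseCursor, and process are hypothetical stand-ins for the cursors table access, JSON parsing, and application code, and the producer host is a placeholder:

void consumeForever(HttpClient http) throws Exception {
  while (true) {
    long after = loadCursor("kv");  // read the cursors table; 0 if no row yet
    HttpRequest request = HttpRequest.newBuilder(
        URI.create("https://producer.example.com/_feeds/fetch/kv?after=" + after + "&limit=100"))
        .build();
    String body = http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    List<Entry> entries = parseEntries(body);
    if (entries.isEmpty()) {
      Thread.sleep(1_000);                   // caught up; poll again later
    } else {
      process(entries);                      // application code
      storeCursor("kv", parseCursor(body));  // only advance after successful processing
    }
  }
}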

constraint violations in tests

May 31, 2018

I had some test code that looked something like

@Test public void a() {
  db.insert(0, "a");
  assertThat(db.query(1).getKey()).isEqualTo("a");
}

@Test public void b() {
  db.insert(0, "a");
  db.insert(1, "b");
  assertThat(db.query(2).getKey()).isEqualTo("b");
}

@Test public void c() {
  assertThat(db.query(3)).isNull();
}

There's some stuff to like about this:

  1. pretty straightforward which test is touching which records
  2. pretty clear what you expect to see in the database.

There's a bunch of coincidences here that make this work, though:

  • Some JUnit magic that clears the database before running the test.
  • no other tests running concurrently
  • the database must start out empty (so auto-increment ids begin at 1)

But in my case it wasn't working, and I got constraint violation errors about a duplicate id=1.

The 1/2/3 values are accidental complexity -- I have to assign them in my test code, even though that's normally the database's job! The first two would be easy to fix with something like:

long id = db.insertReturningId("a");
assertThat(db.query(id).getKey()).isEqualTo("a");

but what about the third? One approach is to just do a bunch of queries:

long id = 1;
while (db.query(id) != null) {
  id++;
}
assertThat(db.query(id)).isNull();

but that test always passes, so is it a useful test?

(introduce a new PK, uuid?)

(random id)

(use -1?)

multiple thread test cases in java

May 15, 2018

Some of my day job lately has been trying to get a better handle on threads in Java. In particular, we had a really weird race condition resulting from multiple threads processing an input data stream and causing some unexpected conflicts in our data access layer.

To try to replicate the problem in tests, I started with something like this:

private static Runnable runWithDelay(String name, long millis) {
  return () -> {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println(name + " finished");
  };
}

This is just a Runnable that waits a while, and then prints that it finished. Then we can test with something like:

public static void main(String[] args) throws InterruptedException {
  Thread t0 = new Thread(runWithDelay("r0", 200));
  Thread t1 = new Thread(runWithDelay("r1", 300));
  Thread t2 = new Thread(runWithDelay("r2", 100));

  t0.start(); t1.start(); t2.start();

  t0.join(); t1.join(); t2.join();

  System.out.println("Completely finished");
}

which outputs

r2 finished
r0 finished
r1 finished
Completely finished

as we'd probably expect.

This isn't a very sophisticated use of threads, but it was actually already enough to reproduce our concurrency bug pretty reliably in a local development environment!

big-ish personal storage

Apr 25, 2018

I was curious what it would cost to have something like 5-30TB of storage space on hand. The obvious choice is between buying some hard drives and paying for an S3 bucket or something.

The Amazon costs are pretty straightforward: you pick a region, they tell you the cost per GB-month. Starting with their pricing table:

aws = [
    {"level": "standard", "c_millidollars": 23},
    {"level": "infrequent", "c_millidollars": 12.5},
    {"level": "one_zone", "c_millidollars": 10},
    {"level": "glacier", "c_millidollars": 4}
]

we can say

gbs = 12000
for level in aws:
    dollars_per_gb = level["c_millidollars"] / 1000
    dollars = gbs * dollars_per_gb
    print("{} ==> ${} * {} = ${}/month".format(
      level["level"], dollars_per_gb, gbs, dollars
    ))

and get back

standard ==> $0.023 * 12000 = $276.0/month
infrequent ==> $0.0125 * 12000 = $150.0/month
one_zone ==> $0.01 * 12000 = $120.0/month
glacier ==> $0.004 * 12000 = $48.0/month

Figuring out the hard drive cost, I started from some raw data:

raw = [
  {"slug": "toshiba_x300_4tb", "c": 11299, "tb": 4, "link": "https://www.amazon.com/Toshiba-Performance-Desktop-Internal-HDWE150XZSTA/dp/B013JPKUU2"},
  {"slug": "toshiba_x300_5tb", "c": 12999, "tb": 5, "link": "https://www.amazon.com/Toshiba-Performance-Desktop-Internal-HDWE150XZSTA/dp/B013JPLKQK"},
  {"slug": "toshiba_x300_6tb", "c": 17505, "tb": 6, "link": "https://www.amazon.com/Toshiba-Performance-Desktop-Internal-HDWE150XZSTA/dp/B013JPLKJC"},
  {"slug": "toshiba_x300_8tb", "c": 23973, "tb": 8, "link": "https://www.amazon.com/Toshiba-Performance-Desktop-Internal-HDWE150XZSTA/dp/B074BTZ2YJ"},
  {"slug": "seagate_barracuda_4tb", "c":  9399, "tb": 4, "link": "https://www.amazon.com/Seagate-BarraCuda-3-5-Inch-Internal-ST4000DM004/dp/B071WLPRHN"},
  {"slug": "seagate_barracuda_5tb", "c": 18857, "tb": 5, "link": "https://www.amazon.com/Seagate-Barracuda-2-5-Inch-Internal-ST5000LM000/dp/B01M0AADIX"},
]

Dividing total price by total capacity ($940.32 for 32 TB) gives us a figure of about $29.38/TB for hard drives.

Just to blow up our hard drive costs a bit, we can assume some drive-level redundancy. Maybe we want to use ZFS (but maybe we don't). One alternative is using hardware or maybe even software RAID. Let's assume we want to do 5 disks with 2 parity (something like RAID6). Just sticking with 4TB drives for a sec, we'd be buying 20TB raw for 12TB of usable capacity.

Using our "average-priced" drives, that'd be 20 TB * $29.38/TB ≈ $587.70 upfront cost for hard drives.

Interestingly, there are some ongoing costs for this option, too: the electricity for the computer the drives plug into. Assuming a 200-watt machine and San Francisco electricity prices ($0.201 / kWh), we'd be looking at

1 month @ 200 watts = 200 watts * 730 hours = 146,000 watt-hours = 146 kWh
146 kWh * $0.201/kWh ==> $29.35/month

sfpark api

Apr 24, 2018

I recently came to learn of the SFpark API, which lets one make queries like:

LAT=37.787702
LONG=-122.407796
curl "http://api.sfpark.org/sfpark/rest/availabilityservice?lat=${LAT}&long=${LONG}&radius=0.25&uom=mile&response=json" | pbcopy
pbpaste | jq '.AVL[] | select (.TYPE | contains("OFF"))'

and get a response including records like:

{
  "TYPE": "OFF",
  "OSPID": "950",
  "NAME": "Union Square Garage",
  "DESC": "333 Post Street",
  "INTER": "Geary between Stockton & Powell",
  "TEL": "(415) 397-0631",
  "OPHRS": {
    "OPS": {
      "FROM": "7 Days/Wk",
      "BEG": "24 Hrs/Day"
    }
  },
  "OCC": "381",
  "OPER": "670",
  "PTS": "1",
  "LOC": "-122.407447946,37.7876789151"
}

Pretty cool!

regex in java

Feb 23, 2018

I can never remember how to do java regexes with Pattern.

Pattern pattern = Pattern.compile("x_(yy|zz)");
String input = "x_yy";
boolean m = pattern.asPredicate().test(input);
System.out.println("matches: " + m);
Matcher matcher = pattern.matcher(input);
while (matcher.find()) {
  System.out.println("whole matched string: " + matcher.group(0));
  System.out.println("matched group: " + matcher.group(1));
}

I also learned a really cool thing IntelliJ can do with pattern matches!


guava RangeMap

Jan 26, 2018

We had some code like this:

NavigableMap<Double, Integer> PRIORITY_MULTIPLIERS = new TreeMap<Double, Integer>() {{ 
  put(LOW_THRESHOLD, 1);    // [20k..40k)  => 1
  put(MEDIUM_THRESHOLD, 2); // [40k..100k) => 2
  put(HIGH_THRESHOLD, 3);   // [100k..+∞)  => 3
}};

Integer getPriorityMultiplier(Double v) {
  if (v < LOW_THRESHOLD) {
    return 0;
  }
  return PRIORITY_MULTIPLIERS.floorEntry(v).getValue();
}

I replaced it with a RangeMap that uses Range instances as keys:

RangeMap<Double, Integer> PRIORITY_MULTIPLIERS = ImmutableRangeMap.<Double, Integer>builder()
      .put(Range.lessThan(LOW_THRESHOLD), 0)
      .put(Range.closedOpen(LOW_THRESHOLD, MEDIUM_THRESHOLD), 1)
      .put(Range.closedOpen(MEDIUM_THRESHOLD, HIGH_THRESHOLD), 2)
      .put(Range.atLeast(HIGH_THRESHOLD), 3)
      .build();
Integer getPriorityMultiplier(Double v) {
  return PRIORITY_MULTIPLIERS.get(v);
}

Advantages here being:

  1. ImmutableRangeMap explicitly checks for overlapping ranges, so accidentally specifying Range.atMost for the first range (which would overlap the second) fails at instantiation time.
  2. Puts the declaration & endpoint behavior (open vs closed) together, instead of partly in the Map declaration and partly in the map.floorEntry call (see the lookups sketched below).
  3. Simple to test that the entire range is covered:

    @Test public void multipliersForEntireRange() {
      Set<Range<Double>> ranges = PRIORITY_MULTIPLIERS.asMapOfRanges().keySet();
      ImmutableRangeSet<Double> rangeSet = ImmutableRangeSet.copyOf(ranges);
      assertThat(rangeSet.encloses(Range.all())).isTrue();
    }
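
And to make the endpoint behavior in (2) concrete, a few lookups, assuming hypothetical threshold values of 20k / 40k / 100k (matching the comments above):

PRIORITY_MULTIPLIERS.get(19_999.99);  // -> 0  (Range.lessThan(LOW_THRESHOLD))
PRIORITY_MULTIPLIERS.get(20_000.00);  // -> 1  (closed lower endpoint)
PRIORITY_MULTIPLIERS.get(39_999.99);  // -> 1  (open upper endpoint)
PRIORITY_MULTIPLIERS.get(100_000.00); // -> 3  (Range.atLeast(HIGH_THRESHOLD))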
    
