absolutely minimal OLTP to OLAP pipeline
Suppose we have some data in a production OLTP database, and we need to send it to some OLAP database. This post describes one of the simplest approaches, and how to make it production-ready enough to rely on.
For every table `t`, we need to:
- introduce a new field, `updated_at`
- introduce a new index on that field, so we can get the records that changed after a certain `updated_at`.
For example:
ALTER TABLE t
ADD COLUMN updated_at TIMESTAMP(3)
NOT NULL
DEFAULT CURRENT_TIMESTAMP(3)
ON UPDATE CURRENT_TIMESTAMP(3),
ADD INDEX idx__updated_at (updated_at);
We add the `ON UPDATE CURRENT_TIMESTAMP` so that we don't need to touch any application code to keep the field populated. We add the `idx__updated_at` index to support a new query pattern:
SELECT *
FROM t
WHERE updated_at >= :lastCutoff
ORDER BY updated_at
LIMIT :limit
Here, we're basically going to fetch the records that have been updated since the last time we ran the query, up to a given limit. The limit is an important mechanism, because it lets us paginate over the changed records. Bounding the amount of work we do for a single fetch lets us bound the expected execution time, so we can detect more quickly when things are broken.
This consumes a single “page” of data. We will need to repeatedly execute this query to emit new records as they are updated.
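Concretely, the driver can be a small polling loop. Here is a minimal sketch, assuming a DB-API style connection whose cursor returns dict-like rows; the `emit` callback and the persistence of `last_cutoff` between runs are hypothetical and left out for brevity.

```python
import time

FETCH_SQL = """
    SELECT *
    FROM t
    WHERE updated_at >= %s
    ORDER BY updated_at
    LIMIT %s
"""

def poll_forever(conn, emit, limit=1000, sleep_seconds=5):
    """Repeatedly fetch pages of changed rows and forward them to the OLAP side."""
    last_cutoff = "1970-01-01 00:00:01.000"  # in practice, load the persisted cutoff
    while True:
        with conn.cursor() as cur:
            cur.execute(FETCH_SQL, (last_cutoff, limit))
            rows = cur.fetchall()
        if rows:
            emit(rows)
            # Advance the cutoff to the greatest updated_at we saw; because the
            # query uses >=, rows with exactly this timestamp may be re-read.
            last_cutoff = rows[-1]["updated_at"]
        if len(rows) < limit:
            # Backlog drained; wait before polling again.
            time.sleep(sleep_seconds)
```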
non-strict inequality
The astute reader might have noticed we used `>=` in the `lastCutoff` portion of the query; this implies that we might see duplicates. But it is also important, because in a given fetch we might not have consumed all records with that timestamp.
For example, consider
rec(id=1, updated_at=1)
rec(id=2, updated_at=2)
rec(id=3, updated_at=3)
rec(id=4, updated_at=3)
rec(id=5, updated_at=4)
consumed from the start with `lastCutoff=0` and `limit=3`.
The first fetch returns
rec(id=1, updated_at=1)
rec(id=2, updated_at=2)
rec(id=3, updated_at=3)
So if the next fetch query were to be
SELECT *
FROM t
WHERE updated_at > 3
ORDER BY updated_at
LIMIT 3
we would skip the `id=4` record.
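To make that concrete, here is a tiny, purely illustrative in-memory simulation of the two cutoff rules over the five records above:

```python
records = [
    {"id": 1, "updated_at": 1},
    {"id": 2, "updated_at": 2},
    {"id": 3, "updated_at": 3},
    {"id": 4, "updated_at": 3},
    {"id": 5, "updated_at": 4},
]

def fetch(cutoff, limit, strict):
    """Mimic the paginated query with either > (strict) or >= (non-strict)."""
    matching = [r for r in records
                if (r["updated_at"] > cutoff if strict else r["updated_at"] >= cutoff)]
    return sorted(matching, key=lambda r: r["updated_at"])[:limit]

first_page = fetch(0, 3, strict=False)   # ids 1, 2, 3
cutoff = first_page[-1]["updated_at"]    # 3

print([r["id"] for r in fetch(cutoff, 3, strict=True)])   # [5]       -- id=4 is skipped
print([r["id"] for r in fetch(cutoff, 3, strict=False)])  # [3, 4, 5] -- id=3 repeats, nothing skipped
```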
wedge possibility
The non-strict inequality induces a different problem, though. Consider
rec(id=1, updated_at=1)
rec(id=2, updated_at=1)
rec(id=3, updated_at=1)
rec(id=4, updated_at=1)
rec(id=5, updated_at=1)
consumed with `lastCutoff=0` and `limit=3`.
The first fetch again returns the first three records, but the next query will be
SELECT *
FROM t
WHERE updated_at >= 1
ORDER BY updated_at
LIMIT 3
which will again return the first three records and therefore fail to make progress – it has become wedged.
There are a variety of resolutions to this problem, including:
- don't actually resolve it, just:
  a. pick a relatively large `limit`
  b. pick a relatively granular `updated_at`
  c. rely on the monitoring provisions below to adjust these as needed
- use an adaptive `limit` that temporarily increases if it detects a lack of progress (see the sketch after this list)
- use a more feed-like mechanism instead
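One way the adaptive-limit option might look, as a rough sketch: if a full page comes back whose first and last `updated_at` are identical, advancing the cutoff cannot make progress, so we retry with a larger limit. The `fetch_page` helper here is hypothetical and would wrap the SELECT query above.

```python
def fetch_with_adaptive_limit(fetch_page, last_cutoff, base_limit=1000, max_limit=100_000):
    """Fetch one page, growing the limit when a single updated_at value fills the page."""
    limit = base_limit
    while True:
        rows = fetch_page(last_cutoff, limit)  # rows ordered by updated_at
        full_page = len(rows) == limit
        wedged = full_page and rows[0]["updated_at"] == rows[-1]["updated_at"]
        if not wedged:
            return rows
        if limit >= max_limit:
            # Still wedged at the cap; surface the problem rather than loop forever.
            raise RuntimeError(f"cannot make progress past updated_at={rows[0]['updated_at']}")
        limit = min(limit * 2, max_limit)
```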
fixing duplicates
Whether or not we observe duplicates from the fetch mechanism itself, we can also observe duplicates from a sequence of updates like
rec(id=1, updated_at=1, value="t")
rec(id=1, updated_at=2, value="u")
rec(id=1, updated_at=3, value="v")
If observing the intermediate results is inconsequential, we can easily deduplicate these by picking the record with the greatest `updated_at` value.
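A minimal sketch of doing that per batch on the pipeline side; many OLAP databases can equivalently do it at query time, e.g. with a window function.

```python
def dedupe_latest(rows):
    """Keep only the most recent version of each id within a batch of rows."""
    latest = {}
    for row in rows:
        current = latest.get(row["id"])
        if current is None or row["updated_at"] > current["updated_at"]:
            latest[row["id"]] = row
    return list(latest.values())
```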
missing intermediate updates / only guarantees LAST update
It should be pretty clear that this mechanism does not necessarily deliver all updates.
If my first fetch runs at `t>3` against this history of `rec` values:
rec(id=1, updated_at=1, value="t")
rec(id=1, updated_at=2, value="u")
rec(id=1, updated_at=3, value="v")
the database will only actually have a single record:
rec(id=1, updated_at=3, value="v")
and therefore, we will never emit the `value=t` or `value=u` records!
In a lot of systems, this can be ok! But if it’s not, we must consider either a different mechanism or a different data model. As an example, a different data model could store
rec_update(id=101, rec_id=1, updated_at=1, value="t")
rec_update(id=102, rec_id=1, updated_at=2, value="u")
rec_update(id=103, rec_id=1, updated_at=3, value="v")
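A rough sketch of what writing to such an append-only model might look like from the application, assuming a MySQL-style driver; the table and column names follow the illustration above and are otherwise assumptions.

```python
INSERT_UPDATE_SQL = """
    INSERT INTO rec_update (rec_id, updated_at, value)
    VALUES (%s, CURRENT_TIMESTAMP(3), %s)
"""

def record_change(conn, rec_id, value):
    """Append a rec_update row for every change instead of updating rec in place,
    so the poller can observe every intermediate value."""
    with conn.cursor() as cur:
        cur.execute(INSERT_UPDATE_SQL, (rec_id, value))
    conn.commit()
```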
monitoring
This being an online system, we should define some appropriate correctness / monitoring criteria.
One possible correctness property of this approach is

> the most recently processed `updated_at` is sufficiently recent

but this might not be appropriate if the table being replicated can have long gaps between updates. A slightly better variant might be

> the most recent successful poll operation occurred sufficiently recently
This is good to ensure that we’re making progress and that our process hasn’t broken.
But even if our polling process is working, it might be that the `limit` or polling frequency is simply not high enough to keep up!
To account for that, we can instead check that

> the number of records after `lastCutoff` is sufficiently small
This works ok, except 1) we need cron-style monitoring for this process as well, and 2) if we are very behind, such a count might be prohibitively expensive. We can lessen the overhead of (2) by changing to:
> the difference between the max `updated_at` in the table and the `lastCutoff` is sufficiently small
Here, the index on `updated_at` makes the query effectively constant-time, which is great!
But it does sacrifice the knowledge of how many records we’re behind by, which is often a better indication of how much work we need to do to catch up.
One intermediate approach is executing a count over a subquery with a LIMIT clause, so we obtain a precise backlog figure when it is cheap enough to compute, and the count otherwise tops out at that limit.
I’ve seen those limits set between 1000 and 100000 in various production systems I’ve worked with, and they have been pretty effective.
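A sketch of what that bounded backlog check might look like, assuming MySQL and a DB-API style connection; the `cap` parameter plays the role of the inner LIMIT described above.

```python
BACKLOG_SQL = """
    SELECT COUNT(*)
    FROM (
        SELECT 1
        FROM t
        WHERE updated_at >= %s
        LIMIT %s
    ) AS bounded
"""

def backlog_size(conn, last_cutoff, cap=10_000):
    """Return the number of pending records, saturating at `cap` so the
    count stays cheap even when we are very far behind."""
    with conn.cursor() as cur:
        cur.execute(BACKLOG_SQL, (last_cutoff, cap))
        (count,) = cur.fetchone()
    return count
```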
comparison with feeds
This mechanism bears a lot of similarity to MySQL feeds, but with a few major differences:
- this polling approach is much simpler than feeds, because it doesn't need the `feed_sync_id` assignment process or any API
- the presence of the API layer in the feed approach makes it possible to perform transformations on the exposed records, while there is no such capability in this approach
- the feed approach avoids the "wedge possibility" problem, since it is impossible to have multiple records with the same `feed_sync_id`