Attempting to build an actually good data engine


I've started building a data engine and I'm excited to share!!

1. The Realization

I've been trying to build systematic trading infrastructure for a while now; my first real attempt was highlighted in this post a while back. Since then, I've started over several times and learned a ton from places like Reddit, conversations with experienced devs, and blog posts. One resource I found particularly useful was this HeadlandsTech blog post on the structure of quant trading.

That first profitron project was a monolithic architecture that coupled data collection, strategy, execution, portfolio management, backtesting (pretty much everything) together at once. Things got way too complex way too quickly, and changing or adding features was a nightmare because the architecture was way too entangled. I needed to take a microservices approach: rather than doing several things at once, half-assed, I will do one thing really well and ensure its integrity/correctness in isolation. This post is about the data component.


2. The Problem and The Solution

I've tried to create automated trading systems for equities, crypto, and most recently prediction markets. Every single time, I created the data component as a bespoke, one-off implementation that exclusively supported the asset class I was trading. Each time I fetched, processed, and stored my data in slightly different ways, and it's been a pain in the ass to deal with all of the different data formats, APIs, etc. It would be really nice to have one single data module that supported:

  1. many different asset classes
  2. multiple different exchanges and data feeds per asset class
  3. rich, well defined data structures to store, understand and express the data
  4. easy extensibility to more instruments and markets
  5. easy joinability of any two data tables, regardless of differences in timestamps, instrument type, etc.

This would make life so much easier! Specifically:

  1. Cross-exchange and cross-asset strategies would become 10x easier to research and create.
  2. Rate limiting and api costs would become less of an issue.
  3. I'd have a single, consistent query interface for all my data regardless of where it came from.
  4. Historical data would accumulate passively over time, so when I want to backtest something six months from now, the data is already there waiting.

General structure

There are two components of this system:

  1. Data fetcher + api

  2. Cold storage database (disclaimer: when I say database, I really mean a file system with a bunch of Parquet files)

Data fetcher + API:

This runs 24/7 on a Hetzner CX33 VPS (4 vCPU, 8GB RAM, 80GB NVMe, ~€5.50/month). The general structure of this fetcher is:

  1. For each exchange, a collector thread connects to websockets and normalizes incoming messages into canonical row dicts
  2. A single buffer thread drains those rows from a shared asyncio.Queue and accumulates them in memory
  3. A writer thread (scheduled via crontab) periodically flushes the buffer to zstd-compressed Parquet files
  4. An API sits on the same machine and exposes methods like l2(), trades(), and sql(), backed by DuckDB, returning Polars DataFrames
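The collector-to-buffer handoff above can be sketched roughly like this. This is a toy version with a fake message source, not the real module; the field names and the `collector`/`buffer` function names are illustrative.

```python
# Sketch of the collector -> queue -> buffer pipeline described above.
# The raw message shape and field names here are made up for illustration.
import asyncio


async def collector(queue: asyncio.Queue, raw_messages: list[dict]) -> None:
    """Stands in for one exchange's websocket collector: normalize each
    raw message into a canonical row dict and push it onto the queue."""
    for msg in raw_messages:
        row = {
            "ts": msg["t"],            # int64 ns timestamp
            "source": msg["ex"],
            "market": msg["m"],
            "side": msg["s"],
            "price": float(msg["p"]),
            "size": float(msg["q"]),
        }
        await queue.put(row)


async def buffer(queue: asyncio.Queue, sink: list[dict], n_expected: int) -> None:
    """The single buffer task: drain rows from the shared queue and
    accumulate them in memory until the writer flushes."""
    for _ in range(n_expected):
        sink.append(await queue.get())


async def main() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    rows: list[dict] = []
    raw = [{"t": 1, "ex": "coinbase", "m": "BTC-USD",
            "s": "buy", "p": "65000.1", "q": "0.5"}]
    await asyncio.gather(collector(queue, raw), buffer(queue, rows, len(raw)))
    return rows
```

In the real system there is one collector per exchange, all feeding the same queue, which is what makes adding a new exchange a matter of writing one new normalizer.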

Everything conforms to strict Arrow schemas defined once in schema.py. There are two data types (with more to come):

  • L2 orderbook updates: timestamp, source, market, side, price, size, sequence number
  • Trades: timestamp, source, market, side, price, size, trade ID

All timestamps are stored as int64 nanoseconds.

Cold storage database:

For this I am using a Backblaze B2 bucket (around $0.50/month and growing very slowly). After writing locally, the Parquet files get uploaded to B2 and the local copies are deleted. B2 is the database. Files are organized in a Hive-style partition layout:

data/{table}/source={source}/market={market}/date={date}/{seq}.parquet

This gives me two nice things:

  1. DuckDB can glob over specific partitions without scanning everything
  2. I can query the data from anywhere using DuckDB

Cost Analysis

I'm currently collecting L2 orderbook updates and trades for 6 instruments across two exchanges:

  • Coinbase (spot crypto): BTC-USD, ETH-USD, SOL-USD
  • Kalshi (prediction markets): KXBTC, KXETH, KXSOL series (these auto-discover active contracts as they rotate)

This will allow me to create cross-exchange crypto strategies which I think is fertile ground as of late given all of the new crypto prediction market contracts that have been popping up.

After about 3 days of collection, the B2 bucket is sitting at roughly 6 GB, or about 2 GB/day. With the weak assumption that my data needs stay the same:

Timeframe Storage
1 month ~60 GB
6 months ~360 GB
1 year ~730 GB

B2 charges $0.006/GB/month for storage, with the first 10 GB free. Downloads are free up to 1 GB/day (more than enough for research queries). So:

Component Now After 1 month After 1 year
Hetzner VPS €5.50 €5.50 €5.50
B2 storage ~$0.00 ~$0.30 ~$4.32
B2 downloads $0.00 $0.00 $0.00
Total ~$6 ~$6.30 ~$10
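The B2 numbers in the table fall out of a one-line formula: billable storage is whatever exceeds the 10 GB free tier, at $0.006/GB-month.

```python
# Sanity check of the storage projection: ~2 GB/day accumulating,
# B2 at $0.006/GB-month with the first 10 GB free.
def b2_monthly_cost(stored_gb: float, rate: float = 0.006,
                    free_gb: float = 10.0) -> float:
    """Monthly B2 storage bill for a given amount of stored data."""
    return max(0.0, stored_gb - free_gb) * rate

# Projected monthly bills at the table's milestones:
# 1 month (~60 GB), 6 months (~360 GB), 1 year (~730 GB)
projection = {gb: round(b2_monthly_cost(gb), 2) for gb in (60, 360, 730)}
```

This reproduces the ~$0.30/month after one month and ~$4.32/month after a year quoted above.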

Even after a full year of 24/7 tick-level data collection across 6 instruments, everything is pretty cost effective so far. For comparison a single month of historical L2 data from a vendor like Databento can cost more than an entire year of running this.

Here are the specs of my Hetzner machine:
[hetzner.png: screenshot of the Hetzner VPS specs]

Tech Stack

Component Choice Why
Event loop uvloop Claude said it's better than asyncio
Serialization msgspec Typed structs, single-pass JSON decode, faster than dataclasses
Storage PyArrow + Parquet Columnar, fast
Compression zstd (level 3) fast compression
Query engine DuckDB fast querying
DataFrames Polars Multithreaded, faster than pandas
Remote storage Backblaze B2 S3-compatible, dirt cheap
Logging structlog Structured JSON logs, machine readable

3. Interesting Challenges Along the Way

Buffering and Flush Strategy

The buffer sits between the collectors and the writer, and getting the flush strategy right took a couple of tries. The two triggers are:

  1. Row count: flush at 5 million rows
  2. Time: flush after 1 hour if we haven't hit the row count yet

Early on I had the row threshold set way too low (around 5,000 rows), which meant the system was writing a new Parquet file every few seconds. That creates a lot of tiny files, which is really bad for query performance since DuckDB has to open and scan each one individually.

Bumping it up to 5 million rows means each file is a reasonable size, and the 1 hour time trigger ensures data doesn't sit in memory forever during quiet periods. On shutdown, the buffer catches CancelledError and does a final flush so nothing gets lost.

Compaction

Even with a higher flush threshold, you still end up with multiple files per partition over time. And if you ever need to restart the service, you get a flush on shutdown and another shortly after startup. So I built a compaction utility that merges all the Parquet files within a date partition into a single sorted file.

The tricky part is making this safe to run while the collector is still writing. The compaction script writes to a temp file in the same directory, deletes the old files, and then does an atomic rename to 0000.parquet. Readers never see a partial file.

Sync Boundary

Here's a subtle problem: if I'm writing data on the VPS and querying from my laptop, there's a window where data exists locally but hasn't been uploaded to B2 yet. If I run a query during that window, I get incomplete results and don't even know it.

To deal with this I just have a cutoff: the query API clamps all queries to the last completed one-hour boundary. If it's 2:07 PM, queries only return data up to 1:00 PM, the most recent boundary that is a full hour old. The buffer flushes frequently enough that anything older than one hour is guaranteed to be in B2. This flush frequency is subject to change as my data coverage evolves.
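The clamp itself is a couple of lines; a sketch, assuming the 1-hour flush guarantee holds:

```python
# Sketch of the query-time clamp: only expose data up to the most
# recent hour boundary that is at least one full hour old, since
# everything older than an hour is guaranteed to be in B2.
from datetime import datetime, timedelta


def clamp_end(now: datetime) -> datetime:
    """Latest timestamp a query is allowed to touch."""
    floor = now.replace(minute=0, second=0, microsecond=0)
    return floor - timedelta(hours=1)
```

So a query issued at 2:07 PM gets silently truncated to 1:00 PM, and the incomplete-results window disappears.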

Query Performance

One thing I've noticed is that queries against B2 can be pretty slow. Even simple lookups like "how many rows do I have for BTC-USD on March 20th" require DuckDB to reach out to B2, read Parquet metadata over the network, and scan through it. For a single file that's OK, but when you have thousands of files across multiple instruments and dates, it becomes infeasible.

Soon I will build a metadata cache that sits locally and gets updated as data flows through the system. Every time the writer flushes a new file or the compaction script merges a partition, it would update this cache with things like row counts, date ranges, file sizes, and min/max timestamps per instrument. Then instead of hitting B2 every time I want to know basic stuff about my data, I'd just read from the cache. If I structure this correctly, this could make a lot of things that were otherwise pretty slow into trivial operations. If anyone has any thoughts on how I could do this, input would be appreciated.
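One possible shape for that cache, purely as a sketch: a dict keyed by partition, bumped on every flush or compaction, and persisted locally. Everything here (class name, key format, JSON persistence) is a hypothetical design, not the planned implementation.

```python
# Hypothetical metadata cache: per-partition stats updated as the
# writer flushes, so basic questions never have to touch B2.
import json


class MetadataCache:
    def __init__(self, path: str = "meta_cache.json") -> None:
        self.path = path
        self.parts: dict[str, dict] = {}

    def on_flush(self, key: str, n_rows: int, ts_min: int,
                 ts_max: int, size_bytes: int) -> None:
        """Called by the writer after each flush for a partition key
        like 'trades/coinbase/BTC-USD/2025-03-20'."""
        p = self.parts.setdefault(
            key, {"rows": 0, "ts_min": ts_min, "ts_max": ts_max, "bytes": 0}
        )
        p["rows"] += n_rows
        p["bytes"] += size_bytes
        p["ts_min"] = min(p["ts_min"], ts_min)
        p["ts_max"] = max(p["ts_max"], ts_max)

    def row_count(self, key: str) -> int:
        return self.parts.get(key, {}).get("rows", 0)

    def save(self) -> None:
        with open(self.path, "w") as f:
            json.dump(self.parts, f)
```

With this, "how many rows for BTC-USD on March 20th" is a dict lookup instead of a network scan.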

Data gaps

I restart the service all the time when I want to add or change features in this data module. This stops the websocket feeds and creates data gaps. For feature computation, this creates a problem: features computed right after a restart could accidentally use stale data, which means bad features, which could mean bad trades.

The first solution I have for handling this gracefully: inside the calculation logic for each feature, I check that the sequence numbers and timestamps satisfy constraints that ensure correctness. For instance, if I'm calculating a feature for volatility over the last 30 seconds, I don't want tick bars that occurred before that 30-second window.
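As a sketch, such a check might look like the following. The 1-second coverage tolerance and the exact shape of the rows are my assumptions, not the real constraints.

```python
# Sketch of a per-feature validity check: before computing a 30-second
# feature, require that the ticks actually cover the window and that
# exchange sequence numbers are gap-free (no dropped messages).
WINDOW_NS = 30 * 1_000_000_000  # 30 seconds in nanoseconds


def window_is_valid(ticks: list[dict], now_ns: int) -> bool:
    """ticks: rows with int64-ns 'ts' and exchange 'seq', sorted by seq."""
    window = [t for t in ticks if t["ts"] >= now_ns - WINDOW_NS]
    if not window:
        return False
    # Coverage: the oldest in-window tick must sit near the window start
    # (1s tolerance here is arbitrary); otherwise we likely restarted
    # mid-window and would compute the feature from partial data.
    if window[0]["ts"] > now_ns - WINDOW_NS + 1_000_000_000:
        return False
    # Continuity: any gap in sequence numbers means dropped messages.
    seqs = [t["seq"] for t in window]
    return all(b - a == 1 for a, b in zip(seqs, seqs[1:]))
```

If the check fails, the feature returns nothing rather than a number computed from a gappy window.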


Feedback would be appreciated.

That's all for now!!!

Update: Faster Search

Ok, so the first thing I've been iterating on is this total search time issue.

As shown above, I use B2 for everything. That meant issuing S3 LIST calls over the network, parsing Hive-style paths, and filtering by table, source, market, and date before I even read any rows. It was a big bottleneck, and one that grows linearly with my scope of markets, instruments, and data.

I figured this would be a nice opportunity to use a simple data structure to reduce the amount of unnecessary work done. The structure is a four-level nested dictionary that mirrors the Hive partition layout, going from table to source to market to date, with a list of S3 keys at the leaves. When we write files to B2, the following now happens:

  1. The S3 key for that file gets appended to a local text file (in the directory that corresponds to that file)
  2. The same key gets inserted into the in-memory map (so if there's a search right after, it can find it)

And let's say we reboot our data-platform module:

  1. On startup, the catalog reads through that text file line by line and inserts each key back into the right spot
  2. The full index is rebuilt locally without making any S3 calls

And our search index is rebuilt!

Now that search is quicker, it is trivially easy to accept any combination of exact matches, date ranges, and market prefix filters. We just traverse the dicts to collect matching keys and then we have the exact file paths. No more globs! Fetching all Kalshi BTC markets in a given week is just a prefix match at the market level and a range filter at the date level. Sweet!!!!
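The whole catalog fits in a small class. A sketch, assuming the key layout from earlier; for simplicity this version uses one append-only log file rather than one text file per directory, and the class and method names are made up.

```python
# Sketch of the four-level catalog: table -> source -> market -> date,
# with S3 keys at the leaves, plus an append-only log for rebuilds.
import os


class Catalog:
    def __init__(self, log_path: str) -> None:
        self.log_path = log_path
        self.index: dict = {}  # table -> source -> market -> date -> [keys]
        if os.path.exists(log_path):
            with open(log_path) as f:
                for line in f:  # rebuild on startup with zero S3 calls
                    self._insert(line.strip())

    def add(self, key: str) -> None:
        # 1. append to the local log, 2. insert into the in-memory map
        with open(self.log_path, "a") as f:
            f.write(key + "\n")
        self._insert(key)

    def _insert(self, key: str) -> None:
        # key: data/{table}/source={s}/market={m}/date={d}/{seq}.parquet
        _, table, source, market, date, _ = key.split("/")
        dates = (
            self.index.setdefault(table, {})
            .setdefault(source.split("=", 1)[1], {})
            .setdefault(market.split("=", 1)[1], {})
        )
        dates.setdefault(date.split("=", 1)[1], []).append(key)

    def search(self, table: str, source: str, market_prefix: str = "",
               date_from: str = "", date_to: str = "~") -> list[str]:
        """Exact match on table/source, prefix match on market,
        range filter on date (ISO dates compare lexicographically)."""
        out: list[str] = []
        for market, dates in self.index.get(table, {}).get(source, {}).items():
            if not market.startswith(market_prefix):
                continue
            for date, keys in dates.items():
                if date_from <= date <= date_to:
                    out.extend(keys)
        return sorted(out)
```

Fetching all Kalshi BTC markets in a given week is then `search("trades", "kalshi", market_prefix="KXBTC", date_from=..., date_to=...)`, and the result is the exact list of S3 keys to read.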
