Doing one thing really well Pt 1: Data Engine

Building a Data Engine

I've started building a data engine and I'm excited to share!!

1. The Realization

I've been trying to build systematic trading infrastructure for a while now, and my first real attempt was highlighted in this post a while back. Since then, I've started over several times and learned a ton of useful info from places like Reddit, convos with experienced devs, and blog posts. One resource I found particularly useful was this HeadlandsTech blog post on the structure of quant trading.

That first profitron project was a monolith that coupled data collection, strategy, execution, portfolio management, and backtesting, a.k.a. pretty much everything. My mistake was trying to do it all at once. Things got way too complex way too quickly, and changing or adding features was a nightmare because everything was so entangled. I needed to take a microservices approach. Rather than doing several things at once, half-assed, I'll do one thing really well and ensure its integrity and correctness in isolation. This post is about the data component.


2. The Problem and The Solution

I've tried to create automated trading systems for equities, crypto, and most recently prediction markets. Every single time, I built the data component as a bespoke, one-off implementation that only supported the asset class I was trading. Each time I fetched, processed, and stored my data in slightly different ways, and it's been a pain in the ass to deal with all the different data formats, APIs, etc. It would be really nice to have one single data module that supports:

  1. many different asset classes
  2. multiple different exchanges and data feeds per asset class
  3. rich, well defined data structures to store, understand and express the data
  4. easy extensibility to more instruments and markets
  5. easy joinability of any two data tables, regardless of differences in timestamps, instrument type, etc.

This would make life so much easier! Specifically:

  1. Cross-exchange and cross-asset strategies would become 10x easier to research and create.
  2. Rate limiting and API costs would become less of an issue.
  3. I'd have a single, consistent query interface for all my data regardless of where it came from.
  4. Historical data would accumulate automatically over time, so when I want to backtest something six months from now, the data is already there waiting.

General structure

There are two components of this system:

  1. Data fetcher + api

  2. Cold storage database

The data fetcher + API runs 24/7 on a Hetzner CX33 VPS (4 vCPU, 8 GB RAM, 80 GB NVMe, ~€5.50/month). The pipeline looks like this:

  1. Collectors connect to exchange WebSockets and normalize incoming messages into canonical row dicts
  2. Buffer drains those rows from a shared asyncio.Queue and accumulates them in memory
  3. Writer periodically flushes the buffer to zstd-compressed Parquet files
  4. Query API sits on the same machine and exposes methods like l2(), trades(), and sql(), backed by DuckDB, returning Polars DataFrames
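Here is a stripped-down sketch of the collector → queue → buffer → writer wiring, using only asyncio. It is an illustration under assumptions, not the actual service code. The collectors replay fake, already-parsed messages instead of reading WebSockets. The input keys in `normalize` are hypothetical. `write` stands in for the Parquet writer. The `None` sentinel is just a convenient way to end the demo and is not how a long-running service would shut down:

```python
import asyncio
from typing import Any, Callable, Iterable

Row = dict[str, Any]
STOP = None  # demo-only sentinel, pushed once every collector has finished


def normalize(source: str, msg: dict) -> Row:
    """Map a venue-specific message onto one canonical row shape.
    The input keys are hypothetical; each real venue needs its own mapping."""
    return {
        "ts_ns": int(msg["ts_ns"]),
        "source": source,
        "market": msg["market"],
        "price": float(msg["price"]),
        "size": float(msg["size"]),
    }


async def collector(source: str, messages: Iterable[dict], queue: asyncio.Queue) -> None:
    # Stand-in for a WebSocket loop: normalize each message, push it onto the shared queue.
    for msg in messages:
        await queue.put(normalize(source, msg))


async def buffer(queue: asyncio.Queue, write: Callable[[list[Row]], None], max_rows: int) -> None:
    # Drain the shared queue and hand rows to the writer in batches of max_rows.
    rows: list[Row] = []
    while (row := await queue.get()) is not STOP:
        rows.append(row)
        if len(rows) >= max_rows:
            write(rows)
            rows = []
    if rows:
        write(rows)  # flush whatever is left when the feed ends


async def run(feeds: dict[str, list[dict]], max_rows: int) -> list[list[Row]]:
    queue: asyncio.Queue = asyncio.Queue()
    batches: list[list[Row]] = []
    drain = asyncio.create_task(buffer(queue, batches.append, max_rows))
    # Every venue's collector feeds the same queue concurrently.
    await asyncio.gather(*(collector(src, msgs, queue) for src, msgs in feeds.items()))
    await queue.put(STOP)
    await drain
    return batches
```

All venues funnel into one queue and one row shape, so the buffer and writer never need to know which exchange a row came from.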

Below is a drawing I made to show the components:
dplat_diagram.png

Everything conforms to strict Arrow schemas defined once in schema.py. There are two data types (with more to come):

  • L2 orderbook updates: timestamp, source, market, side, price, size, sequence number
  • Trades: timestamp, source, market, side, price, size, trade ID

All timestamps are stored as int64 nanoseconds.
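Storing everything as int64 nanoseconds means each feed's timestamp gets converted exactly once, during normalization. The sketch below assumes a feed that sends ISO-8601 UTC strings with a trailing `Z` and up to nine fractional digits. That is a hypothetical input format, so each venue's actual format needs checking. `iso_to_ns` is an illustrative helper and does not come from schema.py. Python's `datetime` only keeps microseconds, so the fractional part is parsed separately to avoid silently truncating nanoseconds:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_ns(ts: str) -> int:
    """Convert 'YYYY-MM-DDTHH:MM:SS[.fffffffff]Z' (UTC) to int nanoseconds since the epoch."""
    if not ts.endswith("Z"):
        raise ValueError(f"expected a UTC timestamp ending in 'Z': {ts!r}")
    whole, _, frac = ts[:-1].partition(".")
    if len(frac) > 9 or (frac and not frac.isdigit()):
        raise ValueError(f"bad fractional seconds: {ts!r}")
    dt = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    # timedelta // timedelta yields an int, so no float rounding creeps in.
    whole_seconds = (dt - EPOCH) // timedelta(seconds=1)
    # Right-pad the fraction to 9 digits: ".5" -> 500_000_000 ns.
    return whole_seconds * 1_000_000_000 + int(frac.ljust(9, "0"))
```

Keeping the conversion in integer arithmetic matters because a float64 holds only about 15 to 16 significant digits, while a present-day nanosecond timestamp has 19.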

Cold storage is Backblaze B2 (around $0.50/month and growing very slowly). After writing locally, the Parquet files get uploaded to B2 and the local copies are deleted. B2 is the database. Files are organized in a Hive-style partition layout:

data/{table}/source={source}/market={market}/date={date}/{seq}.parquet

This gives me two nice things:

  1. DuckDB can glob over specific partitions without scanning everything
  2. I can query the data from anywhere via DuckDB's httpfs extension without downloading anything
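Here is a minimal sketch of how the partition paths and globs could be generated. It assumes the `date=` value is an ISO date (`YYYY-MM-DD`) and that `{seq}` is a zero-padded four-digit counter, which the `0000.parquet` name used by compaction suggests. The function names are hypothetical. For remote queries, `root` could be an `s3://` URL for the B2 bucket, provided DuckDB's httpfs extension is configured with B2's S3-compatible endpoint and credentials (not shown):

```python
from datetime import date
from typing import Union


def partition_dir(table: str, source: str, market: str, day: date, root: str = "data") -> str:
    # Hive-style key=value directories, one level per partition column.
    return f"{root}/{table}/source={source}/market={market}/date={day.isoformat()}"


def partition_file(table: str, source: str, market: str, day: date, seq: int,
                   root: str = "data") -> str:
    # Zero-padded sequence number so files sort in write order: 0000, 0001, ...
    return f"{partition_dir(table, source, market, day, root)}/{seq:04d}.parquet"


def partition_glob(table: str, source: str = "*", market: str = "*",
                   day: Union[date, str] = "*", root: str = "data") -> str:
    # A '*' widens the scan; fixed values keep DuckDB out of unrelated directories.
    day_part = day.isoformat() if isinstance(day, date) else day
    return f"{root}/{table}/source={source}/market={market}/date={day_part}/*.parquet"


def duckdb_sql(glob: str, columns: str = "*") -> str:
    # hive_partitioning = true exposes source/market/date as regular columns.
    return f"SELECT {columns} FROM read_parquet('{glob}', hive_partitioning = true)"
```

With `hive_partitioning = true`, a filter such as `WHERE market = 'BTC-USD'` can prune whole directories, just as a fixed value in the glob does. The SQL is built with string formatting, so this approach only suits trusted inputs.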

Cost Analysis

I'm currently collecting L2 orderbook updates and trades for 6 instruments across two exchanges:

  • Coinbase (spot crypto): BTC-USD, ETH-USD, SOL-USD
  • Kalshi (prediction markets): KXBTC, KXETH, KXSOL series (these auto-discover active contracts as they rotate)

This will let me build cross-exchange crypto strategies, which I think is fertile ground lately given all the new crypto prediction market contracts that have been popping up.

After about 3 days of collection, the B2 bucket is sitting at roughly 6 GB, or about 2 GB/day. Extrapolating:

Timeframe    Storage
1 month      ~60 GB
6 months     ~360 GB
1 year       ~730 GB

B2 charges $0.006/GB/month for storage, with the first 10 GB free. Downloads are free up to 1 GB/day (more than enough for research queries). So:

Component (per month)   Now       After 1 month   After 1 year
Hetzner VPS             €5.50     €5.50           €5.50
B2 storage              ~$0.00    ~$0.30          ~$4.32
B2 downloads            $0.00     $0.00           $0.00
Total                   ~$6       ~$6.30          ~$10
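The storage and cost figures above are simple arithmetic on the observed ~2 GB/day ingest rate. The small sketch below redoes that arithmetic so it is easy to update as the rate changes. The constants are the numbers quoted above. The B2 figure it computes is the monthly bill at that point in time, not cumulative spend:

```python
B2_USD_PER_GB_MONTH = 0.006  # B2 storage price quoted above
B2_FREE_GB = 10.0            # first 10 GB are free
INGEST_GB_PER_DAY = 2.0      # observed over the first ~3 days


def stored_gb(days: float, gb_per_day: float = INGEST_GB_PER_DAY) -> float:
    # Linear extrapolation: no compaction savings or retention limits assumed.
    return days * gb_per_day


def b2_monthly_cost(gb: float) -> float:
    # Only the storage above the free tier is billed.
    return max(0.0, gb - B2_FREE_GB) * B2_USD_PER_GB_MONTH


for label, days in [("now", 3), ("1 month", 30), ("6 months", 180), ("1 year", 365)]:
    gb = stored_gb(days)
    print(f"{label:>8}: {gb:6.0f} GB -> ${b2_monthly_cost(gb):.2f}/month")
```

By the same formula, the 6-month mark (~360 GB) works out to roughly $2.10/month for B2.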

Even after a full year of 24/7 tick-level data collection across 6 instruments, the whole setup stays very cost-effective. For comparison, a single month of historical L2 data from a vendor like Databento can cost more than an entire year of running this.

Here are the specs of my Hetzner machine:
hetzner.png

Tech Stack

Component        Choice              Why
Event loop       uvloop              Drop-in replacement for the asyncio event loop, noticeably faster
Serialization    msgspec             Typed structs, single-pass JSON decode, faster than dataclasses
Storage          PyArrow + Parquet   Columnar, self-describing, compresses well
Compression      zstd (level 3)      Good balance of speed and ratio
Query engine     DuckDB              SQL over Parquet globs, no server needed
DataFrames       Polars              Generally faster than pandas, as far as I can tell
Remote storage   Backblaze B2        S3-compatible, dirt cheap
Logging          structlog           Structured JSON logs, machine-readable

3. Interesting Challenges Along the Way

Buffering and Flush Strategy

The buffer sits between the collectors and the writer, and getting the flush strategy right took some tuning. The two triggers are:

  1. Row count: flush at 5 million rows
  2. Time: flush after 1 hour if we haven't hit the row count yet

Early on I had the row threshold set way too low (around 5,000 rows), which meant the system was writing a new Parquet file every few seconds. That creates a lot of tiny files, which is really bad for query performance since DuckDB has to open and scan each one individually.

Bumping it up to 5 million rows means each file is a reasonable size, and the 1 hour time trigger ensures data doesn't sit in memory forever during quiet periods. On shutdown, the buffer catches CancelledError and does a final flush so nothing gets lost.
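Below is a sketch of the two-trigger flush, assuming rows arrive on an asyncio.Queue as described earlier. `FlushPolicy` and `run_buffer` are hypothetical names, and `write` stands in for the Parquet writer. One detail is worth getting right. If the buffer simply blocks on `queue.get()`, a quiet feed never wakes it up to check the time trigger. So this version waits on the next row and the deadline together:

```python
import asyncio
import time
from typing import Any, Callable, Optional

Row = dict[str, Any]


class FlushPolicy:
    """Flush when either trigger fires: enough rows, or too long since the last flush."""

    def __init__(self, max_rows: int = 5_000_000, max_age_s: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.clock = clock
        self.last_flush = clock()

    def should_flush(self, n_rows: int) -> bool:
        too_old = self.clock() - self.last_flush >= self.max_age_s
        return n_rows >= self.max_rows or (n_rows > 0 and too_old)

    def seconds_left(self) -> float:
        return max(0.0, self.max_age_s - (self.clock() - self.last_flush))

    def mark_flushed(self) -> None:
        self.last_flush = self.clock()


async def run_buffer(queue: asyncio.Queue, write: Callable[[list[Row]], None],
                     policy: FlushPolicy) -> None:
    rows: list[Row] = []
    pending: Optional[asyncio.Task] = None
    try:
        while True:
            if pending is None:
                pending = asyncio.create_task(queue.get())
            # Wake on the next row OR the time deadline, whichever comes first.
            done, _ = await asyncio.wait({pending}, timeout=policy.seconds_left())
            if pending in done:
                rows.append(pending.result())
                pending = None
            if policy.should_flush(len(rows)):
                write(rows)
                rows = []
                policy.mark_flushed()
            elif not rows and policy.seconds_left() == 0.0:
                policy.mark_flushed()  # quiet period, nothing buffered: restart the timer
    except asyncio.CancelledError:
        # Shutdown: keep a row that arrived right before cancellation, then final flush.
        if pending is not None:
            if pending.done() and not pending.cancelled():
                rows.append(pending.result())
            else:
                pending.cancel()
        if rows:
            write(rows)
        raise
```

`asyncio.wait` is used rather than `asyncio.wait_for` so the pending `get()` task is never cancelled on a timeout. It simply carries over to the next iteration, which sidesteps any question of what happens to a row that arrives right as the timeout fires.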

Compaction

Even with a higher flush threshold, you still end up with multiple files per partition over time. And if you ever need to restart the service, you get a flush on shutdown and another shortly after startup. So I built a compaction utility that merges all the Parquet files within a date partition into a single sorted file.

The tricky part is making this safe to run while the collector is still writing. The compaction script writes to a temp file in the same directory, deletes the old files, and then does an atomic rename to 0000.parquet. Readers never see a partial file.
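Here is a sketch of the compaction file juggling. It follows the order described above: write a temp file in the same directory, delete the inputs, then do an atomic rename. `compact_partition` is a hypothetical name, and `merge` is a placeholder for the actual read/sort/write step (for example, with PyArrow or DuckDB). The sketch assumes a local POSIX filesystem, where `os.replace` within one directory is atomic. It also assumes the writer writes through a temporary name, so the glob never picks up a half-written file:

```python
import os
from pathlib import Path
from typing import Callable, Optional, Sequence


def compact_partition(partition: Path,
                      merge: Callable[[Sequence[Path], Path], None]) -> Optional[Path]:
    """Merge all *.parquet files in one date partition into a single 0000.parquet."""
    # Snapshot the inputs up front: files the collector writes after this point
    # are left alone and will be picked up by the next compaction.
    inputs = sorted(partition.glob("*.parquet"))
    target = partition / "0000.parquet"
    if not inputs:
        return None
    if inputs == [target]:
        return target  # already compacted

    # Same directory => same filesystem, which an atomic rename requires.
    # The .tmp suffix keeps the in-progress file out of every *.parquet glob.
    tmp = partition / "0000.parquet.tmp"
    merge(inputs, tmp)  # placeholder: read, sort by timestamp, write with zstd

    for f in inputs:
        f.unlink()
    os.replace(tmp, target)  # readers see either no file or the complete one
    return target
```

This ordering has one trade-off. Between the deletes and the rename, a reader globbing the partition briefly sees no files at all. Running `os.replace` first and deleting the remaining inputs afterwards swaps that gap for a brief window in which some rows appear twice.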

Sync Boundary

Here's a subtle problem: if I'm writing data on the VPS and querying from my laptop, there's a window where data exists locally but hasn't been uploaded to B2 yet. If I run a query during that window, I get incomplete results and don't even know it.

To deal with this, I use a cutoff. The query API clamps all queries to the last completed 1-hour boundary: if it's 2:07 PM, queries only return data up to 2:00 PM. The buffer flushes frequently enough that anything older than 1 hour is guaranteed to be in B2. This flush frequency is subject to change as my data coverage evolves.
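Here is a minimal sketch of the clamp on int64 ns timestamps. `sync_cutoff_ns`, `clamp_range`, and the `lag_ns` parameter are illustrative additions, not the real query API. Flooring to the hour matches the 2:07 PM example. If flushes aren't aligned to clock hours, passing the maximum flush interval (plus some upload margin) as `lag_ns` gives a more conservative cutoff:

```python
NS_PER_HOUR = 3_600 * 1_000_000_000


def sync_cutoff_ns(now_ns: int, lag_ns: int = 0) -> int:
    """Latest whole-hour boundary at or before now_ns - lag_ns (epoch ns, UTC)."""
    return (now_ns - lag_ns) // NS_PER_HOUR * NS_PER_HOUR


def clamp_range(start_ns: int, end_ns: int, now_ns: int, lag_ns: int = 0) -> tuple[int, int]:
    """Clamp a half-open [start, end) window so it never reaches past the cutoff."""
    end = min(end_ns, sync_cutoff_ns(now_ns, lag_ns))
    return start_ns, max(start_ns, end)  # empty window if the whole range is too recent
```

The end of the window is exclusive. So at 14:07 UTC, a query for "today" returns rows stamped before 14:00 and nothing from the still-unsynced last seven minutes.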

Query Performance

One thing I've noticed is that queries against B2 can be pretty slow. Even simple lookups like "how many rows do I have for BTC-USD on March 20th?" require DuckDB to reach out to B2, read Parquet metadata over the network, and scan through it. For a single file that's OK, but with thousands of files across multiple instruments and dates, it becomes impractical.

Soon I'll build a metadata cache that lives locally and gets updated as data flows through the system. Every time the writer flushes a new file or the compaction script merges a partition, it will update this cache with things like row counts, date ranges, file sizes, and min/max timestamps per instrument. Then, instead of hitting B2 every time I want to know basic stuff about my data, I can just read from the cache. If I structure this correctly, a lot of operations that are currently pretty slow could become trivially fast. If anyone has thoughts on how to do this, I'd appreciate the input.
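One possible shape for the metadata cache is a local SQLite table with one row per Parquet file. It would be updated when the writer flushes and when compaction replaces a partition. This is a suggestion, not a settled design: SQLite, the table layout, and every name below are assumptions. The writer already holds the rows in memory at flush time, so it could compute row counts and min/max timestamps there instead of reading them back from B2:

```python
import sqlite3
from dataclasses import astuple, dataclass


@dataclass(frozen=True)
class FileStats:
    path: str
    tbl: str        # e.g. "l2" or "trades"
    source: str
    market: str
    day: str        # ISO date, same value as the date= partition key
    n_rows: int
    n_bytes: int
    min_ts_ns: int
    max_ts_ns: int


COLUMNS = "path, tbl, source, market, day, n_rows, n_bytes, min_ts_ns, max_ts_ns"
INSERT = f"INTO files ({COLUMNS}) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"


class MetadataCache:
    def __init__(self, db_path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS files ("
            "path TEXT PRIMARY KEY, tbl TEXT, source TEXT, market TEXT, day TEXT, "
            "n_rows INTEGER, n_bytes INTEGER, min_ts_ns INTEGER, max_ts_ns INTEGER)"
        )

    def record_file(self, s: FileStats) -> None:
        """Call after the writer flushes (and uploads) a new file."""
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute("INSERT OR REPLACE " + INSERT, astuple(s))

    def replace_partition(self, merged: FileStats) -> None:
        """Call after compaction: drop the partition's old entries, keep only the merged file."""
        with self.conn:  # one transaction, so readers never see old and new together
            self.conn.execute(
                "DELETE FROM files WHERE tbl = ? AND source = ? AND market = ? AND day = ?",
                (merged.tbl, merged.source, merged.market, merged.day),
            )
            self.conn.execute("INSERT " + INSERT, astuple(merged))

    def row_count(self, tbl: str, market: str, day: str) -> int:
        (n,) = self.conn.execute(
            "SELECT COALESCE(SUM(n_rows), 0) FROM files WHERE tbl = ? AND market = ? AND day = ?",
            (tbl, market, day),
        ).fetchone()
        return n

    def time_range(self, tbl: str, market: str) -> tuple:
        return self.conn.execute(
            "SELECT MIN(min_ts_ns), MAX(max_ts_ns) FROM files WHERE tbl = ? AND market = ?",
            (tbl, market),
        ).fetchone()
```

A question like "how many BTC-USD trades on March 20th?" then becomes a local `SUM` instead of a round trip to B2. Because compaction swaps a partition's entries in a single transaction, the cache doesn't double-count rows after a merge.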


Feedback would be appreciated.

That's all for now!!!
