Systems Engineering

Architecting Digits Search: Real-time Transaction Indexing With Bleve


We recently launched Digits Search, which brings fast, beautiful, full-depth search to business financial data. We’ve received a ton of great customer feedback, but one question just keeps coming up…

How did you build this!?

Well, let’s break down how Digits Search puts your finances at your fingertips.

Digits Architecture

First, some quick background on Digits:

Digits links with your business’s accounting software and your financial institutions to build, and continually maintain, a living model of your business with the most up-to-date data. Once linked, we ingest all of that financial data in raw form, a collection that we call “facts”.

We then use machine learning and data processing to normalize all of that overlapping, unstructured data. We perform a significant number of calculations to fill holes in the picture, such as predicting how your latest transactions will be categorized, detecting and predicting recurring activity, etc.

The end result of all this work is a “view”, which is then efficiently loaded into Google Cloud Spanner as well as encrypted and archived in Google Cloud Storage for secondary processing.

Each view we produce is a complete, standalone picture of your company’s entire financial history.

Views are served by our serving layer, which is composed of a number of services written in Go, hosted in GKE, and communicating over TLS-encrypted gRPC APIs. Our serving layer aims to optimize for efficiency, security, and reliability.

Architecting Search

There are a great many search tools available, many of them open source. Examples range from library implementations like Lucene, to server implementations like Solr, to clustered solutions like Elasticsearch, to SaaS solutions like Algolia.

To pick a direction, we thought about how each approach would integrate with our existing Digits architecture.

The most important architectural factor for our design is our complete, standalone view production every time we get new data. This architectural bedrock presents us with some significant design opportunities:

  • Each View is complete and self-contained, which allows for indexing to be done entirely offline and for each view index to be immutable.
  • Each View is a natural sharding element, which is independent from other views, even for the same business.
  • The immutable nature of views and their indexes allows us to think of indexes as deliverable objects, rather than as online mutable state.

With this framing in mind, we began to look for a solution that would allow us to produce offline indexes and reliably serve them from within our current serving layer.

Ideally, this solution would integrate seamlessly with our existing services: Go, gRPC with mTLS, stateless authorization for bulletproof multi-tenancy, minimal GKE footprint, and minimal operator overhead. In particular, mutual TLS and token authentication are part of our system design at every layer of the stack, and act as defense-in-depth measures against pivots within our infrastructure in case of a breach. Any solution we pick for search must support this level of data protection.

It should also be as fast as the dickens.

We took stock of the various search strategies (clusters, servers, libraries, SaaS), and researched options within each space. We found that many solutions are organized around managing updates to online, mutable indexes. However, our view-based architecture allows us to skip over many of the state-management concerns (and the operational overhead) that mutable indexes must handle.

We decided that the library strategy appeared most promising, as it would allow us to take strong advantage of the near-statelessness of offline index shipping. The library approach would also allow us to use our existing common service code and configurations to keep operational overhead low.

Apache Lucene regularly tops the list of open source search libraries, but an ideal solution would be able to leverage all of our experience-hardened service code in our Go monorepo. We’ve strongly prioritized security in our system designs, and being able to leverage our existing, penetration-tested code for authorization, encryption, and network services would help us continue to keep our systems secure. With that in mind, we surveyed the current search landscape in Go, and immediately found a promising option!

Enter: Bleve, a native-Go search library that underpins major projects such as Couchbase and Caddy.

We tried some quick experiments to see if Bleve was suited to our needs, and we were very pleased to learn that it exceeded our expectations! We then built a more complete prototype and quickly demonstrated that it was a great fit.

With our starting strategy in mind, we then began to flesh out the system’s overall design. 

The views themselves would be produced by our streaming and batch pipelines. To ensure the work of indexing our views wouldn’t impact query latency, we wanted to index the data offline, separately from the serving processes. This meant we’d need a way to reliably deliver each completed index to the serving boxes. Finally, we’d also need a service for query processing and to coordinate future sharding. To coordinate the encrypted data moving between systems, Google Cloud PubSub and GCS were our first choice, as we had experience with them elsewhere.

Here’s a quick look at the design we’ve sketched out so far:

Digits Search architecture diagram

Although this might seem like a lot at first blush, each of these services serves a specific purpose, and our Go monorepo automation makes managing them easy and lightweight.

Each service is deployed to GKE via our continuous deployment pipeline and uses common, shared service and gRPC libraries we’ve built to ensure all of our services operate in a consistent, secure manner. Standing on the shoulders of giants, the completed search system amounts to fewer than 5,000 lines of code, including custom query pre-processing, document handling, and integration tests.

Building Search

Search must address a number of sub-problems: document collection, mapping, and indexing; index storage and cleanup; query pre-processing and routing; and result delivery. To see how this all comes together, we’ll walk through the Indexing and Serving parts of Search.

One thing we learned about Bleve is that while the basic documentation is excellent, the project has added a number of interesting features and optimizations that are not mentioned in the published documentation, but are instead discussed on the mailing list or in GitHub issues. As we go along, we’ll include code snippets to highlight those additional features as we’ve used them.

Secure Indexing

To build our indexes, we first need to assemble the raw data from the views we produce, a process we call “bundling”. Bundling brings together multiple distinct document types, for example, vendors, categories, and transactions. These document collections are encoded as Protobuf, archived, and encrypted, waiting for our search indexer to retrieve them from Google Cloud Storage.

type Bundle struct {
   LegalEntityID string
   ViewVersion   string
 
   Vendors      []*search.ArchiveVendor
   Transactions []*search.ArchiveTransaction
   Categories   []*search.ArchiveCategory
}

To populate the Bundle, we download our archives from GCS and decrypt them. Once we’ve got everything in one place, we’re almost ready to index.

But, before we can proceed, we need to tell Bleve how to handle each of our document types. Bleve calls this process mapping, and we’ll need to configure our overall index, document types, and their fields. At the top, we create our IndexMapping(), and add each of our Document types to it:

func (b Bundle) mapping() (mapping.IndexMapping, error) {
   // Base Index Mapping for this Bundle
   m := bleve.NewIndexMapping()
 
   // Each new type this bundle supports must
   // be added here, which will in turn expose
   // its mapping to the indexer.
   types := []document.Mappable{
       &document.Transaction{},
       &document.Vendor{},
       &document.Category{},
   }
 
   // Iterate over the provided mappable types
   // and map them.
   for _, t := range types {
       m.AddDocumentMapping(t.Type(), t.DocumentMapping())
   }
 
   return m, nil
}

Each of our supported document types implements our Mappable interface, which returns their DocumentMappings, and in turn, their relevant FieldMappings:

func (t *Category) DocumentMapping() *mapping.DocumentMapping {
   doc := t.Extractable.DocumentMapping()
 
   // Disable fields that should not be searched/stored in the index
   doc.AddSubDocumentMapping("category_id", disabledField)
   doc.AddSubDocumentMapping("display_key", disabledField)
   doc.AddSubDocumentMapping("type", disabledField)
   doc.AddSubDocumentMapping("parent_category_id", disabledField)
 
   doc.AddFieldMappingsAt("name", simpleMapping, minimalMapping, whitespaceMapping)
 
   // Add date as a field mapping to this document
   doc.AddFieldMappingsAt("date", t.Date.FieldMapping())
 
   // Add keywords as a field mapping to this document
   doc.AddFieldMappingsAt("keywords", t.Keywords.FieldMappings()...)
 
   return doc
}

There are a few interesting things happening in this example.

First, we create an Extractable as our base DocumentMapping. Each of our documents embeds Extractable, which allows us to store a protojson encoded copy of our document in a stored field in Bleve. We take this step because it allows us to build our response entirely from search results without calling other systems, which is important for keeping response latencies low. Our Extractable documents also set up type faceting, which lets us count results by type.

// Extractable represents non-indexed fields for being able to extract
// documents from a Bleve index
type Extractable struct {
   Source   string `json:"_source"` // must be string not bytes, https://groups.google.com/forum/#!topic/bleve/BAzJolSqhwU
   DocType  string `json:"_type"`
   Encoding string `json:"_encoding"`
}
 
// DocumentMapping returns a DocumentMapping for Extractable types.
func (Extractable) DocumentMapping() *mapping.DocumentMapping {
   doc := bleve.NewDocumentMapping()
 
   // storeOnly is a text field mapping that results in a copy
   // of the field being stored along with the rest of the document
   // in the index, but not actually included in a searchable way.
   storeOnly := bleve.NewTextFieldMapping()
   storeOnly.Index = false
   storeOnly.IncludeInAll = false
   storeOnly.IncludeTermVectors = false
   storeOnly.DocValues = false
 
   // _source and _encoding are both used to reconstitute the original
   // object from proto-as-json encoded data in search results.
   doc.AddFieldMappingsAt("_source", storeOnly)
   doc.AddFieldMappingsAt("_encoding", storeOnly)
 
   // faceting is a text field mapping that is as close to storeOnly
   // as possible, but with features required for faceting enabled.
   faceting := bleve.NewTextFieldMapping()
   faceting.Index = true
   faceting.IncludeInAll = false
   faceting.IncludeTermVectors = false
   faceting.DocValues = true
 
   // _type is the document type field. We facet on this to be able
   // to answer the question "how many Category results came back for
   // this query?"
   doc.AddFieldMappingsAt("_type", faceting)
 
   return doc
}
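
To make the store-then-extract round trip concrete, here is a minimal sketch that packs a document into an Extractable and rebuilds it from the stored fields of a search hit. It is an illustration under stated assumptions rather than the production code: encoding/json stands in for protojson so the example runs with only the standard library, and the category type, the pack and unpack helpers, and the "json" encoding label are all hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Extractable mirrors the struct above: the original object travels in
// _source, alongside its type and encoding.
type Extractable struct {
	Source   string `json:"_source"`
	DocType  string `json:"_type"`
	Encoding string `json:"_encoding"`
}

// category is a hypothetical document type used only for this sketch.
type category struct {
	ID   string `json:"category_id"`
	Name string `json:"name"`
}

// pack serializes v and wraps it in an Extractable. encoding/json is an
// assumption here, standing in for protojson.
func pack(docType string, v interface{}) (Extractable, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return Extractable{}, err
	}
	return Extractable{Source: string(raw), DocType: docType, Encoding: "json"}, nil
}

// unpack rebuilds the original object from the stored fields of a hit,
// represented here as a map keyed by field name.
func unpack(fields map[string]interface{}, out interface{}) error {
	enc, _ := fields["_encoding"].(string)
	if enc != "json" {
		return fmt.Errorf("unsupported encoding %q", enc)
	}
	src, ok := fields["_source"].(string)
	if !ok {
		return fmt.Errorf("hit has no stored _source field")
	}
	return json.Unmarshal([]byte(src), out)
}

func main() {
	ext, err := pack("category", category{ID: "c1", Name: "Travel"})
	if err != nil {
		panic(err)
	}

	// Simulate the stored fields that come back with a hit.
	fields := map[string]interface{}{
		"_source":   ext.Source,
		"_type":     ext.DocType,
		"_encoding": ext.Encoding,
	}

	var got category
	if err := unpack(fields, &got); err != nil {
		panic(err)
	}
	fmt.Println(got.ID, got.Name)
}
```

Because everything needed to render a result rides along in _source, the response can be assembled from the hit alone, with no second lookup.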

Returning to our Category DocumentMapping example, we also disable a number of fields from being searchable (in effect, leaving them as store-only fields in our Extractable). For the “name” field, we apply multiple different mappings, applying different analyzers, which ensures that the field is indexed in ways that are intuitively searchable to our customers. Finally, we set up some additional field mappings from some common Date and Keyword types. Every document type we currently support is sortable by date, which allows us to promote more recent documents in the results.

With our bundle populated and our mappings in place, we are ready to index!

First, we’ll create a new index using Bleve’s offline IndexBuilder. IndexBuilder optimizes the performance of creating an index, as well as the resulting files—a good match for our immutable offline indexes.

// NewIndex returns an empty index ready for indexing of this bundle
func (b Bundle) NewIndex(storePath string, batchSize int) (bleve.Builder, error) {
   mapping, err := b.mapping()
   if err != nil {
       return nil, err
   }
 
   path := b.VersionPath(storePath)
 
   // Create an index build directory within the storePath, which avoids
   // issues within Docker containers when Bleve compacts the final index
   // segments.
   if err := os.MkdirAll(b.TempPath(storePath), 0775); err != nil {
       return nil, err
   }
 
   config := map[string]interface{}{
       "buildPathPrefix": b.TempPath(storePath),
   }
 
   // Pass through provided batch size if set,
   // otherwise allow Bleve to fall back to the
   // internal default batch size.
   if batchSize > 0 {
       config["batchSize"] = batchSize
   }
 
   // Builders only support offline indexing.
   return bleve.NewBuilder(path, mapping, config)
}

Bleve’s IndexBuilder also accepts a map of string-based configuration options, which we can use to tune the batch size of indexing to make sure our indexes build quickly.
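
The NewIndex function calls two path helpers, VersionPath and TempPath, that are not shown. A minimal sketch of what such helpers could look like follows. The directory layout, the ".build" folder name, and the free-function signatures are assumptions for illustration only; the one property taken from the code above is that the scratch directory lives under the same storePath as the finished index.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// versionPath returns the directory that holds the finished index for one
// view. Keying on entity and view version gives each immutable index its
// own location. The layout is an assumption for this sketch.
func versionPath(storePath, legalEntityID, viewVersion string) string {
	return filepath.Join(storePath, legalEntityID, viewVersion)
}

// tempPath returns a scratch build directory under the same storePath.
// The ".build" folder name is hypothetical.
func tempPath(storePath, legalEntityID, viewVersion string) string {
	return filepath.Join(storePath, ".build", legalEntityID, viewVersion)
}

func main() {
	fmt.Println(versionPath("/data/indexes", "le_123", "v42"))
	fmt.Println(tempPath("/data/indexes", "le_123", "v42"))
}
```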

Finally, actually indexing all of our documents is a range over a channel of indexable documents.

func (idx *Indexer) builder(index bleve.Builder, documents document.Stream, legalEntityID string) error {
   start := time.Now()
   count := 0
   for doc := range documents {
       // Skip documents with empty IDs
       if doc.ID() == "" {
           continue
       }
 
       // Add document to index
       if err := index.Index(doc.ID(), doc); err != nil {
           return err
       }
 
       count++
   }
 
   // After indexing is complete, close the index to ensure that the
   // file handle is released and bleve stops attempting to modify
   // segments prior to us compressing/deleting the file.
   if err := index.Close(); err != nil {
       return err
   }
 
   logger.Infof("indexed %d in %s for %s", count, time.Since(start), legalEntityID)
   return nil
}
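
The producing side of that channel is not shown above. As a rough sketch, a stream could be fed by a goroutine that walks each document collection in turn and closes the channel when it is done, which is what ends the indexer’s range loop. The Indexable interface, the Stream type, and the vendor and transaction stand-ins below are hypothetical and only echo the names used in the indexing loop.

```go
package main

import "fmt"

// Indexable is a hypothetical minimal interface for anything the indexer
// can consume; the loop above only needs an ID.
type Indexable interface {
	ID() string
}

// Stream is a receive-only channel of documents.
type Stream <-chan Indexable

type vendor struct{ id string }

func (v vendor) ID() string { return v.id }

type transaction struct{ id string }

func (t transaction) ID() string { return t.id }

// stream emits every document from each group in order and closes the
// channel afterwards so that a consumer's range loop terminates.
func stream(groups ...[]Indexable) Stream {
	out := make(chan Indexable)
	go func() {
		defer close(out)
		for _, group := range groups {
			for _, doc := range group {
				out <- doc
			}
		}
	}()
	return out
}

func main() {
	vendors := []Indexable{vendor{"v1"}, vendor{""}}
	txns := []Indexable{transaction{"t1"}, transaction{"t2"}}

	count := 0
	for doc := range stream(vendors, txns) {
		if doc.ID() == "" {
			continue // mirror the indexer: skip documents without an ID
		}
		count++
	}
	fmt.Println("would index", count, "documents")
}
```

One caveat with this shape: if the consumer returns early on an error, the producer goroutine stays blocked on its next send. A fuller version would likely accept a context so the producer can stop.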

Once we’ve generated our indexes, we ship them off to the serving boxes by encrypting them and placing them in GCS. We ensure that each serving box receives a copy of the newly-produced index by placing an event into PubSub with subscriptions configured in a “broadcast” pattern. 

We’ve also added some safety mechanisms for edge cases, such as handling a request that arrives for an index that’s not present on the serving box, and we use Kubernetes StatefulSets to keep the current index set around between deployments.
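
As an illustration only, one way to handle the missing-index case is a per-box registry that falls back to fetching an index on demand when a lookup misses. The Registry type, its fetch hook, and the fetch-on-miss behavior are assumptions made for this sketch rather than a description of the production system.

```go
package main

import (
	"fmt"
	"sync"
)

// indexKey identifies one immutable index.
type indexKey struct {
	LegalEntityID string
	ViewVersion   string
}

// Registry tracks the indexes available on one serving box. Values are
// local paths for simplicity; a real registry would hold open indexes.
type Registry struct {
	mu      sync.Mutex
	indexes map[indexKey]string
	// fetch is a hypothetical hook that downloads and unpacks an index,
	// returning its local path.
	fetch func(indexKey) (string, error)
}

// NewRegistry returns an empty registry that uses fetch on a miss.
func NewRegistry(fetch func(indexKey) (string, error)) *Registry {
	return &Registry{indexes: make(map[indexKey]string), fetch: fetch}
}

// Add registers an index that was delivered by a broadcast event.
func (r *Registry) Add(k indexKey, path string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.indexes[k] = path
}

// Lookup returns the local path for k, fetching the index on a miss.
// Holding the lock during the fetch keeps the sketch simple, at the cost
// of serializing lookups while a fetch is in flight.
func (r *Registry) Lookup(k indexKey) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if p, ok := r.indexes[k]; ok {
		return p, nil
	}
	p, err := r.fetch(k)
	if err != nil {
		return "", fmt.Errorf("index %s@%s unavailable: %w", k.LegalEntityID, k.ViewVersion, err)
	}
	r.indexes[k] = p
	return p, nil
}

func main() {
	r := NewRegistry(func(k indexKey) (string, error) {
		return "/idx/" + k.LegalEntityID + "/" + k.ViewVersion, nil
	})
	r.Add(indexKey{"le_1", "v1"}, "/idx/le_1/v1")

	p, err := r.Lookup(indexKey{"le_1", "v2"}) // never broadcast to this box
	fmt.Println(p, err)
}
```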

Querying

With our indexes built and ready to receive requests, let’s take a look at the query side of Search.

Bleve offers a built-in query language called Query String Query, which is interpreted by the Bleve libraries and exposes a moderately powerful interface into Bleve. However, it’s possible for your application to outgrow Query String Query. Fortunately, Bleve offers a powerful and composable API to express more complex queries.

This enables us to expose a search API customized to searching Digits, and to express constraints as distinct, typed API fields, rather than requiring them to be built into a single query string.

message EntityRequest {
   // legal_entity_id and view_version are required to route search
   // queries to the correct index.
   string legal_entity_id = 1;
   string view_version = 2;
 
   // The object kind for the query
   object.v1beta1.ObjectIdentifier.Kind kind = 3;
 
   // The user-entered query text, unmodified.
   string text = 10;
 
   // Date range support for constraining query results.
   digits.v1.Timestamp occurred_after = 20;
   digits.v1.Timestamp occurred_before = 21;
 
   // page supports query pagination
   digits.v1.Pagination page = 30;
}

We are also able to perform some pre-query analysis of the user-entered text to further customize the results. For example, if we determine that the user is searching for a currency amount, we can constrain that to only search against currency fields in the index.

Although some of our SemanticQuery implementation overlaps with Bleve’s different query types, performing pre-query analysis lets us highly customize how we handle different inputs.

// SemanticQuery is an analyzed query string with semantic tokens extracted.
type SemanticQuery struct {
   Phrase   string
   Terms    []string
   Keywords []string
   Quoted   []string
   Amounts  []float64
}

Notably, we also wire up our own Stop Words that remove accounting-specific terms from queries to help focus on what the user is actually looking for:

// Custom Digits-specific stopwords
StopWords.Add(
   // Spending: consider focusing searches with these terms to expenses?
   "spend", "spent", "money", "pay",
 
   // Time: consider interpreting and limiting queries
   "year", "month", "day", "week", "quarter",
 
   // Aggregations:
   "total",
)
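
The extraction step itself is not shown here. The sketch below is one simplified way such pre-query analysis could work: it pulls out quoted phrases, recognizes currency-looking tokens as amounts, drops stop words, and keeps what remains as terms. The regular expressions, the rule for what counts as an amount, and the helper names are assumptions. Keyword detection is left out because its rules are not described, so Keywords stays empty.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// SemanticQuery matches the struct above.
type SemanticQuery struct {
	Phrase   string
	Terms    []string
	Keywords []string
	Quoted   []string
	Amounts  []float64
}

func (s SemanticQuery) HasTerms() bool    { return len(s.Terms) > 0 }
func (s SemanticQuery) HasKeywords() bool { return len(s.Keywords) > 0 }

var (
	// quotedRE captures text between straight double quotes.
	quotedRE = regexp.MustCompile(`"([^"]+)"`)
	// amountRE is an assumed rule: a token is an amount if it starts with
	// "$", or if it is a bare number with exactly two decimal places.
	amountRE    = regexp.MustCompile(`^\$\d[\d,]*(\.\d{1,2})?$|^\d[\d,]*\.\d{2}$`)
	stripAmount = strings.NewReplacer("$", "", ",", "")
)

// stopWords holds the custom stop words listed above.
var stopWords = map[string]bool{
	"spend": true, "spent": true, "money": true, "pay": true,
	"year": true, "month": true, "day": true, "week": true, "quarter": true,
	"total": true,
}

// ExtractSemanticQuery is a simplified sketch of pre-query analysis.
func ExtractSemanticQuery(text string) SemanticQuery {
	sem := SemanticQuery{Phrase: strings.TrimSpace(text)}

	// Collect quoted phrases, then remove them from the remaining text.
	for _, m := range quotedRE.FindAllStringSubmatch(text, -1) {
		sem.Quoted = append(sem.Quoted, m[1])
	}
	rest := quotedRE.ReplaceAllString(text, " ")

	for _, tok := range strings.Fields(strings.ToLower(rest)) {
		if amountRE.MatchString(tok) {
			if n, err := strconv.ParseFloat(stripAmount.Replace(tok), 64); err == nil {
				sem.Amounts = append(sem.Amounts, n)
				continue
			}
		}
		if stopWords[tok] {
			continue
		}
		sem.Terms = append(sem.Terms, tok)
	}
	return sem
}

func main() {
	sem := ExtractSemanticQuery(`spent $1,250.00 "blue bottle" coffee`)
	fmt.Printf("%+v\n", sem)
}
```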

We then assemble a Bleve Boolean query from our extracted query parts:

// BuildBooleanQuery assembles a BooleanQuery from our common query arguments.
// Additional query requirements can be added to the returned object as needed.
func BuildBooleanQuery(args BooleanQueryArguments) *query.BooleanQuery {
   // Extract SemanticQuery from query string.
   sem := ExtractSemanticQuery(args.GetText())
 
   // Instantiate our core BooleanQuery object, to which
   // we will add parameters/requirements.
   boolean := bleve.NewBooleanQuery()
 
   // Add per-term queries for our semantic terms
   terms := bleve.NewBooleanQuery()
   for _, t := range sem.Terms {
       // Each term must at minimum be a prefix of something in the index
       prefix := bleve.NewPrefixQuery(t)
       terms.AddMust(prefix)
 
       // Exact matches are preferred
       term := bleve.NewTermQuery(t)
       boolean.AddShould(term)
 
       // As are name matches
       namePrefix := bleve.NewPrefixQuery(t)
       namePrefix.SetField("name")
       boolean.AddShould(namePrefix)
   }
 
   keywords := bleve.NewBooleanQuery()
   for _, t := range sem.Keywords {
       // Each keyword must at minimum be a prefix of something in the index
       prefix := bleve.NewPrefixQuery(t)
       keywords.AddMust(prefix)
   }
 
   // Add our terms and keywords multiple ways:
   switch {
   case sem.HasTerms() && sem.HasKeywords():
       // If we have both terms _and_ keywords, run them as a disjunct query
       disjunct := bleve.NewDisjunctionQuery(terms, keywords)
       boolean.AddMust(disjunct)
   case sem.HasTerms():
       // If we only have terms, run just the terms
       boolean.AddMust(terms)
   case sem.HasKeywords():
       // If we only have keywords, run just the keywords.
       boolean.AddMust(keywords)
   }
 
   // Add specific handling for quoted terms
   for _, q := range sem.Quoted {
       quoted := bleve.NewMatchPhraseQuery(q)
       boolean.AddMust(quoted)
   }
 
   // Add specific handling for formatted amounts if detected.
   for _, amt := range sem.Amounts {
       // Ranges should be inclusive so the same value for min and max
       // will find the correct results.
       inclusive := true
 
       // Capture the loop variable because we must take its address.
       amount := amt
 
       // Search for this amount numerically. Range must be inclusive to support
       // searching precise amounts.
       numAmount := bleve.NewNumericRangeInclusiveQuery(&amount, &amount, &inclusive, &inclusive)
       numAmount.SetField("num_amount")
 
       // Search for this amount numerically against absolute values. Range must
       // be inclusive to support searching precise amounts.
       absAmount := bleve.NewNumericRangeInclusiveQuery(&amount, &amount, &inclusive, &inclusive)
       absAmount.SetField("abs_amount")
 
       amounts := bleve.NewDisjunctionQuery(numAmount, absAmount)
       boolean.AddMust(amounts)
   }
 
   return boolean
}

We have additional query-building logic to further constrain queries by date, page, and type.

We then route our built query to our serving shards, await the results, and convert them to our protobuf response type:

// Result represents a customer-facing set of search results.
message Result {
   // The number of results
   int64 total = 1;
 
   // The hits for this page of results
   repeated Hit results = 2;
 
   // The count by ObjectEntity.Kind of the results for this query.
   repeated ObjectKindFacet kind_counts = 3;
 
   // The amount of time to retrieve the results from the index.
   double took_secs = 4;
 
   // The hydrated objects referenced in results above
   object.v1beta1.ObjectEntities entities = 10;
}

And with that, we’re able to then render these results in Digits!

Results

We’re extremely happy with Digits Search. 

What began as a companion feature quickly proved its value and became our first public launch. Although search functionality exists today in nearly every app and website, the power of Digits brings deep, full-history search to finances in a way that hasn’t existed before.

Digits Search in action

Furthermore, we’re especially pleased with Search’s (and Bleve’s) performance. Our typical response time, even for very large customers, is under 50ms.

We use Google Cloud Trace to inspect requests and to ensure we can respond to performance issues. Visualizing the requests made to build the page in the screenshot above highlights how quickly we’re able to assemble the parts of the page into a response:

Google Cloud Trace of a Digits Search request

This makes search feel truly instant, and has led to some great customer feedback.

We’re also very pleased with Bleve’s underlying index implementation. In addition to excellent response latencies, its focus on memory usage means that our search service typically operates within low-tens-of-megabytes of memory under current usage, excluding mmap page caches.

Digits Search memory usage

Compared with other search platforms’ out-of-the-box defaults, this is at least an order of magnitude more memory efficient for our use case of many small, isolated indexes. This efficiency means we don’t need to grow our Kubernetes cluster to make space for processes with a large baseline memory footprint.

We’re also pleased that we were able to use our existing authentication and authorization libraries, and tightly integrate them with our search index routing, so that every customer’s data is isolated and protected at a deep level.

Another decision that worked well in practice was to store the actual objects we need as a stored field in the index, which allows us to render beautiful search results without incurring object hydration latency penalties that we’d pay if we needed to make a cross-system call.

Finally, by building on Bleve’s composable query API, we were able to assemble queries in ways that solve for observed pain points. By architecting Search this way, we are able to easily build end-to-end tests for queries and their expected results, adding cases for bug fixes as we observe them.

Looking Forward

Although we’re happy with Search today, we’re not done. We’ve identified a number of future paths to make Search even better, such as investigating Bluge, a next-generation library built on Bleve’s core, which would give us more control over result ranking, query building, and aggregations.

We’re also exploring ways to further enhance query handling by incorporating machine learning to more deeply understand connections between what users are asking and patterns in their financial data. For example, we think we can make Search more conversational by extracting deeper semantic meaning from queries.

Join our team to push the limits of Search

If you’d like to learn how Digits users are interacting with Search, see something we missed, or have ideas on how to make it even better, we’d love to hear from you! We’re hiring engineers who are excited about taking real-world customer needs and iterating towards delightful, reliable product experiences.

We’re currently hiring software engineers and ML engineers, and we can’t wait to meet you.

Less technical? We have many more open positions to help accelerate our mission to revolutionize business finance.

Building Digits: Efficient Serving of Static Views with Google Cloud Spanner


At Digits, we strive to push the bounds of technology in order to deliver radically more useful, delightful software experiences for our customers. We’re excited to begin sharing a closer look at the technical foundations that underpin our products in a new series of blog posts called Building Digits. Without further ado…

Let’s talk about viewing complex data. One of our primary goals at Digits is to provide business owners with insightful and holistic views of their company’s finances, in substantially real-time. 

Achieving this involves three major, independent steps: 

  1. We collect all of their relevant data from various sources, such as their QuickBooks, the financial institutions they bank with, their corporate credit card providers, etc.
  2. We apply our algorithms and proprietary datasets to extend, interpret, and tease out meaning from all of their data.
  3. We consolidate and aggregate the results into a holistic view that we then visualize for them on their dashboard.

Facts

We refer to the pieces of data that we receive from third-party systems as Facts. This is not a judgement of the credibility or immutability of these systems, but rather a delineation of what is (and what is not) under our control.

For example, if we receive a transaction from an external source that looks like 05/12/20 - Taxi $15.05, we might classify it as Transportation.  Later, we may receive another piece of information that leads us to believe that this transaction was actually a client expense, and is better classified as Meals & Entertainment.  In this example, the transaction itself, the fact, did not change—but our interpretation of it did.

Computed Data

We refer to insights and analysis that are performed by Digits, based on all of the Facts that we have received, as Computed Data.  In the example above, this involved a category classification of a transaction.  In other cases, this might involve determining that two external pieces of information actually represent the same physical transaction, or detecting that a particular transaction tends to recur on a regular interval and that it should be treated as a subscription.  

Benefits of Recomputation

One of the challenges with this model is that we receive new data from external systems constantly, and each new Fact we receive may shed more light on, or change our understanding of, earlier Facts we already recorded.  The arrival of new Facts can impact virtually every aspect of our Computed Data.  As a result, determining the subset of Computed Data that needs to be updated as a result of any new Fact arriving is non-trivially complex. 

We’ve chosen to avoid this complexity by “recomputing the world.” We reconsider our entire set of Computed Data every time the set of Facts that it is based on changes in any way.  This guarantees that Digits uses every piece of knowledge it has access to in order to model your business’s financial health at every moment, so it’s as accurate as it can possibly be.

Consistent Views of the World

The last core tenet of our architecture is our notion of a View.  To us, a view of a customer’s data is the combination of all of the Facts for that customer at time T, as well as all of the Computed Data derived from that fact set.  If the set of customer facts changes at time T+1, we’ll create a new view that represents that updated set of Facts and Computed Data.

When a customer loads our dashboard, we retrieve the latest available version of their view, and their experience is based on that view version for the remainder of the customer session. 

What is the motivation for keeping their experience tied to a single, static view version?  There are several:

  1. Consistency. Assume that the arrival of a new fact causes us to re-label a subset of transactions as Meals & Entertainment instead of Transportation.  It would be confusing to our customers if one or two of the transactions in this subset changed labels while others did not as they browsed the site, and even more confusing if additional transactions became recategorized incrementally as they loaded new pages.
  2. Atomicity. Assume that we receive an external update that replaces one Fact with another, for example a pending transaction being replaced by the actual, confirmed transaction and the date it posted. If a customer were looking at a page of transactions that included the pending transaction, and then clicked Next and loaded a new page that suddenly included the confirmed version, they would be confused about why the transaction seemed to appear twice.

View Serving Architecture

For these reasons, we decided that our architecture would entail a system that is capable of loading a holistic view V1, serving a customer dashboard based on it, and then, at a later point in time, loading a new holistic view V2 and atomically switching to serving the next customer experience based on it.  At yet another later point, after a configurable number of hours has elapsed, we want to unload view V1 as it is no longer needed to serve any customer experiences.
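
The lifecycle described above (activate a new version, keep the old one loaded for a grace period, then unload it) can be sketched as a small in-memory registry. This is a simplified illustration under stated assumptions: ViewRegistry, its methods, and the two-hour grace period in the walkthrough are hypothetical, and the sketch tracks only version names, not the view data itself.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// ViewRegistry tracks, for one customer, which view version new sessions
// should use and which older versions are still loaded.
type ViewRegistry struct {
	mu      sync.Mutex
	ttl     time.Duration
	active  string
	retired map[string]time.Time // version -> when it stopped being active
}

func NewViewRegistry(ttl time.Duration) *ViewRegistry {
	return &ViewRegistry{ttl: ttl, retired: make(map[string]time.Time)}
}

// Activate atomically switches new sessions to version. The previous
// version stays loaded so sessions pinned to it keep working.
func (r *ViewRegistry) Activate(version string, now time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.active != "" && r.active != version {
		r.retired[r.active] = now
	}
	delete(r.retired, version)
	r.active = version
}

// Active returns the version a new session should pin to.
func (r *ViewRegistry) Active() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.active
}

// Expired returns, and forgets, retired versions whose grace period has
// elapsed. The caller would then unload their data.
func (r *ViewRegistry) Expired(now time.Time) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var out []string
	for v, at := range r.retired {
		if now.Sub(at) >= r.ttl {
			out = append(out, v)
			delete(r.retired, v)
		}
	}
	sort.Strings(out)
	return out
}

// walkthrough replays the V1 to V2 switch with fixed timestamps and
// reports what may be unloaded one and two hours after the switch.
func walkthrough() (active string, afterOneHour, afterTwoHours []string) {
	start := time.Date(2020, 5, 12, 9, 0, 0, 0, time.UTC)
	r := NewViewRegistry(2 * time.Hour)
	r.Activate("v1", start)

	switched := start.Add(time.Hour)
	r.Activate("v2", switched)

	active = r.Active()
	afterOneHour = r.Expired(switched.Add(time.Hour))
	afterTwoHours = r.Expired(switched.Add(2 * time.Hour))
	return active, afterOneHour, afterTwoHours
}

func main() {
	active, early, late := walkthrough()
	fmt.Println("new sessions use:", active)
	fmt.Println("unload 1h after the switch:", early)
	fmt.Println("unload 2h after the switch:", late)
}
```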

Frequency of Recomputation

For a given customer, we recompute their view whenever we detect that any of their Facts or Computed Data have changed.  In practice, this varies from customer to customer but can roughly be assumed to be between 1 and 24 times per day.

Why Google Cloud Spanner?

After a detailed analysis of the major cloud platforms in 2018, we made the decision to begin building Digits on Google Cloud, and we have been quite pleased with that decision.

With the rest of our infrastructure within the Google ecosystem, it was natural to evaluate Spanner as a potential database technology, and there are quite a few aspects of Spanner that are beneficial to our use case:

  1. Spanner is fully-managed and requires effectively zero overhead for database operations.
  2. Its ability to automatically shard data via table interleaving was an appealing feature for us, as it allows us to prepare for high scale and still get the benefits of relational database features: efficient joins, foreign keys, etc.
  3. Its ability to perform ACID transactions, as needed, was appealing to us from a financial data perspective.

Read-time SQL

There is a huge trade-off between pre-computing everything, which reduces read latency (i.e. dashboard load time) but limits development speed, and biasing towards read-time computation, which permits rapid feature iteration on the frontend.

At Digits’ current stage, we want to be in the middle of this spectrum — have low read latencies for a great user experience, but still be able to quickly iterate and perfect new features based on customer feedback.  In order to support this model, whichever solution we chose for serving our views would have to support read-time SQL.

Sharding and Interleaving

Once it became clear that read-time SQL support was a requirement, we also wanted to be sure that the database solution we selected would easily scale with us as we grew. Traditional relational databases have trouble scaling join performance once the data set can no longer fit on a single node, and many NoSQL key-value stores address the scalability concern by sacrificing join support entirely.

Spanner’s interleaving/sharding design is a nice balance between these two ends of the spectrum.  While all data for a given set of parent-child tables does not need to fit on a single node, rows that share the same root key are guaranteed to be co-located on a node.  This allows for fast joins within a parent-child hierarchy and aside from defining the interleaving model in the schema, it happens without the developer’s involvement.  

Alternative Solutions

Combined, these constraints of easy scalability, low-latency reads, and support for relational SQL eliminate quite a few otherwise appealing solutions.  For example:

  1. Cassandra and Redis are both great for serving precomputed views, but do not support read-time aggregations via SQL.  (Cassandra does offer a SQL-like query language, CQL, but not read-time aggregation via SQL.)
  2. MySQL and Postgres are great for relational querying and read-time aggregation, but are challenging to scale, as sharding data across clusters is left to the engineer/operator.
  3. Google BigQuery is great for all kinds of SQL analytics querying but is not designed to serve low-latency, customer-facing dashboard reads.

Based on all of these factors, we elected to implement our view architecture on Spanner.

Implementing Views in Spanner

As we developed our view implementation, one of the challenges that we had to overcome was Spanner’s 20,000 cell mutation limit.  The limit caps the number of cells (rows * columns-per-row) that can be inserted/updated in a single transaction.  

This limit presented challenges on both the view loading side and the view unloading (deleting) side.
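To make the cell arithmetic concrete, here is a minimal Python sketch that estimates the cells touched by a batch of rows and splits a larger set of rows into batches that stay under a budget. The helper names are hypothetical, and the count is a simplification: it counts only the columns written directly and ignores any additional accounting Spanner may apply, for example for secondary indexes.

```python
MUTATION_CELL_LIMIT = 20_000  # the per-transaction limit quoted above


def cells_in_batch(rows):
    """Approximate cell count: one cell per column written in each row."""
    return sum(len(row) for row in rows)


def split_into_batches(rows, max_cells=MUTATION_CELL_LIMIT):
    """Greedily group rows so that no batch exceeds max_cells."""
    batches, current, used = [], [], 0
    for row in rows:
        width = len(row)
        if width > max_cells:
            raise ValueError("a single row exceeds the cell budget")
        # Start a new batch when adding this row would cross the budget.
        if current and used + width > max_cells:
            batches.append(current)
            current, used = [], 0
        current.append(row)
        used += width
    if current:
        batches.append(current)
    return batches
```

Under this simplified count, a batch of 100 five-column rows comes to 500 cells, far below the limit.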

View Loading

On the view loading side, it meant that as we computed views for a given customer, we could not guarantee that we could load the entire view into the database atomically.  Additionally, it is non-trivial for the implementer to know whether a particular transaction would hit this limit, as they would need to keep track of all of the cells impacted by the generated mutations or the DML statement.

To address this, we created a view version table that is independent of the tables that actually store the view data.  This table is a simple mapping from a customer id to a version identifier.  This version column is also set on all rows of the actual view data tables.

For example, a small subset of our view schema may look like this:

The queries that power our dashboard can either serve a view for a particular, known version or consult the view version table to see which version is the latest (our version_ids increase monotonically).

Our view loading process, for a given customer, then looks like this:

  1. Load all parts of the view in independent transactions of roughly 100 rows each (a conservative size that stays well clear of the mutation limit).
  2. Once all parts of the view have been successfully loaded, update the view version table to denote the newest active version.

This process ensures that a new view will be served atomically in its entirety, because no query will be aware of its existence until the view version table has been updated.  At the same time, existing customer sessions can continue to experience our dashboard against the view version which was active at the start of their session.
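The two-step load can be sketched with a small in-memory stand-in for the database. This is an illustration only: the ViewStore class and its method names are hypothetical, each call to _commit_batch stands in for one independent transaction, and a real implementation would issue these writes against Spanner.

```python
class ViewStore:
    """In-memory stand-in for the view tables and the view version table."""

    def __init__(self):
        self.active_versions = {}  # customer_id -> active version
        self.rows = []             # (customer_id, version, payload)
        self.commit_count = 0

    def _commit_batch(self, customer_id, version, batch):
        # Stands in for one independent, small transaction.
        self.rows.extend((customer_id, version, row) for row in batch)
        self.commit_count += 1

    def load_view(self, customer_id, version, rows, batch_size=100):
        # Step 1: load the view in small batches. Nothing reads it yet.
        for start in range(0, len(rows), batch_size):
            self._commit_batch(customer_id, version, rows[start:start + batch_size])
        # Step 2: flip the version pointer only after every batch succeeded.
        self.active_versions[customer_id] = version

    def read(self, customer_id, version=None):
        # Serve a known version, or fall back to the active one.
        if version is None:
            version = self.active_versions.get(customer_id)
        return [p for c, v, p in self.rows if c == customer_id and v == version]
```

Because readers resolve the version through active_versions, a partially loaded view is never visible, while a session pinned to an older version keeps reading that version's rows.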

View Unloading

The same 20,000 cell mutation limit applies to data deletion.  Spanner does support a Partitioned DML alternative that was appealing for this use case; however, we found two limitations with it:

  1. Every time we load a new view, it effectively invalidates a similarly sized older view, and this amounts to tens of thousands of rows in need of deletion.  This has a significant impact on CPU load.  Spanner tombstones rows that are marked as deleted, which makes them invisible to all queries, and then reclaims the disk space in an asynchronous process.  However, in our experience both the tombstoning and the reclamation process place a non-trivial load on the CPU and can thus impact read latency of customer-facing queries.
  2. Although Partitioned DML is documented as not being constrained by the 20,000 cell mutation limit, it still fails intermittently with the 20,000 cell mutation limit error.

Efficient Incremental Views

To address the deletion constraint, we analyzed the insertions and deletions that we were performing as part of view loading/unloading and confirmed what we expected: the vast majority of rows stay the same from one view version to the next. All we had to do was invest in being able to easily identify the rows that actually changed, and then insert and remove only the deltas.

(It is important to note that while determining which pieces of our Computed Data for a given customer need to be updated because we received new Facts is non-trivially complex, comparing two fully computed views to each other and determining which rows have been updated, removed, or created is quite straightforward.)

The output of this comparison can then be used as follows:

Each row in the current active version is determined to:

  1. Still be relevant
  2. Be removed for all view versions going forward

Each row in the newly computed version is determined to:

  1. Already be present in the active view version
  2. Be added for all view versions going forward

For most normal operations, 99% of rows in both the existing version and the new version are determined to be identical, and thus no-ops. 
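The comparison itself can be sketched with plain set operations. This minimal Python example assumes each row is represented as a hashable tuple of its full contents, so an updated row shows up as one removal plus one addition; the function name and result keys are hypothetical.

```python
def diff_views(active_rows, new_rows):
    """Classify rows when comparing the active view to a newly computed one."""
    active = set(active_rows)
    new = set(new_rows)
    return {
        # Present in both versions: no writes needed.
        "unchanged": active & new,
        # Only in the active version: removed for versions going forward.
        "to_invalidate": active - new,
        # Only in the new version: added for versions going forward.
        "to_insert": new - active,
    }
```

Only the to_invalidate and to_insert sets translate into writes; everything in unchanged is a no-op.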

To support this, we modified our view tables to have two version-related columns, version_valid_since and version_invalid_since, and updated all queries with two WHERE conditions.  For example, continuing with the example schema above, our modified schema would look like this:

Finally, we implemented version diffing in a generic way, such that it can be applied to all of our view tables without additional work. 

With this in place, we now have to insert and delete 99% less data from Spanner than we did when we were fully loading/unloading every view.
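As an illustration of how the two version columns can drive reads, the sketch below uses Python's built-in sqlite3 module as a local stand-in for Spanner. The exact WHERE conditions are an assumption: a row is treated as visible at version v when version_valid_since <= v and version_invalid_since is either NULL or greater than v. The helper names are hypothetical.

```python
import sqlite3

# Assumed visibility predicate: valid at or before the requested version,
# and not yet invalidated as of that version.
VISIBLE_AT = """
    SELECT payment_id, amount FROM payments
    WHERE customer_id = :customer_id
      AND version_valid_since <= :version
      AND (version_invalid_since IS NULL OR version_invalid_since > :version)
    ORDER BY payment_id
"""


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute("""CREATE TABLE payments (
        customer_id TEXT NOT NULL,
        version_valid_since INTEGER NOT NULL,
        version_invalid_since INTEGER,
        payment_id INTEGER NOT NULL,
        amount INTEGER NOT NULL,
        PRIMARY KEY (customer_id, version_valid_since, payment_id))""")
    return db


def apply_delta(db, customer_id, version, to_insert, to_invalidate):
    # Invalidate first, so a re-inserted payment_id is not closed immediately.
    db.executemany(
        "UPDATE payments SET version_invalid_since = ? "
        "WHERE customer_id = ? AND payment_id = ? "
        "AND version_invalid_since IS NULL",
        [(version, customer_id, pid) for pid in to_invalidate])
    db.executemany(
        "INSERT INTO payments VALUES (?, ?, NULL, ?, ?)",
        [(customer_id, version, pid, amount) for pid, amount in to_insert])
    db.commit()


def visible(db, customer_id, version):
    params = {"customer_id": customer_id, "version": version}
    return db.execute(VISIBLE_AT, params).fetchall()
```

With this predicate, unchanged rows are written once and remain visible across every later version until they are invalidated, and older versions stay readable after a new delta is applied.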

Schema Design for Scale

One unintuitive aspect of Spanner’s secondary indexes (indexes you explicitly add to tables, as opposed to the index you implicitly get for the primary key) is that if a query which hits the index selects a column that is neither part of the index nor explicitly stored on the index, Spanner must perform an implicit join from the secondary index back to the base table.  This makes sense once you accept the fact that the index is stored as an independent structure from the table that it is indexing.

Unfortunately, this join may be non-trivial in cost, particularly when a lot of data is being selected.

To avoid secondary-index-to-base-table join costs, we have explored two options:

  1. Carefully designing our tables’ primary keys in such a way that most common queries only require the implicit primary key index.
  2. Creating secondary indexes that are less generic and more tailored to specific query patterns by including all of the columns selected by that query in their STORING clauses.  

The second option has a higher maintenance cost, as it potentially requires updating indexes when new columns are added to tables (if these new columns are selected by the queries that the indexes are tailored for) or when query patterns are added for product reasons.  As a result, we prefer the first approach whenever possible.

For example, if the majority of queries against a table will involve restricting the result set by time, then we consider making the column that represents time part of the primary key, even if it is not logically required there.
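The intuition can be sketched in Python by modelling a table as a list of keys kept sorted in primary key order. With a hypothetical key of (customer_id, occurred_on, payment_id), a time-restricted query for one customer becomes a single contiguous range that can be located by binary search, with no secondary index involved. The column and function names here are illustrative assumptions.

```python
from bisect import bisect_left


def time_range_scan(sorted_keys, customer_id, start, end):
    """Return keys for customer_id with start <= occurred_on < end.

    sorted_keys must be sorted tuples of (customer_id, occurred_on, payment_id),
    with occurred_on as an ISO date string so that it sorts chronologically.
    """
    # A shorter tuple sorts before any longer tuple that shares its prefix,
    # so these probes land on the first key at or after each boundary.
    lo = bisect_left(sorted_keys, (customer_id, start))
    hi = bisect_left(sorted_keys, (customer_id, end))
    return sorted_keys[lo:hi]
```

If the time column were not part of the key, rows in the requested window would be scattered throughout the customer's key range rather than adjacent to one another.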

Sharding

Spanner’s interleaving support allows for schema design that makes your database straightforward to scale while still supporting efficient joins on data that is commonly joined together.  These two properties are often very difficult to achieve in tandem with relational databases.  

Interleaving lets you signal to Spanner that all hierarchical data spanning multiple tables, rooted at a particular root row, should be colocated.  Building on our schema examples above, it might be appealing to interleave all of our view data under the customers table.  This would mean that all view data for a given customer would be colocated, a property that makes sense since we often want to show a customer various parts of their data, while we never want to join data from multiple customers together.

The schema for the customers table as well as the view tables above may then look like this:

CREATE TABLE customers (
    customer_id STRING(MAX) NOT NULL,
    name STRING(MAX)
) PRIMARY KEY (customer_id);

CREATE TABLE active_versions (
    customer_id STRING(MAX) NOT NULL,
    version INT64 NOT NULL
) PRIMARY KEY (customer_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

CREATE TABLE payments (
    customer_id STRING(MAX) NOT NULL,
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    payment_id INT64 NOT NULL,
    amount INT64 NOT NULL,
) PRIMARY KEY (customer_id, version_valid_since, payment_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

CREATE TABLE sales (
    customer_id STRING(MAX) NOT NULL,
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    sale_id INT64 NOT NULL,
    amount INT64 NOT NULL,
) PRIMARY KEY (customer_id, version_valid_since, sale_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

This schema might work well, but there is another factor to keep in mind: the size of all data that is interleaved under a single root has a hard limit in Spanner of 4 GB.  To avoid approaching this limit, we might further restrict the data that is co-located. 

For example, if we know that there are pieces of data that will never be joined with each other, then there is no reason for them to be co-located on the same node. Building on our scenario above, imagine that payments and sales are shown in totally separate parts of our dashboard and would never need to be joined together. If that were the case, then interleaving and thus colocating the view data for both of these tables, for a given customer, under the same customer_id row would make that row unnecessarily large.

To address this, we could add a table in between customers and payments/sales that would facilitate better sharding. The new table might look like:

CREATE TABLE customer_view_types (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX)
) PRIMARY KEY (customer_id, view_type), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

And the payments and sales tables would be updated to look like:

CREATE TABLE payments (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX),
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    payment_id INT64 NOT NULL,
    amount INT64 NOT NULL,
) PRIMARY KEY (customer_id, view_type, version_valid_since, payment_id), INTERLEAVE IN PARENT customer_view_types ON DELETE CASCADE;

CREATE TABLE sales (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX),
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    sale_id INT64 NOT NULL,
    amount INT64 NOT NULL,
) PRIMARY KEY (customer_id, view_type, version_valid_since, sale_id), INTERLEAVE IN PARENT customer_view_types ON DELETE CASCADE;

In Production

While there were a few limitations to work around, and special care must be taken in both schema design and query design to maintain high performance, Spanner has performed well in production as our customer base has scaled to billions of dollars in transaction value.

Incremental static views provide an optimal balance between dashboard consistency, read-time latency, continual re-computation based on new data, and developer productivity, and Spanner’s ease of scalability via auto-sharding and interleaving has made this architecture very low-overhead to operate.

Join the Team

Static view serving at scale is just one of countless technical challenges we’ve faced while building Digits, and we’re pushing the boundaries at every layer of the stack.

If you’re interested in crafting the next generation of financial software, we’re hiring, and we’d love to meet you! See our open positions here.