
Architecting Digits Search: Real-time Transaction Indexing With Bleve


We recently launched Digits Search, which brings fast, beautiful, full-depth search to business financial data. We’ve received a ton of great customer feedback, but one question just keeps coming up…

How did you build this!?

Well, let’s break down how Digits Search puts your finances at your fingertips.

Digits Architecture

First, some quick background on Digits:

Digits links with your business's accounting software and your financial institutions to build, and continually maintain, a living model of your business with the most up-to-date data. Once linked, we ingest all of that financial data in raw form, a collection that we call “facts”.

We then use machine learning and data processing to normalize all of that overlapping, unstructured data. We perform a significant number of calculations to fill holes in the picture, such as predicting how your latest transactions will be categorized, detecting and predicting recurring activity, etc.

The end result of all this work is a “view”, which is then efficiently loaded into Google Cloud Spanner as well as encrypted and archived in Google Cloud Storage for secondary processing.

Each view we produce is a complete, standalone picture of your company’s entire financial history.

Views are served by our serving layer, which is composed of a number of services communicating over TLS-encrypted GRPC APIs, written in Go and hosted in GKE. Our serving layer aims to optimize for efficiency, security, and reliability.

Architecting Search

There are a great many open-source search tools available. Examples range from library implementations like Lucene, to server implementations like Solr, to clustered solutions like Elasticsearch, to SaaS solutions like Algolia.

To pick a direction, we thought about how each approach would integrate with our existing Digits architecture.

The most important architectural factor for our design is our complete, standalone view production every time we get new data. This architectural bedrock presents us with some significant design opportunities:

  • Each View is complete and self-contained, which allows for indexing to be done entirely offline and for each view index to be immutable.
  • Each View is a natural sharding element, which is independent from other views, even for the same business.
  • The immutable nature of views and their indexes allows us to think of indexes as deliverable objects, rather than as online mutable state.

With this framing in mind, we began to look for a solution that would allow us to produce offline indexes and reliably serve them from within our current serving layer.

Ideally, this solution would integrate seamlessly with our existing services: Go, GRPC with mTLS, stateless authorization for bulletproof multi-tenancy, minimal GKE footprint, and minimal operator overhead. In particular, mutual TLS and token authentication are part of our system design at every layer of the stack, and act as defense-in-depth measures against pivots within our infrastructure in case of a breach. Any solution we pick for search must support this level of data protection.

It should also be as fast as the dickens.

We took stock of the various search strategies (clusters, servers, libraries, SaaS), and researched options within each space. We found that many solutions are organized around managing updates to online, mutable indexes. However, our view-based architecture allows us to skip over many of the state-management concerns (and the operational overhead) that mutable indexes must handle.

We decided that the library strategy appeared most promising, as it would allow us to take strong advantage of the near-statelessness of offline index shipping. The library approach would also allow us to use our existing common service code and configurations to keep operational overhead low.

Apache Lucene regularly tops the list of open source search libraries, but an ideal solution would be able to leverage all of our experience-hardened service code in our Go monorepo. We’ve strongly prioritized security in our system designs, and being able to leverage our existing, penetration-tested code for authorization, encryption, and network services would help us continue to keep our systems secure. With that in mind, we surveyed the current search landscape in Go, and immediately found a promising option!

Enter: Bleve, a native-Go search library that underpins major projects such as Couchbase and Caddy.

We tried some quick experiments to see if Bleve was suited to our needs, and we were very pleased to learn that it exceeded our expectations! We then built a more complete prototype and quickly demonstrated that it was a great fit.

With our starting strategy in mind, we then began to flesh out the system’s overall design. 

The views themselves would be produced by our streaming and batch pipelines. To ensure the work of indexing our views wouldn’t impact query latency, we wanted to index the data offline, separately from the serving processes. This meant we’d need a way to reliably deliver each completed index to the serving boxes. Finally, we’d also need a service for query processing and to coordinate future sharding. To coordinate the encrypted data moving between systems, Google Cloud PubSub and GCS were our first choice, as we had experience with them elsewhere.

Here’s a quick look at the design we’ve sketched out so far:

Digits Search architecture diagram

Although this might seem like a lot at first blush, each of these services serves a specific purpose, and our Go monorepo automation makes managing them easy and lightweight.

Each service is deployed to GKE via our continuous deployment pipeline and uses common, shared service and GRPC libraries we’ve built to ensure all of our services operate in a consistent, secure manner. Standing on the shoulders of giants, the completed search system amounts to less than 5,000 lines of code, including custom query pre-processing, document handling, and integration tests.

Building Search

Search must address a number of sub-problems: document collection, mapping, and indexing; index storage and cleanup; query pre-processing and routing; and result delivery. To see how this all comes together, we’ll walk through the Indexing and Serving parts of Search.

One thing we learned about Bleve is that while the basic documentation is excellent, the project has added a number of interesting features and optimizations that are not mentioned in the published documentation, but are instead discussed on the mailing list or in GitHub issues. As we go along, we’ll include code snippets to highlight those additional features as we’ve used them.

Secure Indexing

To build our indexes, we first need to assemble the raw data from the views we produce, a process we call “bundling”. Bundling brings together multiple distinct document types, for example, vendors, categories, and transactions. These document collections are encoded as Protobuf, archived, and encrypted, waiting for our search indexer to retrieve them from Google Cloud Storage.

type Bundle struct {
   LegalEntityID string
   ViewVersion   float64
 
   Vendors      []*search.ArchiveVendor
   Transactions []*search.ArchiveTransaction
   Categories   []*search.ArchiveCategory
}

To populate the Bundle, we download our archives from GCS and decrypt them. Once we’ve got everything in one place, we’re almost ready to index.

But, before we can proceed, we need to tell Bleve how to handle each of our document types. Bleve calls this process mapping, and we’ll need to configure our overall index, document types, and their fields. At the top, we create our IndexMapping(), and add each of our Document types to it:

func (b Bundle) mapping() (mapping.IndexMapping, error) {
   // Base Index Mapping for this Bundle
   m := bleve.NewIndexMapping()
 
   // Each new type this bundle supports must
   // be added here, which will in turn expose
    // its mapping to the indexer.
   types := []document.Mappable{
       &document.Transaction{},
       &document.Vendor{},
       &document.Category{},
   }
 
   // Iterate over the provided mappable types
   // and map them.
   for _, t := range types {
       m.AddDocumentMapping(t.Type(), t.DocumentMapping())
   }
 
   return m, nil
}

Each of our supported document types implements our Mappable interface, which returns their DocumentMappings, and in turn, their relevant FieldMappings:

func (t *Category) DocumentMapping() *mapping.DocumentMapping {
   doc := t.Extractable.DocumentMapping()
 
   // Disable fields that should not be searched/stored in the index
   doc.AddSubDocumentMapping("category_id", disabledField)
   doc.AddSubDocumentMapping("display_key", disabledField)
   doc.AddSubDocumentMapping("type", disabledField)
   doc.AddSubDocumentMapping("parent_category_id", disabledField)
 
   doc.AddFieldMappingsAt("name", simpleMapping, minimalMapping, whitespaceMapping)
 
   // Add date as a field mapping to this document
   doc.AddFieldMappingsAt("date", t.Date.FieldMapping())
 
   // Add keywords as a field mapping to this document
   doc.AddFieldMappingsAt("keywords", t.Keywords.FieldMappings()...)
 
   return doc
}

There are a few interesting things happening in this example.

First, we create an Extractable as our base DocumentMapping. Each of our documents embeds Extractable, which allows us to store a protojson encoded copy of our document in a stored field in Bleve. We take this step because it allows us to build our response entirely from search results without calling other systems, which is important for keeping response latencies low. Our Extractable documents also set up type faceting, which lets us count results by type.

// Extractable represents non-indexed fields for being able to extract
// documents from a Bleve index
type Extractable struct {
   Source   string `json:"_source"` // must be string not bytes, https://groups.google.com/forum/#!topic/bleve/BAzJolSqhwU
   DocType  string `json:"_type"`
   Encoding string `json:"_encoding"`
}
 
// DocumentMapping returns a DocumentMapping for Extractable types.
func (Extractable) DocumentMapping() *mapping.DocumentMapping {
   doc := bleve.NewDocumentMapping()
 
   // storeOnly is a text field mapping that results in a copy
   // of the field being stored along with the rest of the document
   // in the index, but not actually included in a searchable way.
   storeOnly := bleve.NewTextFieldMapping()
   storeOnly.Index = false
   storeOnly.IncludeInAll = false
   storeOnly.IncludeTermVectors = false
   storeOnly.DocValues = false
 
   // _source and _encoding are both used to reconstitute the original
   // object from proto-as-json encoded data in search results.
   doc.AddFieldMappingsAt("_source", storeOnly)
   doc.AddFieldMappingsAt("_encoding", storeOnly)
 
   // faceting is a text field mapping that is as close to storeOnly
   // as possible, but with features required for faceting enabled.
   faceting := bleve.NewTextFieldMapping()
   faceting.Index = true
   faceting.IncludeInAll = false
   faceting.IncludeTermVectors = false
   faceting.DocValues = true
 
   // _type is the document type field. We facet on this to be able
   // to answer the question "how many Category results came back for
   // this query?"
   doc.AddFieldMappingsAt("_type", faceting)
 
   return doc
}

Returning to our Category DocumentMapping example, we also disable a number of fields from being searchable (in effect, leaving them as store-only fields in our Extractable). For the “name” field, we apply multiple mappings with different analyzers, which ensures the field is indexed in ways that are intuitively searchable for our customers. Finally, we set up some additional field mappings from common Date and Keyword types. Every document type we currently support is sortable by date, which allows us to promote more recent documents in the results.
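
For illustration, here is a rough sketch of what those “name” field mappings could look like in Bleve. The analyzer choices and the custom analyzer name below are assumptions for the example, not our exact production configuration:

import (
    "github.com/blevesearch/bleve/v2"
    "github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
    "github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
    "github.com/blevesearch/bleve/v2/analysis/analyzer/simple"
    "github.com/blevesearch/bleve/v2/analysis/token/lowercase"
    "github.com/blevesearch/bleve/v2/analysis/tokenizer/whitespace"
    "github.com/blevesearch/bleve/v2/mapping"
)

// nameFieldMappings sketches how a single field can be indexed several ways,
// each with a different analyzer, so queries can match it intuitively.
func nameFieldMappings(m *mapping.IndexMappingImpl) ([]*mapping.FieldMapping, error) {
    // Lowercased letter tokens, e.g. "Lyft*SF" -> "lyft", "sf".
    simpleMapping := bleve.NewTextFieldMapping()
    simpleMapping.Analyzer = simple.Name

    // The entire value as a single token, useful for exact matches.
    minimalMapping := bleve.NewTextFieldMapping()
    minimalMapping.Analyzer = keyword.Name

    // Split on whitespace only, preserving punctuation inside tokens.
    // Registered as a custom analyzer; "whitespace_lower" is an illustrative name.
    err := m.AddCustomAnalyzer("whitespace_lower", map[string]interface{}{
        "type":          custom.Name,
        "tokenizer":     whitespace.Name,
        "token_filters": []string{lowercase.Name},
    })
    if err != nil {
        return nil, err
    }
    whitespaceMapping := bleve.NewTextFieldMapping()
    whitespaceMapping.Analyzer = "whitespace_lower"

    return []*mapping.FieldMapping{simpleMapping, minimalMapping, whitespaceMapping}, nil
}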

With our bundle populated and our mappings in place, we are ready to index!

First, we’ll create a new index using Bleve’s offline IndexBuilder. IndexBuilder optimizes the performance of creating an index, as well as the resulting files—a good match for our immutable offline indexes.

// NewIndex returns an empty index ready for indexing of this bundle
func (b Bundle) NewIndex(storePath string, batchSize int) (bleve.Builder, error) {
   mapping, err := b.mapping()
   if err != nil {
       return nil, err
   }
 
   path := b.VersionPath(storePath)
 
   // Create an index build directory within the storePath, which avoids
   // issues within Docker containers when Bleve compacts the final index
   // segments.
   if err := os.MkdirAll(b.TempPath(storePath), 0775); err != nil {
       return nil, err
   }
 
   config := map[string]interface{}{
       "buildPathPrefix": b.TempPath(storePath),
   }
 
   // Pass through provided batch size if set,
   // otherwise allow Bleve to fall back to the
   // internal default batch size.
   if batchSize > 0 {
       config["batchSize"] = batchSize
   }
 
   // Builders only support offline indexing.
   return bleve.NewBuilder(path, mapping, config)
}

Bleve’s IndexBuilder also accepts a map of string-based configuration options, which we can use to tune the batch size of indexing to make sure our indexes build quickly.

Finally, actually indexing all of our documents is a range over a channel of indexable documents.

func (idx *Indexer) builder(index bleve.Builder, documents document.Stream, legalEntityID string) error {
   start := time.Now()
   count := 0
   for doc := range documents {
       // Skip documents with empty IDs
       if doc.ID() == "" {
           continue
       }
 
       // Add document to index
       if err := index.Index(doc.ID(), doc); err != nil {
           return err
       }
 
       count++
   }
 
   // After indexing is complete, close the index to ensure that the
   // file handle is released and bleve stops attempting to modify
   // segments prior to us compressing/deleting the file.
   if err := index.Close(); err != nil {
       return err
   }
 
   logger.Infof("indexed %d in %s for %s", count, time.Since(start), legalEntityID)
   return nil
}

Once we’ve generated our indexes, we ship them off to the serving boxes by encrypting them and placing them in GCS. We ensure that each serving box receives a copy of the newly-produced index by placing an event into PubSub with subscriptions configured in a “broadcast” pattern. 

We’ve also added some safety mechanisms to handle edge cases, such as a request arriving for an index that isn’t yet present on the serving box, and we use Kubernetes StatefulSets to keep the current index set around between deployments.
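
As a rough sketch of the delivery side (the subscription, bucket, attribute, and helper names here are illustrative, not our actual configuration), each serving box runs a subscriber along these lines, pulling the newly published archive out of GCS before acknowledging the event:

import (
    "context"
    "io"
    "log"
    "os"
    "path/filepath"

    "cloud.google.com/go/pubsub"
    "cloud.google.com/go/storage"
)

// receiveIndexes listens on this serving box's broadcast subscription and
// downloads each newly published index archive from GCS.
func receiveIndexes(ctx context.Context, projectID, subID, bucket, indexDir string) error {
    ps, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return err
    }
    gcs, err := storage.NewClient(ctx)
    if err != nil {
        return err
    }

    return ps.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        // The event names the GCS object holding the new index archive.
        object := msg.Attributes["index_object"]

        r, err := gcs.Bucket(bucket).Object(object).NewReader(ctx)
        if err != nil {
            msg.Nack() // retry later; the archive remains available in GCS
            return
        }
        defer r.Close()

        archive, err := io.ReadAll(r)
        if err != nil {
            msg.Nack()
            return
        }

        // Write the (still encrypted) archive locally; decryption and unpacking
        // into the serving index directory happen after this point.
        local := filepath.Join(indexDir, filepath.Base(object))
        if err := os.WriteFile(local, archive, 0o600); err != nil {
            msg.Nack()
            return
        }

        log.Printf("received index archive %s (%d bytes)", object, len(archive))
        msg.Ack()
    })
}

Because each view index is immutable, a failed download can simply be retried from GCS; nothing on the serving box ever needs to be patched in place.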

Querying

With our indexes built and ready to receive requests, let’s take a look at the query side of Search.

Bleve offers a built-in query language called Query String Query, which is interpreted by the Bleve libraries and exposes a moderately powerful interface into Bleve. However, it’s possible for your application to outgrow Query String Query. Fortunately, Bleve offers a powerful and composable API to express more complex queries.

This enables us to expose a search API customized to searching Digits, and to express constraints as distinct, typed API fields, rather than requiring them to be built into a single query string.

message EntityRequest {
   // legal_entity_id and view_version are required to route search
   // queries to the correct index.
   string legal_entity_id = 1;
   string view_version = 2;
 
   // The object kind for the query
   object.v1beta1.ObjectIdentifier.Kind kind = 3;
 
   // The user-entered query text, unmodified.
   string text = 10;
 
   // Date range support for constraining query results.
   digits.v1.Timestamp occurred_after = 20;
   digits.v1.Timestamp occurred_before = 21;
 
   // page supports query pagination
   digits.v1.Pagination page = 30;
}

We are also able to perform some pre-query analysis of the user-entered text to further customize the results. For example, if we determine that the user is searching for a currency amount, we can constrain that to only search against currency fields in the index.

Although some of our SemanticQuery implementation overlaps with Bleve’s built-in query types, performing pre-query analysis gives us fine-grained control over how we handle different inputs.

// SemanticQuery is an analyzed query string with semantic tokens extracted.
type SemanticQuery struct {
   Phrase   string
   Terms    []string
   Keywords []string
   Quoted   []string
   Amounts  []float64
}
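
To give a flavor of what that pre-query analysis looks like, here is a simplified sketch (not our full implementation) that pulls out quoted phrases and currency-looking amounts before splitting the remainder into terms:

import (
    "regexp"
    "strconv"
    "strings"
)

var (
    quotedRE = regexp.MustCompile(`"([^"]+)"`)
    amountRE = regexp.MustCompile(`\$\d[\d,]*(?:\.\d{1,2})?|\b\d+\.\d{2}\b`)
)

// ExtractSemanticQuery is a simplified sketch of our pre-query analysis.
func ExtractSemanticQuery(text string) SemanticQuery {
    sq := SemanticQuery{Phrase: text}

    // Pull out quoted phrases first so their words aren't treated as loose terms.
    for _, m := range quotedRE.FindAllStringSubmatch(text, -1) {
        sq.Quoted = append(sq.Quoted, m[1])
    }
    remaining := quotedRE.ReplaceAllString(text, " ")

    // Detect currency-looking tokens and parse them as amounts.
    for _, m := range amountRE.FindAllString(remaining, -1) {
        cleaned := strings.NewReplacer("$", "", ",", "").Replace(m)
        if amt, err := strconv.ParseFloat(cleaned, 64); err == nil {
            sq.Amounts = append(sq.Amounts, amt)
        }
    }
    remaining = amountRE.ReplaceAllString(remaining, " ")

    // Everything else becomes lowercased terms; the real version also applies
    // stop words and keyword detection at this point.
    sq.Terms = strings.Fields(strings.ToLower(remaining))

    return sq
}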

Notably, we also wire up our own Stop Words that remove accounting-specific terms from queries to help focus on what the user is actually looking for:

// Custom Digits-specific stopwords
StopWords.Add(
   // Spending: consider focusing searches with these terms to expenses?
   "spend", "spent", "money", "pay",
 
   // Time: consider interpreting and limiting queries
   "year", "month", "day", "week", "quarter",
 
   // Aggregations:
   "total",
)

We then assemble a Bleve Boolean query from our extracted query parts:

// BuildBooleanQuery assembles a BooleanQuery from our common query arguments.
// Additional query requirements can be added to the returned object as needed.
func BuildBooleanQuery(args BooleanQueryArguments) *query.BooleanQuery {
   // Extract SemanticQuery from query string.
   sem := ExtractSemanticQuery(args.GetText())
 
   // Instantiate our core BooleanQuery object, to which
   // we will add parameters/requirements.
   boolean := bleve.NewBooleanQuery()
 
   // Add per-term queries for our semantic terms
   terms := bleve.NewBooleanQuery()
   for _, t := range sem.Terms {
       // Our terms must at minimum be a subset of something in the index
       prefix := bleve.NewPrefixQuery(t)
       terms.AddMust(prefix)
 
       // Exact matches are preferred
       term := bleve.NewTermQuery(t)
       boolean.AddShould(term)
 
       // As are name matches
       namePrefix := bleve.NewPrefixQuery(t)
       namePrefix.SetField("name")
       boolean.AddShould(namePrefix)
   }
 
   keywords := bleve.NewBooleanQuery()
   for _, t := range sem.Keywords {
       // Our terms must at minimum be a subset of something in the index
       prefix := bleve.NewPrefixQuery(t)
       keywords.AddMust(prefix)
   }
 
   // Add our terms and keywords multiple ways:
   switch {
   case sem.HasTerms() && sem.HasKeywords():
       // If we have both terms _and_ keywords, run them as a disjunct query
       disjunct := bleve.NewDisjunctionQuery(terms, keywords)
       boolean.AddMust(disjunct)
   case sem.HasTerms():
       // If we only have terms, run just the terms
       boolean.AddMust(terms)
   case sem.HasKeywords():
       // If we only have keywords, run just the keywords.
       boolean.AddMust(keywords)
   }
 
   // Add specific handling for quoted terms
   for _, q := range sem.Quoted {
       quoted := bleve.NewMatchPhraseQuery(q)
       boolean.AddMust(quoted)
   }
 
   // Add specific handling for formatted amounts if detected.
   for _, amt := range sem.Amounts {
       // Ranges should be inclusive so the same value for min and max
       // will find the correct results.
       inclusive := true
 
        // Capture the loop variable because we must take its address.
       amount := amt
 
       // Search for this amount numerically. Range must be inclusive to support
       // searching precise amounts.
       numAmount := bleve.NewNumericRangeInclusiveQuery(&amount, &amount, &inclusive, &inclusive)
       numAmount.SetField("num_amount")
 
       // Search for this amount numerically against absolute values. Range must
       // be inclusive to support searching precise amounts.
       absAmount := bleve.NewNumericRangeInclusiveQuery(&amount, &amount, &inclusive, &inclusive)
       absAmount.SetField("abs_amount")
 
       amounts := bleve.NewDisjunctionQuery(numAmount, absAmount)
       boolean.AddMust(amounts)
   }
 
   return boolean
}

We have additional query-building logic to further constrain queries by date, page, and type.
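
As a sketch of what that additional logic can look like (the field names follow the mapping examples above; the helper itself is illustrative), date constraints become a DateRangeQuery, type constraints become a TermQuery on the stored “_type” field, and pagination and recency-first ordering are handled on the search request:

import (
    "time"

    "github.com/blevesearch/bleve/v2"
    "github.com/blevesearch/bleve/v2/search/query"
)

// constrainAndPage narrows a built query by date and document type, then wraps
// it in a paginated search request that prefers more recent documents.
func constrainAndPage(boolean *query.BooleanQuery, after, before time.Time, docType string, size, from int) *bleve.SearchRequest {
    // Constrain results to the requested date window, if one was provided.
    if !after.IsZero() || !before.IsZero() {
        dates := bleve.NewDateRangeQuery(after, before)
        dates.SetField("date")
        boolean.AddMust(dates)
    }

    // Constrain results to a single document type (e.g. "transaction").
    if docType != "" {
        kind := bleve.NewTermQuery(docType)
        kind.SetField("_type")
        boolean.AddMust(kind)
    }

    // Page through results, sorting by recency first and relevance second.
    req := bleve.NewSearchRequestOptions(boolean, size, from, false)
    req.SortBy([]string{"-date", "-_score"})
    return req
}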

We then route our built query to our serving shards, await the results, and convert them to our protobuf response type:

// Result represents a customer-facing set of search results.
message Result {
   // The number of results
   int64 total = 1;
 
   // The hits for this page of results
   repeated Hit results = 2;
 
   // The count by ObjectEntity.Kind of the results for this query.
   repeated ObjectKindFacet kind_counts = 3;
 
   // The amount of time to retrieve the results from the index.
   double took_secs = 4;
 
   // The hydrated objects referenced in results above
   object.v1beta1.ObjectEntities entities = 10;
}
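
Filling that message in is mostly a matter of running the request against the right view’s index and copying over Bleve’s hits. A simplified sketch (the rawHit type and the final proto conversion are illustrative):

import (
    "github.com/blevesearch/bleve/v2"
)

// rawHit carries the stored fields we need to rebuild a document for the
// response without calling any other system.
type rawHit struct {
    ID      string
    DocType string
    Source  string // protojson-encoded document, decoded into the Hit proto later
}

// runSearch executes a prepared request against a view's index.
func runSearch(index bleve.Index, req *bleve.SearchRequest) (uint64, []rawHit, error) {
    // Ask Bleve to return the stored fields we wrote at indexing time.
    req.Fields = []string{"_source", "_encoding", "_type"}

    res, err := index.Search(req)
    if err != nil {
        return 0, nil, err
    }

    hits := make([]rawHit, 0, len(res.Hits))
    for _, hit := range res.Hits {
        source, _ := hit.Fields["_source"].(string)
        docType, _ := hit.Fields["_type"].(string)
        hits = append(hits, rawHit{ID: hit.ID, DocType: docType, Source: source})
    }

    // res.Total and res.Took map onto Result.total and Result.took_secs, and
    // the facet results feed the per-kind counts.
    return res.Total, hits, nil
}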

And with that, we’re able to then render these results in Digits!

Results

We’re extremely happy with Digits Search. 

What began as a companion feature quickly proved its value and became our first public launch. Although search functionality exists today in nearly every app and website, the power of Digits brings deep, full-history search to finances in a way that hasn’t existed before.

Digits Search in action

Furthermore, we’re especially pleased with Search’s (and Bleve’s) performance. Our typical response time, even for very large customers, is under 50ms.

We use Google Cloud Trace to inspect requests and to ensure we can respond to performance issues. Visualizing the requests made to build the page in the screenshot above highlights how quickly we’re able to assemble the parts of the page into a response:

Google Cloud Trace of a Digits Search request

This makes search feel truly instant, and has led to some great customer feedback.

We’re also very pleased with Bleve’s underlying index implementation. In addition to excellent response latencies, its focus on memory usage means that our search service typically operates within low tens of megabytes of memory under current usage, excluding mmap page caches.

Digits Search memory usage

Compared with other search platforms’ out-of-the-box defaults, this is at least an order of magnitude more memory efficient for our use case of many small, isolated indexes. This efficiency means we don’t need to grow our Kubernetes cluster to make space for processes with a large baseline memory footprint.

We’re also pleased that we were able to use our existing authentication and authorization libraries, and tightly integrate them with our search index routing, so that every customer’s data is isolated and protected at a deep level.

Another decision that worked well in practice was to store the actual objects we need as a stored field in the index, which allows us to render beautiful search results without incurring object hydration latency penalties that we’d pay if we needed to make a cross-system call.

Finally, by building on Bleve’s composable query API, we were able to assemble queries in ways that solve for observed pain points. By architecting Search this way, we are able to easily build end-to-end tests for queries and their expected results, adding cases for bug fixes as we observe them.

Looking Forward

Although we’re happy with Search today, we’re not done. We’ve identified a number of future paths to make Search even better, such as investigating Bluge, a next-generation library built on Bleve’s core, which would give us more control over result ranking, query building, and aggregations.

We’re also exploring ways to further enhance query handling by incorporating machine learning to more deeply understand connections between what users are asking and patterns in their financial data. For example, we think we can make Search more conversational by extracting deeper semantic meaning from queries.

Join our team to push the limits of Search

If you’d like to learn how Digits users are interacting with Search, see something we missed, or have ideas on how to make it even better, we’d love to hear from you! We’re hiring engineers who are excited about taking real-world customer needs and iterating towards delightful, reliable product experiences.

We’re currently hiring software engineers and ML engineers, and we can’t wait to meet you.

Less technical? We have many more open positions to help accelerate our mission to revolutionize business finance.

Training and Deploying State of the Art Transformer Models at Digits


Understanding banking transactions as they happen, in real-time, is core to our mission with Digits Search. You can’t answer important finance questions with bad data.

Transaction descriptions contain valuable information that helps us understand and communicate our customers’ business activity. The information we extract is then indexed and made available via Digits Search, presented in a far more human-readable and intuitive manner than customers would get from reviewing their raw bank or credit card statements.

Here we wanted to share a peek behind the curtains on how we extract transaction information with Natural Language Processing (NLP) at Digits. You’ll learn how we apply state-of-the-art Transformer models to this problem and how we go from an ML model idea all the way to a production integration with our Digits Search product.

Our Plan

Information can be extracted from unstructured text through a process called Named Entity Recognition (NER). This NLP concept has been around for many years, and its goal is to classify tokens into predefined categories, such as dates, persons, locations, and entities. 

For example, the transaction below could be transformed into the following structured format:

Named Entity Recognition in action

We had seen outstanding results from NER implementations applied to other industries and we were eager to implement our own banking-related NER model. Rather than adopting a pre-trained NER model, we envisioned a model built with a minimal number of dependencies. That avenue would allow us to continuously update the model while remaining in control of “all moving parts.” With this in mind, we discarded available tools like the spaCy NER implementation or Hugging Face models for NER. We ended up building our internal NER model based only on TensorFlow 2.x and the ecosystem library TensorFlow Text.

The Data

Every Machine Learning project starts with the data, and so did this one. We decided which relevant information we wanted to extract (e.g., location, website URLs, party names, etc.) and, in the absence of an existing public dataset, we decided to annotate the data ourselves.

There are a number of commercial and open-source tools available for data annotation.

The optimal tool varies with each project, and is a question of cost, speed, and useful UI. For this project, our key driver for our tool selection was the quality of the UI and the speed of the sample processing, and we chose doccano.

Example of a manual data annotation

At least one human reviewer then evaluated each selected transaction and marked the relevant sub-strings, as shown above. The end product of this processing step was a dataset of annotated transactions, together with the start and end character of each entity within the string.

Selecting an Architecture

While NER models can be based on statistical methods, we based ours on an ML architecture called the Transformer. This decision was driven by two major factors:

  • Transformers provide a major improvement in NLP when it comes to language understanding. Instead of evaluating a sentence token-by-token, the way recurrent networks would perform this task, transformers use an attention mechanism to evaluate the connections between the tokens.
  • Transformers allow the evaluation of up to 512 tokens simultaneously (with some evaluating even more tokens). 

One of the first widely adopted attention-based model architectures was Bidirectional Encoder Representations from Transformers (BERT, for short), published in 2019. In the original paper from Google AI, the authors already highlighted potential applications to NER, which gave us confidence that our Transformer approach might work.

Original BERT paper highlighting NER applications (link)

Furthermore, we had previously implemented various other deep-learning applications based on BERT architectures and we were able to reuse our existing shared libraries. This allowed us to develop a prototype in a short amount of time.

BERT models can be used as pre-trained models, which are initially trained on multilingual corpora on two general tasks: predicting masked tokens and predicting whether the next sentence is connected to the previous one. Such general training creates a broad language understanding within the model. Pre-trained models are provided by various companies, for example, by Google via TensorFlow Hub. A pre-trained model can then be fine-tuned during a task-specific training phase, which requires fewer computational resources than training a model from scratch.

The BERT architecture can process up to 512 tokens simultaneously. BERT requires WordPiece tokenization, which splits words into frequent sub-word chunks. The following example sentence would be tokenized like this:

Digits builds a real-time engine
[b'dig', b'##its', b'builds', b'a', b'real', b'-', b'time', b'engine']

There are a variety of pre-trained BERT models available online, but each has a different focus. Some models are language-specific (e.g., CamemBERT for French or Beto for Spanish), and other models have been reduced in their size through model distillation or pruning (e.g., ALBERT or DistilBERT).

Time to Prototype

Our prototype model was designed to classify the sequence of tokens that represents the transaction in question. For training, we converted the annotated data into a sequence of labels aligned one-to-one with the tokens generated from each transaction. Then, we trained the model to classify each token’s label:

Prototype predicting NER categories for each token

In the figure above, you’ll notice the “O” tokens. These represent irrelevant tokens, and we trained the classifier to detect those as well.

The prototype model helped us demonstrate a business fit of the ML solution before engaging in the full model integration. At Digits, we develop our prototypes in GPU-backed Jupyter notebooks. Such a process helps us to iterate quickly. Then, once we confirm a business use-case for the model, we focus on the model integration and the automation of the model version updates via our MLOps pipelines.

Moving to Production

In general, we use TensorFlow Extended (TFX) to update our model versions. In this step, we convert the notebook code into TensorFlow Ops; in this case, we converted our prototype’s data preprocessing steps into TensorFlow Transform Ops. This extra step allows us to train later model versions efficiently, avoid training-serving skew, and “bake” our internal business logic into our ML models. This last benefit helps us reduce the dependencies between our ML models and our data pipeline or backend integrations.

Digits machine learning engineering workflow

We run our TFX pipelines on Google Cloud’s Vertex AI Pipelines. This managed service frees us from maintaining a Kubernetes cluster for Kubeflow Pipelines (which we did prior to using Vertex AI).

In a future blog post, we will go into more detail about our ML pipeline setup. Stay tuned.

Deploying ML at Digits

At Digits, we deploy our ML models via TensorFlow Serving, and we host our own TFServing instance within our Kubernetes cluster. TFServing allows TLS termination directly at the service, instead of at the load-balancer, which is critical for us, and overall provides a high-throughput solution. We have automated the entire model deployment process: new version deployments are version controlled, and deployments are automatically triggered by Git commits. 

Our production models are stored in Google Cloud Storage buckets, and TFServing allows us to load model versions directly from cloud storage. Because of the dynamic loading of the model versions, we don’t need to build custom containers for our model serving setup; we can use the pre-built images from the TensorFlow team.

Here is a minimal setup for Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  ...
  name: tensorflow-serving-deployment
spec:
  ...
  template:
    ...
    spec:
      containers:
        - name: tensorflow-serving-container
          image: tensorflow/serving:2.5.1
          command:
            - /usr/local/bin/tensorflow_model_server
          args:
            - --port=8500
            - --model_config_file=/serving/models/config/models.conf
            - --file_system_poll_wait_seconds=120
        ...

Note the additional argument --file_system_poll_wait_seconds in the list above. By default, TFServing checks the file system for new model versions every 2 seconds. This can generate significant Cloud Storage costs, since every check triggers a list operation against the bucket and those operations are billed. For most applications, it is fine to reduce the file system check to every 2 minutes (set the value to 120) or to disable it entirely (set the value to 0).

For maintainability, we keep all model-specific configuration in a dedicated ConfigMap. The generated file is then consumed by TFServing on boot-up.

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: ml-deployments
  name: <MODEL_NAME>-config
data:
  models.conf: |+
    model_config_list: {
      config: {
        name: "<MODEL_NAME>",
        base_path: "gs://<BUCKET_NAME>/<MODEL_NAME>",
        model_platform: "tensorflow",
        model_version_policy: {
          specific: {
            versions: 1607628093,
            versions: 1610301633
          }
        }
        version_labels {
          key: 'canary',
          value: 1610301633
        }
        version_labels {
          key: 'release',
          value: 1607628093
        }
      }
    }
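
To make the version labels concrete: a backend client can pin its requests to a label, so shifting traffic between labeled versions (for example, from “release” to “canary”) is purely a serving-side configuration change. Here is a rough sketch of what such a call might look like over TFServing’s REST API; it assumes the REST endpoint is also enabled via --rest_api_port=8501, and our real client code differs:

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// predict sends instances to a model version pinned by label (for example
// "release" or "canary") rather than by a hard-coded version number.
func predict(host, modelName, label string, instances []interface{}) (map[string]interface{}, error) {
    url := fmt.Sprintf("http://%s:8501/v1/models/%s/labels/%s:predict", host, modelName, label)

    body, err := json.Marshal(map[string]interface{}{"instances": instances})
    if err != nil {
        return nil, err
    }

    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    var out map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        return nil, err
    }
    return out, nil
}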

After the initial deployment, we started iterating to optimize the model architecture for high throughput and low latency results. This meant optimizing our deployment setup for BERT-like architectures and optimizing the trained BERT models. For example, we optimized the integration between our data processing Dataflow jobs and our ML deployments, and shared our approach in our recent talk at the Apache Beam Summit 2021.

Results

The deployed NER model allows us to extract a multitude of information from unstructured text and make it available through Digits Search.

Here are some examples of our NER model extractions:

Extraction results from example transactions

The Final Product

At Digits, an ML model is never itself the final product. We strive to delight our customers with well-designed experiences that are tightly integrated with ML models, and only that combination is the final product. Many additional factors come into play:

Latency vs. Accuracy

A more recent pre-trained model (e.g., BART or T5) could have provided higher model accuracy, but it would have also increased the model latency substantially. Since we are processing millions of transactions daily, it became clear that model latency is critical for us. Therefore, we spent a significant amount of time on the optimization of our trained models.

Design for false-positive scenarios  

There will always be false positives, regardless of how stunning the model accuracy was prior to deployment. Product design efforts that focus on communicating ML-predicted results to end-users are critical. At Digits, this is especially important because we cannot put at risk our customers’ confidence in how Digits handles their financial data.

Automation of model deployments

The investment in our automated model deployment setup helped us provide model rollback support. All changes to deployed models are version controlled, and deployments are automatically executed from our CI/CD system. This provides a consistent and transparent deployment workflow for our engineering team. 

Devise a versioning strategy for release and rollback 

To support smooth model rollouts and a holistic quantitative analysis prior to rollout, we deploy two versions of the same ML model and use TFServing’s version labels (e.g., “release” and “pre-release” tags) to differentiate between them. Additionally, we use an active version table that allows for version rollbacks, which are as simple as updating a database record.

Assist customers, don’t alienate them

Last but not least, our ML models should always assist our customers in their tasks rather than alienate them. Our goal is not to replace humans or their functions, but to take cumbersome tasks off their plates. Instead of asking people to extract information manually from every transaction, we assist our customers by pre-filling extracted vendors, but they always stay in control. If we make a mistake, Digits makes it easy to overwrite our suggestions. In fact, we learn from our mistakes and update our ML models accordingly.

Join our team to push the limits of Machine Learning 

Machine Learning at Digits sits at the unique intersection of large datasets and agile production. Our ML team applies state-of-the-art techniques and launches models into production without delay, optimizing for customer impact.

We’re currently hiring ML engineers and software engineers, and we can’t wait to meet you.

Less technical? We have many more open positions to help accelerate our mission to revolutionize business finance.


Building Digits: Efficient Serving of Static Views with Google Cloud Spanner


At Digits, we strive to push the bounds of technology in order to deliver radically more useful, delightful software experiences for our customers. We’re excited to begin sharing a closer look at the technical foundations that underpin our products in a new series of blog posts called Building Digits. Without further ado…

Let’s talk about viewing complex data. One of our primary goals at Digits is to provide business owners with insightful and holistic views of their company’s finances, in substantially real-time. 

Achieving this involves three major, independent steps: 

  1. We collect all of their relevant data from various sources, such as their QuickBooks, the financial institutions they bank with, their corporate credit card providers, etc.
  2. We apply our algorithms and proprietary datasets to extend, interpret, and tease out meaning from all of their data.
  3. We consolidate and aggregate the results into a holistic view that we then visualize for them on their dashboard.

Facts

We refer to the pieces of data that we receive from third party systems as Facts.  This is not a judgement of the credibility or immutability of these systems, but rather a delineation of what is (and what is not) under our control.  

For example, if we receive a transaction from an external source that looks like 05/12/20 - Taxi $15.05, we might classify it as Transportation.  Later, we may receive another piece of information that leads us to believe that this transaction was actually a client expense, and is better classified as Meals & Entertainment.  In this example, the transaction itself, the fact, did not change—but our interpretation of it did.

Computed Data

We refer to insights and analysis that are performed by Digits, based on all of the Facts that we have received, as Computed Data.  In the example above, this involved a category classification of a transaction.  In other cases, this might involve determining that two external pieces of information actually represent the same physical transaction, or detecting that a particular transaction tends to recur on a regular interval and that it should be treated as a subscription.  

Benefits of Recomputation

One of the challenges with this model is that we receive new data from external systems constantly, and each new Fact we receive may shed more light on, or change our understanding of, earlier Facts we already recorded.  The arrival of new Facts can impact virtually every aspect of our Computed Data.  As a result, determining the subset of Computed Data that needs to be updated as a result of any new Fact arriving is non-trivially complex. 

We’ve chosen to avoid this complexity by “recomputing the world.” We reconsider our entire set of Computed Data every time the set of Facts that it is based on changes in any way.  This guarantees that Digits uses every piece of knowledge it has access to in order to model your business’s financial health at every moment, so it’s as accurate as it can possibly be.

Consistent Views of the World

The last core tenet of our architecture is our notion of a View.  To us, a view of a customer’s data is the combination of all of the Facts for that customer at time T, as well as all of the Computed Data derived from that fact set.  If the set of customer facts changes at time T+1, we’ll create a new view that represents that updated set of Facts and Computed Data.

When a customer loads our dashboard, we retrieve the latest available version of their view, and their experience is based on that view version for the remainder of the customer session. 

What is the motivation for keeping their experience tied to a single, static view version?  There are several:

  1. Consistency. Assume that the arrival of a new fact causes us to re-label a subset of transactions as Meals & Entertainment instead of Transportation.  It would be confusing to our customers if one or two of the transactions in this subset changed labels while others did not as they browsed the site, and even more confusing if additional transactions became recategorized incrementally as they loaded new pages.
  2. Atomicity. Assume that we receive an external update that replaces one Fact with another.  For example, a correction of a pending transaction with the actual, confirmed transaction and the date it posted.  If a customer was looking at a page of transactions that included the pending transaction, and then clicked Next and loaded a new page that now, all of a sudden, included the confirmed version of the transaction, they would be confused why the transaction seemed to appear twice.

View Serving Architecture

For these reasons, we decided that our architecture would entail a system that is capable of loading a holistic view V1, serving a customer dashboard based on it, and then, at a later point in time, loading a new holistic view V2 and atomically switching to serving the next customer experience based on it.  At yet another later point, after a configurable number of hours has elapsed, we want to unload view V1 as it is no longer needed to serve any customer experiences.

Frequency of Recomputation

For a given customer, we recompute their view whenever we detect that any of their Facts or Computed Data have changed.  In practice, this varies from customer to customer but can roughly be assumed to be between 1 and 24 times per day.

Why Google Cloud Spanner?

After a detailed analysis of the major cloud platforms in 2018, we made the decision to begin building Digits on Google Cloud, and we have been quite pleased with that decision.

With the rest of our infrastructure within the Google ecosystem, it was natural to evaluate Spanner as a potential database technology, and there are quite a few aspects of Spanner that are beneficial to our use case:

  1. Spanner is fully-managed and requires effectively zero overhead for database operations.
  2. Its ability to automatically shard data via table interleaving was an appealing feature for us, as it allows us to prepare for high scale and still get the benefits of relational database features: efficient joins, foreign keys, etc.
  3. Its ability to perform ACID transactions, as needed, was appealing to us from a financial data perspective.

Read-time SQL

There is a huge trade-off between pre-computing everything to reduce read latency (i.e., dashboard load time) at the cost of development speed, and biasing towards read-time computation, which permits rapid feature iteration on the frontend.

At Digits’ current stage, we want to be in the middle of this spectrum — have low read latencies for a great user experience, but still be able to quickly iterate and perfect new features based on customer feedback.  In order to support this model, whichever solution we chose for serving our views would have to support read-time SQL.

Sharding and Interleaving

Once it became clear that read-time SQL support was a requirement, we also wanted to be sure that the database solution we selected would easily scale with us as we grew.  Traditional RDBMS systems have trouble scaling join performance once the data set can no longer fit on a single node, and many NoSQL key-value stores address the scalability concern by sacrificing join support entirely.

Spanner’s interleaving/sharding design is a nice balance between these two ends of the spectrum.  While all data for a given set of parent-child tables does not need to fit on a single node, rows that share the same root key are guaranteed to be co-located on a node.  This allows for fast joins within a parent-child hierarchy and aside from defining the interleaving model in the schema, it happens without the developer’s involvement.  

Alternative Solutions

Combined, these constraints of easy scalability, low latency reads, and support for relational SQL eliminate quite a few otherwise appealing solutions.  For example:

  1. Cassandra and Redis are both great for serving precomputed views, but do not support read-time aggregations via SQL.  (Cassandra does support a SQL-like API, CQL, but not read-time aggregation via SQL.)
  2. MySQL and Postgres are great for relational querying and read-time aggregation, but are challenging to scale, as sharding data across clusters is left to the engineer/operator.
  3. Google BigQuery is great for all kinds of SQL analytics querying but is not designed to serve low latency, customer-facing dashboard reads.

Based on all of these factors, we elected to implement our view architecture on Spanner.

Implementing Views in Spanner

As we developed our view implementation, one of the challenges that we had to overcome was Spanner’s 20,000 cell mutation limit.  The limit caps the number of cells (rows * columns-per-row) that can be inserted/updated in a single transaction.  

This limit presented challenges on both the view loading side and the view unloading (deleting) side.

View Loading

On the view loading side, it meant that as we computed views for a given customer, we could not guarantee that the entire view could be loaded into the database atomically.  Additionally, it is non-trivial for the implementer to know whether a particular transaction will hit this limit, as they would need to keep track of all of the cells impacted by the generated mutations or DML statements.

To address this, we created a view version table that is independent of the tables that actually store the view data.  This table is a simple mapping from a customer id to a version identifier.  This version column is also set on all rows of the actual view data tables.

For example, a small subset of our view schema consists of an active versions table, which maps each customer to their currently active version identifier, alongside the versioned view data tables themselves.

The queries that power our dashboard can either serve a view for a particular, known version or consult the active versions table to see which version is the latest (our version_ids increase monotonically).

Our view loading process, for a given customer, then looks like this:

  1. Load all parts of the view in independent transactions of roughly 100 rows each (conservative, to stay well clear of the mutation limit).
  2. Once all parts of the view have been successfully loaded, update the view version table to denote the newest active version (sketched below).
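
Here is a minimal sketch of that two-step load using the Go client for Spanner. The chunk size is illustrative, and the active_versions table follows the schema examples shown later in this post:

import (
    "context"

    "cloud.google.com/go/spanner"
)

const rowsPerTxn = 100 // conservative, to stay well clear of the mutation limit

// loadView writes a new view version in small independent transactions, then
// flips the active version pointer only after every chunk has landed.
func loadView(ctx context.Context, client *spanner.Client, customerID string, version int64, rows []*spanner.Mutation) error {
    // Step 1: load all parts of the view in chunks of roughly 100 rows.
    for start := 0; start < len(rows); start += rowsPerTxn {
        end := start + rowsPerTxn
        if end > len(rows) {
            end = len(rows)
        }
        if _, err := client.Apply(ctx, rows[start:end]); err != nil {
            return err
        }
    }

    // Step 2: only once every chunk has succeeded, point queries at the new
    // version; no query sees the new view before this single-row update.
    activate := spanner.InsertOrUpdate("active_versions",
        []string{"customer_id", "version"},
        []interface{}{customerID, version})
    _, err := client.Apply(ctx, []*spanner.Mutation{activate})
    return err
}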

This process ensures that a new view will be served atomically in its entirety, because no query will be aware of its existence until the view version table has been updated.  At the same time, existing customer sessions can continue to experience our dashboard against the view version which was active at the start of their session.

View Unloading

The same 20,000 cell mutation limit applies to data deletion.  Spanner does support a Partitioned DML alternative that was appealing for this use case; however, we found two limitations with it:

  1. Every time we load a new view, it effectively invalidates a similarly sized older view, and this amounts to tens of thousands of rows in need of deletion.  This has a significant impact on CPU load.  Spanner tombstones rows that are marked as deleted, which makes them invisible to all queries, and then reclaims the disk space in an asynchronous process.  However, in our experience both the tombstoning and the reclamation process place a non-trivial load on the CPU and can thus impact the read latency of customer-facing queries.
  2. The Partitioned DML alternative, although documented as not being constrained by the 20,000 cell mutation limit, still failed intermittently with the 20,000 cell mutation limit error.

Efficient Incremental Views

To address the deletion constraint, we analyzed the insertions and deletions that we were performing as part of view loading/unloading and confirmed what we expected: the vast majority of rows stay the same from one view version to the next. All we had to do was invest in being able to easily identify the rows that actually changed, and then insert and remove only the deltas.

(It is important to note that while determining which pieces of our Computed Data for a given customer need to be updated because we received new Facts is non-trivially complex, comparing two fully computed views to each other and determining which rows have been updated, removed, or created is quite straightforward.)

The output of this comparison can then be used as follows:

Each row in the current active version is determined to either:

  1. Still be relevant, or
  2. Need to be removed for all view versions going forward.

Each row in the newly computed version is determined to either:

  1. Already be present in the active view version, or
  2. Need to be added for all view versions going forward.

For most normal operations, 99% of rows in both the existing version and the new version are determined to be identical, and thus no-ops. 
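
Conceptually, computing that diff is a pair of set differences keyed on each row’s identity and content. A simplified sketch, assuming a hypothetical Row type with a stable key and a hash of its column values:

// Row is a hypothetical flattened view row: a stable identity (its primary
// key) plus a hash of its remaining column values.
type Row struct {
    Key         string
    ContentHash string
}

// diffViews returns the rows to add for the new version and the rows from the
// currently active version to mark as removed. Identical rows are no-ops.
func diffViews(active, next []Row) (toAdd, toRemove []Row) {
    activeSet := make(map[string]string, len(active))
    for _, r := range active {
        activeSet[r.Key] = r.ContentHash
    }
    nextSet := make(map[string]string, len(next))
    for _, r := range next {
        nextSet[r.Key] = r.ContentHash
    }

    for _, r := range next {
        // New or changed rows need to be added for all view versions going forward.
        if hash, ok := activeSet[r.Key]; !ok || hash != r.ContentHash {
            toAdd = append(toAdd, r)
        }
    }
    for _, r := range active {
        // Removed or changed rows need to be marked as removed going forward.
        if hash, ok := nextSet[r.Key]; !ok || hash != r.ContentHash {
            toRemove = append(toRemove, r)
        }
    }
    return toAdd, toRemove
}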

To support this, we modified our view tables to have two version-related columns, version_valid_since and version_invalid_since, and updated all queries with two corresponding WHERE conditions (the payments and sales tables in the Sharding section below show these columns in place).
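
A dashboard query pinned to version V then filters on both columns: the row must have become valid at or before V, and must not yet have been invalidated as of V. Here is a sketch with the Go Spanner client, using the payments table from the Sharding section below (the exact comparison semantics depend on how versions are assigned, so treat this as illustrative):

import (
    "context"

    "cloud.google.com/go/spanner"
    "google.golang.org/api/iterator"
)

// paymentsForVersion reads the payments visible in a specific view version.
func paymentsForVersion(ctx context.Context, client *spanner.Client, customerID string, version int64) (map[int64]int64, error) {
    stmt := spanner.Statement{
        SQL: `SELECT payment_id, amount
                FROM payments
               WHERE customer_id = @customer_id
                 AND version_valid_since <= @version
                 AND (version_invalid_since IS NULL OR version_invalid_since > @version)`,
        Params: map[string]interface{}{
            "customer_id": customerID,
            "version":     version,
        },
    }

    payments := make(map[int64]int64)
    iter := client.Single().Query(ctx, stmt)
    defer iter.Stop()
    for {
        row, err := iter.Next()
        if err == iterator.Done {
            return payments, nil
        }
        if err != nil {
            return nil, err
        }
        var paymentID, amount int64
        if err := row.Columns(&paymentID, &amount); err != nil {
            return nil, err
        }
        payments[paymentID] = amount
    }
}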

Finally, we implemented version diffing in a generic way, such that it can be applied to all of our view tables without additional work. 

With this in place, we now have to insert and delete 99% less data from Spanner than we did when we were fully loading/unloading every view.

Schema Design for Scale

One unintuitive aspect of Spanner’s secondary indexes (indexes you explicitly add to tables, as opposed to the index you implicitly get for the primary key) is that if a query that hits the index selects a column that is neither part of the index nor explicitly stored on the index, Spanner must perform an implicit join from the secondary index back to the base table.  This makes sense once you accept the fact that the index is stored as an independent structure from the table that it is indexing.

Unfortunately, this join may be non-trivial in cost, particularly when a lot of data is being selected.

To avoid secondary-index-to-base-table join costs, we have explored two options:

  1. Carefully designing our tables’ primary keys in such a way that most common queries only require the implicit primary key index.
  2. Creating secondary indexes that are less generic and more tailored to specific query patterns by including all of the columns selected by that query in their STORING clauses.  

The second option has a higher maintenance cost, as it potentially requires updating indexes when new columns are added to tables (if these new columns are selected by the queries those indexes are tailored for) or when new query patterns are added for product reasons.  As a result, we prefer the first approach whenever possible.

For example, if the majority of queries against a table will involve restricting the result set by time, then we consider adding the column that represents time to be part of the primary key, even if it logically is not required to be in the primary key.  

Sharding

Spanner’s interleaving support allows for schema design that makes your database straightforward to scale while still supporting efficient joins on data that is commonly joined together.  These two properties are often very difficult to achieve in tandem with relational databases.  

Interleaving lets you signal to Spanner that all hierarchical data spanning multiple tables, rooted at a particular root row, should be colocated.  Building on our schema examples above, it might be appealing to interleave all of our view data under the customers table.  This would mean that all view data for a given customer would be colocated—a property that makes sense since we often want to show a customer various parts of their data, while we never want to join data from multiple customers together.

The schema for the customers table as well as the view tables above may then look like this:

CREATE TABLE customers (
    customer_id STRING(MAX) NOT NULL,
    name STRING(MAX)
) PRIMARY KEY (customer_id);

CREATE TABLE active_versions (
    customer_id STRING(MAX) NOT NULL,
    version INT64 NOT NULL
) PRIMARY KEY (customer_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

CREATE TABLE payments (
    customer_id STRING(MAX) NOT NULL,
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    payment_id INT64 NOT NULL,
    amount INT64 NOT NULL
) PRIMARY KEY (customer_id, version_valid_since, payment_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

CREATE TABLE sales (
    customer_id STRING(MAX) NOT NULL,
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    sale_id INT64 NOT NULL,
    amount INT64 NOT NULL
) PRIMARY KEY (customer_id, version_valid_since, sale_id), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

This schema might work well, but there is another factor to keep in mind: the size of all data that is interleaved under a single root has a hard limit in Spanner of 4 GB.  To avoid approaching this limit, we might further restrict the data that is co-located. 

For example, if we know that there are pieces of data that will never be joined with each other, then there is no reason for them to be co-located on the same node. Building on our scenario above, imagine that payments and sales are shown in totally separate parts of our dashboard and would never need to be joined together. If that were the case, then interleaving and thus colocating the view data for both of these tables, for a given customer, under the same customer_id row would make that row unnecessarily large.

To address this, we could add a table in between customers and payments/sales that would facilitate better sharding. The new table might look like:

CREATE TABLE customer_view_types (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX)
) PRIMARY KEY (customer_id, view_type), INTERLEAVE IN PARENT customers ON DELETE CASCADE;

And the payments and sales tables would be updated to look like:

CREATE TABLE payments (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX),
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    payment_id INT64 NOT NULL,
    amount INT64 NOT NULL
) PRIMARY KEY (customer_id, view_type, version_valid_since, payment_id), INTERLEAVE IN PARENT customer_view_types ON DELETE CASCADE;

CREATE TABLE sales (
    customer_id STRING(MAX) NOT NULL,
    view_type STRING(MAX),
    version_valid_since INT64 NOT NULL,
    version_invalid_since INT64,
    sale_id INT64 NOT NULL,
    amount INT64 NOT NULL
) PRIMARY KEY (customer_id, view_type, version_valid_since, sale_id), INTERLEAVE IN PARENT customer_view_types ON DELETE CASCADE;

In Production

While there were a few limitations to work around, and special care must be taken in both schema design and query design to maintain high performance, Spanner has performed well in production as our customer base has scaled to billions of dollars in transaction value.

Incremental static views provide an optimal balance between dashboard consistency, read-time latency, continual re-computation based on new data, and developer productivity, and Spanner’s ease of scalability via auto-sharding and interleaving has made this architecture very low-overhead to operate.

Join the Team

Static view serving at scale is just one of countless technical challenges we’ve faced while building Digits, and we’re pushing the boundaries at every layer of the stack.

If you’re interested in crafting the next generation of financial software, we’re hiring, and we’d love to meet you! See our open positions here.