Batch upload from BigQuery to Aito with 20 lines of Python code

#integration

#BigQuery

#howto

Photo by Danielle MacInnes
author

One of the key elements of any machine learning solution is to figure out how to run it in production. It might be easy to run the initial PoC, but in the operational use there will be new data coming in, and the model needs to take this new training data into account in predictions.

A lot of our work revolves around robotic process automation (RPA). In this context every time a bot executes a workflow, there might be a new entry used to improve the predictions.

Using traditional machine learning libraries that produce a static model, the workflow requires engineers to periodically retrain the model, test and deploy it. For a lot of the RPA projects this adds additional pain as the infrastructure and pipelines are often not ready for this. It's just too complex.

There is a better way

Aito works in a different way.

Every time you input a new datapoint into Aito's database, it is automatically used as training data for next predictions. So essentially every time an RPA bot runs and feeds a new datapoint into Aito, it will use the latest training data. Aito's predictions evolve on the fly, without need for constant retraining.

Refuel your ML model with new data on the fly
Refuel your ML model with new data on the fly, photo from Defense News.

Segment.com and BigQuery

While there are tons of ways to input data into Aito, I wanted to provide an example of how to do that with Python. I'm using BigQuery as a source database simply because it was easily available at the time of writing. You can do the same with any database with Python client.

My dataset is our web analytics data that we orchestrate through Segment.com, and all ends up in BigQuery data warehouse. We use this data to, for example, understand what type of website actions correlate most with actions like signup, or becoming an active user.

Again, the contents of the data is not relevant to this example. I will not focus on the predictions, but on the data wrangling only.

Prerequisites

I am only focusing on the daily new data upload. Here is a checklist of things you need before we can start:

  • You need to have your Aito instance. Free Sandboxes are available, sign up.
  • Have source data that you want to push into Aito and use for predictions, preferably in BigQuery or in some other database. If you just want to play around, the world is full of free datasets. If you have your own Google Analytics account, check for instructions to get data into BigQuery. If not, maybe check the Google Analytics sample data, but remember that it will not update continuously, so you need to fake it to test the daily batch uploads.
  • Have necessary components installed for Python, connecting to your database and of course Aito: pip install aitoai. Code examples have been tested with version 0.4.0.
  • Have you already created the schema in Aito for your data? You'll need to do this as well. Essentially, you can just drop an example file in Aito Console and watch the schema autocreate, or DIY with help.

Can we achieve our goal with 20 lines of code?

Let's try! The logic of my daily batch transfer is:

  1. Fetch new entries (based on previous max timestamp in Aito) from BigQuery to a dataframe
  2. Upload dataframe to Aito
  3. Keep only a recent snapshot of data in Aito. We clean data points that are older than X days
  4. EXTRA: Run evaluation to track prediction accuracy, and write result in a file for monitoring purposes

Clearly this is just one example. Especially with deleting old data you need to use your own judgement. In some scenarios it's better to keep a long history, in some cases predicting from more recent data is faster and serves the purpose.

My schema is below. I am only showing one table here to keep things short. In real life scenarios there are multiple tables that sync from BigQuery to Aito. As an example of this, the fields anonymous_id links to another table called aito_segment_user.

If you have worked with Google Analytics, this dataset is most likely very familiar to you.

{
  "type": "table",
  "columns": {
    "anonymous_id": {
      "link": "aito_segment_user.id",
      "nullable": False,
      "type": "String"
    },
    "context_campaign_medium": {
      "nullable": True,
      "type": "String"
    },
    "context_campaign_name": {
      "nullable": True,
      "type": "String"
    },
    "context_campaign_source": {
      "nullable": True,
      "type": "String"
    },
    "context_ip": {
      "nullable": False,
      "type": "String"
    },
    "context_locale": {
      "nullable": False,
      "type": "String"
    },
    "context_page_path": {
      "nullable": False,
      "type": "String"
    },
    "context_page_referrer": {
      "nullable": True,
      "type": "String"
    },
    "context_page_title": {
      "nullable": True,
      "type": "String"
    },
    "context_user_agent": {
      "nullable": False,
      "type": "String"
    },
    "id": {
      "nullable": False,
      "type": "String"
    },
    "unix_timestamp": {
      "nullable": False,
      "type": "Int"
    },
    "referrer": {
      "nullable": True,
      "type": "String"
    },
    "search": {
      "nullable": True,
      "type": "Text",
      "analyzer": "en"
    },
    "url": {
      "nullable": False,
      "type": "String"
    }
  }
}

Finally there is some code!

A housekeeping note. I'm using pandas_gbq to make queries to BigQuery, but you can do the same with Google's own BigQuery Python Client.

I'm using GCP Service Accounts to authenticate, but again this is up to you.

import pandas as pd
import pandas_gbq
from google.oauth2 import service_account
from aito.client import AitoClient
import aito.api as aito_api

pandas_gbq.context.credentials = service_account.Credentials.from_service_account_file('YOUR-OWN-FILE.json')
pandas_gbq.context.project = 'YOUR-OWN-GCP-PROJECT'

aito_client = AitoClient("http://$AITO_INSTANCE_URL", "YOUR-OWN-AITO-API-KEY")

Look how many lines are spent already!

Get data from BigQuery

First I need to know what is the latest entry currently in my Aito database. I am using Aito's search endpoint, and ordered by the descending timestamp (unix secs) and limiting to 1 result. It's like a DIY max function!

body = {
    "from": "aito_segment_web",
    "orderBy": { "$desc": "unix_timestamp" },
    "limit": 1,
    "select": ["unix_timestamp"]
}

batch_start_unixtime = aito_api.generic_query(
    client=aito_client,
    query = body
)['hits'][0]['unix_timestamp']

Pay attention that I grab only the timestamp to my variable with ['hits'][0]['unix_timestamp'] instead of the entire JSON response. The value is an integer. Relevant for later on!

Next, let's fetch the data from BigQuery into a dataframe. It has to match the schema in Aito, therefore be careful how you construct it. Essentially you want an exact match, otherwise you'll be getting steady stream of error messages.

sql = """
SELECT
  anonymous_id,
  context_campaign_medium,
  context_campaign_name,
  context_campaign_source,
  context_ip,
  context_locale,
  context_page_path,
  context_page_referrer,
  context_page_title,
  context_user_agent,
  id,
  UNIX_SECONDS(timestamp) as unix_timestamp,
  referrer,
  search,
  url
FROM aito_ai_website.pages
WHERE
  UNIX_SECONDS(timestamp) > {starttime};
""".format(starttime=str(batch_start_unixtime))

df = pandas_gbq.read_gbq(sql)

Upload to Aito

Now this step is simple! Using Aito Python client, I'm uploading data in batches. By default data is split to batches of 1000 rows, but you can control it if you wish a different batch size.

In the below command I'm declaring the destination table to be aito_segment_web and convert my dataframe to a Dictionary, oriented by records.

aito_api.upload_entries(
    client=aito_client,
    table_name="aito_segment_web",
    entries=df.to_dict(orient="records"))

New data uploaded. That was it! Not too bad right?

Janitorial duties

Aito is not a data lake. Nor a data warehouse. It's a predictive database. So the intention is not to keep your entire history of data there, but only data that is needed for high quality predictions. It is very challenging to give a general rule on what is the optimal sample. You'll learn it by quickly testing different datasets and using Aito's evaluate end point to get accuracy.

For demo purposes, I am purging data that is older than 90 days, three months.

cutoff_unix_sec = int(time.time()) - (90 * 24 * 60 * 60) # days * hours * minutes * seconds

del_body = {
    "from": "aito_segment_web",
    "where": { "unix_timestamp" : { "$lt" : cutoff_unix_sec } }
}

aito_api.delete_entries(
    client=aito_client,
    query=del_body
)

Evaluate accuracy with the new dataset

I am going to cheat a little bit and skip this part. Evaluating prediction accuracy is discussed in a previous post. It would require going a couple of steps deeper into our prediction targets to make sense, and that is beyond the scope of this post. However, here are some pointers for the curious ones:

  • Define an evaluate query body.
  • Send it to Aito as a job, as evaluates take usually more time than our normal query timeout of 30 secs.
  • After getting the results, you would append the vital statistics like size of sthe ample, error rate, accuracy etc to a file for later examination.

So to be fair with our code lines target, this all would add 3. Fair?

Summary

First and foremost, let's count the command lines.

I'm counting 18. That includes 15 lines of examples shown here, plus those imaginary 3 lines for accuracy evaluation. Success!

Every developer knows this 20 lines is just marketing blablabla. In reality you would add error handling, and rather use a bit more lines to make your code maintainable. You probably have several linked tables (Aito supports inference in relational datasets!), which I chose to ignore to keep things simple. In addition you would schedule the execution, using cron or maybe schedule a recurring Python script in GCP.

The key takeaway, however, is this. With 15 lines of Python code you are keeping the machine learning model up to date. Nothing else is needed. This will save some serious pain and suffering compared to managing all the model training and deployment yourself, day in day out.

Back to blog list

Fast track to machine learning starts with a free Aito Sandbox.

Locations

Aito Intelligence Oy

Kaivokatu 10 A, 8th floor

00100 Helsinki

Finland

VAT ID FI28756352

See map

470 Ramona St.

Palo Alto

CA 94301, USA

See map

Contact

COVID-19 situation has driven us all to work from homes, please connect with us online. Stay safe & play with data!

About usContact usJoin our Slack workspace

Follow us