Tommi Holmgren
Chief Product Officer
May 18, 2020 • 5 min read
One of the key elements of any machine learning solution is figuring out how to run it in production. It might be easy to run the initial PoC, but in operational use there is new data coming in all the time, and the model needs to take this new training data into account in its predictions.
A lot of our work revolves around robotic process automation (RPA). In this context, every time a bot executes a workflow there may be a new entry that can be used to improve the predictions.
With traditional machine learning libraries that produce a static model, the workflow requires engineers to periodically retrain, test and deploy the model. For many RPA projects this adds extra pain, as the infrastructure and pipelines are often not ready for it. It's just too complex.
Aito works in a different way.
Every time you input a new datapoint into Aito's database, it is automatically used as training data for the next predictions. So essentially every time an RPA bot runs and feeds a new datapoint into Aito, the next prediction will use the latest training data. Aito's predictions evolve on the fly, without the need for constant retraining.
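Predictions themselves are out of scope for this post, but to make the point concrete, here is a minimal sketch of what a prediction query against Aito's _predict endpoint looks like. The table and field names (invoices, vendor, gl_code) are made up for illustration; the point is that the query is evaluated against whatever rows are in the database at that moment.
# Hypothetical predict query body; table and field names are illustrative only.
# Posting this to Aito's _predict endpoint scores gl_code candidates using the
# rows currently in the invoices table, including a row uploaded a second ago.
predict_body = {
    "from": "invoices",
    "where": { "vendor": "ACME Corp" },
    "predict": "gl_code"
}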
While there are tons of ways to input data into Aito, I wanted to provide an example of how to do it with Python. I'm using BigQuery as the source database simply because it was easily available at the time of writing. You can do the same with any database that has a Python client.
My dataset is our web analytics data, which we orchestrate through Segment.com and which all ends up in a BigQuery data warehouse. We use this data to, for example, understand what type of website actions correlate most with events like signing up or becoming an active user.
Again, the contents of the data are not relevant to this example. I will not focus on the predictions, only on the data wrangling.
I am only focusing on the daily upload of new data. Here is a checklist of things you need before we can start:
- An Aito instance, its URL and an API key
- A GCP project with BigQuery access and service account credentials
- Python with pandas and pandas_gbq installed
- The Aito Python SDK, installed with pip install aitoai. Code examples have been tested with version 0.4.0.
Let's try! The logic of my daily batch transfer is:
1. Find the timestamp of the latest entry already in Aito
2. Fetch everything newer than that from BigQuery
3. Upload the new rows to Aito
4. Purge data that is older than the retention period
Clearly this is just one example. Especially with deleting old data you need to use your own judgement: in some scenarios it's better to keep a long history, while in others predicting from more recent data is faster and serves the purpose.
My schema is below. I am only showing one table here to keep things short; in real-life scenarios there are multiple tables that sync from BigQuery to Aito. As an example of this, the field anonymous_id links to another table called aito_segment_user.
If you have worked with Google Analytics, this dataset is most likely very familiar to you.
{
  "type": "table",
  "columns": {
    "anonymous_id": {
      "link": "aito_segment_user.id",
      "nullable": false,
      "type": "String"
    },
    "context_campaign_medium": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_name": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_source": {
      "nullable": true,
      "type": "String"
    },
    "context_ip": {
      "nullable": false,
      "type": "String"
    },
    "context_locale": {
      "nullable": false,
      "type": "String"
    },
    "context_page_path": {
      "nullable": false,
      "type": "String"
    },
    "context_page_referrer": {
      "nullable": true,
      "type": "String"
    },
    "context_page_title": {
      "nullable": true,
      "type": "String"
    },
    "context_user_agent": {
      "nullable": false,
      "type": "String"
    },
    "id": {
      "nullable": false,
      "type": "String"
    },
    "unix_timestamp": {
      "nullable": false,
      "type": "Int"
    },
    "referrer": {
      "nullable": true,
      "type": "String"
    },
    "search": {
      "nullable": true,
      "type": "Text",
      "analyzer": "en"
    },
    "url": {
      "nullable": false,
      "type": "String"
    }
  }
}
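If the table does not exist in Aito yet, the schema above needs to be pushed once before the first upload. This is a one-off setup step rather than part of the daily batch, so I'm not counting it in the line tally. A minimal sketch, assuming the create_table helper in aito.api and a local file holding the schema above (check the SDK reference for your version):
# One-off setup sketch: create the table from the schema shown above.
# 'aito_segment_web.schema.json' is a hypothetical file name containing that schema;
# aito_client and aito_api are defined in the setup code later in this post.
import json

with open("aito_segment_web.schema.json") as f:
    web_schema = json.load(f)

aito_api.create_table(aito_client, "aito_segment_web", web_schema)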
A couple of housekeeping notes. I'm using pandas_gbq to make queries to BigQuery, but you can do the same with Google's own BigQuery Python client. I'm using a GCP service account to authenticate, but again this is up to you.
import time

import pandas as pd
import pandas_gbq
from google.oauth2 import service_account

from aito.client import AitoClient
import aito.api as aito_api

# BigQuery authentication with a GCP service account
pandas_gbq.context.credentials = service_account.Credentials.from_service_account_file('YOUR-OWN-FILE.json')
pandas_gbq.context.project = 'YOUR-OWN-GCP-PROJECT'

# Aito client: instance URL and API key
aito_client = AitoClient("http://$AITO_INSTANCE_URL", "YOUR-OWN-AITO-API-KEY")
Look how many lines are spent already!
First I need to know what the latest entry currently in my Aito database is. I'm querying Aito, ordering by the timestamp (unix seconds) in descending order and limiting to one result. It's like a DIY max function!
body = {
    "from": "aito_segment_web",
    "orderBy": { "$desc": "unix_timestamp" },
    "limit": 1,
    "select": ["unix_timestamp"]
}

# Grab only the newest timestamp from the response
batch_start_unixtime = aito_api.generic_query(
    client=aito_client,
    query=body
)['hits'][0]['unix_timestamp']
Pay attention that I grab only the timestamp into my variable with ['hits'][0]['unix_timestamp'] instead of the entire JSON response. The value is an integer, which is relevant later on!
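One caveat: on the very first run the table is empty, hits is an empty list, and the indexing above raises an IndexError. A small guard keeps the job from crashing; this is my own sketch, with 0 as an arbitrary "fetch everything" fallback:
# First-run guard: if Aito has no rows yet, fall back to 0 so the
# BigQuery query below fetches the full history.
hits = aito_api.generic_query(client=aito_client, query=body)['hits']
batch_start_unixtime = hits[0]['unix_timestamp'] if hits else 0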
Next, let's fetch the data from BigQuery into a dataframe. It has to match the schema in Aito, so be careful how you construct the query. Essentially you want an exact match; otherwise you'll get a steady stream of error messages.
sql = """
SELECT
anonymous_id,
context_campaign_medium,
context_campaign_name,
context_campaign_source,
context_ip,
context_locale,
context_page_path,
context_page_referrer,
context_page_title,
context_user_agent,
id,
UNIX_SECONDS(timestamp) as unix_timestamp,
referrer,
search,
url
FROM aito_ai_website.pages
WHERE
UNIX_SECONDS(timestamp) > {starttime};
""".format(starttime=str(batch_start_unixtime))
df = pandas_gbq.read_gbq(sql)
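One practical note of my own before uploading: depending on your pandas and pandas_gbq versions, BigQuery NULLs can come back as NaN, NaT or pd.NA, which don't serialize into JSON nulls for the nullable columns in the schema. A small cleanup pass avoids that:
# Replace pandas missing values with None so nullable columns serialize as JSON null
df = df.astype(object).where(pd.notnull(df), None)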
Now this step is simple! Using the Aito Python client, I'm uploading the data in batches. By default the data is split into batches of 1000 rows, but you can control this if you want a different batch size.
In the command below I'm declaring the destination table to be aito_segment_web and converting my dataframe to a list of dictionaries, oriented by records.
# Upload the new rows; the client splits them into batches automatically
aito_api.upload_entries(
    client=aito_client,
    table_name="aito_segment_web",
    entries=df.to_dict(orient="records"))
New data uploaded. That was it! Not too bad right?
Aito is not a data lake, nor a data warehouse. It's a predictive database. So the intention is not to keep your entire history of data there, but only the data that is needed for high-quality predictions. It is very challenging to give a general rule on what the optimal sample is; you'll learn it by quickly testing different datasets and using Aito's evaluate endpoint to measure accuracy.
For demo purposes, I am purging data that is older than 90 days (roughly three months).
# Everything older than the cutoff gets deleted from Aito
cutoff_unix_sec = int(time.time()) - (90 * 24 * 60 * 60)  # days * hours * minutes * seconds

del_body = {
    "from": "aito_segment_web",
    "where": { "unix_timestamp": { "$lt": cutoff_unix_sec } }
}

aito_api.delete_entries(
    client=aito_client,
    query=del_body
)
I am going to cheat a little bit and skip this part. Evaluating prediction accuracy is discussed in a previous post, and it would require going a couple of steps deeper into our prediction targets to make sense, which is beyond the scope of this post. For the curious, Aito's evaluate endpoint and its API documentation are a good place to start.
To be fair with our line-count target, this would add about three lines. Fair?
First and foremost, let's count the lines of code.
I'm counting 18: the 15 lines of examples shown here, plus those imaginary three lines for accuracy evaluation. Success!
Every developer knows this 20 lines is just marketing blablabla. In reality you would add error handling, and use a few more lines to make your code maintainable. You probably have several linked tables (Aito supports inference on relational datasets!), which I chose to ignore to keep things simple. In addition you would schedule the execution, using cron or perhaps a recurring Python script in GCP.
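If you prefer to stay in Python rather than cron, a scheduler library is one option. Here is a minimal sketch using the third-party schedule package (my own assumption, not something the examples above depend on), where daily_sync() is a hypothetical function wrapping the fetch, upload and purge steps shown in this post:
# Sketch: run the daily sync from a long-lived Python process instead of cron.
# 'schedule' is a third-party package (pip install schedule); daily_sync() is
# a hypothetical wrapper around the fetch/upload/purge steps above.
import time
import schedule

def daily_sync():
    ...  # find latest timestamp, query BigQuery, upload to Aito, purge old rows

schedule.every().day.at("03:00").do(daily_sync)

while True:
    schedule.run_pending()
    time.sleep(60)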
The key takeaway, however, is this. With 15 lines of Python code you are keeping the machine learning model up to date. Nothing else is needed. This will save some serious pain and suffering compared to managing all the model training and deployment yourself, day in day out.