
Introduction

 

Machine Learning is widely used these days for data-driven tasks such as detecting security threats, monitoring IoT devices for predictive maintenance, building recommendation systems, financial analysis and many other domains. Most ML models are built and deployed in two steps:

  • Offline training
  • Real time scoring

ML Training is done by researchers/data scientists. They fetch the training data, clean it, engineer features, try different models and tune parameters, repeating this cycle until the model reaches the required quality and accuracy. This process is usually done with data science tools such as Jupyter, PyCharm, VS Code, MATLAB, etc. Once the model meets the required quality, it is serialized and saved for scoring.

ML Scoring is the process of applying the model to new data to get insights and predictions; this is the business goal of building the model in the first place. Scoring usually needs to be done at scale and with minimal latency, processing large sets of new records. For ADX users, the best option is to score directly in ADX: scoring runs on ADX compute nodes, in a distributed manner near the data, thus achieving the best performance with minimal latency.

 

There are many types of models, such as Bayesian models, decision trees and forests, regressions, deep neural networks and many more. These models can be built with various frameworks and packages like Scikit-learn, TensorFlow, CNTK, Keras, Caffe2, PyTorch, etc. (here is a nice overview of ML algorithms, tools and frameworks). On one hand this variety is very good, as you can find the most convenient algorithm and framework for your scenario; on the other hand it creates an interoperability issue, because ML scoring is usually done on infrastructure that is different from the one used for training.

 

To resolve this, in 2017 Microsoft and Facebook introduced ONNX, the Open Neural Network Exchange, which has since been adopted by many companies including AWS, IBM, Intel, Baidu, MathWorks, NVIDIA and many more. ONNX defines a common representation and serialization format for ML models. This format enables smooth switching among ML frameworks, and it allows hardware vendors and others to improve the performance of deep neural networks across multiple frameworks at once by targeting the ONNX representation.

 

In this blog we explain how ADX can consume ONNX models that were built and trained externally, for near-real-time scoring of new samples that are ingested into ADX.

 

How to use ADX for scoring ONNX models

 

ADX supports running Python code embedded in Kusto Query Language (KQL) using the python() plugin. The Python code runs in multiple sandboxes on ADX's existing compute nodes. The Python image is based on the Anaconda distribution and contains the most common ML frameworks, including Scikit-learn, TensorFlow, Keras and PyTorch. To score ONNX models in ADX, follow these steps (a minimal sketch of the plugin's calling pattern is shown right after the list):

  1. Develop your ML model using your favorite framework and tools
  2. Convert the final trained model to ONNX format
  3. Export the ONNX model to a table on ADX or to an Azure blob
  4. Score new data in ADX using the inline python() plugin
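
For orientation, here is such a sketch run via KqlMagic, assuming the plugin is enabled on the connected cluster. The query and column names here are illustrative only; the point is that the inline script receives the input table as df and returns its output as result:

# A minimal python() plugin call run via KqlMagic: generate the numbers 1-10 in KQL
# and compute their squares inside the Python sandbox (df in, result out)
demo_query = r'''
range x from 1 to 10 step 1
| extend x_squared = 0.0
| evaluate python(typeof(*),
    'result = df\n'
    'result["x_squared"] = df["x"] ** 2\n')
'''
%kql demo_df << -query demo_query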

Example

 

We build a model to predict room occupancy based on Occupancy Detection data, a public dataset from the UCI Repository. This model is a binary classifier that predicts whether a room is occupied or empty based on Temperature, Humidity, Light and CO2 sensor measurements. The complete process can be found in this Jupyter notebook; here we embed a few snippets just to present the main concepts.

 

Prerequisite

 

  • Enable the Python plugin on your ADX cluster (see the Onboarding section of the python() plugin doc)
  • Whitelist a blob container to be accessible by the ADX Python sandbox (see the Appendix section of that doc)
  • Create a Python environment (conda or virtual env) that reflects the Python sandbox image
  • Install the ONNX packages in that environment: onnxruntime and skl2onnx
  • Install the Azure Blob Storage package in that environment: azure-storage-blob
  • Install KqlMagic to easily connect to and query your ADX cluster from Jupyter notebooks (a quick install sketch follows this list)
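
Assuming a fresh conda or virtual environment, the client-side packages above can be installed from a notebook cell along these lines (a sketch; exact versions should be aligned with the sandbox image):

# Install the client-side packages used in this walkthrough (run once; versions may differ)
%pip install onnxruntime skl2onnx azure-storage-blob Kqlmagic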

 

Retrieve and explore the data using KqlMagic

 

%reload_ext Kqlmagic
%config Kqlmagic.auto_dataframe = True
%kql kusto://code;cluster='demo11.westus';database='ML' -try_azcli_login
%kql df << OccupancyDetection
df[-4:]

 

 

       Timestamp                    Temperature  Humidity  Light   CO2     HumidityRatio  Occupancy  Test
20556  2015-02-18 09:16:00.0000000  20.865       27.7450   423.50  1514.5  0.004230       True       True
20557  2015-02-18 09:16:00.0000000  20.890       27.7450   423.50  1521.5  0.004237       True       True
20558  2015-02-18 09:17:00.0000000  20.890       28.0225   418.75  1632.0  0.004279       True       True
20559  2015-02-18 09:19:00.0000000  21.000       28.1000   409.00  1864.0  0.004321       True       True

 

Train your model

 

Split the data into features (x) and labels (y), and into training/testing sets:

 

train_x = df[df['Test'] == False][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
train_y = df[df['Test'] == False]['Occupancy']
test_x = df[df['Test'] == True][['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']]
test_y = df[df['Test'] == True]['Occupancy']

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

 

(8143, 5) (8143,) (12417, 5) (12417,)

 

Train a few classic models from Scikit-learn:

 

from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

#four classifier types
clf1 = tree.DecisionTreeClassifier()
clf2 = LogisticRegression(solver='liblinear')
clf3 = neighbors.KNeighborsClassifier()
clf4 = naive_bayes.GaussianNB()

clf1 = clf1.fit(train_x, train_y)
clf2 = clf2.fit(train_x, train_y)
clf3 = clf3.fit(train_x, train_y)
clf4 = clf4.fit(train_x, train_y)

# Accuracy on Testing set

for clf, label in zip([clf1, clf2, clf3, clf4], ['Decision Tree', 'Logistic Regression', 'K Nearest Neighbour', 'Naive Bayes']):
    scores = cross_val_score(clf, test_x, test_y, cv=5, scoring='accuracy')
    print("Accuracy: %0.4f (+/- %0.4f) [%s]" % (scores.mean(), scores.std(), label))

 

Accuracy: 0.8605 (+/- 0.1130) [Decision Tree]
Accuracy: 0.9887 (+/- 0.0071) [Logistic Regression]
Accuracy: 0.9656 (+/- 0.0224) [K Nearest Neighbour]
Accuracy: 0.8893 (+/- 0.1265) [Naive Bayes]

 

The logistic regression model performs best, so that is the one we convert to ONNX.

 

Convert the model to ONNX

 

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

# We define the input type (5 sensors readings), convert the scikit-learn model to ONNX and serialize it

initial_type = [('float_input', FloatTensorType([None, 5]))]
onnx_model = convert_sklearn(clf2, initial_types=initial_type)
bmodel = onnx_model.SerializeToString()
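
Optionally, the serialized bytes can also be written to a local .onnx file, for example to inspect the graph in a model viewer or to upload it to blob storage later; the file name below is arbitrary:

# Optional: persist the serialized ONNX model to a local file
with open('room_occupancy.onnx', 'wb') as f:
    f.write(bmodel)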

 

Test ONNX Model

Predict using ONNX runtime

 

import numpy as np
import onnxruntime as rt

sess = rt.InferenceSession(bmodel)
input_name = sess.get_inputs()[0].name
label_name = sess.get_outputs()[0].name
pred_onnx = sess.run([label_name], {input_name: test_x.values.astype(np.float32)})[0]

# Verify that the ONNX and Scikit-learn predictions are identical

pred_clf2 = clf2.predict(test_x)
diff_num = (pred_onnx != pred_clf2).sum()

if diff_num:
    print(f'Predictions difference between sklearn and onnxruntime, total {diff_num} elements differ')
else:
    print('Same prediction using sklearn and onnxruntime')

 

Same prediction using sklearn and onnxruntime

 

Scoring in ADX

 

Prerequisite

The Python image of the ADX sandbox does NOT include the ONNX runtime package. Therefore, we need to zip it, upload it to a blob container, and install it dynamically from that container. Note that the blob container should be whitelisted to be accessible by the ADX Python sandbox (see the Appendix section of the python() plugin doc).

 

Here are the steps to create and upload the ONNX runtime package:

  1. Open the Anaconda prompt in your local Python environment
  2. Download the onnxruntime package and its dependencies as wheel files; run:

 

pip wheel onnxruntime

 

  3. Zip all the wheel files into onnxruntime-1.4.0-py36.zip (or your preferred name)
  4. Upload the zip file to a blob in the whitelisted blob container (you can use Azure Storage Explorer, or script it as sketched below)
  5. Generate a SAS key with read permission to the blob

 

There are two options for making the model available for scoring:

  • Serialize the model to a string and store it in a standard ADX table
  • Copy the model to a blob container (that was previously whitelisted for access by the ADX Python sandbox)

 

Scoring from a serialized model stored in an ADX table

 

Serialize the model and store it in the ADX models table using KqlMagic

 

import pandas as pd
import datetime

models_tbl = 'ML_Models'
model_name = 'ONNX-Occupancy'

smodel = bmodel.hex()
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
dfm

 

 

   name            timestamp                   model
0  ONNX-Occupancy  2020-07-28 17:07:20.280040  08031208736b6c326f6e6e781a05312e362e3122076169…

 

set_query = '''
.set-or-append {0} <|
let tbl = dfm;
tbl
'''.format(models_tbl)
print(set_query)

 

.set-or-append ML_Models <|
let tbl = dfm;
tbl

 

%kql -query set_query

 

 

   ExtentId                              OriginalSize  ExtentSize  CompressedSize  IndexSize  RowCount
0  bfc9acc2-3d79-4e64-9a79-d2681547e43d  1430.0        1490.0      1040.0          450.0      1

Score in ADX

 

# NOTE: we run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':

# NOTE: replace the string "**** YOUR SAS KEY ****" in the external_artifacts parameter with the real SAS

scoring_from_table_query = r'''
let classify_sf=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    '\n'
    'import pickle\n'
    'import binascii\n'
    '\n'
    'smodel = kargs["smodel"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'bmodel = binascii.unhexlify(smodel)\n'
    '\n'
    'from sandbox_utils import Zipackage\n'
    'Zipackage.install("onnxruntime-v17-py36.zip")\n'
    '\n'
    'import onnxruntime as rt\n'
    'sess = rt.InferenceSession(bmodel)\n'
    'input_name = sess.get_inputs()[0].name\n'
    'label_name = sess.get_outputs()[0].name\n'
    'df1 = df[features_cols]\n'
    'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs,
        external_artifacts=pack('onnxruntime-v17-py36.zip', 'https://artifcatswestus.blob.core.windows.net/kusto/ONNX/onnxruntime-v17-py36.zip? **** YOUR SAS KEY ****'))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf(ML_Models, 'ONNX-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''

%kql pred_df << -query scoring_from_table_query

pred_df[-4:]

 

 

       Timestamp                  Temperature  Humidity  Light   CO2     HumidityRatio  Occupancy  Test  pred_Occupancy
12413  2015-02-18 09:16:00+00:00  20.865       27.7450   423.50  1514.5  0.004230       True       True  True
12414  2015-02-18 09:16:00+00:00  20.890       27.7450   423.50  1521.5  0.004237       True       True  True
12415  2015-02-18 09:17:00+00:00  20.890       28.0225   418.75  1632.0  0.004279       True       True  True
12416  2015-02-18 09:19:00+00:00  21.000       28.1000   409.00  1864.0  0.004321       True       True  True

 

print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()

 

Confusion Matrix

 

Occupancy  pred_Occupancy
False      False             9284
           True               112
True       False               15
           True              3006
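
As a quick sanity check, the overall accuracy can also be derived from pred_df; based on the counts above it should come out to roughly (9284 + 3006) / 12417 ≈ 0.99:

# Overall accuracy of the ONNX model scored in ADX
accuracy = (pred_df['Occupancy'] == pred_df['pred_Occupancy']).mean()
print('Accuracy: %0.4f' % accuracy)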

 

Scoring from a model stored in blob storage

 

Copy the model to a blob

Note again that the blob container should be whitelisted to be accessible by the ADX Python sandbox.

 

from azure.storage.blob import BlobClient

conn_str = "BlobEndpoint=https://artifcatswestus.blob.core.windows.net/kusto;SharedAccessSignature=?**** YOUR SAS KEY ****"
blob_client = BlobClient.from_connection_string(conn_str, container_name="ONNX", blob_name="room_occupancy.onnx")

res = blob_client.upload_blob(bmodel, overwrite=True)
# NOTE: we run ADX scoring query here using KqlMagic by embedding the query from Kusto Explorer
# with r'''Kusto Explorer query''':

# NOTE: replace the strings "**** YOUR SAS KEY ****" below with the respective real SAS

scoring_from_blob_query = r'''
let classify_sf=(samples:(*), model_sas:string, features_cols:dynamic, pred_col:string)
{
    let kwargs = pack('features_cols', features_cols, 'pred_col', pred_col);
    let code =
    '\n'
    'from sandbox_utils import Zipackage\n'
    'Zipackage.install("onnxruntime-v17-py36.zip")\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    '\n'
    'import onnxruntime as rt\n'
    'sess = rt.InferenceSession(r"C:\\Temp\\model.onnx")\n'
    'input_name = sess.get_inputs()[0].name\n'
    'label_name = sess.get_outputs()[0].name\n'
    'df1 = df[features_cols]\n'
    'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs,
        external_artifacts=pack('model.onnx', model_sas,
                                'onnxruntime-v17-py36.zip', 'https://artifcatswestus.blob.core.windows.net/kusto/ONNX/onnxruntime-v17-py36.zip? **** YOUR SAS KEY ****'))
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf('https://artifcatswestus.blob.core.windows.net/kusto/ONNX/room_occupancy.onnx? **** YOUR SAS KEY ****',
                     pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
'''

%kql pred_df << -query scoring_from_blob_query
pred_df[-4:]

 

 

       Timestamp                  Temperature  Humidity  Light   CO2     HumidityRatio  Occupancy  Test  pred_Occupancy
12413  2015-02-18 09:16:00+00:00  20.865       27.7450   423.50  1514.5  0.004230       True       True  True
12414  2015-02-18 09:16:00+00:00  20.890       27.7450   423.50  1521.5  0.004237       True       True  True
12415  2015-02-18 09:17:00+00:00  20.890       28.0225   418.75  1632.0  0.004279       True       True  True
12416  2015-02-18 09:19:00+00:00  21.000       28.1000   409.00  1864.0  0.004321       True       True  True

 

print('Confusion Matrix')
pred_df.groupby(['Occupancy', 'pred_Occupancy']).size()

 

Confusion Matrix

 

Occupancy  pred_Occupancy
False      False             9284
           True               112
True       False               15
           True              3006

 

Summary

 

In this tutorial we showed how to train a model in Scikit-learn, convert it to ONNX format and export it to ADX for scoring. This workflow is convenient because:

  • Training can be done on any hardware platform, using any framework that supports ONNX
  • Scoring is done in ADX near the data, on the existing compute nodes, enabling near-real-time processing of large amounts of new data. There is no need to export the data to an external scoring service and import the results back; consequently, the scoring architecture is simpler, and performance is faster and more scalable
