This is the second part of a two-part tutorial. In this part we take the trained model that we created in the first part, upload it to Machine Learning Foundation and use it. We will create three Python files: one that carries a set of helper functions to interact with SAP Cloud Platform, a second one that uploads our model, and finally one that runs inference against it.
import os
import base64
import requests as req
from json import JSONDecoder as jd
import tensorflow as tf
import skimage.io
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
from grpc.beta import implementations
def load_json_from_file(path):
    """Read the text file at *path* and return its parsed JSON content.

    strict=False tolerates literal control characters inside JSON strings.
    """
    with open(path, mode='rt') as source:
        raw = source.read()
    return jd(strict=False).decode(raw)
def generate_bearer(auth_url, client_id, client_secret):
    """Fetch an OAuth access token and return it prefixed with 'Bearer '.

    Uses the client-credentials grant with HTTP Basic authentication
    against the <auth_url>/oauth/token endpoint.
    """
    url = "/".join([auth_url, 'oauth', 'token'])
    params = {"grant_type": "client_credentials"}
    # HTTP Basic credentials: base64("client_id:client_secret").
    raw_credentials = '{}:{}'.format(client_id, client_secret)
    encoded = base64.b64encode(raw_credentials.encode()).decode("ascii")
    headers = {
        'Content-type': 'application/x-www-form-urlencoded',
        'Authorization': "Basic %s" % encoded,
    }
    response = req.get(url, params=params, headers=headers)
    payload = jd(strict=False).decode(response.text)
    return 'Bearer ' + payload['access_token']
def upload_model(model_name, token, service_urls, model_path):
    """Upload the model archive at *model_path* as a new version of *model_name*.

    Returns a tuple of (HTTP status code, decoded JSON response body).
    """
    url = service_urls['MODEL_REPO_URL'] + '/api/v2/models/{}/versions'.format(model_name)
    headers = {
        'Authorization': token,
        'Accept': 'application/json',
    }
    # Send the archive as a multipart upload under its bare file name.
    filename = os.path.split(model_path)[1]
    files = {'file': (filename, _load_model_from_file(model_path))}
    response = req.post(url, files=files, headers=headers)
    return (response.status_code, jd(strict=False).decode(response.content.decode()))
def _load_model_from_file(path):
with open(path, mode='rb') as model:
return model.read()
def create_model_server(model_name, model_version, token, service_urls):
    """Deploy a model server instance for the given model name and version.

    Returns a tuple of (HTTP status code, decoded JSON response body).
    """
    url = service_urls['DEPLOYMENT_API_URL'] + '/api/v2/modelServers'
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': token,
    }
    # One instance on the "starter" plan, TF 1.8 runtime, HTTP endpoint on.
    payload = {
        'specs': {
            'enableHttpEndpoint': True,
            'modelRuntimeId': 'tf-1.8',
            'models': [{'modelName': model_name, 'modelVersion': model_version}],
        },
        'replicas': 1,
        'resourcePlanId': "starter",
    }
    response = req.post(url, json=payload, headers=headers)
    return (response.status_code, jd(strict=False).decode(response.content.decode()))
def get_model_server_by_id(model_server_id, token, service_urls):
    """Fetch a single model server by its id. Returns (status, JSON body)."""
    base = service_urls['DEPLOYMENT_API_URL']
    return _execute_get(token, base + '/api/v2/modelServers/{}'.format(model_server_id))
def get_model_server_by_name(model_name, token, service_urls):
    """Fetch all model servers serving *model_name*. Returns (status, JSON body)."""
    base = service_urls['DEPLOYMENT_API_URL']
    return _execute_get(token, base + '/api/v2/modelServers?modelName={}'.format(model_name))
def remove_model_server(model_server_id, token, service_urls):
    """Delete the model server with the given id.

    Returns a tuple of (HTTP status code, raw response body as text).
    """
    url = service_urls['DEPLOYMENT_API_URL'] + '/api/v2/modelServers/{}'.format(model_server_id)
    headers = {
        'Authorization': token,
        'Accept': 'application/json',
    }
    response = req.delete(url, headers=headers)
    return (response.status_code, response.content.decode())
def _execute_get(token, url):
    """Perform an authorized GET and return (status code, decoded JSON body)."""
    headers = {
        'Accept': 'application/json',
        'Authorization': token,
        'Cache-Control': "no-cache",
    }
    response = req.get(url, headers=headers)
    body = jd(strict=False).decode(response.text)
    return response.status_code, body
# Numeric dtype codes sent in the predict request body; the values match
# TensorFlow's DataType enum (DT_FLOAT=1, DT_INT32=3, DT_STRING=7, DT_INT64=9).
DATA_TYPES = {"float" : 1, "int64" : 9, "int32" : 3, "string" : 7}
# Maps a dtype code to the TensorProto value field that carries the data.
VALUE_TYPES = {1 :"float_val", 3 : "int32_val", 7 : "string_val", 9 : "int64_val"}
def _to_float(data):
return data.astype(float)/255
def infer_model_http(endpoint, token, model_name, model_version, image_path, signature_name = 'predict_images'):
    """Run a prediction over the model server's HTTP endpoint.

    Reads the image at *image_path*, scales it to floats in [0, 1] and posts
    it to /api/v2/predict as a single 1x32x32x3 float tensor.

    Returns (HTTP status code, list of class scores) on success, or
    (status code, raw response body) when the server answers with >= 300.
    """
    # Read the image as a two-dimensional array of integers, scaled to [0, 1].
    dtype = DATA_TYPES['float']
    data = _to_float(skimage.io.imread(image_path))
    url = 'http://' + endpoint['host'] + '/api/v2/predict'
    headers = { 'Content-Type' : 'application/json',
                'Accept': 'application/json',
                'Authorization' : token}
    # Request body mirrors the TensorFlow Serving predict request: a model
    # spec plus one 'images' input tensor. NOTE(review): the shape is
    # hard-coded to one 32x32 RGB image (CIFAR-10) -- input must match.
    body = {'model_spec' : {
                'name': model_name,
                'version': model_version,
                'signature_name': signature_name
            },
            'inputs': {
                'images': {
                    'dtype': dtype,
                    'tensor_shape': {
                        'dim': [
                            {'size': '1'},
                            {'size': '32'},
                            {'size': '32'},
                            {'size': '3'}]},
                    # The value field name depends on the dtype (float_val here).
                    '{}'.format(VALUE_TYPES[dtype]): data.flatten().tolist()
                }
            }
           }
    response = req.post(url, json = body, headers = headers)
    if response.status_code >= 300:
        return (response.status_code, response.content.decode())
    else:
        return (response.status_code, jd(strict = False).decode(response.content.decode())['outputs']['scores']['float_val'])
def infer_model_grcp(endpoint, token, model_name, image_path, signature_name = 'predict_images'):
    """Run a prediction over the model server's gRPC endpoint.

    Reads the image at *image_path*, scales it to floats in [0, 1] and sends
    it as a 1x32x32x3 float tensor. Returns the class scores from the
    response's 'scores' output tensor as a list of floats.
    """
    # Read the image as a two-dimensional array of integers.
    data = skimage.io.imread(image_path)
    # Create a TLS channel using the CA certificate the deployment API
    # returned together with the endpoint.
    credentials = implementations.ssl_channel_credentials(root_certificates=endpoint["caCrt"].encode())
    channel = implementations.secure_channel(str(endpoint['host']),
                                             int(endpoint['port']), credentials)
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel._channel)
    # Named 'request' rather than 'req' to avoid shadowing the module-level
    # requests alias.
    request = predict_pb2.PredictRequest()
    request.model_spec.name = model_name
    request.model_spec.signature_name = signature_name
    request.inputs["images"].CopyFrom(tf.contrib.util.make_tensor_proto(_to_float(data), shape=[1,32,32,3], dtype="float32"))
    res = stub.Predict(request, 150, metadata=_metadata_transformer(token))
    # Read the scores directly from the response proto instead of string-
    # parsing str(res): the textual repr is not a stable interface.
    return list(res.outputs['scores'].float_val)
def _metadata_transformer(bearer_token):
additions = []
token = bearer_token
additions.append(('authorization', token))
return tuple(additions)
import scp_access as scp
import time
import requests as req
from pprint import pprint as pp
MODEL_PATH = 'cifar10.zip'
SERVICE_KEY_PATH = 'ServiceKey.txt'
MODEL_NAME = 'cifar10-tutorial'

# Load the Service Key and exchange it for an OAuth bearer token.
service_key = scp.load_json_from_file(SERVICE_KEY_PATH)
print("Service Key read")
bearerToken = scp.generate_bearer(service_key['url'], service_key['clientid'], service_key['clientsecret'])
print('Token created')
# Upload the trained model to the repository.
try:
    print('Start model upload')
    status, resp = scp.upload_model(MODEL_NAME, bearerToken, service_key['serviceurls'], MODEL_PATH)
except req.RequestException as e:
    # Fixed: requests exceptions have no read() method; print the exception.
    print(e)
else:
    print('Model uploaded ({}). Search for old versions'.format(status))
    # Find existing model servers for older versions of the model *before*
    # creating the new one, so we know which instances to remove afterwards.
    status, model_servers = scp.get_model_server_by_name(MODEL_NAME, bearerToken, service_key['serviceurls'])
    try:
        status, resp = scp.create_model_server(resp['modelName'], resp['version'], bearerToken, service_key['serviceurls'])
    except req.RequestException as e:
        print(e)
    else:
        print('Started server creation. Create no. instances {} with plan "{}"'.format(resp['specs']['replicas'], resp['specs']['resourcePlanId']))
        deployment_state = resp['status']['state']
        model_server_id = resp['id']
        # The deployment is an asynchronous process: poll until it leaves PENDING.
        while deployment_state == 'PENDING':
            print("Current deployment status: {}".format(deployment_state))
            time.sleep(10)  # Ping for the status every 10 seconds
            status, resp = scp.get_model_server_by_id(model_server_id, bearerToken, service_key['serviceurls'])
            deployment_state = resp['status']['state']
        if deployment_state == 'SUCCEEDED':
            print('Model successfully deployed. Model Server Id: {}'.format(model_server_id))
            if len(model_servers['modelServers']) > 0:
                # After the model has successfully deployed, delete old model servers.
                print('Remove old versions')
                # Fixed: the loop variable must match the name used in the body
                # (was 'for model_servers ...' but the body read 'model_server').
                for model_server in model_servers['modelServers']:
                    try:
                        status, _ = scp.remove_model_server(model_server['id'], bearerToken, service_key['serviceurls'])
                        if status < 300:
                            print('Successfully removed version {}'.format(model_server['specs']['models'][0]['modelVersion']))
                    except req.RequestException as e:
                        print(e)
        else:
            print ('Final status: {}'.format(status))
            print(resp)
import scp_access as scp
import numpy as np
CIFAR_CATEGORY = ['Airplane', 'Automobile', 'Bird', 'Cat', 'Deer', 'Dog', 'Frog', 'Horse', 'Ship', 'Truck']
SERVICE_KEY_PATH = 'ServiceKey.txt'
AIRPLANE_PATH = 'cifar-airplane5.png'
SHIP_PATH = 'cifar-ship9.png'
MODEL_NAME = 'cifar10-tutorial'

# Read the service key and exchange it for an OAuth bearer token.
service_key = scp.load_json_from_file(SERVICE_KEY_PATH)
print("Service Key read")
bearer_token = scp.generate_bearer(service_key['url'], service_key['clientid'], service_key['clientsecret'])
print('Token created')

# Look up the deployed model server and take its first instance.
status, resp = scp.get_model_server_by_name(MODEL_NAME, bearer_token, service_key['serviceurls'])
print('Model server instance found. Run test')
server = resp['modelServers'][0]
model_spec = server['specs']['models'][0]

# Inference via gRPC against the first endpoint.
result = scp.infer_model_grcp(server['endpoints'][0], bearer_token, model_spec['modelName'], AIRPLANE_PATH)
print("Result of test: {}".format(result))
category = np.argmax(result)
print("Selected Category: {}({})".format(category, CIFAR_CATEGORY[category]))

# Inference via HTTP against the second endpoint.
status, result = scp.infer_model_http(server['endpoints'][1], bearer_token, MODEL_NAME, model_spec['modelVersion'], SHIP_PATH)
print("Result of test: {}".format(result))
category = np.argmax(result)
print("Selected Category: {}({})".format(category, CIFAR_CATEGORY[category]))
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.