Debris 20190509

Docker deployment

The Docker daemon reads the HTTP_PROXY, HTTPS_PROXY, and NO_PROXY environment variables in its start-up environment to configure HTTP or HTTPS proxy behavior. If your company uses a proxy, you need to configure these settings for Docker.

  1. Edit http-proxy.conf:

```bash
sudo nano /etc/systemd/system/docker.service.d/http-proxy.conf

[Service]
Environment="HTTP_PROXY=http://ip:port"
# HTTPS_PROXY and NO_PROXY shown for completeness; adjust to your environment
Environment="HTTPS_PROXY=http://ip:port"
Environment="NO_PROXY=localhost,127.0.0.1"
```
  2. Edit daemon.json:

```bash
sudo nano /etc/docker/daemon.json

{
  "registry-mirrors": ["https://docker.mirrors.ustc.edu.cn", "http://hub-mirror.c.163.com"]
}
```
  3. `sudo systemctl daemon-reload`

  4. `sudo systemctl restart docker`
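To confirm that the proxy settings were picked up, one option (on a systemd-based host) is to inspect the daemon's environment with `sudo systemctl show --property=Environment docker`.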

If it does not work:

  1. Check the version of Docker.
  2. Downgrade the version to

TFserving

Once we have trained our TensorFlow or Keras based model, we need to think about how to deploy it in production. We may want to Dockerize it as a micro-service implementing a custom gRPC (or REST) interface, deploy it to a server or Kubernetes cluster, and have other client micro-services call it. Google's TensorFlow Serving library helps here: we save our model to disk, and TensorFlow Serving loads it and exposes a gRPC or RESTful interface to interact with it.

![TFserving](TFserving.png)

Deploy the Docker environment for TFserving

```bash
docker pull tensorflow/serving
```

Download the example code:

```bash
mkdir -p /tmp/tfserving
cd /tmp/tfserving
git clone https://github.com/tensorflow/serving
```

RESTful API

  1. Run TFserving:

```bash
docker run -p 8501:8501 \
  --mount type=bind,\
source=/tmp/tfserving/serving/tensorflow_serving/servables/tensorflow/testdata/saved_model_half_plus_two_cpu,\
target=/models/half_plus_two \
  -e MODEL_NAME=half_plus_two -t tensorflow/serving &
```
  2. Run the client:

```bash
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
  -X POST http://localhost:8501/v1/models/half_plus_two:predict
```
  3. Result:

```json
{ "predictions": [2.5, 3.0, 4.5] }
```

As its name suggests, the half_plus_two test model computes y = 0.5x + 2, so the inputs 1.0, 2.0, and 5.0 map to 2.5, 3.0, and 4.5. An equivalent request from Python is sketched just below.

gRPC API

  1. Train a MNIST model and save it:

```bash
python3 mnist_saved_model.py models/mnist
```

Source code

  • Train a model
  • Save the model we have trained
```python
from __future__ import print_function

import os
import sys

# This is a placeholder for a Google-internal import.

import tensorflow as tf

import mnist_input_data

tf.app.flags.DEFINE_integer('training_iteration', 1000,
                            'number of training iterations.')
tf.app.flags.DEFINE_integer('model_version', 1, 'version number of the model.')
tf.app.flags.DEFINE_string('work_dir', '/tmp', 'Working directory.')
FLAGS = tf.app.flags.FLAGS


def main(_):
  if len(sys.argv) < 2 or sys.argv[-1].startswith('-'):
    print('Usage: mnist_saved_model.py [--training_iteration=x] '
          '[--model_version=y] export_dir')
    sys.exit(-1)
  if FLAGS.training_iteration <= 0:
    print('Please specify a positive value for training iteration.')
    sys.exit(-1)
  if FLAGS.model_version <= 0:
    print('Please specify a positive value for version number.')
    sys.exit(-1)

  # Train model
  print('Training model...')
  mnist = mnist_input_data.read_data_sets(FLAGS.work_dir, one_hot=True)
  sess = tf.InteractiveSession()
  serialized_tf_example = tf.placeholder(tf.string, name='tf_example')
  feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32),}
  tf_example = tf.parse_example(serialized_tf_example, feature_configs)
  x = tf.identity(tf_example['x'], name='x')  # use tf.identity() to assign name
  y_ = tf.placeholder('float', shape=[None, 10])
  w = tf.Variable(tf.zeros([784, 10]))
  b = tf.Variable(tf.zeros([10]))
  sess.run(tf.global_variables_initializer())
  y = tf.nn.softmax(tf.matmul(x, w) + b, name='y')
  cross_entropy = -tf.reduce_sum(y_ * tf.log(y))
  train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
  values, indices = tf.nn.top_k(y, 10)
  table = tf.contrib.lookup.index_to_string_table_from_tensor(
      tf.constant([str(i) for i in range(10)]))
  prediction_classes = table.lookup(tf.to_int64(indices))
  for _ in range(FLAGS.training_iteration):
    batch = mnist.train.next_batch(50)
    train_step.run(feed_dict={x: batch[0], y_: batch[1]})
  correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
  accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
  print('training accuracy %g' % sess.run(
      accuracy, feed_dict={
          x: mnist.test.images,
          y_: mnist.test.labels
      }))
  print('Done training!')

  # Export model
  # WARNING(break-tutorial-inline-code): The following code snippet is
  # in-lined in tutorials, please update tutorial documents accordingly
  # whenever code changes.
  export_path_base = sys.argv[-1]
  export_path = os.path.join(
      tf.compat.as_bytes(export_path_base),
      tf.compat.as_bytes(str(FLAGS.model_version)))
  print('Exporting trained model to', export_path)
  builder = tf.saved_model.builder.SavedModelBuilder(export_path)

  # Build the signature_def_map.
  classification_inputs = tf.saved_model.utils.build_tensor_info(
      serialized_tf_example)
  classification_outputs_classes = tf.saved_model.utils.build_tensor_info(
      prediction_classes)
  classification_outputs_scores = tf.saved_model.utils.build_tensor_info(values)

  classification_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={
              tf.saved_model.signature_constants.CLASSIFY_INPUTS:
                  classification_inputs
          },
          outputs={
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES:
                  classification_outputs_classes,
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_SCORES:
                  classification_outputs_scores
          },
          method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))

  tensor_info_x = tf.saved_model.utils.build_tensor_info(x)
  tensor_info_y = tf.saved_model.utils.build_tensor_info(y)

  prediction_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={'images': tensor_info_x},
          outputs={'scores': tensor_info_y},
          method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

  builder.add_meta_graph_and_variables(
      sess, [tf.saved_model.tag_constants.SERVING],
      signature_def_map={
          'predict_images':
              prediction_signature,
          tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
              classification_signature,
      },
      main_op=tf.tables_initializer(),
      strip_default_attrs=True)

  builder.save()

  print('Done exporting!')


if __name__ == '__main__':
  tf.app.run()
```
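Before serving, it can be worth sanity-checking the export. Here is a minimal sketch, assuming the export landed in models/mnist/1 (the version subdirectory is created by the script above from --model_version):

```python
import tensorflow as tf

# Hypothetical path: export base directory plus the model_version subdirectory.
export_dir = 'models/mnist/1'

with tf.Session(graph=tf.Graph()) as sess:
    # Load the SavedModel with the 'serve' tag, exactly as TFserving will.
    meta_graph = tf.saved_model.loader.load(
        sess, [tf.saved_model.tag_constants.SERVING], export_dir)
    # The exported signatures should include 'predict_images' and the
    # default classification signature.
    print(list(meta_graph.signature_def.keys()))
```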


  2. Deploy the model we have exported using TFserving:

```bash
docker run -p 8500:8500 \
  --mount type=bind,source=$(pwd)/models/mnist,target=/models/mnist \
  -e MODEL_NAME=mnist -t tensorflow/serving
```
  3. Run the client:

```bash
pip3 install tensorflow-serving-api
python3 mnist_client.py --num_tests=1000 --server=127.0.0.1:8500
```
  4. Result:

```
Inference error rate: 11.13%
```

An error rate around 11% is roughly what this plain softmax regression model reaches on MNIST; it is a demo of the serving pipeline, not a state-of-the-art classifier.

Client source code

```python
from __future__ import print_function

import sys
import threading

# This is a placeholder for a Google-internal import.

import grpc
import numpy
import tensorflow as tf

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import mnist_input_data


tf.app.flags.DEFINE_integer('concurrency', 1,
                            'maximum number of concurrent inference requests')
tf.app.flags.DEFINE_integer('num_tests', 100, 'Number of test images')
tf.app.flags.DEFINE_string('server', '', 'PredictionService host:port')
tf.app.flags.DEFINE_string('work_dir', '/tmp', 'Working directory. ')
FLAGS = tf.app.flags.FLAGS


class _ResultCounter(object):
  """Counter for the prediction results."""

  def __init__(self, num_tests, concurrency):
    self._num_tests = num_tests
    self._concurrency = concurrency
    self._error = 0
    self._done = 0
    self._active = 0
    self._condition = threading.Condition()

  def inc_error(self):
    with self._condition:
      self._error += 1

  def inc_done(self):
    with self._condition:
      self._done += 1
      self._condition.notify()

  def dec_active(self):
    with self._condition:
      self._active -= 1
      self._condition.notify()

  def get_error_rate(self):
    with self._condition:
      while self._done != self._num_tests:
        self._condition.wait()
      return self._error / float(self._num_tests)

  def throttle(self):
    with self._condition:
      while self._active == self._concurrency:
        self._condition.wait()
      self._active += 1


def _create_rpc_callback(label, result_counter):
  """Creates RPC callback function.

  Args:
    label: The correct label for the predicted example.
    result_counter: Counter for the prediction result.
  Returns:
    The callback function.
  """
  def _callback(result_future):
    """Callback function.

    Calculates the statistics for the prediction result.

    Args:
      result_future: Result future of the RPC.
    """
    exception = result_future.exception()
    if exception:
      result_counter.inc_error()
      print(exception)
    else:
      sys.stdout.write('.')
      sys.stdout.flush()
      response = numpy.array(
          result_future.result().outputs['scores'].float_val)
      prediction = numpy.argmax(response)
      if label != prediction:
        result_counter.inc_error()
    result_counter.inc_done()
    result_counter.dec_active()
  return _callback


def do_inference(hostport, work_dir, concurrency, num_tests):
  """Tests PredictionService with concurrent requests.

  Args:
    hostport: Host:port address of the PredictionService.
    work_dir: The full path of working directory for test data set.
    concurrency: Maximum number of concurrent requests.
    num_tests: Number of test images to use.

  Returns:
    The classification error rate.

  Raises:
    IOError: An error occurred processing test data set.
  """
  test_data_set = mnist_input_data.read_data_sets(work_dir).test
  channel = grpc.insecure_channel(hostport)
  stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
  result_counter = _ResultCounter(num_tests, concurrency)
  for _ in range(num_tests):
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'mnist'
    request.model_spec.signature_name = 'predict_images'
    image, label = test_data_set.next_batch(1)
    request.inputs['images'].CopyFrom(
        tf.contrib.util.make_tensor_proto(image[0], shape=[1, image[0].size]))
    result_counter.throttle()
    result_future = stub.Predict.future(request, 5.0)  # 5 seconds
    result_future.add_done_callback(
        _create_rpc_callback(label[0], result_counter))
  return result_counter.get_error_rate()


def main(_):
  if FLAGS.num_tests > 10000:
    print('num_tests should not be greater than 10k')
    return
  if not FLAGS.server:
    print('please specify server host:port')
    return
  error_rate = do_inference(FLAGS.server, FLAGS.work_dir,
                            FLAGS.concurrency, FLAGS.num_tests)
  print('\nInference error rate: %s%%' % (error_rate * 100))


if __name__ == '__main__':
  tf.app.run()
```
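A note on the design: the client issues requests asynchronously via stub.Predict.future and tallies results in the done-callbacks, while _ResultCounter.throttle uses a condition variable to cap the number of in-flight requests at --concurrency. This keeps the client from overwhelming the server while still overlapping network latency across requests.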