TensorFlow Data Services

Extract - Transform - Load (ETL)

ETL Pipeline

First, to perform the Extract process we use tfds.load. This handles everything from downloading the raw data to parsing and splitting it, giving us a dataset. Next, we perform the Transform process. In this simple example, the transform will consist only of shuffling the dataset. Finally, we Load one record by using the take(1) method. In this case, each record consists of an image and its corresponding label. After loading the record we plot the image and print its label.

import tensorflow_datasets as tfds
import matplotlib.pyplot as plt

# EXTRACT: download, parse, and split the raw data into a tf.data.Dataset.
dataset = tfds.load(name="mnist", split="train")

# TRANSFORM: here we only shuffle the dataset (shuffle returns a new dataset, so reassign it).
dataset = dataset.shuffle(100)

# LOAD: take a single record, then plot the image and print its label.
for data in dataset.take(1):
    image = data["image"].numpy().squeeze()
    label = data["label"].numpy()

    print("Label: {}".format(label))
    plt.imshow(image, cmap=plt.cm.binary)
    plt.show()
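
In practice, the Transform step usually involves more than shuffling. The sketch below is purely illustrative (the normalize function and the batch/prefetch values are assumptions, not part of the original example): it scales the image pixels, batches the records, and prefetches them so that preprocessing overlaps with consumption.

# A minimal sketch of a richer Transform step (illustrative only).
import tensorflow as tf

def normalize(features):
    # Scale pixel values from [0, 255] to [0.0, 1.0] and split out the label.
    image = tf.cast(features["image"], tf.float32) / 255.0
    return image, features["label"]

transformed = (dataset            # the shuffled dataset from above
               .map(normalize)    # element-wise preprocessing
               .batch(32)         # group records into batches
               .prefetch(1))      # overlap preprocessing with consumption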

Next, we will explore the S3 API for TensorFlow Datasets.

S3 API
import tensorflow as tf
import tensorflow_datasets as tfds

print("\u2022 Using TensorFlow Version:", tf.__version__)

Before using the new S3 API, we must first check whether our version of the MNIST dataset implements it. In the cell below we indicate that we want to use version 3.*.* of the MNIST dataset.


mnist_builder = tfds.builder("mnist:3.*.*")

print(mnist_builder.version.implements(tfds.core.Experiment.S3))

We can see that the code above printed True, which means that version 3.*.* of the MNIST dataset supports the new S3 API.

Now, let’s see how we can use the S3 API to download the MNIST dataset and specify the splits we want to use. In the code below we download the train and test splits of the MNIST dataset and then print their sizes. We will see that there are 60,000 records in the training set and 10,000 in the test set.

train_ds, test_ds = tfds.load('mnist:3.*.*', split=['train', 'test'])

print(len(list(train_ds)))
print(len(list(test_ds)))

In the S3 API we can use strings to specify the slicing instructions. For example, in the cell below we merge the training and test sets by passing the string 'train+test' to the split argument.

combined = tfds.load('mnist:3.*.*', split='train+test')

print(len(list(combined)))

We can also use Python-style list slicing to specify the data we want. For example, we can take the first 10,000 records of the train split with the string 'train[:10000]', as shown below:

first10k = tfds.load('mnist:3.*.*', split='train[:10000]')

print(len(list(first10k)))

The S3 API also allows us to specify the percentage of the data we want to use. For example, we can select the first 20% of the training set with the string 'train[:20%]', as shown below:

first20p = tfds.load('mnist:3.*.*', split='train[:20%]')

print(len(list(first20p)))

We can see that first20p contains 12,000 records, which is indeed 20% of the total number of records in the training set. Recall that the training set contains 60,000 records.

Because the slices are string-based, we can use loops like the ones shown below to slice up the dataset and make some fairly complex splits. For example, the loops below create 5 complementary validation and training set pairs (each tfds.load call returns a list of 5 datasets).

val_ds = tfds.load('mnist:3.*.*', split=['train[{}%:{}%]'.format(k, k+20) for k in range(0, 100, 20)])

train_ds = tfds.load('mnist:3.*.*', split=['train[:{}%]+train[{}%:]'.format(k, k+20) for k in range(0, 100, 20)])
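
As a quick sanity check (an illustrative addition, not part of the original notebook), we can print the size of each fold. Since every validation slice covers 20% of the training set, each fold should contain 12,000 validation and 48,000 training records.

# Sketch: print the number of records in each complementary fold.
for i, (v, t) in enumerate(zip(val_ds, train_ds)):
    print("Fold {}: {} validation / {} training records".format(i, len(list(v)), len(list(t))))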

The S3 API also allows us to compose new datasets from pieces of different splits. For example, we can create a new dataset from the first 10% of the test set and the last 80% of the training set, as shown below.

composed_ds = tfds.load('mnist:3.*.*', split='test[:10%]+train[-80%:]')

print(len(list(composed_ds)))

Pipeline for Classifying Structured Data

Import TensorFlow and Other Libraries

import pandas as pd
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow import feature_column

from os import getcwd
from sklearn.model_selection import train_test_split

Use Pandas to Create a Dataframe

Pandas is a Python library with many helpful utilities for loading and working with structured data. We will use Pandas to read the heart disease CSV file and load it into a dataframe.

filePath = f"{getcwd()}/../tmp2/heart.csv"
dataframe = pd.read_csv(filePath)
dataframe.head()
age sex cp trestbps chol fbs restecg thalach exang oldpeak slope ca thal target
0 63 1 1 145 233 1 2 150 0 2.3 3 0 fixed 0
1 67 1 4 160 286 0 2 108 1 1.5 2 3 normal 1
2 67 1 4 120 229 0 2 129 1 2.6 2 2 reversible 0
3 37 1 3 130 250 0 0 187 0 3.5 3 0 normal 0
4 41 0 2 130 204 0 2 172 0 1.4 1 0 normal 0

Split the Dataframe Into Train, Validation, and Test Sets

The dataset is a single CSV file. We will split it into train, validation, and test sets.

train, test = train_test_split(dataframe, test_size=0.2)
train, val = train_test_split(train, test_size=0.2)
print(len(train), 'train examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')
193 train examples
49 validation examples
61 test examples

Create an Input Pipeline Using tf.data

Next, we will wrap the dataframes with tf.data. This will enable us to use feature columns as a bridge to map from the columns in the Pandas dataframe to features used to train the model. If we were working with a very large CSV file (so large that it does not fit into memory), we would use tf.data to read it from disk directly.
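
As an aside, here is a hedged sketch of how such an out-of-memory CSV could be streamed with tf.data.experimental.make_csv_dataset (it reuses the filePath and 'target' label column from this notebook; the parameter values are illustrative):

# Sketch: stream the CSV directly from disk instead of loading it with Pandas first.
csv_ds = tf.data.experimental.make_csv_dataset(
    filePath,             # path to the CSV file on disk
    batch_size=32,        # records are batched as they are read
    label_name="target",  # column to use as the label
    num_epochs=1)         # read through the file once per iteration

In this exercise, however, the dataframe fits comfortably in memory, so we wrap it directly.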

# EXERCISE: A utility method to create a tf.data dataset from a Pandas Dataframe.

def df_to_dataset(dataframe, shuffle=True, batch_size=32):
    dataframe = dataframe.copy()

    # Use the Pandas dataframe's pop method to separate the targets from the features.
    labels = dataframe.pop("target")

    # Create a tf.data.Dataset from the dataframe and labels.
    ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))

    if shuffle:
        # Shuffle the dataset.
        ds = ds.shuffle(3)

    # Batch the dataset with the specified batch_size parameter.
    ds = ds.batch(batch_size)

    return ds
batch_size = 5 # A small batch size is used for demonstration purposes
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)

Understand the Input Pipeline

Now that we have created the input pipeline, let’s call it to see the format of the data it returns. We have used a small batch size to keep the output readable.

for feature_batch, label_batch in train_ds.take(1):
    print('Every feature:', list(feature_batch.keys()))
    print('A batch of ages:', feature_batch['age'])
    print('A batch of targets:', label_batch)
Every feature: ['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal']
A batch of ages: tf.Tensor([51 63 64 58 57], shape=(5,), dtype=int32)
A batch of targets: tf.Tensor([0 1 0 0 0], shape=(5,), dtype=int64)

We can see that the dataset returns a dictionary that maps column names (from the dataframe) to column values from rows in the dataframe.

Create Several Types of Feature Columns

TensorFlow provides many types of feature columns. In this section, we will create several types of feature columns, and demonstrate how they transform a column from the dataframe.

# Get an example batch that we will use to demonstrate several types of feature columns.
example_batch = next(iter(train_ds))[0]
# A utility method to create a feature column and to transform a batch of data.
def demo(feature_column):
    feature_layer = layers.DenseFeatures(feature_column, dtype='float64')
    print(feature_layer(example_batch).numpy())

Numeric Columns

The output of a feature column becomes the input to the model (using the demo function defined above, we will be able to see exactly how each column from the dataframe is transformed). A numeric column is the simplest type of column. It is used to represent real-valued features.

# EXERCISE: Create a numeric feature column out of 'age' and demo it.
age = tf.feature_column.numeric_column("age")

demo(age)
[[51.]
 [58.]
 [63.]
 [64.]
 [60.]]

In the heart disease dataset, most columns from the dataframe are numeric.

Bucketized Columns

Often, you don’t want to feed a number directly into the model, but instead split its value into different categories based on numerical ranges. Consider raw data that represents a person’s age. Instead of representing age as a numeric column, we could split the age into several buckets using a bucketized column.

# EXERCISE: Create a bucketized feature column out of 'age' with
# the following boundaries and demo it.
boundaries = [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]

age_buckets = tf.feature_column.bucketized_column(age, boundaries = boundaries)

demo(age_buckets)
[[0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]]

Notice the one-hot values above describe which age range each row matches.

Categorical Columns

In this dataset, thal is represented as a string (e.g. ‘fixed’, ‘normal’, or ‘reversible’). We cannot feed strings directly to a model. Instead, we must first map them to numeric values. The categorical vocabulary columns provide a way to represent strings as a one-hot vector (much like you have seen above with age buckets).

Note: You will probably see some warning messages when running some of the code cells below. These warnings have to do with software updates and should not cause any errors or prevent your code from running.

# EXERCISE: Create a categorical vocabulary column out of the
# above mentioned categories with the key specified as 'thal'.
thal = tf.feature_column.categorical_column_with_vocabulary_list(
    'thal', ['fixed', 'normal', 'reversible'])

# EXERCISE: Create an indicator column out of the created categorical column.
thal_one_hot = tf.feature_column.indicator_column(thal)

demo(thal_one_hot)
[[0. 1. 0.]
 [0. 1. 0.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 1. 0.]]

The vocabulary can be passed as a list using categorical_column_with_vocabulary_list, or loaded from a file using categorical_column_with_vocabulary_file.
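
For example, a hedged sketch of the file-based variant could look like this (thal_vocab.txt is a hypothetical file listing one category per line; it is not part of this exercise):

# Sketch only: 'thal_vocab.txt' is a hypothetical vocabulary file containing
# 'fixed', 'normal', and 'reversible', one category per line.
thal_from_file = tf.feature_column.categorical_column_with_vocabulary_file(
    key='thal', vocabulary_file='thal_vocab.txt', vocabulary_size=3)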

Embedding Columns

Suppose instead of having just a few possible strings, we have thousands (or more) values per category. For a number of reasons, as the number of categories grows large, it becomes infeasible to train a neural network using one-hot encodings. We can use an embedding column to overcome this limitation. Instead of representing the data as a one-hot vector of many dimensions, an embedding column represents that data as a lower-dimensional, dense vector in which each cell can contain any number, not just 0 or 1. You can tune the size of the embedding with the dimension parameter.

# EXERCISE: Create an embedding column out of the categorical
# vocabulary you just created (thal). Set the size of the
# embedding to 8, by using the dimension parameter.

thal_embedding = tf.feature_column.embedding_column(thal, dimension=8)


demo(thal_embedding)
[[-1.4254066e-01 -1.0374661e-01  3.4352791e-01 -3.3996427e-01
  -3.2193713e-02 -1.8381193e-01 -1.8051244e-01  3.2638407e-01]
 [-1.4254066e-01 -1.0374661e-01  3.4352791e-01 -3.3996427e-01
  -3.2193713e-02 -1.8381193e-01 -1.8051244e-01  3.2638407e-01]
 [-6.5549983e-05  2.7680036e-01  4.1849682e-01  5.3418136e-01
  -1.6281548e-01  2.5406811e-01  8.8969752e-02  1.8004593e-01]
 [-6.5549983e-05  2.7680036e-01  4.1849682e-01  5.3418136e-01
  -1.6281548e-01  2.5406811e-01  8.8969752e-02  1.8004593e-01]
 [-1.4254066e-01 -1.0374661e-01  3.4352791e-01 -3.3996427e-01
  -3.2193713e-02 -1.8381193e-01 -1.8051244e-01  3.2638407e-01]]

Hashed Feature Columns

Another way to represent a categorical column with a large number of values is to use a categorical_column_with_hash_bucket. This feature column calculates a hash value of the input, then selects one of the hash_bucket_size buckets to encode a string. When using this column, you do not need to provide the vocabulary, and you can choose to make the number of hash buckets significantly smaller than the number of actual categories to save space.

# EXERCISE: Create a hashed feature column with 'thal' as the key and 
# 1000 hash buckets.
thal_hashed = tf.feature_column.categorical_column_with_hash_bucket(
    'thal', hash_bucket_size=1000)

demo(feature_column.indicator_column(thal_hashed))
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
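
To illustrate the space-saving trade-off mentioned above, here is a hedged sketch with a much smaller bucket count than the 1000 used in the exercise (the value 10 is an assumption for illustration); with a vocabulary much larger than the bucket count, different strings would collide into the same bucket.

# Sketch (illustrative value): fewer buckets saves space, but different
# categories may collide in the same bucket.
thal_hashed_small = tf.feature_column.categorical_column_with_hash_bucket(
    'thal', hash_bucket_size=10)

demo(feature_column.indicator_column(thal_hashed_small))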

Crossed Feature Columns

Combining features into a single feature, better known as feature crosses, enables a model to learn separate weights for each combination of features. Here, we will create a new feature that is the cross of age and thal. Note that crossed_column does not build the full table of all possible combinations (which could be very large). Instead, it is backed by a hashed_column, so you can choose how large the table is.

# EXERCISE: Create a crossed column using the bucketized column (age_buckets),
# the categorical vocabulary column (thal) previously created, and 1000 hash buckets.
crossed_feature = tf.feature_column.crossed_column([age_buckets, thal], hash_bucket_size=1000)

demo(feature_column.indicator_column(crossed_feature))
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]

Choose Which Columns to Use

We have seen how to use several types of feature columns. Now we will use them to train a model. The goal of this exercise is to show you the complete code needed to work with feature columns. We have arbitrarily selected a few columns to train our model below.

If your aim is to build an accurate model, try a larger dataset of your own, and think carefully about which features are the most meaningful to include, and how they should be represented.

dataframe.dtypes
age           int64
sex           int64
cp            int64
trestbps      int64
chol          int64
fbs           int64
restecg       int64
thalach       int64
exang         int64
oldpeak     float64
slope         int64
ca            int64
thal         object
target        int64
dtype: object

You can use the above list of column datatypes to map the appropriate feature column to every column in the dataframe.

# EXERCISE: Fill in the missing code below
feature_columns = []

# Numeric Cols.
# Create a list of numeric columns. Use the following list of columns
# that have a numeric datatype: ['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'slope', 'ca'].
numeric_columns = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'slope', 'ca']

for header in numeric_columns:
    # Create a numeric feature column out of the header.
    numeric_feature_column = tf.feature_column.numeric_column(header)

    feature_columns.append(numeric_feature_column)

# Bucketized Cols.
# Create a bucketized feature column out of the age column (numeric column)
# that you've already created. Use the following boundaries:
# [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]
age_buckets = tf.feature_column.bucketized_column(age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])

feature_columns.append(age_buckets)

# Indicator Cols.
# Create a categorical vocabulary column out of the categories
# ['fixed', 'normal', 'reversible'] with the key specified as 'thal'.
thal = feature_column.categorical_column_with_vocabulary_list(
    'thal', ['fixed', 'normal', 'reversible'])

# Create an indicator column out of the created thal categorical column.
thal_one_hot = feature_column.indicator_column(thal)

feature_columns.append(thal_one_hot)

# Embedding Cols.
# Create an embedding column out of the categorical vocabulary you
# just created (thal). Set the size of the embedding to 8, by using
# the dimension parameter.
thal_embedding = tf.feature_column.embedding_column(thal, dimension=8)

feature_columns.append(thal_embedding)

# Crossed Cols.
# Create a crossed column using the bucketized column (age_buckets),
# the categorical vocabulary column (thal) previously created, and 1000 hash buckets.
crossed_feature = feature_column.crossed_column([age_buckets, thal], hash_bucket_size=1000)

# Create an indicator column out of the crossed column created above to one-hot encode it.
crossed_feature = feature_column.indicator_column(crossed_feature)

feature_columns.append(crossed_feature)

Create a Feature Layer

Now that we have defined our feature columns, we will use a DenseFeatures layer to input them to our Keras model.

# EXERCISE: Create a Keras DenseFeatures layer and pass the feature_columns you just created.
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

Earlier, we used a small batch size to demonstrate how feature columns work. Now we create a new input pipeline with a larger batch size.

batch_size = 32
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)

Create, Compile, and Train the Model

model = tf.keras.Sequential([
    feature_layer,
    layers.Dense(128, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

model.fit(train_ds,
          validation_data=val_ds,
          epochs=100)
......
7/7 [==============================] - 0s 45ms/step - loss: 0.2225 - accuracy: 0.8912 - val_loss: 0.6998 - val_accuracy: 0.6939
Epoch 98/100
7/7 [==============================] - 0s 55ms/step - loss: 0.2089 - accuracy: 0.9067 - val_loss: 0.6846 - val_accuracy: 0.7143
Epoch 99/100
7/7 [==============================] - 0s 44ms/step - loss: 0.2043 - accuracy: 0.8964 - val_loss: 0.7292 - val_accuracy: 0.7347
Epoch 100/100
7/7 [==============================] - 0s 55ms/step - loss: 0.2008 - accuracy: 0.9016 - val_loss: 0.7064 - val_accuracy: 0.7143

<tensorflow.python.keras.callbacks.History at 0x7f33184937b8>
loss, accuracy = model.evaluate(test_ds)
print("Accuracy", accuracy)
2/2 [==============================] - 1s 329ms/step - loss: 0.5511 - accuracy: 0.8197
Accuracy 0.8196721
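
As a final illustration (an assumption beyond the original exercise), we can use the trained model to make predictions on the test pipeline. The sigmoid output is a probability, which we threshold at 0.5 to obtain a class label.

# Sketch: predict on the test pipeline (assumes the model and test_ds defined above).
probabilities = model.predict(test_ds)           # sigmoid outputs in [0, 1]
predictions = (probabilities > 0.5).astype(int)  # threshold to class labels 0/1
print(predictions[:5].flatten())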