Spark Fundamental

Basic concepts

Work with the SparkContext object

The Spark driver application uses the SparkContext object to allow a programming interface to interact with the driver application. The SparkContext object tells Spark how and where to access a cluster.

1
2
3
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Work with Resilient Distributed Datasets

Spark uses an abstraction for working with data called a Resilient Distributed Dataset (RDD). An RDD is a collection of elements that can be operated on in parallel. RDDs are immutable, so you can’t update the data in them. To update data in an RDD, you must create a new RDD. In Spark, all work is done by creating new RDDs, transforming existing RDDs, or using RDDs to compute results. When working with RDDs, the Spark driver application automatically distributes the work across the cluster.
You can construct RDDs by parallelizing existing Python collections (lists), by manipulating RDDs, or by manipulating files in HDFS or any other storage system.
You can run these types of methods on RDDs:

  • Actions: query the data and return values
  • Transformations: manipulate data values and return pointers to new RDDs.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Create a Python collection of the numbers 1 - 10
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Put the collection into an RDD named x_nbr_rdd using the parallelize method
x_nbr_rdd = sc.parallelize(x)

# View the first element in the RDD
# Each number in the collection is in a different element in the RDD. Because the first() method returned a value, it is an action.
x_nbr_rdd.first()

# Now view the first five elements in the RDD
x_nbr_rdd.take(5)

# Create another RDD
y = ["Hello Human", "My Name is Spark"]

# y_str_rdd = sc.parallelize(y)
y_str_rdd.take(1)

Manipulate data in RDDs

Remember that to manipulate data, you use transformation functions.
Here are some common Python transformation functions that you’ll be using in this notebook:

  • map(func): returns a new RDD with the results of running the specified function on each element
  • filter(func): returns a new RDD with the elements for which the specified function returns true
  • distinct([numTasks])): returns a new RDD that contains the distinct elements of the source RDD
  • flatMap(func): returns a new RDD by first running the specified function on all elements, returning 0 or more results for each original element, and then flattening the results into individual elements
    You can also create functions that run a single expression and don’t have a name with the Python lambda keyword. For example, this function returns the sum of its arguments: lambda a , b : a + b.

Update numeric values

Run the map() function with the lambda keyword to replace each element, X, in your first RDD (the one that has numeric values) with X+1. Because RDDs are immutable, you need to specify a new RDD name.

Be careful with the collect method! It returns all elements of the RDD to the driver. Returning a large data set might be not be very useful. No-one wants to scroll through a million rows!

1
2
3
4
x_nbr_rdd_2 = x_nbr_rdd.map(lambda x: x+1)

# Now look at the elements of the new RDD
x_nbr_rdd_2.collect()

Add numbers in an array

An array of values is a common data format where multiple values are contained in one element. You can manipulate the individual values if you split them up into separate elements.
Create an array of numbers by including quotation marks around the whole set of numbers. If you omit the quotation marks, you get a collection of numbers instead of an array.

1
2
3
4
5
6
7
8
9
X = ["1,2,3,4,5,6,7,8,9,10"]

y_rd = sc.parallelize(X)

# Split the values at commas and add values in the positions 2 and 9 in the array. Keep in mind that an array starts with position 0. Use a backslash character, \, to break the line of code for clarity.
Sum_rd = y_rd.map(lambda y: y.split(",")).\
map(lambda y: (int(y[2])+int(y[9])))

Sum_rd.first()

Split and count text strings

1
2
3
4
5
6
7
8
Words = ["Hello Human. I'm Spark and I love running analysis on data."]
words_rd = sc.parallelize(Words)
words_rd.first()

Words_rd2 = words_rd.map(lambda line: line.split(" "))
Words_rd2.first()

Words_rd2.count()

Count words with a pair RDD

A common way to count the number of instances of words in an RDD is to create a pair RDD. A pair RDD converts each word into a key-value pair: the word is the key and the number 1 is the value. Because the values are all 1, when you add the values for a particular word, you get the number of instances of that word.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
z = ["First,Line", "Second,Line", "and,Third,Line"]
z_str_rdd = sc.parallelize(z)
z_str_rdd.first()

z_str_rdd_split_flatmap = z_str_rdd.flatMap(lambda line: line.split(","))
z_str_rdd_split_flatmap.collect()

countWords = z_str_rdd_split_flatmap.map(lambda word:(word,1))
countWords.collect()

# [('First', 1),
# ('Line', 1),
# ('Second', 1),
# ('Line', 1),
# ('and', 1),
# ('Third', 1),
# ('Line', 1)]


from operator import add
countWords2 = countWords.reduceByKey(add)
countWords2.collect()

# [('Second', 1), ('Line', 3), ('First', 1), ('and', 1), ('Third', 1)]

Filter data

The filter command creates a new RDD from another RDD based on a filter criteria. The filter syntax is:
.filter(lambda line: "Filter Criteria Value" in line)

1
2
3
4
words_rd3 = z_str_rdd_split_flatmap.filter(lambda line: "Second" in line) 

print ("The count of words " + str(words_rd3.first()))
print ("Is: " + str(words_rd3.count()))

Querying data

Enable SQL processing

The preferred method to enable SQL processing with Spark 2.0 is to use the new SparkSession object, but you can also create a SQLContext object.
Use the predefined Spark Context, sc, which contains the connection information for Spark, to create an SQLContext:

1
2
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Create a DataFrame

Instead of creating an RDD to read the file, you’ll create a Spark DataFrame. Unlike an RDD, a DataFrame creates a schema around the data, which supplies the necessary structure for SQL queries. A self-describing format like JSON is ideal for DataFrames, but many other file types are supported, including text (CSV) and Parquet.
Create a DataFrame:

1
example1_df = sqlContext.read.json("world_bank.json.gz")

Create a table

SQL statements must be run against a table. Create a table that’s a pointer to the DataFrame:

1
example1_df.registerTempTable("world_bank")

Run SQL queries

You must define a new DataFrame for the results of the SQL query and put the SQL statement inside the sqlContext.sql() method.
Run the following cell to select all columns from the table and print information about the resulting DataFrame and schema of the data:

1
2
3
4
5
temp_df =  sqlContext.sql("select * from world_bank")

print (type(temp_df))
print ("*" * 20)
print (temp_df)

Display query results with a pandas DataFrame

1
2
import pandas as pd
sqlContext.sql("select id, borrower from world_bank limit 2").toPandas()

Run a group by query

1
2
3
4
5
6
7
8
9
10
query = """
select
regionname ,
count(*) as project_count
from world_bank
group by regionname
order by count(*) desc
"""

sqlContext.sql(query).toPandas()

Run a subselect query

1
2
3
4
5
6
7
8
9
10
11
12
13
query = """

select * from
(select
regionname ,
count(*) as project_count
from world_bank
group by regionname
order by count(*) desc) table_alias
limit 2
"""

sqlContext.sql(query).toPandas()

Convert RDDs to DataFrames

If you want to run SQL queries on an existing RDD, you must convert the RDD to a DataFrame. The main difference between RDDs and DataFrames is whether the columns are named.
You’ll create an RDD and then convert it to a DataFrame in two different ways:

  • Apply a schema
  • Create rows with named columns

Create a simple RDD

1
2
3
4
5
6
7
8
9
10
11
import random

data_e2 = []
for x in range(1,6):
random_int = int(random.random() * 10)
data_e2.append([x, random_int, random_int^2])

rdd_example2 = sc.parallelize(data_e2)
print (rdd_example2.collect())

# [[1, 1, 3], [2, 3, 1], [3, 1, 3], [4, 8, 10], [5, 0, 2]]

Apply a schema

You’ll use the StructField method to create a schema object that’s based on a string, apply the schema to the RDD to create a DataFrame, and then create a table to run SQL queries on.
Define your schema columns as a string:

1
2
3
from pyspark.sql.types import *

schemaString = "ID VAL1 VAL2"

Assign header information with the StructField method and create the schema with the StructType method:

1
2
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

Apply the schema to the RDD with the createDataFrame method

1
schemaExample = sqlContext.createDataFrame(rdd_example2, schema)

Register the DataFrame as a table

1
2
# Register the DataFrame as a table
schemaExample.registerTempTable("example2")

View the data

1
2
3
4
5
6
7
8
9
print (schemaExample.collect())

# [Row(ID='1', VAL1='1', VAL2='3'), Row(ID='2', VAL1='3', VAL2='1'), Row(ID='3', VAL1='1', VAL2='3'), Row(ID='4', VAL1='8', VAL2='10'), Row(ID='5', VAL1='0', VAL2='2')]


# You can reference the columns names in DataFrames

for row in schemaExample.take(2):
print (row.ID, row.VAL1, row.VAL2)

Run a simple SQL query

1
sqlContext.sql("select * from example2").toPandas()

Create rows with named columns

You’ll create an RDD with named columns and then convert it to a DataFrame and a table.
Create a new RDD and specify the names of the columns with the map method:

1
2
3
4
5
6
7
from pyspark.sql import Row

rdd_example3 = rdd_example2.map(lambda x: Row(id=x[0], val1=x[1], val2=x[2]))

print (rdd_example3.collect())

# [Row(id=1, val1=1, val2=3), Row(id=2, val1=3, val2=1), Row(id=3, val1=1, val2=3), Row(id=4, val1=8, val2=10), Row(id=5, val1=0, val2=2)]

Convert rdd_example3 to a DataFrame and register an associated table

1
2
df_example3 = rdd_example3.toDF()
df_example3.registerTempTable("example3")

Run a simple SQL query

1
sqlContext.sql("select * from example3").toPandas()

Join tables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# Join tables example2 and example3 on the ID column:

query = """
select
*
from
example2 e2
inner join example3 e3 on
e2.ID = e3.id
"""

print (sqlContext.sql(query).toPandas())


# Alternatively, you can join DataFrames with a Python command instead of an SQL query:

df_example4 = df_example3.join(schemaExample, schemaExample["ID"] == df_example3["id"] )

for row in df_example4.take(5):
print (row)

Create SQL functions

You can create functions that run in SQL queries.
First, create a Python function and test it:

1
2
3
4
5
def simple_function(v):
return int(v * 10)

#test the function
print (simple_function(3))

Next, register the function as an SQL function with the registerFunction method:

1
sqlContext.registerFunction("simple_function", simple_function)

Now run the function in an SQL Statement:

1
2
3
4
5
6
7
8
9
10
11
query = """
select
ID,
VAL1,
VAL2,
simple_function(VAL1) as s_VAL1,
simple_function(VAL2) as s_VAL2
from
example2
"""
sqlContext.sql(query).toPandas()

The values in the VAL1 and VAL2 columns look like strings (10 characters instead of a number multiplied by 10). That’s because string is the default data type for columns in Spark DataFrames.

Convert a pandas DataFrame to a Spark DataFrame

Although pandas DataFrames display data in a friendlier format, Spark DataFrames can be faster and more scalable.
You’ll get a new data set, create a pandas DataFrame for it, and then convert the pandas DataFrame to a Spark DataFrame.

1
2
3
4
5
6
7
8
9
10
11

pandas_df = pd.read_csv("./GoSales_Tx.csv")
pandas_df.head()

# Convert the pandas DataFrame to a Spark DataFrame with the createDataFrame method. Remember using the createDataFrame method to convert an RDD to a Spark DataFrame
spark_df = sqlContext.createDataFrame(pandas_df)

# Register the Spark DataFrame as a table
spark_df.registerTempTable("gosales_tx")

sqlContext.sql("select * from gosales_tx limit 10").collect()

Spark machine learning

The Spark machine learning library makes practical machine learning scalable and easy. The library consists of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering (this notebook!), dimensionality reduction, lower-level optimization primitives, and higher-level pipeline APIs.
The library has two packages:

  • spark.mllib contains the original API that handles data in RDDs. It’s in maintenance mode, but fully supported.
  • spark.ml contains a newer API for constructing ML pipelines. It handles data in DataFrames. It’s being actively enhanced.

Alternating least squares algorithm

The alternating least squares (ALS) algorithm provides collaborative filtering between customers and products to find products that the customers might like, based on their previous purchases or ratings.
The ALS algorithm creates a matrix of all customers versus all products. Most cells in the matrix are empty, which means the customer hasn’t bought that product. The ALS algorithm then fills in the probability of customers buying products that they haven’t bought yet, based on similarities between customer purchases and similarities between products. The algorithm uses the least squares computation to minimize the estimation errors, and alternates between fixing the customer factors and solving for product factors and fixing the product factors and solving for customer factors.
You don’t, however, need to understand how the ALS algorithm works to use it! Spark machine learning algorithms have default values that work well in most cases.

Get the data

The data set contains the transactions of an online retailer of gift items for the period from 01/12/2010 to 09/12/2011. Many of the customers are wholesalers.
You’ll be using a slightly modified version of UCI’s Online Retail Data Set.
Here’s a glimpse of the data:

1
2
rm 'OnlineRetail.csv.gz' -f
wget https://raw.githubusercontent.com/rosswlewis/RecommendationPoT/master/OnlineRetail.csv.gz
1
2
3
4
5
6
7
8
9
10
11

loadRetailData = sc.textFile("OnlineRetail.csv.gz")

for row in loadRetailData.take(5):
print (row)

# InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
# 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/10 8:26,2.55,17850,United Kingdom
# 536365,71053,WHITE METAL LANTERN,6,12/1/10 8:26,3.39,17850,United Kingdom
# 536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/10 8:26,2.75,17850,United Kingdom
# 536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/10 8:26,3.39,17850,United Kingdom

Prepare and shape the data

It’s been said that preparing and shaping data is 80% of a data scientist’s job. Having the right data in the right format is critical for getting accurate results.
To get the data ready, complete these tasks:

  • Format the data
  • Clean the data
  • Create a DataFrame
  • Remove unneeded columns

Format the data

Remove the header from the RDD and split the string in each row with a comma:

1
2
3
4
5
6
7
8
9
10
11
12
header = loadRetailData.first()
loadRetailData = loadRetailData.filter(lambda line: line != header).\
map(lambda l: l.split(","))

for row in loadRetailData.take(5):
print (row)

# ['536365', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', '6', '12/1/10 8:26', '2.55', '17850', 'United Kingdom']
# ['536365', '71053', 'WHITE METAL LANTERN', '6', '12/1/10 8:26', '3.39', '17850', 'United Kingdom']
# ['536365', '84406B', 'CREAM CUPID HEARTS COAT HANGER', '8', '12/1/10 8:26', '2.75', '17850', 'United Kingdom']
# ['536365', '84029G', 'KNITTED UNION FLAG HOT WATER BOTTLE', '6', '12/1/10 8:26', '3.39', '17850', 'United Kingdom']
# ['536365', '84029E', 'RED WOOLLY HOTTIE WHITE HEART.', '6', '12/1/10 8:26', '3.39', '17850', 'United Kingdom']

Clean the data

Remove the rows that have incomplete data. Keep only the rows that meet the following criteria:

  • The purchase quantity is greater than 0
  • The customer ID not equal to 0
  • The stock code is not blank after you remove non-numeric characters
1
2
3
4
5
6
7
8
9
import re

loadRetailData = loadRetailData.filter(lambda l: int(l[3]) > 0\
and len(re.sub("\D", "", l[1])) != 0 \
and len(l[6]) != 0)

print (loadRetailData.take(2))

# [['536365', '85123A', 'WHITE HANGING HEART T-LIGHT HOLDER', '6', '12/1/10 8:26', '2.55', '17850', 'United Kingdom'], ['536365', '71053', 'WHITE METAL LANTERN', '6', '12/1/10 8:26', '3.39', '17850', 'United Kingdom']]

Create a DataFrame

First, create an SQLContext and map each line to a row:

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

#Convert each line to a Row.
loadRetailData = loadRetailData.map(lambda l: Row(inv=int(l[0]),\
stockCode=int(re.sub("\D", "", l[1])),\
description=l[2],\
quant=int(l[3]),\
invDate=l[4],\
price=float(l[5]),\
custId=int(l[6]),\
country=l[7]))

Create a DataFrame and show the inferred schema:

1
2
3
4
5
6
7
8
9
10
11
12
retailDf = sqlContext.createDataFrame(loadRetailData)
print (retailDf.printSchema())

# root
# |-- country: string (nullable = true)
# |-- custId: long (nullable = true)
# |-- description: string (nullable = true)
# |-- inv: long (nullable = true)
# |-- invDate: string (nullable = true)
# |-- price: double (nullable = true)
# |-- quant: long (nullable = true)
# |-- stockCode: long (nullable = true)

Register the DataFrame as a table so that you can run SQL queries on it and show the first two rows:

1
2
retailDf.registerTempTable("retailPurchases")
sqlContext.sql("SELECT * FROM retailPurchases limit 2").toPandas()

Remove unneeded columns

The only columns you need are custId, stockCode, and a new column, purch, which has a value of 1 to indicate that the customer purchased the product:

1
2
3
4
5
6
7
8
9
10
11
query = """
SELECT
custId, stockCode, 1 as purch
FROM
retailPurchases
group
by custId, stockCode"""
retailDf = sqlContext.sql(query)
retailDf.registerTempTable("retailDf")

sqlContext.sql("select * from retailDf limit 10").toPandas()

Split the data into three sets

You’ll split the data into three sets:

  • a testing data set (10% of the data)
  • a cross-validation data set (10% of the data)
  • a training data set (80% of the data)

Split the data randomly and create a DataFrame for each data set:

1
2
3
4
5
6
7
8
9
10
11
12
13
testDf, cvDf, trainDf = retailDf.randomSplit([.1,.1,.8],1)

print ("trainDf count: ", trainDf.count(), " example: ")
for row in trainDf.take(2): print (row)
print ()

print ("cvDf count: ", cvDf.count(), " example: ")
for row in cvDf.take(2): print (row)
print ()

print ("testDf count: ", testDf.count(), " example: ")
for row in testDf.take(2): print (row)
print ()

Build recommendation models

Machine learning algorithms have standard parameters and hyperparameters. Standard parameters specify data and options. Hyperparameters control the performance of the algorithm.
The ALS algorithm has these hyperparameters:

  • The rank hyperparameter represents the number of features. The default value of rank is 10.
  • The maxIter hyperparameter represents the number of iterations to run the least squares computation. The default value of maxIter is 10.
    Use the training DataFrame to train three models with the ALS algorithm with different values for the rank and maxIter hyperparameters. Assign the userCol, itemCol, and ratingCol parameters to the appropriate data columns. Set the implicitPrefs parameter to true so that the algorithm can predict latent factors.
1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.ml.recommendation import ALS

als1 = ALS(rank=3, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model1 = als1.fit(trainDf)

als2 = ALS(rank=15, maxIter=3,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model2 = als2.fit(trainDf)

als3 = ALS(rank=15, maxIter=15,userCol="custId",itemCol="stockCode",ratingCol="purch",implicitPrefs=True)
model3 = als3.fit(trainDf)

print ("The models are trained")

Test the models

First, test the three models on the cross-validation data set, and then on the testing data set.
You’ll know the model is accurate when the prediction values for products that the customers have already bought are close to 1.

Clean the cross validation data set

Remove any of the customers or products in the cross-validation data set that are not in the training data set:

1
2
3
4
5
6
7
8
9
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import BooleanType
customers = set(trainDf.rdd.map(lambda line: line.custId).collect())
stock = set(trainDf.rdd.map(lambda line: line.stockCode).collect())

print (cvDf.count())
cvDf = cvDf.rdd.filter(lambda line: line.stockCode in stock and\
line.custId in customers).toDF()
print (cvDf.count())

Run the models on the cross-validation data set

Run the model with the cross-validation DataFrame by using the transform function and print the first two rows of each set of predictions:

1
2
3
4
5
6
7
8
9
10
11
predictions1 = model1.transform(cvDf)
predictions2 = model2.transform(cvDf)
predictions3 = model3.transform(cvDf)

print (predictions1.take(2))
print (predictions2.take(2))
print (predictions3.take(2))

# [Row(custId=14606, stockCode=20735, purch=1, prediction=0.02294829487800598), Row(custId=16464, stockCode=20735, purch=1, prediction=0.00998256541788578)]
# [Row(custId=14606, stockCode=20735, purch=1, prediction=0.0441482812166214), Row(custId=16464, stockCode=20735, purch=1, prediction=0.004716672468930483)]
# [Row(custId=14606, stockCode=20735, purch=1, prediction=0.10467907041311264), Row(custId=16464, stockCode=20735, purch=1, prediction=0.0019032559357583523)]

Calculate the accuracy for each model

You’ll use the mean squared error calculation to determine accuracy by comparing the prediction values for products to the actual purchase values. Remember that if a customer purchased a product, the value in the purch column is 1. The mean squared error calculation measures the average of the squares of the errors between what is estimated and the existing data. The lower the mean squared error value, the more accurate the model.
For all predictions, subtract the prediction from the actual purchase value (1), square the result, and calculate the mean of all of the squared differences:

1
2
3
4
5
6
7
8
9
10
11
meanSquaredError1 = predictions1.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError2 = predictions2.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()
meanSquaredError3 = predictions3.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()

print ('Mean squared error = %.4f for our first model' % meanSquaredError1)
print ('Mean squared error = %.4f for our second model' % meanSquaredError2)
print ('Mean squared error = %.4f for our third model' % meanSquaredError3)

# Mean squared error = 0.7393 for our first model
# Mean squared error = 0.7011 for our second model
# Mean squared error = 0.6683 for our third model

The third model (model3) has the lowest mean squared error value, so it’s the most accurate.
Notice that of the three models, model3 has the highest values for the hyperparameters. At this point you might be tempted to run the model with even higher values for rank and maxIter. However, you might not get better results. Increasing the values of the hyperparameters increases the time for the model to run. Also, you don’t want to overfit the model so that it exactly fits the original data. In that case, you wouldn’t get any recommendations! For best results, keep the values of the hyperparameters close to the defaults.

Confirm the best model

Now run model3 on the testing data set to confirm that it’s the best model. You want to make sure that the model is not over-matched to the cross-validation data. It’s possible for a model to match one subset of the data well but not another. If the values of the mean squared error for the testing data set and the cross-validation data set are close, then you’ve confirmed that the model works for all the data.
Clean the testing data set, run model3 on the testing data set, and calculate the mean squared error:

1
2
3
4
5
6
7
8
9
filteredTestDf = testDf.rdd.filter(lambda line: line.stockCode in stock and\
line.custId in customers).toDF()
predictions4 = model3.transform(filteredTestDf)
meanSquaredError4 = predictions4.rdd.map(lambda line: (line.purch - line.prediction)**2).mean()

print ('Mean squared error = %.4f for our best model' % meanSquaredError4)

# Mean squared error = 0.6693 for our best model
# That's pretty close. The model works for all the data.

Implement the model

Use the best model to predict which products a specific customer might be interested in purchasing.

Create a DataFrame for the customer and all products

Create a DataFrame in which each row has the customer ID (15544) and a product ID

1
2
3
4
5
6
7
8
9
10
from pyspark.sql.functions import lit

stock15544 = set(trainDf.filter(trainDf['custId'] == 15544).rdd.map(lambda line: line.stockCode).collect())

userItems = trainDf.select("stockCode").distinct().\
withColumn('custId', lit(15544)).\
rdd.filter(lambda line: line.stockCode not in stock15544).toDF()

for row in userItems.take(5):
print (row.stockCode, row.custId)

Rate each product

Run the transform function to create a prediction value for each product:

1
2
3
4
userItems = model3.transform(userItems)

for row in userItems.take(5):
print (row.stockCode, row.custId, row.prediction)

Find the top recommendations

Print the top five product recommendations

1
2
3
4
userItems.registerTempTable("predictions")
query = "select * from predictions order by prediction desc limit 5"

sqlContext.sql(query).toPandas()
Donate article here