
SparkML Examples

Supervised Learning

wget https://github.com/IBM/coursera/raw/master/hmp.parquet
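Once the file is downloaded, it is worth confirming what is inside before modeling; a quick sketch, assuming the SparkSession named spark created in the block that follows (the column set matches what the code below expects):

df = spark.read.parquet('hmp.parquet')
df.printSchema()                              # the code below relies on columns x, y, z, and class
df.select('class').distinct().show(truncate=False)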
from pyspark.sql import SparkSession

# 'spark' is predefined in notebooks; create it explicitly in standalone scripts
spark = SparkSession.builder.appName('SparkMLExamples').getOrCreate()

# create a dataframe out of the parquet file
df = spark.read.parquet('hmp.parquet')

# reduce to two classes for binary classification
df.createOrReplaceTempView('df')
df_two_class = spark.sql("select * from df where class in ('Use_telephone','Standup_chair')")
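Before splitting, it helps to confirm both activities are present in reasonable numbers; a quick check on the df_two_class DataFrame above:

# class balance of the two-class subset
df_two_class.groupBy('class').count().show()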


# train/test split; a fixed seed makes the split reproducible
splits = df_two_class.randomSplit([0.8, 0.2], seed=1)
df_train = splits[0]
df_test = splits[1]




from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

# transformers: index the string label, one-hot encode it,
# assemble the accelerometer axes into one feature vector, and scale it to [0, 1]
indexer = StringIndexer(inputCol="class", outputCol="label")
encoder = OneHotEncoder(inputCol="label", outputCol="labelVec")
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"],
                                  outputCol="features")
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
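To sanity-check these stages before wiring them into a pipeline, they can be fitted and applied one at a time; a minimal sketch, assuming the objects defined above:

# fit/apply the feature stages individually and inspect a few rows
indexed = indexer.fit(df_train).transform(df_train)
assembled = vectorAssembler.transform(indexed)
normalized = normalizer.fit(assembled).transform(assembled)
normalized.select("class", "label", "features", "features_norm").show(3, truncate=False)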



# modeling: a linear support-vector machine;
# pointing featuresCol at the scaled column so the MinMaxScaler stage is actually used
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1, featuresCol="features_norm")



## pipeline
# note: LinearSVC consumes the numeric 'label' column directly;
# the one-hot 'labelVec' is kept only for illustration
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer, lsvc])
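Because the whole workflow is a single Pipeline estimator, it plugs directly into Spark's model-selection tooling. A minimal sketch of tuning the SVM's regularization with 3-fold cross-validation (the grid values are illustrative):

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# grid over the regularization strength (illustrative values)
paramGrid = ParamGridBuilder() \
    .addGrid(lsvc.regParam, [0.01, 0.1, 1.0]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction"),
                    numFolds=3)
cvModel = cv.fit(df_train)    # cvModel.bestModel is the best-scoring pipeline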


# fit the pipeline on the training set, then score the same data
model = pipeline.fit(df_train)
prediction = model.transform(df_train)



from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluate on the training set (the default metric is area under the ROC curve)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(prediction)



# evaluate on the held-out test set, reusing the same evaluator
prediction = model.transform(df_test)
evaluator.evaluate(prediction)
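Area under ROC is threshold-independent; for a plain accuracy number, MulticlassClassificationEvaluator also works on binary predictions. A minimal sketch on the test-set predictions above:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy on the test-set predictions
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                                  predictionCol="prediction",
                                                  metricName="accuracy")
print("Test accuracy = " + str(acc_evaluator.evaluate(prediction)))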

Unsupervised Learning

# create a dataframe out of the parquet file
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')


from pyspark.ml.feature import VectorAssembler

# assemble the accelerometer axes into one feature vector
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"],
                                  outputCol="features")


from pyspark.ml.clustering import KMeans

# k-means with k=13 clusters and a fixed seed for reproducibility
kmeans = KMeans().setK(13).setSeed(1)


from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, kmeans])


# restrict to two activities before fitting; the 'df' view was already registered above
df = spark.sql("select * from df where class in ('Brush_teeth','Climb_stairs')")


model = pipeline.fit(df)

# computeCost was deprecated in Spark 2.4 and removed in Spark 3;
# the training cost (WSSSE) is available on the fitted model's summary instead
wssse = model.stages[1].summary.trainingCost
print("Within Set Sum of Squared Errors = " + str(wssse))
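To pick k more systematically, Spark's ClusteringEvaluator computes a silhouette score (squared euclidean distance by default); a minimal sketch sweeping a few illustrative values of k, reusing the Pipeline and KMeans imports above:

from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()    # silhouette, squared euclidean distance

for k in range(2, 8):    # illustrative range
    km_pipeline = Pipeline(stages=[vectorAssembler, KMeans().setK(k).setSeed(1)])
    predictions = km_pipeline.fit(df).transform(df)
    print("k=" + str(k) + " silhouette=" + str(evaluator.evaluate(predictions)))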