[notebook] MLlib: machine learning on PySpark

Today I gave a tutorial on MLlib in PySpark. I post the notebook here for whoever could be interested =)

MLlib is a package of Spark (available also in PySpark).

MLlib is just a package of Spark, therefore, no need for extra intallation (once you have your Spark up and running). There are different (sub-)packages available in MLlib that can be useful for machine learning on big data.

In this lab we will see something from Statistics, Regression, Classification, and Clustering. But the documentation often comes with example, so I enourage you to take a look: MLlib on PySpark

Dataset

In this lab, we will use data about the 2016 US Presidential elections. The data is available on Kaggle: here

HDFS setting up

We make sure that there is a folder for this lab (if not mkdir). And we upload to HDFS the files from the Kaggle dataset on the 2016 US presidential elections.

In [2]:
%%bash
hdfs dfs -ls /user/cloudera/laura
Found 3 items
drwxr-xr-x   - cloudera cloudera          0 2016-02-08 05:34 /user/cloudera/laura/data
drwxr-xr-x   - cloudera cloudera          0 2016-04-19 07:46 /user/cloudera/laura/input
drwxr-xr-x   - cloudera cloudera          0 2016-06-09 00:57 /user/cloudera/laura/lab4
In [3]:
%%bash
hdfs dfs -mkdir /user/cloudera/laura/lab4
In [4]:
%%bash
hdfs dfs -mkdir /user/cloudera/laura/lab4/presidentials

We are interested in two files:

  • county_facts.csv: that contains demographic statistics on US counties
  • primary_results.csv: that contains records on votes for each county primary consultation
In [5]:
%%bash
hdfs dfs -put /home/cloudera/Desktop/Lab4/2016_presidential_election/county_facts.csv /user/cloudera/laura/lab4/presidentials/
hdfs dfs -put /home/cloudera/Desktop/Lab4/2016_presidential_election/primary_results.csv /user/cloudera/laura/lab4/presidentials/
In [ ]:
%%bash
hdfs dfs -ls /user/cloudera/laura/lab4/presidentials

Looking at the demographics

In [6]:
counties_raw = sc.textFile('hdfs://localhost//user/cloudera/laura/lab4/presidentials/county_facts.csv')
In [7]:
counties_raw.count()
Out[7]:
3196
In [8]:
counties_raw.take(3)
Out[8]:
[u'fips,area_name,state_abbreviation,PST045214,PST040210,PST120214,POP010210,AGE135214,AGE295214,AGE775214,SEX255214,RHI125214,RHI225214,RHI325214,RHI425214,RHI525214,RHI625214,RHI725214,RHI825214,POP715213,POP645213,POP815213,EDU635213,EDU685213,VET605213,LFE305213,HSG010214,HSG445213,HSG096213,HSG495213,HSD410213,HSD310213,INC910213,INC110213,PVY020213,BZA010213,BZA110213,BZA115213,NES010213,SBO001207,SBO315207,SBO115207,SBO215207,SBO515207,SBO415207,SBO015207,MAN450207,WTN220207,RTN130207,RTN131207,AFN120207,BPS030214,LND110210,POP060210',
 u'0,United States,,318857056,308758105,3.3,308745538,6.2,23.1,14.5,50.8,77.4,13.2,1.2,5.4,0.2,2.5,17.4,62.1,84.9,12.9,20.7,86.0,28.8,21263779,25.5,133957180,64.9,26.0,176700,115610216,2.63,28155,53046,15.4,7488353,118266253,2.0,23005620,27092908,7.1,0.9,5.7,0.1,8.3,28.8,5319456312,4174286516,3917663456,12990,613795732,1046363,3531905.43,87.4',
 u'1000,Alabama,,4849377,4780127,1.4,4779736,6.1,22.8,15.3,51.5,69.7,26.7,0.7,1.3,0.1,1.5,4.1,66.2,85.0,3.5,5.2,83.1,22.6,388865,24.2,2207912,69.7,15.9,122500,1838683,2.55,23680,43253,18.6,97578,1603100,1.1,311578,382350,14.8,0.8,1.8,0.1,1.2,28.1,112858843,52252752,57344851,12364,6426342,13369,50645.33,94.4']

What do all this columns mean? Together with the dataset, we can find a dictionary that explains the meaning of all of the demographic columns (county_facts_dictionary.csv)

In [9]:
%%bash
cat /home/cloudera/Desktop/Lab4/2016_presidential_election/county_facts_dictionary.csv
column_name,description
PST045214,"Population, 2014 estimate"
PST040210,"Population, 2010 (April 1) estimates base"
PST120214,"Population, percent change - April 1, 2010 to July 1, 2014"
POP010210,"Population, 2010"
AGE135214,"Persons under 5 years, percent, 2014"
AGE295214,"Persons under 18 years, percent, 2014"
AGE775214,"Persons 65 years and over, percent, 2014"
SEX255214,"Female persons, percent, 2014"
RHI125214,"White alone, percent, 2014"
RHI225214,"Black or African American alone, percent, 2014"
RHI325214,"American Indian and Alaska Native alone, percent, 2014"
RHI425214,"Asian alone, percent, 2014"
RHI525214,"Native Hawaiian and Other Pacific Islander alone, percent, 2014"
RHI625214,"Two or More Races, percent, 2014"
RHI725214,"Hispanic or Latino, percent, 2014"
RHI825214,"White alone, not Hispanic or Latino, percent, 2014"
POP715213,"Living in same house 1 year & over, percent, 2009-2013"
POP645213,"Foreign born persons, percent, 2009-2013"
POP815213,"Language other than English spoken at home, pct age 5+, 2009-2013"
EDU635213,"High school graduate or higher, percent of persons age 25+, 2009-2013"
EDU685213,"Bachelor's degree or higher, percent of persons age 25+, 2009-2013"
VET605213,"Veterans, 2009-2013"
LFE305213,"Mean travel time to work (minutes), workers age 16+, 2009-2013"
HSG010214,"Housing units, 2014"
HSG445213,"Homeownership rate, 2009-2013"
HSG096213,"Housing units in multi-unit structures, percent, 2009-2013"
HSG495213,"Median value of owner-occupied housing units, 2009-2013"
HSD410213,"Households, 2009-2013"
HSD310213,"Persons per household, 2009-2013"
INC910213,"Per capita money income in past 12 months (2013 dollars), 2009-2013"
INC110213,"Median household income, 2009-2013"
PVY020213,"Persons below poverty level, percent, 2009-2013"
BZA010213,"Private nonfarm establishments, 2013"
BZA110213,"Private nonfarm employment,  2013"
BZA115213,"Private nonfarm employment, percent change, 2012-2013"
NES010213,"Nonemployer establishments, 2013"
SBO001207,"Total number of firms, 2007"
SBO315207,"Black-owned firms, percent, 2007"
SBO115207,"American Indian- and Alaska Native-owned firms, percent, 2007"
SBO215207,"Asian-owned firms, percent, 2007"
SBO515207,"Native Hawaiian- and Other Pacific Islander-owned firms, percent, 2007"
SBO415207,"Hispanic-owned firms, percent, 2007"
SBO015207,"Women-owned firms, percent, 2007"
MAN450207,"Manufacturers shipments, 2007 ($1,000)"
WTN220207,"Merchant wholesaler sales, 2007 ($1,000)"
RTN130207,"Retail sales, 2007 ($1,000)"
RTN131207,"Retail sales per capita, 2007"
AFN120207,"Accommodation and food services sales, 2007 ($1,000)"
BPS030214,"Building permits, 2014"
LND110210,"Land area in square miles, 2010"
POP060210,"Population per square mile, 2010"
In [10]:
# remove the header from our RDD
counties = counties_raw.filter(lambda x : x.split(',')[0]!='fips')
counties.take(2)
Out[10]:
[u'0,United States,,318857056,308758105,3.3,308745538,6.2,23.1,14.5,50.8,77.4,13.2,1.2,5.4,0.2,2.5,17.4,62.1,84.9,12.9,20.7,86.0,28.8,21263779,25.5,133957180,64.9,26.0,176700,115610216,2.63,28155,53046,15.4,7488353,118266253,2.0,23005620,27092908,7.1,0.9,5.7,0.1,8.3,28.8,5319456312,4174286516,3917663456,12990,613795732,1046363,3531905.43,87.4',
 u'1000,Alabama,,4849377,4780127,1.4,4779736,6.1,22.8,15.3,51.5,69.7,26.7,0.7,1.3,0.1,1.5,4.1,66.2,85.0,3.5,5.2,83.1,22.6,388865,24.2,2207912,69.7,15.9,122500,1838683,2.55,23680,43253,18.6,97578,1603100,1.1,311578,382350,14.8,0.8,1.8,0.1,1.2,28.1,112858843,52252752,57344851,12364,6426342,13369,50645.33,94.4']

MLlib Statistics package

MLlib contains a package that can be very useful for initial exploration of data: pyspark.mllib.stat

This package contains different classes based on the data you need to analyze or the analysis you want to do:

  • Statistics
  • MultivariateStatisticalSummary
  • KernelDensity
In [11]:
from pyspark.mllib.stat import Statistics

We can look at statistics of the columns in our RDD using colStats().

In [12]:
# we need an RDD of Vectors to use Statistics.colStats()
counties_vec = counties.map(lambda x: x.split(','))\
                                .map(lambda x : [float(i) for i in x[3:]])
counties_vec.take(1)
Out[12]:
[[318857056.0,
  308758105.0,
  3.3,
  308745538.0,
  6.2,
  ...]]
In [13]:
# Compute column summary statistics.
summary = Statistics.colStats(counties_vec)
print summary.mean()
print summary.variance()
print summary.numNonzeros()
[  2.99396297e+05   2.89913714e+05   5.08544601e-01   2.89901914e+05
   5.90078247e+00   2.25452895e+01   1.75246322e+01   4.99132707e+01
   8.49532707e+01   9.26647887e+00   2.24381847e+00   1.44723005e+00
   1.26322379e-01   1.92482003e+00   9.04406886e+00   7.69373083e+01
   8.63628169e+01   4.59580595e+00   9.34331768e+00   8.46027230e+01
   1.99062285e+01   1.99659897e+04   2.29886385e+01   1.25781390e+05
   7.20585290e+01   1.25920814e+01   1.31963693e+05   1.08554193e+05
   2.52687011e+00   2.36970138e+04   4.60605512e+04   1.66477308e+01
   7.02574085e+03   1.10074848e+05   6.89139280e-01   2.16015124e+04
   2.56490767e+04   2.44723005e+00   5.98654147e-01   8.54741784e-01
   1.97809077e-02   1.73505477e+00   1.81081690e+01   4.73232906e+06
   3.80413212e+06   3.67750406e+06   1.02873252e+04   5.73988567e+05
   9.82300469e+02   3.31634362e+03   2.61265227e+02]
[  3.32752206e+13   3.11830767e+13   1.74729483e+01   3.11805495e+13
   1.46876269e+00   1.16888530e+01   1.96578057e+01   5.88094468e+00
   2.65389528e+02   2.06864771e+02   5.69361758e+01   7.94205479e+00
   9.25578049e-01   2.38452779e+00   1.80427093e+02   3.96281369e+02
   1.91984103e+01   3.18840338e+01   1.32703696e+02   4.73005655e+01
   7.84456876e+01   1.46785383e+11   2.98007444e+01   5.84924866e+12
   6.46736866e+01   8.86546931e+01   6.11655397e+09   4.35924928e+12
   6.23367881e-02   3.14777636e+07   1.41938697e+08   4.17536040e+01
   1.83380150e+10   4.57484851e+12   3.44653391e+01   1.74252146e+11
   2.41034070e+11   4.74217104e+01   1.43529631e+01   7.24941004e+00
   1.47917923e-01   3.87231603e+01   1.64085741e+02   9.26120686e+15
   5.79679129e+15   5.01672182e+15   2.92620714e+07   1.23839068e+14
   3.61933527e+08   4.09971684e+09   2.95428262e+06]
[ 3194.  3194.  3157.  3195.  3193.  3193.  3194.  3194.  3194.  3184.
  3191.  3180.  1684.  3192.  3194.  3194.  3195.  3182.  3190.  3195.
  3195.  3195.  3195.  3194.  3194.  3189.  3194.  3195.  3195.  3195.
  3195.  3195.  3194.  3155.  3107.  3193.  3019.   816.   540.   787.
    89.   831.  2225.  1601.  2105.  3146.  3146.  2910.  2826.  3195.
  3194.]
In [14]:
cnt = summary.count()
print [i/cnt for i in summary.numNonzeros()]
[0.99968701095461654, 0.99968701095461654, 0.98810641627543039, 1.0, 0.99937402190923319, 0.99937402190923319, 0.99968701095461654, 0.99968701095461654, 0.99968701095461654, 0.99655712050078249, 0.99874804381846638, 0.99530516431924887, 0.52707355242566511, 0.99906103286384973, 0.99968701095461654, 0.99968701095461654, 1.0, 0.99593114241001568, 0.99843505477308292, 1.0, 1.0, 1.0, 1.0, 0.99968701095461654, 0.99968701095461654, 0.99812206572769957, 0.99968701095461654, 1.0, 1.0, 1.0, 1.0, 1.0, 0.99968701095461654, 0.98748043818466358, 0.97245696400625981, 0.99937402190923319, 0.94491392801251961, 0.25539906103286386, 0.16901408450704225, 0.24632237871674492, 0.027856025039123631, 0.26009389671361505, 0.69640062597809071, 0.50109546165884189, 0.65884194053208134, 0.98466353677621288, 0.98466353677621288, 0.91079812206572774, 0.88450704225352117, 1.0, 0.99968701095461654]

We can also compute the correlation between lists or between all the pairs of columns in an RDD using corr(). This method computes the correlation using Pearson’s method by default. Enter “spearman” for Spearman’s method.

In [15]:
seriesX = sc.parallelize([5, 8 , 10]) # a series
seriesY = sc.parallelize([10, 16 , 20]) # another series with the same number 
                                        # of partitions and cardinality as seriesX

print Statistics.corr(seriesX, seriesY, method="pearson")
1.0
In [16]:
data = counties_vec # an RDD of Vectors

print Statistics.corr(data, method="pearson")
[[ 1.          0.99998926  0.03520207 ...,  0.99396947  0.97479657
   0.01685583]
 [ 0.99998926  1.          0.03447091 ...,  0.99357885  0.97483096
   0.01674188]
 [ 0.03520207  0.03447091  1.         ...,  0.04873137  0.02659992
   0.10901011]
 ..., 
 [ 0.99396947  0.99357885  0.04873137 ...,  1.          0.97361771
   0.01488022]
 [ 0.97479657  0.97483096  0.02659992 ...,  0.97361771  1.         -0.00524069]
 [ 0.01685583  0.01674188  0.10901011 ...,  0.01488022 -0.00524069  1.        ]]

Let’s look at some of the correlations

In [ ]:
counties.take(2)
In [17]:
pop = counties.map(lambda x : x.split(','))\
                        .map(lambda x : [float(x[3]), float(x[4])])
print 'Num rows:', pop.count()
print '[pop2014, pop2010]:', pop.take(2)
Num rows: 3195
[pop2014, pop2010]: [[318857056.0, 308758105.0], [4849377.0, 4780127.0]]
In [18]:
Statistics.corr(pop)
Out[18]:
array([[ 1.        ,  0.99998926],
       [ 0.99998926,  1.        ]])
In [19]:
%matplotlib inline
import matplotlib.pyplot as plt
In [20]:
pop2014 = pop.map(lambda x : x[0]).collect()
pop2010 = pop.map(lambda x : x[1]).collect()
plt.scatter(pop2014,pop2010)
plt.show()

Predictions with MLlib

MLlib has many packages for creating predicting models:

  • Linear regression
  • Logistic regression
  • SVM
In [21]:
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
In [22]:
data = [
[0.0, 0.0], #target variable, and observation(s)
[2.0, 1.0],
[4.0, 2.0],
[6.0, 3.0]
]

data = sc.parallelize(data)

labeledData = data.map(lambda x : LabeledPoint(x[0], x[1:]))

print 'LabeledPoint(target variable, [observations])'
print labeledData.take(2)
LabeledPoint(target variable, [observations])
[LabeledPoint(0.0, [0.0]), LabeledPoint(2.0, [1.0])]
In [23]:
model = LinearRegressionWithSGD.train(labeledData, iterations=10)

print 'model.predict([1.0]) = predicted value for the target variable' 
print 'model.predict([1.0]) =', model.predict([1.0])

def plot_linreg(data, model):
    data_x = data.map(lambda x : x.features[0])
    data_y = data.map(lambda x : x.label)
    min_pred = model.predict([data_x.min()])
    max_pred = model.predict([data_x.max()])

    plt.scatter(data_x.collect(),
                data_y.collect())
    plt.plot([data_x.min(),data_x.max()], [min_pred, max_pred], color='red')
    plt.show()
    
plot_linreg(labeledData, model)
model.predict([1.0]) = predicted value for the target variable
model.predict([1.0]) = 1.99813254286

Before trying it on our data, we will see another interesting package of MLlib: the feature package. We will use a class to help us standardize features: StandardScaler (Experimental)

“Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set.”

In [24]:
from pyspark.mllib.feature import StandardScaler

example_data = sc.parallelize([[100, 50], [50, 0]])

standardizer = StandardScaler(True, True)
scaler = standardizer.fit(example_data)
data_std = scaler.transform(example_data)

print 'original:', example_data.take(2)
print 'scaled:', data_std.take(2)
original: [[100, 50], [50, 0]]
scaled: [DenseVector([0.7071, 0.7071]), DenseVector([-0.7071, -0.7071])]
In [25]:
standardizer = StandardScaler(True, True)
model = standardizer.fit(data)
data_std = model.transform(data)

labeledData = data_std.map(lambda x : LabeledPoint(x[0], x[1:]))

model = LinearRegressionWithSGD.train(labeledData, iterations=10)

plot_linreg(labeledData, model)

Exercise:

Build a linear regression of the pop data (the RDD that we built before):

  1. scale the input data using StandardScaler
  2. build an RDD of LabeledPoint
  3. train the model on the data
  4. plot the result
In [26]:
print '[population 2014, population 2010]'
print pop.take(2)
[population 2014, population 2010]
[[318857056.0, 308758105.0], [4849377.0, 4780127.0]]
In [27]:
# use StandardScaler to scale the input data
standardizer = StandardScaler(True, True)
model = standardizer.fit(pop)
data_std = model.transform(pop)
In [28]:
#prepare the data to be given as input to training a linear regression model
#i.e. build an RDD of LabeledPoint objects
labeledData = data_std.map(lambda x : LabeledPoint(x[0], x[1:]))

model = LinearRegressionWithSGD.train(labeledData, iterations=10)

#plot results
plot_linreg(labeledData, model)
In [29]:
from pyspark.mllib.evaluation import RegressionMetrics
predictionAndObservations = labeledData.map(lambda p: (float(model.predict(p.features)), p.label))

print '(prediction, observed target variable)'
print predictionAndObservations.take(2)

metrics = RegressionMetrics(predictionAndObservations)
print 'Explained variance:', metrics.explainedVariance
print 'Mean Absolute Error:', metrics.meanAbsoluteError
print 'Mean Squared Error:', metrics.meanSquaredError
print 'Root Mean Squared Error:', metrics.rootMeanSquaredError
print 'R^2:', metrics.r2
(prediction, observed target variable)
[(55.23398764649117, 55.223964374631365), (0.8040128356704407, 0.7887676362815622)]
Explained variance: 0.999482132865
Mean Absolute Error: 0.000791867950178
Mean Squared Error: 2.14730061357e-05
Root Mean Squared Error: 0.00463389751027
R^2: 0.999978520271

—————————-

Now let’s look at the votes

Now we load the data about the votes, and try to look for correlations among candidates…

In [30]:
votes_raw = sc.textFile('hdfs://localhost//user/cloudera/laura/lab4/presidentials/primary_results.csv')
In [32]:
votes_raw.take(2)
Out[32]:
[u'state,state_abbreviation,county,fips,party,candidate,votes,fraction_votes',
 u'Alabama,AL,Autauga,1001,Republican,Donald Trump,5387,0.445']
In [33]:
votes_header = votes_raw.filter(lambda x : x.split(',')[0]=='state').collect()
In [34]:
votes = votes_raw.filter(lambda x : x.split(',')[0]!='state')\
                    .map(lambda x : [str(i) for i in x.split(',')])\
                    .map(lambda x : [x[0], x[1], x[2], x[3], x[4], x[5], int(x[6]), 
                                     float(x[7])])
votes.count()
Out[34]:
13212
In [35]:
print 'Number of rows per party:'
print votes.map(lambda x : (x[4], 1)).reduceByKey(lambda v,u: v+u).collect()
Number of rows per party:
[('Republican', 9152), ('Democrat', 4060)]
In [36]:
votes.map(lambda x : (x[4], x[5])).distinct()\
            .sortBy(lambda x : x[0]).collect()
Out[36]:
[('Democrat', 'Hillary Clinton'),
 ('Democrat', 'Bernie Sanders'),
 ('Democrat', ' Uncommitted'),
 ('Democrat', ' No Preference'),
 ('Democrat', "Martin O'Malley"),
 ('Republican', 'Rick Santorum'),
 ('Republican', 'Mike Huckabee'),
 ('Republican', 'Jeb Bush'),
 ('Republican', 'John Kasich'),
 ('Republican', 'Ted Cruz'),
 ('Republican', 'Ben Carson'),
 ('Republican', 'Rand Paul'),
 ('Republican', 'Donald Trump'),
 ('Republican', 'Marco Rubio'),
 ('Republican', 'Chris Christie'),
 ('Republican', 'Carly Fiorina')]

11 candidates for the republicans and 3 (plus undecided) for the democrats…

In [37]:
print 'Number of votes per party:'
print votes.map(lambda x : (x[4], x[6])).reduceByKey(lambda v,u: v+u).collect()
Number of votes per party:
[('Republican', 18324120), ('Democrat', 11919638)]

Does the county matter?

Now we want to see whether using some characteristics of a county we can predict which party will get more votes.

In [38]:
print votes.take(2)
print counties.take(1)
[['Alabama', 'AL', 'Autauga', '1001', 'Republican', 'Donald Trump', 5387, 0.445], ['Alabama', 'AL', 'Autauga', '1001', 'Republican', 'Ted Cruz', 2482, 0.205]]
[u'0,United States,,318857056,308758105,3.3,308745538,6.2,23.1,14.5,50.8,77.4,13.2,1.2,5.4,0.2,2.5,17.4,62.1,84.9,12.9,20.7,86.0,28.8,21263779,25.5,133957180,64.9,26.0,176700,115610216,2.63,28155,53046,15.4,7488353,118266253,2.0,23005620,27092908,7.1,0.9,5.7,0.1,8.3,28.8,5319456312,4174286516,3917663456,12990,613795732,1046363,3531905.43,87.4']

Our features:

  1. Population in 2010
  2. Population percent change
  3. Percent of persons 65 years and over
  4. Percent of female persons
  5. Percent of persons with a language other than English spoken at home (age 5+)’
  6. Percent of persons high school graduate or higher
  7. Veterans
In [39]:
features = counties.map(lambda x: x.split(','))\
                        .map(lambda x : (str(x[0]), [float(x[6]),float(x[5]),float(x[9]),float(x[10]),float(x[21]),float(x[22]),float(x[24])])) 
features.take(3)
Out[39]:
[('0', [308745538.0, 3.3, 14.5, 50.8, 20.7, 86.0, 21263779.0]),
 ('1000', [4779736.0, 1.4, 15.3, 51.5, 5.2, 83.1, 388865.0]),
 ('1001', [54571.0, 1.5, 13.8, 51.4, 3.5, 85.6, 5922.0])]
In [40]:
def majorityCount(v,u):
    if v[1]>u[1]:
        return v
    else:
        return u

labels = votes.map(lambda x : ((x[3], x[4]), x[6])).reduceByKey(lambda v,u: v+u)\
                .map(lambda x : (x[0][0], (x[0][1], x[1]))).reduceByKey(majorityCount)\
                .map(lambda x : (x[0], x[1][0]))
print '(county ID, majority voted party)'
print labels.take(4)
(county ID, majority voted party)
[('48441', 'Republican'), ('48117', 'Republican'), ('48443', 'Democrat'), ('48113', 'Republican')]
In [41]:
# check that there are no duplicates
print labels.count()
print labels.map(lambda x : x[0]).distinct().count()
2048
2048
In [42]:
from pyspark.mllib.regression import LabeledPoint
In [43]:
#join the two RDDs
joint = labels.join(features)
print '(county id, (majority voted party, [features]))'
print joint.take(2)
(county id, (majority voted party, [features]))
[('48441', ('Republican', [131506.0, 2.8, 13.9, 51.1, 15.0, 85.4, 12003.0])), ('48117', ('Republican', [19372.0, -0.9, 11.9, 50.4, 51.6, 69.5, 690.0]))]
In [44]:
# Parse the data
def parsePoint(x):
    feat = x[1][1]
    if x[1][0]=='Democrat':
        lab = 0
    else:
        lab = 1
    return LabeledPoint(lab, feat)


labeledData = joint.map(parsePoint)
labeledData.take(3)
Out[44]:
[LabeledPoint(1.0, [131506.0,2.8,13.9,51.1,15.0,85.4,12003.0]),
 LabeledPoint(1.0, [19372.0,-0.9,11.9,50.4,51.6,69.5,690.0]),
 LabeledPoint(1.0, [2368139.0,6.4,9.7,50.7,40.6,77.4,106414.0])]
In [45]:
labeledData.map(lambda p : (p.label, 1)).reduceByKey(lambda u,v: u+v).collect()
Out[45]:
[(0.0, 468), (1.0, 1580)]
In [46]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Build the model
model = LogisticRegressionWithLBFGS.train(labeledData, iterations=100)

# Evaluate the model on training data
obsAndPreds = labeledData.map(lambda p: (p.label, model.predict(p.features)))
print  obsAndPreds.take(4)
trainErr = obsAndPreds.filter(lambda (v,p) : v!=p).count()/float(labeledData.count())
print("Training error = " + str(trainErr))
[(1.0, 1), (1.0, 1), (1.0, 0), (1.0, 1)]
Training error = 0.23095703125

Now we evaluate the prediction error

Before we computed how well the model fits the training data. Now we want to estimate how good the model would do on a new set of data (that was not used for training).

In [47]:
splits = labeledData.randomSplit([0.7,0.3],seed=1234)
trainData = splits[0]
testData = splits[1]
print 'Num of rows in training data:', trainData.count()
print 'Num of rows in test data:', testData.count()
Num of rows in training data: 1437
Num of rows in test data: 611
In [48]:
# Build the model
model = LogisticRegressionWithLBFGS.train(trainData, iterations=100)

# Evaluate the model on testing data
valuesAndPreds = testData.map(lambda p: (float(p.label), float(model.predict(p.features))))
print 'Examples:', valuesAndPreds.take(4)

trainErr = valuesAndPreds.filter(lambda (v,p) : v!=p).count()/float(labeledData.count())
print("Test error = " + str(trainErr))
Examples: [(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)]
Test error = 0.06005859375

MLlib provides also other ways for evaluating the results of models:

In [49]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

metrics = BinaryClassificationMetrics(valuesAndPreds)
print metrics.areaUnderROC
0.714180672269

MLlib can do also clustering

One of the packages of MLlib is clustering:

  • K-means
  • Latent Dirichlet Allocation (LDA)
  • (and others in experimental status)
In [50]:
from pyspark.mllib.clustering import KMeans

from numpy import array

#4 data points (0.0, 0.0), (1.0, 1.0), (9.0, 8.0) (8.0, 9.0)
data = [[0.0,0.0], [1.0,1.0], [9.0,8.0], [8.0,9.0]]

data = sc.parallelize(data)

#Generate K means
model = KMeans.train(data, 2, maxIterations=10, 
                     runs=30, initializationMode="random")
In [51]:
labels = []
for d in data.collect():
    if model.predict(d)==1:
        labels.append('red')
    else:
        labels.append('blue')

data_x = data.map(lambda x : x[0]).collect()
data_y = data.map(lambda x : x[1]).collect()
        
plt.scatter(data_x, data_y, color=labels)
plt.show()

Exercise:

We want to cluster the presidential candidates based on their performance in the primaries.

  1. For each candidate, we count the total number of votes, and the total number of primary contests he/she took part in.
  2. We use these number as 2-D points representing each candidate
  3. We use K-mean on these points (with k=2 and k=3)
In [52]:
votes_header
Out[52]:
[u'state,state_abbreviation,county,fips,party,candidate,votes,fraction_votes']
In [63]:
#for each candidate we want the total number of votes
 #the num of counties he participated in
cand = votes.map(lambda x : (x[5], (x[6], 1)))\
            .reduceByKey(lambda v,u: (v[0]+u[0], v[1]+u[1]))
cand.take(2)
# output should be like:
# [('Chris Christie', (24347, 108)), ('Donald Trump', (6944654, 1881))]
Out[63]:
[('Rand Paul', (8460, 98)), (' No Preference', (313, 8))]
In [64]:
cand.count() #16
Out[64]:
16
In [65]:
#split the data into names and points
# names should be an array (collected)
# points should be an RDD, where each point should be an array: .. [8460, 98], [313, 8] ..
names = cand.map(lambda x : x[0]).collect()#your code
points = cand.map(lambda x : [x[1][0],x[1][1]])#your code
In [66]:
print points.take(2)
[[8460, 98], [313, 8]]
In [67]:
#Generate K means with k=2
model = KMeans.train(points, 2, maxIterations=10, 
                     runs=30, initializationMode="random")
In [68]:
model.predict(points.take(1)[0])
Out[68]:
0
In [69]:
data = array(points.collect())
labels = []
for d in data:
    if model.predict(d)==1:
        labels.append('red')
    else:
        labels.append('blue')

plt.scatter(data[:,0], data[:,1], color=labels)
plt.show()
In [70]:
#Generate K means with k = 3
model = KMeans.train(points, 3, maxIterations=10, 
                     runs=30, initializationMode="random")
In [71]:
data = array(points.collect())
colors = ['blue', 'green', 'red']
labels = []
for i in range(len(data)):
    d = data[i]
    labels.append(colors[model.predict(d)])
    if len(names)>0:
        plt.text(d[0], d[1], names[i])
#         plt.figure(figsize=(3,4))

plt.scatter(data[:,0], data[:,1], color=labels)
plt.show()
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s