Monday, June 8, 2015

Introduction to Apache Spark

These are some rough notes for a talk to be given at the June 2015 Chadoop meeting.

What is Apache Spark?

  1. Enterprise parallel processing framework based on resilient distributed datasets (RDDs)
  2. for Java, Scala, Python, and SQL
  3. with memory caching (reusable in-memory datasets, used as a progressive enhancement)

How can I get Spark up and running quickly?

  → 1. Docker on a collection of various firewalled hardware (described here)
  2. Amazon EC2 script (included in the Spark distribution)

How does it perform / scale?

  1. Resilient against (some) worker failures
  2. Can add / remove workers [Amazon spot market compatible]
  → 3. Can perform well or poorly depending on classical parallel processing concerns (CPU vs. memory vs. disk vs. network bottlenecks)

Quirks

  • Transformations, such as reading a file or mapping data over a function, are lazy: they do not instantiate anything by themselves.
  • Actions do – this is good, and lets you be deliberate about exactly how large intermediate results are handled (cached in memory, spilled to disk, or recomputed each time). See the short PySpark sketch after this list.
  • Spark is not currently hierarchical. Networking is not layered as App/Driver <-> Master <-> Workers. It is important to make sure that all workstations, masters, and workers can connect to each other.
  • Several services listen on random port numbers.
  • Networking is not NAT-friendly, and the documentation says little about this issue. This can be a blocker for simply combining your cluster of 3-4 home PCs with extra Amazon EC2 machines, as the home machines may be on the 192.168 LAN and the Amazon EC2 machines on a different Amazon LAN. If the Spark master has a local IP like 192.168.1.100 and a router exposes it to the internet as 1.2.3.4, packets to the Spark master at 1.2.3.4 will be read and discarded with logged errors.
  • Ideally, jobs that don't finish time out and are rescheduled. But that wasn't always my experience. Spark still has a ways to go on robustness/reliability. There are unsolved issues. Workers can hang; a malformed CSV file import was one way to trigger this. I suspect, though, that reliability will improve over time.
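
The laziness and caching described above can be seen in a few lines of PySpark. This is a minimal sketch (Spark 1.x era API, Python 2 style to match the later examples); the file path and app name are placeholders, not part of the cluster described below.

from pyspark import SparkContext

sc = SparkContext(appName="LazinessDemo")

lines = sc.textFile("/data/gig.csv")          # transformation: nothing is read yet
ints = lines.map(lambda s: int(s.strip()))    # transformation: still nothing computed
ints.cache()                                  # request caching; takes effect at the first action

n = ints.count()      # action: the file is read, mapped, and cached now
total = ints.sum()    # action: served from the in-memory cache if it fit

print n, total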

Using Pipework and Docker to Run Apache Spark

My cluster-at-home experience: for Apache Spark I learned the hard way not to use docker's hostname flag -h. Instead, keep the default hostnames and use pipework to establish an alternate network interface in each container, placing them all, each with its own IP, on a shared firewalled LAN. By default they cannot be reached from outside the home; outside access would require planned holes in the firewall, probably via ssh to a shell account inside the home with access to the cluster.
Pipework is a free utility that routes an additional Ethernet bridge into a Docker container.
Docker is a virtualization technology for Linux based on containers.
To install on Ubuntu, use apt-get install docker.io; the package is named docker.io due to a name conflict with an older project also called docker (a system tray for KDE3/Gnome2 applications).

Docker vs. Traditional Virtual Machine

  • Docker uses Linux kernel namespace and cgroup capabilities to isolate and restrict containers
  • The host and the Docker containers all share the host's Linux kernel, memory, swap, and hardware
  • The container sees its own file tree and process list, and can only access devices as permitted by Docker

In a Traditional VM, resources are wasted:

  • booting and running multiple kernels
  • duplicate drivers, system utilities, cron jobs, etc.
  • by requiring hard allocations of memory for each running VM that might turn out to be incorrect later, such as giving a worker too little or too much memory relative to the master or driver.

The traditional VM is probably safer/more secure. Docker is lighter weight in a trusted environment.

Dockerhub is a public repository of layered images for container filesystems.
A Dockerfile is a script for building a container filesystem from repository images, Docker directives, and shell scripting.
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-worker.sh /spark/
CMD /spark/my-spark-worker.sh
The docker command, in conjunction with the docker daemon, builds, pushes, pulls, runs, stops, and kills containers.
Shell script ./run-docker-spark for the master and 1st worker [AMD FX 8150]:
#!/bin/bash
sudo -v
MASTER=$(docker run --name="master" --expose=1-65535 --env SPARK_MASTER_IP=192.168.1.10 --env SPARK_LOCAL_IP=192.168.1.10 \
 -v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-master:latest)
# pipework attaches an extra interface to the container: IP 192.168.1.10/24, default gateway 192.168.1.1
sudo pipework eth0 $MASTER 192.168.1.10/24@192.168.1.1
SPARK1=$(docker run --name="spark1" --expose=1-65535 --env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.11 \
--env SPARKDIR=/spark/spark-1.3.1 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK1 192.168.1.11/24@192.168.1.1
Shell script for an additional worker on the same wired LAN:
#!/bin/bash
sudo -v
SPARK=$(docker run --name="spark" --expose=1-65535 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.13 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK 192.168.1.13/24@192.168.1.1
Shell script for an additional worker on the wireless LAN.
Issue: the wireless LAN is apparently not compatible with pipework's bridging, which is designed for eth0.
Solution: --net="host" runs the container on the host's network stack; no pipework is run.
#!/bin/bash
sudo -v
SPARK=$(docker run --net="host" --expose=1-65535 \
--env SPARKDIR=/spark/spark-1.3.1 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
To reset, or after a system restart: docker stop (or docker kill) and then docker rm each container before running the scripts again.

Parallel Processing Toy example:

Calculate the Leibniz formula for PI

(π/4) = 1 - (1/3) + (1/5) - (1/7) + (1/9) - (1/11) + …

Grouping the terms pairwise after the leading 1 gives (π/4) = 1 + the sum over n ≥ 1 of [ 1/(4n+1) - 1/(4n-1) ], which is the form used in the code below.

Let's use denominators out to 4 Billion and 1 (n up to 1 billion).

A. Without Apache Spark – Python

total=1.0
for n0 in xrange(1000000000):
    n = 1+n0
    total += (1.0/(4.0*n+1.0)) - (1.0/(4.0*n-1.0))
print 4.0*total

A. Run Time: 409 sec on 1 core of AMD-FX8150


B. Without Apache Spark – Python + Numpy

import numpy
billionInt = 1+numpy.arange(1000000000, dtype=numpy.int32) 
total = 1.0 + ((1.0/(4.0*billionInt+1.0)) - (1.0/(4.0*billionInt-1.0))).sum() 
print 4.0*total

Issue: wants 26G ram

Solution: Rent an Amazon EC2 Xeon E5-2680 2.8GHz 32GB (c3.4xlarge)

B. Run Time 25 sec on one core


C. Without Apache Spark – C

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char ** argv)
{
 long int n;
 if (argc!=3){
    fprintf(stderr, "usage: LeibnizPi1G <fromidx> <toidx_uninclusive>\n");
    exit(EXIT_FAILURE);
 }
 long int loop_from = strtol(argv[1],NULL,10);
 long int loop_to = strtol(argv[2], NULL, 10);
 long double pisum = 0.0L;
 const long double one =  1.0L;
 const long double four = 4.0L;
 for(n=loop_from;n<loop_to;++n)
   pisum+=(one/(four*n+one))-(one/(four*n-one));
 long double LeibnizPi = 4.0*(1.0+pisum);
 printf("%.20Lf\n", LeibnizPi);
}

C. Run Time (AMD FX8150, 1 Core): 9 sec


D. Without Apache Spark – hand written map/reduce

#!/bin/bash
X=./LeibnizPi1G
$X 1 100000001 >/tmp/x1 &
$X 100000001 200000001 >/tmp/x2 &
$X 200000001 300000001 >/tmp/x3 &
$X 300000001 400000001 >/tmp/x4 &
$X 400000001 500000001 >/tmp/x5 &
$X 500000001 600000001 >/tmp/x6 &
$X 600000001 700000001 >/tmp/x7 &
$X 700000001 800000001 >/tmp/x8 &
$X 800000001 900000001 >/tmp/x9 &
$X 900000001 1000000001 >/tmp/x10 &
wait
cat /tmp/x* | gawk 'BEGIN {sum=0.0} {sum+=($1-4.0)/4.0} END { printf("%0.17g\n",4.0+4.0*sum) }'

D. Run Time: 2-3 sec on 8 core AMD FX 8150


JS. Browser-side JavaScript

Leibniz π on jsfiddle

Run Time: 8.88 sec in Chromium Browser on i5-4570


Home Apache Spark Cluster

Box 1 (master, worker1, and workstation) AMD FX-8150 8 cores 16GB

Box 2 (worker2) i5-3470S HP Ultra Slim 4 cores 16 GB

Box 3 (worker3) i5-4570R Gigabyte Brix Cube 4 cores 16GB WLAN

After the master is running and the workers are connected, we can go to the master's WebUI to monitor the cluster and check stdout/stderr on any of the workers.
Spark Cluster Management Console

Spark – Python

from operator import add
from pyspark import SparkContext

def term(n):
    return (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))

def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(1,Nterms+1),30).map(term).reduce(add)
    return 4*(1+piOver4Minus1)

print LeibnizPi(1000*1000*1000)

To run: spark-submit --master $MASTER /path/to/code.py, or use pyspark interactively

Run Time 36 sec


Idea: Make the map task more substantial

Spark – Python #2

from operator import add
from pyspark import SparkContext

def Msum(n):
    megaN = 1000000*n
    s = 0.0
    for k in xrange(1000000):
        d0 = 4*(k+1+megaN)+1
        d1 = d0-2
        s += ((1.0/d0)-(1.0/d1))
    return s

def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(0,Nterms+1), 20).map(Msum).reduce(add)
    return 4*(1+piOver4Minus1)

print LeibnizPi(999)

Run Time: 24 sec


Spark – Scala

def term(n: Int) = (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))

def LeibnizSum(Nterms: Int) = sc.parallelize(1 to Nterms).map(term).reduce(_ + _)

def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))   

println(LeibnizPi(1000*1000*1000))

To run: spark-shell --master $MASTER, then :load code.sc

Run Time: 19 sec


Spark – Scala #2

def LeibnizMSum(k: Int): Double = {
  var sum: Double = 0.0
  // each map task sums one block of one million terms
  for (n <- (1000000L*k + 1) to (1000000L*(k + 1))) {
    sum += (1.0/(4.0*n + 1.0)) - (1.0/(4.0*n - 1.0))
  }
  sum
}

def LeibnizSum(Nterms: Int) = sc.parallelize(0 to Nterms).
   map(LeibnizMSum).reduce(_ + _)

def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))   

println(LeibnizPi(999))

Run Time: 2.15 - 2.25 sec

Run time for 10 Billion terms, run as 64 map tasks: 17 sec


Spark – Java #1

Failed

parallelize() requires a List<Integer> to split up among the workers, but first initializing that list to 1, 2, 3, …, 1 Billion was problematic (a billion-element list of boxed Integers does not fit comfortably in memory).


Spark – Java #2

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;

class LeibnizMSum implements Function<Integer,Double> {
    public Double call(Integer k){
        double s=0.0;
        int n = 0;
        int limit = 1000000*(k+1);
        for(n=1000000*k+1; n<=limit; ++n){   // <= so each block covers exactly one million terms, matching the Python and Scala versions
            s = s + ((1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)));
        }
        return s;
    }
}

public class LeibnizPIApp {

    public static void main (String[] args){
        int billion = 1000*1000*1000;
        String masterURL="spark://192.168.1.10:7077";
        String appName = "LeibnizPIJava";
        String home = "/spark/spark-1.3.1";
        String jarfile = "/Z/pi/LeibnizJava/target/LeibnizPiJava-1.0.jar";
        System.out.println("Hello this is LeibnizPIApp.  Initializing...");
        ArrayList<Integer> gigalist = new ArrayList<Integer>(1000);
        int i;
        for(i = 0; i<1000; ++i) gigalist.add(i);
        System.out.println("Initialized gigalist");
        JavaSparkContext sc = new JavaSparkContext(masterURL, appName, home, jarfile);
        System.out.println("have spark context");
        JavaRDD<Integer> gigaRDD = sc.parallelize(gigalist, 32);
        System.out.println("have paralellized gigalist, starting map/reduce");
        Double pisum = gigaRDD.map(new LeibnizMSum()).reduce( (a,b) -> a+b );
        Double pi = 4.0 + 4.0*pisum;
        System.out.println(pi);
    }
}

Run Time 3.14 sec


Spark SQL

  • Prepare a file with the entries 1, 2, 3, 4, 5, … 1 Billion, one int per line (a small generation sketch follows this list)
  • Load it as a temporary SQL table with a Scala script
  • Transformations, such as reading a file or mapping data over a function, do not instantiate anything; actions do
  • A cache request stores the object the first time an action instantiates it
  • Must use the --executor-memory option to set the memory available to workers
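
For reference, here is one way the input file might be generated. This is just a sketch, not the script actually used for the talk: the path /data/gig.csv matches the load script below, and writing in chunks keeps memory use small (on Linux, seq 1 1000000000 > /data/gig.csv does the same job much faster).

# Sketch: write the integers 1..1,000,000,000 to /data/gig.csv, one per line.
# Pure Python, chunked to limit memory; expect it to take a while.
CHUNK = 1000000
with open("/data/gig.csv", "w") as f:
    for base in xrange(0, 1000000000, CHUNK):
        f.write("\n".join(str(base + i + 1) for i in xrange(CHUNK)))
        f.write("\n")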

SQL Table Load script

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Gig(n: Int)
val gigcsv = sc.textFile("/data/gig.csv")
               .map(x => Gig(x.trim.toInt)).toDF().cache()
gigcsv.registerTempTable("gig")

SQL count statement

sqlContext.sql("select count(*) from gig").collect().foreach(println)

First Run: 59 secs to read 1 billion records from SSD

Second Run: Cached in memory – 3.5 secs

SQL Pi Calculation

sqlContext.sql("select 4.0*(1.0+sum( (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)) )) as LeibnizPI FROM gig").collect().foreach(println)

Run Time: 20 secs (from previous cached result)

Wednesday, March 25, 2015

LoanModificationApprovalRates

Inside the US Treasury HAMP 2015.01 Data Dump: Fun with Loan Modification Approval Rates

It's no secret that many people who applied for a loan modification were unable to obtain one. Some people got the runaround as banks lost their paperwork over and over. Others received a trial modification but not a permanent one. Some people simply weren't eligible.

The fields ln_st_nme and ln_mdfc_mode_nme in the data dump tell this story:

outcomes of loan modification requests

Only rows 2 and 6 are good outcomes. This article concentrates on the rate of row 2 “Active Payment - Official Modification” outcomes, about 1 million out of the 6+ million total requests.

A recent public data dump, together with the capabilities of Google BigQuery, allows a deeper look at the approval rates for loan modification requests.

Loan Modification Approval Percentages by MSA State

top 10 states by loan modification approval rate

Active is the number of official, permanent modifications with active payment

Total is the number of all requests, including rejected requests, cancellations, trials, and permanent modifications.

The approval percentage is calculated as 100 × Active / Total. A small pandas sketch of this calculation follows.
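
This is a hedged sketch, not the actual query: the real calculations were done with Google BigQuery (the queries are in the linked github repository). The outcome field names ln_st_nme and ln_mdfc_mode_nme come from the data dump, but the way the "Active Payment - Official Modification" label splits across them, the state column name, and the CSV file name are assumptions for illustration.

# Hedged pandas sketch: approval percentage = 100 * Active / Total, grouped by state.
# Field names other than ln_st_nme and ln_mdfc_mode_nme are assumed, not the real schema.
import pandas as pd

df = pd.read_csv("hamp_requests.csv")   # hypothetical extract of the HAMP data dump

active_mask = ((df["ln_st_nme"] == "Active Payment") &
               (df["ln_mdfc_mode_nme"] == "Official Modification"))

total = df.groupby("state").size()                   # all requests per state
active = df[active_mask].groupby("state").size()     # active official modifications per state
approval_pct = (100.0 * active / total).sort_values(ascending=False)

print approval_pct.head(10)   # top 10 states by approval percentage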

Puerto Rico is the surprising entry on this list. Even though the number of applications and official modifications is small, there's no obvious explanation why it should be easier for someone to get a loan modification approval in Puerto Rico as opposed to, say, Hawaii, which had about twice as many applications but fewer finished official, permanent modifications.

DC is not exactly a surprise. Putting cynicism aside for a moment, the place is full of professionals who can navigate, or help others navigate, the government red tape involved in the loan modification program.

Bottom States

Bottom States by Loan Modification Approval Rate (Metro Areas)

Among the bottom approval rates we find Texas, with a ~ 12% approval percentage for loan modifications. Texas has about as many active, permanent, official modifications as Arizona, about 33k. However, 272k Texans applied as opposed to 200k Arizonans.

What’s an MSA?

Practically, an MSA is an area around a city or town, identified by a code number such as 31080 for Los Angeles, or 12060 for Atlanta.

Technically, an MSA is a Metropolitan/Micropolitan Statistical Area, a group of counties identified by the federal government (probably the Census Bureau) as representing a named area.

In the HAMP data dump, a row is tagged with either an MSA code number (~5.8 million records), a state (~419,000 records), or nothing (8 records). Knowing both the state and the MSA would be useful in state-border regions such as New York City, Memphis, or Chicago, as an MSA can cross state borders.

Best Approval Rates by MSA

top MSAs by loan modification approval rate

Once again, Puerto Rico takes the lead, followed by the LA metro area and various California towns.

A Bit of Fun with Politics

Going back to our state-level data, we can also fetch whether Obama (D) or Romney (R) won the state in the 2012 election and put that alongside the loan modification approval percentage.

2012 Presidential Election State by State

2012 election map from Wikipedia
source: Wikipedia

top loan modification approval percentage with 2012 election info

Although the US political scene is not so simple or rigid as to put an entire state in the Democratic or Republican bucket based on the 2012 election, for the sake of argument we see 8 of the top 10 loan modification approval rates going to (D) states and only 2 going to (R) states.

And if we look at the bottom 10 states by loan modification approval percentage…

bottom loan modification approval percentages with 2012 election info

we see 9 (R) states at the bottom of the rankings and only 1 (D) state.

From this, what can a hypothetical talk-radio host conclude? Probably not much. Certainly none of these things:

  • that the (D) states are being paid off for voting (D), at the expense of the (R) states [**]?
  • that (R) folks have a better chance attracting a tax audit than receiving a loan modification[**]?
  • that the (R) folks are sabotaging the loan modification process in some states [**]?
  • that people who needed a loan modification voted, in 2012, for the party that would give them one[**]?
    [**] except Utah and Arizona of course.

After all, correlation does not prove causation.

There can be unknown factors creating these effects that are also correlated with politics.

Speaking of correlation, if we use 2012 popular vote, % Romney - % Obama, to put each state on a left-right axis, we get this pretty plot:

MSA HAMP Loan Modification Approval Rate vs 2012 Popular Vote

The R^2 for the linear fit is around 0.40, meaning that about 40% of the variance is explained by the linear model (leaving 60% unexplained…)

Once again, remember, correlation does not prove causation.

R provided this summary of the linear model shown in the chart.

Call:
lm(formula = hamp_approval_rate ~ 1 + popular_vote2012, data = df)

Residuals:
    Min      1Q  Median      3Q     Max 
-3.6433 -0.9806 -0.3023  1.1444  6.2260 

Coefficients:
                 Estimate Std. Error t value Pr(>|t|)    
(Intercept)      14.76049    0.26599  55.492  < 2e-16 ***
popular_vote2012 -0.07342    0.01307  -5.618 9.54e-07 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 1.875 on 48 degrees of freedom
Multiple R-squared:  0.3967,    Adjusted R-squared:  0.3842 
F-statistic: 31.57 on 1 and 48 DF,  p-value: 9.544e-07

The queries, raw data, and R scripts for this are in this github repository for those who would like to use these files as a basis for other explorations, commentary, or improvement.

Totalling Savings to Borrowers

The data dump contains fields for the borrower's monthly housing expense before and after the modification. These fields do not always contain numbers, or numbers that make sense, so we restrict the data to permanent modifications where these values are somewhat reasonable (I used up to $10,000/mo, which might seem a bit high for a house payment plus miscellaneous expenses, but there are hundreds of entries that are higher) and eliminate any reports showing zero or negative savings. A rough sketch of that filtering, under assumed column names, follows.
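
This pandas sketch mirrors the filtering described above. The before/after expense column names, the state column, and the file name are illustrative assumptions; the real field names are in the data dictionary.

# Hedged sketch of the savings filter: permanent modifications only,
# before/after monthly housing expense between $0 and $10,000,
# and strictly positive savings.  Column names are assumed for illustration.
import pandas as pd

df = pd.read_csv("hamp_permanent_mods.csv")   # hypothetical extract of permanent modifications

reasonable = (df["exp_before"].between(0, 10000) &
              df["exp_after"].between(0, 10000))
mods = df[reasonable].copy()
mods["savings"] = mods["exp_before"] - mods["exp_after"]
mods = mods[mods["savings"] > 0]                 # drop zero or negative savings

by_state = mods.groupby("state")["savings"]
print by_state.sum().sort_values(ascending=False).head(12)   # total monthly savings by state
print by_state.mean().sort_values(ascending=False).head(5)   # average monthly savings per modification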

Monthly Savings from Loan Modification

The California savings of $209 million/month dwarfs the #2 and #3 states of Florida and New York that are tied at around $72 million/month saved on housing expenses. Other states in the top 12 come in from $10-$30 million/month saved.

The top average savings per borrower/modification studied goes to HI at $1008/mo (off chart), then NY ($910/mo), CA ($857/mo), and DC ($769/mo).

Bottom states, Monthly Savings from Loan Modification

The bottom end of the savings list dips under $100K/month total saved, with average savings in the $300/mo-$500/mo range.

Data Issues

I've heard that there are data issues with some fields in the HAMP data dump. Some of these are mentioned in the data dictionary and users guide. I've also seen reports of higher HAMP approval rates than the measly 10-20% we see here.

Here's a document that says the number is more like 30% but it includes, in the fine print, trials that do not become permanent modifications:
HAMP Application Activity by Servicer Jan 2015

So, take what you read here with a grain of salt. If anyone has further insight about the data issues, please, explain in the comments section.