Monday, June 8, 2015

Introduction to Apache Spark

These are some rough notes for a talk to be given at the June 2015 Chadoop meeting.

What is Apache Spark?

  1. Enterprise parallel processing framework based on Resilient Distributed Datasets (RDDs)
  2. for Java, Scala, Python, and SQL
  3. with in-memory caching (memory is reused where available and treated as a progressive enhancement rather than a requirement)

How can I get Spark up and running quickly?

  1. Docker on a collection of assorted firewalled hardware (the approach described here)
  2. The Amazon EC2 launch script included in the Spark distribution

How does it perform / scale?

  1. Resilient against (some) worker failures
  2. Workers can be added / removed [Amazon spot market compatible]
  3. Can perform well or poorly depending on classical parallel processing concerns (CPU vs. memory vs. disk vs. network bottlenecks)

Quirks

  • Transformations, such as reading a file or mapping data over a function, do not instantiate anything; they are recorded lazily.
  • Actions do instantiate results. This is good: it lets you be deliberate about exactly how large intermediate results are handled (cached in memory, kept on disk, or recomputed each time).
  • Spark is not currently hierarchical: networking is not neatly layered as App/Driver <-> Master <-> Workers, so it is important to make sure that all workstations, masters, and workers can connect directly to each other.
  • Many port numbers are chosen at random by default; they can be pinned in the configuration (see the sketch after this list).
  • Networking is not NAT-friendly, and the documentation says little about this issue. This can be a blocker for simply combining your cluster of 3-4 home PCs with extra Amazon EC2 machines, since the home machines may be on a 192.168 LAN and the Amazon EC2 machines on a different Amazon LAN. If the Spark master has a local IP like 192.168.1.100 and a router exposes it to the internet as 1.2.3.4, packets sent to the Spark master at 1.2.3.4 will be received and then discarded, with errors logged.
  • Ideally, tasks that don't finish would time out and be rescheduled, but that wasn't always my experience. Spark still has a ways to go on robustness/reliability, and there are unsolved issues. Workers can hang; importing a malformed csv file was one way to trigger this. I suspect, though, that reliability will improve over time.
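To tame the address and port quirks, the standalone cluster's conf/spark-env.sh can pin most of them down. Here is a minimal sketch, assuming the LAN addresses used later in this post; the variable names are the ones documented for standalone mode, so check them against your Spark version.

# conf/spark-env.sh -- pin addresses and ports instead of accepting random ones
export SPARK_MASTER_IP=192.168.1.10     # address the master binds to and advertises
export SPARK_MASTER_PORT=7077           # keep the default master port fixed
export SPARK_LOCAL_IP=192.168.1.11      # per machine: the interface Spark should bind to
export SPARK_WORKER_PORT=9999           # pin the worker port (9999 is an arbitrary choice)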

Using Pipework and Docker to Run Apache Spark

My cluster-at-home experience: for Apache Spark I learned the hard way not to use docker's hostname flag -h, but instead to keep the default hostnames and use pipework to establish an alternate network interface in each container, placing them all, each with its own IP, on a shared firewalled LAN. By default they cannot be reached from outside the home; outside access would require planned holes in the firewall, probably via ssh to a shell account inside the home with access to the cluster.
Pipework is a free utility that adds an extra network interface to a Docker container and bridges it to a host interface, so the container can have its own IP on the LAN.
Docker is a container-based virtualization technology for Linux.
To install on Ubuntu, use apt-get install docker.io; the package is named docker.io because of a name conflict with an older project also called docker (a system tray for KDE3/GNOME2 applications).
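A minimal install sketch for both tools, assuming the upstream pipework script on GitHub and /usr/local/bin as the install location:

sudo apt-get update
sudo apt-get install -y docker.io
# pipework is a single shell script; fetch it from the upstream repository
sudo wget -O /usr/local/bin/pipework \
  https://raw.githubusercontent.com/jpetazzo/pipework/master/pipework
sudo chmod +x /usr/local/bin/pipework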

Docker vs. Traditional Virtual Machine

  • Docker uses Linux kernel namespace and cgroup capabilities to isolate and restrict containers
  • The host and all Docker containers share the host's Linux kernel, memory, swap, and hardware
  • Each container sees its own file tree and process list, and can only access devices as permitted by Docker (see the quick demo after this list)
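A quick sketch of that isolation in action, assuming the stock ubuntu image from Dockerhub is available:

docker run --rm ubuntu:14.04 ls /        # the container sees its own root file tree
docker run --rm ubuntu:14.04 hostname    # ...and gets its own hostname
docker run --rm ubuntu:14.04 uname -r    # ...yet reports the host's kernel version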

In a Traditional VM, resources are wasted:

  • booting and running multiple kernels
  • duplicate drivers, system utilities, cron jobs, etc.
  • by requiring hard memory allocations for each running VM that may turn out to be wrong later, such as giving a worker too little or too much memory relative to the master or driver.

The traditional VM is probably safer/more secure. Docker is lighter weight in a trusted environment.

Dockerhub is a public repository of layered images for container filesystems.
A Dockerfile is a script for building a container filesystem from the repository, docker directives, and shell scripting. Here is the Dockerfile for the Spark worker image used below:
# base image: a prebuilt Spark distribution under /spark
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
# add the worker start script and run it when the container starts
ADD my-spark-worker.sh /spark/
CMD /spark/my-spark-worker.sh
The docker command, in conjunction with the docker daemon, builds, pushes, pulls, runs, stops, and kills containers.
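For example, building and publishing the worker image from the Dockerfile above might look like this (a sketch: the directory name is an assumption, and the image name is the one used in the run scripts below):

cd spark-worker/     # directory containing the Dockerfile and my-spark-worker.sh
docker build -t drpaulbrewer/spark-worker:latest .
docker push drpaulbrewer/spark-worker:latest    # optional: publish the image to Dockerhub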
Shell script ./run-docker-spark for the master and 1st worker [AMD FX 8150]:
#!/bin/bash
sudo -v
# start the master container; expose all ports and tell Spark its LAN address
MASTER=$(docker run --name="master" --expose=1-65535 --env SPARK_MASTER_IP=192.168.1.10 --env SPARK_LOCAL_IP=192.168.1.10 \
 -v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-master:latest)
# pipework gives the master container its own IP on the wired LAN (gateway 192.168.1.1)
sudo pipework eth0 $MASTER 192.168.1.10/24@192.168.1.1
# start the first worker, pointing it at the master
SPARK1=$(docker run --name="spark1" --expose=1-65535 --env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.11 \
--env SPARKDIR=/spark/spark-1.3.1 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK1 192.168.1.11/24@192.168.1.1
Shell script for an additional worker on the same wired LAN:
#!/bin/bash
sudo -v
SPARK=$(docker run --name="spark" --expose=1-65535 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.13 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK 192.168.1.13/24@192.168.1.1
Shell script for an additional worker on the wireless LAN.
Issue: the wireless LAN apparently is not compatible with pipework's bridging, which is designed for eth0.
Solution: --net="host" runs the container on the host's network stack; no pipework run is needed.
#!/bin/bash
sudo -v
SPARK=$(docker run --net="host" --expose=1-65535 \
--env SPARKDIR=/spark/spark-1.3.1 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
To reset, or after a system restart:
you need to docker stop (or docker kill) and docker rm each container before running the scripts again.
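A sketch of that cleanup, using the container names from the scripts above:

#!/bin/bash
# stop (or kill) and remove the Spark containers so the run scripts can recreate them
for c in master spark1 spark; do
  docker stop "$c" || docker kill "$c"
  docker rm "$c"
done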

Parallel Processing Toy Example

Calculate π with the Leibniz formula

(π/4) = 1 - (1/3) + (1/5) - (1/7) + (1/9) - (1/11) + …

Let’s use denominators out to 4 Billion and 1.

A. Without Apache Spark – Python

# sum pairs of Leibniz terms; the leading 1 is folded into the initial total
total = 1.0
for n0 in xrange(1000000000):   # Python 2: xrange avoids materializing a list
    n = 1 + n0
    total += (1.0/(4.0*n+1.0)) - (1.0/(4.0*n-1.0))
print 4.0*total

A. Run Time: 409 sec on 1 core of AMD-FX8150


B. Without Apache Spark – Python + Numpy

import numpy
# n = 1, 2, ..., 1e9 as 32-bit integers (about 4 GB for this array alone)
billionInt = 1 + numpy.arange(1000000000, dtype=numpy.int32)
# vectorized Leibniz term pairs; the float64 intermediates are what drive memory use
total = 1.0 + ((1.0/(4.0*billionInt+1.0)) - (1.0/(4.0*billionInt-1.0))).sum()
print 4.0*total

Issue: wants 26 GB of RAM

Solution: rent an Amazon EC2 Xeon E5-2680 2.8 GHz 32 GB instance (c3.4xlarge)

B. Run Time 25 sec on one core


C. Without Apache Spark – C

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char ** argv)
{
 long int n;
 if (argc!=3){
    fprintf(stderr, "usage: LeibnizPi1G <fromidx> <toidx_uninclusive>\n");
    exit(EXIT_FAILURE);
 }
 long int loop_from = strtol(argv[1],NULL,10);
 long int loop_to = strtol(argv[2], NULL, 10);
 long double pisum = 0.0L;
 const long double one =  1.0L;
 const long double four = 4.0L;
 for(n=loop_from;n<loop_to;++n)
   pisum+=(one/(four*n+one))-(one/(four*n-one));
 long double LeibnizPi = 4.0L*(1.0L+pisum);
 printf("%.20Lf\n", LeibnizPi);
 return EXIT_SUCCESS;
}

C. Run Time (AMD FX8150, 1 Core): 9 sec


D. Without Apache Spark – hand written map/reduce

#!/bin/bash
# map step: run 10 instances of the C program in parallel, one per range of 100 million terms
X=./LeibnizPi1G
$X 1 100000001 >/tmp/x1 &
$X 100000001 200000001 >/tmp/x2 &
$X 200000001 300000001 >/tmp/x3 &
$X 300000001 400000001 >/tmp/x4 &
$X 400000001 500000001 >/tmp/x5 &
$X 500000001 600000001 >/tmp/x6 &
$X 600000001 700000001 >/tmp/x7 &
$X 700000001 800000001 >/tmp/x8 &
$X 800000001 900000001 >/tmp/x9 &
$X 900000001 1000000001 >/tmp/x10 &
wait
# reduce step: each file holds 4*(1+partial_sum), so ($1-4)/4 recovers the partial sum
cat /tmp/x* | gawk 'BEGIN {sum=0.0} {sum+=($1-4.0)/4.0} END { printf("%0.17g\n",4.0+4.0*sum) }'

D. Run Time: 2-3 sec on 8 core AMD FX 8150


JS. Browser-side JavaScript

Leibniz π on jsfiddle

Run Time: 8.88 sec in Chromium Browser on i5-4570


Home Apache Spark Cluster

Box 1 (master, worker1, and workstation) AMD FX-8150 8 cores 16GB

Box 2 (worker2) i5-3470S HP Ultra Slim 4 cores 16 GB

Box 3 (worker3) i5-4570R Gigabyte Brix Cube 4 cores 16GB WLAN

After the master is running and the workers are connected, we can go to the master WebUI (port 8080 on the master, e.g. http://192.168.1.10:8080) to monitor the cluster and check stdout/stderr on any of the workers.
Spark Cluster Management Console

Spark – Python

from operator import add
from pyspark import SparkContext

def term(n):
    return (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))

def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(1,Nterms+1),30).map(term).reduce(add)
    return 4*(1+piOver4Minus1)

print LeibnizPi(1000*1000*1000)

To run: spark-submit --master $MASTER /path/to/code.py, or use the interactive pyspark shell
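Spelled out for this cluster (a sketch; the master URL is the one used in the docker scripts above):

MASTER=spark://192.168.1.10:7077
spark-submit --master $MASTER /path/to/code.py
# or interactively:
pyspark --master $MASTER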

Run Time 36 sec


Idea: make each map task more substantial, so that per-task scheduling overhead is amortized over more work

Spark – Python #2

from operator import add
from pyspark import SparkContext

def Msum(n):
    megaN = 1000000*n
    s = 0.0
    for k in xrange(1000000):
        d0 = 4*(k+1+megaN)+1
        d1 = d0-2
        s += ((1.0/d0)-(1.0/d1))
    return s

def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(0,Nterms+1), 20).map(Msum).reduce(add)
    return 4*(1+piOver4Minus1)

print LeibnizPi(999)

Run Time: 24 sec


Spark – Scala

def term(n: Int) = (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))

def LeibnizSum(Nterms: Int) = sc.parallelize(1 to Nterms).map(term).reduce(_ + _)

def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))   

println(LeibnizPi(1000*1000*1000))

To run: spark-shell --master $MASTER, then :load code.sc
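Concretely (same master URL as above):

MASTER=spark://192.168.1.10:7077
spark-shell --master $MASTER
# then, at the scala> prompt inside the shell:
#   :load code.sc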

Run Time: 19 sec


Spark – Scala #2

def LeibnizMSum(k: Int): Double = {
  // sum one million consecutive terms, n = 1000000*k+1 through 1000000*(k+1)
  var sum: Double = 0.0
  for (n <- (1000000*k.toLong + 1) to (1000000*(k.toLong + 1))) {
    sum += (1.0/(4.0*n+1.0)) - (1.0/(4.0*n-1.0))
  }
  sum
}

def LeibnizSum(Nterms: Int) = sc.parallelize(0 to Nterms).
   map(LeibnizMSum).reduce(_ + _)

def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))   

println(LeibnizPi(999))

Run Time: 2.15 - 2.25 sec

Run time for 10 Billion terms, run as 64 map tasks: 17 sec


Spark – Java #1

Failed

parallelize() required a List<Integer> to split up among workers, but that list first had to be initialized to 1, 2, 3, …, 1 Billion, which was problematic (a billion boxed Integers is far too much to hold in the driver's memory). Java #2 below sidesteps this by parallelizing only 1000 chunk indices.


Spark – Java #2

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;

class LeibnizMSum implements Function<Integer,Double> {
    public Double call(Integer k){
        double s=0.0;
        int n;
        // sum one million terms: n = 1000000*k+1 through 1000000*(k+1), inclusive
        int limit = 1000000*(k+1);
        for(n=1000000*k+1; n<=limit; ++n){
            s = s + ((1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)));
        }
        return s;
    }
}

public class LeibnizPIApp {

    public static void main (String[] args){
        int billion = 1000*1000*1000;
        String masterURL="spark://192.168.1.10:7077";
        String appName = "LeibnizPIJava";
        String home = "/spark/spark-1.3.1";
        String jarfile = "/Z/pi/LeibnizJava/target/LeibnizPiJava-1.0.jar";
        System.out.println("Hello this is LeibnizPIApp.  Initializing...");
        ArrayList<Integer> gigalist = new ArrayList<Integer>(1000);
        int i;
        for(i = 0; i<1000; ++i) gigalist.add(i);
        System.out.println("Initialized gigalist");
        JavaSparkContext sc = new JavaSparkContext(masterURL, appName, home, jarfile);
        System.out.println("have spark context");
        JavaRDD<Integer> gigaRDD = sc.parallelize(gigalist, 32);
        System.out.println("have paralellized gigalist, starting map/reduce");
        Double pisum = gigaRDD.map(new LeibnizMSum()).reduce( (a,b) -> a+b );
        Double pi = 4.0 + 4.0*pisum;
        System.out.println(pi);
    }
}

Run Time 3.14 sec


Spark SQL

  • Prepare a file with entries 1,2,3,4,5…1 Billion, one int per line (see the sketch after this list)
  • Load it as a temporary SQL table with a Scala script
  • Transformations, such as reading a file or mapping data over a function, do not instantiate; actions do
  • A cache request stores the object the first time an action instantiates it
  • Must use the --executor-memory option to set the memory available to the workers
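A sketch of those preparation steps (seq writes one integer per line; the path matches the load script below, and the 12G executor memory figure is just an illustration sized to these 16 GB worker machines):

# generate 1,2,3,...,1000000000, one integer per line (roughly a 9 GB text file)
seq 1 1000000000 > /data/gig.csv
# start the shell with explicit executor memory so the workers can hold the cached table
MASTER=spark://192.168.1.10:7077
spark-shell --master $MASTER --executor-memory 12G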

SQL Table Load script

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Gig(n: Int)
val gigcsv = sc.textFile("/data/gig.csv")
               .map(x => Gig(x.trim.toInt)).toDF().cache()
gigcsv.registerTempTable("gig")

SQL count statement

sqlContext.sql("select count(*) from gig").collect().foreach(println)

First Run: 59 secs to read 1 billion records from SSD

Second Run: Cached in memory – 3.5 secs

SQL Pi Calculation

sqlContext.sql("select 4.0*(1.0+sum( (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)) )) as LeibnizPI FROM gig").collect().foreach(println)

Run Time: 20 secs (using the table cached by the previous query)