Introduction to Apache Spark
These are some rough notes for a talk to be given at the June 2015 Chadoop meeting.
What is Apache Spark?
- Enterprise parallel processing framework based on resilient distributed datasets (RDDs)
- APIs for Java, Scala, Python, and SQL
- with in-memory caching (memory is reused where available, as a progressive enhancement)
How can I get Spark up and running quickly?
1. Docker on a collection of various firewalled hardware (described here)
2. The Amazon EC2 script included in the Spark distribution
How does it perform / scale?
- Resilient against (some) worker failures
- Can add / remove workers [Amazon spot market compatible]
- Can perform well or poorly depending on classical parallel processing concerns (CPU vs. memory vs. disk vs. network bottlenecks)
Quirks
- Transformations, such as reading a file or mapping data over a function, do not execute or materialize anything immediately.
- Actions do – this is good and lets you be deliberate about exactly how large intermediate results are handled (cached in memory, kept on disk, or recomputed each time); see the sketch after this list.
- Spark is not currently hierarchical. Networking is not layered as App/Driver <-> Master <-> Workers. It is important to make sure all workstations, masters, and workers can connect to each other.
- Spark uses random port numbers for some of its services by default.
- Networking is not NAT-friendly, and the documentation says little about this issue. This can be a blocker for simply combining your cluster of 3-4 home PCs with extra Amazon EC2 machines, since the home machines may be on a 192.168 LAN and the Amazon EC2 machines on a different Amazon LAN. If the Spark master has a local IP like 192.168.1.100 and a router exposes it to the internet as 1.2.3.4, packets sent to the Spark master at 1.2.3.4 will be read and discarded with logged errors.
- Ideally, jobs that don’t finish would time out and be rescheduled, but that wasn’t always my experience. Spark still has a ways to go on robustness/reliability, and there are unsolved issues: workers can hang, and importing a malformed CSV file was one way to make that happen. I suspect, though, that reliability will improve over time.
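A minimal PySpark sketch of the transformations-vs-actions quirk above (assuming a hypothetical text file with one integer per line; not code from the talk):
from pyspark import SparkContext
sc = SparkContext(appName="LazyDemo")        # in the pyspark shell, sc already exists; skip this line there
nums = sc.textFile("/tmp/numbers.txt")       # transformation: nothing is read yet
ints = nums.map(lambda x: int(x.strip()))    # transformation: still nothing computed
ints.cache()                                 # caching takes effect the first time an action materializes the RDD
print(ints.count())                          # action: the file is read, mapped, counted, and cached in memory
print(ints.count())                          # a second action reuses the cached RDD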
Using Pipework and Docker to Run Apache Spark
My cluster-at-home experience: for Apache Spark I learned the hard way not to use docker’s hostname flag (-h), but instead to keep the default hostnames and use pipework to establish an alternate network interface in each container, placing them all with their own IPs on a shared firewalled LAN. By default they can not be reached from outside the home; outside access would require planned holes in the firewall, probably via ssh to a shell account inside the home with access to the cluster.
Pipework is a free utility that routes an additional ethernet bridge into a docker container.
Docker is a virtualization technology for Linux based on containers.
To install on Ubuntu use
apt-get install docker.io
where the package name is docker.io due to a name conflict with an older project also called docker (a system tray for KDE3/GNOME2 applications).
Docker vs. Traditional Virtual Machine
- Docker uses Linux kernel namespace and cgroup capabilities to isolate and restrict containers
- The host and the docker containers all share the host’s Linux kernel, memory, swap, and hardware
- A container sees its own file tree and process list, and can only access devices as permitted by docker
In a traditional VM, resources are wasted by:
- booting and running multiple kernels
- duplicating drivers, system utilities, cron jobs, etc.
- requiring hard allocations of memory for each running VM that might turn out to be incorrect later, like giving a worker too little or too much memory versus the master or driver.
The traditional VM is probably safer/more secure. Docker is lighter weight in a trusted environment.
Dockerhub is a public repository of layered images for container filesystems.
A Dockerfile is a script for building a container filesystem from the repository, docker directives, and shell scripting. Example:
FROM drpaulbrewer/spark-roasted-elephant:latest
MAINTAINER drpaulbrewer@eaftc.com
ADD my-spark-worker.sh /spark/
CMD /spark/my-spark-worker.sh
The docker command, in conjunction with the docker daemon, builds, pushes, pulls, runs, stops, and kills containers.
Shell script ./run-docker-spark for the master and 1st worker [AMD FX 8150]:
#!/bin/bash
sudo -v
MASTER=$(docker run --name="master" --expose=1-65535 --env SPARK_MASTER_IP=192.168.1.10 --env SPARK_LOCAL_IP=192.168.1.10 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-master:latest)
sudo pipework eth0 $MASTER 192.168.1.10/24@192.168.1.1
SPARK1=$(docker run --name="spark1" --expose=1-65535 --env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.11 \
--env SPARKDIR=/spark/spark-1.3.1 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK1 192.168.1.11/24@192.168.1.1
Shell script for an additional worker on the same wired LAN:
#!/bin/bash
sudo -v
SPARK=$(docker run --name="spark" --expose=1-65535 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
--env SPARK_LOCAL_IP=192.168.1.13 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
sudo pipework eth0 $SPARK 192.168.1.13/24@192.168.1.1
Shell script for an additional worker on the wireless LAN. Issue: the wireless LAN apparently is not compatible with pipework’s bridging, which is designed for eth0.
Solution: --net="host" runs the container on the host’s network stack; no pipework run is needed.
#!/bin/bash
sudo -v
SPARK=$(docker run --net="host" --expose=1-65535 \
--env SPARKDIR=/spark/spark-1.3.1 \
--env mem=10G \
--env master=spark://192.168.1.10:7077 \
-v /data:/data -v /tmp:/tmp -d drpaulbrewer/spark-worker:latest)
To reset, or after a system restart: you need to docker stop (or docker kill) and then docker rm each container before running the scripts again.
Parallel Processing Toy example:
Calculate π using the Leibniz formula:
π/4 = 1 - (1/3) + (1/5) - (1/7) + (1/9) - (1/11) + …
Grouping the terms after the leading 1 in pairs gives π/4 = 1 + the sum over n = 1, 2, 3, … of [ 1/(4n+1) - 1/(4n-1) ], which is the form used in all of the code below.
Let’s use denominators out to 4 Billion and 1, i.e. n up to 1 Billion in the paired form.
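A quick sanity check of the paired form at a smaller N, in plain Python (a sketch for illustration, not one of the timed variants below):
import math
N = 1000000
total = 1.0 + sum((1.0/(4.0*n+1.0)) - (1.0/(4.0*n-1.0)) for n in range(1, N+1))
print(abs(4.0*total - math.pi))   # roughly 5e-7 at N = 1,000,000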
A. Without Apache Spark – Python
total=1.0
for n0 in xrange(1000000000):
    n = 1+n0
    total += (1.0/(4.0*n+1.0)) - (1.0/(4.0*n-1.0))
print 4.0*total
A. Run Time: 409 sec on 1 core of AMD-FX8150
B. Without Apache Spark – Python + Numpy
import numpy
billionInt = 1+numpy.arange(1000000000, dtype=numpy.int32)
total = 1.0 + ((1.0/(4.0*billionInt+1.0)) - (1.0/(4.0*billionInt-1.0))).sum()
print 4.0*total
Issue: wants 26G ram
Solution: Rent Amazon EC2 Xeon E5-2680 2.8Ghz 32GB (c3-4xlarge)
B. Run Time 25 sec on one core
C. Without Apache Spark – C
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char ** argv)
{
    long int n;
    if (argc!=3){
        fprintf(stderr, "usage: LeibnizPi1G <fromidx> <toidx_uninclusive>\n");
        exit(EXIT_FAILURE);
    }
    long int loop_from = strtol(argv[1], NULL, 10);
    long int loop_to = strtol(argv[2], NULL, 10);
    long double pisum = 0.0L;
    const long double one = 1.0L;
    const long double four = 4.0L;
    for(n=loop_from; n<loop_to; ++n)
        pisum += (one/(four*n+one))-(one/(four*n-one));
    long double LeibnizPi = 4.0*(1.0+pisum);
    printf("%.20Lf\n", LeibnizPi);
}
C. Run Time (AMD FX8150, 1 Core): 9 sec
D. Without Apache Spark – hand written map/reduce
#!/bin/bash
X=./LeibnizPi1G
$X 1 100000001 >/tmp/x1 &
$X 100000001 200000001 >/tmp/x2 &
$X 200000001 300000001 >/tmp/x3 &
$X 300000001 400000001 >/tmp/x4 &
$X 400000001 500000001 >/tmp/x5 &
$X 500000001 600000001 >/tmp/x6 &
$X 600000001 700000001 >/tmp/x7 &
$X 700000001 800000001 >/tmp/x8 &
$X 800000001 900000001 >/tmp/x9 &
$X 900000001 1000000001 >/tmp/x10 &
wait
cat /tmp/x* | gawk 'BEGIN {sum=0.0} {sum+=($1-4.0)/4.0} END { printf("%0.17g\n",4.0+4.0*sum) }'
D. Run Time: 2-3 sec on 8 core AMD FX 8150
JS. Browser-side Javascript
Leibniz π on jsfiddle. Run Time: 8.88 sec in Chromium Browser on i5-4570
Home Apache Spark Cluster
Box 1 (master, worker1, and workstation) AMD FX-8150 8 cores 16GB
Box 2 (worker2) i5-3470S HP Ultra Slim 4 cores 16 GB
Box 3 (worker3) i5-4570R Gigabyte Brix Cube 4 cores 16GB WLAN
After the master is running and the workers are connected, we can go to the master’s WebUI to monitor the cluster and check stdout/stderr on any of the workers.
Spark – Python
from operator import add
from pyspark import SparkContext
def term(n):
    return (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))
def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(1,Nterms+1),30).map(term).reduce(add)
    return 4*(1+piOver4Minus1)
print LeibnizPi(1000*1000*1000)
To run: spark-submit --master $MASTER /path/to/code.py, or use the pyspark shell
Run Time 36 sec
Idea: Make the map task more substantial
Spark – Python #2
from operator import add
from pyspark import SparkContext
def Msum(n):
    megaN = 1000000*n
    s = 0.0
    for k in xrange(1000000):
        d0 = 4*(k+1+megaN)+1
        d1 = d0-2
        s += ((1.0/d0)-(1.0/d1))
    return s
def LeibnizPi(Nterms):
    sc = SparkContext(appName="LeibnizPI")
    piOver4Minus1 = sc.parallelize(xrange(0,Nterms+1), 20).map(Msum).reduce(add)
    return 4*(1+piOver4Minus1)
print LeibnizPi(999)
Run Time: 24 sec
Spark – Scala
def term(n: Int) = (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0))
def LeibnizSum(Nterms: Int) = sc.parallelize(1 to Nterms).map(term).reduce(_ + _)
def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))
println(LeibnizPi(1000*1000*1000))
To run: spark-shell --master $MASTER, then :load code.sc
Run Time: 19 sec
Spark – Scala #2
def LeibnizMSum(k: Int): Double = {
  var sum: Double = 0.0
  for( n <- (1000000*k.toLong+1) to (1000000*(k.toLong+1)) ){
    sum = sum + ((1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)))
  }
  sum
}
def LeibnizSum(Nterms: Int) = sc.parallelize(0 to Nterms).
  map(LeibnizMSum).reduce(_ + _)
def LeibnizPi(Nterms: Int) = 4.0*(1.0+LeibnizSum(Nterms))
println(LeibnizPi(999))
Run Time: 2.15 - 2.25 sec
Run time for 10 Billion terms, run as 64 map tasks: 17 sec
Spark – Java #1
Failed: parallelize() required a List<Integer> to split up among workers, but it first had to be initialized to 1, 2, 3, …, 1 Billion, which was problematic.
Spark – Java #2
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
class LeibnizMSum implements Function<Integer,Double> {
    public Double call(Integer k){
        double s=0.0;
        int n = 0;
        int limit = 1000000*(k+1);
        // <= so each chunk covers 1,000,000 terms, matching the Scala and Python versions
        for(n=1000000*k+1; n<=limit; ++n){
            s = s + ((1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)));
        }
        return s;
    }
}
public class LeibnizPIApp {
    public static void main (String[] args){
        int billion = 1000*1000*1000;
        String masterURL = "spark://192.168.1.10:7077";
        String appName = "LeibnizPIJava";
        String home = "/spark/spark-1.3.1";
        String jarfile = "/Z/pi/LeibnizJava/target/LeibnizPiJava-1.0.jar";
        System.out.println("Hello this is LeibnizPIApp. Initializing...");
        ArrayList<Integer> gigalist = new ArrayList<Integer>(1000);
        int i;
        for(i = 0; i<1000; ++i) gigalist.add(i);
        System.out.println("Initialized gigalist");
        JavaSparkContext sc = new JavaSparkContext(masterURL, appName, home, jarfile);
        System.out.println("have spark context");
        JavaRDD<Integer> gigaRDD = sc.parallelize(gigalist, 32);
        System.out.println("have parallelized gigalist, starting map/reduce");
        Double pisum = gigaRDD.map(new LeibnizMSum()).reduce( (a,b) -> a+b );
        Double pi = 4.0 + 4.0*pisum;
        System.out.println(pi);
    }
}
Run Time 3.14 sec
Spark SQL
- Prepare a file with entries 1, 2, 3, 4, 5, …, 1 Billion, one int per line (a generation sketch follows this list)
- Load it as a temporary SQL table with a scala script
- Transformations, such as reading a file or mapping data over a function, do not execute immediately; actions do
- A cache request stores the object the first time an action instantiates it
- Must use the --executor-memory option to set the memory available to workers
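A minimal Python sketch of one way to generate that file (the path /data/gig.csv matches the load script below; the helper name write_gig_csv is made up for illustration):
def write_gig_csv(path="/data/gig.csv", total=1000*1000*1000, chunk=1000*1000):
    # write the integers 1..total, one per line, in chunks to keep memory use modest
    with open(path, "w") as f:
        for start in range(1, total+1, chunk):
            f.write("\n".join(str(n) for n in range(start, start+chunk)))
            f.write("\n")

write_gig_csv()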
SQL Table Load script
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Gig(n: Int)
val gigcsv = sc.textFile("/data/gig.csv")
.map(x => Gig(x.trim.toInt)).toDF().cache()
gigcsv.registerTempTable("gig")
SQL count statement
sqlContext.sql("select count(*) from gig").collect().foreach(println)
First Run: 59 secs to read 1 billion records from SSD
Second Run: Cached in memory – 3.5 secs
SQL Pi Calculation
sqlContext.sql("select 4.0*(1.0+sum( (1.0/(4.0*n+1.0))-(1.0/(4.0*n-1.0)) )) as LeibnizPI FROM gig").collect().foreach(println)