Big Data and Spark

Big Data and Spark

What is big data? Pick your favourite definition:

  1. The Four V's: volume, velocity, variety, veracity.
  2. When you have enough data that Pandas (or similar) can't handle it (quickly enough or at all).
  3. When you have so much data it's annoying.

When people say big data, they usually mean #3.

My best functional definition: #2.

Big Data and Spark

The big limitations of Pandas:

  • All the data is kept in memory.
  • Programs use a single thread/core (unless you did something heroic).

Big Data and Spark

The biggest CSV file I've ever had: 27,000 rows, making a 1.7 MB DataFrame.

Your laptop probably has ≥4 GB (4096 MB) of memory: data needs to be quite big before Pandas won't handle it.
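
If you're curious how much room your data actually takes in Pandas, you can ask. A small sketch (using the cities.csv file introduced later):

import pandas as pd

df = pd.read_csv('cities.csv')
print(df.memory_usage(deep=True).sum())  # total bytes this DataFrame occupies in memory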

Compute Clusters

If one computer won't do the job, you need several: each one can store some of the data and do some of the processing, and they can work together to generate final results.

Compute cluster: several computers working together to do some work.

Compute Clusters

Apache Hadoop is a collection of tools for managing compute clusters.

  • YARN: managing compute jobs in the cluster.
  • HDFS: Hadoop Distributed File System, for storing data on the cluster's nodes.
  • Spark: a framework to do computation on YARN (or elsewhere).

Compute Clusters

The goal here is to express the computation we want to do, in such a way that the work can be sent out to the cluster and done in parallel.

Spark will let us do that.

YARN can take our job and make sure all of the pieces get done somewhere, somehow (with Spark's help).

HDFS can store all of the pieces of our data files, so they're there when YARN/Spark wants to work on them.

Compute Clusters

Today, we'll (mostly?) run Spark on our computers, with the assurance that we could scale up the computation if we needed to.

… and knowing that we're now using all of the cores in our processor.

First Spark Program

A complete Spark program:

import sys
from pyspark.sql import SparkSession, functions, types
 
spark = SparkSession.builder.appName('example 1').getOrCreate()
 
assert sys.version_info >= (3, 4) # make sure we have Python 3.4+
assert spark.version >= '2.1' # make sure we have Spark 2.1+

data = spark.read.csv('cities.csv', header=True, inferSchema=True)
data.show()

Last two lines are real work. Rest is boilerplate that will be in all of our Spark programs.

First Spark Program

Run the job:

spark-submit spark-1.py
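
With nothing else configured, spark-submit runs the job in local mode using all of your cores. The --master option makes that explicit, and is also how you'd point the same program at a real cluster. A sketch (neither line is needed for the example above):

spark-submit --master 'local[*]' spark-1.py   # local mode, one worker thread per core
spark-submit --master yarn spark-1.py         # on a Hadoop cluster managed by YARN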

First Spark Program

The example reads a file cities.csv. Let's say:

city,population,area
Vancouver,2463431,2878.52
Calgary,1392609,5110.21
Toronto,5928040,5905.71
Montreal,4098927,4604.26
Halifax,403390,5496.31

It will then output:

+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+

Spark DataFrames

This line…

data = spark.read.csv('cities.csv', header=True, inferSchema=True)

…creates an object that is the primary way we'll store and manipulate data in Spark: a DataFrame.
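
The inferSchema=True option makes Spark guess the column types by reading through the data. If we already know the types, we can hand over a schema instead. A sketch using the types module from the imports, with column names matching cities.csv:

schema = types.StructType([
    types.StructField('city', types.StringType()),
    types.StructField('population', types.IntegerType()),
    types.StructField('area', types.DoubleType()),
])
# No inferSchema needed: the types are given up front.
data = spark.read.csv('cities.csv', header=True, schema=schema)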

Spark DataFrames

A Pandas DataFrame and a Spark DataFrame are not the same thing. Spark's DataFrames were inspired by Pandas (and DataFrames in R).

Spark's DataFrames work differently, often because of their basic job: letting you do things in parallel across a cluster. Sometimes just because of design differences.
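
They can still talk to each other: a Spark DataFrame that's known to be small can be pulled over to Pandas, and back. A sketch (only safe when the result actually fits in one machine's memory):

pandas_cities = data.toPandas()                # collects every row to the driver as a Pandas DataFrame
spark_again = spark.createDataFrame(pandas_cities)  # and back the other way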

Spark DataFrames

Some things will be familiar. Like Pandas, Spark DataFrames:

  • have rows and columns.
  • have a schema: each column has a name and a type.
  • are operated on by implicitly doing operations on every element, not by explicitly iterating.
  • can be created from files; written to files.

Spark DataFrames

A lot of the operations are spelled differently, but you can see the similarities:

cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
cities.printSchema()
root
 |-- city: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area: double (nullable = true)

Spark DataFrames

We can do operations that feel familiar:

c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
c.show()
+---------+----------+
|     city|population|
+---------+----------+
|Vancouver|   2463431|
| Montreal|   4098927|
+---------+----------+

Spark DataFrames

And we can write the results:

c.write.json('spark-output', mode='overwrite')

But it doesn't create a file. It creates a directory with several files:

$ ls spark-output/
part-00000-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00001-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00002-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
_SUCCESS

The real output is the concatenation of those files.
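
Conveniently, Spark will also read a directory like that back as a single DataFrame if you give it the directory name. A quick sketch:

c_again = spark.read.json('spark-output')  # reads all the part-* files in the directory
c_again.show()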

Operating on DataFrames

In the previous code, we saw two methods on DataFrames:

c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])

… and they feel very SQL-like. (Actually, .where() is a synonym for .filter(): very SQL-like.)

Operating on DataFrames

The .select() method creates a new DataFrame of the columns you specify: either existing or a calculation.

some_values = cities.select(
    cities['city'],
    cities['area'] * 1000000
)
some_values.show()
+---------+----------------+
|     city|(area * 1000000)|
+---------+----------------+
|Vancouver|       2.87852E9|
|  Calgary|       5.11021E9|
|  Toronto|       5.90571E9|
| Montreal|       4.60426E9|
|  Halifax|       5.49631E9|
+---------+----------------+

Operating on DataFrames

That could have been prettier…

some_values = cities.select(
    cities['city'],
    (cities['area'] * 1000000).alias('area_m2')
)
some_values.show()
+---------+---------+
|     city|  area_m2|
+---------+---------+
|Vancouver|2.87852E9|
|  Calgary|5.11021E9|
|  Toronto|5.90571E9|
| Montreal|4.60426E9|
|  Halifax|5.49631E9|
+---------+---------+

Operating on DataFrames

The .filter() method keeps rows where the condition is true.

some_values = cities.filter(cities['population'] % 2 == 1)
some_values.show()
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
| Montreal|   4098927|4604.26|
+---------+----------+-------+

Operating on DataFrames

These methods all create a new DataFrame object. The previous example was exactly equivalent to:

cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
c_small = cities.filter(cities['area'] < 5000)
c_droparea = c_small.select(c_small['city'], c_small['population'])
c_droparea.show()

This is typical: operations tend to build new DataFrames, and it's common to have many as you construct the final result.
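
Since every operation returns a new DataFrame, the same pipeline can also be written by chaining the method calls, with functions.col referring to a column by name. A sketch equivalent to the code above; which style you use is a matter of taste:

c = spark.read.csv('cities.csv', header=True, inferSchema=True) \
    .filter(functions.col('area') < 5000) \
    .select('city', 'population')
c.show()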

Operating on DataFrames

There are many methods on DataFrames that you'll find useful.

# Return a new DataFrame...
c = cities.withColumn('area_m2', cities['area'] * 1000000)
c = cities.drop('area') # DF without 'area' column
c = cities.drop_duplicates() # remove duplicate rows
c = cities.na.drop() # remove any rows with NaN values
c = cities.sort([cities['city'], cities['population']])
c = cities.sample(withReplacement=False, fraction=0.5)
 
# Returns a number...
r = cities.stat.corr('population', 'area')  # .stat.corr takes column names, not Column objects

Column Expressions

When doing calculations with Spark DataFrames, we write expressions that operate on columns:

df['col1']
df['col1'] + 1
df['col1'] * df['col2']
(df['col1'] == 1) & (df['col2'].isNull())
functions.length(df['col1'])

Each of these is a Spark column expression, represented as a Column object.

Column Expressions

Just about everywhere we do some kind of calculation, it is a column expression.

df.select(colexpr, colexpr, colexpr)
df.filter(colexpr)
df.groupBy(colexpr).agg(colexpr)
df1.join(df2, on=colexpr)

Column Expressions

There are some useful methods on Column objects, and Python operators are overloaded to work on them:

df['col1'] ** 2
df['col1'].isNotNull()
(df['col1'] / 10).alias('onetenth')
df['str'].astype(types.IntegerType())

Column Expressions

Column expressions can be produced by extracting a column from a DataFrame, doing some calculation on a column, or by calling a column function.

df['col1']
df['col1'] - df['col2']
functions.sin(df['col1'])

Column Functions

Many column functions are provided in the pyspark.sql.functions module we have been importing.

Column Functions

There is a huge variety of functions in that module.

functions.abs(df['number'])
functions.datediff(df['date1'], df['date2'])
functions.format_string('%d-%s', df['number'], df['label'])
functions.length(df['str'])
functions.concat(df['str1'], df['str2'])

Column Functions

Some of the functions are aggregation functions that are likely to be used near a .groupBy().

groups = df.groupBy(df['col1'])
groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))
groups.agg(functions.avg(df['col2']))
groups.agg(functions.collect_list(df['col2']))

DataFrames are Partitioned

Let's start thinking about big data…

The underlying assumption is that a Spark DataFrame will not fit in any single computer's memory or disk. All we can hope for is to store pieces of it on many different computers.

All Spark DataFrames are partitioned this way.
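
You can ask a DataFrame how many partitions it currently has. A quick check while experimenting:

print(cities.rdd.getNumPartitions())  # how many pieces the rows are split into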

DataFrames are Partitioned

Subsets of rows are handled separately by different processes/cores. Each piece can (hopefully) be operated on in parallel.

If operations can truly be done in parallel without much coordination, n cores can do the work almost n times faster. (Unfortunately, that won't always be true.)

DataFrames are Partitioned

Important point: a partition is the smallest unit that can be operated on in parallel.

If you have 2 partitions, you'll be using at most 2 cores to work on that DataFrame.

Controlling Partitions

We need more control over partitions in our DataFrames.

Honestly, often easiest: split the input files into something that makes sense, then start working.

Controlling Partitions

Mostly, we are working with whatever is implied by the input files. If that's not sensible, we have to fix it.

There are a couple of methods that will rearrange the partitions of a DataFrame…

Controlling Partitions

If the problem is too many partitions, the .coalesce(n) method can concatenate some of them together.

It might not do quite what you expect, but it will lower the many-partitions overhead if you have it.

Controlling Partitions

.coalesce() will never grow the number of partitions.

It can clean up your output if you know you have a small number of rows. Produces one output file:

lumpy_df.coalesce(1).write.json(…)

Controlling Partitions

If you really need a DataFrame's partitions rearranged, the .repartition(n) method does it, but it's more expensive.

print(partition_sizes(lumpy_df.coalesce(4)))
print(partition_sizes(lumpy_df.repartition(4)))
[220, 0, 300, 150]
[168, 167, 167, 168]

Is it worth it? It depends what else is going to be done with that data.
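
The partition_sizes used above isn't built into Spark. A helper along these lines could produce those lists (a sketch: .glom() builds each partition as an in-memory list, so only use it for experimenting on smallish data):

def partition_sizes(df):
    # .glom() turns each partition into a list of its rows;
    # mapping len over that gives one row-count per partition.
    return df.rdd.glom().map(len).collect()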

Controlling Partitions

My rule: if you explicitly decrease the partitions of a DataFrame (or RDD), you must include a comment justifying why it is a safe thing to do. If not: mark penalty.

You should say something about an upper-bound on the data size.

Controlling Partitions

How many partitions should you have? It depends.

How many total rows in the DataFrame? How many executors? What calculation is going to happen? How much processor and memory will it need?

My best guess: 100–10000. Except when that's wrong.

Lazy Evaluation

How long does this code take to complete?

numbers = spark.range(100000000000000000, numPartitions=1000000)
numbers = numbers.select(
    numbers['id'],
    functions.rand(),
    (numbers['id'] % 100).alias('m')
)
numbers.show()

<10 seconds on my desktop. How?

Lazy Evaluation

All Spark calculations are lazy (or are lazily evaluated).

That means that when we create a DataFrame, Spark doesn't actually do the calculation.

Lazy Evaluation

The 10¹⁷ rows implied by this code don't get created, just a description of what would need to happen to create this DataFrame later.

numbers = spark.range(100000000000000000, numPartitions=1000000)
numbers = numbers.select(
    numbers['id'],
    functions.rand(),
    (numbers['id'] % 100).alias('m')
)

Lazy Evaluation

In other words, the actual result of that code is producing the execution plan.

numbers.explain()
== Physical Plan ==
*Project [id#0L, rand(3073591720651987782) AS rand(3073591720651987782)#4, (id#0L % 100) AS m#3L]
+- *Range (0, 100000000000000000, step=1, splits=Some(1000000))

… not evaluating it.

Lazy Evaluation

Even calling .show() doesn't cause much work: it computes just enough to show the first 20 rows.

The DataFrame isn't actually calculated (materialized is the Spark term) until we do something with it, like:

numbers.write.csv(…)
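
A few other operations count as actions that force the whole plan to run (illustrative only: you wouldn't really want to collect or write that 10¹⁷-row DataFrame, and the output path here is made up):

numbers.count()    # counting rows is an action: it launches a job over the data
numbers.collect()  # brings every row back to the driver: only sane for small results
numbers.write.csv('numbers-out', mode='overwrite')  # writing output computes everything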

Lazy Evaluation

That's why creating many DataFrames isn't bad:

df1 = spark.read.…
df2 = df1.select(…)
df3 = df2.groupBy(…).agg(…)
df4 = df3.filter(…)
df4.write.…

The DataFrames df1, df2, df3 are never computed. They are just used to build the execution plan for df4, which runs when the .write happens.

Caching

Lazy evaluation is great, until it isn't. Consider:

int_range = spark.range(…)
values = int_range.select(…)
result1 = values.groupBy(…).agg(…)
result1.write.…
result2 = values.filter(…)
result2.write.…

What happens:

  1. Calculate result1 by doing the .range() and .select() work.
  2. Calculate result2 by doing the .range() and .select() work.

Caching

We should use .cache() to say "when you calculate this, store it in memory so it's there for later, because I need it."

int_range = spark.range(…)
values = int_range.select(…).cache()
result1 = values.groupBy(…).agg(…)
result1.write.…
result2 = values.filter(…)
result2.write.…

Now:

  1. Calculate result1: calculate values as a prerequisite and store in memory.
  2. Calculate result2 using the previously-computed values.

Caching

values = int_range.select(…).cache()

Semantics: when the values DataFrame gets evaluated, try to store the results in memory, because we need them later.

It doesn't force evaluation now, but any parts of values that are calculated will end up stored by the executors.

You should .cache() a DataFrame when you use it more than once.
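
If you want to check whether a DataFrame has been marked for caching, there are a couple of properties you can look at. A small sketch, rarely needed in real code:

print(values.is_cached)     # True once .cache() (or .persist()) has been called
print(values.storageLevel)  # where Spark will keep it: memory, disk, or both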