What is big data? Pick your favourite definition:
When people say big data, they usually mean #3.
My best functional definition: #2.
The big limitation of Pandas: it holds and processes all of your data in memory, on one computer.
The biggest CSV file I could find lying around: 27,000 rows, making a 1.7 MB DataFrame.
Your laptop probably has ≥4 GB (4096 MB) of memory: data needs to be quite big before Pandas won't handle it.
If one computer won't do the job, you need several: each one can store some of the data and do some of the processing, and they can work together to generate final results.
Compute cluster: several computers working together to do some work.
Apache Hadoop is a collection of tools for managing compute clusters.
The goal here is to express the computation we want to do, in such a way that the work can be sent out to the cluster and done in parallel.
Spark will let us do that.
YARN can take our job and make sure all of the pieces get done somewhere, somehow (with Spark's help).
HDFS can store all of the pieces of our data files, so they're there when YARN/​Spark wants to work on them.
Today, we'll (mostly?) run Spark on our computers, with the assurance that we could scale up the computation if we needed to.
… and knowing that we're now using all of the cores in our processor.
A complete Spark program:
import sys
from pyspark.sql import SparkSession, functions, types

spark = SparkSession.builder.appName('example 1').getOrCreate()

assert sys.version_info >= (3, 4)  # make sure we have Python 3.4+
assert spark.version >= '2.1'  # make sure we have Spark 2.1+

data = spark.read.csv('cities.csv', header=True, inferSchema=True)
data.show()
Last two lines are real work. Rest is boilerplate that will be in all of our Spark programs.
Run the job:
spark-submit spark-1.py
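Depending on how Spark is configured on your machine, you may want to be explicit that a local run should use every core; the standard way is to set the master when submitting the job:

spark-submit --master 'local[*]' spark-1.py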
The example reads a file cities.csv. Let's say it contains:
city,population,area
Vancouver,2463431,2878.52
Calgary,1392609,5110.21
Toronto,5928040,5905.71
Montreal,4098927,4604.26
Halifax,403390,5496.31
It will then output:
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
|  Toronto|   5928040|5905.71|
| Montreal|   4098927|4604.26|
|  Halifax|    403390|5496.31|
+---------+----------+-------+
This line…
data = spark.read.csv('cities.csv', header=True, inferSchema=True)
…creates an object that is the primary way we'll store and manipulate data in Spark: a DataFrame.
A Pandas DataFrame and a Spark DataFrame are not the same thing. Spark's DataFrames were inspired by Pandas (and DataFrames in R).
Spark's DataFrames work differently, often because of their basic job: letting you do things in parallel across a cluster. Sometimes just because of design differences.
Some things will be familiar. Like Pandas, Spark DataFrames are tables of named columns, each with a type.
A lot of the operations are spelled differently, but you can see the similarities:
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
cities.printSchema()
root
 |-- city: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area: double (nullable = true)
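The inferSchema option asks Spark to guess the column types by scanning the data. If you'd rather not guess, you can give the schema explicitly. A sketch that matches the inferred schema above, using the types module imported in the boilerplate:

schema = types.StructType([
    types.StructField('city', types.StringType()),
    types.StructField('population', types.IntegerType()),
    types.StructField('area', types.DoubleType()),
])
cities = spark.read.csv('cities.csv', header=True, schema=schema)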
We can do operations that feel familiar:
c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
c.show()
+---------+----------+
|     city|population|
+---------+----------+
|Vancouver|   2463431|
| Montreal|   4098927|
+---------+----------+
And we can write the results:
c.write.json('spark-output', mode='overwrite')
But it doesn't create a file. It creates a directory with several files:
$ ls spark-output/
part-00000-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00001-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
part-00002-cf9efe32-ee0e-41a5-a97f-442cbe3a7de8.json
_SUCCESS
The real output is the concatenation of those files.
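Spark itself is happy to read that directory back in as a single DataFrame, so the multi-file layout usually isn't a problem:

c2 = spark.read.json('spark-output')  # reads every part-* file in the directory
c2.show()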
In the previous code, we saw two methods on DataFrames:
c = cities.filter(cities['area'] < 5000)
c = c.select(c['city'], c['population'])
… and they feel very SQL-like. (Actually, .where() is a synonym for .filter(): very SQL-like.)
The .select() method creates a new DataFrame of the columns you specify: either existing columns or calculated values.
some_values = cities.select(
    cities['city'],
    cities['area'] * 1000000
)
some_values.show()
+---------+----------------+
|     city|(area * 1000000)|
+---------+----------------+
|Vancouver|       2.87852E9|
|  Calgary|       5.11021E9|
|  Toronto|       5.90571E9|
| Montreal|       4.60426E9|
|  Halifax|       5.49631E9|
+---------+----------------+
That could have been prettier…
some_values = cities.select(
    cities['city'],
    (cities['area'] * 1000000).alias('area_m2')
)
some_values.show()
+---------+---------+
|     city|  area_m2|
+---------+---------+
|Vancouver|2.87852E9|
|  Calgary|5.11021E9|
|  Toronto|5.90571E9|
| Montreal|4.60426E9|
|  Halifax|5.49631E9|
+---------+---------+
The .filter() method keeps rows where the condition is true.
some_values = cities.filter(cities['population'] % 2 == 1)
some_values.show()
+---------+----------+-------+
|     city|population|   area|
+---------+----------+-------+
|Vancouver|   2463431|2878.52|
|  Calgary|   1392609|5110.21|
| Montreal|   4098927|4604.26|
+---------+----------+-------+
These methods all create a new DataFrame object. The previous example was exactly equivalent to:
cities = spark.read.csv('cities.csv', header=True, inferSchema=True)
c_small = cities.filter(cities['area'] < 5000)
c_droparea = c_small.select(c_small['city'], c_small['population'])
c_droparea.show()
This is typical: operations tend to build new DataFrames, and it's common to have many as you construct the final result.
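Equivalently, since each step just returns a new DataFrame, the whole pipeline can be written as a single chained expression; the intermediate names are only there for readability:

(cities
    .filter(cities['area'] < 5000)
    .select(cities['city'], cities['population'])
    .show())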
There are many methods on DataFrames that you'll find useful.
# Return a new DataFrame...
c = cities.withColumn('area_m2', cities['area'] * 1000000)
c = cities.drop('area')           # DF without the 'area' column
c = cities.drop_duplicates()      # remove duplicate rows
c = cities.na.drop()              # remove any rows with null/NaN values
c = cities.sort([cities['city'], cities['population']])
c = cities.sample(withReplacement=False, fraction=0.5)

# Returns a number... (note: .corr takes column names as strings)
r = cities.stat.corr('population', 'area')
When doing calculations with Spark DataFrames, we write expressions that operate on columns:
df['col1']
df['col1'] + 1
df['col1'] * df['col2']
(df['col1'] == 1) & (df['col2'].isNull())
functions.length(df['col1'])
Each of these is a Spark column expression, represented as a Column object.
Just about everywhere we do some kind of calculation, it is a column expression.
df.select(colexpr, colexpr, colexpr)
df.filter(colexpr)
df.groupBy(colexpr).agg(colexpr)
df1.join(df2, on=colexpr)
There are some useful methods on Column objects, and Python operators are overloaded to work on them:
df['col1'] ** 2
df['col1'].isNotNull()
(df['col1'] / 10).alias('onetenth')
df['str'].astype(types.IntegerType())
Column expressions can be produced by extracting a column from a DataFrame, doing some calculation on a column, or by calling a column function.
df['col1']
df['col1'] - df['col2']
functions.sin(df['col1'])
Many column functions are provided in the pyspark.sql.functions module we have been importing.
There is a huge variety of functions in that module.
functions.abs(df['number'])
functions.datediff(df['date1'], df['date2'])
functions.format_string('%d-%s', df['number'], df['label'])
functions.length(df['str'])
functions.concat(df['str1'], df['str2'])
Some of the functions are aggregation functions that are likely to be used near a .groupBy().
groups = df.groupBy(df['col1'])
groups.agg(functions.approx_count_distinct(df['col2']))
groups.agg(functions.countDistinct(df['col2']))
groups.agg(functions.avg(df['col2']))
groups.agg(functions.collect_list(df['col2']))
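For a complete (if slightly artificial) example on the cities data: group the cities by whether they cover more than 5000 km², then average the populations within each group. The column names come from the earlier example; the grouping expression is just an illustration.

groups = cities.groupBy((cities['area'] > 5000).alias('big_area'))
averages = groups.agg(functions.avg(cities['population']).alias('avg_population'))
averages.show()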
Let's start thinking about big data…
The underlying assumption is that a Spark DataFrame will not fit in any single computer's memory or disk. All we can hope for is to store pieces of it on many different computers.
All Spark DataFrames are partitioned this way.
Subsets of rows are handled separately by different processes/​cores. Each piece can (hopefully) be operated on in parallel.
If operations can truly be done in parallel without much coordination, n cores can do the work almost n times faster. (Unfortunately, that won't always be true.)
Important point: a partition is the smallest unit that can be operated on in parallel.
If you have 2 partitions, you'll be using at most 2 cores to work on that DataFrame.
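If you're curious how a DataFrame is currently partitioned, you can ask its underlying RDD (shown here with the cities DataFrame from earlier; the number you see depends on your input and configuration):

print(cities.rdd.getNumPartitions())  # number of partitions Spark chose for this DataFrame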
We need more control over partitions in our DataFrames.
Honestly, often easiest: split the input files into something that makes sense, then start working.
Mostly, we are working with whatever is implied by the input files. If that's not sensible, we have to fix it.
There are a couple of methods that will rearrange the partitions of a DataFrame…
If the problem is too many partitions, the .coalesce(n) method can concatenate some of them together.
It might not do quite what you expect, but it will lower the many-partitions overhead if you have it.
.coalesce() will never grow the number of partitions.
It can clean up your output if you know you have a small number of rows. Produces one output file:
lumpy_df.coalesce(1).write.json(…)
If you really need a DataFrame's partitions rearranged, the .repartition(n) method does it, but it's more expensive: the data gets shuffled between the executors.
# partition_sizes: a helper (defined elsewhere) that returns the number of rows in each partition
print(partition_sizes(lumpy_df.coalesce(4)))
print(partition_sizes(lumpy_df.repartition(4)))
[220, 0, 300, 150]
[168, 167, 167, 168]
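(Aside: partition_sizes isn't a Spark built-in; it's a small helper. A sketch of one possible implementation, using RDD operations:)

def partition_sizes(df):
    # glom() collects each partition into a list; map(len) counts the rows per partition.
    return df.rdd.glom().map(len).collect()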
Is it worth it? It depends what else is going to be done with that data.
My rule: if you explicitly decrease the partitions of a DataFrame (or RDD), you must include a comment justifying why it is a safe thing to do. If not: mark penalty.
You should say something about an upper-bound on the data size.
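For example, a justified coalesce might look something like this (the DataFrame name and the size estimate are illustrative):

# Safe to coalesce to 1: the result has at most one row per city,
# i.e. a few thousand rows, which easily fits in a single partition.
result.coalesce(1).write.csv('output', mode='overwrite')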
How many partitions should you have? It depends.
How many total rows in the DataFrame? How many executors? What calculation is going to happen? How much processor and memory will it need?
My best guess: 100–10000. Except when that's wrong.
How long does this code take to complete?
numbers = spark.range(100000000000000000, numPartitions=1000000)
numbers = numbers.select(
    numbers['id'],
    functions.rand(),
    (numbers['id'] % 100).alias('m')
)
numbers.show()
<10 seconds on my desktop. How?
All Spark calculations are lazy (or are lazily evaluated).
That means that when we create a DataFrame, Spark doesn't actually do the calculation.
The 10¹⁷ rows implied by this code don't get created, just a description of what would need to happen to create this DataFrame later.
numbers = spark.range(100000000000000000, numPartitions=1000000)
numbers = numbers.select(
    numbers['id'],
    functions.rand(),
    (numbers['id'] % 100).alias('m')
)
In other words, the actual result of that code is producing the execution plan.
numbers.explain()
== Physical Plan ==
*Project [id#0L, rand(3073591720651987782) AS rand(3073591720651987782)#4, (id#0L % 100) AS m#3L]
+- *Range (0, 100000000000000000, step=1, splits=Some(1000000))
… not evaluating it.
Even calling .show() doesn't cause much work: it computes just enough to show the first 20 rows.
The DataFrame isn't actually calculated (materialized is the Spark term) until we do something with it, like:
numbers.write.csv(…)
That's why creating many DataFrames isn't bad:
df1 = spark.read.…
df2 = df1.select(…)
df3 = df2.groupBy(…).agg(…)
df4 = df3.filter(…)
df4.write.…
The DataFrames df1, df2, df3 are never computed. They are just used to build the execution plan for df4, which runs when the .write happens.
Lazy evaluation is great, until it isn't. Consider:
int_range = spark.range(…)
values = int_range.select(…)
result1 = values.groupBy(…).agg(…)
result1.write.…
result2 = values.filter(…)
result2.write.…
What happens:
1. Calculate result1 by doing the .range() and .select() work.
2. Calculate result2 by doing the .range() and .select() work again.

We should use .cache() to say "when you calculate this, store it in memory so it's there for later, because I need it again."
int_range = spark.range(…)
values = int_range.select(…).cache()
result1 = values.groupBy(…).agg(…)
result1.write.…
result2 = values.filter(…)
result2.write.…
Now:
1. Calculate result1: calculate values as a prerequisite and store it in memory.
2. Calculate result2 using the previously-computed values.

values = int_range.select(…).cache()
Semantics: when the values DataFrame gets evaluated, try to store the results in memory, because we need them later.
It doesn't force evaluation now, but any parts of values that are calculated will end up stored by the executors.
You should .cache() when you use a single DataFrame more than once.
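The counterpart, if memory is a concern: once you're completely done with a cached DataFrame, you can release it (optional; Spark will also evict cached partitions on its own when it needs the space):

values.unpersist()  # drop the cached copy of 'values' from executor memory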