Interactive Analysis with the Spark Shell
Basics
Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python’s dynamic nature, we don’t need the Dataset to be strongly typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call them DataFrames to be consistent with the data frame concept in Pandas and R. Let’s make a new DataFrame from the text of the README file in the Spark source directory:
Start the PySpark shell by running the following command:
pyspark
>>> textFile = spark.read.text("README.md")
Output
no output
>>> textFile.count()  # Number of rows in this DataFrame
Output
125
>>> textFile.first()  # First row in this DataFrame
Output
Row(value='# Apache Spark')
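To look at more than one row at a time, show() prints a formatted sample. This is a small aside beyond the original walkthrough, and the rows you see depend on your copy of README.md:
>>> textFile.show(3, truncate=False)  # Print the first 3 rows without truncating long lines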
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
Output
no output
>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
Output
20
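Filter conditions compose before an action runs, so they can be combined with the & column operator. As a brief aside not part of the original sequence (the result depends on the contents of README.md):
>>> textFile.filter(textFile.value.contains("Spark") & textFile.value.contains("Python")).count()  # Lines mentioning both words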
More on Dataset Operations
Dataset actions and transformations can be used for more complex computations. Let’s say we want to find the line with the most words:
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, r"\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
Output
[Row(max(numWords)=16)]
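This maps each line to its word count in a column named numWords, and agg then takes the maximum over that column. To see the longest line itself rather than just its length, a small variation (not part of the original walkthrough; the intermediate name lineLengths is only illustrative) keeps the line text next to the derived column and sorts on it:
>>> lineLengths = textFile.select(textFile.value, sf.size(sf.split(textFile.value, r"\s+")).alias("numWords"))  # Keep each line alongside its word count
>>> lineLengths.orderBy(sf.desc("numWords")).limit(1).show(truncate=False)  # Show the single longest line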
One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, r"\s+")).alias("word")).groupBy("word").count()
>>> wordCounts.collect()
Output
[Row(word='[![PySpark', count=1), Row(word='online', count=1), Row(word='graphs', ...
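Rather than collecting every row, a sorted preview is often more useful. As a hedged aside, the count column used below is the one produced by groupBy("word").count() above:
>>> wordCounts.orderBy(sf.desc("count")).show(10)  # Show the 10 most frequent words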
Caching
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let’s mark our linesWithSpark dataset to be cached:
>>> linesWithSpark.cache()
Output
DataFrame[value: string]
>>> linesWithSpark.count()
Output
20
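When the cached data is no longer needed, it can be dropped from the in-memory cache. As a brief aside beyond the original walkthrough, unpersist() is the standard DataFrame counterpart to cache():
>>> linesWithSpark.unpersist()  # Remove linesWithSpark from the cluster cache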