
Interactive Analysis with the Spark Shell

Basics

Spark’s primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python’s dynamic nature, we don’t need the Dataset to be strongly-typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call it DataFrame to be consistent with the data frame concept in Pandas and R. Let’s make a new DataFrame from the text of the README file in the Spark source directory:

Start the PySpark shell with the pyspark command; inside the shell, a SparkSession is already available as the variable spark:

pyspark
>>> textFile = spark.read.text("README.md")
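
If you run this as a standalone script rather than in the shell, no spark variable is created for you; a minimal sketch of building the session yourself (the application name "QuickStart" is an arbitrary choice):

from pyspark.sql import SparkSession

# In the pyspark shell this session already exists as the variable `spark`;
# a standalone script must create it explicitly.
spark = SparkSession.builder.appName("QuickStart").getOrCreate()
textFile = spark.read.text("README.md")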


>>> textFile.count()  # Number of rows in this DataFrame

Output

125

>>> textFile.first()  # First row in this DataFrame

Output

Row(value='# Apache Spark')

Now let's transform this DataFrame into a new one. We call filter to return a new DataFrame with a subset of the lines in the file:

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))


We can chain together transformations and actions:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?

Output

20

More on Dataset Operations

Dataset actions and transformations can be used for more complex computations. Let's say we want to find the largest number of words in a single line:

>>> from pyspark.sql import functions as sf

>>> textFile.select(sf.size(sf.split(textFile.value, r"\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()

Output

[Row(max(numWords)=16)]
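
This first maps each line to an integer value aliased as "numWords", creating a new DataFrame, and agg is called on that DataFrame to find the largest word count. Note that this returns only the count, not the line itself; a small variant sketch (our addition, not part of the original example) that also retrieves the longest line:

>>> textFile.withColumn("numWords", sf.size(sf.split(textFile.value, r"\s+"))).orderBy(sf.desc("numWords")).first()  # Row with the longest line and its word count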


One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily. Here, explode in select transforms the Dataset of lines into a Dataset of words, and groupBy and count compute the per-word counts as a DataFrame of two columns, "word" and "count". To collect the word counts in our shell, we call collect:

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, r"\s+")).alias("word")).groupBy("word").count()
>>> wordCounts.collect()

Output


[Row(word='[![PySpark', count=1), Row(word='online', count=1), Row(word='graphs', ...
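
To see only the most frequent words rather than the full list, a short follow-up sketch (the ordering and show(10) are our addition, not part of the original walkthrough):

>>> wordCounts.orderBy(sf.desc("count")).show(10)  # Ten most frequent words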

Caching

Spark also supports pulling datasets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our linesWithSpark dataset to be cached:

>>> linesWithSpark.cache()

Output

DataFrame[value: string]

>>> linesWithSpark.count()  # The first action after cache() computes and caches the data; later actions read from memory

Output

20
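
When the cached data is no longer needed, it can be dropped from memory explicitly; a minimal sketch:

>>> linesWithSpark.unpersist()  # Remove linesWithSpark from the in-memory cache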