Posts

In our last tutorials, we had a look at Transformations in Spark. Now, we look at Actions in Spark. One of them, collect, we already used frequently in our transformation samples. The reason for this is simple: transformations are evaluated lazily, so nothing happens until you call an action. We used the simplest one, collect. Actions always trigger the actual computation, so you should be prepared for the cost of running them. With the collect method, all the data is loaded into the memory of the driver program. For our samples this wasn’t an issue, since we only had very small data. However, if datasets are larger, you need to reconsider this. Other actions might deliver better results for you. Let’s now have a look at the different options available.

Reduce

Reduce calls a function on all items in a dataset and accumulates them into a single value. Basically, all binary operators can be used, as long as the operation is commutative and associative. For Python, please refer to this documentation: https://docs.python.org/3/library/operator.html. If we want to combine all items by multiplying them in Spark, this would look like the following:

from operator import mul
# Multiply all items of the RDD into a single value
spark_data.reduce(mul)

The output is a single number: the product of all items in the RDD.
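To see what reduce computes without a cluster, the same accumulation can be reproduced in plain Python with functools.reduce. This is only a local sketch: the list “values” is made up for illustration and stands in for spark_data. Spark combines partial results from different partitions, which is why the function should be commutative and associative.

```python
from functools import reduce
from operator import mul

# Hypothetical sample values; in the tutorial the data lives in spark_data
values = [1.5, 2.0, 4.0]

# Same idea as spark_data.reduce(mul): fold all items into one product
product = reduce(mul, values)
print(product)
```

Swapping mul for add from the same module would give the sum instead.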

First and Count

These Actions are easy to use: first() returns the first item in an RDD and count() returns the number of elements in the RDD. If you use count() on a very large RDD, it might take very long and your task could run into a timeout. To prevent that, there is another function called countApprox() that returns an approximate count within a given timeout.

ds_one.first()

Count is used the same way as first, without passing a function or any other argument.

ds_one.count()

Saving data

One important thing is to eventually store the data somewhere. In our case, we store it in the same directory as our Jupyter notebooks, so we can easily check whether it worked as expected. Normally, you would store data on S3 or any other storage you use. Spark RDDs provide several means to save to files; in our case we will use “saveAsTextFile”. This stores the content as part files in text format.

ds_one.saveAsTextFile("data/dsone.csv")

You can now navigate to the output at this folder: data/dsone.csv. Note that, despite the “.csv” ending, this is a folder that contains the part files.
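Since the result is a folder of part files rather than a single file, reading it back outside of Spark means collecting the lines of all parts. The following is a minimal plain-Python sketch of that idea. The helper name “read_part_files” and the stand-in directory are our own assumptions for illustration; the sketch creates a small fake output folder so that it runs without Spark.

```python
import glob
import os
import tempfile

def read_part_files(output_dir):
    """Return all lines from the part-* files of a saveAsTextFile-style folder."""
    lines = []
    # Sorting keeps the partition order (part-00000, part-00001, ...)
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path, encoding="utf-8") as handle:
            lines.extend(line.rstrip("\n") for line in handle)
    return lines

# Build a stand-in folder with made-up content so the sketch runs without Spark
demo_dir = os.path.join(tempfile.mkdtemp(), "dsone.csv")
os.makedirs(demo_dir)
with open(os.path.join(demo_dir, "part-00000"), "w", encoding="utf-8") as f:
    f.write("('Mark', 1984)\n")
with open(os.path.join(demo_dir, "part-00001"), "w", encoding="utf-8") as f:
    f.write("('Lisa', 1985)\n")
# Marker file that is not a part file and must be skipped
open(os.path.join(demo_dir, "_SUCCESS"), "w").close()

restored = read_part_files(demo_dir)
print(restored)
```

Each line is the text representation of one element, so tuples come back as strings and would still need to be parsed.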

Spark SaveAsTextFile Result

In our last tutorial section, we looked at more data transformations with Spark. In this tutorial, we will continue with data transformations before we move on to Actions. First, we look at Intersect.

Intersect

An intersection between two different datasets only returns the values that, well, intersect. This means that an element is only returned if it occurs in both datasets. Each element is returned only once, even if it occurs multiple times in both datasets. Intersection is used like this:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Mark", 1984), ("Anastasia", 2017)])
sorted(ds_one.intersection(ds_two).collect())

The output should look like this now:

[('Mark', 1984)]

The Intersection keyword

Please note that “Mark” is only returned by the intersection method if the entire element matches. In our case, this means both the name and the year.
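The matching rule can be tried out locally with Python sets, which compare whole tuples in the same way. This is only an analogy in plain Python, not Spark code; the lists “ds_one_local” and “ds_two_local” are made up and contain a duplicate and a second “Mark” with a different year on purpose.

```python
# Hypothetical local stand-ins for the two RDDs
ds_one_local = [("Mark", 1984), ("Lisa", 1985), ("Mark", 1984)]
ds_two_local = [("Mark", 1984), ("Mark", 2015), ("Anastasia", 2017)]

# Only tuples that are equal as a whole survive, and each of them only once
common = sorted(set(ds_one_local) & set(ds_two_local))
print(common)
```

("Mark", 2015) is dropped because the year differs, and the duplicate ("Mark", 1984) appears only once in the result.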

Map

Map returns a new RDD that is created by applying a function to each item of the original RDD. This is very useful if you want to check a dataset for different things or modify data. You can also add fields this way. The following example uses a slightly more complex transformation where we create our own function that calculates the age and guesses the gender of the person:

import datetime

def doTheTrans(values):
    # values is a (name, birth_year) tuple
    age = datetime.datetime.now().year - values[1]

    # Naive heuristic, for demonstration only: names ending in "a" count as female
    if values[0].endswith("a"):
        gender = "f"
    else:
        gender = "m"

    return [values[0], age, gender]

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
sorted(ds_one.map(doTheTrans).collect())

What happens in the code?

First, we import datetime. We need this to get the current year (we could also hardcode “2019”, but then I would have to rewrite the entire tutorial in January!). Next, we create our function which we call “doTheTrans”. It receives one tuple at a time from the map function (which we will use later). Note that tuples are immutable, therefore we need to create new variables and return a new list.

First, we calculate the age. This is easily done by “datetime.datetime.now().year - values[1]”. The year the person was born in is stored at the second position (index 1) of the tuple. Then, we check if the name ends with “a” and decide whether the person is female or male. Please note that this isn’t sufficient normally; we just use it for our sample.

Once we are done with this, we can call the “map” function with the function we have defined. The map function then applies our new function to each item in the RDD.

Now, the output of this should look like the following (when run in 2019):

[['Lisa', 34, 'f'], ['Mark', 35, 'm']]

It is also possible to write the function inline as a lambda, if your transformation isn’t that complex. Suppose we only want to return the name and the age a person reaches in the following year (hence the “+1”); here, the code would look like the following:

sorted(ds_one.map(lambda x: (x[0], datetime.datetime.now().year - x[1] +1)).collect())

And the output should be this:

[('Lisa', 35), ('Mark', 36)]
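Because the samples above depend on the current year, their output changes over time. One way around this is to pass the year in as a parameter. The following plain-Python sketch shows the idea with the built-in map; the function name “to_profile” and the fixed year 2019 are our own choices for illustration. The same function could presumably be handed to an RDD via ds_one.map(lambda r: to_profile(r, 2019)).

```python
def to_profile(record, current_year):
    """Turn a (name, birth_year) tuple into a (name, age) tuple."""
    name, birth_year = record
    return (name, current_year - birth_year)

# Same sample data as in the tutorial, kept in a local list
people = [("Mark", 1984), ("Lisa", 1985)]

# Fixing the year makes the result reproducible
profiles = sorted(map(lambda r: to_profile(r, 2019), people))
print(profiles)
```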

In our next tutorial, we will have a look at Actions in Spark.

In our last tutorial section, we looked at filtering, joining and sorting data. Now, we will have a look at several other transformations. First, we will use the “Distinct” transformation.

Distinct

Distinct enables us to return each item exactly once. For instance, if the same entry occurs more than once in a sequence, we can remove the duplicates. A sample would be the following array:

[1, 2, 3, 4, 1]

However, we only want to return each number exactly once. This is done via the distinct method. The following example illustrates this:

ds_distinct = sc.parallelize([1, 2, 3, 4, 1]).distinct().collect()
ds_distinct
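For comparison, here is how the same de-duplication looks in plain Python. This is just a local sketch of the concept, not Spark code. Keep in mind that distinct on an RDD does not promise any particular order of the result, so sorting the collected list is the safer way to compare results.

```python
numbers = [1, 2, 3, 4, 1]

# Keep the first occurrence of each value, preserving the original order
unique_in_order = list(dict.fromkeys(numbers))

# Order-independent variant: de-duplicate with a set, then sort
unique_sorted = sorted(set(numbers))

print(unique_in_order, unique_sorted)
```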

GroupBy

A very important task when working with data is grouping. In Spark, we have the GroupBy transformation for this. In our case, this is “groupByKey”. Basically, this groups the values of the dataset by their key; what happens with the grouped values is defined by calling the “mapValues” function. With this function, you specify how you want to deal with the values of each key. Python built-ins work well here, such as “len” for the number of occurrences or “list” for the actual values. The following sample illustrates this with a dataset similar to the one introduced in the previous tutorial:

ds_set = sc.parallelize([("Mark", 1984), ("Lisa", 1985), ("Mark", 2015)])
ds_grp = ds_set.groupByKey().mapValues(list).collect()
ds_grp

If you want to have a count instead, simply use “len” for it:

ds_set.groupByKey().mapValues(len).collect()

The output should look like this now:

The GroupByKey keyword in Apache Spark
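To make the grouping step more tangible, the following plain-Python sketch rebuilds it with a dictionary. It is only a local analogy for what groupByKey followed by mapValues produces; the variable names are our own. The results are sorted by key here because the order of the collected groups in Spark is not something we should rely on.

```python
from collections import defaultdict

records = [("Mark", 1984), ("Lisa", 1985), ("Mark", 2015)]

# Step 1: collect all values that share a key (the groupByKey part)
groups = defaultdict(list)
for key, value in records:
    groups[key].append(value)

# Step 2: decide what to do with each group (the mapValues part)
as_lists = sorted((key, list(values)) for key, values in groups.items())
as_counts = sorted((key, len(values)) for key, values in groups.items())

print(as_lists)
print(as_counts)
```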

Union

A union combines two datasets into one. In contrast to the “Join” transformation that we already looked at in our last tutorial, it doesn’t take any keys and simply appends the datasets. The call looks very similar to a join, but the result is different. The syntax for it is straightforward: it is written “ds_one.union(ds_two)”. Let’s have a look at it:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Luke", 2015), ("Anastasia", 2017)])
sorted(ds_one.union(ds_two).collect())

Now, the output of this should look like the following:

[('Anastasia', 2017), ('Lisa', 1985), ('Luke', 2015), ('Mark', 1984)]

In our next tutorial, we will have a look at more data transformations before we move on to Actions.

In the last post, we learned the basics of Spark RDDs. Now, we will have a look at data transformations in Apache Spark. One data transformation was already used in the last tutorial: filter. We will have a look at this now as the first transformation. But before we start, we need to have a look at one major thing: lambda expressions. Each of the functions we use for data transformations in Spark takes a function as its argument, often in the form of a lambda expression. In our last tutorial, we filtered data based on a lambda expression. Let’s recall the code to have a deeper look into it:

sp_pos = spark_data.filter(lambda x: x>0.0).collect()

What is crucial for us is the statement in the parentheses “()” after the filter command. In there, we see the following: “lambda x: x>0.0”. Basically, with “lambda x” we state that the following evaluation should be applied to all items in the dataset. For each item, we use “x” as the variable. So it reads like: “apply the check whether the variable is greater than 0 to all items in our dataset”. If x were a complex object, we could also use its fields and methods. But in our case, it is a simple number.

One more thing that is important for transformations: transformations in Spark are always evaluated lazily. So calling the filter function does nothing until you call an action on it. The one we used above is “.collect()”. In our case, collect() triggers the execution of the filter.
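Python’s built-in filter behaves in a similar, lazy way, which makes it a handy way to observe the effect without Spark. The sketch below is only an analogy: the names “log” and “is_positive” are made up, and list() plays the role that collect() plays for an RDD.

```python
log = []

def is_positive(x):
    # Record every evaluation so we can see when the work actually happens
    log.append(x)
    return x > 0.0

data = [-1.0, 0.5, 2.0]

pending = filter(is_positive, data)  # only describes the work, runs nothing
calls_before = len(log)

result = list(pending)               # consuming the filter triggers the evaluation
calls_after = len(log)

print(calls_before, calls_after, result)
```

Before the list is built, the predicate has not been called a single time; afterwards, it has been called once per item.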

Filtering

Filtering is one of the most frequently used transformations. The filter criterion is passed as a lambda expression. You can also chain several filter criteria easily, since they are evaluated lazily.

We extend the above sample by only showing numbers that are smaller than 3.0. The code for that looks like the following:

sp_pos = spark_data.filter(lambda x: x>0.0).filter(lambda y: y<3.0).collect()
sp_pos

Sorting

Another important transformation is sorting data. At some point, you want to arrange the data in either ascending or descending order. This is done with the “sortBy” function. The “sortBy” function takes a lambda expression that returns the field to sort by. In our above example, this isn’t relevant since each item is a single value. Let’s have a look at how to use it with our dataset:

sp_sorted = spark_data.sortBy(lambda x: x).collect()
sp_sorted

Now, if we want to sort it in the opposite order, we can set the optional “ascending” parameter to False. Let’s have a look at this:

sp_sorted = spark_data.sortBy(lambda x: x, False).collect()
sp_sorted
The sorted dataset
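The key function becomes more interesting once each item has several fields. The following plain-Python sketch sorts name/year tuples by the year in descending order; the sample list is made up for illustration. On an RDD, the corresponding call would be sortBy(lambda x: x[1], ascending=False).

```python
# Hypothetical sample data: (name, birth_year) tuples
people = [("Mark", 1984), ("Lisa", 1985), ("Anastasia", 2017)]

# The key function picks the field to sort by; reverse=True gives descending order
by_year_desc = sorted(people, key=lambda x: x[1], reverse=True)
print(by_year_desc)
```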

Joining data

Often, it is necessary to join two different datasets together. This is done by the “join” function, which matches the elements of both datasets by their key. To use the join, we need to create new datasets first (and we will need them further on as well):

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Mark", 2015), ("Anastasia", 2017)])
sorted(ds_one.join(ds_two).collect())
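To get a feeling for the shape of the result, the join can be mimicked in plain Python. This is a naive local sketch that compares every pair of elements, which is fine for four items but is not how Spark does it internally; the names “left” and “right” are our own.

```python
left = [("Mark", 1984), ("Lisa", 1985)]
right = [("Mark", 2015), ("Anastasia", 2017)]

# Keep only pairs whose keys match; the values are combined into one tuple
joined = sorted(
    (left_key, (left_value, right_value))
    for left_key, left_value in left
    for right_key, right_value in right
    if left_key == right_key
)
print(joined)
```

Only “Mark” appears in both datasets, so “Lisa” and “Anastasia” are dropped and the two years of “Mark” end up together in one tuple.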

The join shown above is an inner join; RDDs also offer outer joins via leftOuterJoin, rightOuterJoin and fullOuterJoin. In the next tutorial, we will have a look at more transformations in Spark.

In the last blog post, we looked at how to access the Jupyter Notebook and work with Spark. Now, we will get hands-on with Spark and work through some first samples.

We start with RDDs. RDDs are the basic API in Spark. Even though it is much more convenient to use DataFrames, we start with RDDs in order to get a basic understanding of Spark.

RDD stands for “Resilient Distributed Dataset”. Basically, this means that the dataset is distributed across nodes and used for parallel computation. In RDDs, we can store any kind of data and apply some functions to it (such as the sum or the average). Let’s create a new Python notebook and import the required libraries.

Create a new Notebook in Python 3
The new Notebook with pyspark

Once the new notebook is created, let’s start to work with Spark.

First, we need to import the necessary libraries. For this, we use “pyspark” and “random”. Also, we need to create the Spark context that is used throughout our application. Note that you can only execute this line once: creating a second SparkContext while one is already active raises an error. Also keep in mind that the names are case sensitive!

import pyspark
import random

sc = pyspark.SparkContext(appName="Cloudvane_S01")

When done with this, hit the “Run” Button in the Notebook. Next to the current cell, you will now see the [ ] turning into [*]. This means that the process is currently running and something is happening. Once it has finished, it will turn into [1] or any other incremental number. Also, if errors occur, you will see them below your code. If this part of the code succeeded, you will see no other output than the [1].

Next, let’s produce some data that we can work with. To do so, we create a list and fill it with some data. In our case, we add 100 items. Each item is a random value calculated with expovariate.

someValues = []

for i in range(0,100):
    someValues.append(random.expovariate(random.random()-0.5))

someValues
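Since the values are random, every run of the notebook produces different numbers. If you prefer reproducible samples, a seeded generator helps. The sketch below is one possible way to do it; the helper name “make_values” and the seed 42 are arbitrary choices of ours. It also shows why the data contains both signs: expovariate returns negative values whenever its rate argument is negative, which happens here in roughly half of the draws.

```python
import random

def make_values(seed, n=100):
    """Create n expovariate samples from a generator with a fixed seed."""
    rng = random.Random(seed)
    # The rate lies between -0.5 and 0.5, so results can be positive or negative
    return [rng.expovariate(rng.random() - 0.5) for _ in range(n)]

first_run = make_values(seed=42)
second_run = make_values(seed=42)

# Same seed, same values
print(first_run == second_run)
print(min(first_run) < 0.0 < max(first_run))
```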

When the data is created, we distribute it across the cluster by calling “sc.parallelize”. This creates an RDD and enables us to work with Spark.

spark_data = sc.parallelize(someValues)

We can apply various functions to the RDD. One sample would be to use the “Sum” function.

sp_sum = spark_data.sum()
sp_sum

Another sample is the “Count” function.

sp_ct = spark_data.count()
sp_ct

We can also do more complex calculations by defining our own functions. In Python, this is done by “def functionname(params)”. The following sample calculates the average of the RDD that is passed to the function.

def average(vals):
    return vals.sum() / vals.count()

The function is simply invoked with our data.

average(spark_data)

Last but not least, we can also filter data. In the following sample, we only include positive values.

sp_pos = spark_data.filter(lambda x: x>0.0).collect()
sp_pos

In our last tutorial, we had a brief introduction to Apache Spark. Now, in this tutorial we will have a look at how to set up an environment to work with Apache Spark. To get started, we first need to install Docker. If you don’t have it yet, find out how to install it from this link: https://docs.docker.com/install/

Once Docker is installed successfully, download the container for Spark via Kitematic. Select “all-spark-notebook” for our samples. Note that the download will take a while.

Download Apache Spark for Docker

Once your download has finished, it is about time to start your Docker container. When you download the container via Kitematic, it will be started by default. Within the container logs, you can see the URL and port to which Jupyter is mapped. Open the URL and enter the Token. When everything works as expected, you can now create new Notebooks in Jupyter.

Enter the URL and the Token
Jupyter is running

Apache Spark is the number one Big Data tool nowadays. It is even considered the “killer” of Hadoop, even though Hadoop isn’t that old yet. However, Apache Spark has several advantages over “traditional” Hadoop. One of the key benefits is that Spark is far better suited for Big Data analytics in the Cloud than Hadoop is. Hadoop itself was never built for the Cloud, since it was built years before the Cloud took over major workloads. Apache Spark, in contrast, was built while the Cloud was becoming commonplace and thus has several benefits over Hadoop, e.g. by using object stores such as Amazon S3 to access data.

However, Spark also integrates well into an existing Hadoop environment. Apache Spark runs natively on Hadoop as a YARN application and it reuses different Hadoop components such as HDFS, HBase and Hive. Spark replaces Map/Reduce for batch processing with its own technology, which is much faster. Hive can also run on top of Spark and thus is much faster. Additionally, Spark comes with new technologies for interactive queries, streaming and machine learning.

Apache Spark is great in terms of performance. To sort 100 TB of data, Spark was 3 times faster than Map/Reduce while using only 1/10th of the nodes. Spark is well suited for sorting petabytes of data and it won several sorting benchmarks such as the GraySort and CloudSort benchmarks.

Spark is written in Scala, but is often used from Python. However, if you want to use the newest features of Spark, it is often necessary to work with Scala. Spark uses micro-batches for “real-time” processing, meaning that it isn’t true real-time. The fastest interval for micro-batches is 0.5 seconds. Spark should run in the same LAN in which the data is stored. In terms of the Cloud, this means the same datacenter or availability zone. Spark shouldn’t run on the same nodes on which the data is stored (e.g. with HBase). With Hadoop, this is the other way around; Hadoop processes the data where it is stored. There are several options to run Spark: standalone, on Apache Mesos, on Hadoop or via Kubernetes/Docker. For our future tutorials, we will use Docker.

Spark has 4 main components. Over the next tutorials, we will have a look at each of them. These 4 components are:

  • Spark SQL: Provides a SQL Language, Dataframes and Datasets. This is the most convenient way to use Spark
  • Spark Streaming: Provides Micro-batch execution for near real-time applications
  • Spark ML: Built-In Machine Learning Library for Spark
  • GraphX: Built-In Library for Graph Processing

To develop Apache Spark applications, it is possible to use either Scala, Java, Python or R. Each Spark application starts with a “driver program”. The driver program executes the “main” function.

Data elements in Spark are called RDDs (“Resilient Distributed Datasets”). The underlying data can be files on HDFS, an object store such as S3 or any other kind of dataset. RDDs are distributed across different nodes, and Spark takes care of the distribution. RDDs can also be kept in memory for faster execution. Spark works with “Shared Variables”. These are variables shared across different nodes, e.g. for computation. There are two types:

  • Broadcast variables: used to cache values in memory on all nodes (e.g. commonly used values)
  • Accumulators: used to add values (e.g. counters, sums or similar)

So, this was quite some text. In the next tutorial about Apache Spark, we will have a look at how to set up the environment to work with Apache Spark.