Posts

During the past tutorials, we have aquired a lot of knowledge about Spark. Now, we are with the last tutorial on Spark, where we will have a look at Cube and Rollup. Basically both are useful for multi-dimensional data for further processing.

Data for Spark Rollup and Cube functions

First, let’s create a dataset that we later want to work with. Our dataset is the monthly salary of people working in Finance or Sales:

employees = spark.createDataFrame([("Mario", 4400, "Sales")\
                                  , ("Max", 3420, "Finance")\
                                  , ("Sue", 5500, "Sales")\
                                  , ("Tom", 6700, "Finance")]\
                                 , ("name", "salary", "department"))

We then use the first function – rollup. We want to have the rollup to be on the department and the name of the person.

employees.rollup(employees.department, employees.name)\
            .sum()\
            .withColumnRenamed("sum(salary)", "salary")\
            .orderBy("department", "salary")\
            .show()

Here you can see the output (I will discuss it after you reviewed it):

+----------+-----+------+
|department| name|salary|
+----------+-----+------+
|      null| null| 20020|
|   Finance|  Max|  3420|
|   Finance|  Tom|  6700|
|   Finance| null| 10120|
|     Sales|Mario|  4400|
|     Sales|  Sue|  5500|
|     Sales| null|  9900|
+----------+-----+------+

We have several lines in this now. Let’s look at it line-by-line:

  • The first line is consisting of two null values and the sum of all salaries. So, this would represent the entire company. Basically, it fills department and name with null, since it is neither a department nor a specific person – it is all departments and all persons in it.
  • The second and third line are Max and Tom, who work in the finance department
  • The fourth line is the sum of the finance department; here you see “null” in the name, since it isn’t a name, but the entire department
  • The same story continues for the following lines with the sales department

So, basically, we get different things: (A) the sum of all revenues, (B) the individual values and (C) the revenues per department. Now, let’s build the cube:

employees.cube(employees.department, employees.name)\
            .sum()\
            .withColumnRenamed("sum(salary)", "salary")\
            .orderBy("department", "salary")\
            .show()

Here, the results are in even more dimensions. First, we have the values of each person, but not from the department. Then, we have all results and then again the departments and individuals in it. The cube isn’t relevant for us for this calculation much. The background is that a cube creates all possible combinations, whereas the rollup only creates hierarchies. The cube also treats null’s as a possible combination, that’s why we have the individuals here several times. Here is the output:

+----------+-----+------+
|department| name|salary|
+----------+-----+------+
|      null|  Max|  3420|
|      null|Mario|  4400|
|      null|  Sue|  5500|
|      null|  Tom|  6700|
|      null| null| 20020|
|   Finance|  Max|  3420|
|   Finance|  Tom|  6700|
|   Finance| null| 10120|
|     Sales|Mario|  4400|
|     Sales|  Sue|  5500|
|     Sales| null|  9900|
+----------+-----+------+

I hope you liked the tutorials on Spark. There is much more to learn – e.g. about machine learning or different libraries for that. Make sure to check out the tutorial section in order to figure that out.

If you enjoyed this tutorial on spark rollup and cube, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In the previous tutorial, we learned about data cleaning in Spark. Today, we will look at different options to work with columns and rows in Spark. First, we will start with renaming columns. We did this already several times so far, and it is a frequent task in data engineering. In the following sample, we will rename a column:

thirties = clean.select(clean.name, clean.age.between(30, 39)).withColumnRenamed("((age >= 30) AND (age <= 39))", "goodage")
thirties.show()

As you could see, we took the old name – which was very complicated – and renamed it to “goodage”. The output should be the following:

+-----+-------+
| name|goodage|
+-----+-------+
|  Max|  false|
|  Tom|   true|
|  Sue|  false|
|Mario|   true|
+-----+-------+

In the next sample, we want to filter columns on a string-expression. This can be done with the “endswith” method being applied to the column name that should be filtered. In the following sample, we want to filter all contacts that are from Austria:

austrian = clean.filter(clean.lang.endswith("at"))
austrian.show()

As you can see, only one result is returned (as expected):

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  1|Mario| 35|DE-at|
+---+-----+---+-----+

Removing Null-Values in Spark

In our next sample, we want to filter all rows that contain null values in a specific column. This is useful to get a glimpse of null values in datasets. This can easily be done by applying the “isNull” function on a column:

nullvalues = dirtyset.filter(dirtyset.age.isNull())
nullvalues.show()

Here, we get the two results containing these null values:

+---+----+----+-----+
|nid|name| age| lang|
+---+----+----+-----+
|  4| Tom|null|AT-ch|
|  5| Tom|null|AT-ch|
+---+----+----+-----+

Another useful function in Spark is the “Like” function. If you are familiar with SQL, it should be easy to apply this. If not – basically, it scans text in a column, which contains one or more specific literals. You can use different expressions to filter for patterns. The following one filters all people that have “DE” in it, independent of what follows afterwards (“%”):

langde = clean.filter(clean.lang.like("DE%"))
langde.show()

Here, we get all items:

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  2|  Max| 46|DE-de|
|  4|  Tom| 34|DE-ch|
|  1|Mario| 35|DE-at|
+---+-----+---+-----+

Shorten Strings in a Column in Spark

Several times, we want to shorten string values. The following sample takes the first 2 letters with the “substr” function on the column. We afterwards apply the “alias” function, which renames the function (similar to the “withColumnRenamed” function above).

shortnames = clean.select(clean.name.substr(0,2).alias("sn")).collect()
shortnames

Also here, we get the expected output; please note that it isn’t unique anymore (names!):

[Row(sn='Ma'), Row(sn='To'), Row(sn='Su'), Row(sn='Ma')]

Spark offers much more functionality to manipulate Columns, so just play with the API :). In the next tutorial, we will have a look at how to build Cubes and Rollups in Spark

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.


Over the past tutorials, we have acquired quite some knowledge about Python and Spark. We know most of the relevant statements in Apache Spark and are now ready for some data cleaning, which is one of the key tasks of a data engineer. Before we get started, we need to have something that a data engineer (unfortunately) often works with: dirty data. We will focus on how to deal with missing, corrupt and wrong data in Spark

Dealing with wrong data in Spark

In the following sample, we create some data. Note, that there are some errors in it:

  1. The first field (id: 1) is having the wrong language – there is no “AT-at”, but it needs to be “DE-at”
  2. The fourth field (id: 4) is having a null-value “None” and also the wrong language – AT-ch, whereas it should be “DE-ch”
  3. The fifth field (id: 5) is a duplicate of the previous one
dirtyset = spark.createDataFrame([(1, "Mario", 35, "AT-at")\
                                  , (2, "Max", 46, "DE-de")\
                                  , (3, "Sue", 22, "EN-uk")\
                                  , (4, "Tom", None, "AT-ch")\
                                  , (5, "Tom", None, "AT-ch")]\
                                 , ("nid", "name", "age", "lang"))
dirtyset.show()

The dataset should look like this:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  1|Mario|  35|AT-at|
|  2|  Max|  46|DE-de|
|  3|  Sue|  22|EN-uk|
|  4|  Tom|null|AT-ch|
|  5|  Tom|null|AT-ch|
+---+-----+----+-----+

Deleting duplicates in Spark

The first thing we want to do is removing duplicates. There is an easy function for that in Apache Spark – called “dropDuplicates”. When you look at the previous dataset, it might be very easy to figure out that the last one is a duplicate – but wait! For Apache Spark, it isn’t that easy, because the id is different – it is 4 vs 5. Spark doesn’t figure out which columns are relevant to take duplicates from. If we would apply the “dropDuplicates” to the dataframe, it wouldn’t remove anything. So, we need to apply the columns it should take into account when removing duplicates. We tell spark that we want to work with “name” and “age” for this purpose and pass a list of these to the function:

nodub = dirtyset.dropDuplicates(["name", "age"])
nodub.show()

When you now execute the code, it should result in the cleaned dataset:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  2|  Max|  46|DE-de|
|  4|  Tom|null|AT-ch|
|  3|  Sue|  22|EN-uk|
|  1|Mario|  35|AT-at|
+---+-----+----+-----+

This was very easy so far. Now, let’s take care of the wrong language values.

Replacing wrong values in columns in Apache Spark

The Spark “na” functions provide this for us. There is a function called “na.replace” that takes the old value and the new value to replace on. Since we have two different values, we need to call the function twice:

reallangs = nodub.na.replace("AT-at", "DE-at").na.replace("AT-ch", "DE-ch")
reallangs.show()

As you can see, the values are replaced accordingly and now also this should work:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  2|  Max|  46|DE-de|
|  4|  Tom|null|DE-ch|
|  3|  Sue|  22|EN-uk|
|  1|Mario|  35|DE-at|
+---+-----+----+-----+

Only one last thing is now necessary: replacing null values in the dataset.

Replacing null values in Apache Spark

Dealing and working with null-values is always a complicated thing to achieve. Null-values basically means that we don’t know something and that it might have a negative impact on our future predictions and analysis. However, there are some solutions:

  • Ignoring null-values by removing the rows containing them
  • adding a standard-value (e.g. in our case either 0 or a very high value)
  • Using statistics to calculate the most appropriate value

The first one would take away a lot of data. The more columns you have, the higher the possibility is to have null-values! This would reduce the relevant samples and thus the accuracy of predictions. The second one would add some other challenges in our case: it would either increase the average to a very high number or to a very low number. So, only the last opportunity stays for us: calculate a value for the dataset. If you have several features, such as name, it is somewhat easier to do so. In our sample, we use a very easy method: just calculating the average from the correct values. The following sample does exactly that, I will explain each step after the sample:

from pyspark.sql.functions import *
avage = reallangs.groupby().agg(avg(reallangs.age)).collect()
rage = int(avage[0][0])
clean = reallangs.na.fill(rage)
clean.show()

So, what has happend here?

  • We start by calculating the average from all ages we have. This is done with the agg() function
  • Next, we convert the average (which is a float) to an int. Since it is of type row, we have to use two indices to get to our value
  • We then call the “na.fill” with the previously calculated average

The output of that should look like the following:

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  2|  Max| 46|DE-de|
|  4|  Tom| 34|DE-ch|
|  3|  Sue| 22|EN-uk|
|  1|Mario| 35|DE-at|
+---+-----+---+-----+

That’s all! Easy, isn’t it? Of course, we could add much more intelligence to it, but let’s keep it as is right now. More will be added in the tutorials on Data Science, which are about to come soon :). Dealing with wrong data in Spark is one of the key things you will do before going for Data Science.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In our previous tutorial, we looked at how to join data in Apache Spark. Another frequently used thing when working with data is to reduce the number of results by limit data in spark to a specific number. This is done with the limit statement.

Limit Data in Spark with the limit() method

Basically, the limit statement is very easy. It is easy to use since it only takes the number of results to return as a parameter. The limit statement is usually applied with an order-statement. In the following sample, we use the limit statement on the df_ordered dataset which we introduced in the tutorial on filtering and ordering data in Spark. After the sample, I will explain what the steps are.

sumed = df_ordered.groupby(df_ordered.personid) \
                  .agg(sum(df_ordered.price)) \
                  .toDF("pid", "ordervalue")
newPers = df_ordered.join(sumed, sumed.pid == df_ordered.personid, "inner") \
                    .drop("productname", "price", "pid").distinct() \
                    .orderBy("ordervalue", ascending=False) \
                    .limit(10)
newPers.show()

Basically, the above sample shows the top 10 customers from our dataset. The following steps are applied:

  1. Grouping the dataset by the person id
  2. Creating the sum of products bought by the customer
  3. And creating a new dataframe from it

We then join the dataset of ordered values back into the person data. Spark doesn’t allow appending this data and keeping all the original values (like personname, age, …) in it. In the next statement, we do the following:

  1. We join the newly created dataset into the original dataset
  2. Remove the unnecessary items such as productname, price and pid
  3. Order everything by ordervalue descending
  4. and limit the results to only have the top 10 customers.

Now, the result should look like the following:

+--------+----------+---+-----+------------------+
|personid|personname|age|state|        ordervalue|
+--------+----------+---+-----+------------------+
|     162|     Heidi| 37|   GA|24269.340000000226|
|      38|     Daisy| 45|   CA|23799.450000000204|
|     140|     Elsie| 64|   FL|  23759.5400000002|
|      18|      Ruby| 47|   GA|23414.710000000185|
|     180|   Caitlin| 65|   NY| 23124.71000000019|
|     159|    Taylor| 41|   NY|23054.670000000162|
|     131|     Aaron| 67|   TX| 23049.63000000016|
|      49|     Dylan| 47|   TX| 23029.68000000018|
|     136|    Isabel| 52|   CA| 22839.85000000014|
|      43|     Mason| 30|   CA|22834.710000000185|
+--------+----------+---+-----+------------------+

The limit statement itself is very easy, however, it is a bit more complex on how to get towards using the statement ;). In the next tutorial, we will look at how to deal with corrupt data – get ready for some data cleaning!

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In our last tutorial, we looked at different aggregation functions in Apache Spark. This time, we will look at what different options are available to join data in Apache Spark. Joining Data is an essential thing when it comes to working with data.

Join Data in Spark

In most cases, you will sooner or later come across data joining in whatever form. In our tutorial, we will look at three different ways of joining data: via a join, a union or an intersect.

But first, we need to create some datasets we can join. Our financial datasets from the previous samples isn’t comprehensive enough, so we create some other dummy data:

names = spark.createDataFrame([(1, "Mario"), (2, "Max"), (3, "Sue")], ("nid", "name"))
revenues = spark.createDataFrame([(1, 9.99), (2, 189.99), (3, 1099.99)], ("rid", "revenue"))

We simply create two dataframes: names and revenues. These dataframes are initialised as a list of key/value pairs. Now, it is about time to learn the first item: the “join” function.

Creating a join in Spark: the Join() in PySpark

A join() operation is performed on the dataframe itself. Basically, the join operation takes several parameters. We will look at the most important ones:

  • Other Dataset: the other dataset to join with
  • Join by: the IDs to join on (must be existing in both datasets)
  • Type: inner, outer, left/right outer join

Basically, the inner join returns all values that are matched (e.g. available in both datasets). The outer join also returns values that haven’t got a matching “partner” in the other dataset; non-matching datasets are filled with Null-values. The difference between left and right outer join is that dataset either from the left (first dataset) or right (second dataset) is used. The left or right syntax comes from SQL, where you write it from left to right. In our following sample, we use an inner join:

joined = names.join(revenues, names.nid == revenues.rid, "inner").drop("rid")
joined.show()

Basically, all items should have a “matching partner” from both datasets. The result should be this:

+---+-----+-------+
|nid| name|revenue|
+---+-----+-------+
|  1|Mario|   9.99|
|  3|  Sue|1099.99|
|  2|  Max| 189.99|
+---+-----+-------+

Creating the union in Spark: the union() method in PySpark

Creating a union is the second way to bring data together. Basically, a Union is appending data to the original dataset without taking the structure or ids into account. Basically, the union just copies the other dataset to the first dataset to the end of it. The following sample shows the union:

dnames.union(revenues).show()

The output should be this:

+---+-------+
|nid|   name|
+---+-------+
|  1|  Mario|
|  2|    Max|
|  3|    Sue|
|  1|   9.99|
|  2| 189.99|
|  3|1099.99|
+---+-------+

Creating the intersection in Spark: the intersect() in PySpark

The third way in our tutorial today is the intersect() method. Basically, it only returns those datasets that have a matching dataset between the two compared datasets. As for our sample, we first need to create a new dataset, where we add new items of id/name pairs and then call the intersect statement:

names2 = spark.createDataFrame([(1, "Mario"), (4, "Tom"), (5, "Lati")], ("nid", "name"))
names2.intersect(names).show()

And the output should be this:

+---+-----+
|nid| name|
+---+-----+
|  1|Mario|
+---+-----+

Now you should be familiar with all different ways on how to join data. In the next sample, we will look at how to limit data results.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In our last tutorial section, we worked with Filters and Groups. Now, we will look at different aggregation functions in Spark and Python. Today, we will look at the Aggregation function in Spark, that allows us to apply different aggregations on columns in Spark

The agg() function in PySpark

Basically, there is one function that takes care of all the agglomerations in Spark. This is called “agg()” and takes some other function in it. There are various possibilities, the most common ones are building sums, calculating the average or max/min values. The “agg()” function is called on a grouped dataset and is executed on one column. In the following samples, some possibilities are shown:

from pyspark.sql.functions import *
df_ordered.groupby().agg(max(df_ordered.price)).collect()

In this sample, we imported all available functions from pyspark.sql. We called the “agg” function on the df_ordered dataset that we have created in the previous tutorial. We than use the “max()” function that retrieves the highest value of the price. The output should be the following:

[Row(max(price)=99.99)]

Now, we want to calculate the average value of the price. Similar to the above example, we use the “agg()” function and instead of “max()” we call the “avg” function.

df_ordered.groupby().agg(avg(df_ordered.price)).collect()

The output should be this:

[Row(avg(price)=42.11355000000023)]

Now, let’s get the sum of all orders. This can be done with the “sum()” function. It is also very similar to the previous samples:

df_ordered.groupby().agg(sum(df_ordered.price)).collect()

And the output should be this:

[Row(sum(price)=4211355.000000023)]

To calculate the mean value, we can use “mean” with it:

df_ordered.groupby().agg(mean(df_ordered.price)).collect()

This should be the output:

[Row(avg(price)=42.11355000000023)]

Another useful function is “count”, where we can simply count all datasets for a column:

df_ordered.groupby().agg(count(df_ordered.price)).collect()

And the output should be this:

[Row(count(price)=100000)]

Today’s tutorial was very easy. It was dealing with aggregation function in Spark and how you can create them with the use of the agg() method. In the next sample, we will look at different ways to join data together.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In our last tutorial section, we had a brief introduction to Spark Dataframes. Now, let’s have a look into filtering, grouping and sorting Data with Apache Spark Dataframes. First, we will start with the orderby statement in Spark, which allows us to order data in Spark. Next, we will continue with the sort statement in Spark that allows us to sort data in Spark and then we will focus on the groupby statement in Spark, which will eventually allow us to group data in Spark.

Order Data in Spark

First, let’s look at how to order data. Basically, there is an easy function on all dataframes, called “orderBy”. This function takes several parameters, but the most important parameters for us are:

  • Columns: a list of columns to order the dataset by. This is either one or more items
  • Order: ascending (=True) or descending (ascending=False)

If we again load our dataset from the previous sample, we can now easily apply the orderBy() function. In our sample, we order everything by personid:

df_new = spark.read.parquet(fileName)
df_ordered = df_new.orderBy(df_new.personid, ascending=True)
df_ordered.show()

The output should be:

+--------+----------+---+---------------+-----+-----+
|personid|personname|age|    productname|price|state|
+--------+----------+---+---------------+-----+-----+
|       0|    Amelia| 33|Bicycle gearing|19.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33|   Bicycle bell| 9.99|   NY|
|       0|    Amelia| 33|         Fender|29.99|   NY|
|       0|    Amelia| 33|         Fender|29.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33| Bicycle saddle|59.99|   NY|
|       0|    Amelia| 33| Bicycle saddle|59.99|   NY|
|       0|    Amelia| 33|  Bicycle brake|49.99|   NY|
|       0|    Amelia| 33|   Bicycle bell| 9.99|   NY|
|       0|    Amelia| 33|Bicycle gearing|19.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|  Bicycle Frame|99.99|   NY|
|       0|    Amelia| 33|Luggage carrier|34.99|   NY|
|       0|    Amelia| 33|Luggage carrier|34.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|  Bicycle Frame|99.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
+--------+----------+---+---------------+-----+-----+
only showing top 20 rows

Filter Data in Spark

Next, we want to filter our data. We take the ordered data again and only take customers that are between 33 and 35 years. Just like the previous “orderby” function, we can also apply an easy function here: filter. This function basically takes one filter argument. In order to have a “between”, we need to chain two filter statements together. Also like the previous sample, the column to filter with needs to be applied:

df_filtered = df_ordered.filter(df_ordered.age < 35).filter(df_ordered.age > 33)
df_filtered.show()

The output should look like this:

+--------+----------+---+---------------+-----+-----+
|personid|personname|age|    productname|price|state|
+--------+----------+---+---------------+-----+-----+
|      47|      Jake| 34|  Bicycle chain|39.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|   Bicycle fork|79.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|      Saddlebag|24.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34| Bicycle saddle|59.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|         Fender|29.99|   TX|
|      47|      Jake| 34|      Kickstand|14.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|  Bicycle brake|49.99|   TX|
|      47|      Jake| 34|      Saddlebag|24.99|   TX|
+--------+----------+---+---------------+-----+-----+
only showing top 20 rows

Group Data in Spark

In order to make more complex queries, we can also group data by different columns. This is done with the “groupBy” statement. Basically, this statement also takes just one argument – the column(s) to sort by. The following sample is a bit more complex, but I will explain it after the sample:

from pyspark.sql.functions import bround
df_grouped = df_ordered \
    .groupBy(df_ordered.personid) \
    .sum("price") \
    .orderBy("sum(price)") \
    .select("personid", bround("sum(price)", 2)) \
    .withColumnRenamed("bround(sum(price), 2)", "value")
df_grouped.show()

So, what has happened here? We had several steps:

  • Take the previously ordered dataset and group it by personid
  • Create the sum of each person’s items
  • Order everything descending by the column for the sum. NOTE: the column is named “sum(price)” since it is a new column
  • We round the column “sum(price)” by two decimal points so that it looks nicer. Note again, that the name of the column is changed again to “ground(sum(price), 2)”
  • Since the column is now at a really hard to interpret name, we call the “withColumnRenamed” function to give the column a much nicer name. We call our column “value”

The output should look like this:

+--------+--------+
|personid|   value|
+--------+--------+
|      37|18555.38|
|      69|18825.24|
|       6|18850.19|
|     196|19050.34|
|      96|19060.34|
|     144|19165.37|
|     108|19235.21|
|      61|19275.52|
|      63|19330.23|
|     107|19390.22|
|      46|19445.41|
|      79|19475.16|
|     181|19480.35|
|      17|19575.33|
|     123|19585.29|
|      70|19655.19|
|     134|19675.31|
|     137|19715.16|
|      25|19720.07|
|      45|19720.14|
+--------+--------+
only showing top 20 rows

You can also do each of the statements step-by-step in case you want to see each transformation and its impact.

In our next tutorial, we will have a look at aggregation functions in Apache Spark.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In our last tutorials, we had a look at Transformations in Spark. Now, we look at Actions in Spark. One of them – collect – we already used frequently in our transformation samples. The reason for this is simple – transformations use late binding – so nothing happens – until you call an action. We used the most simple one – collect. Actions always do something with data and thus you should be prepared to use this. With the collect method, all the data is loaded into the memory. For our samples this wasn’t an issue, since we only had very small data. However, If datasets are larger, you need to re-consider this. Other functions might deliver better results for you. Let’s now have a look at the different options available

Reduce Action in Spark

Reduce calls a function on all items in a dataset that accumulates them. Basically, all binary operators can be used. For Python, please refer to this documentation: https://docs.python.org/3/library/operator.html. If we want to sum up all items by multiplying them in Spark, this would look like the following:

from operator import *
spark_data.reduce(mul)

The output would be this:

First and Count Actions in Spark

These Actions are easy to use – they either return the first item in an RDD or return the count of elements in the RDD. If you use count() on a very large RDD, it might take very long and your task could run into a timeout. There is another function called countApprox() that returns the approximate count in an RDD to prevent that.

ds_one.first()

Count is also used the same way like first – without applying a function or similar.

ds_one.count()

Saving data

One important thing to do is to store data eventually somewhere. In our case, we store it into the same directory like we have Jupyter notebooks. We can therefore check if it worked as expected. Normally, you would store data on S3 or any other storage you use. Spark RDDs provide several means to save to files, in our case we will use “saveAsTextFile”. This stores the content as part files that are in text format.

ds_one.saveAsTextFile("data/dsone.csv")

You can now navigate to the output at this folder: data/dsone.csv.

Spark SaveAsTextFile Result

So, today we have learned about Actions in Spark and how to apply them to RDDs.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.

In our last tutorial section, we looked at more data transformations with Spark. In this tutorial, we will continue with data transformations on Spark RDD before we move on to Actions. Today, we will focus on map and intersect keywords and apply them to Spark RDDs.

Intersect

A intersection between two different datasets only returns that values that – well – intersect. This means that only those values are returned if their values correspond. Only one element is returned, even though there are multiple elements in both datasets. Intersection is used like this:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Mark", 1984), ("Anastasia", 2017)])
sorted(ds_one.intersection(ds_two).collect())

The output should look like this now:

The Intersection keyword

Please note that “Mark” is only considered by the intersection keyword if all keys match. In our case, this means the year and name.

Map

Map returns a new RDD, that was transformed by applying a function to it. This is very useful if you want to check a dataset for different things or modify data. You can also add entries by this. The following example is using a little more complex transformation where we create our own function and calculate the age and gender of the person:

import datetime
def doTheTrans(values):
    
    age = datetime.datetime.now().year - values[1]
    gender = ""
    
    if values[0].endswith("a"):
        gender = "f"
    else:
        gender = "m"
        
    return ([values[0], age, gender])
ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
sorted(ds_one.map(doTheTrans).collect())

What happens in the code?

First, we import datetime. We need this to get the current year (we could also hardcode “2019”, but then I would have to re-write the entire tutorial in january!). Next, we create our function which we call “doTheTrans”. This gets tuples from the map function (which we will use later). Note that tuples are immutable, therefore we need to create new variables and return a new tuple.

First, we calculate the age. This is easily done by “datetime.now().year – values[1]”. We have the year the person is born in the second index (number 1) in the tuple. Then, we check if the name ends with “a” and define if the person is female or male. Please note here that this isn’t sufficient normally, we just use it for our sample.

Once we are done with this, we can call the “map” function with the function we have used. Now, the map function applies the new function which we created to each item in the rdd.

Now, the output of this should look like the following:

[['Lisa', 34, 'f'], ['Mark', 35, 'm']]

Data Transformations on Spark with a Lambda Function

It is also possible to write the function inline as lambda, if your transformations aren’t that complex. Suppose we only want to return the age of a person; here, the code would look like the following:

sorted(ds_one.map(lambda x: (x[0], datetime.datetime.now().year - x[1] +1)).collect())

And the output should be this:

[('Lisa', 35), ('Mark', 36)]

Now, after 3 intense tutorials on Data transformations on Spark RDD, we will focus on Actions in Spark in the next tutorial.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. The official Apache Spark page can intensify your experience. Your learning journey can still continue.

In our last tutorial section, we looked at filtering, joining and sorting data. Today, we will look at more Spark Data Transformations on RDD. Our focus topics for today are: distinct, groupby and union in Spark. Now, we will have a look at several other operators for that. First, we will use the “Distinct” transformation

Distinct

Distinct enables us to return exactly one of each item. For instance, if we have more than one entry in the same sequence, we can reduce this. A sample would be the following array:

[1, 2, 3, 4, 1]

However, we only want to return each number exactly once. This is done via the distinct keyword. The following example illustrates this:

ds_distinct = sc.parallelize([(1), (2), (3), (4), (1)]).distinct().collect()
ds_distinct

GROUPBY

A very important task when working with data is grouping. In Spark, we have the GroupBy transformation for this. In our case, this is “GroupByKey”. Basically, this groups the dataset into a specific form and the execution is added when calling the “mapValues” function. With this function, you can provide how you want to deal with the values. Some options are pre-defined, such as “len” for the number of occurrences or “list” for the actual values. The following sample illustrates this with our dataset introduced in the previous tutorial:

ds_set = sc.parallelize([("Mark", 1984), ("Lisa", 1985), ("Mark", 2015)])
ds_grp = ds_set.groupByKey().mapValues(list).collect()
ds_grp

If you want to have a count instead, simply use “len” for it:

ds_set.groupByKey().mapValues(len).collect()

The output should look like this now:

The GroupByKey keyword in Apache Spark
The GroupByKey keyword in Apache Spark

Union

A union joins together two datasets into one. In contrast to the “Join” transformation that we already looked at in our last tutorial, it doesn’t take any keys and simply appends the datasets. It is very similar to the previous one, but the result is different. The syntax for it is straight forward: it is written “dsone.union(dstwo)”. Let’s have a look at it:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Luke", 2015), ("Anastasia", 2017)])
sorted(ds_one.union(ds_two).collect())

Now, the output of this should look like the following:

[('Anastasia', 2017), ('Lisa', 1985), ('Luke', 2015), ('Mark', 1984)]

Today, we learned a lot about Spark data transformations. In our next tutorial – part 3 – we will have a look at even more of them.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. The official Apache Spark page can intensify your experience. Your learning journey can still continue.