During the past tutorials, we have acquired a lot of knowledge about Spark. Now we have arrived at the last tutorial on Spark, where we will have a look at cube and rollup. Both are useful for aggregating multi-dimensional data for further processing.

Data for Spark Rollup and Cube functions

First, let’s create a dataset that we later want to work with. Our dataset is the monthly salary of people working in Finance or Sales:

employees = spark.createDataFrame([("Mario", 4400, "Sales")\
                                  , ("Max", 3420, "Finance")\
                                  , ("Sue", 5500, "Sales")\
                                  , ("Tom", 6700, "Finance")]\
                                 , ("name", "salary", "department"))

We then use the first function, rollup. We want the rollup to be over the department and the name of the person.

employees.rollup(employees.department, employees.name)\
            .sum()\
            .withColumnRenamed("sum(salary)", "salary")\
            .orderBy("department", "salary")\
            .show()

Here you can see the output (I will discuss it below):

+----------+-----+------+
|department| name|salary|
+----------+-----+------+
|      null| null| 20020|
|   Finance|  Max|  3420|
|   Finance|  Tom|  6700|
|   Finance| null| 10120|
|     Sales|Mario|  4400|
|     Sales|  Sue|  5500|
|     Sales| null|  9900|
+----------+-----+------+

The result now contains several lines. Let’s look at it line by line:

  • The first line consists of two null values and the sum of all salaries. It represents the entire company: department and name are filled with null, since the row stands for neither a single department nor a specific person – it covers all departments and all persons in them.
  • The second and third line are Max and Tom, who work in the finance department.
  • The fourth line is the sum of the finance department; here you see “null” in the name column, since it is not a single name but the entire department (the sketch below shows how to tell such subtotal rows apart).
  • The same pattern continues for the following lines with the sales department.

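Since null shows up both for rolled-up levels and for genuinely missing values, it can be handy to mark the subtotal rows explicitly. A minimal sketch, assuming Spark 2.0 or newer, where grouping() returns 1 for a column that was rolled up in a given row:

from pyspark.sql.functions import sum, grouping

# keep only the per-department subtotal rows (name rolled up, department present)
subtotals = employees.rollup(employees.department, employees.name) \
    .agg(sum("salary").alias("salary"), grouping("name").alias("name_rolled_up")) \
    .filter("name_rolled_up = 1 AND department IS NOT NULL") \
    .select("department", "salary")
subtotals.show()
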
So, basically, we get three things: (A) the sum of all salaries, (B) the individual values and (C) the salaries per department. Now, let’s build the cube:

employees.cube(employees.department, employees.name)\
            .sum()\
            .withColumnRenamed("sum(salary)", "salary")\
            .orderBy("department", "salary")\
            .show()

Here, the results cover even more combinations. First, we get the value of each person without a department, then the grand total, and then again the departments with the individuals in them. The cube isn’t very relevant for this particular calculation. The background is that a cube creates all possible combinations, whereas the rollup only creates hierarchies. The cube also treats null as a possible combination, which is why the individuals appear here several times. Here is the output:

+----------+-----+------+
|department| name|salary|
+----------+-----+------+
|      null|  Max|  3420|
|      null|Mario|  4400|
|      null|  Sue|  5500|
|      null|  Tom|  6700|
|      null| null| 20020|
|   Finance|  Max|  3420|
|   Finance|  Tom|  6700|
|   Finance| null| 10120|
|     Sales|Mario|  4400|
|     Sales|  Sue|  5500|
|     Sales| null|  9900|
+----------+-----+------+
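
If you prefer SQL, the same result can be expressed with GROUP BY CUBE (and GROUP BY ROLLUP works analogously). A small sketch – the view name “employees” is just an assumption for this example:

employees.createOrReplaceTempView("employees")
spark.sql("""
    SELECT department, name, SUM(salary) AS salary
    FROM employees
    GROUP BY CUBE (department, name)
    ORDER BY department, salary
""").show()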

I hope you liked the tutorials on Spark. There is much more to learn – e.g. about machine learning or the different libraries for it. Make sure to check out the tutorial section to find out more.

If you enjoyed this tutorial on Spark rollup and cube, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.

In the previous tutorial, we learned about data cleaning in Spark. Today, we will look at different options to work with columns and rows in Spark. First, we will start with renaming columns. We did this already several times so far, and it is a frequent task in data engineering. In the following sample, we will rename a column:

thirties = clean.select(clean.name, clean.age.between(30, 39)).withColumnRenamed("((age >= 30) AND (age <= 39))", "goodage")
thirties.show()

As you can see, we took the old name – which was very complicated – and renamed it to “goodage”. The output should be the following:

+-----+-------+
| name|goodage|
+-----+-------+
|  Max|  false|
|  Tom|   true|
|  Sue|  false|
|Mario|   true|
+-----+-------+
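
Instead of renaming the generated column afterwards, you can also name the expression directly. A small alternative sketch using alias():

thirties = clean.select(clean.name, clean.age.between(30, 39).alias("goodage"))
thirties.show()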

In the next sample, we want to filter rows on a string expression. This can be done with the “endswith” method, applied to the column that should be filtered. In the following sample, we want to filter all contacts that are from Austria:

austrian = clean.filter(clean.lang.endswith("at"))
austrian.show()

As you can see, only one result is returned (as expected):

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  1|Mario| 35|DE-at|
+---+-----+---+-----+
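
The related string methods work the same way. A short sketch – “startswith” checks the beginning of the value and “contains” checks for a substring:

german = clean.filter(clean.lang.startswith("DE"))   # language code starts with "DE"
swiss = clean.filter(clean.lang.contains("ch"))      # language code contains "ch"
german.show()
swiss.show()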

Removing Null-Values in Spark

In our next sample, we want to filter all rows that contain null values in a specific column. This is useful to get a glimpse of null values in datasets. This can easily be done by applying the “isNull” function on a column:

nullvalues = dirtyset.filter(dirtyset.age.isNull())
nullvalues.show()

Here, we get the two results containing these null values:

+---+----+----+-----+
|nid|name| age| lang|
+---+----+----+-----+
|  4| Tom|null|AT-ch|
|  5| Tom|null|AT-ch|
+---+----+----+-----+
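
The inverse selection works with isNotNull(), and dropna() removes such rows directly. A small sketch:

withage = dirtyset.filter(dirtyset.age.isNotNull())   # only rows where age is set
withage.show()
noage_removed = dirtyset.dropna(subset=["age"])        # same rows as withage
noage_removed.show()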

Another useful function in Spark is the “like” function. If you are familiar with SQL, it should be easy to apply. If not – basically, it checks whether the text in a column matches a pattern made up of literals and wildcards. The following one filters all people whose language code starts with “DE”, independent of what follows afterwards (“%”):

langde = clean.filter(clean.lang.like("DE%"))
langde.show()

Here, we get all matching items:

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  2|  Max| 46|DE-de|
|  4|  Tom| 34|DE-ch|
|  1|Mario| 35|DE-at|
+---+-----+---+-----+
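
For more flexible patterns you can use rlike(), which accepts a regular expression. A small sketch that matches both German and English language codes:

de_or_en = clean.filter(clean.lang.rlike("^(DE|EN)-"))
de_or_en.show()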

Shorten Strings in a Column in Spark

Often, we want to shorten string values. The following sample takes the first two letters with the “substr” function on the column. We then apply the “alias” function, which renames the resulting column (similar to the “withColumnRenamed” function above).

shortnames = clean.select(clean.name.substr(0,2).alias("sn")).collect()
shortnames

Also here, we get the expected output; please note that the values aren’t unique anymore (names!):

[Row(sn='Ma'), Row(sn='To'), Row(sn='Su'), Row(sn='Ma')]

Spark offers much more functionality to manipulate columns, so just play with the API :). In the next tutorial, we will have a look at how to build cubes and rollups in Spark.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue. For full details about Apache Spark, make sure to visit the official page.


Over the past tutorials, we have acquired quite some knowledge about Python and Spark. We know most of the relevant statements in Apache Spark and are now ready for some data cleaning, which is one of the key tasks of a data engineer. Before we get started, we need something that a data engineer (unfortunately) often works with: dirty data. We will focus on how to deal with missing, corrupt and wrong data in Spark.

Dealing with wrong data in Spark

In the following sample, we create some data. Note that there are some errors in it:

  1. The first field (id: 1) has the wrong language – there is no “AT-at”; it needs to be “DE-at”
  2. The fourth field (id: 4) has a null value (“None”) and also the wrong language – AT-ch, whereas it should be “DE-ch”
  3. The fifth field (id: 5) is a duplicate of the previous one

dirtyset = spark.createDataFrame([(1, "Mario", 35, "AT-at")\
                                  , (2, "Max", 46, "DE-de")\
                                  , (3, "Sue", 22, "EN-uk")\
                                  , (4, "Tom", None, "AT-ch")\
                                  , (5, "Tom", None, "AT-ch")]\
                                 , ("nid", "name", "age", "lang"))
dirtyset.show()

The dataset should look like this:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  1|Mario|  35|AT-at|
|  2|  Max|  46|DE-de|
|  3|  Sue|  22|EN-uk|
|  4|  Tom|null|AT-ch|
|  5|  Tom|null|AT-ch|
+---+-----+----+-----+

Deleting duplicates in Spark

The first thing we want to do is remove duplicates. There is an easy function for that in Apache Spark, called “dropDuplicates”. When you look at the previous dataset, it might be very easy to figure out that the last row is a duplicate – but wait! For Apache Spark, it isn’t that easy, because the id is different – it is 4 vs 5. Spark doesn’t figure out on its own which columns are relevant for detecting duplicates. If we applied “dropDuplicates” to the dataframe as-is, it wouldn’t remove anything. So, we need to specify the columns it should take into account when removing duplicates. We tell Spark that we want to work with “name” and “age” for this purpose and pass a list of these to the function:

nodub = dirtyset.dropDuplicates(["name", "age"])
nodub.show()

When you now execute the code, it should result in the cleaned dataset:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  2|  Max|  46|DE-de|
|  4|  Tom|null|AT-ch|
|  3|  Sue|  22|EN-uk|
|  1|Mario|  35|AT-at|
+---+-----+----+-----+
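
To see why specifying the columns matters, here is a small sketch of the call without a column list – all columns are compared, and since the nid differs, both “Tom” rows would survive:

dirtyset.dropDuplicates().show()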

This was very easy so far. Now, let’s take care of the wrong language values.

Replacing wrong values in columns in Apache Spark

The Spark “na” functions provide this for us. There is a function called “na.replace” that takes the old value and the new value to replace it with. Since we have two different values, we need to call the function twice:

reallangs = nodub.na.replace("AT-at", "DE-at").na.replace("AT-ch", "DE-ch")
reallangs.show()

As you can see, the values are replaced accordingly:

+---+-----+----+-----+
|nid| name| age| lang|
+---+-----+----+-----+
|  2|  Max|  46|DE-de|
|  4|  Tom|null|DE-ch|
|  3|  Sue|  22|EN-uk|
|  1|Mario|  35|DE-at|
+---+-----+----+-----+
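
As a small alternative sketch, na.replace also accepts a dictionary, so both replacements fit into a single call, restricted to the “lang” column:

reallangs = nodub.na.replace({"AT-at": "DE-at", "AT-ch": "DE-ch"}, subset=["lang"])
reallangs.show()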

Only one last thing is now necessary: replacing null values in the dataset.

Replacing null values in Apache Spark

Dealing with null values is always a complicated thing. A null value basically means that we don’t know something, and that can have a negative impact on our future predictions and analysis. However, there are some solutions:

  • Ignoring null values by removing the rows containing them
  • Adding a standard value (e.g. in our case either 0 or a very high value)
  • Using statistics to calculate the most appropriate value

The first option would take away a lot of data. The more columns you have, the higher the chance of null values, and removing rows reduces the number of samples and thus the accuracy of predictions. The second option adds other challenges in our case: it would pull the average towards either a very high or a very low number. So, only the last option remains: calculating a value for the dataset. If you have several features, such as the name, this is somewhat easier to do. In our sample, we use a very simple method: we just calculate the average of the correct values. The following sample does exactly that; I will explain each step after the sample:

from pyspark.sql.functions import *
avage = reallangs.groupby().agg(avg(reallangs.age)).collect()
rage = int(avage[0][0])
clean = reallangs.na.fill(rage)
clean.show()

So, what has happened here?

  • We start by calculating the average of all ages we have. This is done with the agg() function
  • Next, we convert the average (which is a float) to an int. Since the result is a Row, we have to use two indices to get to our value
  • We then call “na.fill” with the previously calculated average

The output of that should look like the following:

+---+-----+---+-----+
|nid| name|age| lang|
+---+-----+---+-----+
|  2|  Max| 46|DE-de|
|  4|  Tom| 34|DE-ch|
|  3|  Sue| 22|EN-uk|
|  1|Mario| 35|DE-at|
+---+-----+---+-----+
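
A small alternative sketch: na.fill also accepts a dictionary, which restricts the fill to the “age” column and leaves all other columns untouched:

clean = reallangs.na.fill({"age": rage})
clean.show()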

That’s all! Easy, isn’t it? Of course, we could add much more intelligence to it, but let’s keep it as is right now. More will be added in the tutorials on Data Science, which are about to come soon :). Dealing with wrong data in Spark is one of the key things you will do before going for Data Science.


In our previous tutorial, we looked at how to join data in Apache Spark. Another frequent task when working with data is reducing the number of results by limiting the data in Spark to a specific number. This is done with the limit statement.

Limit Data in Spark with the limit() method

Basically, the limit statement is very easy to use, since it only takes the number of results to return as a parameter. The limit statement is usually applied together with an order statement. In the following sample, we use the limit statement on the df_ordered dataset, which we introduced in the tutorial on filtering and ordering data in Spark. After the sample, I will explain the steps.

from pyspark.sql.functions import sum
sumed = df_ordered.groupby(df_ordered.personid) \
                  .agg(sum(df_ordered.price)) \
                  .toDF("pid", "ordervalue")
newPers = df_ordered.join(sumed, sumed.pid == df_ordered.personid, "inner") \
                    .drop("productname", "price", "pid").distinct() \
                    .orderBy("ordervalue", ascending=False) \
                    .limit(10)
newPers.show()

Basically, the above sample shows the top 10 customers from our dataset. The following steps are applied:

  1. Grouping the dataset by the person id
  2. Creating the sum of products bought by the customer
  3. And creating a new dataframe from it

We then join the dataset of order values back into the person data, since Spark doesn’t offer a way to simply append the aggregated values while keeping all the original columns (like personname, age, …). In the next statement, we do the following:

  1. We join the newly created dataset into the original dataset
  2. Remove the unnecessary items such as productname, price and pid
  3. Order everything by ordervalue descending
  4. and limit the results to only have the top 10 customers.

Now, the result should look like the following:

+--------+----------+---+-----+------------------+
|personid|personname|age|state|        ordervalue|
+--------+----------+---+-----+------------------+
|     162|     Heidi| 37|   GA|24269.340000000226|
|      38|     Daisy| 45|   CA|23799.450000000204|
|     140|     Elsie| 64|   FL|  23759.5400000002|
|      18|      Ruby| 47|   GA|23414.710000000185|
|     180|   Caitlin| 65|   NY| 23124.71000000019|
|     159|    Taylor| 41|   NY|23054.670000000162|
|     131|     Aaron| 67|   TX| 23049.63000000016|
|      49|     Dylan| 47|   TX| 23029.68000000018|
|     136|    Isabel| 52|   CA| 22839.85000000014|
|      43|     Mason| 30|   CA|22834.710000000185|
+--------+----------+---+-----+------------------+
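
For completeness, a minimal sketch of limit() on its own – it simply truncates the dataframe to the given number of rows:

df_ordered.limit(5).show()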

The limit statement itself is very easy; getting to the point where you can use it is a bit more complex ;). In the next tutorial, we will look at how to deal with corrupt data – get ready for some data cleaning!


One of the reasons why Python is so popular for Data Science is that it has a very rich set of functionality for mathematics and statistics. In this tutorial, I will show the very basic functions; they really are basic, so don’t expect too much. When we talk about real data science, you should rather consider learning scikit-learn, PyTorch or Spark ML. However, today’s tutorial focuses on the building blocks, before moving on to the more complex tutorials.

Basic Mathematics in Python from the math Library

The math library in Python provides most of the relevant functionality you might need when working with numbers. The following samples give an overview:

import math
vone = 1.2367
print(math.ceil(vone))

First, we import “math” from the standard library and then create a value. The first function we use is ceil, which rounds up to the next integer. In the following sample, we calculate the greatest common divisor of two numbers.

math.gcd(44,77)

Other functions include logarithm, power, cosine and many more. Some of them are shown in the following sample:

math.log(5)
math.pow(2,3)
math.cos(4)
math.pi
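
A few more examples with their results as comments – these follow directly from the definitions of the functions:

import math

math.floor(1.2367)   # 1 - the counterpart to ceil()
math.sqrt(16)        # 4.0
math.factorial(5)    # 120
math.log10(1000)     # 3.0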

Basic statistics in Python from the statistics library

The standard library offers some elementary statistical functions. We will first import the library and then calculate the mean of 5 values:

from statistics import *
values = [1,2,3,4,5]
mean(values)

Some other possible functions are:

median(values)
stdev(values)
variance(values)
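
For comparison, a small sketch of the population variants (stdev and variance above use the sample formulas) applied to the same values [1, 2, 3, 4, 5]:

pvariance(values)   # 2.0
pstdev(values)      # ~1.4142
median_low(values)  # 3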

Have a look at those two libraries – there is quite a lot to explore.

What’s next?

Now, the tutorial series for Python is over. You should now be fit to use PySpark. If you are not yet familiar with Spark, have a look at the Spark Tutorial I created here. Also, I will create more tutorials on Python and Machine Learning in the future, so make sure to check back often to the Big Data & Data Science tutorial overview. I hope you liked this tutorial. If you have any suggestions on what to improve, please feel free to get in touch with me! If you want to learn more about Python, I also recommend the official page.

In our last tutorial, we looked at different aggregation functions in Apache Spark. This time, we will look at the different options available for joining data in Apache Spark. Joining data is essential when it comes to working with data.

Join Data in Spark

In most cases, you will sooner or later come across data joining in whatever form. In our tutorial, we will look at three different ways of joining data: via a join, a union or an intersect.

But first, we need to create some datasets we can join. Our financial dataset from the previous samples isn’t comprehensive enough, so we create some other dummy data:

names = spark.createDataFrame([(1, "Mario"), (2, "Max"), (3, "Sue")], ("nid", "name"))
revenues = spark.createDataFrame([(1, 9.99), (2, 189.99), (3, 1099.99)], ("rid", "revenue"))

We simply create two dataframes: names and revenues. These dataframes are initialised from a list of key/value pairs. Now it is about time to learn about the first option: the “join” function.

Creating a join in Spark: the Join() in PySpark

A join() operation is performed on the dataframe itself. Basically, the join operation takes several parameters. We will look at the most important ones:

  • Other dataset: the other dataset to join with
  • Join by: the IDs to join on (they must exist in both datasets)
  • Type: inner, outer, left/right outer join

Basically, the inner join returns all rows that are matched (i.e. available in both datasets). The outer join also returns rows that don’t have a matching “partner” in the other dataset; the missing values are filled with nulls. The difference between a left and a right outer join is whether all rows of the left (first) or the right (second) dataset are kept. The left/right terminology comes from SQL, where you write the datasets from left to right. In our following sample, we use an inner join:

joined = names.join(revenues, names.nid == revenues.rid, "inner").drop("rid")
joined.show()

Basically, all items should have a “matching partner” from both datasets. The result should be this:

+---+-----+-------+
|nid| name|revenue|
+---+-----+-------+
|  1|Mario|   9.99|
|  3|  Sue|1099.99|
|  2|  Max| 189.99|
+---+-----+-------+

Creating the union in Spark: the union() method in PySpark

Creating a union is the second way to bring data together. Basically, a union appends one dataset to the other without taking the structure or the ids into account – the rows of the second dataset are simply copied to the end of the first one. The following sample shows the union:

names.union(revenues).show()

The output should be this:

+---+-------+
|nid|   name|
+---+-------+
|  1|  Mario|
|  2|    Max|
|  3|    Sue|
|  1|   9.99|
|  2| 189.99|
|  3|1099.99|
+---+-------+
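
Note that union() keeps duplicates. A small sketch, reusing the names dataframe, that shows how distinct() removes them again:

doubled = names.union(names)
doubled.show()              # every name appears twice
doubled.distinct().show()   # back to the original three rows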

Creating the intersection in Spark: the intersect() in PySpark

The third way in our tutorial today is the intersect() method. Basically, it only returns those rows that appear in both of the compared datasets. For our sample, we first need to create a new dataset with new id/name pairs and then call the intersect statement:

names2 = spark.createDataFrame([(1, "Mario"), (4, "Tom"), (5, "Lati")], ("nid", "name"))
names2.intersect(names).show()

And the output should be this:

+---+-----+
|nid| name|
+---+-----+
|  1|Mario|
+---+-----+
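
The counterpart to intersect() is subtract(), which returns the rows of the first dataset that have no match in the second one. A small sketch with the dataframes from above:

names2.subtract(names).show()   # returns Tom and Lati, who are not in names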

Now you should be familiar with the different ways to join data. In the next tutorial, we will look at how to limit data results.


Python has a really great standard library. In the next two tutorial sessions, we will have a first look at this standard library. We will mainly focus on what is relevant for Spark developers in the long run. Today, we will focus on FuncTools and IterTools in Python; the next tutorial will deal with some mathematical functions. But first, let’s start with “reduce”.

The reduce() function from FuncTools in Python

Basically, the reduce function takes an iterable and a function and applies that function cumulatively to the elements. In most cases, this will be a lambda function, but it could also be a normal function. In our sample, we take some values and create the sum of them by moving from left to right:

from functools import reduce
values = [1,4,5,3,2]
reduce(lambda x,y: x+y, values)

And we get the expected output:

15
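
Two small variations, based on the same values list: reduce() also works with a named function instead of a lambda, and it accepts an optional start value:

from functools import reduce
from operator import add

reduce(add, values)        # 15 - same result as the lambda above
reduce(add, values, 100)   # 115 - the start value becomes the first operand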

The sorted() function

Another very useful function is the “sorted” function. Basically, it sorts the values (or tuples) of a list. The easiest way to apply it is to use our previous values (which were unsorted!):

print(sorted(values))

The output is now in the expected sorting:

[1, 2, 3, 4, 5]

However, we can go further and sort complex objects as well. sorted() takes a key to sort on, which is passed as a lambda expression; here, we state that we want to sort by age. Make sure that you still have the “Person” class from our previous tutorial:

perli = [Person("Mario", "Meir-Huber", 35, 1.0), Person("Helena", "Meir-Huber", 5, 1.0)]
print(perli)
print(sorted(perli, key=lambda p: p.age))

As you can see, our values are now sorted based on the age member.

[Person(firstname='Mario', lastname='Meir-Huber', age=35, score=1.0), Person(firstname='Helena', lastname='Meir-Huber', age=5, score=1.0)]
[Person(firstname='Helena', lastname='Meir-Huber', age=5, score=1.0), Person(firstname='Mario', lastname='Meir-Huber', age=35, score=1.0)]
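
A small sketch of two variations: attrgetter() from the operator module can replace the lambda, and reverse=True flips the order:

from operator import attrgetter

sorted(perli, key=attrgetter("age"), reverse=True)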

The chain() function

The chain() function is very helpful if you want to hook two lists of the same kind of objects together. Basically, we take the Person class again and create a new instance. We then chain the two lists together:

import itertools
perstwo = [Person("Some", "Other", 46, 1.0)]
persons = itertools.chain(perli, perstwo)
for pers in persons:
    print(pers.firstname)

Also here, we get the expected output:

Mario
Helena
Some
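
A related helper is chain.from_iterable(), which flattens a list of lists into a single iterator. A minimal sketch:

import itertools

list(itertools.chain.from_iterable([[1, 2], [3, 4]]))   # [1, 2, 3, 4]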

The groupby() function

Another great feature when working with data is grouping it, and Python allows us to do so as well. The groupby() function takes two parameters: the list to group and the key as a lambda expression. We create a new list of tuple pairs and group by the family name:

from itertools import groupby

pl = [("Meir-Huber", "Mario"), ("Meir-Huber", "Helena"), ("Some", "Other")]

for k, v in groupby(pl, lambda p: p[0]):
    print("Family {}".format(k))
    for p in v:
        print("\tFamily member: {}".format(p[1]))

Basically, the groupby() method returns the key (with the key’s type) and the objects of that group as an iterator, so another iteration is necessary in order to access the elements in the group. The output of the above sample looks like this:

Family Meir-Huber
	Family member: Mario
	Family member: Helena
Family Some
	Family member: Other
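
Keep in mind that groupby() only merges neighbouring items, so an unsorted list should be sorted by the same key first. A small sketch with a hypothetical, unsorted list:

from itertools import groupby

unsorted_pl = [("Some", "Other"), ("Meir-Huber", "Mario"), ("Some", "One")]
for k, v in groupby(sorted(unsorted_pl, key=lambda p: p[0]), lambda p: p[0]):
    print("Family {}: {}".format(k, [name for _, name in v]))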

The repeat() function

A nice function is the repeat() function. Basically, it repeats an element a given number of times. For instance, if we want to repeat our one-person list four times, this can be done like this:

lst = itertools.repeat(perstwo, 4)
for p in lst:
    print(p)

And also the output is just as expected:

[Person(firstname='Some', lastname='Other', age=46, score=1.0)]
[Person(firstname='Some', lastname='Other', age=46, score=1.0)]
[Person(firstname='Some', lastname='Other', age=46, score=1.0)]
[Person(firstname='Some', lastname='Other', age=46, score=1.0)]

The takewhile() and the dropwhile() function in IterTools in Python

Two functions – takewhile and dropwhile – are also very helpful in Python. They are very similar, but their results are the opposite of each other: takewhile yields elements as long as a condition is true, while dropwhile discards elements as long as the condition is true and yields the rest. With a predicate such as “lower than 20”, takewhile takes elements from a list only as long as they are below 20, whereas dropwhile removes elements as long as their values are below 20 and returns everything from there on. The following sample shows this:

vals = range(1,40)
for v in itertools.takewhile(lambda vl: vl<20, vals):
    print(v)
    
print("######")
for v in itertools.dropwhile(lambda vl: vl<20, vals):
    print(v)

And also here, the output is as expected:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
######
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

As you can see, these are quite helpful functions. In our last Python tutorial, we will have a look at some basic mathematical and statistical functions.

If you are not yet familiar with Spark, have a look at the Spark Tutorial I created here. Also, I will create more tutorials on Python and Machine Learning in the future, so make sure to check back often to the Big Data & Data Science tutorial overview. I hope you liked this tutorial. If you have any suggestions on what to improve, please feel free to get in touch with me! If you want to learn more about Python, I also recommend the official page.

In our last tutorial section, we worked with filters and groups. Now, we will look at aggregation functions in Spark and Python – specifically the agg() function, which allows us to apply different aggregations on columns in Spark.

The agg() function in PySpark

Basically, there is one function that takes care of all the aggregations in Spark. It is called “agg()” and takes some other function as a parameter. There are various possibilities; the most common ones are building sums, calculating the average or finding max/min values. The “agg()” function is called on a grouped dataset and is executed on one column. The following samples show some of the possibilities:

from pyspark.sql.functions import *
df_ordered.groupby().agg(max(df_ordered.price)).collect()

In this sample, we imported all available functions from pyspark.sql.functions. We called the “agg” function on the df_ordered dataset that we created in the previous tutorial. We then use the “max()” function, which retrieves the highest value of the price. The output should be the following:

[Row(max(price)=99.99)]

Now, we want to calculate the average value of the price. Similar to the above example, we use the “agg()” function and instead of “max()” we call the “avg” function.

df_ordered.groupby().agg(avg(df_ordered.price)).collect()

The output should be this:

[Row(avg(price)=42.11355000000023)]

Now, let’s get the sum of all orders. This can be done with the “sum()” function. It is also very similar to the previous samples:

df_ordered.groupby().agg(sum(df_ordered.price)).collect()

And the output should be this:

[Row(sum(price)=4211355.000000023)]

To calculate the mean value, we can use the “mean” function in the same way:

df_ordered.groupby().agg(mean(df_ordered.price)).collect()

This should be the output:

[Row(avg(price)=42.11355000000023)]

Another useful function is “count”, which simply counts all rows for a column:

df_ordered.groupby().agg(count(df_ordered.price)).collect()

And the output should be this:

[Row(count(price)=100000)]
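
Several aggregations can also be combined in a single agg() call. A small sketch based on the same dataset:

from pyspark.sql.functions import min, max, avg, count

df_ordered.groupby().agg(
    min(df_ordered.price),
    max(df_ordered.price),
    avg(df_ordered.price),
    count(df_ordered.price)
).show()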

Today’s tutorial was very easy. It dealt with aggregation functions in Spark and how you can apply them with the agg() method. In the next tutorial, we will look at different ways to join data together.


One thing that everyone who deals with data has to work with is classes that make data accessible to the code as objects. In all cases – and Python isn’t different here – wrapper classes and O/R mappers have to be written. However, Python has a powerful decorator at hand that allows us to ease up our work. This decorator is called “dataclass”.

The dataclass in Python

The nice thing about the dataclass decorator is that it enables us to add a great set of functionality to an object containing data without the need to re-write it every time. Basically, this decorator adds the following functionality:

  • __init__: the constructor with all defined member variables. In order to use this, the member variables must be declared with a type annotation – which is rather uncommon in Python
  • __repr__: pretty-prints the class with all its member variables as a string
  • __eq__: a function to compare two instances for equality
  • order functions: with order=True, the decorator also creates the comparison functions __lt__ (less than), __gt__ (greater than), __le__ (less than or equal) and __ge__ (greater than or equal)
  • __hash__: can add a hash function to the class (depending on the eq and frozen settings)
  • frozen: with frozen=True, instances become immutable – fields cannot be changed after creation

The definition for a dataclass in Python is easy:

@dataclass
class Classname:
    CLASS-BLOCK

You can also enable each of the properties described above individually, e.g. with frozen=True or order=True.
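
A minimal sketch of these options with a hypothetical Point class – order=True adds the comparison operators and frozen=True makes instances immutable:

from dataclasses import dataclass, FrozenInstanceError

@dataclass(order=True, frozen=True)
class Point:
    x: int
    y: int

print(Point(1, 2) < Point(1, 3))   # True - fields are compared like a tuple
try:
    Point(1, 2).x = 5
except FrozenInstanceError:
    print("frozen instances cannot be modified")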

In the following sample, we will create a Person-Dataclass.

from dataclasses import dataclass
@dataclass
class Person:
    firstname: str
    lastname: str
    age: int
    score: float
        
p = Person("Mario", "Meir-Huber", 35, 1.0)
print(p)

Please note how the member variables are annotated with their types. There is no need to write a constructor anymore, since it is generated for you. When you print the class, the __repr__() function is called. The output should look like the following:

Person(firstname='Mario', lastname='Meir-Huber', age=35, score=1.0)

As you can see, the dataclass decorator takes a lot of work off our hands. In the next tutorial we will have a look at IterTools and FuncTools.


In our last tutorial section, we had a brief introduction to Spark dataframes. Now, let’s have a look at filtering, grouping and sorting data with Apache Spark dataframes. First, we will start with the orderBy statement, which allows us to order data in Spark. Next, we will continue with the filter statement, which allows us to filter data, and then we will focus on the groupBy statement, which will eventually allow us to group data in Spark.

Order Data in Spark

First, let’s look at how to order data. Basically, there is an easy function on all dataframes, called “orderBy”. This function takes several parameters, but the most important parameters for us are:

  • Columns: a list of columns to order the dataset by. This is either one or more items
  • Order: ascending (=True) or descending (ascending=False)

If we again load our dataset from the previous sample, we can now easily apply the orderBy() function. In our sample, we order everything by personid:

df_new = spark.read.parquet(fileName)
df_ordered = df_new.orderBy(df_new.personid, ascending=True)
df_ordered.show()

The output should be:

+--------+----------+---+---------------+-----+-----+
|personid|personname|age|    productname|price|state|
+--------+----------+---+---------------+-----+-----+
|       0|    Amelia| 33|Bicycle gearing|19.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33|   Bicycle bell| 9.99|   NY|
|       0|    Amelia| 33|         Fender|29.99|   NY|
|       0|    Amelia| 33|         Fender|29.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33| Bicycle saddle|59.99|   NY|
|       0|    Amelia| 33| Bicycle saddle|59.99|   NY|
|       0|    Amelia| 33|  Bicycle brake|49.99|   NY|
|       0|    Amelia| 33|   Bicycle bell| 9.99|   NY|
|       0|    Amelia| 33|Bicycle gearing|19.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|  Bicycle Frame|99.99|   NY|
|       0|    Amelia| 33|Luggage carrier|34.99|   NY|
|       0|    Amelia| 33|Luggage carrier|34.99|   NY|
|       0|    Amelia| 33|      Kickstand|14.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
|       0|    Amelia| 33|  Bicycle Frame|99.99|   NY|
|       0|    Amelia| 33|   Bicycle fork|79.99|   NY|
+--------+----------+---+---------------+-----+-----+
only showing top 20 rows
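
Since the ascending parameter was mentioned above, here is a quick sketch of the descending variant (output omitted):

df_desc = df_new.orderBy(df_new.personid, ascending=False)
df_desc.show()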

Filter Data in Spark

Next, we want to filter our data. We take the ordered data again and only keep customers that are between 33 and 35 years old. Just like the previous “orderBy” function, we can apply an easy function here: filter. This function basically takes one filter argument. In order to get a “between”, we need to chain two filter statements together. As in the previous sample, the column to filter on needs to be specified:

df_filtered = df_ordered.filter(df_ordered.age < 35).filter(df_ordered.age > 33)
df_filtered.show()

The output should look like this:

+--------+----------+---+---------------+-----+-----+
|personid|personname|age|    productname|price|state|
+--------+----------+---+---------------+-----+-----+
|      47|      Jake| 34|  Bicycle chain|39.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|Bicycle gearing|19.99|   TX|
|      47|      Jake| 34|   Bicycle fork|79.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|      Saddlebag|24.99|   TX|
|      47|      Jake| 34|Luggage carrier|34.99|   TX|
|      47|      Jake| 34| Bicycle saddle|59.99|   TX|
|      47|      Jake| 34|  Bicycle Frame|99.99|   TX|
|      47|      Jake| 34|         Fender|29.99|   TX|
|      47|      Jake| 34|      Kickstand|14.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|   Bicycle bell| 9.99|   TX|
|      47|      Jake| 34|  Bicycle brake|49.99|   TX|
|      47|      Jake| 34|      Saddlebag|24.99|   TX|
+--------+----------+---+---------------+-----+-----+
only showing top 20 rows
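
The same “between” can also be written as a single filter with a combined boolean expression – a small alternative sketch:

df_filtered = df_ordered.filter((df_ordered.age > 33) & (df_ordered.age < 35))
df_filtered.show()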

Group Data in Spark

In order to make more complex queries, we can also group data by different columns. This is done with the “groupBy” statement. Basically, this statement also takes just one argument – the column(s) to group by. The following sample is a bit more complex, but I will explain it after the sample:

from pyspark.sql.functions import bround
df_grouped = df_ordered \
    .groupBy(df_ordered.personid) \
    .sum("price") \
    .orderBy("sum(price)") \
    .select("personid", bround("sum(price)", 2)) \
    .withColumnRenamed("bround(sum(price), 2)", "value")
df_grouped.show()

So, what has happened here? We had several steps:

  • Take the previously ordered dataset and group it by personid
  • Create the sum of each person’s items
  • Order everything by the sum column. NOTE: the column is named “sum(price)” since it is a newly created column
  • We round the column “sum(price)” to two decimal places so that it looks nicer. Note again that the name of the column changes, this time to “bround(sum(price), 2)”
  • Since the column now has a really hard-to-interpret name, we call the “withColumnRenamed” function to give the column a much nicer name. We call our column “value”

The output should look like this:

+--------+--------+
|personid|   value|
+--------+--------+
|      37|18555.38|
|      69|18825.24|
|       6|18850.19|
|     196|19050.34|
|      96|19060.34|
|     144|19165.37|
|     108|19235.21|
|      61|19275.52|
|      63|19330.23|
|     107|19390.22|
|      46|19445.41|
|      79|19475.16|
|     181|19480.35|
|      17|19575.33|
|     123|19585.29|
|      70|19655.19|
|     134|19675.31|
|     137|19715.16|
|      25|19720.07|
|      45|19720.14|
+--------+--------+
only showing top 20 rows
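
As a small alternative sketch, the same result can be produced with agg(), using alias() instead of withColumnRenamed():

from pyspark.sql.functions import sum, bround

df_grouped = df_ordered.groupBy(df_ordered.personid) \
    .agg(bround(sum("price"), 2).alias("value")) \
    .orderBy("value")
df_grouped.show()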

You can also do each of the statements step-by-step in case you want to see each transformation and its impact.

In our next tutorial, we will have a look at aggregation functions in Apache Spark.
