We have already learned a couple of things about RDDs in Spark – Spark’s low-level API. Today, it is time for the Apache Spark Dataframes Tutorial. Basically, Spark Dataframes are easier to work with than RDDs: working with them is more straightforward and feels more natural.

Introduction to Spark Dataframes

To get started with Dataframes, we need some data to work with. In the RDD examples, we created random values. Now, we want to work with a more comprehensive dataset. I’ve prepared a “table” about revenues and uploaded it to my Git repository. You can download it here. [NOTE: if you did the Hive tutorial, you should already have the dataset]

After you have downloaded the data, upload it to your Jupyter environment. First, create a new folder named “data” in Jupyter. Now, navigate to this folder and click on “Upload” in the upper right corner. With this, you can upload the new file.

Upload a file to Jupyter

Once you have done this, your file will be available under “data/revenues.csv”. It makes sense to explore the file so that you understand its structure.

Dataframes are very comfortable to work with. First of all, you can use them like tables and add headers and datatypes to them. If you have already looked at our dataset, you can see that there are no types or headers assigned, so we need to create them first. This is done with the “StructType” of the pyspark.sql.types package in the Dataframes API. For each column, you provide a “StructField” entry to the StructType. A StructField has the following arguments:

  • Name: name of the field provided as a string value
  • Type: data type of the field. This can be any basic datatype from the pyspark.sql.types package. Common ones are:
    • Numeric values: ByteType, ShortType, IntegerType, LongType
    • Decimal values: FloatType, DoubleType, DecimalType
    • Date values: DateType, TimestampType
    • Characters: StringType
    • Boolean: BooleanType
    • Null: NullType
  • Nullable: True or False, depending on whether the field may contain null values

Working with Spark Dataframes

Once this is done, we can read the file from the path we uploaded it to by using the “spark.read.csv” method. But before this happens, it is necessary to create the Spark session. With Dataframes, this is done with the session builder: we provide an app name and some config parameters and call the “getOrCreate” function of SparkSession.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
# create (or reuse) the Spark session
spark = SparkSession.builder.master("local") \
    .appName("Cloudvane-Sample-02") \
    .config("net.cloudvane.sampleId", "Cloudvane.net Spark Sample 02").getOrCreate()
# define the schema of the CSV file: column name, data type, nullable
structure = StructType([StructField("personid", IntegerType(), False),
                        StructField("personname", StringType(), False),
                        StructField("state", StringType(), True),
                        StructField("age", IntegerType(), True),
                        StructField("productid", IntegerType(), False),
                        StructField("productname", StringType(), False),
                        StructField("price", DoubleType(), False)])
# read the uploaded CSV file using the schema defined above
df = spark.read.csv("data/revenues.csv", structure)

Now, let’s show the dataframe:

df

The output of this should look like the following:

DataFrame[personid: int, personname: string, state: string, age: int, productid: int, productname: string, price: double]

However, this only shows the header names and the associated data types. Spark is built for large datasets, often with billions of rows, so displaying the full result is usually not feasible. However, you can show the top items by calling the “show()” function of a dataframe.

df.show()

This will now show some of the items in the form of a table. With our dataset, this looks like this:

The output of the Dataframe show() function

Modifying a Spark Dataframe

In the Spark dataframe, there are some columns we don’t need. A common task for data engineers and data scientists is data cleansing. For the following samples, we won’t need “productid”, even though it might be useful for other tasks. Therefore, we simply remove the entire column by calling the “drop” function in Spark. After that, we show the result again.

df_clean = df.drop("productid")
df_clean.show()

The output of this again looks like the following:

Using the df.drop function in Spark

Now we have a modified dataframe. At some point, it is useful to persist this dataset somewhere. In our case, we store it within our Jupyter environment, because that is the easiest way to access it for our samples. We store the result as a Parquet file – one of the most common file formats to work with in Spark. We use the “write” function of the dataframe and then call “parquet” on it. The “parquet” call knows several parameters that we will use:

  • FileName: the name of the file. For our sample below, we create the filename from the current date and time. Therefore, we import “datetime” from the Python standard library. The method “strftime” formats the datetime in the specified format; our format is “dd-mm-yyyy hh:mm”. Also, we import the “os” package, which provides functions for working with the operating system’s file system. We will use the “path.join” function from this package.
  • PartitionBy: with large datasets, it is best to partition them in order to increase the efficiency of access. Partitioning is done on columns, which has a positive impact on read/scan operations. In our case, we provide the state as the partition criterion.
  • Compression: in order to decrease the size on disk, we use a compression algorithm. In our case, this should be gzip.

This is now all that we need to know in order to run the next code snippet:

import datetime
import os
 
today = datetime.datetime.today()
fileName = os.path.join("data", today.strftime('%d-%m-%Y %H:%M'))
df_clean.write.parquet(fileName, partitionBy="state", compression="gzip")

In today’s Spark Dataframes Tutorial, we have learned about the basics. In our next tutorial, we will look at grouping, filtering and sorting data.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.

In our previous tutorial, we had a look at how to (de)serialise objects from and to JSON in Python. Now, let’s have a look at how to dynamically create and extend classes in Python. Basically, we are using the mechanism that Python itself uses: the dynamic type function. This function takes several parameters; we will only focus on the three that are relevant for our sample.

How to use the dynamic type function in Python

Basically, this function takes several parameters, of which we use three. These are:

type(CLASS_NAME, INHERITS, PARAMETERS)

These parameters have the following meaning:

  • CLASS_NAME: The name of the new class
  • INHERITS: the classes the new type should inherit from, passed as a tuple
  • PARAMETERS: new methods or parameters added to the class

In the following example, we want to extend the Person class with a new attribute called “location”. We call our new class “PersonNew” and instruct Python to inherit from “Person”, which we created a few tutorials earlier. Note that the base classes are passed as a tuple, because Python supports inheriting from more than one class. Last, we specify the attribute “location” as a key-value pair. Our sample looks like the following:

pn = type("PersonNew", (Person,), {"location": "Vienna"})
pn.age = 35
pn.name = "Mario"
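
A quick check (a minimal sketch, assuming the Person class from the earlier tutorial is available) shows that the new attributes can be read directly from the class:

print(pn.location)   # Vienna
print(pn.age)        # 35
print(pn.name)       # Mario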

If you test the code, it will just work as expected. All other attributes such as age and name can also be retrieved. Now, let’s make it a bit more complex. We extend our previous sample with the JSON serialisation to be able to dynamically create a class from a JSON string.

Dynamically creating a class in Python from JSON

We therefore create a new function that takes the object to serialise and extracts all values from it. In addition, we add one more key-value pair, which we call “__class__”, in order to store the name of the class. Getting the class name is a bit more complex, since the string representation of the class looks like “<class '__main__.PersonNew'>”. Therefore, we first split this string at the “.”, take the last entry, split it again at the “'” and take the first part. There are more elegant ways to do this, but I want to keep it simple. Once we have the class name, we store it in the dictionary and return the dictionary. The complete sample is here:

def map_proxy(obj):
    dict = {}
    
    for k in obj.__dict__.keys():
        dict.update({k : obj.__dict__.get(k)})
        
    cls_name = str(obj).split(".")[1].split("'")[0]
    dict.update({"__class__" : cls_name})
        
    return dict

We can now use the json.dumps method and call the map_proxy function to return the JSON string:

st_pn = json.dumps(map_proxy(pn))
print(st_pn)

Now, we are ready to dynamically create a new class with the “type” method. We name the new class after the class name that was stored above, which can be retrieved via “__class__”. We let it inherit from Person and pass the entire deserialised object in as the parameters, since it is already a dictionary of key/value pairs:

def dyn_create(obj):
    
    return type(obj["__class__"], (Person, ), obj)

We can now also invoke the json.loads method to dynamically create the class:

obj = json.loads(st_pn, object_hook=dyn_create)
print(obj)
print(obj.location)

And the output should be like that:

{"location": "Vienna", "__module__": "__main__", "__doc__": null, "age": 35, "name": "Mario", "__class__": "PersonNew"}
<class '__main__.PersonNew'>
Vienna

As you can see, it is very easy to dynamically create new classes in Python. We could largely improve this code, but I’ve created this tutorial for explanatory reasons rather than usability ;).

In our next tutorial, we will have a look at logging.

Here you can go to the overview of the Python tutorial. If you want to dig deeper into the language, have a look at the official Python documentation.

In our last tutorials, we had a look at Transformations in Spark. Now, we look at Actions in Spark. One of them – collect – we have already used frequently in our transformation samples. The reason for this is simple: transformations are lazily evaluated, so nothing happens until you call an action. We used the most simple one – collect. Actions always do something with the data, so you should use them deliberately. With the collect method, all the data is loaded into memory. For our samples this wasn’t an issue, since we only had very small data. However, if datasets are larger, you need to reconsider this; other actions might deliver better results for you. Let’s now have a look at the different options available.

Reduce Action in Spark

Reduce calls a function on all items in a dataset and accumulates them. Basically, all binary operators can be used. For Python, please refer to this documentation: https://docs.python.org/3/library/operator.html. If we want to aggregate all items by multiplying them in Spark, this would look like the following:

from operator import *
# spark_data is an RDD of numbers, created in an earlier part of this tutorial series
spark_data.reduce(mul)

The output of this is the product of all items in the RDD.

First and Count Actions in Spark

These actions are easy to use: they either return the first item of an RDD or the number of elements in the RDD. If you use count() on a very large RDD, it might take a long time and your task could run into a timeout. To prevent that, there is another function called countApprox(), which returns an approximate count of the RDD.

ds_one.first()

Count is used in the same way as first – without applying a function or similar.

ds_one.count()
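
The approximate variant mentioned above takes a timeout in milliseconds and an optional confidence level; a minimal sketch could look like this:

# returns a possibly incomplete count once the timeout (in milliseconds) is reached
ds_one.countApprox(1000, confidence=0.95)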

Saving data

One important thing is to eventually store the data somewhere. In our case, we store it in the same directory as our Jupyter notebooks, so we can easily check whether it worked as expected. Normally, you would store data on S3 or whatever storage you use. Spark RDDs provide several means to save to files; in our case we will use “saveAsTextFile”. This stores the content as part files in text format.

ds_one.saveAsTextFile("data/dsone.csv")

You can now navigate to the output at this folder: data/dsone.csv.

Spark SaveAsTextFile Result

So, today we have learned about Actions in Spark and how to apply them to RDDs.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.

One important aspect of working with data is serialisation. Basically, this means that objects can be persisted to storage (e.g. the file system, HDFS or S3). With Spark, a lot of file formats are possible. However, in this tutorial we will have a look at how to deal with JSON, a very popular file format that is often used with Spark.

JSON stands for “JavaScript Object Notation” and was originally developed for client-server applications, with JavaScript as its main consumer. It was built to have less overhead than XML.

First, let’s start with copying objects. Basically, Python knows two ways: normal (shallow) copies and deep copies. The difference is that a shallow copy only copies references to the objects contained within the copied object. This is relevant when an object holds other objects. In a deep copy, no references are shared; every value is copied into the new object. This means that you can use the copy independently of the original (we will illustrate this difference with a small sketch after the following example).

To copy one object to another, you only need to import copy and call the copy or deepcopy function. The following code shows how this works.

import copy
ps1 = Person("Mario", 35)
pss = copy.copy(ps1)
psd = copy.deepcopy(ps1)
ps1.name = "Meir-Huber"
print(ps1.name)
print(pss.name)
print(psd.name)

And the output should be this:

Meir-Huber
Mario
Mario
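
The difference between the two copy modes only becomes visible once the object holds a mutable, nested value. Here is a minimal sketch, assuming we add a hypothetical list attribute that is not part of the original Person class:

import copy
p1 = Person("Mario", 35)
p1.hobbies = ["writing"]      # hypothetical mutable attribute, added only for illustration
shallow = copy.copy(p1)
deep = copy.deepcopy(p1)
p1.hobbies.append("reading")
print(shallow.hobbies)        # ['writing', 'reading'] - still shares the same list object
print(deep.hobbies)           # ['writing'] - received its own independent copy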

Now, let’s look at how we can serialise an object with the help of JSON. Basically, you need to import “json”. An object that you want to serialise needs to be serialisable. A lot of classes in Python already implement that. However, when we want to serialise our own object (e.g. the “Person” class that we created in this tutorial), we need to implement a serialisation function or a custom serialiser. Luckily, Python gives us access to all variables of an object via the “__dict__” dictionary. This means that we don’t have to write our own serialiser and can do this with a simple call to “dumps” of “json”:

import json
js = json.dumps(ps1.__dict__)
print(js)

The above call creates a JSON representation of the entire object:

{"name": "Meir-Huber", "age": 35}

We might want to add more information to the JSON string – e.g. the name of the class the data was originally stored in. We can do this by passing a custom function to the “dumps” method. This function gets the object to be serialised as its only parameter. We pass the original object (Person) and the function we want to execute. We name this function “make_nice”. In the function, we create a dictionary and add the name of the class as the first entry, under the key “obj_name”. We then merge the dictionary of the object into the new dictionary and return it.

Another parameter added to the “dumps” function is “indent”. The only thing it does is pretty-print the output by adding line breaks and indents. This is just for improved readability. The method and the call look like this:

def make_nice(obj):
    dict = {
        "obj_name": obj.__class__.__name__
    }
    
    dict.update(obj.__dict__)
    
    return dict
js_pretty = json.dumps(ps1, default=make_nice,indent=3)
print(js_pretty)

And the result should now look like the following:

{
   "obj_name": "Person",
   "name": "Meir-Huber",
   "age": 35
}

Now we know how we can serialise an object to a JSON string. Basically, you can now store this string in a file or as an object on S3. The only thing that we haven’t discussed yet is how to get an object back from a string. We therefore take the JSON string we dumped before. Our goal now is to create a Person object from it. This can be done via the “loads” call of the json module. We also define a method to do the conversion via the “object_hook” parameter. This object_hook method has one argument – the deserialised JSON object as a dictionary. We access each of the parameters from that dictionary with named indexers and return the new object.

str_json = "{\"name\": \"Meir-Huber\", \"age\": 35}"
def create(obj):
    
    print(obj)
    
    return Person(obj["name"], obj["age"])
    
obj = json.loads(str_json, object_hook=create)
print(obj)

The output should now look like this.

{'name': 'Meir-Huber', 'age': 35}
<__main__.Person object at 0x7fb84831ddd8>

Now we know how to create JSON serialisers and how to get them back from a string value. In the next tutorial, we will have a look on how to improve this and make it more dynamic – by dynamic class creation in Python.

In the last tutorials, we already worked a lot with strings and even manipulated some of them. Now, it is about time to have a look at the theory behind it. Basically, formatting strings is very easy. The only thing you need is the “format” method appended to a string, with a variable amount of data. If you add numbers, the str() function is called on them automatically, so there is no need to convert them.

Basically, the notation is very similar to the one of other string formatters you might be used to. One really nice thing is that you don’t need to provide positional indexes: Python assumes that the positions are in line with the parameters you provide. An easy sample is this:

str01 = "This is my string {} and the value is {}".format("Test", 11)
print(str01)

And the output should look like this:

This is my string Test and the value is 11
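
For comparison (these variants are not part of the original sample), explicit positional indexes and named placeholders would look like this:

str_pos = "{1} comes second, {0} comes first".format("A", "B")
str_named = "The value of {name} is {value}".format(name="pi", value=3.14)
print(str_pos)     # B comes second, A comes first
print(str_named)   # The value of pi is 3.14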

You can also use classes for this. Therefore, we define a class “Person”:

class Person:    
    def __init__(self, name, age):
        self.name = name
        self.age = age
    
p = Person("Mario Meir-Huber", 35)
str02 = "The author \"{}\" is {} years old".format(p.name, p.age)
print(p.name)
print(str02)

The output for this should look like this:

Mario Meir-Huber
The author "Mario Meir-Huber" is 35 years old

One nice thing in Python is difflib. This library enables us to easily check two arrays of strings for differences. One use case would be to check my last name for differences. Note that my last name is one of the most frequent last-name combinations in the German-speaking countries and thus allows different ways to write it.

To work with difflib, simply import it and call the context_diff function of difflib. It marks the detected differences with “!”.

import difflib
arr01 = ["Mario", "Meir", "Huber"]
arr02 = ["Mario", "Meier", "Huber"]
for line in difflib.context_diff(arr01, arr02):
    print(line)

Below you can see the output. One difference was spotted. You can easily use this for spotting differences in datasets and creating golden records from it.

*** 
--- 
***************
*** 1,3 ****
  Mario
! Meir
  Huber
--- 1,3 ----
  Mario
! Meier
  Huber

Another nice feature in Python is textwrap. This library offers some basic features for text “prettifying”. Basically, in the following sample, we use 5 different functions:

  • Indent: creates an indent to a text, e.g. a tab before the text
  • Wrap: wraps the text into an array of strings in case it is longer than the maximum width. This is useful to split a text into a list of lines with a maximum length
  • Fill: does the same as Wrap, but creates new lines out of it
  • Shorten: collapses the text so that it fits into the specified maximum width. The removed part is replaced with “[…]”, and you might use it to add a “read more” link around it
  • Dedent: removes leading whitespace that is common to all lines of the text

The functions are used in simple statements:

from textwrap import *
print(indent("Mario Meir-Huber", "\t"))
print(wrap("Mario Meir-Huber", width=10))
print(fill("Mario Meir-Huber", width=10))
print(shorten("Mario Meir-Huber Another", width=15))
print(dedent(" Mario Meir-Huber "))

And the output should look like this:

	Mario Meir-Huber
['Mario', 'Meir-Huber']
Mario
Meir-Huber
Mario [...]
Mario Meir-Huber 

Today’s tutorial was more of a “housekeeping” since we used it already. In the next tutorial, I will write about object serialisation with JSON, as this is also very useful.

In our last tutorial section, we looked at more data transformations with Spark. In this tutorial, we will continue with data transformations on Spark RDDs before we move on to Actions. Today, we will focus on the map and intersection transformations.

Intersect

An intersection between two datasets returns only those values that – well – intersect: the elements that are present in both datasets. Each such element is returned only once, even if it occurs multiple times in both datasets. Intersection is used like this:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Mark", 1984), ("Anastasia", 2017)])
sorted(ds_one.intersection(ds_two).collect())

The output should look like this now:

The Intersection keyword

Please note that “Mark” is only part of the result because the whole tuple matches – in our case, both the name and the year.
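
To make this explicit, a record with the same name but a different year would not show up in the result – a small sketch, not part of the original example:

ds_a = sc.parallelize([("Mark", 1984)])
ds_b = sc.parallelize([("Mark", 1990)])
ds_a.intersection(ds_b).collect()   # returns [] because the tuples differ in the year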

Map

Map returns a new RDD that is created by applying a function to each element of the original RDD. This is very useful if you want to check a dataset for different things or modify data. You can also add new fields to each entry this way. The following example uses a slightly more complex transformation where we create our own function and calculate the age and gender of the person:

import datetime
def doTheTrans(values):
    
    age = datetime.datetime.now().year - values[1]
    gender = ""
    
    if values[0].endswith("a"):
        gender = "f"
    else:
        gender = "m"
        
    return ([values[0], age, gender])
ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
sorted(ds_one.map(doTheTrans).collect())

What happens in the code?

First, we import datetime. We need this to get the current year (we could also hardcode “2019”, but then I would have to re-write the entire tutorial in January!). Next, we create our function, which we call “doTheTrans”. It receives a tuple from the map function (which we will use later). Note that tuples are immutable, therefore we need to create new variables and return a new list.

First, we calculate the age. This is easily done with “datetime.datetime.now().year - values[1]”: the year the person was born in is at the second position (index 1) of the tuple. Then, we check if the name ends with “a” and define whether the person is female or male. Please note that this check isn’t normally sufficient; we just use it for our sample.

Once we are done with this, we can call the “map” function with the function we have just created. The map function then applies our new function to each item in the RDD.

Now, the output of this should look like the following:

[['Lisa', 34, 'f'], ['Mark', 35, 'm']]

Data Transformations on Spark with a Lambda Function

It is also possible to write the function inline as a lambda, if your transformation isn’t that complex. Suppose we only want to return the name and the age of a person; here, the code would look like the following:

sorted(ds_one.map(lambda x: (x[0], datetime.datetime.now().year - x[1] +1)).collect())

And the output should be this:

[('Lisa', 35), ('Mark', 36)]

Now, after 3 intense tutorials on Data transformations on Spark RDD, we will focus on Actions in Spark in the next tutorial.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. The official Apache Spark page can intensify your experience. Your learning journey can still continue.

In the last tutorials, we had a look at methods, classes and decorators. Now, let’s have a brief look at asynchronous operations in Python. Most of the time, this is abstracted away for us by Spark, but it is nevertheless relevant to have some basic understanding of it. Basically, you define a method to be asynchronous by simply adding the “async” keyword ahead of the method definition. This is written like that:

async def FUNCTION_NAME():
FUNCTION-BLOCK

Another keyword in that context is “await”. Basically, every function that is doing something asynchronous is awaitable. When adding “await”, nothing else happens until the awaited function has finished. This means that you might lose the benefit of asynchronous execution, but you get easier handling, e.g. when working with web data. In the following code, we create an async function that sleeps for some seconds (between 1 and 10). We call the function twice with the “await” operator.

import asyncio
import random
async def func():
    tim = random.randint(1,10)
    await asyncio.sleep(tim)
    print(f"Function finished after {tim} seconds")
    
await func()
await func()

In the output, you can see that the program first waited for the first call to finish and only then executed the second one. Basically, all of the execution happened sequentially, not in parallel.

Function finished after 9 seconds
Function finished after 9 seconds

Python can also run such functions concurrently. This is done via Tasks. We use the method “create_task” from the asyncio library in order to schedule a function to run in the background. In order to see how this works, we invoke the function several times and add a print statement at the end of the code.

asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
print("doing something else ...")

This now looks very different from the previous sample. The print statement is the first to show up, and all code paths finish after 10 seconds at most. This is due to the fact that (A) “create_task” does not block – the print statement is executed immediately while the tasks run in the background – and (B) everything runs concurrently and the maximum sleep interval is 10 seconds.

doing something else ...
Function finished after 1 seconds
Function finished after 1 seconds
Function finished after 3 seconds
Function finished after 4 seconds
Function finished after 5 seconds
Function finished after 7 seconds
Function finished after 7 seconds
Function finished after 7 seconds
Function finished after 8 seconds
Function finished after 10 seconds
Function finished after 10 seconds
Function finished after 10 seconds

However, there are also some issues with async operations. You can never say how long a task will take to execute. It could finish fast, or it could take forever due to a weak network connection or an overloaded server. Therefore, you might want to specify a timeout, which is the maximum time to wait for an operation. In Python, this is done via the “wait_for” method. It basically takes the awaitable to execute and the timeout in seconds. In case the call runs into a timeout, a “TimeoutError” is raised. This allows us to surround it with a try-block.

try:
    await asyncio.wait_for(func(), timeout=3.0)
except asyncio.TimeoutError:
    print("Timeout occured")

In roughly 7 out of 10 cases (the sleep time is between 1 and 10 seconds, while the timeout is 3 seconds), our function will run into a timeout. The function should then return this:

Timeout occurred

Each task that is executed can also be controlled. Whenever you call the “create_task” function, it returns a Task object. A task can either be done, cancelled or contain an error. In the next sample, we create a new task and wait for its completion. We then check whether the task was done or cancelled. You could also check for an error and retrieve it from the task – a sketch of that follows after the output below.

task = asyncio.create_task(func())
print("running task")
await task
if task.done():
    print("Task was done")
elif task.cancelled():
    print("Task was cancelled")

In our case, no error should have occurred and thus the output should be the following:

running task
Function finished after 8 seconds
Task was done
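
The error case mentioned above can be inspected via the task’s exception() method. A minimal sketch – the failing coroutine below is made up purely for illustration:

# hypothetical coroutine that always fails, just to trigger the error case
async def failing():
    raise ValueError("something went wrong")

task = asyncio.create_task(failing())
try:
    await task
except ValueError:
    pass
if task.done() and task.exception() is not None:
    print("Task failed with:", task.exception())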

Now we know how to work with async operations in Python. In our next tutorial, we will have a deeper look into how to work with Strings.

Decorators are powerful things in most programming languages. They help us make code more readable and add functionality to a method or class. Basically, decorators are added above the method or class declaration in order to add some behaviour. We differentiate between two kinds of decorators: method decorators and class decorators. In this tutorial, we will have a look at class decorators.

Class decorators

Class decorators are used to add some behaviour to a class. Normally, you would use this when you want to add behaviour to a class that lies outside of its inheritance structure – e.g. something that is too abstract to put into the inheritance hierarchy itself.

The definition of that is very similar to the method decorators:

@DECORATORNAME
class CLASSNAME():
CLASS-BLOCK

The decorator definition is also very similar to the last tutorial’s sample. We first create a function that takes a class and then create the inner wrapper function. Within the inner function, we create a new method that we want to “append” to the class. We call this method “fly”; it simply prints “Now flying …” to the console. To add this function to the class, we call the “setattr” function of Python. The wrapper then returns an instance of the class, and the decorator returns the wrapper.

def altitude(cls):
    def clswrapper(*args):
        def fly(self):
            print("Now flying ... ")
        # attach the new method to the class, then return an instance of it
        setattr(cls, "fly", fly)
        return cls(*args)
    return clswrapper

Now, our decorator is ready to be used. We first need to create a class. Therefore, we re-use the sample of the vehicles, but simplify it a bit. We create a class “Vehicle” that has a function “accelerate” and create two sub classes “Car” and “Plane” that both inherit from “Vehicle”. The only difference now is that we add a decorator to the class “Plane”. We want to add the possibility to fly to the Plane.

class Vehicle:
    
    speed = 0
        
    def accelerate(self, speed):
        self.speed = speed
class Car(Vehicle):
    pass
@altitude
class Plane(Vehicle):
    pass

Now, we want to test our output:

c = Car()
p = Plane()
c.accelerate(100)
print(c.speed)
p.fly()

Output:

100
Now flying ... 

Basically, there are a lot of scenarios in which you would use class decorators. For instance, you can add functionality to classes that contain data in order to convert them into a more readable table or the like, as sketched below.
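
As a hedged sketch of such a use case (all names below are made up for illustration), a class decorator could attach a simple tabular representation to any data-holding class:

def printable(cls):
    def describe(self):
        # render the instance attributes as simple "name | value" rows
        return "\n".join(f"{k:<10} | {v}" for k, v in self.__dict__.items())
    setattr(cls, "describe", describe)
    return cls

@printable
class Measurement:
    def __init__(self, sensor, value):
        self.sensor = sensor
        self.value = value

print(Measurement("temp", 21.5).describe())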

In our next tutorial, we will look at the await-operator.

In our last tutorial section, we looked at filtering, joining and sorting data. Today, we will look at more Spark data transformations on RDDs. Our focus topics for today are distinct, groupByKey and union in Spark. First, we will use the “Distinct” transformation.

Distinct

Distinct enables us to return each item exactly once. For instance, if the same value appears more than once in a dataset, distinct reduces it to a single occurrence. A sample would be the following array:

[1, 2, 3, 4, 1]

However, we only want to return each number exactly once. This is done via the distinct keyword. The following example illustrates this:

ds_distinct = sc.parallelize([(1), (2), (3), (4), (1)]).distinct().collect()
ds_distinct

GroupBy

A very important task when working with data is grouping. In Spark, we have the GroupBy transformation for this. In our case, this is “groupByKey”. Basically, this groups the dataset by key; what happens to the grouped values is specified by calling the “mapValues” function. With this function, you provide how you want to deal with the values. Some options are pre-defined, such as “len” for the number of occurrences or “list” for the actual values. The following sample illustrates this with the dataset introduced in the previous tutorial:

ds_set = sc.parallelize([("Mark", 1984), ("Lisa", 1985), ("Mark", 2015)])
ds_grp = ds_set.groupByKey().mapValues(list).collect()
ds_grp

If you want to have a count instead, simply use “len” for it:

ds_set.groupByKey().mapValues(len).collect()

The output should look like this now:

The GroupByKey keyword in Apache Spark

Union

A union joins two datasets together into one. In contrast to the “Join” transformation that we looked at in our last tutorial, it doesn’t take any keys into account and simply appends the datasets. The syntax for it is straightforward: it is written as “dsone.union(dstwo)”. Let’s have a look at it:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Luke", 2015), ("Anastasia", 2017)])
sorted(ds_one.union(ds_two).collect())

Now, the output of this should look like the following:

[('Anastasia', 2017), ('Lisa', 1985), ('Luke', 2015), ('Mark', 1984)]

Today, we learned a lot about Spark data transformations. In our next tutorial – part 3 – we will have a look at even more of them.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. The official Apache Spark page can intensify your experience. Your learning journey can still continue.

Decorators are powerful things in most programming languages. They help us make code more readable and add functionality to a method or class. Basically, decorators are added above the method or class declaration in order to add some behaviour. We differentiate between two kinds of decorators: method decorators and class decorators. In this tutorial, we will have a look at method decorators.

Method decorators

Method decorators are used to add some behaviour to a method. For instance, you could add a stopwatch to check performance, configure logging or run some checks on the method itself. All of that is done by “wrapping” the method in a decorator function. This basically means that the “decorated” method is executed inside the decorator function. This, for instance, would allow us to surround a method with a try-except block and thus route all exceptions that occur in a method into a global error handling tool (we will sketch this after the performance counter example below).

The definition of that is very easy:

@DECORATORNAME
def METHODNAME():
METHOD-BLOCK

Basically, the only thing that you need is the “@” and the decorator name. There are several decorators available, but now we will create our own. We start by creating a performance counter. The goal of it is to measure how long a method takes to execute. We therefore create the decorator from scratch.

Basically, I stated that the decorator takes the function and executes it inside the decorator function. We start by defining our performance counter as a function that takes one argument – the function to wrap. Within this function, we add another function (yes, we can do this in Python – creating inline functions!) – typically called either “wrapper” or “inner”. I call it “inner”. The inner function should be able to pass on arguments; typically, a function call can have 0 to n arguments. In order to do this, we provide “*args” and “**kwargs”. Both mean that a variable number of arguments is accepted. The only difference between args and kwargs is that kwargs are named arguments (e.g. person="Pete").

In this inner function, we create the start variable, which holds the time at which the performance counting begins. After the start variable, we call the function (whatever function we decorate), passing on all the *args and **kwargs. After that, we measure the time again and do the math. Simple, isn’t it? However, we haven’t decorated anything yet. This is now done by creating a function that sleeps and prints text afterwards. The code for this is shown below.

import time
def perfcounter(func):
    def inner(*args, **kwargs):
        start = time.perf_counter()
        func(*args, **kwargs) #This is the invokation of the function!
        print(time.perf_counter() - start)
    return inner
    
@perfcounter
def printText(text):
    time.sleep(0.3)
    print(text)
    
printText("Hello Decorator")

Output:

Hello Decorator
0.3019062000021222
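
The error-handling use case mentioned at the beginning of this section can be sketched in the same way. A hedged example, not part of the original tutorial:

def catch_errors(func):
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as err:
            # here you could hand the error over to a global error handling tool
            print(f"Error in {func.__name__}: {err}")
    return inner

@catch_errors
def divide(a, b):
    return a / b

divide(1, 0)   # prints: Error in divide: division by zero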

As you can see, we can now add this perfcounter decorator to any kind of function we like. Normally, it makes sense to add this to functions which take rather long – e.g. Spark jobs or web requests. In the next sample, I create a type checker decorator. Basically, this type checker should validate that all parameters passed to a function are of a specific type. E.g. we want to ensure that all parameters passed to a multiplication function are of type integer and that all parameters passed to a print function are of type string. Basically, you could also do this check inline, but it is much easier if you write the function once and simply apply it to a function as a decorator. It also greatly decreases the number of code lines and thus increases the readability of your code. The decorator for that should look like the following:

@typechecker(int)

For integer values and

@typechecker(str)

for string values.

The only difference now is that the decorator itself takes parameters as well, so we need to wrap the function into another function – compared to the previous sample, another level is added. What are the steps necessary?

  1. Create the method to get the parameter: def typechecker(type)
  2. Create the outer function that takes the function and holds the inner function
  3. Create the function block that holds the inner function and a type checker:
    1. We add a function called “isInt(arg)” that checks if the argument passed is of a specific type. We can use “isinstance” to check if an argument is of a specific type – e.g. int or str. If it isn’t of the expected type, we raise an error
    2. We add the inner function with args and kwargs. In this function, we iterate over all args and kwargs passed and check it against the above function (isInt). If all checks succeed, we invoke the wrapped function.
Sounds a bit complex? Don't worry, it isn't that complex at all. Let's have a look at the code:

def typechecker(type):
    def check(func):
        def isInt(arg):
            if not isinstance(arg, type):
                raise TypeError("Only full numbers permitted. Please check")
        def inner(*args, **kwargs):
            for arg in args:
                isInt(arg)
            for kwarg in kwargs:
                isInt(kwarg)
            return func(*args, **kwargs)
        return inner
    return check

Now, since we are done with the decorator itself, let’s decorate some functions. We create two functions. The first one multiplies all values passed to it; the values can be of variable length. The second function concatenates all strings passed to it. We decorate the two functions with the typechecker decorator defined above.

@typechecker(int)
def mulall(*args):
    res = 0
    for arg in args:
        if res == 0: res = arg
        else: res *= arg
    return res
@typechecker(str)
def concat(*args):
    res = ""
    for arg in args:
        res += arg
    
    return res

I guess you can now see the benefit of decorators. We can influence the behaviour of a function and create code snippets that are re-usable. But now, let’s call the functions to see if our decorator works as expected. Note: the third invocation should produce an error 🙂

print(mulall(1,2,3))
print(concat("a", "b", "c"))
print(mulall(1,2,"a"))

Output:

6
abc

… and the error message:

TypeErrorTraceback (most recent call last)
<ipython-input-6-cd2213a0d884> in <module>
     35 print(mulall(1,2,3))
     36 print(concat("a", "b", "c"))
---> 37 print(mulall(1,2,"a"))

<ipython-input-6-cd2213a0d884> in inner(*args, **kwargs)
      7         def inner(*args, **kwargs):
      8             for arg in args:
----> 9                 isInt(arg)
     10 
     11             for kwarg in kwargs:

<ipython-input-6-cd2213a0d884> in isInt(arg)
      3         def isInt(arg):
      4             if not isinstance(arg, type):
----> 5                 raise TypeError("Only full numbers permitted. Please check")
      6 
      7         def inner(*args, **kwargs):

TypeError: Only full numbers permitted. Please check

I hope you like decorators. In my opinion, they are very helpful and provide great value. In the next tutorial, I will show how class decorators work.