In my previous posts, I explained Linear Regression and stated that there are some errors in it. These are called errors of prediction (for individual predictions), and there is also a standard error. A prediction is good if the individual errors of prediction and the standard error are small. Let’s now start by examining the error of prediction.

Error of prediction in Linear regression

Let’s recall the table from the previous tutorial:

Year | Ad Spend (X)   | Revenue (Y)       | Prediction (Y')
2013 | € 345.126,00   | € 41.235.645,00   | € 48.538.859,48
2014 | € 534.678,00   | € 62.354.984,00   | € 65.813.163,80
2015 | € 754.738,00   | € 82.731.657,00   | € 85.867.731,47
2016 | € 986.453,00   | € 112.674.539,00  | € 106.984.445,76
2017 | € 1.348.754,00 | € 156.544.387,00  | € 140.001.758,86
2018 | € 1.678.943,00 | € 176.543.726,00  | € 170.092.632,46
2019 | € 2.165.478,00 | € 199.645.326,00  | € 214.431.672,17

We can see that there is a clear difference between the predictions and the actual numbers. We calculate the error of each prediction by subtracting the prediction from the real value:

Y-Y’
-€   7.303.214,48
-€   3.458.179,80
-€   3.136.074,47
 €   5.690.093,24
 € 16.542.628,14
 €   6.451.093,54
-€ 14.786.346,17

In the above table, we can see how each prediction differs from the real value. This is our prediction error on the actual values.

Calculating the Standard Error

Now, we want to calculate the standard error. First, let’s have a look at the formula:

Standard Error = √( Σ(Y – Y')² / N )

Basically, we take the sum of all squared errors, divide it by the number of occurrences and take the square root of it. We already have Y-Y’ calculated, so we only need to square it:

Y-Y'              | (Y-Y')^2
-€ 7.303.214,48   | € 53.336.941.686.734,40
-€ 3.458.179,80   | € 11.959.007.558.032,20
-€ 3.136.074,47   | € 9.834.963.088.101,32
€ 5.690.093,24    | € 32.377.161.053.416,10
€ 16.542.628,14   | € 273.658.545.777.043,00
€ 6.451.093,54    | € 41.616.607.923.053,70
-€ 14.786.346,17  | € 218.636.033.083.835,00

The sum of it is:  €  641.419.260.170.216,00

And N is 7, since the table contains 7 elements. Divided by 7, it is:  €    91.631.322.881.459,50

The last step is to take the square root, which results in the standard error of  € 9.572.425,13 for our linear regression.
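If you want to verify the arithmetic, here is a minimal Python sketch that reproduces the calculation. The values are simply copied from the tables in this post; nothing here is specific to any library:

import math

# Actual revenues (Y) and predictions (Y') from the tables above
actual    = [41235645.00, 62354984.00, 82731657.00, 112674539.00, 156544387.00, 176543726.00, 199645326.00]
predicted = [48538859.48, 65813163.80, 85867731.47, 106984445.76, 140001758.86, 170092632.46, 214431672.17]

# Error of prediction per row: Y - Y'
errors = [y - y_hat for y, y_hat in zip(actual, predicted)]

# Standard error: sqrt(sum((Y - Y')^2) / N)
standard_error = math.sqrt(sum(e ** 2 for e in errors) / len(errors))
print(standard_error)  # roughly 9.57 million, the standard error calculated above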

Now, we have most items cleared for our linear regression and can move on to the logistic regression in our next tutorial.

We have already learned a couple of things about RDDs in Spark – Spark’s low-level API. Today, it is time for the Apache Spark Dataframes Tutorial. Basically, Spark Dataframes are easier to work with than RDDs: working with them is more straightforward and feels more natural.

Introduction to Spark Dataframes

To get started with Dataframes, we need some data to work with. In the RDD examples, we’ve created random values. Now, we want to work with a more comprehensive dataset. I’ve prepared a “table” about revenues and uploaded it to my Git repository. You can download it here. [NOTE: if you did the Hive tutorial, you should already have the dataset]

After you have downloaded the data, upload the data to your Jupyter environment. First, create a new folder named “data” in Jupyter. Now, navigate to this folder and click on “Upload” in the upper right corner. With this, you can now upload a new file.

Upload a file to Jupyter

Once you have done this, your file will be available under “data/revenues.csv”. It will make sense to explore the file so that you understand the structure of it.

Dataframes are very comfortable to work with. First of all, you can use them like tables and add headers and datatypes to them. If you already looked at our dataset, you can see that there are no types or headers assigned, so we need to create them first. This works by using the “StructType” schema from the pyspark.sql.types package within the Dataframes API. For each column, you provide a “StructField” entry to the StructType. A StructField has the following arguments:

  • Name: name of the field provided as a string value
  • Type: data type of the field. This can be any basic datatype from the pyspark.sql.types package. Normally, they are:
    • Numeric values: ByteType, ShortType, IntegerType, LongType
    • Decimal values: FloatType, DoubleType, DecimalType
    • Date values: DateType, TimestampType
    • Characters: StringType
    • Boolean: BooleanType
    • Null: NullType
  • Nullable: True or False if the field should be nullable

Working with Spark Dataframes

Once this is done, we can read the file from the path we uploaded it to by using the “spark.read.csv” method. But before this happens, it is necessary to create the Spark session. With dataframes, this is done with the session builder: we provide an app name and some config parameters and call the “getOrCreate” function of SparkSession.

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or re-use) the Spark session
spark = SparkSession.builder.master("local") \
    .appName("Cloudvane-Sample-02") \
    .config("net.cloudvane.sampleId", "Cloudvane.net Spark Sample 02").getOrCreate()

# Define the schema: one StructField (name, type, nullable) per column
structure = StructType([StructField("personid", IntegerType(), False),
                        StructField("personname", StringType(), False),
                        StructField("state", StringType(), True),
                        StructField("age", IntegerType(), True),
                        StructField("productid", IntegerType(), False),
                        StructField("productname", StringType(), False),
                        StructField("price", DoubleType(), False)])

# Read the CSV file and apply the schema
df = spark.read.csv("data/revenues.csv", structure)

Now, let’s show the dataframe:

df

The output of this should look like the following:

DataFrame[personid: int, personname: string, state: string, age: int, productid: int, productname: string, price: double]

However, this only shows the column names and associated data types. Spark is built for large datasets, often having billions of rows – so displaying all results is often not possible. However, you can show the top items by calling the “show()” function of a dataframe.

df.show()

This will now show some of the items in the form of a table. With our dataset, this looks like this:

The output of the Dataframe show() function

Modifying a Spark Dataframe

In the Spark dataframe, there are some unnecessary columns. A common task for data engineers and data scientists is data cleansing. For our following samples, we won’t need “productid” – even though it might be useful for other tasks. Therefore, we simply remove the entire column by calling the “drop” function in Spark. After that, we show the result again.

df_clean = df.drop("productid")
df_clean.show()

The output of this again looks like the following:

Using the df.drop function in Spark

Now we have a modified dataframe. At some point, it is useful to persist this dataset somewhere. In our case, we store it simply within our Jupyter environment, because that is the easiest way to access it for our samples. We store the result as a parquet file – one of the most common file formats used with Spark. We can use the “write” function and then call “parquet” on it. The “parquet” call accepts several parameters that we will use:

  • FileName: the name of the file. For our sample below, we will create the filename from the current date and time. Therefore, we import “datetime” from the Python standard library. The method “strftime” formats the datetime in the specified format; our format is “dd-mm-yyyy hh:mm”. Also, we import the “os” package, which provides information about the filesystem used by the operating system. We will use the “path.join” function from that package.
  • PartitionBy: With large datasets, it is best to partition them in order to increase efficiency of access. Partitions are done on Columns, which has a positive impact on read/scan operations on data. In our case, we provide the state as the partition criteria.
  • Compression: in order to decrease the size on disk, we use a compression algorithm. In our case, this should be gzip.

This is now all that we need to know in order to run the next code snippet:

import datetime
import os

# Build the file name from the current date and time, e.g. "data/21-08-2019 14:30"
today = datetime.datetime.today()
fileName = os.path.join("data", today.strftime('%d-%m-%Y %H:%M'))

# Write as gzip-compressed parquet, partitioned by the "state" column
df_clean.write.parquet(fileName, partitionBy="state", compression="gzip")
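To double-check that the write worked, you could read the partitioned files back into a dataframe – a small sketch, assuming the fileName variable from the snippet above is still in scope:

# Read the gzip-compressed, partitioned parquet data back in.
# The "state" partition column is restored from the folder structure.
df_parquet = spark.read.parquet(fileName)
df_parquet.show()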

In today’s Spark Dataframes Tutorial, we have learned about the basics. In our next tutorial, we will look at grouping, filtering and sorting data.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.

In our previous tutorial, we had a look at how to (de)serialise objects from and to JSON in Python. Now, let’s have a look at how to dynamically create and extend classes in Python. Basically, we are using the mechanism that Python itself uses: the dynamic type function. This function takes several parameters; we will only focus on the three relevant ones for our sample.

How to use the dynamic type function in Python

Basically, this function takes several parameters. We utilize 3 parameters. These are:

type(CLASS_NAME, INHERITS, PARAMETERS)

These parameters have the following meaning:

  • CLASS_NAME: The name of the new class
  • INHERITS: from which the new type should inherit
  • PARAMETERS: new methods or parameters added to the class

In our following example, we want to extend the Person class with a new attribute called “location”. We call our new class “PersonNew” and instruct Python to inherit from “Person”, which we created a few tutorials earlier. Note that the base class is passed as a tuple – this is because Python supports multiple inheritance, so several base classes could be listed here. Last, we specify the attribute “location” as a key-value pair. Our sample looks like the following:

pn = type("PersonNew", (Person,), {"location": "Vienna"})
pn.age = 35
pn.name = "Mario"

If you test the code, it will just work as expected. All other attributes such as age and name can also be retrieved. Now, let’s make it a bit more complex. We extend our previous sample with the JSON serialisation to be capable of dynamically creating an object from a JSON string.

Dynamically creating a class in Python from JSON

We therefore create a new function that takes the object to serialise and extracts all values from it. In addition, we add one more key-value pair, which we call “__class__”, in order to store the name of the class. Getting the class name is a bit more complex, since the string representation looks like “<class '__main__.PersonNew'>”. Therefore, we first split the object name by “.”, take the last entry and split it again by the ‘ character, taking the first part. There are more elegant ways to do this, but I want to keep it simple. Once we have the class name, we store it in the dictionary and return the dictionary. The complete sample is here:

def map_proxy(obj):
    # Copy all attributes of the object into a plain dictionary
    dict = {}
    for k in obj.__dict__.keys():
        dict.update({k : obj.__dict__.get(k)})

    # Extract the class name from the string representation,
    # e.g. "<class '__main__.PersonNew'>" -> "PersonNew"
    cls_name = str(obj).split(".")[1].split("'")[0]
    dict.update({"__class__" : cls_name})

    return dict

We can now use the json.dumps method and call the map_proxy function to return the JSON string:

st_pn = json.dumps(map_proxy(pn))
print(st_pn)

Now, we are ready to dynamically create a new class with the “type” method. We name the new class after the class name that was provided above, which can be retrieved via “__class__”. We let it inherit from Person and pass the parameters of the entire object into it, since it is already a dictionary of key-value pairs:

def dyn_create(obj):
    
    return type(obj["__class__"], (Person, ), obj)

We can now also invoke the json.loads method to dynamically create the class:

obj = json.loads(st_pn, object_hook=dyn_create)
print(obj)
print(obj.location)

And the output should be like that:

{"location": "Vienna", "__module__": "__main__", "__doc__": null, "age": 35, "name": "Mario", "__class__": "PersonNew"}
<class '__main__.PersonNew'>
Vienna

As you can see, it is very easy to dynamically create new classes in Python. We could improve this code in many ways, but I’ve created this tutorial for explanatory reasons rather than usability ;).

In our next tutorial, we will have a look at logging.

Here you can go to the overview of the Python tutorial. If you want to dig deeper into the language, have a look at the official Python documentation.

In my previous posts, I introduced the basics of machine learning. Today, I want to focus on two elementary algorithms: linear and logistic regression. Basically, you learn them at the very beginning of your machine learning journey and might eventually not use them much later on, but they are very helpful for understanding the underlying concepts.

Linear Regression

A Linear Regression is the simplest model in Data Science. Linear Regression is a supervised learning method and is used in trend analysis, time-series analysis, risk models in banking and many more fields.

In a linear regression, the relationship between a dependent variable y and a dataset of x values is linear. This basically means that if there is data showing a specific trend, a future trend can be predicted. Let’s assume that there is a significant relation between ad spend and sales. We would have the following table:

Year | Ad Spend       | Revenue
2013 | € 345.126,00   | € 41.235.645,00
2014 | € 534.678,00   | € 62.354.984,00
2015 | € 754.738,00   | € 82.731.657,00
2016 | € 986.453,00   | € 112.674.539,00
2017 | € 1.348.754,00 | € 156.544.387,00
2018 | € 1.678.943,00 | € 176.543.726,00
2019 | € 2.165.478,00 | € 199.645.326,00

If you look at the data, it is very easy to figure out that there is some kind of relation between how much money you spend on the ads and the revenue you get. Basically, the ratio is between 1:92 and 1:119. Please note that I totally made up the numbers. However, based on these numbers, you could basically predict what revenue to expect when spending a certain amount of money on ads. The relation between them is therefore linear and we can easily plot it on a line chart:

Linear Regression

As you can see, some of the values are above the line and others below. Let’s now manually calculate the linear function. There are some steps necessary that eventually lead to the prediction values. Let’s assume we want to know what revenue we can expect for a specific amount of money spent on ads – in our case, 1 million spent on ads. The linear regression function for this is:

predicted score (Y') = bX + intercept (A)

This means that we now need to calculate two values: the slope (our “b”) and the intercept (our “A”). X is the only value we know – our 1 million spend. Let’s first calculate the slope.

Calculating the Slope

The first thing we need to do is calculate the slope. For this, we need the deviations from the mean for both X (the ad spend) and the combination of X and Y. The deviation is calculated for each value individually. There are some steps involved:

  • Creating the average of the values
  • Subtracting each individual value from the average
  • Building the square (for X) or the product of both deviations (for X and Y)

The first step is to create the average of both values. The average for the revenues should be:  € 118.818.609,14 and the average for the spend should be:  € 1.116.310,00.

Next, we need to create the squared deviation for each item. For the ad spend, we do this by subtracting each individual ad spend from the average and squaring the result. The table for this should look like the following:

The formula is: (average ad spend – ad spend) ^ 2

Year | Ad Spend       | Stddev (X)
2013 | € 345.126,00   | € 594.724.761.856,00
2014 | € 534.678,00   | € 338.295.783.424,00
2015 | € 754.738,00   | € 130.734.311.184,00
2016 | € 986.453,00   | € 16.862.840.449,00
2017 | € 1.348.754,00 | € 54.030.213.136,00
2018 | € 1.678.943,00 | € 316.555.892.689,00
2019 | € 2.165.478,00 | € 1.100.753.492.224,00

Quite huge numbers already, right? Now, let’s create the combined deviation for the revenues. This is done by taking (average ad spend – ad spend) and multiplying it by (average revenue – revenue). This should result in:

Year | Revenue          | Y_Ad_Stddev
2013 | € 41.235.645,00  | € 59.830.740.619.545,10
2014 | € 62.354.984,00  | € 32.841.051.219.090,30
2015 | € 82.731.657,00  | € 13.048.031.460.197,10
2016 | € 112.674.539,00 | € 797.850.516.541,00
2017 | € 156.544.387,00 | € 8.769.130.708.225,71
2018 | € 176.543.726,00 | € 32.478.055.672.684,90
2019 | € 199.645.326,00 | € 84.800.804.871.574,80

Now, we only need to sum up the columns for Y and X. The sums should be:

€ 2.551.957.294.962,00 for the X-Row
€ 232.565.665.067.859,00 for the Y-Row

Now, we divide the Y sum by the X sum and get the following slope: 91,1322715

Calculating the Intercept

The intercept is somewhat easier. The formula for it is: average(y) – slope * average(x). We already have all relevant variables calculated in our previous step. Our intercept should equal:  € 17.086.743,14.

Predicting the value with the Linear Regression

Now, we can build our function. This is: Y = 91,1322715X + 17.086.743,14

As stated in the beginning, our X should be 1 Million and we want to know our revenue:  € 108.219.014,64
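The whole manual calculation can also be reproduced in a few lines of plain Python. The sketch below simply re-implements the formulas above with the numbers from the table; it is meant as a cross-check, not as the way you would normally fit a regression:

# Ad spend (X) and revenue (Y) from the table above
ad_spend = [345126.00, 534678.00, 754738.00, 986453.00, 1348754.00, 1678943.00, 2165478.00]
revenue  = [41235645.00, 62354984.00, 82731657.00, 112674539.00, 156544387.00, 176543726.00, 199645326.00]

avg_x = sum(ad_spend) / len(ad_spend)  # ~ 1.116.310,00
avg_y = sum(revenue) / len(revenue)    # ~ 118.818.609,14

# Slope b = sum((x - avg_x) * (y - avg_y)) / sum((x - avg_x)^2)
slope = sum((x - avg_x) * (y - avg_y) for x, y in zip(ad_spend, revenue)) \
        / sum((x - avg_x) ** 2 for x in ad_spend)

# Intercept A = avg(y) - b * avg(x)
intercept = avg_y - slope * avg_x

print(slope)                          # ~ 91,13
print(intercept)                      # ~ 17.086.743
print(slope * 1000000 + intercept)    # predicted revenue for 1 million ad spend, ~ 108.219.014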

The prediction is actually lower than the actual values of the closest years (2016 and 2017). If you change the input, e.g. to 2 million or 400k, the prediction will again get closer to the actual values. Predictions always produce some error, and this error is normally reported alongside. In our case, the error table looks like the following:

Year | Ad Spend       | Real Revenue (Y) | Prediction (Y')  | Error
2013 | € 345.126,00   | € 41.235.645,00  | € 48.538.859,48  | -€ 7.303.214,48
2014 | € 534.678,00   | € 62.354.984,00  | € 65.813.163,80  | -€ 3.458.179,80
2015 | € 754.738,00   | € 82.731.657,00  | € 85.867.731,47  | -€ 3.136.074,47
2016 | € 986.453,00   | € 112.674.539,00 | € 106.984.445,76 | € 5.690.093,24
2017 | € 1.348.754,00 | € 156.544.387,00 | € 140.001.758,86 | € 16.542.628,14
2018 | € 1.678.943,00 | € 176.543.726,00 | € 170.092.632,46 | € 6.451.093,54
2019 | € 2.165.478,00 | € 199.645.326,00 | € 214.431.672,17 | -€ 14.786.346,17

The error calculation is done by taking the real value and deducting the predicted value from it. And voilà – you have your error. One common goal in machine learning is to reduce this error and make predictions more accurate.

In our last tutorials, we had a look at transformations in Spark. Now, we look at actions in Spark. One of them – collect – we already used frequently in our transformation samples. The reason for this is simple: transformations are evaluated lazily, so nothing happens until you call an action. We used the most simple one – collect. Actions always do something with the data, so you should be prepared for that. With the collect method, all the data is loaded into memory. For our samples this wasn’t an issue, since we only had very small data. However, if datasets are larger, you need to reconsider this; other functions might deliver better results for you. Let’s now have a look at the different options available.

Reduce Action in Spark

Reduce calls a function on all items in a dataset and accumulates them. Basically, all binary operators can be used. For Python, please refer to this documentation: https://docs.python.org/3/library/operator.html. If we want to aggregate all items by multiplying them in Spark, this would look like the following:

from operator import *
spark_data.reduce(mul)

The output would be this:

First and Count Actions in Spark

These actions are easy to use – they either return the first item of an RDD or the count of elements in the RDD. If you use count() on a very large RDD, it might take very long and your task could run into a timeout. There is another function called countApprox() that returns the approximate count of an RDD to prevent that.

ds_one.first()

Count is used in the same way as first – without applying a function or similar.

ds_one.count()
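As mentioned above, countApprox() can be used when an exact count would take too long. A minimal sketch – it takes a timeout in milliseconds and an optional confidence level:

# Return an approximate count, waiting at most 1000 ms for the result
ds_one.countApprox(1000, confidence=0.95)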

Saving data

One important thing is to eventually store the data somewhere. In our case, we store it in the same directory as our Jupyter notebooks, so we can easily check whether it worked as expected. Normally, you would store data on S3 or whatever other storage you use. Spark RDDs provide several means to save to files; in our case we will use “saveAsTextFile”. This stores the content as part files in text format.

ds_one.saveAsTextFile("data/dsone.csv")

You can now navigate to the output at this folder: data/dsone.csv.

Spark SaveAsTextFile Result
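If you want to check the result programmatically instead of in the file browser, the saved part files can be read back into an RDD – a small sketch, assuming the SparkContext sc from the previous tutorials:

# textFile reads all part files inside the output folder back into one RDD
ds_restored = sc.textFile("data/dsone.csv")
ds_restored.collect()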

So, today we have learned about Actions in Spark and how to apply them to RDDs.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.

One important aspect of working with data is serialisation. Basically, this means that classes can be persisted to a storage (e.g. the file system, HDFS or S3). With Spark, a lot of file formats are possible. However, in this tutorial we will have a look at how to deal with JSON, a very popular file format that is often used with Spark.

JSON stands for “JavaScript Object Notation” and was originally developed for client-server applications with JavaScript as its main user. It was built to have less overhead than XML.

First, let’s start with copying objects. Basically, Python knows two ways: normal (shallow) copies and deep copies. The difference is that with a normal copy, references to objects contained within the copied object are kept. This is relevant when attributes are objects themselves. In a deep copy, no references are kept; every value is copied to the new object. This means that you can then use it independently of the previous one.

To copy one object to another, you only need to import copy and call the copy or deepcopy function. The following code shows how this works.

import copy

ps1 = Person("Mario", 35)
pss = copy.copy(ps1)        # shallow copy
psd = copy.deepcopy(ps1)    # deep copy

ps1.name = "Meir-Huber"     # change the original object only
print(ps1.name)
print(pss.name)
print(psd.name)

And the output should be this:

Meir-Huber
Mario
Mario
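Since the name attribute is an immutable string, the example above does not really show the difference between the two copy modes. The following sketch adds a hypothetical mutable attribute (a list of hobbies, which is not part of the original Person class) just to make the difference visible:

import copy

ps1 = Person("Mario", 35)
ps1.hobbies = ["reading"]   # hypothetical mutable attribute, only for this demo

pss = copy.copy(ps1)        # shallow copy: shares the same list object
psd = copy.deepcopy(ps1)    # deep copy: gets its own copy of the list

ps1.hobbies.append("writing")
print(pss.hobbies)          # ['reading', 'writing'] - the reference was shared
print(psd.hobbies)          # ['reading'] - an independent copy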

Now, let’s look at how we can serialise an object with the use of JSON. Basically, you need to import “json”. An object that you want to serialise needs to be serialisable. A lot of classes in Python already implement that. However, when we want to serialise our own object (e.g. the “Person” class that we created in this tutorial), we would normally need to implement a custom serialiser. However, Python is great and provides the possibility to access all variables of an object via the “__dict__” dictionary. This means that we don’t have to write our own serialiser and can do this via a simple call to “dumps” of “json”:

import json
js = json.dumps(ps1.__dict__)
print(js)

The above call creates a JSON representation of the object:

{"name": "Meir-Huber", "age": 35}

We might want to add more information to the JSON string – e.g. the name of the class the data was originally stored in. We can do this by passing a custom function to the “dumps” method via the “default” parameter. This function gets the object to be serialised as its only parameter. We then pass the original object (Person) and the function we want to execute. We name this function “make_nice”. In the function, we create a dictionary and add the name of the class as the first entry, with the key “obj_name”. We then merge the dictionary of the object into the new dictionary and return it.

Another parameter added to the “dumps” function is “indent”. The only thing it does is printing it pretty – by adding line breaks and indents. This is just for improved readability. The method and call looks like this:

def make_nice(obj):
    dict = {
        "obj_name": obj.__class__.__name__
    }
    
    dict.update(obj.__dict__)
    
    return dict
js_pretty = json.dumps(ps1, default=make_nice,indent=3)
print(js_pretty)

And the result should now look like the following:

{
   "obj_name": "Person",
   "name": "Meir-Huber",
   "age": 35
}

Now we know how to serialise an object to a JSON string. Basically, you can store this string in a file or as an object on S3. The only thing that we haven’t discussed yet is how to get an object back from a string. We therefore take the JSON string we created with “dumps” before. Our goal now is to create a Person object from it. This can be done via the “loads” call of the json module. We also define a method to do the casting via the “object_hook” parameter. This object_hook method has one argument – the parsed JSON object (a dictionary) itself. We access each of the parameters of the object with named indexers and return the new object.

str_json = "{\"name\": \"Meir-Huber\", \"age\": 35}"
def create(obj):
    
    print(obj)
    
    return Person(obj["name"], obj["age"])
    
obj = json.loads(str_json, object_hook=create)
print(obj)

The output should now look like this.

{'name': 'Meir-Huber', 'age': 35}
<__main__.Person object at 0x7fb84831ddd8>

Now we know how to create JSON serialisers and how to get them back from a string value. In the next tutorial, we will have a look on how to improve this and make it more dynamic – by dynamic class creation in Python.

One of the frequent statements vendors make is “Agile Analytics”. In pitches towards business units, they often claim that it would only take them some weeks to do agile analytics. However, this isn’t necessarily true, since they can easily abstract away the hardest part of “agile” analytics: data access, retrieval and preparation. On the one hand, this creates “bad blood” within a company: business units might ask why it takes their internal department so long (and there has most likely been some history to get the emotions going). But on the other hand, it is necessary to solve this problem, as agile analytics is still possible – if done right.

In my opinion, there are several aspects necessary to go for agile analytics. First, it is about culture. Second, it is about organisation, and third, it is about technology. Let’s start with culture first.

Culture

The company must be silo-free. Sounds easy; in fact, it is very difficult. Different business units use data as a “weapon”, which could easily turn thermo-nuclear. If you own the data, you can easily create your own truth. This means that marketing could create their own view of the market in terms of reach, sales could tweak the numbers (until the overall performance is measured by controlling), and so on. So, business units might fight against giving away data and will try to keep it in their ownership. However, data should be a company-wide good that is available to all units (of course, on a need-to-know basis and adhering to legal and regulatory standards). This can only be achieved if the data unit is close to the CEO or another powerful board member. Once this is achieved, it is easier to go for self-service analytics.

Organisation

Similar to culture, it is necessary to organise yourself for agile analytics. This is now more focused on the internal structure of an organisation (e.g. the data unit). There is no silver bullet available for this; it very much depends on the overall culture of a company. However, certain aspects have to be fulfilled:

  • BizDevOps: I outlined it in one of my previous posts and I insist on this approach being necessary for many things around data. One of them is agile analytics, since handover of tasks is always complicated. End-to-end responsibility is really crucial for agile analytics
  • Data Governance: There is no way around it; either do it or forget about anything close to agile analytics. It is necessary to have security and privacy under control and to allow users to access data easily but securely. Also, it is very important to log what is going on (SOX!)
  • Self-Service Tools: Have tools available that enable you to access data without complex processes. I will write about this in “Technology”.

Technology

Last but not least, agile analytics is done via technology. Technology is just an enabler, so if you don’t get the previous two right, you will most likely fail here – even if you invest millions into it. You will need different tools that handle security and privacy, but also a clear and easy-to-use metadata repository (let’s face it – a data catalog!). Also, you need tools that allow easy access to data via a data science workbench, a fully functional data lake and a data abstraction layer. That sounds like quite a lot – and it is. The good news, though, is that most of it comes for free, as these are mainly open source tools. At some point, you might need an enterprise license, but cost-wise it is still manageable. And remember one thing: technology comes last. If you don’t fix culture and organisation, you won’t be capable of delivering.

In the last tutorials, we already worked a lot with strings and even manipulated some of them. Now, it is about time to have a look at the theory behind it. Basically, formatting strings is very easy. The only thing you need is the “format” method called on a string, with a variable number of arguments. If you pass numbers, the str() function is executed on them automatically, so there is no need to convert them.

Basically, the notation is very similar to other string formatters you are used to. One really nice thing, though, is that you don’t need to provide positional arguments: Python assumes that the positions are in line with the order of the parameters you provide. An easy sample is this:

str01 = "This is my string {} and the value is {}".format("Test", 11)
print(str01)

And the output should look like this:

This is my string Test and the value is 11
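If you do want to control the order explicitly, you can also use positional indexes or named placeholders – a short sketch:

# Explicit positional indexes: arguments can be reordered or reused
print("{1} comes before {0}, and {0} again".format("B", "A"))

# Named placeholders: values are passed as keyword arguments
print("This is my string {text} and the value is {value}".format(text="Test", value=11))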

You can also use classes for this. Therefore, we define a class “Person”:

class Person:    
    def __init__(self, name, age):
        self.name = name
        self.age = age
    
p = Person("Mario Meir-Huber", 35)
str02 = "The author \"{}\" is {} years old".format(p.name, p.age)
print(p.name)
print(str02)

The output for this should look like this:

Mario Meir-Huber
The author "Mario Meir-Huber" is 35 years old

One nice thing in Python is difflib. This library enables us to easily check two arrays of strings for differences. One use case would be to check my last name for differences. Note that my last name is one of the most frequent last name combinations in the German-speaking countries and thus allows for different ways of writing it.

To work with difflib, simply import it and call the difflib context_diff function. This marks the detected differences with “!”.

import difflib
arr01 = ["Mario", "Meir", "Huber"]
arr02 = ["Mario", "Meier", "Huber"]
for line in difflib.context_diff(arr01, arr02):
    print(line)

Below you can see the output. One difference was spotted. You can easily use this for spotting differences in datasets and creating golden records from it.

*** 
--- 
***************
*** 1,3 ****
  Mario
! Meir
  Huber
--- 1,3 ----
  Mario
! Meier
  Huber

Another nice feature in Python is textwrap. This library has some basic features for text “prettifying”. Basically, in the following sample, we use 5 different functions:

  • Indent: adds an indent to a text, e.g. a tab before the text
  • Wrap: wraps the text into an array of strings in case it is longer than the maximum width. This is useful to split text into lines of a maximum length
  • Fill: does the same as Wrap, but joins the lines into a single string with line breaks
  • Shorten: shortens the text to a specified maximum width. The removed part is replaced with “[…]”, so you might use it to add a “read more” around it
  • Dedent: removes common leading whitespace from all lines of the text

The functions are used in simple statements:

from textwrap import *
print(indent("Mario Meir-Huber", "\t"))
print(wrap("Mario Meir-Huber", width=10))
print(fill("Mario Meir-Huber", width=10))
print(shorten("Mario Meir-Huber Another", width=15))
print(dedent(" Mario Meir-Huber "))

And the output should look like this:

	Mario Meir-Huber
['Mario', 'Meir-Huber']
Mario
Meir-Huber
Mario [...]
Mario Meir-Huber 

Today’s tutorial was more of a “housekeeping” since we used it already. In the next tutorial, I will write about object serialisation with JSON, as this is also very useful.

In our last tutorial section, we looked at more data transformations with Spark. In this tutorial, we will continue with data transformations on Spark RDDs before we move on to actions. Today, we will focus on the map and intersection keywords and apply them to Spark RDDs.

Intersect

An intersection between two different datasets only returns those values that – well – intersect, i.e. values that exist in both datasets. Each matching element is returned only once, even if it occurs multiple times in both datasets. Intersection is used like this:

ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
ds_two = sc.parallelize([("Mark", 1984), ("Anastasia", 2017)])
sorted(ds_one.intersection(ds_two).collect())

The output should look like this now:

The Intersection keyword

Please note that “Mark” is only returned by the intersection because the entire tuple matches – in our case, both the name and the year.

Map

Map returns a new RDD that is created by applying a function to each element of the original RDD. This is very useful if you want to check a dataset for different things or modify data. You can also add fields to entries this way. The following example uses a slightly more complex transformation where we create our own function and calculate the age and gender of each person:

import datetime
def doTheTrans(values):
    
    age = datetime.datetime.now().year - values[1]
    gender = ""
    
    if values[0].endswith("a"):
        gender = "f"
    else:
        gender = "m"
        
    return ([values[0], age, gender])
ds_one = sc.parallelize([("Mark", 1984), ("Lisa", 1985)])
sorted(ds_one.map(doTheTrans).collect())

What happens in the code?

First, we import datetime. We need this to get the current year (we could also hardcode “2019”, but then I would have to re-write the entire tutorial in January!). Next, we create our function, which we call “doTheTrans”. It gets a tuple from the map function (which we will use later). Note that tuples are immutable, therefore we need to create new variables and return a new tuple.

First, we calculate the age. This is easily done with “datetime.datetime.now().year – values[1]”. We have the year the person was born in at the second index (number 1) of the tuple. Then, we check if the name ends with “a” and define the person as female or male accordingly. Please note that this isn’t normally sufficient; we just use it for our sample.

Once we are done with this, we can call the “map” function with the function we created. The map function then applies our new function to each item in the RDD.

Now, the output of this should look like the following:

[['Lisa', 34, 'f'], ['Mark', 35, 'm']]

Data Transformations on Spark with a Lambda Function

It is also possible to write the function inline as lambda, if your transformations aren’t that complex. Suppose we only want to return the age of a person; here, the code would look like the following:

sorted(ds_one.map(lambda x: (x[0], datetime.datetime.now().year - x[1] +1)).collect())

And the output should be this:

[('Lisa', 35), ('Mark', 36)]

Now, after 3 intense tutorials on Data transformations on Spark RDD, we will focus on Actions in Spark in the next tutorial.

If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. The official Apache Spark page can intensify your experience. Your learning journey can still continue.

In the last tutorials, we had a look at methods, classes and decorators. Now, let’s have a brief look at asynchronous operations in Python. Most of the time, this is abstracted away for us by Spark, but it is nevertheless useful to have a basic understanding of it. Basically, you define a method as asynchronous by simply adding the “async” keyword ahead of the method definition. This is written like this:

async def FUNCTION_NAME():
    FUNCTION-BLOCK

Another keyword in this context is “await”. Basically, every function that does something asynchronous is awaitable. When adding “await”, nothing else happens until the asynchronous function has finished. This means that you might lose the benefit of asynchronous execution, but get better control when working with web data. In the following code, we create an async function that sleeps for some seconds (between 1 and 10). We call the function twice with the “await” operator.

import asyncio
import random
async def func():
    tim = random.randint(1,10)
    await asyncio.sleep(tim)
    print(f"Function finished after {tim} seconds")
    
await func()
await func()

In the output, you can see that we first waited for the first function to finish and only then executed the second one. Basically, all of the execution happened sequentially, not in parallel.

Function finished after 9 seconds
Function finished after 9 seconds

Python also knows parallel execution. This is done via tasks. We use the method “create_task” from the asyncio library in order to execute a function in parallel. To see how this works, we invoke the function several times and add a print statement at the end of the code.

asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
asyncio.create_task(func())
print("doing something else ...")

This now looks very different from the previous sample. The print statement is the first to show up, and all code paths finish after 10 seconds at most. This is because (A) the print statement is executed immediately, before even the fastest task (1 second) finishes, and (B) everything is executed in parallel and the maximum sleep interval is 10 seconds.

doing something else ...
Function finished after 1 seconds
Function finished after 1 seconds
Function finished after 3 seconds
Function finished after 4 seconds
Function finished after 5 seconds
Function finished after 7 seconds
Function finished after 7 seconds
Function finished after 7 seconds
Function finished after 8 seconds
Function finished after 10 seconds
Function finished after 10 seconds
Function finished after 10 seconds

However, there are also some issues with async operations. You can never say how long a task will take to execute. It could finish fast, or it could take forever due to a weak network connection or an overloaded server. Therefore, you might want to specify a timeout, which is the maximum time an operation should be waited for. In Python, this is done via the “wait_for” method. It basically takes the function to execute and the timeout in seconds. In case the call runs into a timeout, a “TimeoutError” is raised, which allows us to surround it with a try-block.

try:
    await asyncio.wait_for(func(), timeout=3.0)
except asyncio.TimeoutError:
    print("Timeout occured")

In about two-thirds of the cases, our function will run into a timeout. The function should then print this:

Timeout occurred

Each task that is executed can also be controlled. Whenever you call the “create_task” function, it returns a Task object. A task can either be done, be cancelled or contain an error. In the next sample, we create a new task and wait for its completion. We then check whether the task was done or cancelled. You could also check for an error and retrieve the error message from it.

task = asyncio.create_task(func())
print("running task")
await task
if task.done():
    print("Task was done")
elif task.cancelled():
    print("Task was cancelled")

In our case, no error should have occurred and thus the output should be the following:

running task
Function finished after 8 seconds
Task was done
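To see the cancelled branch as well, a task can be cancelled explicitly. The sketch below cancels the task right after creating it; awaiting a cancelled task raises an asyncio.CancelledError, which we simply swallow here:

task = asyncio.create_task(func())
task.cancel()                      # request cancellation before the sleep finishes
try:
    await task
except asyncio.CancelledError:
    pass
if task.cancelled():
    print("Task was cancelled")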

Now we know how to work with async operations in Python. In our next tutorial, we will have a deeper look into how to work with Strings.