We have already learned a couple of things about RDDs, Spark’s low-level API. Today, it is time for the Apache Spark Dataframes Tutorial. Basically, Spark Dataframes are easier to work with than RDDs: working with them is more straightforward and feels more natural.
Introduction to Spark Dataframes
To get started with Dataframes, we need some data to work with. In the RDD examples, we’ve created random values. Now, we want to work with a more comprehensive dataset. I’ve prepared a “table” about revenues and uploaded them to my Git repository. You can download it here. [NOTE: if you did the Hive tutorial, you should already have the dataset]
After you have downloaded the data, upload the data to your Jupyter environment. First, create a new folder named “data” in Jupyter. Now, navigate to this folder and click on “Upload” in the upper right corner. With this, you can now upload a new file.
Once you have done this, your file will be available under “data/revenues.csv”. It will make sense to explore the file so that you understand the structure of it.
Dataframes are very comfortable to work with. First of all, you can use them like tables and add headers and datatypes to them. If you already looked at our dataset, you can see that no types or headers are assigned, so we need to create them first. This works with the “StructType” class of the pyspark.sql.types package within the Dataframes API. For each header entry, you provide a “StructField” to the StructType. A StructField has the following arguments:
- Name: name of the field provided as a string value
- Type: data type of the field. This can be any basic datatype from the pyspark.sql.types package. The most common ones are:
- Numeric values: ByteType, ShortType, IntegerType, LongType
- Decimal values: FloatType, DoubleType, DecimalType
- Date values: DateType, TimestampType
- Characters: StringType
- Boolean: BooleanType
- Null: NullType
- Nullable: True if the field may contain null values, otherwise False
Working with Spark Dataframes
Once this is done, we can read the file from the path we uploaded it to by using the “spark.read.csv” method. But before that, we need to create the Spark session. With dataframes, this is done with the session builder: we provide an app name and some config parameters and call the “getOrCreate” function of SparkSession.
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.master("local") \
    .appName("Cloudvane-Sample-02") \
    .config("net.cloudvane.sampleId", "Cloudvane.net Spark Sample 02") \
    .getOrCreate()

structure = StructType([
    StructField("personid", IntegerType(), False),
    StructField("personname", StringType(), False),
    StructField("state", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("productid", IntegerType(), False),
    StructField("productname", StringType(), False),
    StructField("price", DoubleType(), False)
])

df = spark.read.csv("data/revenues.csv", structure)
Now, let’s show the dataframe by simply entering its name in a Jupyter cell:

df
The output of this should look like the following:
DataFrame[personid: int, personname: string, state: string, age: int, productid: int, productname: string, price: double]
However, this only shows the header names and associated data types. Spark is built for large datasets, often having billions of rows, so displaying the full result is often not possible. However, you can show the top items by calling the “show()” function of a dataframe.
This will now show some of the items of our dataset in the form of a table.
Modifying a Spark Dataframe
In the Spark dataframe, there are some unnecessary columns. A common task for data engineers and data scientists is data cleansing. For the following samples, we won’t need “productid”, even though it might be useful for other tasks. Therefore, we simply remove the entire column by calling the “drop” function in Spark. After that, we show the result again.
df_clean = df.drop("productid")
df_clean.show()
The output again shows the table, now without the “productid” column.
Now we have a modified dataframe. At some point, it is useful to persist this dataset somewhere. In our case, we store it within our Jupyter environment because that is the easiest way to access it for our samples. We store the result as a parquet file, one of the most common file formats to work with in Spark. We use the “write” property of the dataframe and call its “parquet” method, which takes several parameters that we will use:
- FileName: the path of the output. For our sample below, we create the filename from the current date and time. Therefore, we import “datetime” from the Python standard library; its “strftime” method formats the datetime in the specified format, which in our case is “dd-mm-yyyy hh:mm”. Also, we import the “os” package, which provides filesystem helpers, and use its “path.join” function.
- PartitionBy: with large datasets, it is best to partition them in order to increase the efficiency of access. Partitioning is done on columns, which has a positive impact on read/scan operations. In our case, we use the state as the partition criteria.
- Compression: in order to decrease the size on disk, we use a compression algorithm. In our case, this should be gzip.
This is now all that we need to know in order to run the next code snippet:
import datetime
import os

today = datetime.datetime.today()
fileName = os.path.join("data", today.strftime('%d-%m-%Y %H:%M'))
df_clean.write.parquet(fileName, partitionBy="state", compression="gzip")
In today’s Spark Dataframes Tutorial, we have learned about the basics. In our next tutorial, we will look at grouping, filtering and sorting data.
If you enjoyed this tutorial, make sure to read the entire Apache Spark Tutorial. I regularly update this tutorial with new content. Also, I created several other tutorials, such as the Machine Learning Tutorial and the Python for Spark Tutorial. Your learning journey can still continue.