Spark onto a Next Level with UDFs
- Vishakh Rameshan
- Jan 17, 2021
- 3 min read
While woking on a technology up-gradation from a legacy application that is built on shell scripts and green plum functions to a spark+java application, I encountered many hurdles while converting complex GP functions being called from shell script. The main problem was that, in the legacy application they work on per record basis, but when it comes to spark we get the entire input data and performing record level operations are tedious.
Let me give you an example to highlight my problem.
From a shell script, sql select query was fired and then under a for loop each records were taken and some validations where performed like some couple columns were taken and based on some pre configured rules if they don't satisfy the condition then those records were marked as failure and others where put as success.
When I tried to mimic the same in spark the iteration and record level operation wasn't straight forward as I thought it to be. Performing the first complex sql select query was done easily using Spark SQL, but iterating on millions of records using traditional java for loops were taking high amount of time and computation as, for loops are not meant to work in a distributed style processing.
One of the reason of migrating to Spark was to improve the processing time and due to the above approach it took more time than the legacy one.
To find a solution to improve the performance, like everyone does I did a deep dive search on Google and found the Spark UDFs

So, What is Spark UDF ?
Official definition - User-Defined Functions is a feature of Spark SQL to define new Column-based functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.
My Own Definition - User-Defined Functions is a feature of Spark SQL to perform record level operations on one or more columns and returning a single column as output.
How can we create and invoke it?
For sake of simplicity, let's say you have an input file in csv/txt with some user data like first name, age etc. Now you want to perform a validation check on each user record to see if that person's age is between 0 to 100. If his/her age is less than 0 or greater than 100 he is not human.
Now you may thinking why to go for UDF as this can be achieved with a simple sql statement. True you can but to make it complex, let's say you have 10 columns and complex logics between those 10 columns and then return the result as a separate column. It will be difficult to perform using withColumn or sql statements
To create UDF
import org.apache.spark.sql.api.java.UDF1;
public class AgeValidator implements UDF1<String, String>{
private static final long serialVersionUID = 1000008695875L;
@Override
public String call(String age) throws Exception {
if(Integer.parseInt(age) > 0 && Integer.parseInt(age) < 100){
return "Is a Human Being";
}
return "Is not a Human Being";
}
}
To Register UDF and Invoke it
@Test
public void testUdf() {
try (SparkSession sparkSession = SparkSession.builder().appName("demo-udf").master("local").getOrCreate()) {
Dataset<Row> samplePersonDataset = sparkSession.read().option("header", "true").csv("src/test/resources/person_data.csv");
samplePersonDataset.show();
sparkSession.sqlContext().udf().register("age_validation", new AgeValidator(), DataTypes.StringType);
Dataset<Row> validatedDataset = samplePersonDataset.withColumn("valid_age",
functions.callUDF("age_validation", samplePersonDataset.col("age")));
validatedDataset.show();
}
}
Input Dataset read
+-------+---+
| name|age|
+-------+---+
| tom| 32|
|recardo| 0|
|tornado|100|
| kevin| 9|
+-------+---+
Output Dataset after UDF is applied
+-------+---+--------------------+
| name|age| valid_age|
+-------+---+--------------------+
| tom| 32| Is a Human Being|
|recardo| 0|Is not a Human Being|
|tornado|100|Is not a Human Being|
| kevin| 9| Is a Human Being|
+-------+---+--------------------+
Note:-
The UDF name while registering and using must be same.
UDF's can be used only in situation if you want a perform computation on record level on 1 or many (22 is limit) columns and return just a single column as result
UDF 1 has 2 argument one is input column type and other one is result type. If you have 2 columns as input then its UDF2 with 3 args, same way UDF3, UDF4 etc
Comments