Chapter 4: Apache Spark SQL

In this chapter, I would like to examine Apache Spark SQL, the use of Apache Hive with Spark, and DataFrames. DataFrames were introduced in Spark 1.3, and are columnar data storage structures, roughly equivalent to relational database tables. The chapters in this book have not been developed in sequence, so the earlier chapters might use older versions of Spark than the later ones. I also want to examine user-defined functions for Spark SQL.

A good place to find information about the Spark class API is spark.apache.org/docs/version/api/scala/index.html. I prefer to use Scala, but the API information is also available in Java and Python formats. The version value refers to the release of Spark that you will be using, which is 1.3.1 here.

This chapter will cover the following topics:

• SQL context
• Importing and saving data
• DataFrames
• Using SQL
• User-defined functions
• Using Hive

Before moving straight into SQL and DataFrames, I will give an overview of the SQL context.

The SQL context

The SQL context is the starting point for working with columnar data in Apache Spark. It is created from the Spark context, and provides the means for loading and saving data files of different types, using DataFrames, and manipulating columnar data with SQL, among other things. It can be used for the following:

• Executing SQL via the SQL method
• Registering user-defined functions via the UDF method
• Caching
• Configuration
• DataFrames
• Data source access
• DDL operations

I am sure that there are other areas, but you get the idea. The examples in this chapter are written in Scala, just because I prefer the language, but you can develop in Python and Java as well. As shown previously, the SQL context is created from the Spark context.
Importing the SQL context implicits allows you to implicitly convert RDDs into DataFrames:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

For instance, using the previous implicits call allows you to import a CSV file, split it by separator characters, and then convert the RDD that contains the data into a data frame using the toDF method.

It is also possible to define a Hive context for the access and manipulation of Apache Hive database table data (Hive is the Apache data warehouse that is part of the Hadoop ecosystem, and it uses HDFS for storage). The Hive context allows a superset of SQL functionality when compared to the Spark context. The use of Hive with Spark will be covered in a later section in this chapter.

Next, I will examine some of the supported file formats available for importing and saving data.

Importing and saving data

I wanted to add this section about importing and saving data here, even though it is not purely about Spark SQL, so that I could introduce concepts such as the Parquet and JSON file formats. This section also allows me to cover how to access and save data in loose text, as well as the CSV, Parquet, and JSON formats, conveniently in one place.

Processing the text files

Using the Spark context, it is possible to load a text file into an RDD using the textFile method. Also, the wholeTextFiles method can read the contents of a directory into an RDD. The following examples show how a file, based on the local file system (file://) or HDFS (hdfs://), can be read into a Spark RDD. These examples show that the data will be partitioned into six parts for increased performance.
The first two examples are the same, as they both manipulate a file on the Linux file system:

sc.textFile("/data/spark/tweets.txt",6)
sc.textFile("file:///data/spark/tweets.txt",6)
sc.textFile("hdfs://server1:4014/data/spark/tweets.txt",6)

Processing the JSON files

JSON is a data interchange format developed from JavaScript; JSON actually stands for JavaScript Object Notation. It is a text-based format, and the same data could be expressed, for instance, as XML. The following example uses the SQL context method called jsonFile to load the HDFS-based JSON data file named device.json. The resulting data is created as a data frame:

val dframe = sqlContext.jsonFile("hdfs:///data/spark/device.json")

Data can be saved in JSON format using the data frame toJSON method, as shown by the following example. First, the Apache Spark and Spark SQL classes are imported:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}

Next, the object class called sql1 is defined, as is a main method with parameters. A configuration object is defined, which is used to create a Spark context. The master Spark URL is left as the default value, so Spark expects local mode, the local host, and the 7077 port:

object sql1 {

  def main(args: Array[String]) {

    val appName = "sql example 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)

An SQL context is created from the Spark context, and a raw text file in CSV format, called adult.test.data_1x, is loaded using the textFile method.
A schema string is then created, which contains the data column names, and the schema is created from it by splitting the string by its spacing, and using the StructType and StructField methods to define each schema column as a string value:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val rawRdd = sc.textFile("hdfs:///data/spark/sql/adult.test.data_1x")

    val schemaString = "age workclass fnlwgt education " +
      "educational-num marital-status occupation relationship " +
      "race gender capital-gain capital-loss hours-per-week " +
      "native-country income"

    val schema = StructType(
      schemaString.split(" ").map(fieldName =>
        StructField(fieldName, StringType, true)))

Each data row is then created from the raw CSV data by splitting it with the help of a comma as a line divider, and then the elements are added to a Row() structure. A data frame is created from the schema and the row data, which is then converted into JSON format using the toJSON method. Finally, the data is saved to HDFS using the saveAsTextFile method:

    val rowRDD = rawRdd.map(_.split(","))
      .map(p => Row(
        p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),
        p(9),p(10),p(11),p(12),p(13),p(14)
      ))

    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    val jsonData = adultDataFrame.toJSON

    jsonData.saveAsTextFile("hdfs:///data/spark/sql/adult.json")

  } // end main

} // end sql1

So, the resulting data can be seen on HDFS. The Hadoop file system ls command below shows that the data resides in the target directory as a _SUCCESS file and two part files:

[hadoop@hc2nn sql]$ hdfs dfs -ls /data/spark/sql/adult.json

Found 3 items
-rw-r--r--   3 hadoop supergroup       0 2015-06-20 17:17 /data/spark/sql/adult.json/_SUCCESS
-rw-r--r--   3 hadoop supergroup    1731 2015-06-20 17:17 /data/spark/sql/adult.json/part-00000
-rw-r--r--   3 hadoop supergroup    1724 2015-06-20 17:17 /data/spark/sql/adult.json/part-00001

Using the Hadoop file system's cat command, it is possible to display the contents of the JSON data.
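Each part file written by saveAsTextFile holds one JSON object per line, one line per data row. As a rough plain-Scala sketch of that record shape (this is my own illustration using a subset of the adult column names, not Spark's actual toJSON implementation):

```scala
// Sketch of the record shape that DataFrame.toJSON writes: one JSON
// object per line. Field names below are a subset of the adult schema.
val fieldNames = Seq("age", "workclass", "education", "income")

def toJsonLine(values: Seq[String]): String =
  fieldNames.zip(values)
    .map { case (k, v) => "\"" + k + "\":\"" + v + "\"" }
    .mkString("{", ",", "}")

val line = toJsonLine(Seq("25", "Private", "11th", "<=50K"))
println(line)
```

With the real DataFrame, toJSON performs this per-row rendering for every column, using the schema's field names.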
I will just show a sample to save space:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.json/part-00000 | more

{"age":"25","workclass":" Private","fnlwgt":" 226802","education":" 11th","educational-num":" 7","marital-status":" Never-married","occupation":" Machine-op-inspct","relationship":" Own-child","race":" Black","gender":" Male","capital-gain":" 0","capital-loss":" 0","hours-per-week":" 40","native-country":" United-States","income":" <=50K"}

Processing the Parquet data is very similar, as I will show next.

Processing the Parquet files

Apache Parquet is another columnar-based data format used by many tools in the Hadoop tool set for file I/O, such as Hive, Pig, and Impala. It increases performance by using efficient compression and encoding routines. The Parquet processing example is very similar to the JSON Scala code. The DataFrame is created, and then saved in Parquet format using the save method with a type of parquet:

    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    adultDataFrame.save("hdfs:///data/spark/sql/adult.parquet","parquet")

  } // end main

} // end sql2

This results in an HDFS-based directory, which contains a common metadata file, a metadata file, and a temporary directory:

[hadoop@hc2nn sql]$ hdfs dfs -ls /data/spark/sql/adult.parquet

Found 3 items
-rw-r--r--   3 hadoop supergroup    1412 2015-06-21 13:17 /data/spark/sql/adult.parquet/_common_metadata
-rw-r--r--   3 hadoop supergroup    1412 2015-06-21 13:17 /data/spark/sql/adult.parquet/_metadata
drwxr-xr-x   - hadoop supergroup       0 2015-06-21 13:17 /data/spark/sql/adult.parquet/_temporary

Listing the contents of the metadata file, using the Hadoop file system's cat command, gives an idea of the data format.
However, the Parquet header is binary, so it does not display well with more and cat; only the embedded schema description is readable:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.parquet/_metadata | more

{"type":"struct","fields":[{"name":"age","type":"string","nullable":true,"metadata":{}},{"name":"workclass","type":"string","nullable":true,"metadata":{}},{"name":"fnlwgt","type":"string","nullable":true,"metadata":{}}, ...

For more information about the possible Spark and SQL context methods, check the contents of the classes called org.apache.spark.SparkContext and org.apache.spark.sql.SQLContext, using the Apache Spark API path here for the specific version of Spark that you are interested in:

spark.apache.org/docs/version/api/scala/index.html

In the next section, I will examine Apache Spark DataFrames, introduced in Spark 1.3.

DataFrames

I have already mentioned that a DataFrame is based on a columnar format. Temporary tables can be created from it, but I will expand on this in the next section. There are many methods available to the data frame that allow data manipulation and processing. I have based the Scala code used here on the code in the last section, so I will just show you the working lines and the output. It is possible to display a data frame schema as shown here:

adultDataFrame.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

It is possible to use the select method to filter columns from the data.
I have limited the output here in terms of rows, but you get the idea:

adultDataFrame.select("workclass","age","education","income").show()

workclass         age  education     income
 Private          25   11th          <=50K
 Private          38   HS-grad       <=50K
 Local-gov        28   Assoc-acdm    >50K
 Private          44   Some-college  >50K
 none             18   Some-college  <=50K
 Private          34   10th          <=50K
 none             29   HS-grad       <=50K
 Self-emp-not-inc 63   Prof-school   >50K
 Private          24   Some-college  <=50K
 Private          55   7th-8th       <=50K

It is possible to filter the data returned from the DataFrame using the filter method. Here, I have added the occupation column to the output, and filtered on the worker age:

adultDataFrame
  .select("workclass","age","education","occupation","income")
  .filter( adultDataFrame("age") > 30 )
  .show()

workclass         age  education     occupation         income
 Private          38   HS-grad       Farming-fishing    <=50K
 Private          44   Some-college  Machine-op-inspct  >50K
 Private          34   10th          Other-service      <=50K
 Self-emp-not-inc 63   Prof-school   Prof-specialty     >50K
 Private          55   7th-8th       Craft-repair       <=50K

There is also a group by method for determining volume counts within a data set. As this is an income-based dataset, I think that the volumes within the wage brackets would be interesting. I have also used a bigger dataset to give more meaningful results:

adultDataFrame
  .groupBy("income")
  .count()
  .show()

income  count
 <=50K  24720
 >50K    7841

This is interesting, but what if I want to compare income brackets with occupation, and sort the results for a better understanding? The following example shows how this can be done, and gives the example data volumes. It shows that there is a high volume of managerial roles compared to other occupations.
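The groupBy and count calls map naturally onto plain Scala collections, which is a handy way of sanity-checking the expected volumes on a small sample before running the DataFrame version. The toy income values below are my own illustration, not drawn from the adult dataset:

```scala
// Sketch of the DataFrame groupBy("income").count() pattern on an
// in-memory collection; each string stands in for one row's income value.
val incomes = Seq("<=50K", ">50K", "<=50K", "<=50K", ">50K", "<=50K")

val counts: Map[String, Int] =
  incomes.groupBy(identity).map { case (k, rows) => (k, rows.size) }

counts.toSeq.sortBy(_._1).foreach { case (k, n) => println(s"$k $n") }
```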
This example also sorts the output by the occupation column:

adultDataFrame
  .groupBy("income","occupation")
  .count()
  .sort("occupation")
  .show()

income  occupation         count
 >50K   Adm-clerical         507
 <=50K  Adm-clerical        3263
 <=50K  Armed-Forces           8
 >50K   Armed-Forces           1
 <=50K  Craft-repair        3170
 >50K   Craft-repair         929
 <=50K  Exec-managerial     2098
 >50K   Exec-managerial     1968
 <=50K  Farming-fishing      879
 >50K   Farming-fishing      115
 <=50K  Handlers-cleaners   1284
 >50K   Handlers-cleaners     86
 >50K   Machine-op-inspct    250
 <=50K  Machine-op-inspct   1752
 >50K   Other-service        137
 <=50K  Other-service       3158
 >50K   Priv-house-serv        1
 <=50K  Priv-house-serv      148
 >50K   Prof-specialty      1859
 <=50K  Prof-specialty      2281

So, SQL-like actions can be carried out against DataFrames, including select, filter, sort, group by, and print. The next section shows how tables can be created from DataFrames, and how SQL-based actions are carried out against them.

Using SQL

After using the previous Scala example to create a data frame from a CSV-based data input file on HDFS, I can now define a temporary table based on the data frame, and run SQL against it. The following example shows the temporary table called adult being defined, and a row count being created using COUNT(*):

adultDataFrame.registerTempTable("adult")

val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult")

resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of over 32,000 rows:

Count - 32561

It is also possible to limit the volume of the data selected from the table using the LIMIT SQL option, which is shown in the following example.
The first 10 rows have been selected from the data; this is useful if I just want to check data types and quality:

val resRDD = sqlContext.sql("SELECT * FROM adult LIMIT 10")

resRDD.map(t => t(0)  + " " + t(1)  + " " + t(2)  + " " + t(3)  + " " +
                t(4)  + " " + t(5)  + " " + t(6)  + " " + t(7)  + " " +
                t(8)  + " " + t(9)  + " " + t(10) + " " + t(11) + " " +
                t(12) + " " + t(13) + " " + t(14)
      )
  .collect().foreach(println)

A sample of the data looks like the following:

50  Private  283676  Some-college  10  Married-civ-spouse  Craft-repair  Husband  White  Male  0  0  40  United-States  >50K

When the schema for this data was created in the Scala-based data frame example in the last section, all the columns were created as strings. However, if I want to filter the data in SQL using WHERE clauses, it would be useful to have proper data types. For instance, if an age column stores integer values, it should be stored as an integer so that I can execute numeric comparisons against it. I have changed my Scala code to include all the possible types:

import org.apache.spark.sql.types._

I have also now defined my schema using different types, to better match the data, and I have defined the row data in terms of the actual data types, converting raw data string values into integer values where necessary:

val schema =
  StructType(
    StructField("age",             IntegerType, false) ::
    StructField("workclass",       StringType,  false) ::
    StructField("fnlwgt",          IntegerType, false) ::
    StructField("education",       StringType,  false) ::
    StructField("educational-num", IntegerType, false) ::
    StructField("marital-status",  StringType,  false) ::
    StructField("occupation",      StringType,  false) ::
    StructField("relationship",    StringType,  false) ::
    StructField("race",            StringType,  false) ::
    StructField("gender",          StringType,  false) ::
    StructField("capital-gain",    IntegerType, false) ::
    StructField("capital-loss",    IntegerType, false) ::
    StructField("hours-per-week",  IntegerType, false) ::
    StructField("native-country",  StringType,  false) ::
    StructField("income",          StringType,  false) ::
    Nil)

val rowRDD = rawRdd.map(_.split(","))
  .map(p => Row(
    p(0).trim.toInt, p(1), p(2).trim.toInt, p(3),
    p(4).trim.toInt, p(5), p(6), p(7), p(8),
    p(9), p(10).trim.toInt, p(11).trim.toInt,
    p(12).trim.toInt, p(13), p(14)
  ))

The SQL can now use numeric filters in the WHERE clause correctly. If the age column were a string, this would not work. You can now see that the data has been filtered to give age values below 60 years:

val resRDD = sqlContext.sql("SELECT COUNT(*) FROM adult WHERE age < 60")

resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of around 30,000 rows:

Count - 29917

It is possible to use Boolean logic in the WHERE-based filter clauses. The following example specifies an age range for the data. Note that I have used variables to describe the select and filter components of the SQL statement. This allows me to break the statement down into different parts as statements become larger:

val selectClause = "SELECT COUNT(*) FROM adult "
val filterClause = "WHERE age > 25 AND age < 60"

val resRDD = sqlContext.sql( selectClause + filterClause )

resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a data count of around 23,000 rows:

Count - 23506

I can create compound filter clauses using Boolean terms, such as AND and OR, as well as parentheses:

val selectClause = "SELECT COUNT(*) FROM adult "
val filterClause = "WHERE ( age > 15 AND age < 25 ) OR ( age > 30 AND age < 45 ) "

val resRDD = sqlContext.sql( selectClause + filterClause )

resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

This gives a row count of around 17,000 rows, and represents a count of two age ranges in the data:

Count - 17198

It is also possible to use subqueries in Apache Spark SQL. You can see in the following example that I have created a subquery called t1 by selecting three columns, age, education, and occupation, from the table adult.
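Because these statements are assembled from clause variables, the final SQL text is just string concatenation, and it can be sanity-checked without a Spark cluster. The buildSql helper below is my own sketch, not a Spark API; it simply joins the non-empty clause fragments with single spaces, which also keeps empty placeholder clauses harmless:

```scala
// Sketch: compose a SQL statement from clause strings, skipping empty
// fragments, so the assembled text can be checked before it is passed
// to sqlContext.sql.
def buildSql(clauses: String*): String =
  clauses.map(_.trim).filter(_.nonEmpty).mkString(" ")

val sqlText = buildSql(
  "SELECT COUNT(*) FROM adult",
  "WHERE age > 25 AND age < 60",
  "",                            // empty group by clause
  ""                             // empty order by clause
)
println(sqlText)
```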
I have then used the table called t1 to create a row count. I have also added a filter clause acting on the age column from the table t1. Notice also that I have added group by and order by clauses to my SQL, even though they are currently empty:

val selectClause = "SELECT COUNT(*) FROM "
val tableClause  = " ( SELECT age,education,occupation from adult) t1 "
val filterClause = "WHERE ( t1.age > 25 ) "
val groupClause  = ""
val orderClause  = ""

val resRDD = sqlContext.sql( selectClause + tableClause + filterClause +
                             groupClause + orderClause )

resRDD.map(t => "Count - " + t(0)).collect().foreach(println)

In order to examine table joins, I have created a version of the adult CSV data file called adult.train.data2, which only differs from the original by the fact that it has an added first column called idx, which is a unique index. The Hadoop file system's cat command here shows a sample of the data. The output from the file has been limited using the Linux head command:

[hadoop@hc2nn sql]$ hdfs dfs -cat /data/spark/sql/adult.train.data2 | head -2

1,39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
2,50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

The schema has now been redefined to have an integer-based first column called idx for an index, as shown here:

val schema =
  StructType(
    StructField("idx",             IntegerType, false) ::
    StructField("age",             IntegerType, false) ::
    StructField("workclass",       StringType,  false) ::
    StructField("fnlwgt",          IntegerType, false) ::
    StructField("education",       StringType,  false) ::
    StructField("educational-num", IntegerType, false) ::
    StructField("marital-status",  StringType,  false) ::
    StructField("occupation",      StringType,  false) ::
    StructField("relationship",    StringType,  false) ::
    StructField("race",            StringType,  false) ::
    StructField("gender",          StringType,  false) ::
    StructField("capital-gain",    IntegerType, false) ::
    StructField("capital-loss",    IntegerType, false) ::
    StructField("hours-per-week",  IntegerType, false) ::
    StructField("native-country",  StringType,  false) ::
    StructField("income",          StringType,  false) ::
    Nil)

And the raw row RDD in the Scala example now processes the new initial column, and converts the string value into an integer:

val rowRDD = rawRdd.map(_.split(","))
  .map(p => Row(
    p(0).trim.toInt,
    p(1).trim.toInt,
    p(2),
    p(3).trim.toInt,
    p(4),
    p(5).trim.toInt,
    p(6), p(7), p(8), p(9), p(10),
    p(11).trim.toInt,
    p(12).trim.toInt,
    p(13).trim.toInt,
    p(14), p(15)
  ))

val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)

We have looked at subqueries. Now, I would like to consider table joins. The next example will use the index that was just created to join two derived tables. The example is somewhat contrived, given that it joins two data sets from the same underlying table, but you get the idea. Two derived tables are created as subqueries, and are joined on a common index column.

The SQL for a table join now looks like this. Two derived tables, called t1 and t2, have been created from the temporary table adult as subqueries. The new row index column called idx has been used to join the data in tables t1 and t2. The major SELECT statement outputs all seven columns from the compound data set.
I have added a LIMIT clause to minimize the data output:

val selectClause = "SELECT t1.idx,age,education,occupation,workclass,race,gender FROM "
val tableClause1 = " ( SELECT idx,age,education,occupation FROM adult) t1 JOIN "
val tableClause2 = " ( SELECT idx,workclass,race,gender FROM adult) t2 "
val joinClause   = " ON (t1.idx=t2.idx) "
val limitClause  = " LIMIT 10"

val resRDD = sqlContext.sql( selectClause + tableClause1 + tableClause2 +
                             joinClause + limitClause )

resRDD.map(t => t(0) + " " + t(1) + " " + t(2) + " " + t(3) + " " +
                t(4) + " " + t(5) + " " + t(6) )
  .collect().foreach(println)

Note that in the major SELECT statement, I have to define where the index column comes from, so I use t1.idx. All the other columns are unique to the t1 and t2 datasets, so I don't need to use an alias to refer to them (that is, t1.age). So, the data that is output now looks like the following:

33    45  Bachelors     Exec-managerial    Private           White  Male
233   25  Some-college  Adm-clerical       Private           White  Male
433   40  Bachelors     Prof-specialty     Self-emp-not-inc  White  Female
633   43  Some-college  Craft-repair       Private           White  Male
833   26  Some-college  Handlers-cleaners  Private           White  Male
1033  27  Some-college  Sales              Private           White  Male
1233  27  Bachelors     Adm-clerical       Private           White  Female
1433  32  Assoc-voc     Sales              Private           White  Male
1633  40  Assoc-acdm    Adm-clerical       State-gov         White  Male
1833  46  Some-college  Prof-specialty     Local-gov         White  Male

This gives some idea of the SQL-based functionality within Apache Spark, but what if I find that the method that I need is not available? Perhaps I need a new function. This is where user-defined functions (UDFs) are useful. I will cover them in the next section.

User-defined functions

In order to create some user-defined functions in Scala, I need to examine my data in the previous adult dataset. I plan to create a UDF that will enumerate the education column, so that I can convert the column into an integer value.
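As a rough plain-Scala sketch of the enumeration idea (the toy education values below are my own sample, and this is not the UDF code used in this section): sorting the distinct values of a string column and zipping them with an index yields exactly the kind of string-to-integer mapping needed.

```scala
// Sketch: derive a stable string-to-integer enumeration from a column's
// distinct values by sorting them and zipping with an index.
val educations = Seq("11th", "Bachelors", "10th", "Bachelors", "HS-grad")

val enumMap: Map[String, Int] =
  educations.distinct.sorted.zipWithIndex.toMap

enumMap.toSeq.sortBy(_._2).foreach { case (e, i) => println(s"$i $e") }
```

A mapping derived this way shifts whenever a new value appears in the data, which is why spelling the mapping out explicitly, as done next, can be the safer choice for repeatable runs.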
This will be useful if I need to use the data for machine learning, and so create a LabeledPoint structure. The vector used, which represents each record, will need to be numeric. I will first determine what kind of unique education values exist, then I will create a function to enumerate them, and finally use the function in SQL.

I have created some Scala code to display a sorted list of the education values. The DISTINCT keyword ensures that there is only one instance of each value. I have selected the data as a subtable, using an alias called edu_dist for the data column, to ensure that the ORDER BY clause works:

val selectClause = "SELECT t1.edu_dist FROM "
val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
val orderClause  = " ORDER BY t1.edu_dist "

val resRDD = sqlContext.sql( selectClause + tableClause + orderClause )

resRDD.map(t => t(0)).collect().foreach(println)

The data looks like the following. I have removed some values to save space, but you get the idea:

 10th
 11th
 12th
 1st-4th
 ...
 Preschool
 Prof-school
 Some-college

I have defined a method in Scala to accept the string-based education value, and return an enumerated integer value that represents it.
If no value is recognized, then a special value of 9999 is returned:

def enumEdu( education:String ) : Int = {

  var enumval = 9999

  if ( education == "10th" ) { enumval = 0 }
  else if ( education == "11th" ) { enumval = 1 }
  else if ( education == "12th" ) { enumval = 2 }
  else if ( education == "1st-4th" ) { enumval = 3 }
  else if ( education == "5th-6th" ) { enumval = 4 }
  else if ( education == "7th-8th" ) { enumval = 5 }
  else if ( education == "9th" ) { enumval = 6 }
  else if ( education == "Assoc-acdm" ) { enumval = 7 }
  else if ( education == "Assoc-voc" ) { enumval = 8 }
  else if ( education == "Bachelors" ) { enumval = 9 }
  else if ( education == "Doctorate" ) { enumval = 10 }
  else if ( education == "HS-grad" ) { enumval = 11 }
  else if ( education == "Masters" ) { enumval = 12 }
  else if ( education == "Preschool" ) { enumval = 13 }
  else if ( education == "Prof-school" ) { enumval = 14 }
  else if ( education == "Some-college" ) { enumval = 15 }

  return enumval
}

I can now register this function using the SQL context in Scala, so that it can be used in an SQL statement:

sqlContext.udf.register( "enumEdu", enumEdu _ )

The SQL and the Scala code to enumerate the data then look like this. The newly registered function called enumEdu is used in the SELECT statement. It takes the education type as a parameter, and returns the integer enumeration.
The column that this value forms is aliased to the name idx:

val selectClause = "SELECT enumEdu(t1.edu_dist) as idx,t1.edu_dist FROM "
val tableClause  = " ( SELECT DISTINCT education AS edu_dist FROM adult ) t1 "
val orderClause  = " ORDER BY t1.edu_dist "

val resRDD = sqlContext.sql( selectClause + tableClause + orderClause )

resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)

The resulting data output, as a list of education values and their enumerations, looks like the following:

 0 10th
 1 11th
 2 12th
 3 1st-4th
 4 5th-6th
 5 7th-8th
 6 9th
 7 Assoc-acdm
 8 Assoc-voc
 9 Bachelors
 10 Doctorate
 11 HS-grad
 12 Masters
 13 Preschool
 14 Prof-school
 15 Some-college

Another example function called ageBracket takes the adult integer age value, and returns an enumerated age bracket:

def ageBracket( age:Int ) : Int = {

  var bracket = 9999

  if ( age >= 0 && age < 20 ) { bracket = 0 }
  else if ( age >= 20 && age < 40 ) { bracket = 1 }
  else if ( age >= 40 && age < 60 ) { bracket = 2 }
  else if ( age >= 60 && age < 80 ) { bracket = 3 }
  else if ( age >= 80 && age < 100 ) { bracket = 4 }
  else if ( age >= 100 ) { bracket = 5 }

  return bracket
}

Again, the function is registered using the SQL context so that it can be used in an SQL statement:

sqlContext.udf.register( "ageBracket", ageBracket _ )

Then, the Scala-based SQL uses it to select the age, age bracket, and education value from the adult dataset:

val selectClause = "SELECT age, ageBracket(age) as bracket, education FROM "
val tableClause  = " adult "
val limitClause  = " LIMIT 10 "

val resRDD = sqlContext.sql( selectClause + tableClause + limitClause )

resRDD.map(t => t(0) + " " + t(1) + " " + t(2) ).collect().foreach(println)

The resulting data then looks like this, given that I have used the LIMIT clause to limit the output to 10 rows:

39 1 Bachelors
50 2 Bachelors
38 1 HS-grad
53 2 11th
28 1 Bachelors
37 1 Masters
49 2 9th
52 2 HS-grad
31 1 Masters
42 2 Bachelors

It is also possible to define functions for use in SQL, inline, during the UDF registration using the SQL context. The following example defines a function called dblAge, which just multiplies the adult's age by two. The registration looks like this. It takes an integer parameter (the age), and returns twice its value:

sqlContext.udf.register( "dblAge", (a:Int) => 2*a )

And the SQL that uses it now selects the age, and the double of the age value, called dblAge(age):

val selectClause = "SELECT age,dblAge(age) FROM "
val tableClause  = " adult "
val limitClause  = " LIMIT 10 "

val resRDD = sqlContext.sql( selectClause + tableClause + limitClause )

resRDD.map(t => t(0) + " " + t(1) ).collect().foreach(println)

The two columns of the output data, which contain the age and its doubled value, now look like this:

39 78
50 100
38 76
53 106
28 56
37 74
49 98
52 104
31 62
42 84

So far, DataFrames, SQL, and user-defined functions have been examined, but what if, as in my case, you are using a Hadoop stack cluster, and have Apache Hive available? The adult table that I have defined so far is a temporary table, but if I access Hive using Apache Spark SQL, I can access static database tables. The next section will examine the steps needed to do this.