At the onset of the big data age, there was an argument for doing schema-on-read instead of schema-on-write. This encouraged practitioners to just dump their data in a data lake without having to worry about its schema. That was a blessing for many. However, it turns out that all data has metadata; we just got lazier and stopped labelling our data, which made accessing it more difficult. Who knows what this data set contains? What schema does it follow? What version of the schema does it correspond to, in case it has evolved over time? Now we seem to be coming back around to making sure that we add metadata to our data sets: metadata that is flexible enough to allow for multiple views of the same information and evolvable enough to allow for changes in the future. Today's blog post is about one of the centerpieces of metadata in the big data ecosystem - the Hive metastore.
Apache Hive was, of course, the first SQL-on-Hadoop engine, and with it came a small service that talked Thrift, called the Hive metastore service (or Hive metastore server). This service is backed by a relational database such as PostgreSQL or MySQL, and it houses metadata about big data. This includes the table name associated with a data set, its columns and their types (so you can query them), where the data set is stored (on HDFS, S3, HBase, Cassandra, etc.), how it's stored, where applicable (using Parquet on HDFS, for example), and the location of the data set, if applicable (/user/hive/warehouse/my_data_set on HDFS).

It was then expanded to include further metadata to speed up queries. This started off with partitioning info (sub-directories in your data set, most of which can be filtered out if you choose the right WHERE clause), followed by bucketing and sorting info, i.e. how the data set is "bucketed" (in other words, hash partitioned) into files and how the data is sorted within those files. Bucketing and sorting metadata can cut down the time of certain SQL operations considerably. For example, if you are joining two large tables that are bucketed and sorted on the join key, the join can be much faster because it can be done on individual pairs of matching buckets (assuming both tables have the same number of buckets) instead of on the entire tables. Then, the Hive metastore was further expanded to store statistics about the data sets - how many records they hold, plus histograms and the distribution of particular keys - so they could be used to optimize query planning.

Now, new SQL-on-Hadoop engines are popping up every day and Hive usage is being replaced by the likes of Spark SQL, Presto and others. However, the Hive metastore still remains the de-facto repository of metadata in the big data ecosystem. All of these SQL engines - Spark SQL, Presto, Impala, etc. - integrate with the Hive metastore: they are able to use it as their catalog of metadata, and optionally of statistics, so disparate users with different software systems can have a single consistent view of metadata.

As you bring big data practices into your organization, I encourage you to think about what metadata you can store to speed up your queries or, in some cases, get rid of queries altogether (for example, by answering a question from the metastore instead of from the actual data). Paramount to this is configuring your tools to ensure that the metadata is up-to-date and is being used to drive queries. And, as you do this, don't forget to leverage the de-facto metadata repository - the Hive metastore.
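To make the bucketing argument above a little more concrete, here is a minimal sketch in plain Java. It is not how Hive or any other engine implements joins; the class and method names (BucketedJoinSketch, bucketize, bucketJoin) are made up for illustration, and the "tables" are just in-memory lists of integer keys. It shows why knowing that both sides are hash-bucketed the same way lets a join look only at matching buckets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hash-buckets two in-memory "tables" and joins them bucket by bucket. */
class BucketedJoinSketch {

    /** Assigns each key to one of numBuckets buckets by hashing it. */
    static List<List<Integer>> bucketize(List<Integer> keys, int numBuckets) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int i = 0; i < numBuckets; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            // floorMod keeps the bucket index non-negative even for negative hash codes.
            buckets.get(Math.floorMod(key.hashCode(), numBuckets)).add(key);
        }
        return buckets;
    }

    /** Joins bucket i of the left table only with bucket i of the right table. */
    static List<Integer> bucketJoin(List<Integer> left, List<Integer> right, int numBuckets) {
        List<List<Integer>> leftBuckets = bucketize(left, numBuckets);
        List<List<Integer>> rightBuckets = bucketize(right, numBuckets);
        List<Integer> matches = new ArrayList<>();
        for (int i = 0; i < numBuckets; i++) {
            // Equal keys always hash to the same bucket, so all other buckets can be skipped.
            for (Integer l : leftBuckets.get(i)) {
                for (Integer r : rightBuckets.get(i)) {
                    if (l.equals(r)) {
                        matches.add(l);
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Integer> orders = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Integer> customers = Arrays.asList(2, 4, 6, 8);
        System.out.println(bucketJoin(orders, customers, 4));
    }
}
```

In a real engine the bucketing is done once, at write time, and recorded in the metastore; the query planner then only needs to read that metadata to know it can pair up the files.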
Apache Spark has supported running SQL queries for a while now. In particular, it can share metadata (names of tables, column names and types, storage format, etc.) with the Hive metastore, which has become the de-facto metadata repository in the Hadoop ecosystem.
In this brief post, I will share with you some Spark SQL gotchas (as of Apache Spark 1.5.1):

1. Don't run multiple SQL statements separated by a semicolon. Let's say you want to create a table named my_table but delete it if it exists, before you create it. You may want to write something like this on the spark-shell:

spark-shell> // Spark Shell creates a HiveContext for you, to launch SQL queries.
spark-shell> // This context can be used by referring to the sqlContext variable.
spark-shell> // Don't do this
spark-shell> sqlContext.sql("DROP TABLE IF EXISTS my_table; CREATE TABLE my_table (col INT)")
In most SQL systems, you can separate two SQL statements with a semicolon and expect them to run one at a time, one after the other. However, Spark SQL treats them as a single statement, so the above doesn't work. You have to run each of the statements separately.
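If you have a whole script of statements rather than just two, one way to follow this advice is to split the script yourself and submit the pieces one at a time. Below is a minimal sketch in plain Java. The helper names (StatementRunnerSketch, splitStatements, runEach) are hypothetical and not part of Spark, and the splitter is deliberately naive: it assumes no semicolons appear inside string literals or comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: naive splitter that submits one SQL statement at a time. */
class StatementRunnerSketch {

    /** Splits a script on semicolons; does NOT handle semicolons inside literals or comments. */
    static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }

    /** Hands each statement to the runner separately, in the order it appears. */
    static void runEach(String script, Consumer<String> runner) {
        for (String statement : splitStatements(script)) {
            runner.accept(statement);
        }
    }

    public static void main(String[] args) {
        String script = "DROP TABLE IF EXISTS my_table; CREATE TABLE my_table (col INT)";
        // In a real application the runner would wrap a call such as sqlContext.sql(statement).
        runEach(script, statement -> System.out.println("Would run: " + statement));
    }
}
```

Passing the runner in as a Consumer keeps the sketch independent of Spark, so the splitting logic can be tried out without a cluster.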
spark-shell> // Do this
spark-shell> sqlContext.sql("DROP TABLE IF EXISTS my_table")
spark-shell> sqlContext.sql("CREATE TABLE my_table (col INT)")

2. If you are writing your own Spark app (i.e. not using the spark-shell), be sure to create a HiveContext instead of a SQLContext object for running SQL queries.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName(this.getClass.toString)
val sc = new SparkContext(conf)
// Don't do this
val sqlContext = new SQLContext(sc)
If you use SQLContext, you may get an error like:
Exception in thread "main" java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found
This is because you are using SQLContext, instead of HiveContext. SQLContext implements a very small subset of the SQL dialect. Instead, it's recommended to write the query in HiveQL (which is exactly the same as "SQL" in most practical cases) and use HiveContext instead. HiveContext forwards most of the queries to Hive for parsing, thereby piggybacking off of existing parsing functionality in Hive, which is much more encompassing than the one implemented in SQLContext.
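import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName(this.getClass.toString)
val sc = new SparkContext(conf)
// Do this
val sqlContext = new HiveContext(sc)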
Hope that was helpful!
Hi everyone!
I am excited to be giving a talk on Application Architectures with Apache Hadoop on Wednesday, June 25th, at East Bay JUG. This talk is, of course, inspired by the same motivations behind the book, but has a fun twist. I had promised Chris Richardson, the organizer of the JUG, that I would make this talk more Java friendly. So, I wrote some MapReduce code for doing sessionization of clicks, a very common algorithm used in clickstream analytics, along with many slides describing the clickstream analytics use case and walking through the sessionization code. Also, in the same code repo, you will find code for doing the same sessionization in Hive and in Spark (thanks to my talented co-author Gwen Shapira).

The talk starts off with a quick introduction to the case study of clickstream analytics, what the status quo has been without Hadoop and how Hadoop has changed that status quo. It then walks through the high level design and goes into the details of doing sessionization in MapReduce. Depending on how much time we have left, we will talk about various other architectural considerations for ingesting, storing and processing data in Hadoop, continuing to use clickstream analytics as an example.

The presentation, as of now, has about 100 slides. The intent is definitely not to cover all of those (thank God!) but to talk more about the sections that interest the audience and dive deep into those areas, leaving the rest for another time. If you are in the San Francisco bay area this Wednesday, come join us!

- Mark (Follow me on Twitter)

And, here are the presentation slides:

Introduction

This article talks about how to write a UDF in Apache Hive and provides some tips, tricks and insights related to writing UDFs.

What is a UDF?

Apache Hive comes with a set of pre-defined User Defined Functions (aka UDFs) available for use. A complete listing of Hive UDFs is available in the Hive documentation.
These UDFs are the equivalent of functions like hex(), unhex(), from_unixtime() and unix_timestamp() in MySQL. The Hive community has made an effort to make most of the commonly used UDFs available to users. However, oftentimes the existing UDFs are not enough and users want to write their own custom UDF. This article goes through that process.

UDF vs. Generic UDF

Hive UDFs are written in Java. In order to create a Hive UDF you need to derive from one of two classes: UDF or GenericUDF. Here is a small comparison table identifying the differences between writing a UDF that derives from the UDF class vs. the GenericUDF class:
(Thanks to Steve Waggoner for suggesting 2 corrections to the above table.)
So, why am I telling you all this? Deriving your UDF from the UDF class will let you develop faster, but the code will be less scalable and arguably less performant. Using GenericUDF has some learning curve to it but will allow your UDF to be more scalable. Moreover, this article aims to reduce the steepness of that learning curve to make GenericUDF as easy to use as the UDF class. My recommendation: read along and use GenericUDF.

Sample Code

The code we are going to use as a reference for learning how to write a UDF is my code for a UDF that I created, translate(). The code is available in Apache Hive's trunk. We won't go into the nitty gritty of what this code does, but will learn about the general semantics of writing a UDF using GenericUDF. You need to override 3 methods: initialize(), evaluate() and getDisplayString().

Annotations

In general, you should annotate your UDF with the following annotations (replace the values of various parameters based on your use case):
initialize()

This method only gets called once per JVM, at the beginning, to initialize the UDF. initialize() is used to assert and validate the number and types of the parameters that a UDF takes and the type of the value it returns. For example, the translate UDF takes in 3 string parameters and returns a string. As you will see in the code, initialize() does the following:
evaluate()

This method is called once for every row of data being processed. It uses the Converter array populated in initialize() to retrieve the values of the parameters for the row in question. You then put in the logic to compute the return value based on the values of the parameters, and return that value. In the case of the translate UDF, a string is returned.

getDisplayString()

A simple method for returning the display string for the UDF.

Having implemented the above 3 methods, you are all set to build and test out your UDF.

Building the UDF

In order to build the UDF, you will need to have hive-exec*.jar in your Java build path. For example, if you are using Maven, you will need a snippet like the following in your pom.xml:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>0.9.0</version>
</dependency>

You may change the version number to the version of Hive you are compiling against, although newer versions of Hive are generally backwards compatible.

Deploying and testing the UDF

Once you have built a JAR containing your UDF, you will have to run the following commands in Hive to make your UDF available for use:

hive> add jar my_udf.jar;
hive> create temporary function my_udf as 'com.me.udf.GenericMyUdf';
hive> select my_udf(col1, col2, col3) from my_table limit 5;
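To recap the lifecycle described above without needing Hive on the classpath, here is a minimal sketch in plain Java. It is not Hive's GenericUDF API: the class TranslateLikeFunction and its method signatures are hypothetical stand-ins, and the character-replacement logic is a simplified assumption rather than the actual translate() implementation in Hive's trunk. It only illustrates the contract of validating arguments once and then computing a value per row.

```java
/** Illustrative only: mimics the initialize-once / evaluate-per-row contract without Hive. */
class UdfLifecycleSketch {

    /** Hypothetical stand-in for a function taking 3 strings; this is NOT Hive's GenericUDF. */
    static class TranslateLikeFunction {
        private boolean initialized = false;

        /** Called once, before any rows: validates the number and types of the arguments. */
        String initialize(Class<?>[] argumentTypes) {
            if (argumentTypes.length != 3) {
                throw new IllegalArgumentException("expected 3 arguments, got " + argumentTypes.length);
            }
            for (Class<?> type : argumentTypes) {
                if (type != String.class) {
                    throw new IllegalArgumentException("all arguments must be strings");
                }
            }
            initialized = true;
            return "string"; // the declared return type
        }

        /** Called once per row: maps each character of input found in from to its counterpart in to. */
        String evaluate(String input, String from, String to) {
            if (!initialized) {
                throw new IllegalStateException("initialize() must be called before evaluate()");
            }
            StringBuilder result = new StringBuilder();
            for (char c : input.toCharArray()) {
                int index = from.indexOf(c);
                if (index < 0) {
                    result.append(c);                // not in 'from': keep as is
                } else if (index < to.length()) {
                    result.append(to.charAt(index)); // replace with counterpart
                }                                    // no counterpart in 'to': drop the character
            }
            return result.toString();
        }

        /** Returns a human-readable form of the call, as used in query plans. */
        String getDisplayString(String[] children) {
            return "translate_like(" + String.join(", ", children) + ")";
        }
    }

    public static void main(String[] args) {
        TranslateLikeFunction function = new TranslateLikeFunction();
        function.initialize(new Class<?>[] {String.class, String.class, String.class});
        // evaluate() would be invoked once per row by the engine; here we call it by hand.
        System.out.println(function.evaluate("hello", "el", "ip"));
        System.out.println(function.getDisplayString(new String[] {"col1", "col2", "col3"}));
    }
}
```

Doing the validation in initialize() means the per-row evaluate() path stays as small as possible, which is the main reason the two are separate methods.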
Mark's Blog: Technical writings by Mark Grover