Mark Grover

The power of metadata in an increasingly semi-structured world

3/22/2017

At the onset of the big data age, there was an argument for doing schema-on-read instead of schema-on-write. This encouraged practitioners to just dump their data into a data lake without having to worry about its schema. That was a blessing for many. However, it turns out that all data has metadata; we just got lazier and stopped labelling our data, which made accessing it more difficult. Who knows what this data set contains? What schema does it follow? What version of the schema does it correspond to, in case the schema has evolved over time? And now we seem to be walking that back, making sure that we add metadata to our data sets - metadata that is flexible enough to allow multiple views of the same information and evolvable enough to allow for changes in the future. Today's blog post is about one of the centerpieces of metadata in the big data ecosystem - the Hive metastore.

Apache Hive was, of course, the first SQL-on-Hadoop engine, and with it came a small service that talked Thrift, called the Hive metastore service (or Hive metastore server). This service is backed by a relational database like PostgreSQL or MySQL, and it houses metadata about big data. This includes information like the table name associated with a data set, its columns and their types (so you can query them), where the data set is stored (on HDFS, S3, HBase, Cassandra, etc.), how it's stored, where applicable (using Parquet on HDFS, for example), and the location of the data set, if applicable (/user/hive/warehouse/my_data_set on HDFS, for example).

The metastore was then expanded to include further metadata to speed up queries. This started off with partitioning info (sub-directories in your data set, most of which can be skipped if you choose the right WHERE clause), and bucketing and sorting info, i.e. how the data set was "bucketed" (in other words, hash partitioned) into files and how the data was sorted within those files. Bucketing and sorting metadata can cut down time considerably on certain SQL operations. For example, if you were joining two large tables, the join would be much faster had they been bucketed and sorted on the join key, because the join can now be done bucket by bucket (assuming both tables have the same number of buckets) instead of on the entire tables. Then, the Hive metastore was further expanded to store statistics about the data sets - how many records they have, histograms, and the distribution of particular keys - so they could be used to optimize query planning.
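
To make that concrete, here is a rough sketch of the kind of DDL and statistics commands that put all of the above metadata into the metastore. The table name (page_views), its columns and the bucket count are made up for illustration, and the exact statistics syntax varies a bit across Hive versions:

-- Table name, columns and types, storage format, and location:
CREATE TABLE page_views (user_id BIGINT, url STRING)
-- Partitioning info (sub-directories that can be pruned with the right WHERE clause):
PARTITIONED BY (ds STRING)
-- Bucketing and sorting info (hash partitioned into files, sorted within them):
CLUSTERED BY (user_id) SORTED BY (user_id) INTO 32 BUCKETS
STORED AS PARQUET
LOCATION '/user/hive/warehouse/page_views';

-- Table- and column-level statistics, used to optimize query planning:
ANALYZE TABLE page_views PARTITION (ds) COMPUTE STATISTICS;
ANALYZE TABLE page_views PARTITION (ds) COMPUTE STATISTICS FOR COLUMNS;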

But now, new SQL-on-Hadoop engines are popping up every day, and Hive usage is being replaced by the likes of Spark SQL, Presto and others. However, the Hive metastore still remains the de-facto repository of metadata in the big data ecosystem. All the major SQL engines - Spark SQL, Presto, Impala, etc. - integrate with the Hive metastore: they are able to use it as their catalog of metadata (and, optionally, statistics), so disparate users with different software systems can have a single, consistent view of metadata.
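
As an illustration, once Spark is pointed at the same metastore (typically by placing hive-site.xml on its classpath), a table defined through Hive is queryable from Spark SQL without re-declaring its schema. A minimal sketch, assuming Spark 2.x and the hypothetical page_views table from above:

import org.apache.spark.sql.SparkSession

// enableHiveSupport() tells Spark SQL to use the Hive metastore as its catalog.
val spark = SparkSession.builder()
  .appName("metastore-example")
  .enableHiveSupport()
  .getOrCreate()

// Column names, types, format and location all come from the metastore.
spark.sql("SELECT url, COUNT(*) AS views FROM page_views WHERE ds = '2017-03-01' GROUP BY url").show()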

As you bring big data practices into your organization, I encourage you to think about what metadata you can store to speed up your queries or, in some cases, get rid of queries altogether (for example, by hitting the metastore instead of the actual data). Paramount to this is configuring your tools to ensure that the metadata is up-to-date and is being used to drive queries. And, as you do this, don't forget to leverage the de-facto metadata repository - the Hive metastore.
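
One small example of getting rid of a query entirely: if all a job needs to know is which days of data exist, that answer already lives in the metastore, so there is no need to scan the data set. Using the hypothetical page_views table sketched above:

-- Scans the entire data set:
SELECT DISTINCT ds FROM page_views;

-- Answered purely from the metastore; no data files are read:
SHOW PARTITIONS page_views;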

Spark SQL Gotchas

11/19/2015

Apache Spark has had support for running SQL queries for a while now. In particular, you can share the metadata (names of tables, column names and types, storage format, etc.) with the Hive metastore, which has become the de-facto metadata repository in the Hadoop ecosystem.

In this brief post, I will share with you some Spark SQL gotchas (as of Apache Spark 1.5.1):

1. Don't run multiple SQL statements separated by semicolons.
Let's say you want to create a table named my_table, but delete it first if it already exists. You may want to write something like this in the spark-shell:
spark-shell> // Spark Shell creates a HiveContext for you, to launch SQL queries.
spark-shell> // This context can be used by referring to the sqlContext variable.
spark-shell> // Don't do this
spark-shell> sqlContext.sql("DROP TABLE IF EXISTS my_table; CREATE TABLE my_table (col INT)")
In most SQL systems, you can separate 2 SQL statements with a semicolon and expect them to run one at a time, one after the other. In Spark SQL, however, the above is treated as a single statement, so it doesn't work. You have to run each of the statements separately.
spark-shell> // Do this
spark-shell> sqlContext.sql("DROP TABLE IF EXISTS my_table)
spark-shell> sqlContext.sql("CREATE TABLE my_table (col INT)")

2. If you are writing your own Spark app (i.e. not using the spark-shell), be sure to create a HiveContext instead of a SQLContext object for running SQL queries.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setAppName(this.getClass.toString)
    val sc = new SparkContext(conf)
    // Don't do this
    val sqlContext = new SQLContext(sc)
If you use SQLContext, you may get an error like:
Exception in thread "main" java.lang.RuntimeException: [1.90] failure: ``UNION'' expected but `left' found
This is because you are using SQLContext instead of HiveContext. SQLContext implements only a very small subset of the SQL dialect. Instead, it's recommended to write the query in HiveQL (which is exactly the same as "SQL" in most practical cases) and use HiveContext. HiveContext forwards most queries to Hive for parsing, thereby piggybacking off of the existing parsing functionality in Hive, which is much more comprehensive than what SQLContext implements.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val conf = new SparkConf().setAppName(this.getClass.toString)
    val sc = new SparkContext(conf)
    // Do this
    val sqlContext = new HiveContext(sc)
Hope that was helpful!

Application Architectures with Apache Hadoop talk at East Bay Java Users Group (JUG)

6/23/2014

Hi everyone!

I am excited to be giving a talk on Application Architectures with Apache Hadoop on Wednesday, June 25th, at East Bay JUG. This talk is, of course, inspired by the same motivations as the book, but has a fun twist. I had promised Chris Richardson, the organizer of the JUG, that I would make this talk more Java friendly. So, I wrote some MapReduce code for doing sessionization of clicks (a very common algorithm used in clickstream analytics), along with many slides describing the clickstream analytics use case and walking through the sessionization code. Also, in the same code repo, you will find code for doing the same sessionization in Hive and in Spark (thanks to my talented co-author Gwen Shapira).

The talk starts off with a quick introduction to the Clickstream Analytics case study, what the status quo has been without Hadoop, and how Hadoop has changed that status quo. It then walks through the high-level design and goes into the details of doing sessionization in MapReduce. Depending on how much time we have left, we will talk about various other architectural considerations for ingesting, storing and processing data in Hadoop, continuing to use Clickstream Analytics as the example. The presentation, as of now, has about 100 slides; the intent is definitely not to cover all of them (thank God!) but to focus on the sections that interest the audience and dive deep into those areas, leaving the rest for another time.

If you are in the San Francisco bay area this Wednesday, come join us!

- Mark (Follow me on Twitter)

And, here are the presentation slides:
[Slides: Application Architectures with Hadoop and Sessionization in MR, by markgrover]

How to write a Hive UDF

6/3/2012

Introduction
This article talks about how to write a UDF in Apache Hive and provides some tips, tricks and insights related to writing UDFs. 

What is a UDF?
Apache Hive comes with a set of pre-defined User Defined Functions (aka UDFs) available for use. A complete listing of Hive UDFs is available here. These UDFs are the equivalents of functions like hex(), unhex(), from_unixtime(), unix_timestamp() in MySQL. The Hive community has made an effort to make most of the commonly used UDFs available to users. However, oftentimes the existing UDFs are not good enough, and users want to write their own custom UDF. This article goes through that process.
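
As a quick illustration, these built-in UDFs are invoked like any other SQL function (my_table below is just a placeholder table name):

hive> SELECT unix_timestamp(), from_unixtime(1338681600), hex(255) FROM my_table LIMIT 1;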

UDF vs. Generic UDF
Hive UDFs are written in Java. In order to create a Hive UDF, you need to derive from one of two classes: UDF or GenericUDF. Here is a small comparison table identifying the differences between writing a UDF that derives from the UDF class vs. the GenericUDF class:

UDF | GenericUDF
Easier to develop | A little more difficult to develop
Lower performance due to use of reflection | Better performance due to lazy evaluation and short-circuiting
Doesn't accept some non-primitive parameters like struct | Supports all non-primitive parameters as input parameters and return types
(Thanks to Steve Waggoner for suggesting 2 corrections to the above table.)
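
For reference, here is a hypothetical example (not code from Hive itself) of the simpler route: a class that extends UDF and exposes an evaluate() method, which Hive locates and calls via reflection:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Simple (non-generic) UDF: Hive matches arguments to evaluate() by reflection.
public class MyLowerUdf extends UDF {
  public Text evaluate(Text input) {
    if (input == null) {
      return null;  // NULL in, NULL out
    }
    return new Text(input.toString().toLowerCase());
  }
}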

So, why am I telling you all this?
Deriving your UDF from the UDF class will let you develop faster, but the code won't be as scalable and will arguably perform worse. Using GenericUDF has some learning curve to it, but it will allow your UDF to be more scalable. Moreover, this article aims to reduce the steepness of that learning curve and make using GenericUDF as easy as using the UDF class.

My recommendation: Read along and use GenericUDF.

Sample Code
The code we are going to use as a reference for learning how to write a UDF is that of a UDF I created, translate(). The code is available in Apache Hive's trunk.

We won't go into the nitty-gritty of what this code does, but rather learn the general semantics of writing a UDF using GenericUDF. You need to override 3 methods: initialize(), evaluate() and getDisplayString().

Annotations
In general, you should annotate your UDF with the following annotations (replace the values of various parameters based on your use case):
  • @UDFType(deterministic = true)
A deterministic UDF is one which always gives the same result when passed the same parameters. Examples of such UDFs are length(string input), regexp_replace(string initial_string, string pattern, string replacement), etc. A non-deterministic UDF, on the other hand, can return different results for the same set of parameters. For example, unix_timestamp() returns the current timestamp using the default time zone. Therefore, when unix_timestamp() is invoked with the same parameters (no parameters) at different times, different results are obtained, making it non-deterministic. This annotation allows Hive to perform certain optimizations when the UDF is deterministic.
  • @Description(name="my_udf", value="This will be the result returned by the describe statement.", extended = "This will be the result returned by the describe extended statement.")
This annotation tells Hive the name of your UDF. It will also be used to populate the result of queries like `DESCRIBE FUNCTION MY_UDF` or `DESCRIBE FUNCTION EXTENDED MY_UDF`.
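
Put together, the two annotations on a hypothetical GenericUDF class look roughly like the sketch below (the my_udf / GenericMyUdf names simply match the deployment example at the end of this article, and the three method bodies are placeholders that are covered in the sections that follow):

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

@UDFType(deterministic = true)
@Description(name = "my_udf",
    value = "_FUNC_(str1, str2, str3) - short description shown by DESCRIBE FUNCTION",
    extended = "Longer description, with examples, shown by DESCRIBE FUNCTION EXTENDED")
public class GenericMyUdf extends GenericUDF {

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // Placeholder; see the initialize() section below.
    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // Placeholder; see the evaluate() section below.
    return null;
  }

  @Override
  public String getDisplayString(String[] children) {
    // Placeholder; see the getDisplayString() section below.
    return "my_udf()";
  }
}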

initialize()
This method gets called only once per JVM, at the beginning, to initialize the UDF. initialize() is used to assert and validate the number and type of parameters that the UDF takes and the type of value it returns. For example, the translate UDF takes in 3 string parameters and returns a string. As you will see in the code, initialize() does the following (a rough sketch follows the list below):
  • Asserts that the expected number of parameters is received by the UDF.
  • In this example, we are expecting all parameters to be of String type. Therefore, this method iterates through all the parameters and ensures that they are of the Primitive category (non-primitive data types in Hive include Arrays, Maps and Structs). Once it has asserted that primitives are being passed as arguments, it asserts that the primitive category of each parameter is String or Void. We need to check for the Void category since NULLs passed to the UDF are treated as primitive data types with a primitive category of VOID. If you are expecting an argument of a non-primitive type, you may have to add further checks asserting the primitive types that the data type encapsulates (e.g. ensuring that the array being passed as a parameter is an array of strings).
  • Transforms the ObjectInspector array it receives (with length equal to the number of arguments) into a Converter array. These converters are stored and used to retrieve the values of each of the parameters in evaluate() (more on that later).
  • Returns an ObjectInspector corresponding to the return type of the UDF. For returning a String output, we would return PrimitiveObjectInspectorFactory.writableStringObjectInspector.
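
Here is a rough sketch of what that looks like for the hypothetical GenericMyUdf above, assuming (like translate()) three string parameters and a string return type. The converter and exception classes used below live in the org.apache.hadoop.hive.ql.exec and org.apache.hadoop.hive.serde2.objectinspector packages:

  private final ObjectInspectorConverters.Converter[] converters =
      new ObjectInspectorConverters.Converter[3];

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // Assert the expected number of parameters.
    if (arguments.length != 3) {
      throw new UDFArgumentLengthException("my_udf() expects exactly 3 arguments");
    }

    for (int i = 0; i < arguments.length; i++) {
      // Only primitive categories are accepted here...
      if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(i,
            "A string argument was expected but " + arguments[i].getTypeName() + " was given");
      }
      // ...and of those, only STRING or VOID (VOID covers NULLs passed to the UDF).
      PrimitiveObjectInspector.PrimitiveCategory category =
          ((PrimitiveObjectInspector) arguments[i]).getPrimitiveCategory();
      if (category != PrimitiveObjectInspector.PrimitiveCategory.STRING
          && category != PrimitiveObjectInspector.PrimitiveCategory.VOID) {
        throw new UDFArgumentTypeException(i,
            "A string argument was expected but " + arguments[i].getTypeName() + " was given");
      }
      // Store a converter so evaluate() can read each parameter as Text.
      converters[i] = ObjectInspectorConverters.getConverter(arguments[i],
          PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    // Declare the UDF's return type: a string, returned as a writable Text.
    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
  }
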
evaluate()
This method is called once for every row of data being processed. It uses the Converter array populated in initialize() to retrieve the values of the parameters for the row in question. You then put in the logic to compute the return value based on those parameter values, and return it. In the case of the translate UDF, a string is returned.
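
Continuing the hypothetical GenericMyUdf, evaluate() might look something like this (org.apache.hadoop.io.Text is the writable string type matching the ObjectInspector returned above; the actual computation is just a placeholder):

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // Read each parameter for the current row through the converters built in initialize().
    Text first = (Text) converters[0].convert(arguments[0].get());
    Text second = (Text) converters[1].convert(arguments[1].get());
    Text third = (Text) converters[2].convert(arguments[2].get());
    if (first == null || second == null || third == null) {
      return null;  // Follow SQL semantics: NULL in, NULL out.
    }
    // Placeholder logic; a real UDF like translate() would do its actual work here.
    return new Text(first.toString() + second.toString() + third.toString());
  }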

getDisplayString()
A simple method that returns the display string for the UDF; this is what shows up for the UDF call in EXPLAIN output.
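
For the hypothetical GenericMyUdf, it could be as simple as:

  @Override
  public String getDisplayString(String[] children) {
    // children holds the text of the arguments as they appear in the query.
    StringBuilder sb = new StringBuilder("my_udf(");
    for (int i = 0; i < children.length; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(children[i]);
    }
    return sb.append(")").toString();
  }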

Having implemented the above 3 methods, you are all set to build and test out your UDF.

Building the UDF
In order to build the UDF, you will need to have hive-exec*.jar on your Java build path. For example, if you are using Maven, you will need a snippet like the following in your pom.xml:

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>0.9.0</version>
    </dependency>


You may change the version number to match the version of Hive you are compiling against; newer versions are generally backwards compatible.

Deploying and testing the UDF
Once you have built a JAR containing your UDF, you will have to run the following commands in Hive to make your UDF available for use:
hive> add jar my_udf.jar;
hive> create temporary function my_udf as 'com.me.udf.GenericMyUdf';
hive> select my_udf(col1, col2, col3) from my_table limit 5;