SquareCog's SquareBlog

Pig trick to register latest version of jar from HDFS

Posted in Uncategorized by squarecog on August 30, 2011

%default guavaJar `hadoop fs -ls lib/*guava*jar | awk '{print $8;}' | sort | tail -1`
register 'hdfs://$guavaJar'

The same idea also works without HDFS being involved, of course.

Pig Variables and Semicolons

Posted in programming by squarecog on August 24, 2011

Pay attention to where you put your semicolons lest they become part of the value!

Pig Script:

%default QUOTED '20090101';
%default UNQUOTED 20090101;

l = load '/logs/$QUOTED';
l = load '/logs/$UNQUOTED';

After preprocessing with pig -x local -r script.pig:


l = load '/logs/20090101';
l = load '/logs/20090101;';


Hadoop requires stable hashCode() implementations

Posted in Uncategorized by squarecog on February 20, 2011

It makes sense, really, once you think about it, but first you have to know to think about it.

Let’s review the basics.

To write a Map-Reduce job, you implement a Mapper and a Reducer. The mapper takes in pairs of keys and values, and outputs pairs of keys and values. Both keys and values have to implement the Writable interface, which is how Hadoop deals with serializing them. The keys have to implement WritableComparable, an extension of Writable that, unsurprisingly, also extends Comparable. They have to be Comparable because Hadoop sorts keys for the Reducers.

The important bit here is that a partitioner is used to determine which of the reducers a key-value pair should go to. Most of the time the default HashPartitioner is used. The HashPartitioner is very simple — for every key, Hadoop invokes hashCode() and routes based on the result. If you just implement the WritableComparable interface, you will inherit Java Object’s hashCode(). Here’s a little extract from Object’s hashCode() javadoc (emphasis mine):

Whenever it is invoked on the same object more than once during an execution of a Java application, the hashCode method must consistently return the same integer, provided no information used in equals comparisons on the object is modified. This integer need not remain consistent from one execution of an application to another execution of the same application.

So: we extract some key on mapper 1, extract the same key on mapper 2, and both keys get hashed to determine which reducer they should go to. The two entries with the same key then go to two different reducers, because Object does not guarantee that hashCode() returns the same thing for the same key in two different JVM instances.
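To make the routing concrete: Hadoop's default HashPartitioner boils down to one line of arithmetic over hashCode(). The sketch below uses illustrative class and method names (not Hadoop's actual API), but the formula is the one the default partitioner uses, and it shows exactly why an unstable hashCode() scatters identical keys.

```java
// Sketch of the routing arithmetic inside Hadoop's default partitioner:
// mask off the sign bit, then take the result modulo the number of reducers.
// (Class and method names here are illustrative, not Hadoop's actual API.)
public class HashRoutingSketch {
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // String.hashCode() is defined purely by the string's contents, so the
        // same key always routes to the same reducer, in this JVM or any other.
        // An identity-based hashCode() (like Object's) gives no such guarantee.
        System.out.println(getPartition("user_42", 10));
    }
}
```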

Now, this is not a problem most of the time, because there are a ton of WritableComparable implementations whose hashCode() is stable across different JVMs — including, of course, all the common ones: LongWritable, Text, etc. It is, however, a problem if you are wrapping complex Java objects and proxying to their hashCode() implementations. A correct hashCode() implementation does not have to return the same value in different instantiations of the application, and many, in fact, do not. Protocol Buffer messages are one example: their implementation of hashCode(), while correct, is not stable, which presents problems if you are trying to wrap them for use as Hadoop Writables. This showed up as issue 28 in Elephant-Bird, our collection of classes for working with lzo compression, Protocol Buffers, and Thrift objects in Hadoop. The fix is pretty simple — we just call Arrays.hashCode() on a serialized representation of the message, and make sure the serialization is cached. We'll have to serialize it anyway when Hadoop wants to write it out to disk, so there's no real overhead.

  /**
   * <p>Returns a hashCode that is stable across multiple instances of JVMs.
   * (<code>hashCode()</code> is not required to return the same value in
   * different instances of the same application in Java, just in a
   * single instance of the application; Hadoop imposes a more strict requirement.)
   */
  @Override
  public int hashCode() {
    byte[] bytes = serialize();
    return (bytes == null) ? 31 : Arrays.hashCode(bytes);
  }
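To see why Arrays.hashCode() on the serialized bytes is safe: unlike the identity-based hashCode() that arrays inherit from Object, it is computed purely from the array contents, so any two JVMs that serialize the same message arrive at the same value. A quick illustration:

```java
import java.util.Arrays;

public class ContentHashDemo {
    public static void main(String[] args) {
        byte[] a = "serialized message".getBytes();
        byte[] b = "serialized message".getBytes();
        // Arrays inherit Object's identity-based hashCode(), so two distinct
        // arrays with equal contents will generally hash differently.
        System.out.println(a.hashCode() == b.hashCode());
        // Arrays.hashCode() depends only on the bytes, so equal contents
        // always hash equally, regardless of which JVM computes it.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));
    }
}
```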

Now, the fun part. We fixed the bug, now we have to write a test. Our test requires two JVMs — it does not manifest itself in a single JVM, by its very nature. A few terrible ideas came to mind, like shelling out to a java main through Runtime.exec, but eventually it occurred to me that Apache Ant already does this and there must be something I can use. It’s a bit gnarly, but here’s the test — we use Ant’s Java class, set it up with the proper environment, and exec a main() in it.

  public void testStableHashcodeAcrossJVMs() throws IOException {
    int expectedHashCode = referenceAbWritable.hashCode();
    Java otherJvm = new Java();
    otherJvm.setNewenvironment(true);
    otherJvm.setFork(true);
    otherJvm.setProject(new Project());
    otherJvm.setClassname(OtherJvmClass.class.getName());
    // pass the current environment and system properties through to the new JVM
    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
      Environment.Variable var = new Environment.Variable();
      var.setKey(entry.getKey());
      var.setValue(entry.getValue());
      otherJvm.addEnv(var);
    }
    for (String prop : System.getProperties().stringPropertyNames()) {
      String propValue = System.getProperty(prop);
      Environment.Variable var = new Environment.Variable();
      var.setKey(prop);
      var.setValue(propValue);
      otherJvm.addSysproperty(var);
    }
    otherJvm.setDir(new File(System.getProperty("java.io.tmpdir")));
    // the forked JVM writes its hashCode into a temp file, which we read back
    File tmpOut = File.createTempFile("otherJvm", "txt");
    otherJvm.setArgs(tmpOut.getAbsolutePath());
    otherJvm.init();
    otherJvm.executeJava();

    DataInputStream is = new DataInputStream(new FileInputStream(tmpOut));
    assertEquals(expectedHashCode, is.readInt());
  }

  public static class OtherJvmClass {
    /* Used for testStableHashcodeAcrossJVMs */
    public static void main(String[] args) throws IOException {
      setUp(); // initialize referenceAbWritable in this JVM
      int hashCode = referenceAbWritable.hashCode();
      File tmpFile = new File(args[0]);
      DataOutputStream os = new DataOutputStream(new FileOutputStream(tmpFile));
      os.writeInt(hashCode);
      os.close();
      System.exit(0);
    }
  }

There is probably a better way to communicate between the processes than through a temp file, but nothing terribly obvious showed up when I browsed the javadocs, and this did get the job done. The test reliably fails when applied to the original implementation of ProtobufWritable, and passes in the new version.

Incrementing Hadoop Counters in Apache Pig

Posted in programming by squarecog on December 24, 2010

Information about incrementing Hadoop counters from inside Pig UDFs is not currently well-documented, judging by the user list traffic, so this is a brief note showing how to do that.

Hadoop counters are a way to report basic statistics of a job in Hadoop. I won’t go into a detailed discussion of what they are and when to use them here — there’s plenty of information about that on the internet (for starters, see the Cloud9 intro to Counters, and some guidelines for appropriate usage in “Apache Hadoop Best Practices and Anti-Patterns”).

Pig 0.6 and before

Counters were not explicitly supported in Pig 0.6 and before, but you could get at them with this hack (inside a UDF):

Reporter reporter = PigHadoopLogger.getInstance().getReporter();
if (reporter != null) {
  reporter.incrCounter(myEnum, 1L);
}
Pig 0.8
Pig 0.8 has an “official” method for getting and incrementing counters from a UDF:

PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null) {
  reporter.getCounter(myEnum).increment(1L);
}
You can also get Counters programmatically if you are invoking Pig using PigRunner, and getting a PigStats object on completion. It’s a bit involved:

PigStats.JobGraph jobGraph = pigStats.getJobGraph();
for (JobStats jobStats : jobGraph) {
  Counters counters = jobStats.getHadoopCounters();
  // inspect or aggregate the counters here
}

Pig 0.7
Unfortunately I don’t know of a way to do this in 0.7, as the old hack went away and the new PigStatusReporter hadn’t been added yet. If you have a trick, please comment.

Watch out for nulls
We’ve observed that sometimes the reporter is null for a bit even when a UDF is executing on the MR side. To deal with this, we added a little helper class PigCounterHelper to Elephant-Bird that buffers the writes in a Map, and flushes them when it gets a non-null counter.
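The buffering idea is simple enough to sketch. The class below is a hypothetical, simplified version of the approach (the real PigCounterHelper in Elephant-Bird keys counters by group and name, and flushes to the actual PigStatusReporter rather than to stdout):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the buffering idea behind PigCounterHelper:
// accumulate increments while the reporter is null, flush once it appears.
public class CounterBuffer {
    private final Map<String, Long> buffered = new HashMap<String, Long>();

    public void incrCounter(String name, long amount, boolean reporterAvailable) {
        Long current = buffered.get(name);
        buffered.put(name, (current == null ? 0L : current) + amount);
        if (reporterAvailable) {
            flush();
        }
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffered.entrySet()) {
            // in the real helper this would be reporter.incrCounter(...)
            System.out.println(e.getKey() + " += " + e.getValue());
        }
        buffered.clear();
    }

    // how much is still waiting for a non-null reporter
    public long pending(String name) {
        Long v = buffered.get(name);
        return v == null ? 0L : v;
    }
}
```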

So there. If someone asks about counters in Pig, send them here.


New Features in Apache Pig 0.8

Posted in programming by squarecog on December 19, 2010

The Pig 0.8 release includes a large number of bug fixes and optimizations, but at the core it is a feature release. It’s been in the works for almost a full year (most of the work on 0.7 was completed by January of 2010, although it took a while to actually get the release out), and the amount of time spent on 0.8 really shows.

I meant to describe these in detail in a series of posts, but it seems blogging regularly is not my forte. This release is so chock-full of great new features, however, that I feel compelled to at least list them. So, behold, in no particular order, a non-exhaustive list of new features I am excited about in Pig 0.8:

  • Support for UDFs in scripting languages
  • This is exactly what it sounds like — if your favorite language has a JVM implementation, it can be used to create Pig UDFs.

    Pig now ships with support for UDFs in Jython, but other languages can be supported by implementing a few interfaces. Details about the Pig UDFs in Python can be found here: http://pig.apache.org/docs/r0.8.0/udf.html#Python+UDFs

    This is the outcome of PIG-928; it was quite a pleasure to watch this develop over time — while most Pig tickets wind up getting worked on by at most one or two people, this turned into a collaboration of quite a few developers, many of them new to the project — Kishore Gopalakrishna’s patch was the initial conversation starter, which was then hacked on or merged into similar work by Woody Anderson, Arnab Nandi, Julien Le Dem, Ashutosh Chauhan and Aniket Mokashi (Aniket deserves an extra shout-out for patiently working to incorporate everyone’s feedback and pushing the patch through the last mile).

  • PigUnit
  • A contribution by Romain Rigaux, PigUnit is exactly what it sounds like — a tool that simplifies the Pig users’ lives by giving them a simple way to unit test Pig scripts.

    The documentation at http://pig.apache.org/docs/r0.8.0/pigunit.html and the code at http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java?view=markup speak for themselves as far as usage.

  • PigStats
  • Pig can now provide much better visibility into what is going on inside a Pig job than it ever did before, thanks to extensive work by Richard Ding (see PIG-1333 and PIG-1478). It is really a feature in three parts:

    1. Script statistics.
    This is the most easily visible change. At the end of running a script, Pig will output a table with some basic statistics regarding the jobs that it ran. It looks something like this:

    Job Stats (time in seconds):

    JobId    Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias          Feature   Outputs
    job_xxx  1654  218      84          6           14          107            87             99             counted_data,
    job_xxx  2     1        9           6           7           13             13             13             ordered_data   SAMPLER
    job_xxx  2     1        26          18          22          31             31             31             ordered_data   ORDER_BY  hdfs://tmp/out,

    This is extremely useful when debugging slow jobs, as you can immediately identify which stages of your script are slow, and correlate the slow Map-Reduce jobs with the actual Pig operators and relations in your script. That was not trivial before: folks often resorted to setting parallelism to slightly different numbers for every join and group just to figure out which job was doing what. No more of this!

    2. Data in Job XML

    Pig now inserts several interesting properties into the Hadoop jobs that it generates, including the relations being generated, Pig features being used, and ids of parent Hadoop jobs. This is quite helpful when monitoring a cluster, and is also handy when examining job history using the HadoopJobHistoryLoader, now part of piggybank (use Pig to mine your job history!).

    3. PigRunner API

    The same information that is printed out when Pig runs a script from the command line is available if one uses the Java API to start Pig jobs. If you start a script using PigRunner.run(String[] args, PigProgressNotificationListener listener), you will get as a result a PigStats object that gives you access to the job hierarchy, the Hadoop counters from each job, and so on. You can implement the optional PigProgressNotificationListener if you want to watch the job as it progresses; the listener will be notified as different component jobs start and finish.

    Documentation of the API, new properties in the Job XML, and more, is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Pig+Statistics

  • Scalar values
  • It is very common to need to compute some aggregate statistic and then use it to inform other calculations. For example, consider a data set that consists of people and their eye color; we want to calculate the fraction of the total population that has a given eye color. The script looks something like this:

    people = LOAD '/data/people' USING PigStorage()
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (GROUP people ALL)
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH (GROUP people BY eye_color)
      GENERATE
        group AS eye_color,
        COUNT(people) / num_people.total AS fraction;

    Pretty straightforward, except it does not work. What’s happening in the above code is that we are referencing the relation num_people from inside another relation, eye_color_fractions, and this doesn’t really make sense if Pig does not know that num_people only has one row.

    In the past you had to do something hacky like joining the two relations on a constant to replicate the total into each row, and then generate the division. Needless to say, this was not entirely satisfactory. In PIG-1434 Aniket Mokashi tackled this, implementing an elegant solution that hides all of these details from the user — you can now simply cast a single-row relation as a scalar, and use it as desired. The above script becomes:

    people = LOAD '/data/people' USING PigStorage()
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (GROUP people ALL)
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH (GROUP people BY eye_color)
      GENERATE
        group AS eye_color,
        COUNT(people) / (long) num_people.total AS fraction;

    This makes the casting explicit, but Pig is now smart enough to do this implicitly as well. A runtime exception is generated if the relation being used as a scalar contains more than one tuple.

    More documentation of this feature is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref2.html#Casting+Relations+to+Scalars

  • Monitored UDFs
  • A new annotation has been added, @MonitoredUDF, which makes Pig spawn a watcher thread that kills an execution that is taking too long, and returns a default value instead. This comes in handy when dealing with certain operations like complex regular expressions. More documentation is available at http://pig.apache.org/docs/r0.8.0/udf.html#Monitoring+long-running+UDFs

  • Automatic merge of small files
  • This is a simple one, but useful — when running Pig over many small files, instead of creating a map task per file (paying the overhead of scheduling and running a task for a computation that might only take a few seconds), we can merge the inputs and create a few map tasks that are a bit more hefty.

    Two properties control this behavior: pig.maxCombinedSplitSize controls the maximum size of the resulting split, and pig.splitCombination controls whether or not the feature is activated in the first place (it is on by default).

    This work is documented in the ticket PIG-1518; there are additional details in the release notes attached to the ticket.

  • Generic UDFs
  • I wrote about this one before — a small feature that allows you to invoke static Java methods as Pig UDFs without needing to wrap them in custom code.

    The official documentation is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Dynamic+Invokers

  • Safeguards against missing PARALLEL keyword
  • One of the more common mistakes people make when writing Pig scripts is forgetting to specify parallelism for operators that need it. The default behavior used to be a parallelism of 1, which can lead to extremely inefficient jobs. A patch by Jeff Zhang in PIG-1249 changes this behavior to use a simple heuristic instead: if parallelism is not specified, derive the number of reducers as MIN(max_reducers, total_input_size / bytes_per_reducer). The maximum number of reducers is controlled by the property pig.exec.reducers.max (default 999), and the bytes per reducer by pig.exec.reducers.bytes.per.reducer (default 1GB).

    This is a safeguard, not a panacea: it only works with file-based input, and it estimates the number of reducers based on input size, not the size of the intermediate data. If you have a highly selective filter, or you are grouping a large dataset by a low-cardinality field, it will produce bad numbers. Still, it is a nice safeguard against dramatic misconfigurations.

    When porting to Apache Pig 0.8, remember to audit your scripts for reduce-side operators that do not specify the PARALLEL keyword; if the intent is to use a single reducer, make that intent explicit by specifying PARALLEL 1.
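For illustration, the heuristic described above can be sketched in a few lines of Java (the class and method names are mine; Pig's actual implementation reads the two defaults from the properties named above):

```java
// Sketch of the PIG-1249 reducer-estimation heuristic:
// one reducer per bytes_per_reducer of input, capped at max_reducers,
// with a floor of one reducer. Names here are illustrative, not Pig's.
public class ReducerEstimator {
    static final int MAX_REDUCERS_DEFAULT = 999;            // pig.exec.reducers.max
    static final long BYTES_PER_REDUCER_DEFAULT = 1L << 30; // pig.exec.reducers.bytes.per.reducer (1GB)

    static int estimateReducers(long totalInputBytes, int maxReducers, long bytesPerReducer) {
        int byInputSize = (int) Math.ceil((double) totalInputBytes / bytesPerReducer);
        return Math.max(1, Math.min(maxReducers, byInputSize));
    }
}
```

So a 10GB input gets 10 reducers under the defaults, while a multi-terabyte input is capped at 999.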

  • HBaseStorage
  • HBaseStorage has been shored up in Pig 0.8. It can now read data stored as bytes instead of requiring all numbers to be converted to Strings, and it accepts a number of options: limit the number of rows returned, push down filters on HBase keys, and so on. In addition, it can now be used to write to HBase as well as read from it. Details about the options can be found in the Release Notes section of PIG-1205.

    Note that at the moment this only works with the HBase 0.20.{4,5,6} releases, and does not work with 0.89+. There is a patch in PIG-1680 that you can apply if you need 0.89 and 0.90 compatibility; it is not applied to the main codebase yet, as it is not backwards compatible.

    We are very interested in help making this storage engine more featureful; please feel free to jump in and contribute!

  • Support for custom Map-Reduce jobs in the flow
  • Although we try to make these a rarity, sometimes cases come up in which a custom Map-Reduce job fits the bill better than Pig. Weaving a Map-Reduce job into the middle of a Pig workflow was awkward before — you had to use something like Oozie or Azkaban, or write your own workflow application. Pig 0.8 introduces a simple “MAPREDUCE” operator which allows you to invoke an opaque MR job in the middle of the flow, and continue with Pig:

    text = load 'WordcountInput.txt';
    wordcount = MAPREDUCE 'wordcount.jar'
      STORE text INTO 'inputDir'
      LOAD 'outputDir' AS (word:chararray, count: int)
      `org.myorg.WordCount inputDir outputDir`;

    Details are available on the wiki page: http://wiki.apache.org/pig/NativeMapReduce

    The ticket for this one has been open for a while, since Pig 0.2 days, and it’s nice to see it finally implemented. Thumbs up to Aniket Mokashi for this one.

  • Custom Partitioners
  • This feature, also implemented by the amazingly productive Aniket Mokashi, is a bit of a power-user thing (and another ancient ticket, PIG-282). It allows the Pig script author to control the function used to distribute map output among reducers. By default, Pig uses Hadoop's hash-based partitioner, but sometimes a custom algorithm is required when the script author knows something particularly unique about the reduce key distribution. When that is the case, a user can now specify the Hadoop Partitioner to swap in instead of the default:

    B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;

    More specific documentation can be found in the Release Notes section of PIG-282.


Upcoming Features in Pig 0.8: Dynamic Invokers

Posted in programming by squarecog on August 20, 2010

Pig release 0.8 is scheduled to be feature-frozen and branched at the end of August 2010. This release has many, many useful new features, mostly addressing usability. In this series of posts, I will demonstrate some of my favorites from this release.

Pig 0.8 will have a family of built-in UDFs called Dynamic Invokers. The idea is simple: frequently, Pig users need to use a simple function that is already provided by standard Java libraries, but for which a UDF has not been written. Dynamic Invokers allow a Pig programmer to refer to Java functions without having to wrap them in custom Pig UDFs, at the cost of doing some Java reflection on every function call.
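Under the hood this is plain java.lang.reflect. The sketch below uses illustrative names (not Pig's actual invoker classes) to show the mechanism, and where the per-call reflection cost comes from: the Method lookup can be cached, but Method.invoke still runs on every call.

```java
import java.lang.reflect.Method;

// Sketch of what a dynamic invoker does: resolve a static method by name
// once, then invoke it reflectively on each call.
// (Illustrative names; not Pig's actual invoker implementation.)
public class DynamicInvokerSketch {
    private final Method method; // cached so lookup happens only once

    public DynamicInvokerSketch(String className, String methodName, Class<?>... argTypes)
            throws Exception {
        method = Class.forName(className).getMethod(methodName, argTypes);
    }

    public Object invoke(Object... args) throws Exception {
        return method.invoke(null, args); // null receiver: static method
    }
}
```

For example, wrapping java.lang.Math.max(long, long) this way lets a caller invoke it by name without writing any wrapper code.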


Pig, HBase, Hadoop, and Twitter: HUG talk slides

Posted in programming by squarecog on May 20, 2010

I presented tonight at the Bay Area Hadoop User Group, talking briefly about Twitter’s use of Hadoop and Pig. Here are the slides:

View this document on Scribd

GROUP operator in Apache Pig

Posted in programming by squarecog on May 11, 2010

I’ve been doing a fair amount of helping people get started with Apache Pig. One common stumbling block is the GROUP operator. Although familiar, as it serves a similar function to SQL’s GROUP BY clause, it is just different enough in the Pig Latin language to be confusing. Hopefully this brief post will shed some light on what exactly is going on.


Twitter Lists as Tags

Posted in Uncategorized by squarecog on November 11, 2009

I created a toy script based on the idea that when people list twitter users in lists, they are implicitly tagging those users. Using the Twitter API, it’s dead simple to extract the lists a user belongs to, do a tiny bit of parsing, and visualize the tags in a word cloud. So I did it. Let me know what you think: TwitTags.

Wordle for Squarecog tags

Presentation on Apache Pig at Pittsburgh Hadoop User Group

Posted in programming by squarecog on November 3, 2009

Ashutosh and I presented at the Pittsburgh Hadoop User Group on Apache Pig. The slide deck goes through a brief intro to Pig Latin, then jumps into an explanation of the different join algorithms, and finishes up with some research ideas. A pretty wide-ranging talk, for a diverse audience.

Scribd messed up some of the colors, so if you can’t read some of the text, try downloading the original.

View this document on Scribd
