SquareCog's SquareBlog

Pig trick to register latest version of jar from HDFS

Posted in Uncategorized by squarecog on August 30, 2011


%default guavaJar `hadoop fs -ls lib/*guava*jar | awk '{print $8;}' | sort -n | tail -1`
register 'hdfs://$guavaJar'

The same idea also works without HDFS being involved, of course.
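
For instance, a sketch of the same trick against a local lib directory (the lib/ path and guava jar name are just placeholders, as above; plain ls prints only the path, so no awk is needed):

%default guavaJar `ls lib/*guava*jar | sort -n | tail -1`
register '$guavaJar'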

Hadoop requires stable hashCode() implementations

Posted in Uncategorized by squarecog on February 20, 2011

It makes sense, really, once you think about it, but first you have to know to think about it.

Let’s review the basics.

To write a Map-Reduce job, you implement a Mapper and a Reducer. The mapper takes in pairs of keys and values, and outputs pairs of keys and values. Both keys and values have to implement the Writable interface, which is how Hadoop deals with serializing them. The keys have to implement WritableComparable, a superset of Writable that also extends, unsurprisingly, Comparable. They have to be Comparable because Hadoop sorts keys for the Reducers.
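
As a quick refresher, a minimal custom key might look something like the sketch below. It is purely illustrative (UserIdKey is a made-up class, not from Hadoop or any real codebase):

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;

  import org.apache.hadoop.io.WritableComparable;

  // Illustrative custom key: a thin wrapper around a long user id.
  public class UserIdKey implements WritableComparable<UserIdKey> {
    private long userId;

    public void write(DataOutput out) throws IOException {
      out.writeLong(userId);
    }

    public void readFields(DataInput in) throws IOException {
      userId = in.readLong();
    }

    public int compareTo(UserIdKey other) {
      return (userId < other.userId) ? -1 : ((userId == other.userId) ? 0 : 1);
    }

    // Note: no hashCode() override, so this class silently inherits
    // Object's hashCode(), which is exactly the trap discussed below.
  }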

The important bit here is that a partitioner is used to determine which of the reducers a key-value pair should go to. Most of the time the default HashPartitioner is used. The HashPartitioner is very simple — for every key, Hadoop invokes hashCode() and routes based on the result. If you just implement the WritableComparable interface, you will inherit Java Object’s hashCode(). Here’s a little extract from Object’s hashCode() javadoc (emphasis mine):

Whenever it is invoked on the same object more than once during an execution of a Java application, the hashCode method must consistently return the same integer, provided no information used in equals comparisons on the object is modified. This integer need not remain consistent from one execution of an application to another execution of the same application.

So, we extract some key on mapper 1, extract the same key on mapper 2, both get hashed in order to determine which reducer they should go to, and the two entries with the same key go to two different reducers, because Object does not guarantee that hashCode() returns the same thing for the same key in two different JVM instances.
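
To make that concrete, the routing decision boils down to roughly the following (a paraphrase of what the default HashPartitioner does, not an exact copy of Hadoop's source):

  // The target reducer is a direct function of the key's hashCode().
  // If hashCode() is not stable across JVMs, identical keys emitted by
  // different mappers can land on different reducers.
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }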

Now, this is not a problem most of the time because there are a ton of WritableComparable implementations whose hashCode() is stable across different JVMs including, of course, all the common ones — LongWritable, Text, etc. It is, however, a problem if you are wrapping complex Java objects and proxying to their hashCode() implementations. A correct hashCode() implementation does not have to return the same value in different instantiations of the application, and many, in fact, do not. For example, Protocol Buffer messages: their implementation of hashCode(), while correct, is not stable, which presents problems if you are trying to wrap them for use as Hadoop Writables. This showed up as issue 28 in Elephant-Bird, our collection of classes for working with lzo compression, Protocol Buffers, and Thrift objects in Hadoop. The fix is pretty simple — we just call Arrays.hashCode() on a serialized representation of the message, and make sure the serialization is cached. We’ll have to serialize it anyway when Hadoop wants to write it out to disk, so there’s no real overhead.

  /**
   * <p>Returns a hashCode that is stable across multiple instances of JVMs.
   * (<code>hashCode()</code> is not required to return the same value in
   * different instances of the same application in Java, just in a
   * single instance of the application; Hadoop imposes a more strict requirement.)
   */
  @Override
  public int hashCode() {
    byte[] bytes = serialize();
    return (bytes == null) ? 31 : Arrays.hashCode(bytes);
  }

Now, the fun part. We fixed the bug, now we have to write a test. Our test requires two JVMs — it does not manifest itself in a single JVM, by its very nature. A few terrible ideas came to mind, like shelling out to a java main through Runtime.exec, but eventually it occurred to me that Apache Ant already does this and there must be something I can use. It’s a bit gnarly, but here’s the test — we use Ant’s Java class, set it up with the proper environment, and exec a main() in it.

  @Test
  public void testStableHashcodeAcrossJVMs() throws IOException {
    int expectedHashCode = referenceAbWritable.hashCode();
    // Fork a second JVM via Ant's Java task and run OtherJvmClass.main() in it.
    Java otherJvm = new Java();
    otherJvm.setNewenvironment(true);
    otherJvm.setFork(true);
    otherJvm.setClassname(OtherJvmClass.class.getName());
    // Hand the forked JVM a copy of our environment variables...
    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
      Environment.Variable var = new Environment.Variable();
      var.setKey(entry.getKey());
      var.setValue(entry.getValue());
      otherJvm.addEnv(var);
    }
    // ...and a copy of our system properties.
    for (String prop : System.getProperties().stringPropertyNames()) {
      String propValue = System.getProperty(prop);
      Environment.Variable var = new Environment.Variable();
      var.setKey(prop);
      var.setValue(propValue);
      otherJvm.addSysproperty(var);
    }
    otherJvm.setDir(new File(System.getProperty("java.io.tmpdir")));
    // The forked JVM writes its hashCode to a temp file, which we read back
    // and compare against the hashCode computed in this JVM.
    File tmpOut = File.createTempFile("otherJvm", "txt");
    otherJvm.setArgs(tmpOut.getAbsolutePath());
    otherJvm.init();
    otherJvm.executeJava();
    DataInputStream is = new DataInputStream(new FileInputStream(tmpOut));
    assertEquals(expectedHashCode, is.readInt());
    is.close();
  }

  public static class OtherJvmClass {
    /* Used for testStableHashcodeAcrossJVMs */
    public static void main(String[] args) throws IOException {
      setUp();
      int hashCode = referenceAbWritable.hashCode();
      File tmpFile = new File(args[0]);
      DataOutputStream os = new DataOutputStream(new FileOutputStream(tmpFile));
      os.writeInt(hashCode);
      os.close();
      System.exit(0);
    }
  }

There is probably a better way to communicate between the processes than through a temp file, but nothing terribly obvious showed up when I browsed the javadocs, and this did get the job done. The test reliably fails when applied to the original implementation of ProtobufWritable, and passes in the new version.

Twitter Lists as Tags

Posted in Uncategorized by squarecog on November 11, 2009

I created a toy script based on the idea that when people add Twitter users to lists, they are implicitly tagging those users. Using the Twitter API, it’s dead simple to extract the lists a user belongs to, do a tiny bit of parsing, and visualize the tags in a word cloud. So I did it. Let me know what you think: TwitTags.
[Image: Wordle for Squarecog tags]

Jeeves and Diller — separated at birth?

Posted in Uncategorized by squarecog on April 20, 2009

IAC-owned Ask.com, formerly known as Ask Jeeves, is bringing Jeeves back in the UK (disclaimer: I used to work for Ask, and still moonlight there occasionally).

Jeeves is back after his 3-year leave of absence with a total makeover. Is it just me or does he now look a whole lot like his boss Barry Diller?

[Image: Jeeves and Diller]

Free Hadoop Training

Posted in Uncategorized by squarecog on March 13, 2009

Cloudera has made training videos, screencasts, and exercises from their basic Hadoop training available on the web.

Check it out here: http://www.cloudera.com/hadoop-training-basic

They even include a VM image that you can use to get started without messing with installation details.  

But how is it different from the word count tutorial I went through on the Hadoop Wiki, you ask? There is a section on “algorithms in Map-Reduce” and a screencast on using Hive. More on this once I actually do the exercises…

Great post on databases and map/reduce

Posted in Uncategorized by squarecog on January 14, 2009

Anand Rajaraman has a great post on Datawocky with an overview of the various approaches to data analysis using Map/Reduce, and the ways in which this paradigm is bridged with RDBMSes by AsterData and Greenplum, and the Pig project. Don’t miss the comments from people directly responsible for these technologies, as well as Facebook’s Hive.
