SquareCog's SquareBlog

Hadoop requires stable hashCode() implementations

Posted in Uncategorized by squarecog on February 20, 2011

It makes sense, really, once you think about it, but first you have to know to think about it.

Let’s review the basics.

To write a Map-Reduce job, you implement a Mapper and a Reducer. The mapper takes in pairs of keys and values, and outputs pairs of keys and values. Both keys and values have to implement the Writable interface, which is how Hadoop deals with serializing them. The keys also have to implement WritableComparable, an extension of Writable that, unsurprisingly, also extends Comparable. They have to be Comparable because Hadoop sorts keys for the Reducers.
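
To make that concrete, here is a minimal sketch of a custom key type (the class name and field are invented for illustration, not taken from any real codebase). It implements WritableComparable, but note that it does not override hashCode(), so it silently inherits Object's:

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableComparable;

  // Hypothetical key type, for illustration only. It does NOT override
  // hashCode(), so it inherits Object's, which is where the trouble starts.
  public class UserIdKey implements WritableComparable<UserIdKey> {
    private long userId;

    public void write(DataOutput out) throws IOException {
      out.writeLong(userId);   // serialize the key
    }

    public void readFields(DataInput in) throws IOException {
      userId = in.readLong();  // deserialize the key
    }

    public int compareTo(UserIdKey other) {
      // keys must be Comparable so Hadoop can sort them for the reducers
      return (userId < other.userId) ? -1 : ((userId == other.userId) ? 0 : 1);
    }
  }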

The important bit here is that a partitioner determines which of the reducers a given key-value pair should go to. Most of the time the default HashPartitioner is used, and it is very simple: for every key, Hadoop invokes hashCode() and routes the pair based on the result. If you just implement the WritableComparable interface and don't override hashCode(), you inherit java.lang.Object's hashCode(). Here's a little extract from Object's hashCode() javadoc (emphasis mine):

Whenever it is invoked on the same object more than once during an execution of a Java application, the hashCode method must consistently return the same integer, provided no information used in equals comparisons on the object is modified. This integer need not remain consistent from one execution of an application to another execution of the same application.

So: we extract some key on mapper 1 and the same key on mapper 2, both keys get hashed to determine which reducer they should go to, and the two entries with the same key end up at two different reducers, because Object does not guarantee that hashCode() returns the same value for the same key in two different JVM instances.
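
For reference, the default HashPartitioner boils down to essentially the following (paraphrased rather than copied verbatim from Hadoop's source), which is why the stability of hashCode() matters so much:

  import org.apache.hadoop.mapreduce.Partitioner;

  public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
      // Mask off the sign bit, then take the hash modulo the number of reducers.
      // If hashCode() is not stable across JVMs, equal keys emitted by different
      // mapper JVMs can be routed to different reducers.
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
  }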

Now, this is not a problem most of the time, because plenty of WritableComparable implementations have a hashCode() that is stable across different JVMs, including, of course, all the common ones such as LongWritable and Text. It is, however, a problem if you are wrapping complex Java objects and proxying to their hashCode() implementations. A correct hashCode() implementation does not have to return the same value in different instantiations of the application, and many, in fact, do not. Protocol Buffer messages are one example: their implementation of hashCode(), while correct, is not stable, which presents problems if you are trying to wrap them for use as Hadoop Writables. This showed up as issue 28 in Elephant-Bird, our collection of classes for working with LZO compression, Protocol Buffers, and Thrift objects in Hadoop. The fix is pretty simple: we call Arrays.hashCode() on a serialized representation of the message, and make sure the serialization is cached. We'll have to serialize it anyway when Hadoop wants to write it out to disk, so there's no real overhead.

  /**
   * <p>Returns a hashCode that is stable across JVM instances.
   * (<code>hashCode()</code> is not required to return the same value in
   * different instances of the same application in Java, just within a
   * single instance of the application; Hadoop imposes a stricter requirement.)
   */
  @Override
  public int hashCode() {
    byte[] bytes = serialize();
    return (bytes == null) ? 31 : Arrays.hashCode(bytes);
  }

Now, the fun part. We fixed the bug; now we have to write a test. The test requires two JVMs, because by its very nature the bug does not manifest itself within a single JVM. A few terrible ideas came to mind, like shelling out to a java main through Runtime.exec, but eventually it occurred to me that Apache Ant already does this sort of thing and there must be something I could reuse. It's a bit gnarly, but here's the test: we use Ant's Java task, set it up with the proper environment, and exec a main() in it.

  @Test
  public void testStableHashcodeAcrossJVMs() throws IOException {
    int expectedHashCode = referenceAbWritable.hashCode();

    // Fork a fresh JVM via Ant's Java task (org.apache.tools.ant.taskdefs.Java).
    Java otherJvm = new Java();
    otherJvm.setNewenvironment(true);
    otherJvm.setFork(true);
    otherJvm.setClassname(OtherJvmClass.class.getName());

    // Hand the forked JVM a copy of our environment variables and system properties.
    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
      Environment.Variable var = new Environment.Variable();
      var.setKey(entry.getKey());
      var.setValue(entry.getValue());
      otherJvm.addEnv(var);
    }
    for (String prop : System.getProperties().stringPropertyNames()) {
      String propValue = System.getProperty(prop);
      Environment.Variable var = new Environment.Variable();
      var.setKey(prop);
      var.setValue(propValue);
      otherJvm.addSysproperty(var);
    }
    otherJvm.setDir(new File(System.getProperty("java.io.tmpdir")));

    // Tell the forked main() where to write the hashCode it computes.
    File tmpOut = File.createTempFile("otherJvm", "txt");
    otherJvm.setArgs(tmpOut.getAbsolutePath());

    otherJvm.init();
    otherJvm.executeJava();

    // Read back the hashCode computed in the other JVM and compare.
    DataInputStream is = new DataInputStream(new FileInputStream(tmpOut));
    assertEquals(expectedHashCode, is.readInt());
    is.close();
  }

  public static class OtherJvmClass {
    /* Used by testStableHashcodeAcrossJVMs: runs in the forked JVM, computes the
     * reference object's hashCode, and writes it to the file given as args[0]. */
    public static void main(String[] args) throws IOException {
      setUp();
      int hashCode = referenceAbWritable.hashCode();
      File tmpFile = new File(args[0]);
      DataOutputStream os = new DataOutputStream(new FileOutputStream(tmpFile));
      os.writeInt(hashCode);
      os.close();
      System.exit(0);
    }
  }

There is probably a better way to communicate between the processes than through a temp file, but nothing terribly obvious showed up when I browsed the javadocs, and this did get the job done. The test reliably fails when applied to the original implementation of ProtobufWritable, and passes in the new version.
