SquareCog's SquareBlog

Upcoming Features in Pig 0.8: Dynamic Invokers

Posted in programming by squarecog on August 20, 2010

Pig release 0.8 is scheduled to be feature-frozen and branched at the end of August 2010. This release has many, many useful new features, mostly addressing usability. In this series of posts, I will demonstrate some of my favorites from this release.

Pig 0.8 will have a family of built-in UDFs called Dynamic Invokers. The idea is simple: frequently, Pig users need to use a simple function that is already provided by standard Java libraries, but for which a UDF has not been written. Dynamic Invokers allow a Pig programmer to refer to Java functions without having to wrap them in custom Pig UDFs, at the cost of doing some Java reflection on every function call.
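For example, to URL-decode strings without writing a UDF, you can bind java.net.URLDecoder.decode to a name with DEFINE and then call it like any other function (a minimal sketch; the relation and field names are made up):

    DEFINE UrlDecode InvokeForString('java.net.URLDecoder.decode', 'String String');
    encoded = LOAD '/data/urls' AS (url:chararray);
    decoded = FOREACH encoded GENERATE UrlDecode(url, 'UTF-8');

InvokeForString looks up the static method by name and parameter types; sibling invokers (InvokeForInt, InvokeForLong, InvokeForFloat, InvokeForDouble) cover the other return types.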


New Features in Apache Pig 0.8

Posted in programming by squarecog on December 19, 2010

The Pig 0.8 release includes a large number of bug fixes and optimizations, but at the core it is a feature release. It’s been in the works for almost a full year (most of the work on 0.7 was completed by January of 2010, although it took a while to actually get the release out), and the amount of time spent on 0.8 really shows.

I meant to describe these in detail in a series of posts, but it seems blogging regularly is not my forte. This release is so chock-full of great new features, however, that I feel compelled to at least list them. So, behold, in no particular order, a non-exhaustive list of new features I am excited about in Pig 0.8:

  • Support for UDFs in scripting languages
  • This is exactly what it sounds like — if your favorite language has a JVM implementation, it can be used to create Pig UDFs.

    Pig now ships with support for UDFs in Jython, but other languages can be supported by implementing a few interfaces. Details about the Pig UDFs in Python can be found here: http://pig.apache.org/docs/r0.8.0/udf.html#Python+UDFs

    This is the outcome of PIG-928, and it was quite a pleasure to watch it develop over time. While most Pig tickets wind up being worked on by at most one or two people, this one turned into a collaboration among quite a few developers, many of them new to the project. Kishore Gopalakrishna’s patch was the initial conversation starter; it was then hacked on, or merged with similar work, by Woody Anderson, Arnab Nandi, Julien Le Dem, Ashutosh Chauhan, and Aniket Mokashi (Aniket deserves an extra shout-out for patiently incorporating everyone’s feedback and pushing the patch through the last mile).
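
    To give a taste, here is a minimal sketch of a Jython UDF (the file and function names are made up; the @outputSchema decorator and the register syntax are the ones in the docs linked above). In strlen.py:

    @outputSchema('len:long')
    def strlen(s):
        # Pig converts the return value according to the declared schema
        return len(s)

    And in the Pig script:

    register 'strlen.py' using jython as myfuncs;
    people = LOAD '/data/people' AS (name:chararray);
    name_lengths = FOREACH people GENERATE myfuncs.strlen(name);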

  • PigUnit
  • A contribution by Romain Rigaux, PigUnit is exactly what it sounds like: a tool that simplifies Pig users’ lives by giving them an easy way to unit-test Pig scripts.

    The documentation at http://pig.apache.org/docs/r0.8.0/pigunit.html and the example code at http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java?view=markup speak for themselves as far as usage goes.
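
    To give a flavor, a minimal sketch of a test (the script name, parameters, alias, and expected tuples are made up, modeled on the example in the docs):

    import org.apache.pig.pigunit.PigTest;

    public class TestTopQueries {
        public void testTopTwo() throws Exception {
            // Values substituted for $input and $output parameters in the script
            String[] params = { "input=top_queries_input_data.txt", "output=top_2_queries" };
            PigTest test = new PigTest("top_queries.pig", params);

            // Compare the tuples in the 'queries_limit' alias against the
            // expected output, without touching the real output location.
            String[] expected = { "(yahoo,25)", "(facebook,15)" };
            test.assertOutput("queries_limit", expected);
        }
    }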

  • PigStats
  • Pig can now provide much better visibility into what is going on inside a Pig job than ever before, thanks to extensive work by Richard Ding (see PIG-1333 and PIG-1478). The feature comes in three parts:

    1. Script statistics.
    This is the most easily visible change. At the end of running a script, Pig will output a table with some basic statistics regarding the jobs that it ran. It looks something like this:

    Job Stats (time in seconds):

    JobId    Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                           Feature            Outputs
    job_xxx  1654  218      84          6           14          107            87             99             counted_data,data,grouped_data  GROUP_BY,COMBINER
    job_xxx  2     1        9           6           7           13             13             13             ordered_data                    SAMPLER
    job_xxx  2     1        26          18          22          31             31             31             ordered_data                    ORDER_BY           hdfs://tmp/out,

    This is extremely useful when debugging slow jobs: you can immediately identify which stages of your script are slow, and correlate the slow Map-Reduce jobs with the actual Pig operators and relations in your script. That was not trivial before; folks often resorted to setting parallelism to slightly different numbers for every join and group just to figure out which job was doing what. No more of this!

    2. Data in Job XML

    Pig now inserts several interesting properties into the Hadoop jobs that it generates, including the relations being generated, the Pig features being used, and the ids of parent Hadoop jobs. This is quite helpful when monitoring a cluster, and is also handy when examining job history using the HadoopJobHistoryLoader, now part of piggybank (use Pig to mine your job history!).

    3. PigRunner API

    The same information that is printed when Pig runs a script from the command line is also available when you use the Java API to start Pig jobs. If you start a script through PigRunner.run(String[] args, ProgressNotificationListener listener), you get back a PigStats object that gives you access to the job hierarchy, the Hadoop counters from each job, and so on. You can implement the optional ProgressNotificationListener if you want to watch the job as it progresses; the listener is notified as different component jobs start and finish.
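
    A minimal sketch of driving Pig through this API (the script name is made up; the listener argument may be null if you don’t need progress callbacks):

    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.PigStats;

    public class RunMyScript {
        public static void main(String[] ignored) {
            // Same arguments you would pass on the pig command line
            String[] args = { "myscript.pig" };

            // Second argument: a progress listener, or null
            PigStats stats = PigRunner.run(args, null);

            if (!stats.isSuccessful()) {
                System.err.println("Pig failed: " + stats.getErrorMessage());
            }
        }
    }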

    Documentation of the API, the new properties in the Job XML, and more is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Pig+Statistics

  • Scalar values
  • It’s very common to need some calculated statistic to inform other calculations. For example, consider a data set that consists of people and their eye color; suppose we want to calculate the fraction of the total population that has a given eye color. The script looks something like this:

    people = LOAD '/data/people' USING PigStorage() 
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (GROUP people ALL) 
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH (GROUP people BY eye_color) 
      GENERATE
        group AS eye_color, 
        COUNT(people) / num_people.total AS fraction;
    

    Pretty straightforward, except it does not work. In the code above we are referencing the relation num_people from inside the definition of another relation, eye_color_fractions, and that doesn’t really make sense unless Pig knows that num_people contains only a single row.

    In the past you had to do something hacky, like joining the two relations on a constant to replicate the total into each row, and then generate the division. Needless to say, this was not entirely satisfactory. In PIG-1434 Aniket Mokashi tackled this, implementing an elegant solution that hides all of these details from the user: you can now simply cast a single-row relation to a scalar and use it as desired. The above script becomes:

    people = LOAD '/data/people' USING PigStorage() 
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (GROUP people ALL) 
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH (GROUP people BY eye_color) 
      GENERATE
        group AS eye_color, 
        COUNT(people) / (double) num_people.total AS fraction;
    

    This makes the casting explicit, but Pig is now smart enough to do this implicitly as well. A runtime exception is generated if the relation being used as a scalar contains more than one tuple.

    More documentation of this feature is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref2.html#Casting+Relations+to+Scalars

  • Monitored UDFs
  • A new annotation has been added, @MonitoredUDF, which makes Pig spawn a watcher thread that kills any UDF invocation that takes too long and substitutes a default value instead. This comes in handy when dealing with certain operations, like complex regular expressions, whose runtime can occasionally explode. More documentation is available at http://pig.apache.org/docs/r0.8.0/udf.html#Monitoring+long-running+UDFs
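
    A minimal sketch of how this looks in a UDF (the class name and regex are made up; duration, timeUnit, and intDefault are the annotation’s documented parameters):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.builtin.MonitoredUDF;
    import org.apache.pig.data.Tuple;

    // If exec() takes longer than 100ms, Pig abandons that invocation
    // and substitutes intDefault (here 0) as the result.
    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, intDefault = 0)
    public class SafeMatch extends EvalFunc<Integer> {
        @Override
        public Integer exec(Tuple input) throws IOException {
            String s = (String) input.get(0);
            // a backtracking-heavy pattern whose runtime can explode
            return s.matches("(a+)+b") ? 1 : 0;
        }
    }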

  • Automatic merge of small files
  • This is a simple one, but useful — when running Pig over many small files, instead of creating a map task per file (paying the overhead of scheduling and running a task for a computation that might only take a few seconds), we can merge the inputs and create a few map tasks that are a bit more hefty.

    Two properties control this behavior: pig.maxCombinedSplitSize controls the maximum size of the resulting split, and pig.splitCombination controls whether or not the feature is activated in the first place (it is on by default).
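
    For example, to cap combined splits at roughly 256MB for one run, you can pass the properties on the command line (a sketch; myscript.pig and the size are placeholders):

    pig -Dpig.maxCombinedSplitSize=268435456 myscript.pig

    Setting pig.splitCombination to false in the same manner turns the feature off entirely.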

    This work is documented in the ticket PIG-1518; there are additional details in the release notes attached to the ticket.

  • Generic UDFs
  • I wrote about this one before — a small feature that allows you to invoke static Java methods as Pig UDFs without needing to wrap them in custom code.

    The official documentation is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Dynamic+Invokers

  • Safeguards against missing PARALLEL keyword
  • One of the more common mistakes people make when writing Pig scripts is forgetting to specify parallelism for operators that need it. The default used to be a parallelism of 1, which can lead to extremely inefficient jobs. A patch by Jeff Zhang in PIG-1249 changes this behavior to use a simple heuristic instead: if parallelism is not specified, derive the number of reducers by taking MIN(max_reducers, total_input_size / bytes_per_reducer). The maximum number of reducers is controlled by the property pig.exec.reducers.max (default 999), and the bytes per reducer by pig.exec.reducers.bytes.per.reducer (default 1GB).
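
    To make the heuristic concrete: with the defaults, a job that reads 50GB of input gets MIN(999, 50GB / 1GB) = 50 reducers. Both knobs can be overridden per run (a sketch; the values and script name are placeholders):

    pig -Dpig.exec.reducers.max=200 -Dpig.exec.reducers.bytes.per.reducer=536870912 myscript.pig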

    This is a safeguard, not a panacea. It only works with file-based input, and it estimates the number of reducers from the input size, not the size of the intermediate data; so if you have a highly selective filter, or you are grouping a large dataset by a low-cardinality field, it will produce a bad number. Still, it’s a nice safeguard against dramatic misconfigurations.

    When porting to Apache Pig 0.8, remember to audit your scripts for operators that trigger a reduce phase but do not specify the PARALLEL keyword; if the intent is to use a single reducer, make that intent explicit by specifying PARALLEL 1.

  • HBaseStorage
  • HBaseStorage has been shored up in Pig 0.8. It can now read data stored as bytes instead of requiring all numbers to be converted to Strings, and it accepts a number of options: limiting the number of rows returned, pushing down filters on HBase keys, and so on. It can also now write to HBase, not just read from it. Details about the options can be found in the Release Notes section of PIG-1205.
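
    A sketch of what the new usage looks like (the table, column family, and option values are made up; the full option list is in the release notes):

    users = LOAD 'hbase://users'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'info:name info:age', '-loadKey true')
        AS (id:bytearray, name:chararray, age:long);

    STORE users INTO 'hbase://users_copy'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('info:name info:age');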

    Note that at the moment this only works with the HBase 0.20.{4,5,6} releases, and does not work with 0.89+. There is a patch in PIG-1680 that you can apply if you need 0.89 and 0.90 compatibility; it is not applied to the main codebase yet, as it is not backwards compatible.

    We are very interested in help making this storage engine more featureful; please feel free to jump in and contribute!

  • Support for custom Map-Reduce jobs in the flow
  • Although we try to make these a rarity, sometimes cases come up in which a custom Map-Reduce job fits the bill better than Pig. Weaving a Map-Reduce job into the middle of a Pig workflow used to be awkward: you had to use something like Oozie or Azkaban, or write your own workflow application. Pig 0.8 introduces a simple “MAPREDUCE” operator which allows you to invoke an opaque MR job in the middle of the flow, and continue with Pig:

    text = LOAD 'WordcountInput.txt';
    wordcount = MAPREDUCE 'wordcount.jar' 
      STORE text INTO 'inputDir' 
      LOAD 'outputDir' AS (word:chararray, count:int) 
      `org.myorg.WordCount inputDir outputDir`;
    

    Details are available on the wiki page: http://wiki.apache.org/pig/NativeMapReduce

    The ticket for this one has been open for a while, since Pig 0.2 days, and it’s nice to see it finally implemented. Thumbs up to Aniket Mokashi for this one.

  • Custom Partitioners
  • This feature, also implemented by the amazingly productive Aniket Mokashi, is a bit of a power-user thing (and another ancient ticket, PIG-282). It allows the Pig script author to control the function used to distribute map output among reducers. By default Pig uses Hadoop’s hash partitioner, but sometimes a custom algorithm is called for, when the script author knows something particular about the reduce key distribution. When that is the case, a user can now specify the Hadoop Partitioner to swap in instead of the default:

    B = GROUP A BY $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner PARALLEL 2;
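
    For reference, a minimal sketch of what such a partitioner can look like (Pig wraps reduce keys in PigNullableWritable, and the class must extend the new-API Hadoop Partitioner):

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.pig.impl.io.PigNullableWritable;

    // Routes integer keys by value, everything else by masked hash code.
    public class SimpleCustomPartitioner extends Partitioner<PigNullableWritable, Writable> {
        @Override
        public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
            Object k = key.getValueAsPigType();
            if (k instanceof Integer) {
                return (((Integer) k).intValue() & Integer.MAX_VALUE) % numPartitions;
            }
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }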

    More specific documentation can be found in the Release Notes section of PIG-282.
