SquareCog's SquareBlog

Pig Variables and Semicolons

Posted in programming by squarecog on August 24, 2011

Pay attention to where you put your semicolons lest they become part of the value!

Pig Script:

%default QUOTED '20090101';
%default UNQUOTED 20090101;

l = load '/logs/$QUOTED';
l = load '/logs/$UNQUOTED';

After preprocessing with pig -x local -r script.pig:


l = load '/logs/20090101';
l = load '/logs/20090101;';


Incrementing Hadoop Counters in Apache Pig

Posted in programming by squarecog on December 24, 2010

Information about incrementing Hadoop counters from inside Pig UDFs is not currently well-documented, judging by the user list traffic, so this is a brief note showing how to do that.

Hadoop counters are a way to report basic statistics of a job in Hadoop. I won’t go into a detailed discussion of what they are and when to use them here — there’s plenty of information about that on the internet (for starters, see the Cloud9 intro to Counters, and some guidelines for appropriate usage in “Apache Hadoop Best Practices and Anti-Patterns”).

Pig 0.6 and before

Counters were not explicitly supported in Pig 0.6 and before, but you could get at them with this hack (inside a UDF):

Reporter reporter = PigHadoopLogger.getInstance().getReporter();
if (reporter != null) {
  reporter.incrCounter(myEnum, 1L);
}

Pig 0.8
Pig 0.8 has an “official” method for getting and incrementing counters from a UDF:

PigStatusReporter reporter = PigStatusReporter.getInstance();
if (reporter != null) {
  reporter.getCounter(myEnum).increment(1L);
}

You can also get Counters programmatically if you are invoking Pig using PigRunner, and getting a PigStats object on completion. It’s a bit involved:

PigStats.JobGraph jobGraph = pigStats.getJobGraph();
for (JobStats jobStats : jobGraph) {
  Counters counters = jobStats.getHadoopCounters();
  // inspect the counters for each job here
}

Pig 0.7
Unfortunately I don’t know of a way to do this in 0.7, as the old hack went away and the new PigStatusReporter hadn’t been added yet. If you have a trick, please comment.

Watch out for nulls
We’ve observed that sometimes the reporter is null for a bit even when a UDF is executing on the MR side. To deal with this, we added a little helper class, PigCounterHelper, to Elephant-Bird; it buffers the writes in a Map, and flushes them when it gets a non-null reporter.
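The buffer-and-flush idea can be sketched in a few lines (an illustrative, self-contained sketch, not Elephant-Bird's actual code; here a plain Map stands in for the real reporter):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Elephant-Bird's actual code): buffer counter
// increments in a Map while the reporter is null, then flush them once a
// non-null reporter shows up. A plain Map stands in for the reporter.
public class BufferedCounter {
    private final Map<String, Long> buffer = new HashMap<>();

    // Record an increment; safe to call even before a reporter exists.
    public void incrCounter(String name, long amount) {
        buffer.merge(name, amount, Long::sum);
    }

    // Once a non-null reporter is available, flush the buffered values.
    public void flushTo(Map<String, Long> reporter) {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            reporter.merge(e.getKey(), e.getValue(), Long::sum);
        }
        buffer.clear();
    }
}
```

The UDF calls incrCounter unconditionally; the helper takes care of deferring the writes until the real reporter is ready.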

So there. If someone asks about counters in Pig, send them here.


New Features in Apache Pig 0.8

Posted in programming by squarecog on December 19, 2010

The Pig 0.8 release includes a large number of bug fixes and optimizations, but at the core it is a feature release. It’s been in the works for almost a full year (most of the work on 0.7 was completed by January of 2010, although it took a while to actually get the release out), and the amount of time spent on 0.8 really shows.

I meant to describe these in detail in a series of posts, but it seems blogging regularly is not my forte. This release is so chock-full of great new features, however, that I feel compelled to at least list them. So, behold, in no particular order, a non-exhaustive list of new features I am excited about in Pig 0.8:

  • Support for UDFs in scripting languages
  • This is exactly what it sounds like — if your favorite language has a JVM implementation, it can be used to create Pig UDFs.

    Pig now ships with support for UDFs in Jython, but other languages can be supported by implementing a few interfaces. Details about the Pig UDFs in Python can be found here: http://pig.apache.org/docs/r0.8.0/udf.html#Python+UDFs

    This is the outcome of PIG-928; it was quite a pleasure to watch this develop over time — while most Pig tickets wind up getting worked on by at most one or two people, this turned into a collaboration of quite a few developers, many of them new to the project — Kishore Gopalakrishna’s patch was the initial conversation starter, which was then hacked on or merged into similar work by Woody Anderson, Arnab Nandi, Julien Le Dem, Ashutosh Chauhan and Aniket Mokashi (Aniket deserves an extra shout-out for patiently working to incorporate everyone’s feedback and pushing the patch through the last mile).

  • PigUnit
  • A contribution by Romain Rigaux, PigUnit is exactly what it sounds like — a tool that simplifies the Pig users’ lives by giving them a simple way to unit test Pig scripts.

    The documentation at http://pig.apache.org/docs/r0.8.0/pigunit.html and the code at http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/pigunit/TestPigTest.java?view=markup speak for themselves as far as usage.

  • PigStats
  • Pig can now provide much better visibility into what is going on inside a Pig job than it ever did before, thanks to extensive work by Richard Ding (see PIG-1333 and PIG-1478). This feature comes in three parts:

    1. Script statistics.
    This is the most easily visible change. At the end of running a script, Pig will output a table with some basic statistics regarding the jobs that it ran. It looks something like this:

    Job Stats (time in seconds):

    JobId    Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias         Feature   Outputs
    job_xxx  1654  218      84          6           14          107            87             99             counted_data,
    job_xxx  2     1        9           6           7           13             13             13             ordered_data  SAMPLER
    job_xxx  2     1        26          18          22          31             31             31             ordered_data  ORDER_BY  hdfs://tmp/out,

    This is extremely useful when debugging slow jobs, as you can immediately identify which stages of your script are slow, and correlate the slow Map-Reduce jobs with the actual Pig operators and relations in your script — something that was not trivial before (folks often resorted to setting parallelism to slightly different numbers for every join and group just to figure out which job was doing what). No more of this!

    2. Data in Job XML

    Pig now inserts several interesting properties into the Hadoop jobs that it generates, including the relations being generated, Pig features being used, and ids of parent Hadoop jobs. This is quite helpful when monitoring a cluster, and is also handy when examining job history using the HadoopJobHistoryLoader, now part of piggybank (use Pig to mine your job history!).

    3. PigRunner API

    The same information that is printed out when Pig runs the script from a command line is available if one uses the Java API to start Pig jobs. If you start a script using the PigRunner.run(String args[], ProgressNotificationListener listener), you will get as a result a PigStats object that gives you access to the job hierarchy, the Hadoop counters from each job, and so on. You can implement the optional ProgressNotificationListener if you want to watch the job as it progresses; the listener will be notified as different component jobs start and finish.

    Documentation of the API, new properties in the Job XML, and more, is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Pig+Statistics

  • Scalar values
  • It’s very common to need some calculated statistic to inform other calculations. For example, consider a data set that consists of people and their eye color; we want to calculate the fraction of the total population that has a given eye color. The script looks something like this:

    people = LOAD '/data/people' using PigStorage() 
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (group people all) 
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH ( GROUP people BY eye_color ) GENERATE
        group as eye_color,
        COUNT(people) / num_people.total AS fraction;

    Pretty straightforward, except it does not work. What’s happening in the above code is that we are referencing the relation num_people from inside another relation, eye_color_fractions, and this doesn’t really make sense unless Pig knows that num_people only has one row.

    In the past you had to do something hacky like joining the two relations on a constant to replicate the total into each row, and then generate the division. Needless to say, this was not entirely satisfactory. In PIG-1434 Aniket Mokashi tackled this, implementing an elegant solution that hides all of these details from the user — you can now simply cast a single-row relation as a scalar, and use it as desired. The above script becomes:

    people = LOAD '/data/people' using PigStorage() 
      AS (person_id:long, eye_color:chararray);
    num_people = FOREACH (group people all) 
      GENERATE COUNT(people) AS total;
    eye_color_fractions = FOREACH ( GROUP people BY eye_color ) GENERATE
        group as eye_color,
        COUNT(people) / (long) num_people.total AS fraction;

    This makes the casting explicit, but Pig is now smart enough to do this implicitly as well. A runtime exception is generated if the relation being used as a scalar contains more than one tuple.

    More documentation of this feature is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref2.html#Casting+Relations+to+Scalars

  • Monitored UDFs
  • A new annotation has been added, @MonitoredUDF, which makes Pig spawn a watcher thread that kills an execution that is taking too long, and return a default value instead. This comes in handy when dealing with certain operations like complex regular expressions. More documentation is available at http://pig.apache.org/docs/r0.8.0/udf.html#Monitoring+long-running+UDFs
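    The watcher-thread-with-default idea can be approximated in plain Java (an illustrative, self-contained sketch of the mechanism, not Pig’s actual implementation; all names here are made up):

```java
import java.util.concurrent.*;

// Illustrative sketch: run a computation with a time budget and fall back
// to a default value on timeout, roughly what @MonitoredUDF arranges.
public class TimeBoundedCall {
    public static <T> T callWithTimeout(Callable<T> work, long millis, T fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Wait at most `millis` for the computation to finish.
            return pool.submit(work).get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            return fallback; // too slow (or failed): use the default value
        } finally {
            pool.shutdownNow(); // interrupt the runaway execution
        }
    }
}
```

    With the annotation, Pig does the equivalent of this around each UDF call, so a pathological regular expression cannot stall the whole task.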

  • Automatic merge of small files
  • This is a simple one, but useful — when running Pig over many small files, instead of creating a map task per file (paying the overhead of scheduling and running a task for a computation that might only take a few seconds), we can merge the inputs and create a few map tasks that are a bit more hefty.

    Two properties control this behavior: pig.maxCombinedSplitSize controls the maximum size of the resulting split, and pig.splitCombination controls whether or not the feature is activated in the first place (it is on by default).

    This work is documented in the ticket PIG-1518; there are additional details in the release notes attached to the ticket.

  • Generic UDFs
  • I wrote about this one before — a small feature that allows you to invoke static Java methods as Pig UDFs without needing to wrap them in custom code.

    The official documentation is available at http://pig.apache.org/docs/r0.8.0/piglatin_ref1.html#Dynamic+Invokers

  • Safeguards against missing PARALLEL keyword
  • One of the more common mistakes people make when writing Pig scripts is forgetting to specify parallelism for operators that need it. The default behavior used to be that this meant parallelism of 1, which can lead to extremely inefficient jobs. A patch by Jeff Zhang in PIG-1249 changes this behavior to instead use a simple heuristic: if parallelism is not specified, derive the number of reducers by taking MIN(max_reducers, total_input_size / bytes_per_reducer). The max number of reducers is controlled by the property pig.exec.reducers.max (default 999) and the bytes per reducer by pig.exec.reducers.bytes.per.reducer (default 1GB); with these defaults, a 10GB input gets MIN(999, 10) = 10 reducers.

    This is a safeguard, not a panacea; it only works with file-based input, and it estimates the number of reducers from the input size, not the size of the intermediate data — so if you have a highly selective filter, or you are grouping a large dataset by a low-cardinality field, it will produce a bad number. Still, it’s a nice safeguard against dramatic misconfigurations.

    When porting to Apache Pig 0.8, remember to audit your scripts for parallelized operators that do not specify the PARALLEL keyword — if the intent is to use a single reducer, make this intent explicit by specifying PARALLEL 1.

  • HBaseStorage
  • HBaseStorage has been shored up in Pig 0.8. It can now read data stored as bytes instead of requiring all numbers to be converted to Strings, and it accepts a number of options — limit the number of rows returned, push down filters on HBase keys, etc. It can also now write to HBase, not just read from it. Details about the options can be found in the Release Notes section of PIG-1205.

    Note that at the moment this only works with the HBase 0.20.{4,5,6} releases, and does not work with 0.89+. There is a patch in PIG-1680 that you can apply if you need 0.89 and 0.90 compatibility; it is not applied to the main codebase yet, as it is not backwards compatible.

    We are very interested in help making this storage engine more featureful; please feel free to jump in and contribute!

  • Support for custom Map-Reduce jobs in the flow
  • Although we try to make these a rarity, sometimes cases come up in which a custom Map-Reduce job fits the bill better than Pig. Weaving a Map-Reduce job into the middle of a Pig workflow was awkward before — you had to use something like Oozie or Azkaban, or write your own workflow application. Pig 0.8 introduces a simple “MAPREDUCE” operator which allows you to invoke an opaque MR job in the middle of the flow, and continue with Pig:

    text = load 'WordcountInput.txt';
    wordcount = MAPREDUCE wordcount.jar 
      STORE text INTO 'inputDir' 
      LOAD 'outputDir' AS (word:chararray, count: int) 
      `org.myorg.WordCount inputDir outputDir`;

    Details are available on the wiki page: http://wiki.apache.org/pig/NativeMapReduce

    The ticket for this one has been open for a while, since Pig 0.2 days, and it’s nice to see it finally implemented. Thumbs up to Aniket Mokashi for this one.

  • Custom Partitioners
  • This feature, also implemented by the amazingly productive Aniket Mokashi, is a bit of a power-user thing (and an ancient ticket, PIG-282). It allows the Pig script author to control the function used to distribute map output among reducers. By default, Pig uses the standard hash partitioner, but sometimes a custom algorithm is required when the script author knows something particularly unique about the reduce key distribution. When that is the case, a user can now specify the Hadoop Partitioner to swap in instead of the default:

    B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;

    More specific documentation can be found in the Release Notes section of PIG-282.


Upcoming Features in Pig 0.8: Dynamic Invokers

Posted in programming by squarecog on August 20, 2010

Pig release 0.8 is scheduled to be feature-frozen and branched at the end of August 2010. This release has many, many useful new features, mostly addressing usability. In this series of posts, I will demonstrate some of my favorites from this release.

Pig 0.8 will have a family of built-in UDFs called Dynamic Invokers. The idea is simple: frequently, Pig users need to use a simple function that is already provided by standard Java libraries, but for which a UDF has not been written. Dynamic Invokers allow a Pig programmer to refer to Java functions without having to wrap them in custom Pig UDFs, at the cost of doing some Java reflection on every function call.
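The per-call reflection cost mentioned above comes from a lookup-and-invoke pattern along these lines (an illustrative, self-contained sketch, not Pig's actual invoker code):

```java
import java.lang.reflect.Method;

// Illustrative sketch (not Pig's actual invoker code): resolve a static
// Java method by class and method name once, then call it reflectively
// for each input value -- the per-call reflection cost the post mentions.
public class StaticInvoker {
    private final Method method;

    public StaticInvoker(String className, String methodName, Class<?>... argTypes)
            throws ReflectiveOperationException {
        this.method = Class.forName(className).getMethod(methodName, argTypes);
    }

    public Object invoke(Object... args) throws ReflectiveOperationException {
        return method.invoke(null, args); // null receiver: static method
    }
}
```

A dynamic invoker UDF does essentially this per tuple, which is why wrapping a hot function in a real UDF can still be worth it for performance.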


Pig, HBase, Hadoop, and Twitter: HUG talk slides

Posted in programming by squarecog on May 20, 2010

I presented tonight at the Bay Area Hadoop User Group, talking briefly about Twitter’s use of Hadoop and Pig. Here are the slides:

View this document on Scribd

GROUP operator in Apache Pig

Posted in programming by squarecog on May 11, 2010

I’ve been doing a fair amount of helping people get started with Apache Pig. One common stumbling block is the GROUP operator. Although familiar, as it serves a similar function to SQL’s GROUP BY clause, it is just different enough in the Pig Latin language to be confusing. Hopefully this brief post will shed some light on what exactly is going on.


Presentation on Apache Pig at Pittsburgh Hadoop User Group

Posted in programming by squarecog on November 3, 2009

Ashutosh and I presented at the Pittsburgh Hadoop User Group on Apache Pig. The slide deck goes through a brief intro to Pig Latin, then jumps into an explanation of the different join algorithms, and finishes up with some research ideas. A pretty wide-ranging talk, for a diverse audience.

Scribd messed up some of the colors, so if you can’t read some of the text, try downloading the original.

View this document on Scribd

Great Database Performance Presentations and Videos

Posted in programming by squarecog on September 11, 2009

Percona is posting presentations from their Performance Conference at http://www.percona.tv/, with both the videos and the PPTs. There is some great info in there, not just on MySQL, but also on Hive, PostgreSQL, performance monitoring, and more. Well worth checking out.


Building an Inverted Index with Hadoop and Pig

Posted in programming by squarecog on January 17, 2009

Note: For some reason, this post appears to be pretty popular. Here's the thing. This was the first thing I wrote when learning Pig. Literally -- I wrote it down the evening I sat down to play with Pig's syntax. You wouldn't really ever construct an inverted index this way. The point was that you can, not that you should. It is, however, kind of neat.

Pig is a system for processing very large datasets, developed mostly at Yahoo and now an Apache Hadoop sub-project. Pig aims to provide massive scalability by translating code written in a new data processing language called Pig Latin into Hadoop (map/reduce) plans.

In this post, I present a (very) brief description of the Pig project and demonstrate how one can construct an inverted index from a collection of text files using just a few lines of Pig Latin. (more…)

Dealing with underflow in joint probability calculations

Posted in programming by squarecog on January 10, 2009

Been meaning to post this for a while, but it kept dropping to the bottom of the to-do list. There is a subtle bug in the code I posted earlier for splitting long strings into words.

The problem is that for most words, their probability of occurrence is extremely small. This means that when we have a sequence of several words, the probability of all of them occurring is

P(a, b, c) = P(a) * P(b) * P(c)

which is three very small numbers multiplied by each other, which is a far smaller number. If we get enough of those, we quickly encounter the underflow problem — internal computer representation of these small numbers does not have enough bits to represent the enormous smallness of them, and rounds them to roughly zero. (more…)
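The effect is easy to reproduce in a few lines (an illustrative sketch, not code from the original post; the log-space version is shown for contrast, since summing logarithms is the usual remedy):

```java
// Illustrative sketch of the underflow described above: multiplying many
// tiny probabilities rounds to zero in double precision, while adding
// their logarithms stays comfortably within representable range.
public class Underflow {
    // Naive product of n copies of probability p.
    public static double product(double p, int n) {
        double result = 1.0;
        for (int i = 0; i < n; i++) result *= p;
        return result;
    }

    // Same quantity in log space: log P(a, b, c) = log P(a) + log P(b) + log P(c).
    public static double logSum(double p, int n) {
        double result = 0.0;
        for (int i = 0; i < n; i++) result += Math.log(p);
        return result;
    }
}
```

With p = 1e-50 and n = 10, the naive product underflows to exactly 0.0, while the log-space sum is about -1151, a perfectly ordinary double.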
