GROUP operator in Apache Pig
I’ve been doing a fair amount of helping people get started with Apache Pig. One common stumbling block is the GROUP operator. It looks familiar, since it serves a similar function to SQL’s GROUP BY clause, but it is just different enough in the Pig Latin language to be confusing. Hopefully this brief post will shed some light on what exactly is going on.
Let us start by loading up some data:
my_data = LOAD '/data/customers' using PigStorage() as (name:chararray, age:int, eye_color:chararray, height:int);
If we want to compute some aggregates from this data, we might want to group the rows into buckets over which we will run the aggregate functions:
by_age = GROUP my_data BY age;
by_age_and_color = GROUP my_data BY (age, eye_color);
When you group a relation, the result is a new relation with two columns: “group” and the name of the original relation. The group column has the schema of what you grouped by. If you grouped by an integer column, for example, as in the first example, the type will be int. If you grouped by a tuple of several columns, as in the second example, the “group” column will be a tuple with two fields, “age” and “eye_color”.
They can be retrieved by flattening “group”, or by directly accessing them: “group.age, group.eye_color”:
-- using FLATTEN
age_and_color = FOREACH by_age_and_color GENERATE FLATTEN(group) AS (age, color);
-- or using explicit projections
age_and_color = FOREACH by_age_and_color GENERATE group.age, group.eye_color;
Note that using the FLATTEN operator is preferable since it allows algebraic optimizations to work — but that’s a subject for another post.
The second column will be named after the original relation, and contain a bag of all the rows in the original relation that match the corresponding group. The rows are unaltered — they are the same as they were in the original table that you grouped.
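To make the shape of the grouped relation concrete, here is a small sketch. The three sample rows are made up for illustration, and the schema comment is written informally, since the exact text printed by DESCRIBE varies between Pig versions.

```pig
my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

-- Hypothetical input rows (name, age, eye_color, height):
--   (alice, 30, brown, 165)
--   (bob,   30, blue,  180)
--   (carol, 25, brown, 170)

by_age = GROUP my_data BY age;

-- Print the schema of the grouped relation. Informally it is:
--   group:   int
--   my_data: bag of (name, age, eye_color, height) tuples
DESCRIBE by_age;

-- Print the contents. There is one row per distinct age:
--   (25, {(carol,25,brown,170)})
--   (30, {(alice,30,brown,165), (bob,30,blue,180)})
-- The order of rows, and of tuples within a bag, is not guaranteed.
DUMP by_age;
```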
As a side note, Pig also provides a handy operator called COGROUP, which essentially performs a join and a group at the same time. The syntax is as follows:
cogrouped_data = COGROUP data1 BY id, data2 BY user_id;
The resulting schema will be the group as described above, followed by two columns — data1 and data2, each containing bags of tuples with the given group key. This is very useful if you intend to join and group on the same key, as it saves you a whole Map-Reduce stage.
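As a rough sketch of how the cogrouped bags might be consumed, the example below counts rows on each side per key. The relations users and clicks, their paths, and their fields are hypothetical stand-ins for data1 and data2.

```pig
-- Hypothetical inputs standing in for data1 and data2.
users  = LOAD '/data/users'  USING PigStorage() AS (id:int, name:chararray);
clicks = LOAD '/data/clicks' USING PigStorage() AS (user_id:int, url:chararray);

-- One row per key: (group, {matching users tuples}, {matching clicks tuples}).
cogrouped = COGROUP users BY id, clicks BY user_id;

-- Aggregate each bag independently; no separate JOIN step is needed.
clicks_per_user = FOREACH cogrouped GENERATE
    group         AS id,
    COUNT(users)  AS num_user_rows,
    COUNT(clicks) AS num_clicks;
```

A key that appears in only one input still produces a row, with an empty bag on the other side. If you want inner-join behavior, one option is to filter first, for example with FILTER cogrouped BY NOT IsEmpty(users) AND NOT IsEmpty(clicks).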
Processing the results
To work on the results of the group operator, you will want to use a FOREACH. This is a simple loop construct that works on a relation one row at a time. You can apply it to any relation, but it’s most frequently used on results of grouping, as it allows you to apply aggregation functions to the collected bags.
Referring to somebag.some_field in a FOREACH operator essentially means “for each tuple in the bag, give me some_field in that tuple”. So you can do things like
age_counts = FOREACH by_age GENERATE
    group AS age,          -- the key you grouped on
    COUNT(my_data),        -- the number of people with this age
    MAX(my_data.height);   -- the maximum height of people with this age
Note that all the functions in this example are aggregates. That’s because they are things we can do to a collection of values. Folks sometimes try to apply single-item operations in a foreach — like transforming strings or checking for specific values of a field. Remember, my_data.height doesn’t give you a single height element — it gives you all the heights of all people in a given age group.
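Per-row operations still have a place, just not directly on a grouped bag. Two options are sketched here, assuming the schema loaded earlier: transform the rows before grouping, or use a nested FOREACH to filter the bag before aggregating it. The alias names and the 180 threshold are arbitrary illustrative choices.

```pig
my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

-- Option 1: apply the single-item operation row by row, before grouping.
normalized = FOREACH my_data GENERATE
    name, age, LOWER(eye_color) AS eye_color, height;

-- Option 2: a nested FOREACH can filter the bag first, then aggregate it.
by_age = GROUP my_data BY age;
tall_counts = FOREACH by_age {
    tall = FILTER my_data BY height > 180;  -- arbitrary threshold
    GENERATE group AS age, COUNT(tall) AS num_tall;
};
```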
It is common to need counts by multiple dimensions; in our running example, we might want to get not just the maximum or the average height of all people in a given age category, but also the number of people in each age category with a certain eye color. There are a few ways to achieve this, depending on how you want to lay out the results.
The simplest is to just group by both age and eye color:
by_age_color = GROUP my_data BY (age, eye_color);
-- count colors separately
by_age_color_counts = FOREACH by_age_color GENERATE
    FLATTEN(group) AS (age, eye_color),
    AVG(my_data.height) AS age_color_height_avg,
    COUNT(my_data) AS age_color_count;
From there, you can group by_age_color_counts again and get your by-age statistics.
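A minimal sketch of that second grouping follows. The aliases by_age_rollup and by_age_stats and the output column names are illustrative. It sums the per-color counts rather than averaging the per-color averages, because an average of averages is not, in general, the overall average.

```pig
my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

by_age_color = GROUP my_data BY (age, eye_color);
by_age_color_counts = FOREACH by_age_color GENERATE
    FLATTEN(group) AS (age, eye_color),
    AVG(my_data.height) AS age_color_height_avg,
    COUNT(my_data) AS age_color_count;

-- Group the per-(age, eye_color) rows again, this time by age alone.
by_age_rollup = GROUP by_age_color_counts BY age;
by_age_stats = FOREACH by_age_rollup GENERATE
    group AS age,
    SUM(by_age_color_counts.age_color_count) AS num_people,
    COUNT(by_age_color_counts) AS num_eye_colors;
```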
If you have a set list of eye colors, and you want the eye color counts to be columns in the resulting table, you can do the following:
-- break out the counts
my_data = FOREACH my_data GENERATE
    name, age, height,
    (eye_color == 'brown' ? 1 : 0) AS brown_eyes,
    (eye_color == 'blue' ? 1 : 0) AS blue_eyes,
    (eye_color == 'green' ? 1 : 0) AS green_eyes;
-- group and generate
by_age = GROUP my_data BY age;
final_data = FOREACH by_age GENERATE
    group AS age,
    COUNT(my_data) AS num_people,
    AVG(my_data.height) AS avg_height,
    SUM(my_data.brown_eyes) AS num_brown_eyes,
    SUM(my_data.blue_eyes) AS num_blue_eyes,
    SUM(my_data.green_eyes) AS num_green_eyes;
A few notes on more advanced topics, which perhaps warrant a more extensive treatment in a separate post.
The GROUP operator in Pig is a ‘blocking’ operator, and forces a Hadoop Map-Reduce job. All the data is shuffled, so that rows in different partitions (or “slices”, if you prefer the pre-Pig 0.7 terminology) that have the same grouping key wind up together. Therefore, grouping has non-trivial overhead, unlike operations like filtering or projecting. Consider this when putting together your pipelines.
If you need to calculate statistics on multiple different groupings of the data, it pays to take advantage of Pig’s multi-query execution (the multi-store optimization), wherein it will find opportunities to share work between multiple calculations.
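For instance, a script along these lines computes two independent groupings of the same input. When it is run as a single script, Pig can plan both STORE statements together and share the work of reading the input. The output paths are placeholders.

```pig
my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

-- First grouping: head count per age.
by_age = GROUP my_data BY age;
age_stats = FOREACH by_age GENERATE
    group AS age, COUNT(my_data) AS num_people;
STORE age_stats INTO '/output/age_stats';      -- placeholder path

-- Second grouping: average height per eye color.
by_color = GROUP my_data BY eye_color;
color_stats = FOREACH by_color GENERATE
    group AS eye_color, AVG(my_data.height) AS avg_height;
STORE color_stats INTO '/output/color_stats';  -- placeholder path
```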
When groups grow too large, they can cause significant memory issues on reducers; they can lead to hot spots, and all kinds of other badness. Look up algebraic and accumulative EvalFunc interfaces in the Pig documentation, and try to use them to avoid this problem when possible. Check the execution plan (using the ‘explain’ command) to make sure the algebraic and accumulative optimizations are used.
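Checking the plan for the earlier aggregation might look like the sketch below. The column names num_people and max_height are added here for illustration.

```pig
my_data = LOAD '/data/customers' USING PigStorage()
    AS (name:chararray, age:int, eye_color:chararray, height:int);

by_age = GROUP my_data BY age;
age_counts = FOREACH by_age GENERATE
    group AS age,
    COUNT(my_data) AS num_people,
    MAX(my_data.height) AS max_height;

-- Print the logical, physical, and MapReduce plans for age_counts.
EXPLAIN age_counts;
```

In the MapReduce portion of the output, a combine stage for the job is usually the sign that the algebraic implementations of COUNT and MAX are being used. The exact layout of the output depends on the Pig version.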
Pig 0.7 introduces an option to group on the map side (the USING 'collected' clause), which you can invoke when you know that all rows for any given key are guaranteed to be in the same partition. Consider it when this condition applies.
So there you have it, a somewhat ill-structured brain dump about the GROUP operator in Pig. I hope it helps folks — if something is confusing, please let me know in the comments!