Julien's tech blog

talking about tech stuff: stuff I'm interested in, intriguing stuff, random stuff.


Transitive Closure in Pig

1. Introduction

This is a follow-up to my previous post about implementing PageRank in Pig using embedding. I also talked about this in a presentation to the Pig user group.
One of the best features of embedding is how it simplifies writing UDFs: you can use them right away in the same script, without superfluous declarations. Computing a transitive closure is a good example of an algorithm requiring an iteration, a few simple UDFs and an end condition to decide when to stop iterating. Embedding is available in Pig 0.9. Knowledge of both Pig and Python is required to follow along. Examples are available on GitHub.

2. The problem

Before diving into the solution, I will give a brief overview of the problem. We want to find the connected components of a graph by performing a transitive closure. Of course, I will assume that the graph does not fit in main memory; otherwise what follows is not very useful.
A picture is worth a thousand words; here is a visual statement of the problem:

[Figure: Transitive Closure]

I highlighted the diameter in red, as it plays an important role in the complexity of the algorithm. The diameter of a connected component is the longest shortest path between two nodes of the component.
The algorithm requires log2(max(diameter of a component)) iterations, rounded up, because every iteration doubles the length of the paths that have been followed. This is not the overall complexity, but it ensures that the number of iterations stays low: with just 10 iterations you can handle components with a diameter of up to 1024 (2^10). In most real-life use cases the diameter is very low and a few iterations suffice.
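As a quick sanity check of that bound, here is a small single-machine sketch (independent of Pig) that measures the diameter of a component with breadth-first search and computes the smallest k such that 2^k >= diameter. The helper names are illustrative, and the count only covers the doubling steps: the Pig loop may need one more pass to notice that nothing is left to follow.

```python
from collections import deque


def eccentricity(adj, start):
    """Largest shortest-path distance from start, found by breadth-first search."""
    dist = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in adj[node]:
            if nxt not in dist:
                dist[nxt] = dist[node] + 1
                queue.append(nxt)
    return max(dist.values())


def diameter(adj):
    """Diameter of one connected component: the longest shortest path."""
    return max(eccentricity(adj, node) for node in adj)


def iterations_needed(diam):
    """Smallest k such that 2**k >= diam, since followed paths double every iteration."""
    k = 0
    while (1 << k) < diam:
        k += 1
    return k


if __name__ == "__main__":
    # a path a0 - a1 - ... - a8, shaped like the first component of the sample data
    nodes = ["a%d" % i for i in range(9)]
    adj = {n: set() for n in nodes}
    for x, y in zip(nodes, nodes[1:]):
        adj[x].add(y)
        adj[y].add(x)
    d = diameter(adj)
    print(d, iterations_needed(d))  # diameter 8 -> 3 doubling steps
    print(iterations_needed(1000))  # 10, since 2**10 = 1024
```

Bounding the loop at 10 iterations in the script therefore covers components whose diameter is at most 1024.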

3. The algorithm

I wrote the following implementation to illustrate the embedding feature. It may not be optimal, but it is a working solution with a reasonable number of iterations. The algorithm traverses the graph from all the nodes in parallel. Each iteration extends every edge still to follow by one hop (pardon the scientific jargon) and then merges the results per edge (using follow() and OR()). Because the hops are taken along links that earlier iterations have already extended, the distance covered from each node doubles every iteration. The edges of the graph are considered bidirectional, and each one is stored in a single direction (see normalize() and bidirectional()) to save space. The edges are kept in two files: followed (empty at the beginning) and to_follow; when to_follow becomes empty, all the edges have been followed and we can stop. The last part of the script does some formatting to turn the list of edges into groups of nodes (using SORT() so that each group is always represented the same way).
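To see why this converges, the iteration can be simulated on a single machine. The following is a rough pure-Python sketch, not the Pig job itself: Python sets stand in for the followed and to_follow relations, and one_iteration, transitive_closure and groups are hypothetical names. It mirrors the LEFT JOIN in the script: each edge still to follow (c, a) is extended by every known link (a, b) into a candidate edge (c, b), every existing link is marked as followed, and the flags are OR-ed per edge.

```python
# Single-machine sketch of the Pig iteration; sets stand in for Pig relations.


def normalize(a, b):
    """Store an undirected edge in one direction only, smallest id first."""
    return (a, b) if a <= b else (b, a)


def bidirectional(edges):
    """Expand stored edges into both directions (a self-loop stays a single pair)."""
    return {(a, b) for a, b in edges} | {(b, a) for a, b in edges}


def one_iteration(followed, to_follow):
    """links = followed UNION to_follow, then links LEFT JOIN to_follow ON links.id1 == to_follow.id2."""
    links = bidirectional(followed) | bidirectional(to_follow)
    sources = {}  # to_follow indexed by id2: a -> [c for each to-follow edge (c, a)]
    for c, a in bidirectional(to_follow):
        sources.setdefault(a, []).append(c)
    flags = {}  # normalized edge -> OR of every "followed" flag emitted for it
    for a, b in links:
        flags[normalize(a, b)] = True  # the existing link is now followed
        for c in sources.get(a, []):
            flags.setdefault(normalize(c, b), False)  # candidate edge, still to follow
    new_followed = {e for e, f in flags.items() if f}
    new_to_follow = {e for e, f in flags.items() if not f}
    return new_followed, new_to_follow


def transitive_closure(edges, max_iterations=10):
    """Iterate until nothing is left to follow; returns (followed edges, iterations run)."""
    followed, to_follow = set(), {normalize(a, b) for a, b in edges}
    for i in range(max_iterations):
        followed, to_follow = one_iteration(followed, to_follow)
        if not to_follow:  # same stop test as the script
            return followed, i + 1
    return followed, max_iterations


def groups(followed):
    """Sorted neighbour list per node, de-duplicated like the final DISTINCT."""
    neighbours = {}
    for a, b in bidirectional(followed):
        neighbours.setdefault(a, set()).add(b)
    return sorted({tuple(sorted(ns)) for ns in neighbours.values()})


if __name__ == "__main__":
    # the sample edges from data/tc_data_simple.txt
    pairs = ("id0 id1,id1 id2,id2 id3,id3 id4,id4 id5,id5 id6,id6 id7,id7 id8,"
             "id11 id10,id12 id11,id13 id12,id14 id13,id14 id15,id15 id16,id16 id17,id17 id18")
    edges = [tuple(p.split()) for p in pairs.split(",")]
    followed, iterations = transitive_closure(edges)
    for group in groups(followed):
        print(group)
    print("iterations:", iterations)
```

On the sample edges this should print two groups (id0 to id8 and id10 to id18) after 4 iterations: three doubling steps to cover a diameter of 8, plus one pass that finds nothing new to follow.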

4. The code

Here is an example of how to solve this problem by embedding Pig in Python (explanation follows).
Input data (data/tc_data_simple.txt):

id0	id1
id1	id2
id2	id3
id3	id4
id4	id5
id5	id6
id6	id7
id7	id8
id11	id10
id12	id11
id13	id12
id14	id13
id14	id15
id15	id16
id16	id17
id17	id18



from org.apache.pig.scripting import *

@outputSchema("rels:{t:(id1: chararray, id2: chararray)}")
def bidirectional(id1, id2):
    if id2 != id1:
        return [ (id1, id2), (id2, id1) ]
    else:
        # a self-loop is only returned once
        return [ (id1, id2) ]

def normalize(t):
    # store each undirected edge one way only: smallest id first
    id1, id2, followed = t
    if id2 > id1:
        return (id1, id2, followed)
    else:
        return (id2, id1, followed)

@outputSchema("rels:{t:(id1: chararray, id2: chararray, followed: int)}")
def follow(to_follow_id1, links_id1, links_id2):
    outputBag = [ normalize( (links_id1, links_id2, True) ) ]
    if to_follow_id1 is not None:
        outputBag.append( normalize( (to_follow_id1, links_id2, False) ) )
    return outputBag

@outputSchema("followed: int")
def OR(bag):
    result = False;
    for followed in bag:
        result = result | followed[0]
    return result  

@outputSchema("group: {t: (id: chararray)}")
def SORT(bag):
    # sort so that a group is always represented the same way (required by DISTINCT)
    bag.sort()
    return bag

def main():
    # cleanup output directory before starting
    Pig.fs("rmr out/tc")
    Q = Pig.compile("""
    followed = LOAD '$followed' AS (id1: chararray, id2: chararray);
    followed = FOREACH followed GENERATE FLATTEN(bidirectional(id1, id2)), 1 AS followed; -- 1 == true
    to_follow = LOAD '$to_follow' AS (id1: chararray, id2: chararray);
    to_follow = FOREACH to_follow GENERATE FLATTEN(bidirectional(id1, id2)), 0 AS followed; -- 0 == false
    links = UNION to_follow, followed;
    joined_links = JOIN links BY id1 LEFT, to_follow BY id2;
    new_links_dup =
        FOREACH joined_links
        GENERATE FLATTEN( follow(to_follow::rels::id1, links::rels::id1, links::rels::id2) );

    new_links =
        FOREACH (GROUP new_links_dup BY (id1, id2))
        GENERATE group.id1, group.id2, OR(new_links_dup.followed);

    SPLIT new_links INTO new_followed IF followed != 0, new_to_follow IF followed == 0;
    new_followed = FOREACH new_followed GENERATE id1, id2;
    new_to_follow = FOREACH new_to_follow GENERATE id1, id2;
    STORE new_followed INTO '$new_followed';
    STORE new_to_follow INTO '$new_to_follow';
    """)
    to_follow = "data/tc_data_simple"
    followed = "out/tc/followed_0"
    # create empty dataset for first iteration
    Pig.fs("mkdir " + followed)
    Pig.fs("touchz " + followed + "/part-m-00000")
    for i in range(10):
        new_to_follow = "out/tc/to_follow_" + str(i + 1)
        new_followed = "out/tc/followed_" + str(i + 1)
        # parameterless bind(): the $parameters are taken from the local variables above
        job = Q.bind().runSingle()
        if not job.isSuccessful():
            raise Exception('failed')
        to_follow = new_to_follow
        followed = new_followed
        # detect if we are done: nothing left to follow
        if not job.result("new_to_follow").iterator().hasNext():
            break

    # turn the followed edges into sorted groups of nodes
    P = Pig.compile("""
    links = LOAD '$followed' AS (id1: chararray, id2: chararray);
    links = FOREACH links GENERATE FLATTEN( bidirectional(id1, id2) );
    result = DISTINCT (
        FOREACH (GROUP links by id1)
        GENERATE SORT(links.id2));
    STORE result INTO 'out/tc/groups';
    """)
    job = P.bind().runSingle()
    if not job.isSuccessful():
        raise Exception('failed')

if __name__ == '__main__':
    main()

Output data:


5. Notables

There are a few things to notice in this implementation:
– UDFs are simply functions defined in the same script.
– The output schema of a UDF is declared with the @outputSchema decorator. Pig needs the schema of the output to interpret the statements following the UDF call.
– Native Python structures (dictionary, tuple, list) are used; the conversion to Pig types (map, tuple, bag) is automatic. This makes the UDF code much more compact.
– UDFs are directly available in embedded Pig scripts by their function names. No other declaration is required.
– We iterate at most 10 times (max diameter = 2^10 = 1024) and check whether the “to follow” relation is empty to decide whether to stop iterating. This is done using the PigStats object returned by runSingle().

There’s also an experimental JavaScript embedding that I’ll talk about in a future post.


PageRank implementation in Pig

In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the example I gave in my presentation at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the Hortonworks blog.

1. What is embedded Pig?

Pig 0.9 supports running Python scripts with Pig statements embedded. This is simply run using the pig command:

pig myscript.py

The implementation uses Jython to execute the Python code, not CPython. As a consequence, this is Python 2.5 (not 3.x) and you cannot use native (C) extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example: java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:

  • Loop and exit criteria
  • User Defined Functions
  • Parameter passing

I will not cover UDFs in this post; stay tuned for more.

2. A very simple example: The PageRank implementation

Input format:

www.A.com	1	{ (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com	1	{ (www.D.com), (www.E.com) }
www.C.com	1	{ (www.D.com) }
www.D.com	1	{ (www.B.com) }
www.E.com	1	{ (www.A.com) }
www.F.com	1	{ (www.B.com), (www.C.com) }

from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    # the output of this iteration is the input of the next one
    params["docs_in"] = out

Things to notice here: we first compile() the Pig script, then, for each iteration, pass the parameters using bind(params) and run it with runSingle(). The output of each iteration becomes the input of the next one.
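The formula in the comment at the top of the script can be checked by hand on the sample input with a small pure-Python version of one iteration. This is a sketch, not the Pig job: pagerank_step and LINKS are illustrative names, and pages that receive no inbound rank simply get (1 - d) here. That is an assumption about how such pages should be treated, not a statement about what the Pig script produces for them.

```python
# Pure-Python sketch of the PageRank iteration on the sample input (d = 0.5).
LINKS = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}


def pagerank_step(ranks, links, d=0.5):
    """PR(A) = (1 - d) + d * sum(PR(T) / C(T)) over the pages T linking to A."""
    inbound = {url: 0.0 for url in ranks}
    for url, targets in links.items():
        share = ranks[url] / len(targets)  # like pagerank / COUNT(links)
        for to_url in targets:             # like FLATTEN(links) AS to_url
            if to_url in inbound:          # only pages present in the input keep a row
                inbound[to_url] += share
    return {url: (1 - d) + d * inbound[url] for url in ranks}


if __name__ == "__main__":
    ranks = {url: 1.0 for url in LINKS}  # every page starts at 1, as in the input file
    for i in range(10):                  # the output of one iteration feeds the next
        ranks = pagerank_step(ranks, LINKS)
    for url in sorted(ranks, key=ranks.get, reverse=True):
        print(url, round(ranks[url], 4))
```

After the first iteration, for example, www.B.com receives 1/4 from A, 1 from D and 1/2 from F, so its rank is 0.5 + 0.5 * 1.75 = 1.375. Since every page in the sample has outbound links, the ranks keep summing to 6.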

3. Application

Download from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2

Convert to the expected input format:

A = LOAD 'page_links_en.nt.bz2' using PigStorage(' ') as (url:chararray, p:chararray, link:chararray);
B = GROUP A by url;
C = foreach B generate group as url, 1 as pagerank, A.link as links;
STORE C into 'input';
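For readers who want to see the conversion without a cluster, here is a rough pure-Python equivalent for a handful of lines. It is an illustration under assumptions: each line is taken to be "<subject> <predicate> <object> ." split on single spaces (as PigStorage(' ') does above), to_pagerank_input is a hypothetical name, the sample triples are placeholders, and the tab-separated text with a {(...),(...)} bag only approximates what STORE writes.

```python
def to_pagerank_input(lines):
    """Group N-Triples lines by subject into 'url<TAB>1<TAB>{(link),...}' records."""
    links = {}  # insertion-ordered (Python 3.7+)
    for line in lines:
        fields = line.split(" ")
        if len(fields) < 3:
            continue  # skip blank or malformed lines
        url, _predicate, link = fields[0], fields[1], fields[2]
        links.setdefault(url, []).append(link)
    for url, targets in links.items():
        bag = ",".join("(%s)" % t for t in targets)
        yield "%s\t1\t{%s}" % (url, bag)


if __name__ == "__main__":
    # placeholder triples, not real DBpedia data
    sample = ["<A> <links_to> <B> .", "<A> <links_to> <C> .", "<B> <links_to> <A> ."]
    for record in to_pagerank_input(sample):
        print(record)
```

Unlike this sketch, Pig's GROUP does not guarantee any particular order for the links inside a bag.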

Then run the script from the command line:

pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py

Get the max and min PageRank from the resulting dataset:

A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B generate FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) as pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;
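Joining back on the pagerank value returns every URL that shares the max or min value, so ties are kept. The same idea in a few lines of Python, as a sketch with a hypothetical extremes() helper and made-up input values:

```python
def extremes(ranks):
    """Return (value, url) pairs for the max and min rank, keeping ties like the JOIN does."""
    targets = {max(ranks.values()), min(ranks.values())}  # like FLATTEN(TOBAG(MAX, MIN))
    return sorted((value, url) for url, value in ranks.items() if value in targets)


if __name__ == "__main__":
    print(extremes({"a": 0.5, "b": 2.0, "c": 0.5, "d": 1.0}))
```

Using a set here means a dataset where max equals min yields each URL once, whereas the two identical tuples produced by TOBAG would make the Pig JOIN emit each URL twice.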

And the winner is … the United States! (according to en.wikipedia.org)


4. A little more interesting: Stop iterating when a threshold is reached

from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';
""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    # parameterless bind(): $d, $docs_in, $docs_out and $max_diff come from the variables above
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out

Main differences:

  • We compute an extra max_diff relation that contains a single tuple.
  • Variables can also be bound from the current scope when using the parameterless bind().
  • We can easily check whether we reached the expected threshold using the PigStats object returned by runSingle(), and stop iterating if needed.
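The stopping rule can be sketched in plain Python as well. This is an illustration only: run_until_converged and pagerank_step are hypothetical names, the graph is the sample input from section 2, and the largest per-page change plays the role of the single-tuple max_diff relation. As in the script, the loop stops as soon as that change drops below 0.01, or after 10 iterations.

```python
# Sketch of "stop iterating when a threshold is reached" on the sample graph.
LINKS = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}


def pagerank_step(ranks, links, d=0.5):
    """PR(A) = (1 - d) + d * sum(PR(T) / C(T)) over the pages T linking to A."""
    inbound = dict.fromkeys(ranks, 0.0)
    for url, targets in links.items():
        for to_url in targets:
            if to_url in inbound:
                inbound[to_url] += ranks[url] / len(targets)
    return {url: (1 - d) + d * inbound[url] for url in ranks}


def run_until_converged(ranks, links, threshold=0.01, max_iterations=10):
    """Iterate until max |new - previous| < threshold.

    Returns (ranks, index of the last iteration), or (ranks, None) if the
    threshold was never reached within max_iterations.
    """
    for i in range(max_iterations):
        new_ranks = pagerank_step(ranks, links)
        max_diff = max(abs(new_ranks[u] - ranks[u]) for u in ranks)  # the max_diff relation
        ranks = new_ranks  # like docs_in = docs_out
        if max_diff < threshold:
            return ranks, i  # the script prints "done at iteration i" here
    return ranks, None


if __name__ == "__main__":
    ranks, done_at = run_until_converged({u: 1.0 for u in LINKS}, LINKS)
    print("done at iteration", done_at)
```

With d = 0.5 every iteration roughly halves the remaining error, which is why a handful of iterations is enough on this small graph.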


The examples are available on GitHub.
Next time I’ll go into the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also write a post about nicely embedding syntax-highlighted code snippets in the free wordpress.com 😛