Julien's tech blog

talking about tech stuff: stuff I'm interested in, intriguing stuff, random stuff.

PageRank implementation in Pig

In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the same example I gave in my presentation at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the Hortonworks blog.

1. What is Pig embedded

Pig 0.9 supports running Python scripts that embed Pig statements. Such a script is run directly with the pig command:

pig myscript.py

The implementation uses Jython, not CPython, to execute the Python code. As a consequence, the language level is Python 2.5 (not 3.0) and you cannot use native extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example, java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:

  • Loop and exit criteria
  • User Defined Functions
  • Parameter passing

In this post I will not cover UDFs just yet. Stay tuned for more.

2. A very simple example: The PageRank implementation

Input format:

www.A.com	1	{ (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com	1	{ (www.D.com), (www.E.com) }
www.C.com	1	{ (www.D.com) }
www.D.com	1	{ (www.B.com) }
www.E.com	1	{ (www.A.com) }
www.F.com	1	{ (www.B.com), (www.C.com) }
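To make the layout concrete, here is a minimal plain-Python sketch (not Pig) that reads one such line into a url, a pagerank and a list of links. It assumes tab-separated fields, as in the sample above; parse_line is a hypothetical helper name, not part of Pig.

```python
import re

def parse_line(line):
    """Split one tab-separated input line into (url, pagerank, links)."""
    url, pagerank, bag = line.rstrip("\n").split("\t")
    # The bag looks like "{ (www.D.com), (www.E.com) }": grab each tuple body.
    links = re.findall(r"\(([^()]*)\)", bag)
    return url, float(pagerank), [link.strip() for link in links]

sample = "www.B.com\t1\t{ (www.D.com), (www.E.com) }"
print(parse_line(sample))
```
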

#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + … + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;
       
STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
   out = "out/pagerank_data_" + str(i + 1)
   params["docs_out"] = out
   Pig.fs("rmr " + out)
   stats = P.bind(params).runSingle()
   if not stats.isSuccessful():
      raise Exception('failed')
   params["docs_in"] = out
 

Things to notice here: we first compile() the Pig script, then for each iteration we pass parameters using bind(params) and execute with runSingle(). The output of each iteration becomes the input of the next one.
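If you want to check by hand what one iteration computes, here is a rough plain-Python sketch of the same update on the sample input, with d = 0.5. It is not Pig code and pagerank_step is a hypothetical helper name. One deliberate difference: it counts a page with no inbound links as a sum of 0, which is an assumption of this sketch rather than what SUM does on an empty bag in Pig.

```python
# Plain-Python sketch of one iteration of the update (illustrative, not Pig).

def pagerank_step(graph, d):
    """graph maps url -> (pagerank, [outbound urls]); returns the same shape."""
    # outbound_pagerank: each page splits its rank evenly among its links.
    inbound = {}
    for url, (rank, links) in graph.items():
        for to_url in links:
            inbound.setdefault(to_url, []).append(rank / float(len(links)))
    # new_pagerank: PR(A) = (1 - d) + d * (sum of inbound contributions).
    # Assumption: a page nobody links to gets an inbound sum of 0.
    return dict(
        (url, ((1 - d) + d * sum(inbound.get(url, [])), links))
        for url, (rank, links) in graph.items()
    )

graph = {
    "www.A.com": (1.0, ["www.B.com", "www.C.com", "www.D.com", "www.E.com"]),
    "www.B.com": (1.0, ["www.D.com", "www.E.com"]),
    "www.C.com": (1.0, ["www.D.com"]),
    "www.D.com": (1.0, ["www.B.com"]),
    "www.E.com": (1.0, ["www.A.com"]),
    "www.F.com": (1.0, ["www.B.com", "www.C.com"]),
}

new_graph = pagerank_step(graph, 0.5)
for url in sorted(new_graph):
    print("%s\t%s" % (url, new_graph[url][0]))
```

Feeding new_graph back into pagerank_step plays the role of pointing docs_in at the previous docs_out.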

3. Application

Download from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2

Convert to the expected input format:

A = LOAD 'page_links_en.nt.bz2' using PigStorage(' ') as (url:chararray, p:chararray, link:chararray);
B = GROUP A by url;                                                                                  
C = foreach B generate group as url, 1 as pagerank, A.link as links;                                 
STORE C into 'input';

Execute the command line:

pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py

Get Max and Min from the resulting dataset:

A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B generate FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) as pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;

And the winner is … the United States! (according to en.wikipedia.org)

(6890.17,<http://dbpedia.org/resource/United_States>)
(0.50003946,<http://dbpedia.org/resource/Ficus_macrophylla_f._columnaris>)
(0.50003946,<http://dbpedia.org/resource/Melaleuca_trichostachya>)
(0.50003946,<http://dbpedia.org/resource/Anetholea_anisata>)
(0.50003946,<http://dbpedia.org/resource/Fieldia_australis>)
(0.50003946,<http://dbpedia.org/resource/Coprosma_nitida>)
(0.50003946,<http://dbpedia.org/resource/Heritiera_actinophylla>)
(0.50003946,<http://dbpedia.org/resource/Calochilus_robertsonii>)
(0.50003946,<http://dbpedia.org/resource/Euphorbia_eremophila>)
(0.50003946,<http://dbpedia.org/resource/Macrotyloma_uniflorum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Phebalium_squamulosum_subsp._squamulosum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_nova-anglica>)
(0.50003946,<http://dbpedia.org/resource/Pellaea_nana>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_nivea>)
(0.50003946,<http://dbpedia.org/resource/Poa_meionectes>)
(0.50003946,<http://dbpedia.org/resource/Akania_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Anthocarapa_nitidula>)
(0.50003946,<http://dbpedia.org/resource/Jasminum_volubile>)
(0.50003946,<http://dbpedia.org/resource/Seringia_arborescens>)
(0.50003946,<http://dbpedia.org/resource/Grammitis_billardierei>)
(0.50003946,<http://dbpedia.org/resource/Utricularia_monanthos>)
(0.50003946,<http://dbpedia.org/resource/Acacia_mitchellii>)
(0.50003946,<http://dbpedia.org/resource/Halosarcia_halocnemoides>)
(0.50003946,<http://dbpedia.org/resource/Calomeria_amaranthoides>)
(0.50003946,<http://dbpedia.org/resource/Tripladenia_cunninghamii>)
(0.50003946,<http://dbpedia.org/resource/Gaultheria_appressa>)
(0.50003946,<http://dbpedia.org/resource/Arytera_distylis>)
(0.50003946,<http://dbpedia.org/resource/Premna_lignum-vitae>)
(0.50003946,<http://dbpedia.org/resource/Drosera_burmanni>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_derwentiana>)
(0.50003946,<http://dbpedia.org/resource/Atalaya_multiflora>)
(0.50003946,<http://dbpedia.org/resource/Callitris_gracilis>)
(0.50003946,<http://dbpedia.org/resource/Salix_x_sepulcralis>)

4. A little more interesting: Stop iterating when a threshold is reached

#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';

""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
 

Main differences:

  • We compute an extra max_diff relation that contains a single tuple.
  • Variables can also be bound from the current Python scope by calling bind() with no parameters.
  • We can easily check whether we reached the expected threshold using the PigStats object returned by runSingle(), and stop iterating if needed.
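The stopping rule itself is easy to try outside of Pig. Below is a small plain-Python sketch of the same loop: update the ranks, take the largest absolute change over all pages, and stop once it falls below the threshold. The names step and run_until_stable are hypothetical, and the sketch assumes every link target is itself a key of ranks.

```python
# Plain-Python sketch of the threshold loop (illustrative only, not Pig code).

def step(ranks, links, d):
    """One PageRank update; returns (new_ranks, max_diff)."""
    inbound = dict((url, 0.0) for url in ranks)
    for url, targets in links.items():
        for to_url in targets:
            # Assumption: every link target is also a key of ranks.
            inbound[to_url] += ranks[url] / float(len(targets))
    new_ranks = dict((url, (1 - d) + d * inbound[url]) for url in ranks)
    max_diff = max(abs(new_ranks[url] - ranks[url]) for url in ranks)
    return new_ranks, max_diff

def run_until_stable(ranks, links, d=0.5, threshold=0.01, max_iterations=10):
    """Iterate until the largest per-page change drops below threshold."""
    max_diff = None
    for i in range(max_iterations):
        ranks, max_diff = step(ranks, links, d)
        if max_diff < threshold:
            return ranks, i, max_diff  # i is 0-based, like the script's print
    return ranks, max_iterations, max_diff

links = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}
ranks = dict((url, 1.0) for url in links)

final_ranks, iteration, max_diff = run_until_stable(ranks, links)
print("stopped at iteration %s with max_diff = %s" % (iteration, max_diff))
```
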

Next…

The examples are available on github.
Next time I’ll go into the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also have a post about nicely embedding syntactically colored code snippets in the free wordpress.com :P


4 responses to “PageRank implementation in Pig”

  1. Pingback: Transitive Closure in Pig « Julien's tech blog

  2. Laurens February 22, 2013 at 7:59 am

    Thanks a lot for your work!
    Got some issues though (pig 0.10.0):
    – In the generate clause of new_pagerank, the outbound_pagerank.pagerank can be an empty set, which results in a null value if you perform a SUM on it. This causes the pagerank value of new_pagerank to be null as well, which causes an exception in the pagerank_diff calculation (apply ABS to a null value throws exception).
    I dealt with this by changing the calculation of the pagerank value in new_pagerank to (i.e. making the SUM output 0 when there is an empty set):
    ( 1 - $d ) + $d * SUM ((IsEmpty(outbound_pagerank.pagerank) ? {0F} : outbound_pagerank.pagerank)) AS pagerank,
    – The output of the pagerank calc functions as input during the next iteration. However, the structure of the original input differs significantly with the input of the latter iterations.
    Row in original input:
    http://www.C.com 1 { (www.D.com) }
    The output however is of structure
    http://www.C.com 1 http://www.D.com 0.5
