In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the example from the presentation I gave at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the Hortonworks blog.
1. What is embedded Pig?
Pig 0.9 supports running Python scripts that embed Pig statements. Such a script is run with the regular pig command:
pig myscript.py
The implementation uses Jython, not CPython, to execute the Python code. As a consequence the language is Python 2.5 (not 3.0), and you cannot use native extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:
- Loop and exit criteria
- User Defined Functions
- Parameter passing
I will not cover UDFs in this post; stay tuned for more.
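Parameter passing works by substituting each $name in the Pig script with the value given to bind(). Python's standard string.Template happens to use the same $name syntax, so the compile-once, bind-per-iteration pattern can be illustrated with a rough plain-Python analogy. This is not Pig's actual mechanism, and the SCRIPT text, the bind helper and the paths are all hypothetical:

```python
from string import Template

# Hypothetical script text: Pig would parse and run it, here we only substitute.
SCRIPT = Template(
    "previous = LOAD '$docs_in';\n"
    "STORE previous INTO '$docs_out';"
)

def bind(params):
    """Return the script text with every $name replaced (an analogy of bind())."""
    return SCRIPT.substitute(params)

params = {"docs_in": "data/in"}
bound_scripts = []
for i in range(3):
    params["docs_out"] = "out/iter_" + str(i + 1)
    bound_scripts.append(bind(params))
    # The output of this iteration becomes the input of the next one.
    params["docs_in"] = params["docs_out"]

for text in bound_scripts:
    print(text)
```

The script is "compiled" once and only the parameter dictionary changes between iterations, which is what makes the loop and its exit criterion easy to express in Python.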
2. A very simple example: The PageRank implementation
Input format (one page per line, tab-separated: url, pagerank, and the bag of pages it links to):
www.A.com 1 { (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com 1 { (www.D.com), (www.E.com) }
www.C.com 1 { (www.D.com) }
www.D.com 1 { (www.B.com) }
www.E.com 1 { (www.A.com) }
www.F.com 1 { (www.B.com), (www.C.com) }
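To make the bag notation concrete, here is a small plain-Python sketch that splits one such line into its three parts. The function name parse_record is hypothetical, and it accepts any whitespace between the first fields so that it also works on the lines as displayed above; the Pig script itself reads them with PigStorage('\t'):

```python
import re

def parse_record(line):
    """Split 'url pagerank { (link), ... }' into (url, pagerank, [links])."""
    # Split off the first two fields; the bag (which contains spaces) stays whole.
    url, pagerank, bag = line.split(None, 2)
    # Each link is wrapped in parentheses inside the bag.
    links = [link.strip() for link in re.findall(r"\(([^)]*)\)", bag)]
    return url, float(pagerank), links

record = parse_record("www.B.com 1 { (www.D.com), (www.E.com) }")
print(record)
```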
#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    # remove the output of a previous run, if any
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    # the output of this iteration is the input of the next one
    params["docs_in"] = out
Things to notice here: we first compile() the Pig script, then, for each iteration, pass the parameters with bind(params) and run it with runSingle(). The output of each iteration becomes the input of the next one.
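To see what one iteration computes, here is an in-memory plain-Python sketch of the same data flow on the sample graph, with d = 0.5 and 10 iterations as in the loop above. It is a stand-in for reasoning about the script, not something Pig runs; the function name pagerank_step is hypothetical, and giving a page with no inbound links a contribution sum of 0 is an assumption of this sketch:

```python
# Sample graph from the input format above: page -> outbound links.
GRAPH = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}

def pagerank_step(ranks, graph, d):
    """One iteration: PR(A) = (1-d) + d * sum of PR(T)/C(T) over pages T linking to A."""
    # outbound_pagerank: each page splits its rank evenly over its links.
    inbound = {}
    for url, links in graph.items():
        for to_url in links:
            inbound[to_url] = inbound.get(to_url, 0.0) + ranks[url] / len(links)
    # new_pagerank: one entry per page already present in ranks (like the
    # INNER cogroup); a page with no inbound links gets a sum of 0 here.
    return {url: (1 - d) + d * inbound.get(url, 0.0) for url in ranks}

ranks = {url: 1.0 for url in GRAPH}
for i in range(10):
    ranks = pagerank_step(ranks, GRAPH, d=0.5)

for url in sorted(ranks):
    print(url, round(ranks[url], 4))
```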
3. Application
Download the DBpedia 3.6 English page links dump from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2
Convert it to the input format expected by the script:
A = LOAD 'page_links_en.nt.bz2' using PigStorage(' ') as (url:chararray, p:chararray, link:chararray);
B = GROUP A by url;
C = foreach B generate group as url, 1 as pagerank, A.link as links;
STORE C into 'input';
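The conversion splits each N-Triples line on spaces, keeps the subject as the url and the object as a link, and groups by subject with a starting pagerank of 1. The same steps in plain Python, run on a few made-up triples (the example.org URIs and the function name to_pagerank_input are hypothetical), look roughly like this:

```python
from collections import defaultdict

def to_pagerank_input(nt_lines):
    """Group 'subject predicate object .' lines by subject.

    Mirrors the Pig conversion: split on single spaces, keep field 1 (url)
    and field 3 (link), and start every page with a pagerank of 1.
    """
    links = defaultdict(list)
    for line in nt_lines:
        fields = line.split(" ")
        if len(fields) < 3:
            continue  # skip blank or malformed lines
        url, link = fields[0], fields[2]
        links[url].append(link)
    # One (url, pagerank, links) record per distinct subject, sorted for
    # deterministic output (Pig's GROUP does not guarantee an order).
    return [(url, 1, targets) for url, targets in sorted(links.items())]

# Hypothetical sample triples (the predicate URI is made up for illustration).
SAMPLE = [
    "<http://example.org/A> <http://example.org/linksTo> <http://example.org/B> .",
    "<http://example.org/A> <http://example.org/linksTo> <http://example.org/C> .",
    "<http://example.org/B> <http://example.org/linksTo> <http://example.org/A> .",
]

for record in to_pagerank_input(SAMPLE):
    print(record)
```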
Then run the PageRank script, with docs_in pointing at the converted data:
pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py
Get the maximum and minimum pagerank, with the matching URLs, from the resulting dataset:
A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B generate FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) as pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;
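FLATTEN(TOBAG(MAX(...), MIN(...))) turns the two extreme values into rows, and the JOIN then keeps every URL whose pagerank equals one of them, so all ties show up. The same logic in plain Python, over hypothetical (url, pagerank) records rather than the DBpedia results (the function name extremes is also hypothetical):

```python
def extremes(records):
    """Return (pagerank, url) pairs for every url tied at the max or min pagerank."""
    values = [rank for _url, rank in records]
    # The two extreme values, playing the role of relation C in the Pig query.
    bounds = {max(values), min(values)}
    # Joining back on pagerank keeps every record matching either value.
    return [(rank, url) for url, rank in records if rank in bounds]

# Hypothetical records, not the DBpedia results.
RECORDS = [("a", 3.0), ("b", 0.5), ("c", 1.2), ("d", 0.5)]
print(extremes(RECORDS))
```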
And the winner is… the United States (according to en.wikipedia.org)! The other rows are the pages tied for the minimum pagerank:
(6890.17,<http://dbpedia.org/resource/United_States>)
(0.50003946,<http://dbpedia.org/resource/Ficus_macrophylla_f._columnaris>)
(0.50003946,<http://dbpedia.org/resource/Melaleuca_trichostachya>)
(0.50003946,<http://dbpedia.org/resource/Anetholea_anisata>)
(0.50003946,<http://dbpedia.org/resource/Fieldia_australis>)
(0.50003946,<http://dbpedia.org/resource/Coprosma_nitida>)
(0.50003946,<http://dbpedia.org/resource/Heritiera_actinophylla>)
(0.50003946,<http://dbpedia.org/resource/Calochilus_robertsonii>)
(0.50003946,<http://dbpedia.org/resource/Euphorbia_eremophila>)
(0.50003946,<http://dbpedia.org/resource/Macrotyloma_uniflorum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Phebalium_squamulosum_subsp._squamulosum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_nova-anglica>)
(0.50003946,<http://dbpedia.org/resource/Pellaea_nana>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_nivea>)
(0.50003946,<http://dbpedia.org/resource/Poa_meionectes>)
(0.50003946,<http://dbpedia.org/resource/Akania_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Anthocarapa_nitidula>)
(0.50003946,<http://dbpedia.org/resource/Jasminum_volubile>)
(0.50003946,<http://dbpedia.org/resource/Seringia_arborescens>)
(0.50003946,<http://dbpedia.org/resource/Grammitis_billardierei>)
(0.50003946,<http://dbpedia.org/resource/Utricularia_monanthos>)
(0.50003946,<http://dbpedia.org/resource/Acacia_mitchellii>)
(0.50003946,<http://dbpedia.org/resource/Halosarcia_halocnemoides>)
(0.50003946,<http://dbpedia.org/resource/Calomeria_amaranthoides>)
(0.50003946,<http://dbpedia.org/resource/Tripladenia_cunninghamii>)
(0.50003946,<http://dbpedia.org/resource/Gaultheria_appressa>)
(0.50003946,<http://dbpedia.org/resource/Arytera_distylis>)
(0.50003946,<http://dbpedia.org/resource/Premna_lignum-vitae>)
(0.50003946,<http://dbpedia.org/resource/Drosera_burmanni>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_derwentiana>)
(0.50003946,<http://dbpedia.org/resource/Atalaya_multiflora>)
(0.50003946,<http://dbpedia.org/resource/Callitris_gracilis>)
(0.50003946,<http://dbpedia.org/resource/Salix_x_sepulcralis>)
4. A little more interesting: Stop iterating when a threshold is reached
#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';
""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    # bind() without arguments takes d, docs_in, docs_out and max_diff from the current scope
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    # read the single tuple of the max_diff relation
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
Main differences:
- We compute an extra max_diff relation that contains a single tuple.
- Variables can also be bound from the current scope by calling bind() without parameters.
- Using the PigStats object returned by runSingle(), we can easily check whether the threshold has been reached and stop iterating if needed.
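The stopping rule can be tried out without a cluster. This plain-Python sketch is again an in-memory stand-in, not Pig, and the step helper is hypothetical: it recomputes the ranks of the sample graph with d = 0.5, tracks the largest absolute change like the max_diff relation, and stops once that change falls below 0.01, within at most 10 iterations:

```python
# Sample graph from section 2: page -> outbound links.
GRAPH = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}

def step(ranks, d):
    """One PageRank iteration over GRAPH; returns (new_ranks, max_diff)."""
    inbound = dict.fromkeys(ranks, 0.0)
    for url, links in GRAPH.items():
        for to_url in links:
            inbound[to_url] += ranks[url] / len(links)
    new_ranks = {url: (1 - d) + d * inbound[url] for url in ranks}
    # Same role as the max_diff relation: largest absolute change of any page.
    max_diff = max(abs(new_ranks[url] - ranks[url]) for url in ranks)
    return new_ranks, max_diff

ranks = dict.fromkeys(GRAPH, 1.0)
stopped_at = None
for i in range(10):
    ranks, max_diff = step(ranks, d=0.5)
    print("max_diff_value = " + str(max_diff))
    if max_diff < 0.01:
        stopped_at = i
        print("done at iteration " + str(i))
        break
```

Checking a single aggregated value after each run keeps the exit test cheap: only one number has to travel from the job back to the driver loop.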
Next…
The examples are available on GitHub.
Next time I’ll go into the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also write a post about nicely embedding syntax-highlighted code snippets on the free wordpress.com.