In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the same example I gave in my presentation at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the HortonWorks blog.
1. What is Pig embedded
Pig 0.9 supports running Python scripts with embedded Pig statements. You run such a script with the pig command:
pig myscript.py
The implementation uses Jython, not CPython, to execute the Python code. As a consequence, the language level is Python 2.5 (not 3.0) and you cannot use native extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example: java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:
- Loop and exit criteria
- User Defined Functions
- Parameter passing
In this post I will not cover UDFs just yet. Stay tuned for more.
2. A very simple example: The PageRank implementation
Input format:
www.A.com 1 { (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com 1 { (www.D.com), (www.E.com) }
www.C.com 1 { (www.D.com) }
www.D.com 1 { (www.B.com) }
www.E.com 1 { (www.A.com) }
www.F.com 1 { (www.B.com), (www.C.com) }
#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    # remove any output left over from a previous run
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    # the output of this iteration is the input of the next one
    params["docs_in"] = out
Things to notice here: we first compile() the Pig script, then, for each iteration, pass parameters using bind(params) and execute with runSingle(). The output of each iteration becomes the input of the next one.
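To make the formula in the script comment concrete, here is a minimal plain-Python sketch of a single iteration on the sample input above, with d = 0.5. It does not use Pig at all; the names GRAPH and pagerank_step are mine, for illustration only. It also assumes that a page nobody links to receives a sum of 0, which is not what the Pig script does by itself (SUM over an empty bag yields null).

```python
# Plain-Python sketch of ONE iteration of the Pig script (no Pig or Hadoop needed).
# GRAPH is the sample input from section 2; every page starts with a rank of 1.
GRAPH = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}


def pagerank_step(ranks, graph, d):
    """Return the new ranks after one iteration (hypothetical helper)."""
    # Like outbound_pagerank: each page sends pagerank / COUNT(links) to each link.
    inbound = {}
    for url, links in graph.items():
        share = ranks[url] / float(len(links))
        for to_url in links:
            inbound.setdefault(to_url, []).append(share)
    # Like new_pagerank: (1 - d) + d * SUM(shares).
    # Assumption: a page without inbound links gets a sum of 0.
    return dict((url, (1 - d) + d * sum(inbound.get(url, []))) for url in graph)


ranks = dict((url, 1.0) for url in GRAPH)
ranks = pagerank_step(ranks, GRAPH, 0.5)
```

After this one step, www.F.com (which nothing links to) drops to 0.5, while www.B.com and www.D.com rise to 1.375.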
3. Application
Download from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2
Convert to the expected input format:
A = LOAD 'page_links_en.nt.bz2' using PigStorage(' ') as (url:chararray, p:chararray, link:chararray);
B = GROUP A by url;
C = foreach B generate group as url, 1 as pagerank, A.link as links;
STORE C into 'input';
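For readers less familiar with Pig, the conversion above amounts to grouping the triples by their first field. Here is a rough plain-Python sketch of the same idea; the function name to_pagerank_input and the three sample triples are made up for illustration and are not taken from the DBpedia file.

```python
def to_pagerank_input(lines):
    """Group space-separated triples (url, predicate, link) by url.

    Returns rows of (url, initial_pagerank, links), mirroring relation C.
    """
    links_by_url = {}
    for line in lines:
        fields = line.strip().split(" ")
        if len(fields) < 3:
            continue  # this sketch simply skips blank or malformed lines
        url, link = fields[0], fields[2]
        links_by_url.setdefault(url, []).append(link)
    # every page starts with a pagerank of 1
    return [(url, 1, links) for url, links in sorted(links_by_url.items())]


# Hypothetical sample triples, only to show the shape of the data.
sample = [
    "<a> <links_to> <b> .",
    "<a> <links_to> <c> .",
    "<b> <links_to> <c> .",
]
rows = to_pagerank_input(sample)
```

On real data you would of course let Pig do this, since the whole point is that the grouping runs in parallel on the cluster.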
Execute the command line:
pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py
Get Max and Min from the resulting dataset:
A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B generate FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) as pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;
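The script above computes the maximum and minimum pagerank, then joins back to find every URL holding one of those two values. A small plain-Python sketch of the same idea follows; the function name extremes and the sample ranks are hypothetical, and the sketch is not an exact equivalent of the join (for instance, it lists a page only once even if the maximum and the minimum are equal).

```python
def extremes(ranks):
    """Return (pagerank, url) for every page holding the max or min rank."""
    highest = max(ranks.values())
    lowest = min(ranks.values())
    return [(rank, url) for url, rank in sorted(ranks.items())
            if rank in (highest, lowest)]


# Hypothetical ranks, only to show the shape of the result.
sample_ranks = {"a": 3.0, "b": 0.5, "c": 0.5, "d": 1.2}
result = extremes(sample_ranks)
```

As in the output below, several pages can share the minimum value, so the result usually has more than two rows.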
And the winner is … the United States! (according to en.wikipedia.org)
(6890.17,<http://dbpedia.org/resource/United_States>)
(0.50003946,<http://dbpedia.org/resource/Ficus_macrophylla_f._columnaris>)
(0.50003946,<http://dbpedia.org/resource/Melaleuca_trichostachya>)
(0.50003946,<http://dbpedia.org/resource/Anetholea_anisata>)
(0.50003946,<http://dbpedia.org/resource/Fieldia_australis>)
(0.50003946,<http://dbpedia.org/resource/Coprosma_nitida>)
(0.50003946,<http://dbpedia.org/resource/Heritiera_actinophylla>)
(0.50003946,<http://dbpedia.org/resource/Calochilus_robertsonii>)
(0.50003946,<http://dbpedia.org/resource/Euphorbia_eremophila>)
(0.50003946,<http://dbpedia.org/resource/Macrotyloma_uniflorum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Phebalium_squamulosum_subsp._squamulosum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_nova-anglica>)
(0.50003946,<http://dbpedia.org/resource/Pellaea_nana>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_nivea>)
(0.50003946,<http://dbpedia.org/resource/Poa_meionectes>)
(0.50003946,<http://dbpedia.org/resource/Akania_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Anthocarapa_nitidula>)
(0.50003946,<http://dbpedia.org/resource/Jasminum_volubile>)
(0.50003946,<http://dbpedia.org/resource/Seringia_arborescens>)
(0.50003946,<http://dbpedia.org/resource/Grammitis_billardierei>)
(0.50003946,<http://dbpedia.org/resource/Utricularia_monanthos>)
(0.50003946,<http://dbpedia.org/resource/Acacia_mitchellii>)
(0.50003946,<http://dbpedia.org/resource/Halosarcia_halocnemoides>)
(0.50003946,<http://dbpedia.org/resource/Calomeria_amaranthoides>)
(0.50003946,<http://dbpedia.org/resource/Tripladenia_cunninghamii>)
(0.50003946,<http://dbpedia.org/resource/Gaultheria_appressa>)
(0.50003946,<http://dbpedia.org/resource/Arytera_distylis>)
(0.50003946,<http://dbpedia.org/resource/Premna_lignum-vitae>)
(0.50003946,<http://dbpedia.org/resource/Drosera_burmanni>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_derwentiana>)
(0.50003946,<http://dbpedia.org/resource/Atalaya_multiflora>)
(0.50003946,<http://dbpedia.org/resource/Callitris_gracilis>)
(0.50003946,<http://dbpedia.org/resource/Salix_x_sepulcralis>)
4. A little more interesting: Stop iterating when a threshold is reached
#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';
""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    # bind() without arguments takes d, docs_in, docs_out and max_diff from the current scope
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise RuntimeError('failed')
    # the max_diff relation contains a single tuple with a single value
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
Main differences:
- We compute an extra max_diff relation that contains a single tuple.
- Variables can also be bound from the current scope by calling bind() without parameters.
- We can easily check whether we reached the expected threshold using the PigStats object returned by runSingle(), and stop iterating if needed.
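The stopping logic itself is independent of Pig. Here is a minimal plain-Python sketch of the same loop on the sample graph from section 2, using the same d = 0.5, the same 0.01 threshold and the same cap of 10 iterations. The helper names are mine, and the sketch assumes every linked page also appears as a key in the graph.

```python
# Sample graph from section 2 of this post.
GRAPH = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}


def pagerank_step(ranks, graph, d):
    """One iteration; assumes every link target is also a key of graph."""
    incoming = dict((url, 0.0) for url in graph)
    for url, links in graph.items():
        for to_url in links:
            incoming[to_url] += ranks[url] / float(len(links))
    return dict((url, (1 - d) + d * incoming[url]) for url in graph)


def iterate_until_stable(graph, d=0.5, threshold=0.01, max_iterations=10):
    """Iterate until the largest rank change drops below threshold."""
    ranks = dict((url, 1.0) for url in graph)
    max_diff = None
    iterations = 0
    for i in range(max_iterations):
        new_ranks = pagerank_step(ranks, graph, d)
        # same role as the max_diff relation: the largest absolute change
        max_diff = max(abs(new_ranks[url] - ranks[url]) for url in graph)
        ranks = new_ranks
        iterations = i + 1
        if max_diff < threshold:
            break
    return ranks, iterations, max_diff


ranks, iterations, max_diff = iterate_until_stable(GRAPH)
```

In the Pig version the comparison happens in Python while the heavy lifting stays in the cluster: only the single max_diff value is read back between iterations.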
Next…
The examples are available on github.
Next time I’ll go into the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also write a post about nicely embedding syntax-highlighted code snippets in the free wordpress.com.
Thanks a lot for your work!
Got some issues though (pig 0.10.0):
- In the generate clause of new_pagerank, the outbound_pagerank.pagerank can be an empty set, which results in a null value if you perform a SUM on it. This causes the pagerank value of new_pagerank to be null as well, which causes an exception in the pagerank_diff calculation (apply ABS to a null value throws exception).
I dealt with this by changing the calculation of the pagerank value in new_pagerank to (i.e. making the SUM output 0 when there is an empty set):
( 1 - $d ) + $d * SUM ((IsEmpty(outbound_pagerank.pagerank)? {0F}: outbound_pagerank.pagerank)) AS pagerank,
- The output of the pagerank calc functions as input during the next iteration. However, the structure of the original input differs significantly with the input of the latter iterations.
Row in original input:
http://www.C.com 1 { (www.D.com) }
The output however is of structure
http://www.C.com 1 http://www.D.com 0.5
Forget about my second point, was a bit of a brainfart
Hi Laurens,
Thanks for your comment. I suppose I did not have any document without outgoing links in my case.