Julien's tech blog

talking about tech stuff: stuff I'm interested in, intriguing stuff, random stuff.

Monthly Archives: July 2011

PageRank implementation in Pig

In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the same example given in the presentation I gave at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the Hortonworks blog.

1. What is Pig embedded

Pig 0.9 supports running Python scripts with Pig statements embedded. This is simply run using the pig command:

pig myscript.py

The implementation uses Jython to execute the Python code, not CPython. As a consequence this is Python 2.5 (not 3.0) and you cannot use native extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example, java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:

  • Loop and exit criteria
  • User Defined Functions
  • Parameter passing

In this post I will not cover UDFs just yet; stay tuned for more.

2. A very simple example: The PageRank implementation

Input format:

www.A.com	1	{ (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com	1	{ (www.D.com), (www.E.com) }
www.C.com	1	{ (www.D.com) }
www.D.com	1	{ (www.B.com) }
www.E.com	1	{ (www.A.com) }
www.F.com	1	{ (www.B.com), (www.C.com) }
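To make the computation concrete before diving into the Pig script, here is a plain-Python sketch (not part of the original post) of one PageRank iteration over the sample graph above. Names are illustrative; this simplified version gives a page with no inbound links a rank of (1 - d), whereas the Pig script below handles that case through its COGROUP.

```python
# Plain-Python sketch of one PageRank iteration over the sample graph above.
# d is the damping factor from PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn)).
d = 0.5
links = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}
# every page starts with a pagerank of 1, as in the input file above
pagerank = dict.fromkeys(links, 1.0)

def iterate(pagerank):
    # each page distributes its rank evenly over its outbound links
    new = {}
    for url in links:
        inbound = sum(pagerank[src] / len(links[src])
                      for src in links if url in links[src])
        new[url] = (1 - d) + d * inbound
    return new

pagerank = iterate(pagerank)
```

Running a few of these iterations by hand is a good way to sanity-check the Pig output on the small data set.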

#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise Exception("failed")
    params["docs_in"] = out
 

Things to notice here: we first compile() the Pig script, then pass parameters using bind(params) and call runSingle() for each iteration. The output of each iteration becomes the input of the next one.

3. Application

Download from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2

Convert to the expected input format:

A = LOAD 'page_links_en.nt.bz2' USING PigStorage(' ') AS (url:chararray, p:chararray, link:chararray);
B = GROUP A BY url;
C = FOREACH B GENERATE group AS url, 1 AS pagerank, A.link AS links;
STORE C INTO 'input';
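For illustration, the same grouping can be sketched in plain Python over a few made-up N-Triples lines (the triples below are invented for the example, not taken from the dataset):

```python
# Sketch: group N-Triples lines by subject and build (url, pagerank, links)
# rows like the Pig conversion script above. Triples are made up for illustration.
from collections import defaultdict

triples = [
    "<http://a> <http://links-to> <http://b> .",
    "<http://a> <http://links-to> <http://c> .",
    "<http://b> <http://links-to> <http://a> .",
]

out_links = defaultdict(list)
for line in triples:
    url, predicate, link = line.split(" ")[:3]
    out_links[url].append(link)

# every page starts with pagerank 1, matching the expected input format
rows = [(url, 1, group) for url, group in out_links.items()]
```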

Execute the command line:

pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py

Get Max and Min from the resulting dataset:

A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B GENERATE FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) AS pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;

And the winner is … the United States! (according to en.wikipedia.org)

(6890.17,<http://dbpedia.org/resource/United_States>)
(0.50003946,<http://dbpedia.org/resource/Ficus_macrophylla_f._columnaris>)
(0.50003946,<http://dbpedia.org/resource/Melaleuca_trichostachya>)
(0.50003946,<http://dbpedia.org/resource/Anetholea_anisata>)
(0.50003946,<http://dbpedia.org/resource/Fieldia_australis>)
(0.50003946,<http://dbpedia.org/resource/Coprosma_nitida>)
(0.50003946,<http://dbpedia.org/resource/Heritiera_actinophylla>)
(0.50003946,<http://dbpedia.org/resource/Calochilus_robertsonii>)
(0.50003946,<http://dbpedia.org/resource/Euphorbia_eremophila>)
(0.50003946,<http://dbpedia.org/resource/Macrotyloma_uniflorum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Phebalium_squamulosum_subsp._squamulosum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_nova-anglica>)
(0.50003946,<http://dbpedia.org/resource/Pellaea_nana>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_nivea>)
(0.50003946,<http://dbpedia.org/resource/Poa_meionectes>)
(0.50003946,<http://dbpedia.org/resource/Akania_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Anthocarapa_nitidula>)
(0.50003946,<http://dbpedia.org/resource/Jasminum_volubile>)
(0.50003946,<http://dbpedia.org/resource/Seringia_arborescens>)
(0.50003946,<http://dbpedia.org/resource/Grammitis_billardierei>)
(0.50003946,<http://dbpedia.org/resource/Utricularia_monanthos>)
(0.50003946,<http://dbpedia.org/resource/Acacia_mitchellii>)
(0.50003946,<http://dbpedia.org/resource/Halosarcia_halocnemoides>)
(0.50003946,<http://dbpedia.org/resource/Calomeria_amaranthoides>)
(0.50003946,<http://dbpedia.org/resource/Tripladenia_cunninghamii>)
(0.50003946,<http://dbpedia.org/resource/Gaultheria_appressa>)
(0.50003946,<http://dbpedia.org/resource/Arytera_distylis>)
(0.50003946,<http://dbpedia.org/resource/Premna_lignum-vitae>)
(0.50003946,<http://dbpedia.org/resource/Drosera_burmanni>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_derwentiana>)
(0.50003946,<http://dbpedia.org/resource/Atalaya_multiflora>)
(0.50003946,<http://dbpedia.org/resource/Callitris_gracilis>)
(0.50003946,<http://dbpedia.org/resource/Salix_x_sepulcralis>)
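The max/min-and-join trick used above can be sketched in plain Python (toy data, not the actual results): find the extreme ranks, then keep every page whose rank matches one of them, which is why all the minimum-ranked pages show up, not just one.

```python
# Sketch of the max/min-and-join logic over (url, pagerank) pairs.
data = [
    ("<http://dbpedia.org/resource/United_States>", 6890.17),
    ("<http://x>", 0.5),
    ("<http://y>", 0.5),
    ("<http://z>", 12.0),
]

# the two extreme pagerank values (what the TOBAG(MAX, MIN) step computes)
bounds = {max(pr for _, pr in data), min(pr for _, pr in data)}
# the join keeps every row whose pagerank equals one of the extremes
result = [(pr, url) for url, pr in data if pr in bounds]
```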

4. A little more interesting: Stop iterating when a threshold is reached

#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';

""")

d = 0.5
docs_in= "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception("failed")
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out
 

Main differences:

  • We compute an extra max_diff relation that contains a single tuple.
  • Variables can also be bound from the current scope when using the parameterless bind().
  • We can easily check whether we reached the expected threshold using the stats object returned by runSingle() and stop iterating if needed.
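The stopping criterion can be sketched in plain Python; here toy_iterate is a stand-in for running the Pig job, just to show the loop structure:

```python
# Sketch of the convergence loop: iterate until the largest per-page change
# drops below a threshold, as the embedded script above does with max_diff.
def iterate_until_converged(pagerank, iterate, threshold=0.01, max_iter=10):
    for i in range(max_iter):
        new = iterate(pagerank)
        # the equivalent of the max_diff relation computed by the Pig script
        max_diff = max(abs(new[u] - pagerank[u]) for u in pagerank)
        pagerank = new
        if max_diff < threshold:
            break
    return pagerank, i

def toy_iterate(pr):
    # toy update that converges geometrically toward 1.0 (stands in for Pig)
    return {u: (v + 1.0) / 2.0 for u, v in pr.items()}

final, stopped_at = iterate_until_converged({"www.A.com": 0.0}, toy_iterate)
```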

Next…

The examples are available on github.
Next time I’ll go into the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also have a post about nicely embedding syntactically colored code snippets in the free wordpress.com 😛

Detecting low memory in Java Part 2

This is a follow-up on my previous post; the rationale is explained there.

I ended up spending a little more time on the low memory detection issue and played with the MemoryPoolMXBean. I had found some posts about it but none were satisfying. This post (thanks @techmilind for pointing it out) led me in the right direction even though it is partially incorrect: experimenting with a monotonically increasing memory usage is a very special (and invalid) case.
In particular, setUsageThreshold() is not very useful in my case as it is triggered the very first time we use that much memory, regardless of imminent garbage collection. As discussed in my previous post, it is useful only if the available memory is measured right after garbage collection.
However setCollectionUsageThreshold() is exactly what I need: it sets a threshold for notification when memory is still low right after a GC.

We need to do this in two steps, first set a collectionUsageThreshold on the tenured partition.

// heuristic to find the tenured pool (largest heap pool), as seen on http://www.javaspecialists.eu/archive/Issue092.html
MemoryPoolMXBean tenuredGenPool = null;
for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
  if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
    tenuredGenPool = pool;
  }
}
// notify when 80% of the pool is still used right after a garbage collection
// (cast to long, not int: getMax() can exceed Integer.MAX_VALUE on large heaps)
tenuredGenPool.setCollectionUsageThreshold((long) Math.floor(tenuredGenPool.getUsage().getMax() * 0.8));

Here tenuredGenPool.getCollectionUsage() is the memory usage as measured right after the last garbage collection. This is the value that we can rely on to detect low memory.

Then we set up a listener to get notified. Only MEMORY_COLLECTION_THRESHOLD_EXCEEDED is interesting, as explained before.

// set a listener
MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(new NotificationListener() {
  public void handleNotification(Notification n, Object hb) {
    if (n.getType().equals(
        MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
      // this is the signal => end the application early to avoid OOME
    }
  }
}, null, null);

My experiment slowly increases the memory usage while also freeing up a bunch of existing objects. This shows that MEMORY_THRESHOLD_EXCEEDED is reached pretty quickly even though most of the memory gets cleaned up afterwards. MEMORY_COLLECTION_THRESHOLD_EXCEEDED is triggered only when we actually fill up the memory.

edit: actual code explains it better than words.

Here is some test code to show how that works:

https://github.com/julienledem/blog/blob/master/2011/07/21/detecting-low-memory-in-java-part-2/MemoryTest.java

sample output: (only the last line is the warning we want)

it=   0 - Par Eden Space: u= 49% cu=  0% th=0% - Par Survivor Space: u=  0% cu=  0% th=0% - CMS Old Gen: u=  0% cu=  0% th=79%
it= 100 - Par Eden Space: u= 16% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 35% cu=  0% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u= 92% cu=  0% th=0% - Par Survivor Space: u= 96% cu= 96% th=0% - CMS Old Gen: u= 86% cu=  0% th=79%
it= 200 - Par Eden Space: u= 84% cu= 45% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 19% cu= 19% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  1% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 19% th=79%
it= 300 - Par Eden Space: u= 49% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 79% cu= 74% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  4% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 74% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 33% th=79%
it= 400 - Par Eden Space: u= 15% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 51% cu= 38% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  6% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 38% th=79%
it= 500 - Par Eden Space: u= 79% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 59% cu= 46% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 51% th=79%
it= 600 - Par Eden Space: u= 43% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 64% cu= 56% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 56% th=79%
memory collection threshold exceeded !!! : 
        - Par Eden Space: u= 19% cu=  9% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 85% cu= 85% th=79%

This is going to be my tech blog

Maintaining this is going to be a challenge but I’ll give it a try.

Here are some topics I plan to talk about (in no particular order):

  • Embedding Pig in Python/Javascript (edit: PageRank implementation in Pig and Transitive closure in Pig)
  • Simple Ray tracing tutorial
  • winmine.exe player
  • Agility (in software development)
  • productive development environment for web services
  • Map/Reduce in the browser for fast experimentation
  • Schema based vs Schema less on the grid

Of course it is going to take me forever to write about all of this so if you have interest in some of those, let me know.

Detecting low memory in Java

This seems to be a common need and difficult thing to do in Java. Here is my solution, let me know what you think.

1. The problem

I am focusing on only one aspect of the problem as I have a very specific use case. I have a piece of code that accumulates information about a (possibly big) data set. Let’s assume I have a very big file that contains a list of records encoded in JSON and that I iterate on the records to accumulate statistics about the data. This is close enough to what I’m doing. The result of the operation is a report containing the schema inferred from the data and statistics about the fields (max/min/avg size, list of unique values and count if bounded, type, …). This is implemented as an Algebraic UDF (so that it can use the Combiner) in a Pig script, so there’s a fair amount of the code that I don’t control.

If the data is homogeneous everything is fine and it generates a rather small report with a lot of interesting information about the data. Now if the data is random enough (dynamic keys, …) it will eventually eat up all the memory until it fails. The first thing to do is to make it configurable to limit the number of fields/values/… it will accumulate, but getting this configuration right is still black magic and unsatisfying. What I really want is to keep accumulating until I run out of memory and output what I can, without ever getting the dreaded OutOfMemoryError, as it can be thrown anywhere and most often outside of my code where I can’t catch it. One important point here: I don’t want false positives.

2. Trying out things

First I looked into java.lang.Runtime.{free|max|total}Memory() and the more precise MemoryPoolMXBean. This tells me how much memory I am using and I can even set a usage threshold to be notified. Solved? Not really. The trouble is that reaching a level of usage does not mean you’re going to run out of memory: unreachable objects do not get garbage collected until it is needed. To simplify, we first eat up all the memory, then the garbage collector frees old objects, again and again. Graphing memory usage shows an upward trend with a drop every time GC kicks in.

One way to make the value returned by freeMemory() more accurate is to force a garbage collection right before, but of course this slows down the application drastically; there’s a reason the garbage collector runs only when needed. Also there’s a special type of OutOfMemoryError (“GC overhead limit exceeded”) that gets thrown when you spend over a certain percentage of time in GC, and you could artificially trigger it.

Another way to look at this is to read the value of freeMemory() right after a (full) GC, but there’s no way to get notified of GC from the Java side; it seems you need to write an agent in C, which exceeded by far the complexity threshold I had set for the solution. You could also imagine polling GarbageCollectorMXBean, which knows how many GCs happened (current > previous ? get freeMemory()), but I did not try that (there was also a threshold on the time I spent on this 🙂) and I’m not sure how reliable it would be.

3. My solution

I settled for something very different which happens to be triggered by the garbage collector when you are close to getting an OutOfMemoryError. I allocate a byte array and wrap it in a SoftReference. The byte array is big enough so that freeing it will give me enough memory to finish what I’m doing gracefully. This acts like a canary in a coal mine: the SoftReference “dies” as a warning that we are running out of memory.

SoftReference<Object> canary = new SoftReference<Object>(new byte[bufferSize]);

Now in each iteration I can check canary.get() == null as a signal that I’m running low on memory before actually getting an OutOfMemoryError (remember that it can be thrown somewhere I cannot catch it). You could also use a ReferenceQueue to get notified of this. A SoftReference is how you tell the JVM that you’d rather keep the object but that it can be freed when there’s a need.

Experimentation monitoring the memory and GCs shows that the SoftReference actually gets freed in last resort and not before so it fits the bill.

One big drawback of this is that the buffer is actually using memory that cannot be used for something else. In particular, the less memory is left, the more time the GC takes. The consequence is that the application will slow down a lot right before freeing the buffer, delaying the detection. My experiments showed that it was acceptable for my use case. The big advantage: this is a very simple solution.

If you have an opinion about this, please comment.

edit: I have posted an update regarding the MemoryPoolMXBean.