Julien's tech blog

talking about tech stuff: stuff I'm interested in, intriguing stuff, random stuff.

Dremel made simple with Parquet

I recently published a post on the twitter engineering blog about Parquet and how it stores nested data structures.

You can read it there: https://blog.twitter.com/2013/dremel-made-simple-with-parquet

Java JIT compiler inlining

As you know, the Java Virtual Machine (JVM) optimizes Java bytecode at runtime using a just-in-time compiler (JIT). However the exact behavior of the JIT is hard to predict and documentation is scarce. You probably know that the JIT will try to inline frequently called methods in order to avoid the overhead of method invocation. But you may not realize that the heuristic it uses depends both on how often a method is invoked and on how big it is. Methods that are too big cannot be inlined without bloating the call sites.

Keeping this heuristic in mind and enabling flags on the java command line, we can find places where we can help the JIT better optimize our code by breaking large methods into smaller hot methods that can usefully be inlined.

The JIT aggressively inlines methods, removing the overhead of method calls. Methods that can be inlined include static, private or final methods but also public methods if it can be determined that they are not overridden. Because of this, subsequent class loading can invalidate the previously generated code. Because inlining every method everywhere would take time and would generate an unreasonably big binary, the JIT compiler inlines the hot methods first until it reaches a threshold. To determine which methods are hot, the JVM keeps counters to see how many times a method is called and how many loop iterations it has executed. This means that inlining happens only after a steady state has been reached, so you need to repeat the operations a certain number of times before there is enough profiling information available for the JIT compiler to do its job.

Rather than trying to guess what the JIT is doing, you can take a peek at what’s happening by turning on java command line flags: -XX:+PrintCompilation -XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining

Here is what they do:

  • -XX:+PrintCompilation: logs when JIT compilation happens
  • -XX:+UnlockDiagnosticVMOptions: enables other flags like -XX:+PrintInlining
  • -XX:+PrintInlining: prints what methods get inlined and where

Turning those flags on, you will see the JVM print out compilation information to the standard output.

Inlined methods are displayed as a tree whose leaves are annotated:

  • inline (hot): the method was determined hot and inlined
  • too big: the method was not inlined as the generated code was getting too big (but the method was not hot)
  • hot method too big: the method was determined hot but not inlined because the resulting code was getting too big.

In the output, you want to make sure hot methods get inlined. Seeing “too big” on its own is nothing to worry about, as the cost of a method call that does not happen often is negligible. It is when you see “hot method too big” that you want to take a closer look and find out how you can make the JIT compiler’s life easier. The compiler works at the granularity of the method and inlining is an all-or-nothing operation: big methods reduce opportunities for inlining.
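To see those annotations yourself, here is a minimal program (the class and method names are mine, for illustration only) that should trigger inlining of a small hot method when run with the flags above:

```java
// Run with: java -XX:+PrintCompilation -XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining InlineDemo
// After enough iterations, add() should show up as "inline (hot)" in the output.
public class InlineDemo {
  // small and frequently called: a good inlining candidate
  static int add(int a, int b) {
    return a + b;
  }

  public static void main(String[] args) {
    long sum = 0;
    // repeat enough times to reach the JIT compilation threshold
    for (int i = 0; i < 10_000_000; i++) {
      sum += add(i, 1);
    }
    System.out.println(sum);
  }
}
```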

To illustrate the theory let’s take a look at the decoding implementation for Parquet, a columnar file format for Hadoop. I used the -XX:+PrintInlining flag to look at how methods get inlined and saw an instance of “hot method too big”.

! @ 1 parquet.column.impl.ColumnReaderImpl::checkRead (492 bytes)   hot method too big

For every value read in a column, it checks whether the current page is fully consumed and reads a new page if it needs more data to process.

private void checkRead() {
  if (isPageFullyConsumed()) {
    // read page
    ... // code for reading a page, inline in this method
  }
  ...
}

The code reading the content of the page is right there in the method making it too big to be inlined. The method is called for every value we read (which is very frequent) but the test is true only when we are done reading the page (which is rare).

We can improve it by modifying the method as follows:

private void checkRead() {
  if (isPageFullyConsumed()) {
    readPage();
  }
  ...
}

private void readPage() {
  // read page
  ... // code for reading a page
}

Now the checkRead(), isPageFullyConsumed() and read() methods get inlined, removing those method calls from the hot loop. The readPage() call does not get inlined, but as it does not happen often, the cost is negligible. The change is on github.

To generalize the principle: when a hot method contains a test that rarely evaluates to true, it can be a good idea to extract the content of the if statement (or switch, or for) into a separate method, increasing the granularity at which the JIT compiler can optimize the code. This is of course after you have used -XX:+PrintInlining to determine there is something to improve in the first place.

The moral of this story is that I made my code faster by adding a method call.

thanks @peterseibel for the feedback.

Java Classloader tips

This post is not trying to exhaustively describe classloaders. I merely intend to give some hints about use of classloaders and traps to avoid.

1. Introduction

A Classloader is responsible for loading the code for a class based on its name. The mechanism for loading a class depends on the context. In Java, linking is dynamic: java bytecode contains references to other classes, methods and fields by name. The address pointers are determined at class loading time.

In a simple java process you will find the following classloaders:

– The Bootstrap classloader: Loads the classes from the JDK. As those classes do not change, the Classloader can cache the result of dynamic linking to ensure a faster startup time of the JVM.

– The Application classloader: It loads classes by looking up the directories and jars defined in the classpath. The first class encountered wins, so having the same class name (but a different version) multiple times on the classpath can have unexpected side effects. A typical issue is resolving dependency conflicts when compiling against third party jars, which often involves replacing a version of a jar by a "compatible" one. If the signature of a class differs at runtime you may experience various Errors (not Exceptions). These errors will happen only when the classes are loaded and used, so the problem may not show up until you reach the code path depending on them.

2. Classloader Delegation

In a given Java process, the classloaders are organized in a hierarchy. Each classloader has a parent and the bootstrap classloader is the root of the tree. When attempting to load a class, a classloader is supposed to first delegate to its parent (recursively up to the root) and, only if the parent cannot load the class, use its own mechanism to load it. However this is just a convention and a given implementation can choose not to delegate to the parent first. In that case the class loaded by the parent would be hidden. For example the WebappClassLoader in Tomcat looks up classes in the webapp first. It also forbids loading servlet.jar from the webapp itself, as this would prevent the webapp from working. The advantage is that webapps can use different versions of a given library than the container itself.
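As a sketch of what such a non-delegating classloader can look like (this is my own simplified illustration, not Tomcat's actual implementation):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Simplified child-first classloader: it looks in its own URLs before
// delegating to the parent, reversing the usual convention.
public class ChildFirstClassLoader extends URLClassLoader {
  public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
    super(urls, parent);
  }

  @Override
  protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    Class<?> c = findLoadedClass(name);
    if (c == null) {
      if (name.startsWith("java.")) {
        // java.* classes must always come from the parent
        c = getParent().loadClass(name);
      } else {
        try {
          c = findClass(name); // try our own URLs first
        } catch (ClassNotFoundException e) {
          c = getParent().loadClass(name); // fall back to the parent
        }
      }
    }
    if (resolve) {
      resolveClass(c);
    }
    return c;
  }
}
```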

3. Class loading

– Implicit: The dependencies of a class will be loaded by the same classloader that loaded the dependent class, by the same mechanism used by Class.forName(className). If the version of those compile-time dependencies found at runtime is different from, and not binary compatible with, the one used during compilation, a runtime error is raised.

– Explicit: The other way of loading classes is to explicitly call ClassLoader.loadClass(className). Note that to create instances of such classes you need to use reflection, typically Class.newInstance(). All their dependencies will be loaded implicitly by the same classloader. Usually, classes loaded dynamically extend a class or interface from the parent classloader so that they can be called by the classes in the parent classloader.

– Deserialization: The java serialization format contains the class names along with the data. When deserializing, classes will be looked up in the current classloader. If you serialize instances of classes loaded dynamically, you need to explicitly override the class look-up mechanism in ObjectInputStream.resolveClass(ObjectStreamClass desc).
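For illustration, here is a minimal ObjectInputStream subclass that resolves classes against an explicitly provided classloader (the class name is mine):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Resolves deserialized classes against the given classloader
// instead of the default look-up mechanism.
public class ClassLoaderObjectInputStream extends ObjectInputStream {
  private final ClassLoader loader;

  public ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
    super(in);
    this.loader = loader;
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
    try {
      return Class.forName(desc.getName(), false, loader);
    } catch (ClassNotFoundException e) {
      // fall back to the default look-up (handles primitives, etc.)
      return super.resolveClass(desc);
    }
  }
}
```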

4. Usage examples

– Loading classes that were not available at startup. This is typically used to extend the behavior of an application. The application provides an interface that can be implemented by various extensions bundled in jars. Creating an instance of URLClassLoader is a very easy way to do this.


ClassLoader cl = new URLClassLoader(new URL[] {new URL("file:/path/to/extension.jar")});
MyInterface extension = (MyInterface)cl.loadClass(className).newInstance();

The jar could just as easily be located on a remote server ("http://myhost/extension.jar").

For example, this is how Pig loads REGISTERed jars on the frontend to initialize UDFs. JDBC drivers could be loaded the same way.

– Bytecode rewriting or class generation. Overriding the loadClass method of a URLClassLoader is a convenient way of modifying the bytecode or generating new classes on the fly. A simple example is to add profiling to every method call by timing it. Another one is to add transactional support to an API by dynamically extending it.

– Allowing different versions of the same class to coexist in the same Java process. Typically in application servers, the actual implementation of the server lives in a different classloader than the applications themselves to avoid dependency conflicts.

5. Fun facts

– Sibling classloaders (two classloaders neither of which is in the other's hierarchy) can load the same class independently, possibly from the same location. In that case the two classes are different from the JVM's point of view. As a consequence, casting an instance of the class from one of the classloaders to the class from the other will result in a ClassCastException (the error message will show the exact same name twice, but the classes are different!).

// assuming /path/to/extension.jar is not on the classpath
ClassLoader cl1 = new URLClassLoader(new URL[] {new URL("file:/path/to/extension.jar")});
ClassLoader cl2 = new URLClassLoader(new URL[] {new URL("file:/path/to/extension.jar")});
Class foo1 = cl1.loadClass("Foo");
Class foo2 = cl2.loadClass("Foo");
foo2.isAssignableFrom(foo1); // false

– If a class is in the parent class loader it takes precedence and will never be loaded by a child classloader (unless defined otherwise, like in the WebappClassLoader, but in that case it is a different class from the one in the parent class loader)

– Classloader hierarchies should follow the dependency hierarchy of the classes they are trying to load. The child class loader sees the classes in the parent, not the other way around. Loading a jar at runtime in a classloader whose parent contains classes which have compile time dependencies on the same jar will cause runtime errors.

– Classloaders do not retry to initialize a class. If an exception is thrown in a static initializer, the first use of the class throws an ExceptionInInitializerError and subsequent uses throw a NoClassDefFoundError, even if the cause was transient. Do not initialize database connections in a static initializer!
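A small demo of this behavior (class names are mine):

```java
// Shows that a failed static initializer permanently poisons the class:
// the first use throws ExceptionInInitializerError, every subsequent use
// throws NoClassDefFoundError, even though the cause happened only once.
public class StaticInitDemo {
  static class Flaky {
    static {
      if (true) { // fools the compiler into accepting an unconditional throw
        throw new RuntimeException("transient failure");
      }
    }
  }

  // returns the name of the error raised when trying to use Flaky
  static String tryUse() {
    try {
      new Flaky();
      return "ok";
    } catch (Throwable t) {
      return t.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryUse()); // first attempt
    System.out.println(tryUse()); // second attempt: the class is now unusable
  }
}
```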

– The thread context classloader is a convention used, for example, by application servers. In that case threads are managed by the server but the code executed comes from the deployed webapps. Calling Thread.setContextClassLoader(ClassLoader) does not change the class loading behavior of the JVM; it is up to the libraries using this convention to use it explicitly.

Exception handling, Checked vs Unchecked exceptions, …

Some thoughts in random order about exceptions in Java, as they often get overlooked. The following points are often related. Mostly I’m talking to myself here, don’t take it too personally when I say “you should do this” 🙂 just imagine I’m the guy from Memento and that I tattoo myself with my blog posts. Feel free to disagree/add your own views in the comments (though I may not get your opinion tattooed on my body). Here I’m assuming we are building a library that others will use.

1. Exceptions are part of the contract

When you declare a method, it defines a contract for the level of abstraction of the class. The exceptions are part of this contract; exceptions thrown by a method should be relevant to the level of abstraction. Just throwing the exceptions from the underlying layer is letting the implementation details slip through the interface. (See the point about chaining exceptions)

Example:
You write a login method implemented by querying an underlying DB. It should throw a LoginFailedException (identifier or password incorrect); it should not throw a SQLException. LoginFailedException extends a base class that you define (extending java.lang.Exception) and that is common to all your checked exceptions. (See the following point.)
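A sketch of what that contract looks like (all names here are illustrative; checkCredentialsInDb stands in for the real DB query):

```java
import java.sql.SQLException;

class LoginFailedException extends Exception {
  public LoginFailedException(String message) { super(message); }
  public LoginFailedException(String message, Throwable cause) { super(message, cause); }
}

class LoginService {
  // the contract only exposes LoginFailedException;
  // SQLException stays an implementation detail
  public void login(String user, String password) throws LoginFailedException {
    try {
      if (!checkCredentialsInDb(user, password)) {
        throw new LoginFailedException("identifier or password incorrect for " + user);
      }
    } catch (SQLException e) {
      // translate and chain: do not let the lower layer slip through the interface
      throw new LoginFailedException("could not log in " + user, e);
    }
  }

  private boolean checkCredentialsInDb(String user, String password) throws SQLException {
    return "secret".equals(password); // stand-in for the real DB query
  }
}
```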

2. Have base classes for the checked and unchecked exceptions

Sometimes it is useful to be able to catch exceptions of one specific library at a higher level. For example, catching all the runtime exceptions that have not been handled by the library. That way they can be caught in a single catch block. In Java 7 you will be able to catch multiple exceptions in a single block. Inheritance for exceptions is just a category tree, even if usually that's not how you want to think of inheritance.
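For example (the names are illustrative):

```java
// One base class per category: callers can catch everything from the library at once.
class MyLibException extends Exception {               // base of all checked exceptions
  public MyLibException(String message, Throwable cause) { super(message, cause); }
}

class MyLibRuntimeException extends RuntimeException { // base of all unchecked exceptions
  public MyLibRuntimeException(String message, Throwable cause) { super(message, cause); }
}

class ConfigurationException extends MyLibException {
  public ConfigurationException(String message, Throwable cause) { super(message, cause); }
}
```

A caller can then write a single catch (MyLibRuntimeException e) block at the top of the stack to handle anything the library did not handle itself.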

3. Do not throw java.lang.Exception and make sure that declared exceptions are actually thrown

Throwing java.lang.Exception hides the different cases that you may have to handle depending on what type of exception it is. Checked exceptions are a handy way of making sure you don't forget to handle error cases, as long as you declare them correctly. People using your library will swear (or maybe it's just me. Either way, if I'm going to use your code, please do it for my co-workers). In general, design your API in a way that does not force people to declare catch blocks for exceptions that are never thrown. For example, new String(bytes, "UTF-8") throws UnsupportedEncodingException even though UTF-8 is always supported. In Java 6, new String(byte[], Charset) was added to avoid this.

4. Checked vs Unchecked exceptions

Checked exceptions are for cases where the caller should do something about the error. This is for exceptional cases that should be handled. For example, if the login failed, you should probably display an error and ask to retry. The users will sometimes mistype their password and you should always handle it.
Unchecked exceptions are for runtime errors caused by bugs or unexpected failures, when the caller could not possibly do something about it and the default expected behavior is just to fail. Most of the time you want to centrally handle those at the top of the stack to display an error message or send an alert to the monitoring system. The caller can still catch it if it wants to, but it does not have to.
For example, if the database refuses the connection you may throw a DatabaseUnavailableException that extends a base class (extending java.lang.RuntimeException) common to all your unchecked exceptions. You could have an intermediary layer that retries the transaction, or a top level apologetic error message asking to come back later. The main point is that the exception is not dealt with where it is thrown.

5. Chaining Exceptions

Since Java 1.4 all exceptions can be chained. When you catch an Exception and throw a new one related to your level of abstraction, you should chain the original one to make sure you have all available information to fix the problem. A good stack trace tells you exactly where the bug is (see the "fail early" point). It is not fun when the production issue you have to fix urgently reports itself without providing the root cause. You usually end up patching the code to chain the exception first, then reproducing the error, and only then learning what actually happened.

Sometimes people ask how to display the "... 2 more" at the end of a chained stack trace. The display does not truncate any information: those lines are already there in the parent stack trace. Exceptions that have a cause share the bottom of their stack trace with that cause, and printStackTrace() simply does not print those duplicate lines.
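You can see this in a toy example (the names are mine):

```java
// The wrapper and its cause share the bottom of the stack;
// printStackTrace() elides the shared frames as "... N more".
public class ChainDemo {
  static void inner() {
    throw new IllegalStateException("root cause");
  }

  static void outer() {
    try {
      inner();
    } catch (IllegalStateException e) {
      // chain the original exception when translating it
      throw new RuntimeException("higher level description", e);
    }
  }

  public static void main(String[] args) {
    try {
      outer();
    } catch (RuntimeException e) {
      e.printStackTrace(); // the "Caused by:" section ends with "... N more"
    }
  }
}
```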

6. Add more information.

When you catch an Exception and throw a new one related to your level of abstraction, you should add information related to this upper level.

void readConfiguration() throws InvalidConfigurationException {
  File confFile = new File("conf/conf.properties");
  try {
    ... // read the configuration file
  } catch (IOException e) {
    throw new InvalidConfigurationException("Error while reading the configuration at " + confFile.getAbsolutePath(), e);
  }
}

In general put as much information as you can (id of the object for which it failed …), but keep it to one line.

7. Do not have empty catch blocks

If the exception can not possibly get thrown (e.g. new InputStreamReader(stream, "UTF-8") throws UnsupportedEncodingException even though UTF-8 is always supported), just throw an exception saying so. That way, if you're wrong, you don't hide the problem.
If an empty catch block is really what you want (the API you call probably needs refactoring), at least put a comment explaining why.
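For instance, with the UTF-8 case from point 3 (the helper class is mine):

```java
import java.io.UnsupportedEncodingException;

public class Utf8 {
  public static String decode(byte[] bytes) {
    try {
      return new String(bytes, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // cannot happen: every JVM is required to support UTF-8.
      // If we are ever wrong, fail loudly instead of hiding the problem.
      throw new RuntimeException("UTF-8 is not supported?!", e);
    }
  }
}
```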

8. Fail early

Prefer throwing an exception to using a default value when something fails. You want your code to tell you what's wrong instead of doing something you didn't ask for.
If something is not what's expected, throw an exception: it is better to fail early on the real cause than later on the consequences, which will be harder to debug.
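A trivial example of the difference (my own illustration):

```java
// Failing early: reject bad input where it enters the system, instead of
// silently substituting a default and failing later on the consequences.
public class Ports {
  public static int parsePort(String s) {
    if (s == null || s.isEmpty()) {
      throw new IllegalArgumentException("port must be provided");
    }
    int port = Integer.parseInt(s); // throws NumberFormatException on garbage
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("port out of range: " + port);
    }
    return port;
  }
}
```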

9. Read the error message

When there’s an exception if you did a good job, you should be able to know quickly what the problem is by reading the messages and the stack traces of the chain of exceptions. I know this one sounds silly, but how many times have you had a bug report mentioning that “it fails” without a stacktrace?

(this is an open list, I may come back later to add some)

Transitive Closure in Pig

1. Introduction

This is a follow-up on my previous post about implementing PageRank in Pig using embedding. I also talked about this in a presentation to the Pig user group.
One of the best features of embedding is how it simplifies writing UDFs and using them right away in the same script without superfluous declarations. Computing a transitive closure is a good example of an algorithm requiring an iteration, a few simple UDFs and an end condition to decide when to stop iterating. Embedding is available in Pig 0.9. Knowledge of both Pig and Python is required to follow. Examples are available on github.

2. The problem

Before diving into the solution, I will give a brief overview of the problem. We want to find the connected components of a graph by performing a transitive closure. Of course I will assume that the graph does not fit in main memory, otherwise what follows is not very useful.
A picture is worth a thousand words; here is a visual statement of the problem:

Transitive Closure


I highlighted the diameter in red as it plays an important role in the complexity of the algorithm. The diameter of a connected component is the longest shortest path between 2 nodes of the component.
The algorithm will require log2(max diameter of a component) iterations because at every iteration we double the number of relations that have been followed. This is not the overall complexity, but it ensures that the number of iterations stays low: with just 10 iterations you can handle components with a diameter of 1000+. In most real life use cases the diameter is very low and a few iterations will suffice.

3. The algorithm

I wrote the following implementation to illustrate the embedding feature. It may not be optimal but it is a working solution with a reasonable number of iterations. The algorithm does a graph traversal from all the nodes in parallel. It does one hop per iteration (pardon the scientific jargon) and then merges the results per node (using follow() and OR()). As a consequence the diameter of the graph followed from a node doubles every iteration. In this case, the edges of the graph are considered bidirectional and are stored one way (see normalize() and bidirectional()) to save space. The edges are stored in two files: followed (empty at the beginning) and to_follow which will tell us when all the edges have been followed (to_follow is empty). The last part of the script does some formatting to turn a list of edges into group of nodes (using sort() to make sure they are always represented the same way).

4. The code

Here is an example of how to solve this problem by embedding Pig in Python.
(explanation follows)
Input data (data/tc_data_simple.txt):

id0	id1
id1	id2
id2	id3
id3	id4
id4	id5
id5	id6
id6	id7
id7	id8
id11	id10
id12	id11
id13	id12
id14	id13
id14	id15
id15	id16
id16	id17
id17	id18



from org.apache.pig.scripting import *

@outputSchema("rels:{t:(id1: chararray, id2: chararray)}")
def bidirectional(id1, id2):
    if id2 != id1:
        return [ (id1, id2), (id2, id1) ]
    else:
        return [ (id1, id2) ]

def normalize(t):
    id1, id2, followed = t
    if id2 > id1:
        return (id1, id2, followed)
    else:
        return (id2, id1, followed)

@outputSchema("rels:{t:(id1: chararray, id2: chararray, followed: int)}")
def follow(to_follow_id1, links_id1, links_id2):
    outputBag = [ normalize( (links_id1, links_id2, True) ) ]
    if to_follow_id1 is not None:
        outputBag.append( normalize( (to_follow_id1, links_id2, False) ) )
    return outputBag

@outputSchema("followed: int")
def OR(bag):
    result = 0
    for followed in bag:
        result = result | followed[0]
    return result

@outputSchema("group: {t: (id: chararray)}")
def SORT(bag):
    # sort the ids so that a group of nodes is always represented the same way
    ordered = list(bag)
    ordered.sort()
    return ordered

def main():
    # cleanup output directory before starting
    Pig.fs("rmr out/tc")
    Q = Pig.compile("""
    followed = LOAD '$followed' AS (id1: chararray, id2: chararray);
    followed = FOREACH followed GENERATE FLATTEN(bidirectional(id1, id2)), 1 AS followed; -- 1 == true
    to_follow = LOAD '$to_follow' AS (id1: chararray, id2: chararray);
    to_follow = FOREACH to_follow GENERATE FLATTEN(bidirectional(id1, id2)), 0 AS followed; -- 0 == false
    links = UNION to_follow, followed;
    joined_links = JOIN links BY id1 LEFT, to_follow BY id2;
    new_links_dup =
        FOREACH joined_links
        GENERATE FLATTEN( follow(to_follow::rels::id1, links::rels::id1, links::rels::id2) );

    new_links =
        FOREACH (GROUP new_links_dup BY (id1, id2))
        GENERATE group.id1, group.id2, OR(new_links_dup.followed);

    SPLIT new_links INTO new_followed IF followed != 0, new_to_follow IF followed == 0;
    new_followed = FOREACH new_followed GENERATE id1, id2;
    new_to_follow = FOREACH new_to_follow GENERATE id1, id2;
    STORE new_followed INTO '$new_followed';
    STORE new_to_follow INTO '$new_to_follow';
    """)

    to_follow = "data/tc_data_simple"
    followed = "out/tc/followed_0"
    # create empty dataset for first iteration
    Pig.fs("mkdir " + followed)
    Pig.fs("touchz " + followed + "/part-m-00000")
    for i in range(10):
        new_to_follow = "out/tc/to_follow_" + str(i + 1)
        new_followed = "out/tc/followed_" + str(i + 1)
        job = Q.bind().runSingle()
        if not job.isSuccessful():
            raise Exception('failed')
        to_follow = new_to_follow
        followed = new_followed
        # detect if we are done
        if not job.result("new_to_follow").iterator().hasNext():
            break

    # formatting: turn the list of edges into groups of nodes
    Pig.compile("""
    links = LOAD '$followed' AS (id1: chararray, id2: chararray);
    links = FOREACH links GENERATE FLATTEN( bidirectional(id1, id2) );
    result = DISTINCT (
        FOREACH (GROUP links by id1)
        GENERATE SORT(links.id2));
    STORE result INTO 'out/tc/groups';
    """).bind().runSingle()

if __name__ == '__main__':
    main()

Output data:


5. Notables

There are a few things to notice in this implementation:
– UDFs are just defined by functions in the same script
– The output schema of the UDF is defined by using the @outputSchema decorator. Pig needs the schema of the output to interpret the statements following the UDF call.
– The native Python structures (dictionary, tuple, list) are used; the conversion to Pig types (map, tuple, bag) is automatic. This makes the UDF code much more compact.
– UDFs are directly available in embedded Pig scripts using the function names. No other declaration is required.
– We iterate a maximum of 10 times (max diameter = 2^10 = 1024) and check if the "to follow" relation is empty to decide whether to stop iterating. This is done using the JobStats object returned by runSingle().

There’s also an experimental Javascript embedding that I’ll talk about in a future post.

PageRank implementation in Pig

In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes in a little more details on the same example given in the presentation I gave at the Pig user meetup. If you are interested, Daniel just published a nice K-Means implementation on the HortonWorks blog.

1. What is Pig embedded

Pig 0.9 supports running Python scripts with Pig statements embedded. This is simply run using the pig command:

pig myscript.py

The implementation uses Jython to execute the Python code, not CPython. As a consequence this is Python 2.5 (not 3.0) and you cannot use native CPython extensions. On the other hand you can use Java-based extensions and the Java APIs (for example: java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:

  • Loop and exit criteria
  • User Defined Functions
  • Parameter passing

In this post I will not cover UDFs just yet; stay tuned for more.

2. A very simple example: The PageRank implementation

Input format:

www.A.com	1	{ (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com	1	{ (www.D.com), (www.E.com) }
www.C.com	1	{ (www.D.com) }
www.D.com	1	{ (www.B.com) }
www.E.com	1	{ (www.A.com) }
www.F.com	1	{ (www.B.com), (www.C.com) }

from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    params["docs_in"] = out

Things to notice here: we first compile() the Pig script, then pass parameters using bind(params) and runSingle() for each iteration. The output of each iteration becomes the input of the next one.

3. Application

Download from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2

Convert to the expected input format:

A = LOAD 'page_links_en.nt.bz2' using PigStorage(' ') as (url:chararray, p:chararray, link:chararray);
B = GROUP A by url;                                                                                  
C = foreach B generate group as url, 1 as pagerank, A.link as links;                                 
STORE C into 'input';

execute the command line:

pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py

Get Max and Min from the resulting dataset:

A = LOAD 'pagerank_data_10' AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B GENERATE FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) AS pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;

And the winner is … the United States! (according to en.wikipedia.org)


4. A little more interesting: Stop iterating when a threshold is reached

from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank ) AS diff;

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff.diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';
""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception('failed')
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out

Main differences:

  • We compute an extra max_diff relation that contains a single tuple.
  • Variables can also be bound from the current scope when using the parameterless bind().
  • We can easily check if we reached the expected threshold using the JobStats object returned by runSingle() and stop iterating if needed.


The examples are available on github.
Next time I’ll go in the details of a transitive closure implementation using UDFs. (edit: Transitive closure in Pig)
I could also have a post about nicely embedding syntactically colored code snippets in the free wordpress.com 😛

Detecting low memory in Java Part 2

This is a follow-up on my previous post; the rationale is explained there.

I ended up spending a little more time on the low memory detection issue and played with the MemoryPoolMXBean. I had found some posts about it but none were satisfying. This post (thanks @techmilind for pointing it out) led me in the right direction even though it is partially incorrect. Experimenting with a monotonically increasing memory usage is a very special (and invalid) case.
In particular, setUsageThreshold() is not very useful in my case as it is triggered the very first time we use that much memory, regardless of imminent garbage collection. As discussed in my previous post, it is useful only if the available memory is measured right after garbage collection.
However setCollectionUsageThreshold() is exactly what I need: it sets a threshold for notification when memory is still low right after a GC.

We need to do this in two steps, first set a collectionUsageThreshold on the tenured partition.

// heuristic to find the tenured pool (largest heap) as seen on http://www.javaspecialists.eu/archive/Issue092.html
MemoryPoolMXBean tenuredGenPool = null;
for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
  if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
    tenuredGenPool = pool;
  }
}
// we do something when we reach 80% of memory usage after a collection
tenuredGenPool.setCollectionUsageThreshold((long) (tenuredGenPool.getUsage().getMax() * 0.8));

Here tenuredGenPool.getCollectionUsage() is the memory usage as measured right after the last garbage collection. This is the value that we can rely on to detect low memory.

Then we set up a listener to get notified. Only MEMORY_COLLECTION_THRESHOLD_EXCEEDED is interesting, as explained above.

// set a listener
MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
NotificationEmitter emitter = (NotificationEmitter) mbean;
emitter.addNotificationListener(new NotificationListener() {
  public void handleNotification(Notification n, Object hb) {
    if (n.getType().equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
      // this is the signal => end the application early to avoid OOME
    }
  }
}, null, null);

My experiment slowly increases memory usage while also freeing up a bunch of existing objects. It shows that MEMORY_THRESHOLD_EXCEEDED is reached pretty quickly, even when most of the memory gets cleaned up afterwards, while MEMORY_COLLECTION_THRESHOLD_EXCEEDED is triggered only when we actually fill up the memory.

edit: actual code explains it better than words.

Here is some test code to show how that works:
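Here is a minimal, self-contained sketch of such a test. The class name LowMemoryDetector, the 80% threshold, and the allocation pattern are my assumptions for illustration; run it with a small heap (e.g. -Xmx64m) so the threshold actually gets crossed.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.ArrayList;
import java.util.List;

import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;

public class LowMemoryDetector {

  // heuristic from the post: the tenured pool is the heap pool
  // that supports usage thresholds
  static MemoryPoolMXBean findTenuredGenPool() {
    MemoryPoolMXBean tenured = null;
    for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
      if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
        tenured = pool;
      }
    }
    return tenured;
  }

  public static void main(String[] args) throws Exception {
    MemoryPoolMXBean tenured = findTenuredGenPool();
    // notify when 80% of the tenured pool is still used right after a GC
    tenured.setCollectionUsageThreshold((long) (tenured.getUsage().getMax() * 0.8));

    MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
    ((NotificationEmitter) mbean).addNotificationListener(
        new NotificationListener() {
          public void handleNotification(Notification n, Object handback) {
            if (MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED
                .equals(n.getType())) {
              System.out.println("memory collection threshold exceeded !!!");
            }
          }
        }, null, null);

    // slowly grow memory usage while also freeing part of it along the way
    List<byte[]> retained = new ArrayList<byte[]>();
    for (int it = 0; it < 700; it++) {
      retained.add(new byte[100 * 1024]);
      if (it % 3 == 0) {
        retained.remove(0); // keep some garbage flowing so GCs happen
      }
      if (it % 100 == 0) {
        System.out.println("it=" + it);
      }
    }
  }
}
```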


sample output: (only the last line is the warning we want)

it=   0 - Par Eden Space: u= 49% cu=  0% th=0% - Par Survivor Space: u=  0% cu=  0% th=0% - CMS Old Gen: u=  0% cu=  0% th=79%
it= 100 - Par Eden Space: u= 16% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 35% cu=  0% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u= 92% cu=  0% th=0% - Par Survivor Space: u= 96% cu= 96% th=0% - CMS Old Gen: u= 86% cu=  0% th=79%
it= 200 - Par Eden Space: u= 84% cu= 45% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 19% cu= 19% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  1% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 19% th=79%
it= 300 - Par Eden Space: u= 49% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 79% cu= 74% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  4% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 74% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 33% th=79%
it= 400 - Par Eden Space: u= 15% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 51% cu= 38% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  6% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 83% cu= 38% th=79%
it= 500 - Par Eden Space: u= 79% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 59% cu= 46% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 51% th=79%
it= 600 - Par Eden Space: u= 43% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 64% cu= 56% th=79%
memory threshold exceeded !!! : 
        - Par Eden Space: u=  0% cu=  0% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 84% cu= 56% th=79%
memory collection threshold exceeded !!! : 
        - Par Eden Space: u= 19% cu=  9% th=0% - Par Survivor Space: u= 99% cu= 99% th=0% - CMS Old Gen: u= 85% cu= 85% th=79%

This is going to be my tech blog

Maintaining this is going to be a challenge but I’ll give it a try.

Here are some topics I plan to talk about (in no particular order):

  • Embedding Pig in Python/Javascript (edit: PageRank implementation in Pig and Transitive closure in Pig)
  • Simple Ray tracing tutorial
  • winmine.exe player
  • Agility (in software development)
  • productive development environment for web services
  • Map/Reduce in the browser for fast experimentation
  • Schema based vs Schema less on the grid

Of course it is going to take me forever to write about all of this so if you have interest in some of those, let me know.

Detecting low memory in Java

This seems to be a common need and a difficult thing to do in Java. Here is my solution; let me know what you think.

1. The problem

I am focusing on only one aspect of the problem, as I have a very specific use case. I have a piece of code that accumulates information about a (possibly big) data set. Let’s assume I have a very big file containing a list of records encoded in JSON, and that I iterate over the records to accumulate statistics about the data. This is close enough to what I’m doing. The result of the operation is a report containing the schema inferred from the data and statistics about the fields (max/min/avg size, list of unique values and count if bounded, type, …). This is implemented as an Algebraic UDF (so that it can use the Combiner) in a Pig script, so there’s a fair amount of code that I don’t control.

If the data is homogeneous, everything is fine and it generates a rather small report with a lot of interesting information about the data. But if the data is random enough (dynamic keys, …), it will eventually eat up all the memory until it fails. The first thing to do is to make the number of fields/values/… it will accumulate configurable, but getting this configuration right is still black magic and unsatisfying. What I really want is to keep accumulating until I run out of memory and then output what I can. Of course, I need to do this without hitting the dreaded OutOfMemoryError, as it can be thrown anywhere, most often outside of my code where I can’t catch it. One important point here: I don’t want false positives.

2. Trying out things

First I looked into java.lang.Runtime.{free|max|total}Memory() and the more precise MemoryPoolMXBean. These tell me how much memory I am using, and I can even set a usage threshold to be notified. Solved? Not really. The trouble is that reaching a given level of usage does not mean you’re going to run out of memory: unreachable objects do not get garbage collected until it is needed. To simplify, we first eat up all the memory, then the garbage collector frees old objects, again and again. Graphing memory usage shows an upward trend with a drop every time the GC kicks in.

One way to make the value returned by freeMemory() more accurate is to force a garbage collection right before reading it, but of course this slows down the application drastically; there’s a reason the garbage collector runs only when needed. Also, there’s a special type of OutOfMemoryError (“GC overhead limit exceeded”) that gets thrown when you spend over a certain percentage of time in GC, and forcing collections could artificially trigger it. Another approach is to read the value of freeMemory() right after a (full) GC, but there’s no way to get notified of GCs from the Java side; it seems you need to write an agent in C, which far exceeded the complexity threshold I had set for the solution. You could also imagine polling a GarbageCollectorMXBean, which knows how many GCs happened (current > previous ? read freeMemory()), but I did not try that (there was also a threshold on the time I spent on this 🙂) and I’m not sure how reliable it would be.
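For what it’s worth, the untried polling idea could look something like this. This is a sketch only: GcCountPoller and its method name are made up for illustration, and the reliability caveat above still applies.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Sketch of the untried idea: only trust freeMemory() when the total GC
// count has moved since the last poll, i.e. a collection just happened.
public class GcCountPoller {
  private long previousGcCount = -1;

  /**
   * Returns Runtime.freeMemory() if at least one GC happened since the
   * last call, -1 otherwise (including on the first call).
   */
  public long freeMemoryAfterGc() {
    long count = 0;
    for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
      long c = gc.getCollectionCount(); // -1 if undefined for this collector
      if (c > 0) {
        count += c;
      }
    }
    long result = -1;
    if (previousGcCount >= 0 && count > previousGcCount) {
      result = Runtime.getRuntime().freeMemory();
    }
    previousGcCount = count;
    return result;
  }
}
```

You would poll this from the accumulation loop and only act on a non-negative return value.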

3. My solution

I settled for something very different, which happens to be triggered by the garbage collector when you are close to getting an OutOfMemoryError. I allocate a byte array and put it in a SoftReference. The byte array is big enough that freeing it gives me enough memory to finish what I’m doing gracefully. This acts like a canary in a coal mine: the SoftReference “dies” as a warning that we are running out of memory.

SoftReference<Object> canary = new SoftReference<Object>(new byte[bufferSize]);

Now in each iteration I can check canary.get() == null as a signal that I’m running low on memory, before actually getting an OutOfMemoryError (remember that it can be thrown somewhere I cannot catch it). You could also use a ReferenceQueue to get notified of this. A SoftReference is how you tell the JVM that you’d rather keep the object, but that it can be freed when there’s a need.

Experimentation monitoring the memory and GCs shows that the SoftReference actually gets freed only as a last resort and not before, so it fits the bill.
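Wrapped in a small helper, the canary looks like this (the class and method names are mine, a sketch under the assumptions above):

```java
import java.lang.ref.SoftReference;

public class MemoryCanary {
  private final SoftReference<Object> canary;

  public MemoryCanary(int bufferSize) {
    // the buffer must be big enough that reclaiming it leaves
    // enough memory to finish the work gracefully
    this.canary = new SoftReference<Object>(new byte[bufferSize]);
  }

  /** true once the GC has reclaimed the buffer, i.e. memory is running low */
  public boolean isLowMemory() {
    return canary.get() == null;
  }
}
```

In the accumulation loop you would check isLowMemory() at each iteration and flush the partial report as soon as it returns true.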

One big inconvenience of this is that the buffer occupies memory that cannot be used for anything else. In particular, the less memory is left, the more time the GC takes. The consequence is that the application slows down a lot right before the buffer gets freed, delaying the detection. My experiments showed that this was acceptable for my use case. The big advantage: this is a very simple solution.

If you have an opinion about this, please comment.

edit: I have posted an update regarding the MemoryPoolMXBean.