Friday, September 26, 2014

Tracking Versions in MongoDB

I'm honoured to have been asked by Asya to contribute a guest post to her series on Tracking Versions in MongoDB.

Here's Asya's full series on the topic, which I recommend reading in order:

  1. How to Track Versions with MongoDB
  2. How to Merge Shapes with Aggregation Framework
  3. Best Versions with MongoDB
  4. Further Thoughts on How to Track Versions with MongoDB  (my guest post)



Song for today: Someday I Will Treat You Good by Sparklehorse

Thursday, September 11, 2014

Java Using SSL To Connect to MongoDB With Access Control


In this post I've documented the typical steps you need to enable a Java application to connect to a MongoDB database over SSL. Specifically, I show the so-called "one-way SSL" pattern, where the server is required to present a certificate to the client but the client is not required to present a certificate to the server. Regardless of authentication, communication between client and server is encrypted in both directions. In this example, the client actually authenticates with the database, albeit using a username/password rather than presenting a certificate. In addition, I also show how the client application can run operations against a database that has access control rules defined for it.

Note: Ensure you are running the Enterprise version of MongoDB to be able to configure SSL.


1. Generate Key+Certificate and Configure With MongoDB For SSL


Create a key and certificate:

$ su -
$ cd /etc/ssl/
$ openssl req -new -x509 -days 365 -nodes -out mongodb-cert.crt -keyout mongodb-cert.key
$ cat mongodb-cert.key mongodb-cert.crt > mongodb.pem
$ exit

Add the following entries to mongodb.conf (or equivalent if setting command line options):

sslOnNormalPorts=true
sslPEMKeyFile=/etc/ssl/mongodb.pem

Start the mongod database server.

Test the SSL connection from the Mongo shell.

$ mongo --ssl


2. Configure Java Client For SSL


Create a new Java trust store in the local application's root directory, importing the certificate generated from the previous section. This is necessary because in this example a self-signed certificate is used.

$ keytool -import -alias "MongoDB-cert" -file "/etc/ssl/mongodb-cert.crt" -keystore truststore.ts -noprompt -storepass "mypasswd"
$ keytool -list -keystore truststore.ts -storepass mypasswd

In the client application's Java code, add the "ssl=true" parameter to the MongoDB URI to tell it to use an SSL connection, eg:

MongoClientURI uri = new MongoClientURI("mongodb://localhost:27017/test?ssl=true");

Modify the command line or script that runs the 'Java' executable for the client application, and add the following JVM command line property. This will allow the application to validate the server's certificate against the new trust store, when using SSL.

-Djavax.net.ssl.trustStore=truststore.ts

Run the Java client application to test that the SSL connection works.


3. Configure MongoDB database with Access Control Rules


Using the Mongo shell, connect to the database, and define an 'administrator user', plus a 'regular' user with an access control rule defined to enable it to have read/write access to a database (called 'test' in this example).

$ mongo --ssl
> use admin
> db.addUser({user: "admin", pwd: "admin", roles: [ "userAdminAnyDatabase"]})
> use test
> db.addUser({user: "paul", pwd: "password", roles: ["readWrite", "dbAdmin"]})

Add the following entry to mongodb.conf (or equivalent if setting command line options):

auth=true

Restart the mongod database server to pick up this change.

Test the SSL connection to the 'test' database with username/password authentication, from the Mongo shell.

$ mongo test --ssl -u paul -p password

If the user specified doesn't have permissions to read collections in the database, an error similar to the one below will occur when trying to run db.mycollection.find(), for example.

error: { "$err" : "not authorized for query on test.mycollection", "code" : 13 }


4. Configure Java Client To Authenticate


Modify the client application code to include the username and password in the URL, eg:

MongoClientURI uri = new MongoClientURI("mongodb://paul:password@localhost:27017/test?ssl=true");

Run the Java client application to test that using SSL with username/password authentication works and has the rights to access the sample database.

If the user specified doesn't have correct permissions, an exception similar to below will occur.

Exception in thread "main" com.mongodb.MongoException: not authorized for query on test.system.namespaces
  at com.mongodb.MongoException.parse(MongoException.java:82)
  at com.mongodb.DBApiLayer$MyCollection.__find(DBApiLayer.java:292)
  at com.mongodb.DBApiLayer$MyCollection.__find(DBApiLayer.java:273)
  at com.mongodb.DB.getCollectionNames(DB.java:400)
  at MongoTest.main(MongoTest.java:25)


Note: In a real Java application, you would invariably build the URL dynamically, to avoid hard-coding a username and password in clear text in code.
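As a rough illustration of building the URL dynamically, here is a minimal sketch written in JavaScript (to match the shell code used elsewhere on this blog) rather than Java. The function name buildMongoUri and the environment variable names MONGO_USER and MONGO_PASSWORD are hypothetical, chosen only for this example. The one detail worth showing is that the username and password should be percent-encoded, so that characters such as '@' or ':' do not break the URI.

```javascript
// Sketch only: assemble the connection URI from credentials supplied at runtime.
// buildMongoUri is a hypothetical helper, not part of any MongoDB driver.
function buildMongoUri(user, password, host, port, dbName) {
    // Percent-encode credentials so characters like '@', ':' or '/' stay unambiguous
    var credentials = encodeURIComponent(user) + ":" + encodeURIComponent(password);
    return "mongodb://" + credentials + "@" + host + ":" + port + "/" + dbName + "?ssl=true";
}

// Example usage (the variable names are assumptions):
//   buildMongoUri(process.env.MONGO_USER, process.env.MONGO_PASSWORD,
//                 "localhost", 27017, "test");
var exampleUri = buildMongoUri("paul", "p@ss:word", "localhost", 27017, "test");
```

In a Java client the equivalent would read the values from the environment or a secured configuration source before constructing the MongoClientURI.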




Song for today: My Sister in 94 by The Paradise Motel

Friday, May 2, 2014

MongoDB Connector for Hadoop with Authentication - Quick Tip


If you are using the MongoDB Connector for Hadoop and you have enabled authentication on your MongoDB database (eg. auth=true) you may find that you are prevented from getting data into or out of the database.

You may have provided the username and password to the connector (eg. mongo.input.uri = "mongodb://myuser:mypassword@host:27017/mytestdb.mycollctn"), for a Hadoop job that pulls data from the database. The connector will authenticate to the database successfully, but early in the job run, the job will fail with an error message similar to the following:

14/05/02 13:17:01 ERROR util.MongoTool: Exception while executing job...
java.io.IOException: com.mongodb.hadoop.splitter.SplitFailedException: Unable to calculate input splits: need to login
        at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:53)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:493)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:510)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1295)
......

This is because the connector needs to run the MongoDB-internal splitVector DB command, under the covers, to work out how to split the MongoDB data up into sections ready to distribute across the Hadoop Cluster. However, by default, you are unlikely to have given sufficient privileges to the user, used by the connector, to allow this DB command to be run. This issue can be simulated easily by opening a mongo shell against the database, authenticating with your username and password and then running the splitVector command manually. For example:

> var result = db.runCommand({splitVector: 'mytestdb.mycollctn', keyPattern: {_id: 1}, 
                                                         maxChunkSizeBytes: 32000000})
> result
{
"ok" : 0,
"errmsg" : "not authorized on mytestdb to execute command
    splitVector: "mytestdb.mycollctn", keyPattern: { _id: 1.0 },
    maxChunkSizeBytes: 32000000.0 }",
"code" : 13
}

To address this issue, you first need to use the mongo shell, authenticated as your administration user, and run the updateUser command to give the connector user the clusterManager role, to enable the connector to run the DB commands it requires. For example:

use mytestdb
db.updateUser("myuser", {
                   roles : [
                      { role: "readWrite", db: "mytestdb" },
                      { role : "clusterManager", db : "admin"  }
                   ]
              })

After this, your Hadoop jobs with the connector should run fine.
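One thing to be aware of is that updateUser replaces the user's roles array wholesale, so any existing roles have to be listed again alongside the new one. The following is a minimal sketch of a helper that builds the full array; the name withClusterManager is hypothetical and the existing role list is assumed to be known (for example, from db.getUser()).

```javascript
// Sketch only: return a roles array containing the existing roles plus
// clusterManager on the admin database, without adding a duplicate.
function withClusterManager(existingRoles) {
    var extra = { role: "clusterManager", db: "admin" };
    var alreadyPresent = existingRoles.some(function (r) {
        return r.role === extra.role && r.db === extra.db;
    });
    return alreadyPresent ? existingRoles.slice() : existingRoles.concat([extra]);
}

var roles = withClusterManager([{ role: "readWrite", db: "mytestdb" }]);
// In the mongo shell, authenticated as the administration user, this array
// would then be passed as: db.updateUser("myuser", { roles: roles })
```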


Note: In my test, I ran Cloudera CDH version 5, MongoDB version 2.6 and Connector version 1.2 (built with target set to 'cdh4').


Song for today: Spanish Sahara by Foals

Wednesday, April 9, 2014

Parallelising MongoDB Aggregation on a Sharded Database


In my last blog post, I explored how I could speed up a MapReduce-style aggregation job, by parallelising the work over subsets of a collection, using multiple threads, for a non-sharded MongoDB database. In this post, I look at how I took the same test case, and a similar approach, to see if I could achieve a speed up when aggregating the 10 million documents stored in a sharded MongoDB database.

Confession time....

After my success in speeding up an aggregation on a non-sharded database, I assumed this second phase of my investigation would be a walk in the park and I'd quickly march on to a conclusion exhibiting even greater gains, in a sharded environment. However, things didn't quite turn out that way, and I ended up spending considerably more time on this investigation, than I'd anticipated. More on that later.

First of all though, I needed to insert the 10 million randomly generated documents into a now-sharded collection (3 shards, all running on a single Linux laptop), which, of course, was straightforward.


Identifying Data Subsets In A Sharded Environment


Once the sharded collection was populated, I had to consider what changes to make to my JavaScript/mongo-Shell code, to deal with differences between non-sharded and sharded environments. It turned out that this was one of the more straight-forward and intuitive changes to make.

In the sharded MongoDB environment, I didn't have to take an educated guess on how to split up the data into ranges or use the undocumented and time-consuming splitVector command, as I did for the non-sharded tests. MongoDB is already working hard to manage the shard-key for me and how best to evenly distribute subsets (chunks) across each shard. Therefore all the information I needed was already available to me in the 'config' database, accessible via the mongos router.

For the sharded test collection, a quick query of the config database using the mongo Shell (connected to a mongos), told me how the collection was currently distributed.

mongos> use config
mongos> db.chunks.find({ns: "mrtest.rawdata"}, {_id: 0, shard: 1, min: 1, max: 1}).sort({shard: 1})

{ "min" : { "dim0" : 635504 }, "max" : { "dim0" : 702334 }, "shard" : "s0" }
{ "min" : { "dim0" : 702334 }, "max" : { "dim0" : 728124 }, "shard" : "s0" }
{ "min" : { "dim0" : 728124 }, "max" : { "dim0" : 759681 }, "shard" : "s0" }
{ "min" : { "dim0" : 759681 }, "max" : { "dim0" : 827873 }, "shard" : "s0" }
{ "min" : { "dim0" : 827873 }, "max" : { "dim0" : 856938 }, "shard" : "s0" }
{ "min" : { "dim0" : 856938 }, "max" : { "dim0" : 893410 }, "shard" : "s0" }
{ "min" : { "dim0" : 893410 }, "max" : { "dim0" : 942923 }, "shard" : "s0" }
{ "min" : { "dim0" : 942923 }, "max" : { "dim0" : 999996 }, "shard" : "s0" }

{ "min" : { "dim0" : { "$minKey" : 1 } }, "max" : { "dim0" : 42875 }, "shard" : "s1" }
{ "min" : { "dim0" : 42875 }, "max" : { "dim0" : 91602 }, "shard" : "s1" }
{ "min" : { "dim0" : 91602 }, "max" : { "dim0" : 149588 }, "shard" : "s1" }
{ "min" : { "dim0" : 149588 }, "max" : { "dim0" : 197061 }, "shard" : "s1" }
{ "min" : { "dim0" : 197061 }, "max" : { "dim0" : 257214 }, "shard" : "s1" }
{ "min" : { "dim0" : 471994 }, "max" : { "dim0" : 520681 }, "shard" : "s1" }
{ "min" : { "dim0" : 569205 }, "max" : { "dim0" : 602306 }, "shard" : "s1" }

{ "min" : { "dim0" : 257214 }, "max" : { "dim0" : 305727 }, "shard" : "s2" }
{ "min" : { "dim0" : 305727 }, "max" : { "dim0" : 363357 }, "shard" : "s2" }
{ "min" : { "dim0" : 363357 }, "max" : { "dim0" : 412656 }, "shard" : "s2" }
{ "min" : { "dim0" : 412656 }, "max" : { "dim0" : 471994 }, "shard" : "s2" }
{ "min" : { "dim0" : 520681 }, "max" : { "dim0" : 569205 }, "shard" : "s2" }
{ "min" : { "dim0" : 602306 }, "max" : { "dim0" : 635504 }, "shard" : "s2" }
{ "min" : { "dim0" : 999996 }, "max" : { "dim0" : { "$maxKey" : 1 } }, "shard" : "s2" }

                   (I added a newline between the chunks belonging to each shard, for reader clarity; 
                    also the elements highlighted in bold are discussed in the next section of this post)

So I already had all the information available at my finger tips, which defined an 'even-ish' spread of data-subsets (chunks) of the collection. In this case, I could see there were 22 chunks, spread over 3 shards. So I needed to change my JavaScript code to create multiple threads per shard, each targeting a portion of the chunks belonging to a shard. To build this list of shards and their owned chunks, at runtime, in my JavaScript code I used an aggregation to group by shard.

// Query the config DB, grouping data on all the chunks for each shard
db = db.getSiblingDB('config');
var shardsWithChunks = db.chunks.aggregate([
    {$match: {
        "ns": "mrtest.rawdata"
    }},
    {$sort: {
        "min.dim0": 1
    }},
    {$group: {
        _id: "$shard",
        chunks: {$push: {
              min: "$min.dim0",
              max: "$max.dim0"
        }}
    }}
]);

My code then used a loop like below, to cycle through each shard, kicking off multiple sub-aggregations in different threads, where each thread uses a range query that resolves to a portion of chunks currently believed to live on that shard.

shardsWithChunks.forEach(function(shard) {
    // For each new thread, collect together a set of some of the chunks from the
    // shard.chunks array and perform the modified aggregation pipeline on it
});


Decorating a Pipeline to Enable a Thread to Target Just One Shard


The code I'd written to modify a pipeline, to allow a thread to operate on a subset of data in a non-sharded database, also had to be changed, to allow for subtleties only present when the data is sharded. Taking the example of wanting to spawn 4 threads per shard, for shard 's1' (shown in the output from db.chunks.find() earlier in this post), I had to divide out the 7 chunks it currently held, between the 4 threads. So 3 of the threads would need to target 2 chunks each, and 1 thread would target the remaining 1 chunk.
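The division of chunks between threads can be sketched as follows. This is an illustrative helper only, not the code from the zip file; the name partitionChunks is hypothetical, and null is used here as a stand-in for the $minKey bound.

```javascript
// Sketch only: split a shard's chunk list into at most threadCount contiguous
// groups whose sizes differ by no more than one chunk.
function partitionChunks(chunks, threadCount) {
    var groups = [];
    var base = Math.floor(chunks.length / threadCount);
    var remainder = chunks.length % threadCount;
    var start = 0;
    for (var i = 0; i < threadCount; i++) {
        // The first 'remainder' groups each take one extra chunk
        var size = base + (i < remainder ? 1 : 0);
        if (size === 0) { break; }  // more threads than chunks
        groups.push(chunks.slice(start, start + size));
        start += size;
    }
    return groups;
}

// The 7 chunks held by shard 's1', sorted by their lower bound
var s1Chunks = [
    {min: null,   max: 42875},   // null stands in for $minKey (assumption)
    {min: 42875,  max: 91602},
    {min: 91602,  max: 149588},
    {min: 149588, max: 197061},
    {min: 197061, max: 257214},
    {min: 471994, max: 520681},
    {min: 569205, max: 602306}
];
var groups = partitionChunks(s1Chunks, 4);  // group sizes: 2, 2, 2, 1
```

With this split, the third group holds the two non-contiguous chunks [197061, 257214) and [471994, 520681).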

Notice that in the list of chunks returned by db.chunks.find() earlier, not all chunks on a particular shard are contiguous. For example, in the results marked in bold, I highlighted that one chunk ends at 257214 on shard 's1', followed by a chunk on 's1' starting at 471994. However, the chunk starting at 257214 is on shard 's2'. This means that the code to modify the aggregation pipeline needs to query the start and end range for each chunk, rather than just a range from the start of the first chunk to the end of the last chunk. Below you can see, first, the naive, incorrect pipeline that my code could generate and, second, the correct pipeline that the code should and will generate.

// BAD
[
   {$match: {
      dim0: {
         $gte: 197061,
         $lt : 520681
      }
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }},
   {$out:
      "mraggout_s1_2"
   }
]

// GOOD
[
   {$match: {
      $or: [
         {
            dim0: {
               $gte: 197061,
               $lt : 257214
            }
         },
         {
            dim0: {
               $gte: 471994,
               $lt : 520681
            }
         }
      ]
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }},
   {$out:
      "mraggout_s1_2"
   }
]


To decorate the original aggregation pipeline in this way, I needed to modify my original aggregate_subset() JavaScript function, to build up an $or expression of ranges, for the $match part prepended to the pipeline. The fully modified JavaScript code I produced, to run in the mongo Shell and perform all these actions can be viewed in the zip file available here.
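For readers who don't want to open the zip file, the idea of building the $or expression can be sketched like this. It is an illustrative reconstruction rather than the code from the zip file; the name buildChunkMatch is hypothetical, null is assumed to represent an open-ended ($minKey or $maxKey) bound, and every chunk is assumed to have at least one finite bound.

```javascript
// Sketch only: build a $match stage containing one range clause per chunk.
function buildChunkMatch(keyName, chunks) {
    var ranges = chunks.map(function (chunk) {
        var bounds = {};
        if (chunk.min !== null) { bounds.$gte = chunk.min; }
        if (chunk.max !== null) { bounds.$lt = chunk.max; }
        if (Object.keys(bounds).length === 0) {
            // A chunk spanning the whole key space needs no range filter at all
            throw new Error("chunk has no finite bound; no $match is needed");
        }
        var clause = {};
        clause[keyName] = bounds;
        return clause;
    });
    return {$match: {$or: ranges}};
}

var matchStage = buildChunkMatch("dim0", [
    {min: 197061, max: 257214},
    {min: 471994, max: 520681}
]);
// matchStage can then be prepended to the original pipeline with unshift()
```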

It is worth mentioning that, if during the aggregation run, the cluster happens to migrate any chunks, it would not be a big deal. The modified aggregation pipelines would still function correctly and return the correct results. However, a couple of the threads would just take longer to complete, as they would end up being executed against two shards, not one, by mongos and the query optimiser, to collect the requested data.


Sub-optimal mongos Shard Routing For Some Edge-case Range Queries


One thing I noticed when running explain (eg. using db.coll.aggregate(pipeline, {explain: true})), was that even when running the modified pipeline, shown above, which should be targeting chunks in only one shard, mongos actually chooses to target two shards (both 's1' and 's2'). The correct result is still generated, but it just takes a little longer, as twice as many shards are hit as necessary. Essentially the query optimiser is not correctly identifying the range boundary as belonging to just one shard, when the range boundary falls exactly at the intersection of two chunks that happen to reside on different shards. Specifically, the problem is for a sharded collection that has a single (non-compound) shard key, and $lte or $gte is used in the range query. I created a JIRA ticket logging this bug, but with a low priority, as functionally an accurate result is returned, and also, it is possible to use a workaround. My original section of code and the modified version that uses this workaround are shown below.

// mongos/optimiser targets 2 shards
{
   dim0: {
      $gte: 197061,
      $lt : 257214
   }
}

// mongos/optimiser targets 1 shard
{
   dim0: {
      $gte: 197061,
      $lte: 257213
   }
}

By modifying the range criteria to be '$lte: 257213', in my JavaScript code, only one shard ('s1') is then correctly routed to. However, this does mean that any generic query code, in a more 'real-world' application, has to be explicitly aware of field types (no explicit database schema) and the knowledge that the field actually contains an integer that can possibly be decremented by 1, to help encourage optimal sharded query routing.
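A minimal sketch of how generic code might apply this workaround is shown below. The helper names are hypothetical, and the rewrite is only equivalent when the field is known to hold nothing but integers, which is exactly the schema knowledge discussed above.

```javascript
// Sketch only: prefer an inclusive upper bound of (max - 1) when max is an
// integer, falling back to the exclusive $lt form for any other value.
function upperBoundFor(max) {
    if (typeof max === "number" && Math.floor(max) === max) {
        return {$lte: max - 1};  // valid only if the field stores integers
    }
    return {$lt: max};
}

function routedRange(keyName, min, max) {
    var bounds = upperBoundFor(max);
    var range = {$gte: min};
    Object.keys(bounds).forEach(function (op) { range[op] = bounds[op]; });
    var clause = {};
    clause[keyName] = range;
    return clause;
}

var clause = routedRange("dim0", 197061, 257214);
// clause is {dim0: {$gte: 197061, $lte: 257213}}
```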


Initial Results


When generating my aggregation test results and capturing the execution time, I first wanted to establish the performance of the normal unchanged aggregation on the sharded database. I felt it would be interesting to see if a regular, unchanged aggregation ran faster once sharded. When I ran the unchanged aggregation against the sharded database, it executed in 27 seconds, compared with just 23 seconds against the non-sharded collection. At first I was surprised that the aggregation took longer to complete. Later, upon reflection, I came to realise why, and I discuss the reasons in the conclusion of this post. For the moment, however, my challenge was simply to beat the time of 27 seconds with my 'optimised parallel aggregations' approach.

I then ran my newly modified JavaScript code (see code in zip file), executing parallel aggregation threads targeted at groups of chunks on each shard (4 threads per shard).  This test executed in 20 seconds. Again I was a little disappointed because this yielded a roughly 25% speed-up in the sharded environment, whereas before, in a non-sharded environment, I was achieving over a 40% speed-up.


Attempts on Improvement


At this point, I started to hypothesise on potential reasons why my 'optimisations' couldn't speed-up an aggregation as quickly for a sharded database. Below are some of the main hypotheses I came up with and what happened when I tested each one.
  1. Is running the test via JavaScript in the mongo Shell introducing a bottleneck?  I re-wrote my application code, to decorate the pipeline and run separate aggregation threads, in Python instead, using the PyMongo driver (also included in the code zip file). However, there was no noticeable change in performance, when re-running the test with the Python code. 
  2. Is the single mongos router, used by all 12 aggregation threads, acting as a bottleneck?  So I started up 12 separate mongos instances and modified the Python version of the code, to force each of the spawned 12 threads to connect to its own dedicated mongos. The execution time that resulted was only marginally better, and so I concluded that a single shared mongos was not really a major bottleneck.
  3. Is the role of mongos (ie. shard routing/coordination and partial aggregation processing) the main cause of added latency?  So at this point I went a bit off-piste and did something no one should ever do. I modified the Python code again, but this time to make each spawned thread connect directly to the mongod of the shard, rather than going via mongos. My code achieved this by first querying the 'config' database to find out the direct URL for each shard's mongod host. This time I did see a significant decrease in execution time, down to 15 seconds. This is only 2 seconds off the 13 seconds achieved on the non-sharded database. The 2 second difference can probably be explained by the fact that I am spawning 12 threads on a now overloaded host machine, running 3 mongod shards, 1 config mongod database and 1 mongos, versus the non-sharded test that runs just 4 threads against a single mongod. However, running the aggregations directly against shards is a bad thing to do, for two reasons: (1) the $match may miss a chunk, read an orphan chunk or read only part of a split chunk, if any chunk splitting/migration has occurred during the process, resulting in an incorrect result; (2) the aggregation $out operator will cause a collection to be written to the local shard database, but the config databases and all the mongos's will have no awareness of its existence (indeed, I witnessed a 'confused' system after trying this). Therefore, I have not included the code modifications in the code zip file, that I made to pull this stunt, and I vow never to try such a thing again! Still, the results were interesting, even if unusable.  :-)

It is probably worth noting that, for all these test variations, I consistently observed the following characteristics:
  • top would show all the host CPUs running at around 99% for approximately the first 3/4 of the test run.
  • mongostat would show zero page faults (inference being that all the working-set is memory-resident) and only single-figure locked database percentages (ie. sub 10%).
  • iostat would show, at its peak, less than 15% disk usage for the host SSD.


Results Summary


So finally my prolonged theorising and testing came to an end. The table below shows the results I achieved for the different variations, ultimately demonstrating a ~30% performance speed up (27 seconds down to 19 seconds), when using my parallel sub-aggregations optimisation via multiple mongos's against 3 shards. This was less than the 40+% performance increase I achieved in my non-sharded database tests.


Aggregation Description                        | Data-Subsets per Shard | Spawned Threads per Shard | Number of mongos's | Execution Time
-----------------------------------------------+------------------------+---------------------------+--------------------+---------------
Regular unchanged (single-threaded)            | 0                      | 0                         | 1                  | 27 secs
Serial sub-aggregations (single-threaded)      | 2                      | 0                         | 1                  | 32 secs
Serial sub-aggregations (single-threaded)      | 4                      | 0                         | 1                  | 32 secs
Parallel sub-aggregations, single mongos       | 2                      | 2                         | 1                  | 21 secs
Parallel sub-aggregations, single mongos       | 4                      | 4                         | 1                  | 20 secs
Parallel sub-aggregations, multiple mongos's   | 2                      | 2                         | 6                  | 20 secs
Parallel sub-aggregations, multiple mongos's   | 4                      | 4                         | 12                 | 19 secs
Parallel sub-aggregations, direct to mongod's  | 2                      | 2                         | 0                  | 16 secs
Parallel sub-aggregations, direct to mongod's  | 4                      | 4                         | 0                  | 15 secs


For interest, I've included in the table the results from running the parallel aggregations directly against the shard mongod's (the last two rows, flagged red for 'danger'!). However, as discussed earlier, doing this for real in a production system will result in bad things happening. You have been warned!
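The percentage figures quoted in this post follow directly from the execution times in the table. As a quick worked check (the helper name percentReduction is just for illustration):

```javascript
// Percentage reduction in execution time relative to a baseline
function percentReduction(baselineSecs, newSecs) {
    return (baselineSecs - newSecs) / baselineSecs * 100;
}

var singleMongos   = percentReduction(27, 20).toFixed(1);  // "25.9"
var multipleMongos = percentReduction(27, 19).toFixed(1);  // "29.6"
var directToMongod = percentReduction(27, 15).toFixed(1);  // "44.4"
```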


Conclusions


I've demonstrated that the execution time for certain MapReduce-style Aggregation workloads on a sharded database can be reduced, by parallelising the aggregation work over collection subsets, using multiple threads. However, for this particular test case, the speed-up is less pronounced, when compared to the same test on a non-sharded database.

On reflection, it is probably naive to expect a reduction in aggregation execution time, when moving a collection from a non-sharded database to a sharded one. If you think about the main benefits of sharding, sharding is NOT really about reducing individual operation latency. The main purpose of sharding is to enable a database to be able to scale-out to cope with a significant increase in data volumes and/or a significant increase in read and/or write throughput demand. To that effect, I would expect a sharded database, spread out over multiple machines, to be able to sustain a much higher throughput of concurrent aggregation operations, compared with a non-sharded database confined to a single machine (perhaps that can be a test and blog post for another day!).

Lastly, I also do suspect, with a combination of a much larger data-set, and with the shards, mongos's and client app spread out over separate dedicated host machines, the aggregation test on a sharded database may achieve equality or even beat the execution time compared with a non-sharded database. I even feel this would be the case for an unchanged, non-optimised aggregation pipeline, due to MongoDB's built-in ability to run the first part of a pipeline directly on each shard, in parallel. Unfortunately, time has run out for me to try this at the moment, so perhaps it's something for me to revisit in the future.


Song for today: Perth by Bon Iver

Friday, March 14, 2014

How to speed up MongoDB Aggregation using Parallelisation


A little while ago, Antoine Girbal wrote a great blog post describing how to speed up a MongoDB MapReduce job by 20x. Now that MongoDB version 2.6 is nearly upon us, and prompted by an idea from one of my very smart colleagues, John Page, I thought I'd investigate whether MongoDB Aggregation jobs can be sped up using a similar approach. Here, I will summarise the findings from my investigation.

To re-cap, the original MapReduce tests looked for all the unique values in a collection, counting their occurrences. The following improvements were incrementally applied by Antoine against the 10 million documents in the collection:

  1. Define a 'sort' for the MapReduce job
  2. Use multiple-threads, each operating on a subset of the collection 
  3. Write out result subsets to different databases
  4. Specify that the job should use 'pure JavaScript mode'
  5. Run the job using MongoDB 2.6 (an 'in-development' version) rather than 2.4

With all these optimisations in place, Antoine was able to reduce a 1200 second MapReduce job to just 60 seconds! Very impressive.

I expect that one of the reasons Antoine was focussing on MapReduce in MongoDB, was because MongoDB limited the output size of the faster Aggregation framework to 16 MB. With MongoDB 2.6, this threshold is removed. Aggregations generate multi-document results, rather than a single document and these can be navigated by the client applications using a cursor. Alternatively, an output collection can be specified for the Aggregation framework to write directly to. Thus, version 2.6 presents the opportunity to try the MapReduce test scenario with the Aggregation framework instead, to compare performance. In this blog post I do just that, focussing purely on testing a single, non-sharded database.


Re-running the optimised MapReduce


Before I tried running the test with the Aggregation framework, I thought I'd quickly run and time Antoine's optimised MapReduce job on my test machine to normalise any performance difference that occurs due to hardware disparity (for reference, my host machine is a Linux x86-64 laptop with SSD, 2 cores + hyper-threading, running MongoDB 2.6.0rc1).

I first had to insert the 10 million documents, as Antoine did, using the mongo Shell, with each insert containing a single field (‘dim0’) whose value is a random number between 0 and 1 million:

> for (var i = 0; i < 10000000; ++i) {
      db.rawdata.insert({dim0: Math.floor(Math.random()*1000000)});
  }
> db.rawdata.ensureIndex({dim0: 1})

I then ran Antoine's fully optimised MapReduce job, which completed in 32 seconds, versus 63 seconds on Antoine's host machine.


Running a normal, non-optimised Aggregation


So the first thing I needed to establish is how much faster or slower the equivalent Aggregation job is, compared with the 'optimised' MapReduce job I'd just run on the same hardware. The Aggregation pipeline is actually very simple to implement for this test. Below you can see the MapReduce code and the Aggregation pipeline equivalent:

map: function() {
    emit(this.dim0, 1);
}

reduce: function(key, values) {
    return Array.sum(values);
}

var pipeline = [
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}
];

The pipeline basically just groups on the field 'dim0', counting the number of occurrences for this value. So I ran this Aggregation against my database, called 'mrtest', containing the randomly generated 'rawdata' collection:

> db = db.getSiblingDB('mrtest');
> db['rawdata'].aggregate(pipeline);

The aggregation completed in 23 seconds, versus 32 seconds for running the 'optimised' equivalent MapReduce job on the same hardware. This shows just how quick the standard out-of-the-box Aggregation capability is in MongoDB!


Running an optimised Aggregation


So my challenge was clear now: See if I can apply some of Antoine's tricks from speeding up MapReduce, to the Aggregation framework, to reduce this test completion time from 23 seconds.

I considered which of the original 5 optimisations could realistically be applied to optimising the test with the Aggregation framework:

  1. YES - Define a 'sort' for the job
  2. YES - Use multiple-threads, each operating on a subset of the collection 
  3. NO - Write out result subsets to different databases  (the Aggregation framework expects to write out a collection to the same database as the source collection being read)
  4. NO - Specify that the job should use 'pure JavaScript mode'  (doesn't apply here as Aggregations run within C++ code)
  5. NO – Move to using a newer version of MongoDB, 2.4 to 2.6  (we are already at the latest version with my test runs, which is undoubtedly helping achieve the 23 second result already)

So the main thing I needed, in addition to including a 'sort', was to somehow split up the aggregation into parts to be run in parallel, by different threads, on subsets of the data. This would require me to prepend the existing pipeline with operations to match a range of documents in the collection and to sort the matched subset of documents, before allowing the original pipeline operation(s) to be run. Also, I would need to append an operation to the pipeline, to specify that the result should go to a specific named collection. So for example, for one of the subsets of the collection (documents with 'dim0' values ranging from 524915 to 774848), the following shows the original pipeline, followed by the converted pipeline I needed:

[
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}
]

[
    {$match: {
        dim0: {
            $gte: 524915,
            $lt : 774849
        }
    }},
    {$sort: {
        dim0: 1
    }},
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }},
    {$out:
        "aggdata524915"
    }
]


A similar pipeline, but with different range values and output collection name, needs to be produced for each thread I intend to run. Each of these pipelines needs to be invoked with db.coll.aggregate() in parallel, from different threads.
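To make the per-thread ranges concrete, here is a minimal sketch that produces one range and output collection name per thread. It assumes an even split of the key space, which is a simplification made for this example (the boundaries shown earlier, such as 524915, are not evenly spaced), and the names evenRanges and 'aggdata' prefix handling are illustrative only.

```javascript
// Sketch only: divide [lowest, highest) into 'parts' ranges of equal width,
// naming each output collection after the lower bound of its range.
function evenRanges(lowest, highest, parts, outPrefix) {
    var ranges = [];
    var width = Math.ceil((highest - lowest) / parts);
    for (var i = 0; i < parts; i++) {
        var min = lowest + i * width;
        var max = Math.min(min + width, highest);  // upper bound is exclusive
        ranges.push({min: min, max: max, outCollection: outPrefix + min});
    }
    return ranges;
}

// 'dim0' holds integers from 0 to 999999, so 1000000 is a safe exclusive upper bound
var threadRanges = evenRanges(0, 1000000, 4, "aggdata");
```

Each entry supplies the $gte and $lt values for one thread's $match stage and the collection name for its $out stage.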

To achieve this in a generic way, from the Shell, I quickly wrote a small JavaScript function to 'decorate' a given pipeline with $match and $sort prepended and $out appended. The function ends by running MongoDB’s aggregate() function with the modified pipeline, against the subset of matching data.

// Define new wrapper aggregate function, to operate on subset of docs 
// in collection after decorating the pipeline with pre and post stages
var aggregateSubset = function(dbName, srcClltName, tgtClltName,
                                  singleMapKeyName, pipeline, min, max) { 

    var newPipeline = JSON.parse(JSON.stringify(pipeline)); //deep copy
    var singleKeyMatch = {};
    singleKeyMatch[singleMapKeyName] = (max < 0) ? {$gte: min} 
                                                 : {$gte: min, $lt: max};
    var singleKeySort = {};
    singleKeySort[singleMapKeyName] = 1;

    // Prepend pipeline with range match and sort stages
    newPipeline.unshift(
        {$match: singleKeyMatch},
        {$sort: singleKeySort}  
    );

    // Append pipeline with output to a named collection
    newPipeline.push(
       {$out: tgtClltName}
    );

    // Execute the aggregation on the modified pipeline
    db = db.getSiblingDB(dbName);
    var result = db[srcClltName].aggregate(newPipeline);
    return {outCollection: tgtClltName, subsetResult: result};
}

This function takes the name of the database, source collection, and target collection, plus the original pipeline, plus the minimum and maximum values for the subset of the collection. The keen-eyed amongst you will have noticed the 'singleMapKeyName' parameter too. Basically, my example function is hard-coded to assume that the pipeline is grouping and sorting data anchored on a single key ('dim0' in my case, or this could be group-by 'customer', as another example). However, in other scenarios, a compound key may be used (eg. group-by customer + year). My function would need to be enhanced to support compound keys.
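As a rough illustration of what such an enhancement might look like, here is a minimal sketch that only builds the decorated pipeline (it does not run it). The function name 'decorateForCompoundKey' and the 'keyNames' parameter are hypothetical, not part of the code above. The sketch assumes it is acceptable to partition on the first field of the compound key only, so that every group falls wholly within one subset, and it reuses the convention that a negative 'max' means no upper bound.

```javascript
// Hypothetical variant of the decoration step for compound group keys.
// keyNames is the ordered list of fields in the group key,
// e.g. ['customer', 'year']. The range match is applied to the first
// field only; the sort covers every field of the key.
var decorateForCompoundKey = function(pipeline, keyNames, tgtClltName,
                                      min, max) {
    var newPipeline = JSON.parse(JSON.stringify(pipeline)); // deep copy
    var leadingKey = keyNames[0];

    // Range match on the leading key (max < 0 means no upper bound)
    var rangeMatch = {};
    rangeMatch[leadingKey] = (max < 0) ? {$gte: min}
                                       : {$gte: min, $lt: max};

    // Sort on all fields of the compound key, in order
    var compoundSort = {};
    keyNames.forEach(function(name) {
        compoundSort[name] = 1;
    });

    // Prepend match and sort stages, append output stage
    newPipeline.unshift({$match: rangeMatch}, {$sort: compoundSort});
    newPipeline.push({$out: tgtClltName});
    return newPipeline;
};

// Example: group by customer + year, for customers 100 to 199
var compoundPipeline = [
    {$group: {
        _id: {customer: "$customer", year: "$year"},
        value: {$sum: 1}
    }}
];
var decorated = decorateForCompoundKey(compoundPipeline,
                    ['customer', 'year'], 'aggdata100', 100, 200);
```

Partitioning on the leading field alone keeps the range logic as simple as the single-key case, at the cost of less even subsets if a few leading-key values dominate the collection.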

Once I'd completed and tested my new aggregateSubset() JavaScript function, I was ready to run some test code in the Shell, to call this new function from multiple threads. Below is the code I used, specifying that 4 threads should be spawned. My host machine has 4 hardware threads, so this is probably not a bad number to use, even though I actually chose this number because it matched the thread count used in Antoine’s original MapReduce tests.

// Specify test parameter values
var dbName = 'mrtest';
var srcClltName = 'rawdata';
var tgtClltName = 'aggdata';
var singleMapKeyName = 'dim0';
var numThreads = 4;

// Find an even-ish distributed set of sub-ranges in the collection
var singleKeyPattern = {};
singleKeyPattern[singleMapKeyName] = 1;
var splitKeys = db.runCommand({splitVector: dbName+"."+srcClltName, 
      keyPattern: singleKeyPattern, maxChunkSizeBytes: 32000000}).splitKeys;

// There are more sub-ranges than threads to run, so work out 
// how many sub-ranges to use in each thread
var inc = Math.floor(splitKeys.length / numThreads) + 1;
var threads = [];

// Start each thread, to execute aggregation on a subset of data 
for (var i = 0; i < numThreads; ++i) { 
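    // Look up the split values by key name rather than hard-coding 'dim0'
    // (the first sub-range assumes the lowest key value is 0 or greater)
    var min = (i == 0) ? 0 : splitKeys[i * inc][singleMapKeyName];
    var max = (i * inc + inc >= splitKeys.length) ? -1 
                               : splitKeys[i * inc + inc][singleMapKeyName];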
    var t = new ScopedThread(aggregateSubset, dbName, srcClltName,
                 tgtClltName + min, singleMapKeyName, pipeline, min, max);
    threads.push(t);
    t.start();
}

// Wait for all threads to finish and print out their summaries
for (var i in threads) { 
    var t = threads[i];
    t.join();
    printjson(t.returnData());
}

This multi-threaded aggregation completed in 13 seconds, versus 23 seconds for running the normal 'non-optimised' version of the aggregation pipeline. This optimised pipeline involved writing to 4 different collections, each named 'aggdataN' (eg. 'aggdata374876').
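To make the sub-range arithmetic in the loop above easier to follow, here is a small sketch that isolates it into a pure function. The name 'computeSubRanges' and its parameters are hypothetical. It takes plain split values rather than the documents returned by splitVector, and it adds one guard that the loop above does not have: it stops early if there are fewer split points than threads.

```javascript
// Hypothetical helper: turn an ordered list of split values into up to
// numThreads contiguous {min, max} ranges. A max of -1 means "no upper
// bound", matching the convention used by aggregateSubset().
var computeSubRanges = function(splitValues, numThreads, lowestValue) {
    // Number of split points each thread's range should span
    var inc = Math.floor(splitValues.length / numThreads) + 1;
    var ranges = [];

    for (var i = 0; i < numThreads; ++i) {
        var startIdx = i * inc;

        // Fewer split points than threads: no further ranges to create
        if (i > 0 && startIdx >= splitValues.length) {
            break;
        }

        var min = (i === 0) ? lowestValue : splitValues[startIdx];
        var endIdx = startIdx + inc;
        var max = (endIdx >= splitValues.length) ? -1
                                                 : splitValues[endIdx];
        ranges.push({min: min, max: max});
    }

    return ranges;
};

// Example: 7 split points shared between 4 threads gives the ranges
// 0-300, 300-500, 500-700 and 700 upwards
var exampleRanges = computeSubRanges(
    [100, 200, 300, 400, 500, 600, 700], 4, 0);
```

Each range's upper bound is the next range's lower bound, so with $gte and $lt in the match stage every document lands in exactly one subset.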

Okay, this isn't the stunning 20x improvement that Antoine achieved for MapReduce. However, given that the Aggregation framework is already a highly optimised MongoDB capability, I don't believe that cutting the execution time by over 40% (from 23 seconds to 13 seconds) is to be sniffed at. Also, I suspect that on host machines with more cores (hence requiring the thread count parameter used in my example to be increased), and with a much larger real-world data set, even more impressive results would be achieved, such as a 2x or 3x improvement.


Some observations


As you'll notice, I use a similar pattern to Antoine's, whereby I use the undocumented splitVector command to get a good idea of what is a well-balanced set of ranges in the source collection. This is especially useful if the distribution of values in a collection is uneven and hard to predict. In my case, the distribution is pretty even and for 4 threads I could have got away with hard-coding ranges of 0-250000, 250000-500000, 500000-750000 & 750000-1000000. However, most real-world collections will have a much more uneven spread of values for a given key. With a naïve approach to partitioning the data, one thread could end up doing much more work than the other threads. In real-world scenarios, you may have other ways of predicting balanced subset ranges, based on the empirical knowledge you have of your data. You would then use a best-guess effort to specify the split points for partitioning your collection. If you do choose to use the splitVector utility for a real application, I would suggest caching the value and only re-evaluating it intermittently, to avoid the latency it would otherwise add to the execution time of your aggregations.
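One way the caching suggestion might be sketched is shown below. The name 'makeCachedSplitPoints' and its parameters are hypothetical: 'computeFn' stands for whatever call obtains the split points (for example, the splitVector command used earlier), 'maxAgeMillis' is how long a cached value is trusted, and 'nowFn' is an optional clock that exists only to make the sketch easy to test.

```javascript
// Hypothetical cache wrapper around an expensive split-point lookup.
// Returns a function that re-runs computeFn only when no value is
// cached yet or the cached value is at least maxAgeMillis old.
var makeCachedSplitPoints = function(computeFn, maxAgeMillis, nowFn) {
    var now = nowFn || function() { return Date.now(); };
    var cached = null;
    var cachedAt = 0;

    return function() {
        var t = now();
        if (cached === null || (t - cachedAt) >= maxAgeMillis) {
            cached = computeFn();   // expensive call, made intermittently
            cachedAt = t;
        }
        return cached;
    };
};

// Possible use in the Shell (assumes the variables defined earlier):
// var getSplitKeys = makeCachedSplitPoints(function() {
//     return db.runCommand({splitVector: dbName + "." + srcClltName,
//         keyPattern: singleKeyPattern,
//         maxChunkSizeBytes: 32000000}).splitKeys;
// }, 10 * 60 * 1000);  // re-evaluate at most every 10 minutes
```

The trade-off is that split points may drift out of balance between refreshes, which affects how evenly work is spread across threads but not the correctness of the results.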

Also, you'll notice I followed Antoine's approach of using the undocumented ScopedThread Shell utility object, to spawn new threads. If you are writing a real application to run an aggregation, you would instead use the native threading API of your chosen programming language/platform, alongside the appropriate MongoDB language driver.

One optimisation I wish I could have made, but couldn't, was 'write out result subsets to different databases'. In MongoDB 2.6, one can specify an output collection name, but not an output database name, when calling aggregate(). My belief that different threads outputting to different databases would increase performance is based on some observations I made during my tests. When running 'top', 'mongostat' and 'iostat' on my host machine whilst the optimised aggregation was running, I initially noticed that 'top' was reporting all 4 'CPUs' as maxing out at around 95% CPU utilisation, coming down to a lower amount in the later stages of the run. At the point when CPU usage came down, 'mongostat' was reporting that the database 'mrtest' was locked for around 90% of the time. At the same point, disk utilisation was reported by 'iostat' as being a modest 35%. This tells me that for part of the aggregation run, the various threads are all queuing, waiting for the write lock, to be able to insert result documents into their own collections in the same database. By allowing different databases to be used, the write lock bottleneck for such parallel jobs would be diminished considerably, thus allowing the document inserts, and hence the overall aggregation, to complete even quicker. I have created a JIRA enhancement request, asking for the ability for the Aggregation framework's $out operator to specify a target database, in addition to a target collection.


Conclusion


So I've managed to show that certain MapReduce-style workloads can be executed faster if using Aggregation in MongoDB. I showed the execution time can be reduced even further, by parallelising the aggregation work over subsets of the collection, using multiple threads. I demonstrated this for a single non-sharded database. In the future, I intend to investigate how a sharded database can be used to drive even better parallelisation and hence lower execution times, which I will then blog about.


Song for today: The Rat by The Walkmen