Friday, March 18, 2016

SHOCKER: XA Distributed Transactions are only Eventually Consistent!

Apologies for the tabloid-trash style headline. It could have been worse: I could have gone with my working title of "WARNING: XA will eat your first-born"!

This topic has come up in a few conversations I've had recently. It turns out that most people don't realise what I'd assumed to be widely understood. 2-phase-commit (2PC), and XA (its widely used implementation) are NOT ACID compliant. Specifically, XA/2PC does not provide strong consistency guarantees and is in fact just Eventually Consistent. It gets worse in practice. In places where I've seen XA/2PC used, it transpires that Atomicity and Durability are on shaky ground too (more on this later).

Why have I seen this topic rearing its head recently? Well, some organisations have cases where they want to update data in an Oracle Database and in a MongoDB database, as a single transaction. Nothing wrong with that of course, it really just depends on how you choose to implement the transaction and what your definition of a "transaction" really is. All too often, those stating this requirement will then go on to say that MongoDB has a problem because it does not support the XA protocol. They want their database products to magically take care of this complexity, under the covers. They want them to provide the guarantee of "strong" consistency across the multiple systems, without having to deal with this in their application code. If you're one of those people, I'm here to tell you that these are not the droids (sorry, protocols) you are looking for.

Let me have a go at explaining why XA/2PC distributed transactions are not strongly consistent. This is based on what I've seen over the past 10 years or so, especially in the mid-2000s, when working with some UK government agencies and seeing this issue at first hand.

First of all, what are some examples of distributed transactions?
  • You've written a piece of application code that needs to put the same data into two different databases (eg. Oracle's database and IBM's DB2 database), all or nothing, as a single transaction. You don't want to have a situation where the data appears in one database but not in the other.
  • You've written a piece of application code that receives a message off a message queue (eg. IBM MQ Series) and inserts the data, contained in the message, into a database. You want these two operations to be part of the same transaction. You want to avoid the situation where the dequeue operation succeeds but the DB insert operation fails, resulting in a lost message and no updated database. You also want to avoid the situation where the database insert succeeds, but the acknowledgement of the dequeue operation subsequently fails, resulting in the message being re-delivered (a duplicate database insert of the same data would then occur).
  • Another "real-world" example that people [incorrectly] quote, is moving funds between two bank systems, where one system is debited, say £50, and the other is credited by the same amount, as a single "transaction". Of course you wouldn't want the situation to occur where £50 is taken from one system, but due to a transient failure, is not placed in the other system, so money is lost *. The reason I say "incorrectly" is that in reality, banks don't manage and record money transfers this way. Eric Brewer explains why this is the case in a much more eloquent way than I ever could. On a related but more existential note, Gregor Hohpe's classic post is still well worth a read: Your Coffee Shop Doesn’t Use Two-Phase Commit.
* although if you were the receiver of the funds, you might like the other possible outcome, where you receive two lots of £50, due to the occurrence of a failure during the 1st transaction attempt

So what's the problem then?

Back in the mid-2000s, I was involved in building distributed systems with application code running on a Java EE Application Server (WebLogic). The code would update a record in a database (Oracle) and then place a message on a queue (IBM MQ Series), as part of the same distributed transaction, using XA. At a simplistic level, the transactions performed looked something like this:

If the update to the DB failed, the enqueue operation to put the message onto the message queue would be rolled back, and vice versa. However, as with most real world scenarios, the business process logic was more involved than that. The business process actually had two stages, which looked like this:

Basically in the first stage of the process, the application code would put some data in the database and then put a message on a queue, as part of a single transaction, ready to allow the next stage of the process to be kicked off. The queuing system would already have a piece of application code registered with it, to listen for arrived messages (called a "message listener"). Once the message was committed to the queue, a new transaction would be initiated. In this new transaction the message would be given to the message listener. The listener's application code would receive the message and then read some of the data, that was previously inserted into the database.

However, when we load tested this, before putting the solution into production, the system didn't always work that way. Sporadically, we saw this instead:

How could this be? A previous transaction had put the data in the database as part of the same transaction that put the message on the queue. Only when the message was successfully committed to the queue, could the second transaction be kicked off. Yet, the subsequent listener code couldn't find the row of data in the database, that was inserted there by the previous transaction!

At first we assumed there was a bug somewhere and hunted for it in Oracle, MQ Series, WebLogic and especially our own code. Getting nowhere, we eventually started digging around the XA/2PC specification a little more, and we realised that the system was behaving correctly. It was correct behaviour to see such race conditions happen intermittently (even though it definitely wasn't desirable behaviour, on our part). This is because, even though XA/2PC guarantees that both resources in a transaction will have their changes either committed or rolled-back atomically, it can't enforce exactly when this will happen in each. The final commit action (the 2nd phase of 2PC) performed by each of those resource systems is initiated in parallel and hence cannot be synchronised.

The asynchronous nature of XA/2PC, for the final commit process, is by necessity. This allows for circumstances where one of the systems may have temporarily gone down between voting "yes" to commit and then subsequently being told to actually commit. If it is never possible for any of the systems to go down, there would be little need for transactions in the first place (quid pro quo). The application server controlling the transaction keeps trying to tell the failed system to commit, until it comes back online and executes the commit action. The database or message queue system can never 100% guarantee to always commit immediately, and thus only guarantees to commit eventually. Even when there isn't a failure, the two systems are being committed in parallel and will each take different and non-deterministic durations to fulfil the commit action (including the variable time it takes to persist to disk, for durability). There's no way of guaranteeing that they both achieve this at exactly the same instant in time - they never will. In our situation, the missing data would eventually appear in the database, but there was no guarantee that it would always be there when the code in a subsequent transaction tried to read it. Indeed, upon researching a couple of things while preparing this blog post, I discovered that even Oracle now documents this type of race condition (see section "Avoiding the XA Race Condition").
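
The race can be illustrated with a small, deterministic toy model. This is only a sketch under loud assumptions: the resource names, the latency numbers and the functions simulateTwoPhaseCommit and listenerSeesRow are all hypothetical, time is modelled as plain numbers, and nothing here talks to a real transaction manager.

```javascript
// Toy model of the second phase of 2PC. Every resource has already voted
// "yes"; the coordinator then sends COMMIT to all of them at the same moment.
// The commitLatencyMs values are made-up numbers, for illustration only.
function simulateTwoPhaseCommit(resources, commitSentAtMs) {
    var committedAt = {};
    resources.forEach(function (r) {
        // Each resource completes its local commit after its own delay
        committedAt[r.name] = commitSentAtMs + r.commitLatencyMs;
    });
    return committedAt;
}

// The listener fires as soon as the queue's commit makes the message visible.
// It finds the database row only if the database commit had finished by then.
function listenerSeesRow(committedAt) {
    return committedAt.database <= committedAt.queue;
}

var fastDb = simulateTwoPhaseCommit([
    {name: "database", commitLatencyMs: 3},
    {name: "queue", commitLatencyMs: 5}
], 100);

var slowDb = simulateTwoPhaseCommit([
    {name: "database", commitLatencyMs: 9},
    {name: "queue", commitLatencyMs: 5}
], 100);

console.log("fast DB commit, listener sees row:", listenerSeesRow(fastDb));
console.log("slow DB commit, listener sees row:", listenerSeesRow(slowDb));
```

In both runs every resource does commit, so the outcome is atomic in the 2PC sense. The only difference is the order in which the two commits complete, and that ordering is exactly what XA/2PC makes no promise about.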

Back then, we'd inadvertently created the perfect reproducible test case for PROOF THAT XA/2PC IS ONLY EVENTUALLY CONSISTENT. To fix the situation, we had to put some convoluted workarounds into our application code. The workarounds weren't pretty and they're not something I have any desire to re-visit here.

There's more! When the solution went live, things got even worse...

It wasn't just the "C" in "ACID" that was being butchered. It turned out that there was a contract out to do a hatchet job on the "A" and "D" of "ACID" too.

In live production environments, temporary failures of sub-systems will inevitably occur. In our high throughput system, some distributed transactions will always be in-flight at the time of the failure. These in-flight transactions would then stick around for a while and some would be visible in the Oracle database (tracked in Oracle's "DBA_2PC_PENDING" system table). There's nothing wrong with this, except the application code that created these transactions will have been holding a lock on one or more table rows. These locks are a result of the application code having performed an update operation as part of the transaction, that has not yet been committed. In our live environment, due to these transactions being in-doubt for a while (minutes or even hours depending on the type of failure) a cascading set of follow-on issues would occur. Subsequent client requests coming into the application would start backing up, as they tried to query the same locked rows of data, and would get blocked or fail. This was due to the code having used the very common "SELECT ... FOR UPDATE" operation, which attempts to grab a lock on a row, ready for the row to be updated in a later step.

Pretty soon there would be a deluge of blocking or failing threads and the whole system would appear to lock up. No client requests could be serviced. Of course, the DBA would then receive a rush of calls from irate staff yelling that the mission critical database had ground to a halt. Under such time pressure, all the poor DBA could possibly do was to go to the source of the locks and try to release them. This meant going to Oracle's "pending transactions" system tables and unilaterally rolling back or committing each of them, to allow the system to recover and service requests again. At this point all bets were off. The DBA's decision to rollback or commit would have been completely arbitrary. Some of the in-flight transactions would have been partly rolled-back in Oracle, but would have been partly committed in MQ Series, and vice versa.

So in practice, these in-doubt transactions were applied neither Atomically nor Durably. The "theory" of XA guaranteeing Atomicity and Durability was not under attack. However, the practical real-world application of it was. At some point, fallible human intervention was required to quickly rescue a failing mission critical system. Most people I know live in the real world.

My conclusions...

You can probably now guess my view on XA/2PC. It is not a panacea. Nowhere near. It gives developers false hope, lulling them into a false sense of security, where, at best, their heads can be gently warmed whilst buried in the sand.

It is impossible to perform a distributed transaction on two or more different systems, in a fully ACID manner. Accept it and deal with it by allowing for this in application code and/or in compensating business processes. This is why I hope MongoDB is never engineered to support XA, as I'd hate to see such a move encourage good developers to do bad things.
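
To give a flavour of what "allowing for this in application code" can mean, below is a minimal sketch of a listener-side read that tolerates the race by retrying a bounded number of times. It is purely illustrative: readWithRetry, fakeFindRow and the attempt limit are hypothetical names and values, and the "database" is just a stand-in function.

```javascript
// Hypothetical helper: retry a lookup a bounded number of times, because the
// row written by the earlier transaction may not be visible yet.
// findRow is any function that returns the row, or null if it is not there.
function readWithRetry(findRow, maxAttempts) {
    for (var attempt = 1; attempt <= maxAttempts; attempt++) {
        var row = findRow();
        if (row !== null) {
            return {row: row, attempts: attempt};
        }
        // A real listener would pause here, or roll back so that the message
        // is redelivered later, rather than retrying immediately
    }
    return {row: null, attempts: maxAttempts};
}

// Stand-in for the database: the row only becomes visible on the 3rd read
var reads = 0;
function fakeFindRow() {
    reads++;
    return reads >= 3 ? {id: 42, status: "NEW"} : null;
}

var result = readWithRetry(fakeFindRow, 5);
console.log(result.attempts, result.row);
```

The point of the bound is that the code has to decide what "never arrived" means for the business process, instead of assuming the transaction manager has already made that impossible.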

Footnote: Even if your DBA refuses to unilaterally commit or rollback transactions, when the shit is hitting the fan, your database eventually will, thus violating XA/2PC. For example, in Oracle, the database will unilaterally decide to rollback all pending transactions, older than the default value of 24 hours (see "Abandoning Transactions" section of Oracle's documentation).

Song for today: The Greatest by Cat Power

Wednesday, December 16, 2015

MongoDB's BI Connector and pushdown of SQL WHERE clauses

In previous posts I showed how to use SQL & ODBC to query data from MongoDB, via Windows clients and Linux clients. In this post, I want to explore what happens to the SQL statement when it is sent to the BI Connector and on to the MongoDB database. For example, is the SQL WHERE clause pushed down to the database to resolve?

Again, for these tests, I've used the same MOT UK car test results data set as my last two posts.

As I wanted to get a better insight into what MongoDB is doing under the covers to process SQL queries, I used the Mongo Shell to enable profiling for the MongoDB database holding the MOT data.

Then on my Linux desktop client, using the ODBC settings I'd configured in my last post, I fired up isql ready to start issuing queries against the MOT data set, via the BI Connector.

I submitted a SQL statement to query all cars with a recorded mileage of over 500,000 miles, selecting specific columns only.

> SELECT make, model, test_mileage FROM testresults WHERE test_mileage > 500000;

The results were correctly returned in isql, but I was more interested to see what MongoDB was asked to do on the server-side, to fulfil this request. So using the Mongo Shell I queried the system profile collection to show the last recorded entry, displaying the exact request that MongoDB had received as a result of the translated SQL query.

> db.system.profile.find({ns: "mot.testresults"}).sort({$natural: -1}).limit(1).pretty()

As you can see in the output, the BI Connector has indeed pushed down to the database, the WHERE clause, plus the projection to return only specific fields. The profiler output shows that this has been achieved by the BI Connector, by assembling an Aggregation Pipeline.
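
For illustration, a pipeline of roughly this shape can be put together by hand. This is a hand-written sketch, not the BI Connector's actual output (which will differ in detail); buildPipeline is a hypothetical helper, and only the collection and column names come from the query above.

```javascript
// Hypothetical helper: build an aggregation pipeline roughly equivalent to
//   SELECT <columns> FROM <collection> WHERE <field> > <minValue>
function buildPipeline(columns, field, minValue) {
    var filter = {};
    filter[field] = {$gt: minValue};      // WHERE field > minValue

    var projection = {_id: 0};            // drop _id, keep only named columns
    columns.forEach(function (c) {
        projection[c] = 1;
    });

    return [
        {$match: filter},                 // the pushed-down WHERE clause
        {$project: projection}            // the pushed-down column list
    ];
}

var pipeline = buildPipeline(["make", "model", "test_mileage"], "test_mileage", 500000);
console.log(JSON.stringify(pipeline));

// In the mongo shell the result could then be run with:
//   db.testresults.aggregate(pipeline)
```

Because the $match stage comes first, the database can use an index on test_mileage (if one exists) before any documents are projected or returned.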

This is great to see. Most of the work to process the SQL query is being done at the database level, reducing the amount of data (fewer rows and fewer columns) that is returned to the ODBC client for final processing, and also enabling database indexes to be leveraged for maximum performance.

Song for today: End Come Too Soon by Wild Beasts

Tuesday, December 15, 2015

Accessing MongoDB data using SQL / ODBC on Linux

In my previous post I showed how generic Windows clients can use ODBC to query data from MongoDB using the new BI Connector. I'm not really a Windows type person though and feel more at home with a Linux desktop. Therefore, in this post, I will show how easy it is to use ODBC from Linux (Ubuntu 14.04 in my case) to query MongoDB data using SQL. I've used the same MOT UK car test results data set as my last post.

First of all I needed to install the Linux ODBC packages plus the PostgreSQL ODBC driver from the package repository.

$ sudo apt-get install unixodbc-bin unixodbc odbc-postgresql

Then I googled how to use ODBC and the PostgreSQL driver to query an ODBC data source. Surprisingly, I didn't find much quality information out there. However, looking at the contents of the "odbc-postgresql" package....

$ apt-file list odbc-postgresql

...showed some bundled documents including...


Upon opening this text file I found pretty much everything I needed to know, to get going. I really should learn to RTFM more often!

The next step, as documented in that README, was to register the PostgreSQL ANSI & Unicode ODBC drivers in the /etc/odbcinst.ini file, using a pre-supplied template file that contained the common settings.

$ sudo odbcinst -i -d -f /usr/share/psqlodbc/odbcinst.ini.template

Then I needed to create the /etc/odbc.ini file where data sources can be registered. Here I used a skeleton template again.

$ sudo cat /usr/share/doc/odbc-postgresql/examples/odbc.ini.template >> ~/.odbc.ini
$ sudo mv ~/.odbc.ini /etc/odbc.ini

However, this particular template only includes dummy configuration settings, so I needed to edit this file to register the remote MongoDB BI Connector's details properly.

$ sudo vi /etc/odbc.ini

$ cat /etc/odbc.ini 
[mot]
Description         = MOT Test Data
Driver              = PostgreSQL Unicode
Trace               = No
TraceFile           = /tmp/psqlodbc.log
Database            = mot 
Servername          =
UserName            = mot
Password            = mot
Port                = 27032
ReadOnly            = Yes
RowVersioning       = No
ShowSystemTables    = No
ShowOidColumn       = No
FakeOidIndex        = No
ConnSettings        =

( forgive my insecure username/password of "mot/mot". ;) )

Now I was ready to launch the "isql" (Interactive SQL) tool, that is bundled with the Linux/UNIX ODBC package, to be able to issue SQL queries against my remote MongoDB BI Connector data source.

$ isql mot mot mot

( first param is data source name, second param is username, third param is password )

As you can see in the screenshot below, using "isql" I was then able to easily issue arbitrary SQL commands against MongoDB and see the results.

And that's it. Very simple, and I now have a powerful command line SQL tool at my disposal for further experiments in the future.  :-)

Song for today: Swallowtail by Wolf Alice

Friday, December 11, 2015

Accessing MongoDB data from SQL / ODBC on Windows, using the new BI Connector

The latest enterprise version of MongoDB (3.2) includes a new BI Connector to enable business intelligence, analytics and reporting tools, that only "speak" SQL, to access data in a MongoDB database, using ODBC. Most of the examples published so far show how to achieve this using rich graphical tools, like Tableau. Therefore, I thought it would be useful to show here that the data is accessible from any type of tool, that is capable of issuing SQL commands via an ODBC driver. Even from Microsoft's venerable Excel spreadsheet application. Believe it or not, I still come across organisations out there that are using Excel to report on the state of their business!

For my example, I loaded a MongoDB database with the anonymised MOT test results data, that the UK government makes freely available to use. As an explanation for non-residents of the UK, MOT tests are the annual inspections that all UK road-going cars and other vehicles have to go through, to be legal and safe. There are millions of these car test records recorded every year, and they give a fascinating insight into the types and ages of cars people choose to drive in the UK.

First I loaded the CSV file based MOT data sets into a MongoDB 3.2 database, using a small Python script I wrote, with each document representing a test result for a specific owner's car for a specific year. Below is an example of what one test result document looks like in MongoDB:

I then followed the online MongoDB BI Connector documentation to configure a BI Connector server to listen for ODBC requests for the "mot" database and translate these to calls to the underlying MongoDB "testresults" collection. I just used the default DRDL ("Document Relational Definition Language") schema file that was automatically generated by the "mongodrdl" command line utility (a utility bundled with the BI Connector).

Then, on a separate desktop virtual machine running Windows 10, I downloaded the latest PostgreSQL ODBC driver installer for Windows and installed it.

With the ODBC driver installed, I then proceeded to define a Windows ODBC Data Source to reference the MOT database that I was exposing via the BI Connector (running on a remote machine).

By default the BI Connector (running on a machine with IP address, in my case), listens on port 27032. Before hitting the Save button, I hit the Test button to ensure that the Windows client could make a successful ODBC connection to the BI Connector.

With the new ODBC Data Source now configured (shown in screenshot above), I then launched Microsoft Excel so that I could use this new data source to explore the MOT test data.

Excel's standard query wizard was able to use the ODBC data source to discover the MongoDB collection's "schema". I chose to include all the "fields" in the query.

I thought it would be useful to ask for the MOT test results to be ordered by Test Year, followed by Car Make, followed by Car Model.

Finally, upon pressing OK, Excel presented me with the results of the SQL/ODBC query, run directly against the MOT test data, sourced from the MongoDB collection.

Excel then gave me many options, including settings to say whether to periodically refresh the data from source, and how often. I was also able to open Microsoft's built-in Query Builder tool to modify the query and execute it again.

That's pretty much it. It's straightforward to configure a Windows client to access data from MongoDB, via MongoDB's new BI Connector, using ODBC.

Song for today: Dust and Disquiet by Caspian

Friday, September 26, 2014

Tracking Versions in MongoDB

I'm honoured to have been asked by Asya to contribute a guest post to her series on Tracking Versions in MongoDB.

Here's Asya's full series on the topic, which I recommend reading in order:

  1. How to Track Versions with MongoDB
  2. How to Merge Shapes with Aggregation Framework
  3. Best Versions with MongoDB
  4. Further Thoughts on How to Track Versions with MongoDB  (my guest post)

Song for today: Someday I Will Treat You Good by Sparklehorse

Thursday, September 11, 2014

Java Using SSL To Connect to MongoDB With Access Control

In this post I've documented the typical steps you need to enable a Java application to connect to a MongoDB database over SSL. Specifically, I show the so-called "one-way SSL" pattern, where the server is required to present a certificate to the client but the client is not required to present a certificate to the server. Regardless of authentication, communication between client and server is encrypted in both directions. In this example, the client actually authenticates with the database, albeit using a username/password rather than presenting a certificate. In addition, I also show how the client application can run operations against a database that has access control rules defined for it.

Note: Ensure you are running the Enterprise version of MongoDB to be able to configure SSL.

1. Generate Key+Certificate and Configure With MongoDB For SSL

Create a key and certificate:

$ su -
$ cd /etc/ssl/
$ openssl req -new -x509 -days 365 -nodes -out mongodb-cert.crt -keyout mongodb-cert.key
$ cat mongodb-cert.key mongodb-cert.crt > mongodb.pem
$ exit

Add the following entries to mongodb.conf (or equivalent if setting command line options):


Start the mongod database server.

Test the SSL connection from the Mongo shell.

$ mongo --ssl

2. Configure Java Client For SSL

Create a new Java trust store in the local application's root directory, importing the certificate generated from the previous section. This is necessary because in this example a self-signed certificate is used.

$ keytool -import -alias "MongoDB-cert" -file "/etc/ssl/mongodb-cert.crt" -keystore truststore.ts -noprompt -storepass "mypasswd"
$ keytool -list -keystore truststore.ts -storepass mypasswd

In the client application's Java code, add the "ssl=true" parameter to the MongoDB URI to tell it to use an SSL connection, eg:

MongoClientURI uri = new MongoClientURI("mongodb://localhost:27017/test?ssl=true");

Modify the command line or script that runs the 'Java' executable for the client application, and add the following JVM command line property. This will allow the application to validate the server's certificate against the new trust store, when using SSL.

Run the Java client application to test that the SSL connection works.

3. Configure MongoDB database with Access Control Rules

Using the Mongo shell, connect to the database, and define an 'administrator user', plus a 'regular' user with an access control rule defined to enable it to have read/write access to a database (called 'test' in this example).

$ mongo --ssl
> use admin
> db.addUser({user: "admin", pwd: "admin", roles: [ "userAdminAnyDatabase"]})
> use test
> db.addUser({user: "paul", pwd: "password", roles: ["readWrite", "dbAdmin"]})

Add the following entry to mongodb.conf (or equivalent if setting command line options):


Restart the mongod database server to pick up this change.

Test the SSL connection to the 'test' database with username/password authentication, from the Mongo shell.

$ mongo test --ssl -u paul -p password

If the user specified doesn't have permissions to read collections in the database, an error similar to below will occur when trying to run db.mycollection.find(), for example.

error: { "$err" : "not authorized for query on test.mycollection", "code" : 13 }

4. Configure Java Client To Authenticate

Modify the client application code to include the username and password in the URL, eg:

MongoClientURI uri = new MongoClientURI("mongodb://paul:password@localhost:27017/test?ssl=true");

Run the Java client application to test that using SSL with username/password authentication works and has the rights to access the sample database.

If the user specified doesn't have correct permissions, an exception similar to below will occur.

Exception in thread "main" com.mongodb.MongoException: not authorized for query on test.system.namespaces
  at com.mongodb.MongoException.parse(
  at com.mongodb.DBApiLayer$MyCollection.__find(
  at com.mongodb.DBApiLayer$MyCollection.__find(
  at com.mongodb.DB.getCollectionNames(
  at MongoTest.main(

Note: In a real Java application, you would invariably build the URL dynamically, to avoid hard-coding a username and password in clear text in code.
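
As a rough sketch of that idea (shown in JavaScript, to match the shell examples elsewhere on this blog, rather than Java), the URL can be assembled from values supplied at runtime. The buildMongoUri helper and the sample values are hypothetical; where the values come from (environment variables, a protected configuration file, a secrets store) is an assumption left to the application.

```javascript
// Hypothetical helper: assemble the connection URI from values supplied at
// runtime, so that no credentials are hard-coded in the source.
function buildMongoUri(cfg) {
    // Percent-encode credentials so characters such as '@' or ':' are safe
    var user = encodeURIComponent(cfg.user);
    var pwd = encodeURIComponent(cfg.password);
    return "mongodb://" + user + ":" + pwd + "@" + cfg.host + ":" + cfg.port +
           "/" + cfg.database + "?ssl=true";
}

// Example values only; a real application would read these at startup
var uri = buildMongoUri({
    user: "paul",
    password: "p@ss:word",
    host: "localhost",
    port: 27017,
    database: "test"
});
console.log(uri);
```

Encoding the username and password matters because the URI format uses ':' and '@' as delimiters, so a password containing either would otherwise be misparsed.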

Song for today: My Sister in 94 by The Paradise Motel

Friday, May 2, 2014

MongoDB Connector for Hadoop with Authentication - Quick Tip

If you are using the MongoDB Connector for Hadoop and you have enabled authentication on your MongoDB database (eg. auth=true) you may find that you are prevented from getting data in to or out of the database.

You may have provided the username and password to the connector (eg. mongo.input.uri = "mongodb://myuser:mypassword@host:27017/mytestdb.mycollctn"), for a Hadoop job that pulls data from the database. The connector will authenticate to the database successfully, but early in the job run, the job will fail with an error message similar to the following:

14/05/02 13:17:01 ERROR util.MongoTool: Exception while executing job... com.mongodb.hadoop.splitter.SplitFailedException: Unable to calculate input splits: need to login
        at com.mongodb.hadoop.MongoInputFormat.getSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(
        at org.apache.hadoop.mapreduce.Job$

This is because the connector needs to run the MongoDB-internal splitVector DB command, under the covers, to work out how to split the MongoDB data up into sections ready to distribute across the Hadoop Cluster. However, by default, you are unlikely to have given sufficient privileges to the user, used by the connector, to allow this DB command to be run. This issue can be simulated easily by opening a mongo shell against the database, authenticating with your username and password and then running the splitVector command manually. For example:

> var result = db.runCommand({splitVector: 'mytestdb.mycollctn', keyPattern: {_id: 1}, 
                                                         maxChunkSizeBytes: 32000000})
> result
{
    "ok" : 0,
    "errmsg" : "not authorized on mytestdb to execute command {
        splitVector: "mytestdb.mycollctn", keyPattern: { _id: 1.0 },
        maxChunkSizeBytes: 32000000.0 }",
    "code" : 13
}

To address this issue, you first need to use the mongo shell, authenticated as your administration user, and run the updateUser command to give the connector user the clusterManager role, to enable the connector to run the DB commands it requires. For example:

use mytestdb
db.updateUser("myuser", {
                   roles : [
                      { role: "readWrite", db: "mytestdb" },
                      { role : "clusterManager", db : "admin"  }
                   ]
})

After this, your Hadoop jobs with the connector should run fine.

Note: In my test, I ran Cloudera CDH version 5, MongoDB version 2.6 and Connector version 1.2 (built with target set to 'cdh4').

Song for today: Spanish Sahara by Foals

Wednesday, April 9, 2014

Parallelising MongoDB Aggregation on a Sharded Database

In my last blog post, I explored how I could speed up a MapReduce-style aggregation job, by parallelising the work over subsets of a collection, using multiple threads, for a non-sharded MongoDB database. In this post, I look at how I took the same test case, and a similar approach, to see if I could achieve a speed up when aggregating the 10 million documents stored in a sharded MongoDB database.

Confession time....

After my success in speeding up an aggregation on a non-sharded database, I assumed this second phase of my investigation would be a walk in the park and I'd quickly march on to a conclusion exhibiting even greater gains, in a sharded environment. However, things didn't quite turn out that way, and I ended up spending considerably more time on this investigation, than I'd anticipated. More on that later.

First of all though, I needed to insert the 10 million randomly generated documents into a now-sharded collection deployment (3 shards all running on a single Linux laptop), which, of course, was straightforward.

Identifying Data Subsets In A Sharded Environment

Once the sharded collection was populated, I had to consider what changes to make to my JavaScript/mongo-Shell code, to deal with differences between non-sharded and sharded environments. It turned out that this was one of the more straight-forward and intuitive changes to make.

In the sharded MongoDB environment, I didn't have to take an educated guess on how to split up the data into ranges or use the undocumented and time-consuming splitVector command, as I did for the non-sharded tests. MongoDB is already working hard to manage the shard-key for me and how best to evenly distribute subsets (chunks) across each shard. Therefore all the information I needed was already available to me in the 'config' database, accessible via the mongos router.

For the sharded test collection, a quick query of the config database using the mongo Shell (connected to a mongos), told me how the collection was currently distributed.

mongos> use config
mongos> db.chunks.find({ns: "mrtest.rawdata"}, {_id: 0, shard: 1, min: 1, max: 1}).sort({shard: 1})

{ "min" : { "dim0" : 635504 }, "max" : { "dim0" : 702334 }, "shard" : "s0" }
{ "min" : { "dim0" : 702334 }, "max" : { "dim0" : 728124 }, "shard" : "s0" }
{ "min" : { "dim0" : 728124 }, "max" : { "dim0" : 759681 }, "shard" : "s0" }
{ "min" : { "dim0" : 759681 }, "max" : { "dim0" : 827873 }, "shard" : "s0" }
{ "min" : { "dim0" : 827873 }, "max" : { "dim0" : 856938 }, "shard" : "s0" }
{ "min" : { "dim0" : 856938 }, "max" : { "dim0" : 893410 }, "shard" : "s0" }
{ "min" : { "dim0" : 893410 }, "max" : { "dim0" : 942923 }, "shard" : "s0" }
{ "min" : { "dim0" : 942923 }, "max" : { "dim0" : 999996 }, "shard" : "s0" }

{ "min" : { "dim0" : { "$minKey" : 1 } }, "max" : { "dim0" : 42875 }, "shard" : "s1" }
{ "min" : { "dim0" : 42875 }, "max" : { "dim0" : 91602 }, "shard" : "s1" }
{ "min" : { "dim0" : 91602 }, "max" : { "dim0" : 149588 }, "shard" : "s1" }
{ "min" : { "dim0" : 149588 }, "max" : { "dim0" : 197061 }, "shard" : "s1" }
{ "min" : { "dim0" : 197061 }, "max" : { "dim0" : 257214 }, "shard" : "s1" }
{ "min" : { "dim0" : 471994 }, "max" : { "dim0" : 520681 }, "shard" : "s1" }
{ "min" : { "dim0" : 569205 }, "max" : { "dim0" : 602306 }, "shard" : "s1" }

{ "min" : { "dim0" : 257214 }, "max" : { "dim0" : 305727 }, "shard" : "s2" }
{ "min" : { "dim0" : 305727 }, "max" : { "dim0" : 363357 }, "shard" : "s2" }
{ "min" : { "dim0" : 363357 }, "max" : { "dim0" : 412656 }, "shard" : "s2" }
{ "min" : { "dim0" : 412656 }, "max" : { "dim0" : 471994 }, "shard" : "s2" }
{ "min" : { "dim0" : 520681 }, "max" : { "dim0" : 569205 }, "shard" : "s2" }
{ "min" : { "dim0" : 602306 }, "max" : { "dim0" : 635504 }, "shard" : "s2" }
{ "min" : { "dim0" : 999996 }, "max" : { "dim0" : { "$maxKey" : 1 } }, "shard" : "s2" }

                   (I added a newline between the chunks belonging to each shard, for reader clarity; 
                    also the elements highlighted in bold are discussed in the next section of this post)

So I already had all the information available at my fingertips, which defined an 'even-ish' spread of data-subsets (chunks) of the collection. In this case, I could see there were 22 chunks, spread over 3 shards. So I needed to change my JavaScript code to create multiple threads per shard, each targeting a portion of the chunks belonging to a shard. To build this list of shards and their owned chunks at runtime, my JavaScript code used an aggregation to group by shard.

// Query the config DB, grouping data on all the chunks for each shard
db = db.getSiblingDB('config');
var shardsWithChunks = db.chunks.aggregate([
    {$match: {
        "ns": "mrtest.rawdata"
    }},
    {$sort: {
        "min.dim0": 1
    }},
    {$group: {
        _id: "$shard",
        chunks: {$push: {
              min: "$min.dim0",
              max: "$max.dim0"
        }}
    }}
]);

My code then used a loop like below, to cycle through each shard, kicking off multiple sub-aggregations in different threads, where each thread uses a range query that resolves to a portion of chunks currently believed to live on that shard.

shardsWithChunks.forEach(function(shard) {
    // For each new thread, collect together a set of some of the chunks from the
    // shard.chunks array and perform the modified aggregation pipeline on it
});

Decorating a Pipeline to Enable a Thread to Target Just One Shard

The code I'd written to modify a pipeline, to allow a thread to operate on a subset of data in a non-sharded collection, also had to be changed to allow for subtleties only present when the data is sharded. Taking the example of wanting to spawn 4 threads per shard, for shard 's1' (shown in the output from db.chunks.find() earlier in this post), I had to divide the 7 chunks it currently held between the 4 threads. So 3 of the threads would need to target 2 chunks each, and 1 thread would target the remaining 1 chunk.
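
To illustrate that division of chunks between threads, here is a minimal sketch in plain JavaScript (runnable under Node.js, with no MongoDB connection needed). The function name partitionChunks and the use of null to stand in for $minKey are assumptions made purely for this illustration; it is not the code from the zip file.

```javascript
// Split a shard's ordered list of chunks into at most numThreads groups,
// giving each group the same number of chunks except possibly the last.
var partitionChunks = function(chunks, numThreads) {
    var perThread = Math.ceil(chunks.length / numThreads);
    var groups = [];
    for (var i = 0; i < chunks.length; i += perThread) {
        groups.push(chunks.slice(i, i + perThread));
    }
    return groups;
};

// The 7 chunks held by shard 's1' in the db.chunks.find() output shown earlier
// (null is an assumed placeholder for the $minKey lower bound)
var s1Chunks = [
    {min: null,   max: 42875},
    {min: 42875,  max: 91602},
    {min: 91602,  max: 149588},
    {min: 149588, max: 197061},
    {min: 197061, max: 257214},
    {min: 471994, max: 520681},
    {min: 569205, max: 602306}
];

var chunkGroups = partitionChunks(s1Chunks, 4);
chunkGroups.forEach(function(group, idx) {
    console.log("thread " + idx + ": " + JSON.stringify(group));
});
```

With 7 chunks and 4 threads this yields three groups of 2 chunks and one group of 1 chunk. Note that the third group holds two chunks that are not contiguous, which is exactly the subtlety discussed next.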

Notice that in the list of chunks returned by db.chunks.find() earlier, not all chunks on a particular shard are contiguous. For example, in the results marked in bold, I highlighted that one chunk ends at 257214 on shard 's1', followed by a chunk on 's1' starting at 471994. However, the chunk starting at 257214 is on shard 's2'. This means that the code to modify the aggregation pipeline needs to query the start and end range for each chunk, rather than just a range from the start of the first chunk to the end of the last chunk. Below you can see the naive, incorrect pipeline that my code could generate (marked BAD), followed by the correct pipeline that the code should and will generate (marked GOOD).

// BAD
   {$match: {
      dim0: {
         $gte: 197061,
         $lt : 520681
      }
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }}

// GOOD
   {$match: {
      $or: [
         {dim0: {
            $gte: 197061,
            $lt : 257214
         }},
         {dim0: {
            $gte: 471994,
            $lt : 520681
         }}
      ]
   }},
   {$group: {
      _id: "$dim0",
      value: {$sum: 1}
   }}

To decorate the original aggregation pipeline in this way, I needed to modify my original aggregate_subset() JavaScript function, to build up an $or expression of ranges, for the $match part prepended to the pipeline. The fully modified JavaScript code I produced, to run in the mongo Shell and perform all these actions can be viewed in the zip file available here.
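
As a rough idea of what building such an $or expression can look like, here is a minimal sketch in plain JavaScript (runnable under Node.js). The function name buildRangeMatch and the convention of using null for an unbounded ($minKey or $maxKey) end are assumptions for this illustration only; the real code is in the zip file.

```javascript
// Build a $match stage containing one range clause per chunk, so that gaps
// between non-contiguous chunks are never included in the query.
var buildRangeMatch = function(keyName, chunks) {
    var clauses = chunks.map(function(chunk) {
        if (chunk.min === null && chunk.max === null) {
            // Assumption: a chunk unbounded at both ends covers the whole
            // collection, in which case no range $match is needed at all
            throw new Error("chunk has no bounds; no range match required");
        }
        var range = {};
        if (chunk.min !== null) { range.$gte = chunk.min; }  // inclusive lower bound
        if (chunk.max !== null) { range.$lt = chunk.max; }   // exclusive upper bound
        var clause = {};
        clause[keyName] = range;
        return clause;
    });
    return {$match: {$or: clauses}};
};

// Two non-contiguous chunks from shard 's1', handled by a single thread
var matchStage = buildRangeMatch("dim0", [
    {min: 197061, max: 257214},
    {min: 471994, max: 520681}
]);
console.log(JSON.stringify(matchStage, null, 2));
```

The generated stage has the same shape as the GOOD pipeline's $match shown above, and would be prepended to the original pipeline for that thread.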

It is worth mentioning that, if during the aggregation run, the cluster happens to migrate any chunks, it would not be a big deal. The modified aggregation pipelines would still function correctly and return the correct results. However, a couple of the threads would just take longer to complete, as they would end up being executed against two shards, not one, by mongos and the query optimiser, to collect the requested data.

Sub-optimal mongos Shard Routing For Some Edge-case Range Queries

One thing I noticed when running explain (eg. using db.coll.aggregate(pipeline, {explain: true})), was that even when running the modified pipeline shown above, which should be targeting chunks in only one shard, mongos actually chooses to target two shards (both 's1' and 's2'). The correct result is still generated, but it just takes a little longer, as twice as many shards are hit as necessary. Essentially the query optimiser is not correctly identifying the range boundary as belonging to just one shard, when the range boundary falls exactly at the intersection of two chunks that happen to reside on different shards. Specifically, the problem is for a sharded collection that has a single (non-compound) shard key, and $lte or $gte is used in the range query. I created a JIRA ticket logging this bug, but with a low priority, as functionally an accurate result is returned, and also, it is possible to use a workaround. My original section of code and the modified version, which uses this workaround, are shown below.

// mongos/optimiser targets 2 shards
   dim0: {
      $gte: 197061,
      $lt : 257214
   }

// mongos/optimiser targets 1 shard
   dim0: {
      $gte: 197061,
      $lte: 257213
   }

By modifying the range criteria to be '$lte: 257213', in my JavaScript code, only one shard ('s1') is then correctly routed to. However, this does mean that any generic query code, in a more 'real-world' application, has to be explicitly aware of field types (no explicit database schema) and the knowledge that the field actually contains an integer that can possibly be decremented by 1, to help encourage optimal sharded query routing.
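
A minimal sketch of that workaround, in plain JavaScript (runnable under Node.js), might look like the following. The helper name toInclusiveUpperBound is hypothetical, and the rewrite is only valid under the assumption stated above: the field holds integers, so '$lt: N' and '$lte: N - 1' select the same documents.

```javascript
// Rewrite an exclusive integer upper bound ($lt: N) as the equivalent
// inclusive bound ($lte: N - 1). Only safe for integer-valued fields.
var toInclusiveUpperBound = function(range) {
    var adjusted = {};
    Object.keys(range).forEach(function(op) {
        adjusted[op] = range[op];   // shallow copy, leaving the input untouched
    });
    var upper = adjusted.$lt;
    if (typeof upper === "number" && Math.floor(upper) === upper) {
        delete adjusted.$lt;
        adjusted.$lte = upper - 1;
    }
    return adjusted;
};

var adjustedRange = toInclusiveUpperBound({$gte: 197061, $lt: 257214});
console.log(JSON.stringify({dim0: adjustedRange}));
```

Ranges without a whole-number $lt (for example, an open-ended range or a string key) are returned unchanged, which is the safe default when the field type is not known.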

Initial Results

When generating my aggregation test results and capturing the execution time, I first wanted to establish the performance of the normal unchanged aggregation on the sharded database. I felt it would be interesting to see if a regular, unchanged aggregation ran faster once sharded. When I ran the unchanged aggregation against the sharded database, I was a little surprised to see it execute in 27 seconds, compared with just 23 seconds when I ran the aggregation against the non-sharded collection. Later, upon reflection, I came to realise why the aggregation took longer to complete, and I will discuss these reasons in the conclusion of the blog post. However, for the moment my challenge was just to try to beat the time of 27 seconds, with my 'optimised parallel aggregations' approach.

I then ran my newly modified JavaScript code (see code in zip file), executing parallel aggregation threads targeted at groups of chunks on each shard (4 threads per shard).  This test executed in 20 seconds. Again I was a little disappointed because this yielded a roughly 25% speed-up in the sharded environment, whereas before, in a non-sharded environment, I was achieving over a 40% speed-up.
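
For reference, the speed-up percentages quoted here are simply the relative reduction in execution time. A small sketch of the arithmetic in plain JavaScript (the helper name percentReduction is just for illustration):

```javascript
// Percentage reduction in execution time, rounded to one decimal place
var percentReduction = function(baselineSecs, optimisedSecs) {
    return Math.round((baselineSecs - optimisedSecs) / baselineSecs * 1000) / 10;
};

// Sharded: 27 secs unchanged vs 20 secs with parallel sub-aggregations
var shardedReduction = percentReduction(27, 20);
// Non-sharded: 23 secs unchanged vs 13 secs with parallel sub-aggregations
var nonShardedReduction = percentReduction(23, 13);

console.log("sharded: " + shardedReduction + "%, non-sharded: " + nonShardedReduction + "%");
```

This gives roughly 26% for the sharded test and roughly 43% for the non-sharded test.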

Attempts on Improvement

At this point, I started to hypothesise on potential reasons why my 'optimisations' couldn't speed-up an aggregation as quickly for a sharded database. Below are some of the main hypotheses I came up with and what happened when I tested each one.
  1. Is running the test via JavaScript in the mongo Shell introducing a bottleneck?  I re-wrote my application code, to decorate the pipeline and run separate aggregation threads, in Python instead, using the PyMongo driver (also included in the code zip file). However, there was no noticeable change in performance, when re-running the test with the Python code. 
  2. Is the single mongos router, used by all 12 aggregation threads, acting as a bottleneck?  So I started up 12 separate mongos instances and modified the Python version of the code, to force each of the spawned 12 threads to connect to its own dedicated mongos. The execution time that resulted was only marginally better, and so I concluded that a single shared mongos was not really a major bottleneck.
  3. Is the role of mongos (ie. shard routing/coordination and partial aggregation processing) the main cause of added latency?  So at this point I went a bit off-piste and did something no-one should ever do. I modified the Python code again, but this time to make each spawned thread connect directly to the mongod of the shard, rather than going via mongos. My code achieved this by first querying the 'config' database to find out the direct URL for each shard's mongod host. This time I did see a significant decrease in execution time, down to 15 seconds. This is only 2 seconds off the 13 seconds achieved on the non-sharded database. The 2 second difference can probably be explained by the fact that I am spawning 12 threads on a now overloaded host machine, running 3 mongod shards, 1 config mongod database and 1 mongos, versus the non-sharded test that runs just 4 threads against a single mongod. However, running the aggregations directly against shards is a bad thing to do, for two reasons: (1) the $match may miss a chunk, read an orphan chunk or read only part of a split chunk, if any chunk splitting/migration has occurred during the process, resulting in an incorrect result; (2) the aggregation $out operator will cause a collection to be written to the local shard database, but the config databases and all the mongos's will have no awareness of its existence (indeed, I witnessed a 'confused' system after trying this). Therefore, I have not included in the code zip file the modifications I made to pull this stunt, and I vow never to try such a thing again! Still, the results were interesting, even if unusable.  :-)

It is probably worth noting that, for all these test variations, I consistently observed the following characteristics:
  • top would show all the host CPUs running at around 99% for approximately the first 3/4 of the test run.
  • mongostat would show zero page faults (inference being that all the working-set is memory-resident) and only single-figure locked database percentages (ie. sub 10%).
  • iostat would show, at its peak, less than 15% disk usage for the host SSD.

Results Summary

So finally my prolonged theorising and testing came to an end. The table below shows the results I achieved for the different variations, ultimately demonstrating a ~30% performance speed up (27 seconds down to 19 seconds), when using my parallel sub-aggregations optimisation via multiple mongos's against 3 shards. This was less than the 40+% performance increase I achieved in my non-sharded database tests.

Aggregation Description                         | Data-Subsets per Shard | Spawned Threads per Shard | Number of mongos's | Execution Time
Regular unchanged (single-threaded)             | 0                      | 0                         | 1                  | 27 secs
Serial sub-aggregations (single-threaded)       | 2                      | 0                         | 1                  | 32 secs
Serial sub-aggregations (single-threaded)       | 4                      | 0                         | 1                  | 32 secs
Parallel sub-aggregations, single mongos        | 2                      | 2                         | 1                  | 21 secs
Parallel sub-aggregations, single mongos        | 4                      | 4                         | 1                  | 20 secs
Parallel sub-aggregations, multiple mongos's    | 2                      | 2                         | 6                  | 20 secs
Parallel sub-aggregations, multiple mongos's    | 4                      | 4                         | 12                 | 19 secs
Parallel sub-aggregations, direct to mongod's   | 2                      | 2                         | 0                  | 16 secs
Parallel sub-aggregations, direct to mongod's   | 4                      | 4                         | 0                  | 15 secs

For interest, I've included in the table, the results from running the parallel aggregations directly against shard mongod's (red for 'danger'!). However, as discussed earlier, doing this for real in a production system will result in bad things happening. You have been warned!


Conclusion

I've demonstrated that the execution time for certain MapReduce-style Aggregation workloads on a sharded database can be reduced, by parallelising the aggregation work over collection subsets, using multiple threads. However, for this particular test case, the speed-up is less pronounced, when compared to the same test on a non-sharded database.

On reflection, it is probably naive to expect a reduction in aggregation execution time, when moving a collection from a non-sharded database to a sharded one. If you think about the main benefits of sharding, sharding is NOT really about reducing individual operation latency. The main purpose of sharding is to enable a database to be able to scale-out to cope with a significant increase in data volumes and/or a significant increase in read and/or write throughput demand. To that effect, I would expect a sharded database, spread out over multiple machines, to be able to sustain a much higher throughput of concurrent aggregation operations, compared with a non-sharded database confined to a single machine (perhaps that can be a test and blog post for another day!).

Lastly, I also suspect that, with a combination of a much larger data-set, and with the shards, mongos's and client app spread out over separate dedicated host machines, the aggregation test on a sharded database may equal or even beat the execution time achieved on a non-sharded database. I even feel this would be the case for an unchanged, non-optimised aggregation pipeline, due to MongoDB's built-in ability to run the first part of a pipeline directly on each shard, in parallel. Unfortunately, time has run out for me to try this at the moment, so perhaps it's something for me to revisit in the future.

Song for today: Perth by Bon Iver

Friday, March 14, 2014

How to speed up MongoDB Aggregation using Parallelisation

A little while ago, Antoine Girbal wrote a great blog post describing how to speed up a MongoDB MapReduce job by 20x. Now that MongoDB version 2.6 is nearly upon us, and prompted by an idea from one of my very smart colleagues, John Page, I thought I'd investigate whether MongoDB Aggregation jobs can be sped up using a similar approach. Here, I will summarise the findings from my investigation.

To re-cap, the original MapReduce tests looked for all the unique values in a collection, counting their occurrences. The following improvements were incrementally applied by Antoine against the 10 million documents in the collection:

  1. Define a 'sort' for the MapReduce job
  2. Use multiple-threads, each operating on a subset of the collection 
  3. Write out result subsets to different databases
  4. Specify that the job should use 'pure JavaScript mode'
  5. Run the job using MongoDB 2.6 (an 'in-development' version) rather than 2.4

With all these optimisations in place, Antoine was able to reduce a 1200 second MapReduce job to just 60 seconds! Very impressive.

I expect that one of the reasons Antoine was focussing on MapReduce in MongoDB, was because MongoDB limited the output size of the faster Aggregation framework to 16 MB. With MongoDB 2.6, this threshold is removed. Aggregations generate multi-document results, rather than a single document and these can be navigated by the client applications using a cursor. Alternatively, an output collection can be specified for the Aggregation framework to write directly to. Thus, version 2.6 presents the opportunity to try the MapReduce test scenario with the Aggregation framework instead, to compare performance. In this blog post I do just that, focussing purely on testing a single, non-sharded database.

Re-running the optimised MapReduce

Before I tried running the test with the Aggregation framework, I thought I'd quickly run and time Antoine's optimised MapReduce job on my test machine to normalise any performance difference that occurs due to hardware disparity (for reference, my host machine is a Linux x86-64 laptop with SSD, 2 cores + hyper-threading, running MongoDB 2.6.0rc1).

I first had to insert the 10 million documents, as Antoine did, using the mongo Shell, with each insert containing a single field (‘dim0’) whose value is a random number between 0 and 1 million:

> for (var i = 0; i < 10000000; ++i) {
      db.rawdata.insert({dim0: Math.floor(Math.random()*1000000)});
  }
> db.rawdata.ensureIndex({dim0: 1})

I then ran Antoine's fully optimised MapReduce job, which completed in 32 seconds, versus 63 seconds on Antoine's host machine.

Running a normal, non-optimised Aggregation

So the first thing I needed to establish is how much faster or slower the equivalent Aggregation job is, compared with the 'optimised' MapReduce job I'd just run on the same hardware. The Aggregation pipeline is actually very simple to implement for this test. Below you can see the MapReduce code and the Aggregation pipeline equivalent:

// MapReduce
map: function() {
    emit(this.dim0, 1);
}

reduce: function(key, values) {
    return Array.sum(values);
}

// Aggregation pipeline equivalent
var pipeline = [
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}
];

The pipeline basically just groups on the field 'dim0', counting the number of occurrences of each value. So I ran this Aggregation against my database, called 'mrtest', containing the randomly generated 'rawdata' collection:

> db = db.getSiblingDB('mrtest');
> db['rawdata'].aggregate(pipeline);

The aggregation completed in 23 seconds, versus 32 seconds for running the 'optimised' equivalent MapReduce job on the same hardware. This shows just how quick the standard out-of-the-box Aggregation capability is in MongoDB!
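
To make concrete what this $group stage computes, here is a minimal plain JavaScript sketch (runnable under Node.js, without MongoDB) that produces the same shape of result for a handful of in-memory documents. The function name countByKey and the sample documents are made up for illustration, and the sketch assumes numeric key values; it says nothing about how MongoDB implements $group internally, and MongoDB does not guarantee any particular output order.

```javascript
// Count the occurrences of each distinct value of keyName, returning
// documents shaped like the output of {$group: {_id: "$key", value: {$sum: 1}}}
var countByKey = function(docs, keyName) {
    var counts = {};
    docs.forEach(function(doc) {
        var key = doc[keyName];
        counts[key] = (counts[key] || 0) + 1;
    });
    return Object.keys(counts).map(function(key) {
        return {_id: Number(key), value: counts[key]};
    });
};

var sampleDocs = [{dim0: 7}, {dim0: 3}, {dim0: 7}, {dim0: 7}, {dim0: 3}, {dim0: 9}];
var grouped = countByKey(sampleDocs, "dim0");
console.log(JSON.stringify(grouped));
```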

Running an optimised Aggregation

So my challenge was clear now: See if I can apply some of Antoine's tricks from speeding up MapReduce, to the Aggregation framework, to reduce this test completion time from 23 seconds.

I considered which of the original 5 optimisations could realistically be applied to optimising the test with the Aggregation framework:

  1. YES - Define a 'sort' for the job
  2. YES - Use multiple-threads, each operating on a subset of the collection 
  3. NO - Write out result subsets to different databases  (the Aggregation framework expects to write out a collection to the same database as the source collection being read)
  4. NO - Specify that the job should use 'pure JavaScript mode'  (doesn't apply here as Aggregations run within C++ code)
  5. NO - Move to using a newer version of MongoDB, 2.4 to 2.6  (we are already at the latest version with my test runs, which is undoubtedly helping achieve the 23 second result already)

So the main thing I needed, in addition to including a 'sort', was to somehow split up the aggregation into parts to be run in parallel, by different threads, on subsets of the data. This would require me to prepend the existing pipeline with operations to match a range of documents in the collection and to sort the matched subset of documents, before allowing the original pipeline operation(s) to be run. Also, I would need to append an operation to the pipeline, to specify that the result should go to a specific named collection. So for example, for one of the subsets of the collection (documents with 'dim0' values ranging from 524915 to 774848), the following shows how I needed to convert the original pipeline (shown first) into the modified version (shown second):

// Original pipeline
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}

// Modified pipeline for this subset
    {$match: {
        dim0: {
            $gte: 524915,
            $lt : 774849
        }
    }},
    {$sort: {
        dim0: 1
    }},
    {$group: {
        _id: "$dim0",
        value: {$sum: 1}
    }}

A similar pipeline, but with different range values and output collection name, needs to be produced for each thread I intend to run. Each of these pipelines needs to be invoked with db.coll.aggregate() in parallel, from different threads.

To achieve this in a generic way, from the Shell, I quickly wrote a small JavaScript function to 'decorate' a given pipeline with $match and $sort prepended and $out appended. The function ends by running MongoDB’s aggregate() function with the modified pipeline, against the subset of matching data.

// Define new wrapper aggregate function, to operate on subset of docs 
// in collection after decorating the pipeline with pre and post stages
var aggregateSubset = function(dbName, srcClltName, tgtClltName,
                                  singleMapKeyName, pipeline, min, max) { 

    var newPipeline = JSON.parse(JSON.stringify(pipeline)); //deep copy
    var singleKeyMatch = {};
    singleKeyMatch[singleMapKeyName] = (max < 0) ? {$gte: min} 
                                                 : {$gte: min, $lt: max};
    var singleKeySort = {};
    singleKeySort[singleMapKeyName] = 1;

    // Prepend pipeline with range match and sort stages
    newPipeline.unshift(
        {$match: singleKeyMatch},
        {$sort: singleKeySort}  
    );

    // Append pipeline with output to a named collection
    newPipeline.push(
       {$out: tgtClltName}
    );

    // Execute the aggregation on the modified pipeline
    db = db.getSiblingDB(dbName);
    var result = db[srcClltName].aggregate(newPipeline);
    return {outCollection: tgtClltName, subsetResult: result};
};

This function takes the name of the database, source collection, and target collection, plus the original pipeline, plus the minimum and maximum values for the subset of the collection. The keen-eyed amongst you will have noticed the 'singleMapKeyName' parameter too. Basically, my example function is hard-coded to assume that the pipeline is mapping and sorting data anchored on a single key ('dim0' in my case, or this could be group-by 'customer', as another example). However, in other scenarios, a compound key may be being used (eg. group-by customer + year). My function would need to be enhanced, to be able to support compound keys.
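
When experimenting with this kind of decoration, it can help to inspect the decorated pipeline without executing anything. The following is a side-effect-free sketch of the same decoration steps in plain JavaScript (runnable under Node.js); the function name decoratePipeline and the output collection name used in the example are assumptions for illustration only.

```javascript
// Return a decorated copy of the pipeline: $match and $sort prepended,
// $out appended. A negative max means "no upper bound", as in aggregateSubset().
var decoratePipeline = function(pipeline, keyName, min, max, outCollection) {
    var newPipeline = JSON.parse(JSON.stringify(pipeline)); // deep copy
    var match = {};
    match[keyName] = (max < 0) ? {$gte: min} : {$gte: min, $lt: max};
    var sort = {};
    sort[keyName] = 1;
    newPipeline.unshift({$match: match}, {$sort: sort});
    newPipeline.push({$out: outCollection});
    return newPipeline;
};

var originalPipeline = [{$group: {_id: "$dim0", value: {$sum: 1}}}];
var decorated = decoratePipeline(originalPipeline, "dim0", 524915, 774849, "aggdata524915");
console.log(JSON.stringify(decorated, null, 2));
```

Because the function works on a deep copy, the same original pipeline can safely be decorated once per thread with different range values.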

Once I'd completed and tested my new aggregateSubset() JavaScript function, I was ready to run some test code in the Shell, to call this new function from multiple threads. Below is the code I used, specifying that 4 threads should be spawned. My host machine has 4 hardware threads, so this is probably not a bad number to use, even though I actually chose this number because it matched the thread count used in Antoine's original MapReduce tests.

// Specify test parameter values
var dbName = 'mrtest';
var srcClltName = 'rawdata';
var tgtClltName = 'aggdata';
var singleMapKeyName = 'dim0';
var numThreads = 4;

// Find an even-ish distributed set of sub-ranges in the collection
var singleKeyPattern = {};
singleKeyPattern[singleMapKeyName] = 1;
var splitKeys = db.runCommand({splitVector: dbName+"."+srcClltName, 
      keyPattern: singleKeyPattern, maxChunkSizeBytes: 32000000}).splitKeys;

// There are more sub-ranges than threads to run, so work out 
// how many sub-ranges to use in each thread
var inc = Math.floor(splitKeys.length / numThreads) + 1;
var threads = [];

// Start each thread, to execute aggregation on a subset of data 
for (var i = 0; i < numThreads; ++i) { 
    var min = (i == 0) ? 0 : splitKeys[i * inc].dim0;
    var max = (i * inc + inc >= splitKeys.length) ? -1 
                                                  : splitKeys[i * inc + inc].dim0;
    var t = new ScopedThread(aggregateSubset, dbName, srcClltName,
                 tgtClltName + min, singleMapKeyName, pipeline, min, max);
    threads.push(t);
    t.start();
}

// Wait for all threads to finish and print out their summaries
for (var i in threads) { 
    var t = threads[i];
    t.join();
    printjson(t.returnData());
}

This multi-threaded aggregation completed in 13 seconds, versus 23 seconds for running the normal 'non-optimised' version of the aggregation pipeline. This optimised pipeline involved writing to 4 different collections, each named 'aggdataN' (eg. 'aggdata374876').
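
The way the test code above carves the split points into one range per thread can be checked in isolation. Below is a minimal sketch in plain JavaScript (runnable under Node.js) that mirrors the same index arithmetic on a plain array of numbers. The function name computeThreadRanges and the sample split points are invented for illustration, and the early exit when the split points run out is an addition of this sketch rather than part of the original test code.

```javascript
// Turn an ordered list of split points into {min, max} ranges, one per thread.
// max of -1 means "no upper bound"; the first range starts at 0, which
// assumes the key values are non-negative (as they are for 'dim0').
var computeThreadRanges = function(splitPoints, numThreads) {
    var inc = Math.floor(splitPoints.length / numThreads) + 1;
    var ranges = [];
    for (var i = 0; i < numThreads; ++i) {
        var startIdx = i * inc;
        if (i > 0 && startIdx >= splitPoints.length) {
            break;  // added guard: no split points left for further threads
        }
        var min = (i === 0) ? 0 : splitPoints[startIdx];
        var max = (startIdx + inc >= splitPoints.length) ? -1
                                                         : splitPoints[startIdx + inc];
        ranges.push({min: min, max: max});
    }
    return ranges;
};

// 10 made-up split points, divided between 4 threads
var samplePoints = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];
var threadRanges = computeThreadRanges(samplePoints, 4);
console.log(JSON.stringify(threadRanges));
```

Each range's min equals the previous range's max, so together the ranges cover the whole key space exactly once.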

Okay, this isn't the stunning 20x improvement, which Antoine achieved for MapReduce. However, given that the Aggregation framework is already a highly optimised MongoDB capability, I don't believe a 40% speed up is to be sniffed at. Also, I suspect that on host machines with more cores (hence requiring the thread count parameter used in my example to be increased), and with a much larger real-world data set, even more impressive results would be achieved, such as a 2x or 3x improvement.

Some observations

As you'll notice I use a similar pattern to Antoine's, whereby I use the undocumented splitVector command to get a good idea of what is a well balanced set of ranges in the source collection. This is especially useful if the distribution of values in a collection is uneven and hard to predict. In my case, the distribution is pretty even and for 4 threads I could have got away with hard-coding ranges of 0-250000, 250000-500000, 500000-750000 & 750000-1000000. However, more real world collections will invariably have a much more uneven spread of values for a given key. With a naïve approach to partitioning up the data, one thread could end up doing much more work than the other threads. In real world scenarios, you may have other ways of predicting balanced subset ranges, based on the empirical knowledge you have of your data. You would then use a best guess effort to specify the split points for partitioning your collection. If you do choose to use the splitVector utility for a real application, I would suggest caching the value and only re-evaluating it intermittently, to avoid the latency it would otherwise add to the execution time of your aggregations.
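
The caching suggestion could be sketched as follows, in plain JavaScript (runnable under Node.js). Everything named here is hypothetical: makeCachedSplitPoints is an illustrative helper, and the computeSplitPoints function passed in stands for whatever actually obtains the split points (for example, a wrapper around the splitVector call). The clock is injected so that the behaviour is easy to test.

```javascript
// Wrap an expensive split-point calculation so that it is only re-run
// once the cached value is older than maxAgeMillis.
var makeCachedSplitPoints = function(computeSplitPoints, maxAgeMillis, now) {
    var cached = null;
    var cachedAt = 0;
    return function() {
        var current = now();
        if (cached === null || current - cachedAt >= maxAgeMillis) {
            cached = computeSplitPoints();
            cachedAt = current;
        }
        return cached;
    };
};

// Example with a stand-in calculation and a fake clock
var computeCalls = 0;
var fakeClockMillis = 0;
var getSplitPoints = makeCachedSplitPoints(
    function() { computeCalls += 1; return [250000, 500000, 750000]; },
    60000,
    function() { return fakeClockMillis; }
);

getSplitPoints();          // first call computes the split points
getSplitPoints();          // second call is served from the cache
fakeClockMillis = 60000;   // pretend a minute has passed
getSplitPoints();          // cache has expired, so the value is recomputed
console.log("computed " + computeCalls + " times");
```

In a real application the injected clock would simply be a function returning Date.now().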

Also, you'll notice I followed Antoine's approach of using the undocumented ScopedThread Shell utility object, to spawn new threads. If you are writing a real application to run an aggregation, you would instead use the native threading API of your chosen programming language/platform, alongside the appropriate MongoDB language driver.

One optimisation I wish I could have made, but couldn't, was 'write out result subsets to different databases'. In MongoDB 2.6, one can specify an output collection name, but not an output database name, when calling aggregate(). My belief that different threads outputting to different databases would increase performance is based on some observations I made during my tests. When running 'top', 'mongostat' and 'iostat' on my host machine whilst the optimised aggregation was running, I initially noticed that 'top' was reporting all 4 'CPUs' as maxing out at around 95% CPU utilisation, coming down to a lower amount in the later stages of the run. At the point when CPU usage came down, 'mongostat' was reporting that the database 'mrtest' was locked for around 90% of the time. At the same point, disk utilisation was reported by 'iostat' as being a modest 35%. This tells me that for part of the aggregation run, the various threads are all queuing, waiting for the write lock, to be able to insert result documents into their own collections in the same database. By allowing different databases to be used, the write lock bottleneck for such parallel jobs would be diminished considerably, thus allowing the document inserts, and hence the overall aggregation, to complete even quicker. I have created a JIRA enhancement request, asking for the ability for the Aggregation framework's $out operator to specify a target database, in addition to a target collection.


Conclusion

So I've managed to show that certain MapReduce-style workloads can be executed faster if using Aggregation in MongoDB. I showed the execution time can be reduced even further, by parallelising the aggregation work over subsets of the collection, using multiple threads. I demonstrated this for a single non-sharded database. In the future, I intend to investigate how a sharded database can be used to drive even better parallelisation and hence lower execution times, which I will then blog about.

Song for today: The Rat by The Walkmen