Ask Asya

Answers to all your burning questions about MongoDB.

Oct 29, 2014

Question:

Is there a way that I can prevent slow queries in MongoDB? I'd like to be able to set something on the server that will kill all queries running longer than a certain amount of time.

Answer:

There are some options available on the client side, for example $maxTimeMS, starting in the 2.6 release. This gives you a way to attach an option to your queries before they get to the server, telling the server to kill the query if it takes longer than the specified amount of time. However, this does not help with any query that gets to the server without having this option added to it.
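For illustration, here is a minimal sketch in the shell (the collection name coll and the filter are placeholders); the maxTimeMS() cursor helper is what sets the $maxTimeMS option on the query:

    // Ask the server to abort this query if it runs longer than 100 milliseconds (2.6+).
    db.coll.find({ "status": "active" }).maxTimeMS(100);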

On the server side, there is no global option, because it would impact all databases and all operations, even ones that the system needs to keep long-running for internal purposes (for example, tailing the oplog for replication). In addition, it may be okay for some of your queries to be long-running by design but not others.

The correct way to solve this would be to monitor currently running queries via a script and kill the ones that are both long running and user/client initiated - you can then build in exceptions for queries that are long running by design, or have different thresholds for different queries/collections/etc.

The way to implement such a script is to use the db.currentOp() command (in the shell) to see all currently running operations. The field "secs_running" indicates how long the operation has been running. Other fields visible to you include the namespace ("ns"), whether the operation is "active", and whether it's waiting on a lock and for how long. The docs contain some good examples.

Be careful not to kill any long running operations that are not initiated by your application/client - they may be necessary system operations; chunk migration in a sharded cluster is just one example, and replication threads are another.
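To make this concrete, here is a minimal sketch of what such a script might look like in the shell; the 30-second threshold, the restriction to "query" operations, and the namespace/connection checks are assumptions you would adapt (and extend with your own exceptions):

    // Sketch only: kill client-initiated queries running longer than a threshold.
    var thresholdSecs = 30;                            // assumed threshold - tune per workload
    db.currentOp().inprog.forEach(function(op) {
        if (op.active &&
            op.secs_running > thresholdSecs &&
            op.op == "query" &&                        // only reads; skip writes/commands
            op.ns.indexOf("local.") != 0 &&            // leave internal namespaces alone
            op.desc && op.desc.indexOf("conn") == 0) { // only ops from client connections
            print("Killing opid " + op.opid + " on " + op.ns +
                  " (running " + op.secs_running + "s)");
            db.killOp(op.opid);
        }
    });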



Sep 7, 2014

GUEST POST by Paul Done 

In a previous Ask Asya blog post, Asya outlined various approaches for preserving historical versions of records for auditing purposes, whilst allowing current versions of records to be easily inserted and queried. Having found the post to be extremely useful for one of my projects, and following some further investigations of my own, I realised that two of the choices could be refined a little to be more optimal. Upon feeding back my findings, Asya graciously asked me to document them here, so here goes.

Revisit of Choice 2  (Embed Versions in a Single Document)

The presented 'compare-and-swap' example code, to generate a new version and update version history, is very effective at ensuring consistency of versions in a thread-safe manner. However, I felt there was scope to reduce the update latency, which will be particularly high once a document has grown to contain many embedded previous versions.

For example, if a current document has tens of embedded previous versions, then projecting the whole document back to the client application, updating part of the document copy and then sending the whole document as an update to the database, will be slower than necessary. I prototyped a refactored version of the example code (shown below) which exhibits the same functional behaviour, but avoids projecting the entire document and uses an in-place update to push changes to the database.

Don't return all the old versions:

    var doc = db.coll.findOne({"_id": 174}, {"prev": 0});  
    var currVersion = doc.current.v;
    var previous = doc.current;
    var current = {
          "v" : currVersion+1,
          "attr1": doc.current.attr1,
          "attr2": "YYYY"
    };

Perform in-place update of changes only: 

    var result = db.coll.update(
         { "_id" : 174, "current.v": currVersion},
         { "$set" :  {"current": current},
           "$push" :  {"prev": previous}
         }
    );

    if (result.nModified != 1)  {
         print("Someone got there first, replay flow to try again");
    }

As a result, even when the number of versions held in a document increases over time, the update latency for adding a new version remains roughly constant.

Revisit of Choice 3  (Separate Collection for Previous Versions)

The original post implies that this choice is technically challenging to implement: ingesting a new document version whilst maintaining consistency with previous versions, in the face of system failure. However, I don't feel it's that bad if the update flow is crafted carefully. If the order of writes is "write to the previous collection before writing to the current collection", then in a failure scenario there is potential for a duplicate record version, but not a lost record version. Also, there are ways for subsequent querying code to deal gracefully with the duplicate.

If the following three principles are acceptable to an application development team, then this is a viable versioning option and doesn't have the implementation complexity of choice 5, for example:

  1. System failure could result in a duplicate version, but not a lost version.

  2. Any application code that wants to query all or some versions of the same entity is happy to issue two queries simultaneously, one against the current collection (to get the current version) and one against the previous collection (to get historic versions), and then merge the results. In cases where a duplicate has been introduced (but not yet cleaned up - see next point), the application code just has to detect that the latest version in the current collection also appears as a record in the previous collection. When this occurs, the application code simply ignores the duplicate when constructing its results. In my experience, most 'normal' queries issued by an application will just query the current collection and be interested in the latest versions of entities only. Therefore this 'double-query' mechanism is only needed for the parts of an application where historic version analysis is required.

  3. The next time a new version of a document is pushed into the system, the old duplicate in the previous collection (if the duplicate exists) will become a genuine previous version. The current collection will contain the new version and the previous collection will only contain previous versions. As a result, there is no need for any background clean up code mechanisms to be put in place.

For clarity, I've included a JavaScript example of the full update flow below, which can be run from the Mongo shell.

    //
    // CREATE SAMPLE DATA
    //

    db.curr_coll.drop();
    db.prev_coll.drop();
    db.curr_coll.ensureIndex({"docId" : 1}, {"unique": true});
    db.prev_coll.ensureIndex({"docId" : 1,  "v" :1}, {"unique": true});
    db.curr_coll.insert([
         {"docId": 174, "v": 3, "attr1": 184, "attr2": "A-1"},
         {"docId": 133, "v": 3, "attr1": 284, "attr2": "B-1"}
    ]);

    db.prev_coll.insert([
         {"docId": 174, "v": 1, "attr1": 165},
         {"docId": 174, "v": 2, "attr1": 165, "attr2" : "A-1"},
         {"docId": 133, "v": 1, "attr1": 265},
         {"docId": 133, "v": 2, "attr1": 184, "attr2" : "B-1"}
    ]);

    //
    // EXAMPLE TEST RUN FLOW 
    //
    // UPSERT (NOT INSERT) IN CASE FAILURE OCCURRED DURING PRIOR ATTEMPT.
    // THE PREV COLLECTION MAY ALREADY CONTAIN THE 'OLD' CURRENT VERSION.
    // IF ALREADY PRESENT, THIS UPSERT WILL BE A 'NO-OP', RETURNING:
    //  nMatched: 1, nUpserted: 0, nModified: 0.

    var previous = db.curr_coll.findOne({"docId": 174}, {_id: 0});
    var currVersion = previous.v;
    var result = db.prev_coll.update(
         {"docId" : previous.docId, "v": previous.v },
         { "$set": previous }
       , {"upsert": true});

    // <-- STOP EXECUTION HERE ON A RUN TO SIMULATE FAILURE, THEN RUN
    //     FULL FLOW TO SHOW HOW THINGS WILL BE NATURALLY CLEANED-UP
    // UPDATE NEW VERSION IN CURR COLLECTION, USING THREAD-SAFE VERSION CHECK

    var current = {"v": currVersion+1, "attr1": previous.attr1, "attr2":"YYYY"};
    var result = db.curr_coll.update({"docId": 174, "v": currVersion},
         {"$set": current}
    );

    if (result.nModified != 1) {
         print("Someone got there first, replay flow to try again");
    }

    //
    // EXAMPLE VERSION HISTORY QUERY CODE
    //

    // BUILD LIST OF ALL VERSIONS OF ENTITY, STARTING WITH CURRENT VERSION

    var fullVersionHistory = [];
    var latest = db.curr_coll.findOne({"docId": 174}, {_id: 0});
    var latestVersion = latest.v;
    fullVersionHistory.push(latest);

    // QUERY ALL PREVIOUS VERSIONS (EXCLUDES DUPLICATE CURRENT VERSION IF EXISTS)
    var previousVersionsCursor = db.prev_coll.find({
         "$and": [
              {"docId": 174},
              {"v": {"$ne": latestVersion}}
         ]
    }, {_id: 0}).sort({v: -1});

    // ADD ALL THESE PREVIOUS VERSIONS TO THE LIST
    previousVersionsCursor.forEach(function(doc) {
          fullVersionHistory.push(doc);
    });

    // DISPLAY ALL VERSIONS OF AN ENTITY (NO DUPLICATES ARE PRESENT)
    printjson(fullVersionHistory);

As a result of this approach, it is easy to query current versions of entities, easy to query the full version history of a given entity and easy to update an entity with a new version.

In Summary

I've taken the liberty of providing a modified version of Asya's summary table below, to expand out the criteria that may be relevant when choosing a versioning strategy for a given use case. My version of the table also reflects the improved results for choices 2 and 3, on the back of what has been discussed in this blog post.

Updated Table of Tradeoffs:

[The updated tradeoff table appeared here as an image in the original post, showing Asya's summary table with improved Update and Recover ratings for choices 2 and 3.]



Aug 27, 2014

Socialite

At MongoDB World, my colleague Darren Wood and I gave three back-to-back presentations about an open source project called Socialite, which is a reference architecture implementation of a social status feed service. Darren wrote the bulk of the source code, and I installed and ran extensive performance tests in different configurations to determine how the different schema and indexing options scale, and to get an understanding of the resources required for various sizes and distributions of workloads.

The recordings and slides are now available on the MongoDB website if you want to jump in and watch, but since we had to race through the material, I'm going to blog about some of the more interesting questions it raised, mainly about schema design, indexing and sharding choices, and how to benchmark a large, complex application.

There were three talks because there was a large amount of material and because there are several rather complex orthogonal topics when considering a social status feed:

  1. How will you store the content long term?
  2. How will you store the user graph?
  3. How will you serve up the status feed for a particular user when they log in?

The last one is probably the most interesting in that it has the most possible approaches, though as it turns out, some have very big downsides and would only be appropriate for pretty small systems. The user graph is fascinating because of its relevance to so many different domains beyond social networks of friends. And the performance considerations are complex and interdependent among all of them. Each of the three talks had two parts: Darren discussed possible schema designs, indexing considerations and, where appropriate, sharding implications, and I walked through the actual testing I did and whether each option held up as expected.

Unfortunately, even across three sessions we were quite time limited, so the various bits of material that didn't make it into these presentations will end up in one of several other spots.

I get a lot of questions about schema design, and social data is both popular and very doable in MongoDB, but the naive approach is usually bound to meet with failure, so the schema needs to be carefully considered with an eye towards the two most important considerations:

  • end-user latency
  • linear scaling

As we said during the presentations, for every single decision the most important goals were keeping the user's first read latency as low and constant as possible (or else they would leave and go somewhere else) and preserving our ability to scale any design linearly. That meant that every single workload had to be scalable, or partitionable in a way that would isolate the workload to a subset of data.

Over the next few months, as I write up different parts of the system and consider the schema, indexes and possible shard data distribution, you will see me return to these two litmus tests again and again. In order to have the highest chance of success at large scale, any option that hinders one of these goals should be out of the running.



Jul 17, 2014

Hello readers:

I haven't posted much lately because I was crazy busy with the MongoDB World conference - it went really well, and I will be posting detailed analysis of some of the technical topics that I presented on.

My talk on "Diagnostics and Debugging" focused on how to figure out what's going on in your MongoDB cluster, especially when performance has degraded.

There is a recording of it available on the MongoDB.com website, so what I'm going to do here in the future is give some more details about the tools that I described, as well as show the complex debugging case(s) that I didn't have time to go through in the talk.



May 30, 2014

Question:

Recall our previous discussion about ways to recreate any older version of a document that ever existed in a particular collection.

The goal was to preserve every state for each object, but to respond to queries with the "current" or "latest" version. We had a requirement to be able to have an infrequent audit to review all or some previous versions of the document.

Answer:

I had suggested at the time that there was a different way to achieve this that I liked better than the discussed methods and I'm going to describe it now.

Previous Discussion Summary:

Up to this point, we considered keeping versions of the same document within one MongoDB document, in separate documents within the same collection, or by "archiving off" older versions of the document into a separate collection.

We looked at the trade-offs and decided that the important factors were our ability to

  • return or match only the current document(s)
  • generate a new version number to "update" existing attributes and add new ones
    • including recovering from failure in the middle of a set of operations (if there is more than one)

Where we left off:

Here's a table that shows, for each schema choice we considered, how well we can handle reads and writes, and, when an update has to make more than one write, how easy it is to recover or to remain in a relatively "safe" state:

Schema                        Fetch 1          Fetch Many        Update           Recover if fail
1) New doc for each           Easy, Fast       Not easy, Slow    Medium           N/A
1a) New doc with "current"    Easy, Fast       Easy, Fast        Medium           Hard
2) Embedded in single doc     Easy, Fastest    Easy, Fastest     Medium           N/A
3) Sep Collection for prev.   Easy, Fastest    Easy, Fastest     Medium           Medium Hard
4) Deltas only in new doc     Hard, Slow       Hard, Slow        Medium           N/A
?) TBD                        Easy, Fastest    Easy, Fastest     Easy, Fastest    N/A

"N/A" for recovery means there is no inconsistent state possible - if we only have to make one write to create/add a new version, we are safe from any inconsistency. So "N/A" is the "easiest" value there.

What we want is something that makes all our tasks easy and has neither performance issues nor consistency problems. To create this solution, we will pick and choose the best parts of the previously considered schemas.

No doubt you noticed that fetching one or many documents is fastest and simplest when we keep the old versioned documents out of our "current" collection. This keeps our queries fast, whether for one or all latest versions, and they can use indexes whether you're querying, updating or aggregating.

How do we get fast updates that keep the current document current but save the previous version somewhere else? We know that we don't have multi-statement transactions in MongoDB, so we can't ensure that a regular update of one document and an insert of another document are atomic. However, there is something that's always updated atomically along with every write that happens in your collection, and that is the "Oplog".

The Oplog

The oplog (full name: 'oplog.rs' collection in 'local' database) is a special collection that's used by the replication mechanism. Every single write operation is persisted into the oplog atomically with being applied to the data files, indexes and the journal. You can read more about the oplog in the docs, but what I'm going to show you is what it looks like in the oplog when an insert or update happens, and how we can use that for our own purposes.

If I perform this insert into my collection:

> db.docs.insert(
           {"_id":ObjectId("5387edd9ba5871da01786f85"), 
            "docId":174, "version":1, "attr1":165});
WriteResult({ "nInserted" : 1 })

what I will see in the oplog will look like this:

> db.getSiblingDB("local").oplog.rs.find().sort({"$natural":-1}).limit(-1).pretty();
{
         "ts" : Timestamp(1401417307, 1),
          "h" : NumberLong("-1030581192915920539"), 
          "v" : 2, 
          "op" : "i", 
          "ns" : "blog.docs", 
          "o" : { 
                        "_id" : ObjectId("5387edd9ba5871da01786f85"), 
                        "docId" : 174, 
                        "version" : 1, 
                        "attr1" : 165 
          } 
}

If I perform this update:

> db.docs.update( 
               { "docId" : 174 }, 
               { "$inc":{"version":1}, "$set":{ "attr2": "A-1" }  } 
    );
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

what I get in the oplog is this:

{
        "ts" : Timestamp(1401417535, 1),
        "h" : NumberLong("2381950322402503088"),
         "v" : 2,
         "op" : "u",
         "ns" : "blog.docs",
         "o2" : {
                 "_id" : ObjectId("5387edd9ba5871da01786f85")
         },
         "o" : {
                "$set" : {
                        "version" : 2,
                        "attr2" : "A-1"
                }
         }
}

It turns out that with versioned documents I wouldn't actually ever do an insert; rather, I would always do an update with the upsert option, so that I don't need to test whether a document with this docId already exists.

> db.docs.update( 
     { "docId" : 175 }, 
     { "$inc":{"version":1}, "$set":{ "attr1": 999 }  }, 
     { "upsert" : true } 
);
WriteResult({
    "nMatched" : 0,
    "nUpserted" : 1,
    "nModified" : 0,
    "_id" : ObjectId("5387eff7a08472e30040b4bc")
})

Let's see what the oplog entry for this upsert looks like:

{ 
    "ts" : Timestamp(1401417719, 1), 
    "h" : NumberLong("2031090002854356513"), 
    "v" : 2, 
    "op" : "i", 
    "ns" : "blog.docs", 
    "o" : {
         "_id" : ObjectId("5387eff7a08472e30040b4bc"), 
         "docId" : 175, 
         "version" : 1, 
         "attr1" : 999 
    } 
}

It looks like the oplog entry reflects the actual operation that was performed, not the operation that I specified. I asked for an update: when it's an update, the oplog shows it as an update; when it's turned into an upsert, the oplog shows it as an insert. And although I had asked it to "increment", what it put in the oplog was the actual value that was saved.1

I'm sure most of you see where I'm going with this. Rather than fumbling with creating and updating documents in the "previous versions" collection when we perform an update to a document, we can do it asynchronously, the way MongoDB secondaries do it.

You may think it's not easy, but it turns out that there are lots of helpers for dealing with capped collections (which is what the oplog is). One of the most useful things you can do is "tail the oplog". This is the same mechanism that secondaries use to find out when new writes happen on the primary: they tail the oplog the same way you can run the tail -f logfile.txt command - it shows you the last part of the file, but rather than giving you back the prompt when it's done, it just sits there and waits. When more lines are written to the file, tail -f echoes them to the screen. This is exactly how it works with tailable cursors on capped collections: if you specify the right special options, you get data back, but when there is no more data, instead of timing out and having to re-query, you just sit there and wait till more data shows up.

Here is a little demo. The code and explanations are after the video, so feel free to browse ahead before watching, or you can watch first and read the explanations after.

Tailing the oplog to maintain a copy of a collection elsewhere

For our first example, we'll do something simple - we will watch the oplog for changes to a specific collection, and then we will apply those changes to our own copy of the collection - we will call our collection something else. Our example stores the copy in the same database, but of course, it could be anywhere else, including in a completely different replica set or standalone server.



Code for set-up of variables with comments:

/* the shell likes to "page" things for us every 20 documents, *
 * so we will increase that value for this demo                */
DBQuery.shellBatchSize=200;
var namespace="blog.docs"; /* the collection to watch */ 
var coll="docs_archive";  /* copy collection name */

db.getCollection(coll).drop();  /* only the very first time :) */
db.lastApplied.drop();
/* find the last timestamp we applied: this is where we restart */
var prevTS=(db.getCollection("lastApplied").count({"_id":coll})==0) ?
     db.getSiblingDB("local").oplog.rs.findOne({"ns":namespace}).ts :
     db.getCollection("lastApplied").findOne({"_id":coll}).ts;
/* initialize or update lastApplied for this collection */
db.getCollection("lastApplied").update(
                { "_id" : coll }, 
                { "$set : { "ts" : prevTS } },
                { "upsert" : true }
);

Code for setting up the cursor; the appropriate options allow us to find the right spot in the oplog quickly, make the cursor tail the data, and ask the server to forgo the usual inactivity-based cursor timeout:

/* set up the cursor with appropriate filter and options */
var cursor=db.getSiblingDB("local").getCollection("oplog.rs"
       ).find({"ns":namespace,"ts":{"$gte":prevTS}}).addOption(DBQuery.Option.oplogReplay
       ).addOption(DBQuery.Option.awaitData).addOption(DBQuery.Option.tailable
       ).addOption(DBQuery.Option.noTimeout);

Code running on the right-hand-side (blue screen) which loops and inserts or updates the watched collection every second:

for ( docId = 270; docId < 290; docId++ ) {
        print("waiting one second...");
        sleep(1000);
        printjson(db.docs.update(
                      { "_id": docId },
                      { "$inc" : {"version":1}, "$set":{"attr7":"xxx"+docId } },
                      { "upsert" : true }));
}

Code that fetches documents from the tailable cursor and applies appropriate changes to our "copy" collection:

while (cursor.hasNext()) {
       
       var doc=cursor.next();
       var operation = (doc.op=="u") ? "update" : "insert";
       print("TS: " + doc.ts + " for " + operation + " at " + new Date());
       
       if ( doc.op == "i") {  /* originally an upsert */
           result = db.getCollection(coll).save(doc.o);
           if (result.nUpserted==1) print("Inserted doc _id " + doc.o._id);
           else {
              if (result.nMatched==1 ) {
                 if ( result.nModified==0) print("Doc " + doc.o._id + " exists.");
                 else  print("Doc " + doc.o._id + " may have been newer");
              } else throw "Insert error " + tojson(doc)  + " " + tojson(result);
           }
       } else if ( doc.op == "u" ) { /* originally an update */
           result = db.getCollection(coll).update(doc.o2, doc.o);
           if (result.nModified ==1) print("Updated doc _id " + doc.o2._id);
           else if (result.nMatched==1 && result.nModified==0) print(
                         "Already updated doc _id " + doc.o2._id);
           else  throw "No update for " + tojson(doc) + " " + tojson(result);
       } else if (doc.op != "c") throw "Unexpected op! " + tojson(doc);
       
       res=db.getCollection("lastApplied").update(
                     { "_id" : coll },
                     { "$set" : { "ts" : doc.ts } }
       );

       if (res.nModified==1) print("Updated lastApplied to " + doc.ts);
       else print("Repeated last applied op");

       prevTS=doc.ts; /* save in case we need to refetch from the oplog */
}

Of course this code does minimal error checking, and it's not set up to automatically restart if it loses its connection to the primary or the primary of the replica set changes. This is because here we are reading from a local oplog, while in real life you may be fetching data from another server or cluster entirely. Even so, about 15 of those lines of code are for error checking and information printing, so the actual "work" we do is quite simple.

Creating a full archive from tailing the oplog

Now that we know how to replay original documents to maintain an identical "copy" collection, let's see what we have to do differently when we want to insert a new version of the document without losing any of the old versions.

For simplicity, I put the docId in the example collection into the _id field, so I will need to structure the full archive collection schema differently, since it cannot have multiple documents with the same _id.2 For simplicity, I will let MongoDB generate the _id and I will use the combination of docId and version with a unique index on them to prevent duplicate versions. I could achieve something similar by using the combination of original _id (which is the docId) and version fields as a compound _id field but then I would need to do more complicated transformations on the oplog entry. I always choose the method that is simpler.

Now when we get an insert operation in the oplog, we should be able to insert the document the same way we were doing it before, except we want to move _id value into docId field. If the save fails to insert a new document because of a duplicate constraint violation, then we already have that docId and version - we would expect that when we are replaying the same entry in the oplog more than once.

If we get an update, it can be one of two kinds - it can be one that sets or unsets specific fields, or it can be the kind that overwrites the entire document with a new document (with the same _id of course). The latter case can be handled by the same code we have for the insert, with an appropriate transformation of the document. If it's the $set and $unset kind, then we have to fetch the previous version of this document and apply the changes to it before inserting it as a document representing a new version.3

Here is our code, with comments:

The setup:

var coll="docs_full_archive";  // different collection
db.getCollection(coll).drop();     // first time only!
db.getCollection(coll).ensureIndex( 
         { "docId":1, "version": 1 },
         { "unique" : true, "background" : true } );
if (db.getCollection("lastApplied").count({"_id":coll})==0) {
     prevTS=db.getSiblingDB("local").oplog.rs.findOne({"ns":namespace}).ts;
     db.getCollection("lastApplied").update( { "_id" : coll }, 
        {"$set" : { "ts" : prevTS } }, { "upsert" : true } );
} else {
     prevTS=db.getCollection("lastApplied").findOne({"_id":coll}).ts;
}

The cursor (same as before):

var cursor=db.getSiblingDB("local").getCollection("oplog.rs"
       ).find({"ns":namespace,"ts":{"$gte":prevTS}}).addOption(DBQuery.Option.oplogReplay
       ).addOption(DBQuery.Option.awaitData).addOption(DBQuery.Option.tailable
       ).addOption(DBQuery.Option.noTimeout);

The worker loop (slightly adjusted):

while (cursor.hasNext()) {
       var doc=cursor.next();
       var operation = (doc.op=="u") ? "update" : "insert";
       var id = (doc.op=="u") ? doc.o2._id : doc.o._id;
       var newDoc={ };
       print("TS: " + doc.ts + " for " + operation + " at " + new Date());
       if ( doc.op == "i" || 
               (doc.op == "u" && doc.o.hasOwnProperty("_id")) ) {
           for (i in doc.o) {
                   if (i=='_id') newDoc.docId=doc["o"][i];
                   else newDoc[i]=doc["o"][i];
           }
       } else if ( doc.op == "u" ) {
           /* create new doc out of old document and the sets and unsets */
           var prevVersion = { "docId" : doc.o2._id, 
                        "version" : doc.o["$set"]["version"]-1 };
           var prevDoc = db.getCollection(coll).findOne(prevVersion, {"_id":0});
           if (prevDoc == null) {
                        throw "Couldn't find previous version in archive! " + 
                                     tojson(prevVersion) + tojson(doc);
           }
           newDoc = prevDoc;
           if (!doc.o.hasOwnProperty("$set") && !doc.o.hasOwnProperty("$unset"))
               throw "Can only handle update with '_id', '$set' or '$unset' ";
           if (doc.o.hasOwnProperty("$set")) {
               for (i in doc.o["$set"]) {
                   newDoc[i]=doc.o["$set"][i];
               }
           }
           /* an update can carry both $set and $unset - apply the unsets too */
           if (doc.o.hasOwnProperty("$unset")) {
               for (i in doc.o["$unset"]) {
                   delete(newDoc[i]);
               }
           }
       } else if (doc.op != "c") throw "Unexpected op! " + tojson(doc);

       var  result = db.getCollection(coll).insert(newDoc);
       if (result.nInserted==1) {
             print("Inserted doc " + 
                      newDoc.docId + " version " + newDoc.version);
       } else {
             if (result.getWriteError().code==11000 ) {
                    print("Doc " + newDoc.docId + " version " + 
                           newDoc.version + " already exists.");
             } else throw "Error inserting " + tojson(doc)  + 
                           " as " + tojson(newDoc)+ "Result " + tojson(result);
       }

       var res=db.getCollection("lastApplied").update(
                   { "_id" : coll },
                   { "$set" : {ts:doc.ts} },
                   { "upsert" : true }
       );
       var prevTS=doc.ts;
       print("Set lastApplied to " + doc.ts);
}

It turns out that the tail end of the loop is slightly simpler because, no matter what comes in, we always do an insert into the full archive collection.

Test it out!

Let's run this code and then compare for a single docId the operations in the oplog, and what we end up with in the archive collection:

The oplog entries:

db.getSiblingDB("local").getCollection("oplog.rs").find( {
            "ns" : namespace,
            "$or" : [ { "o._id" : 279 }, { "o2._id" : 279 } ] 
         },
         { "o" : 1 } );
{ "o" : { "_id" : 279, "version" : 1, "attr7" : "xxx279" } }
{ "o" : { "$set" : { "version" : 2 } } }
{ "o" : { "$set" : { "version" : 3, "attrCounter" : 1, "attr9" : 1, "attrArray" : [ "xxx" ] } } }
{ "o" : { "_id" : 279, "version" : 4, "attr7" : "xxx279", "attrCounter" : 1, "attr9" : 1, "attrArray" : [ "xxx" ], "attrNew" : "abc" } }
{ "o" : { "_id" : 279, "version" : 5, "attr7" : "xxx279", "attrCounter" : 2, "attr9" : 1, "attrArray" : [ "xxx" ], "attrNewReplacement" : "abc" } }
{ "o" : { "$set" : { "version" : 6, "attrCounter" : 3, "attrArray" : [ ] }, "$unset" : { "attr9" : true } } }
{ "o" : { "_id" : 279, "version" : 7 } }
{ "o" : { "$set" : { "version" : 8, "attrCounter" : 1, "a" : 1 } } }
{ "o" : { "$set" : { "version" : 9 }, "$unset" : { "a" : true, "attrCounter" : true } } }

The archive collection contents (slightly formatted for readability):

db.docs_full_archive.find( {"docId":279}, {"_id":0} )
{ "docId" : 279, "version" : 1, "attr7" : "xxx279" }
{ "docId" : 279, "version" : 2, "attr7" : "xxx279" }
{ "docId" : 279, "version" : 3, "attr7" : "xxx279", 
   "attrCounter" : 1, "attr9" : 1, "attrArray" : [ "xxx" ] }
{ "docId" : 279, "version" : 4, "attr7" : "xxx279", 
   "attrCounter" : 1, "attr9" : 1, "attrArray" : [ "xxx" ], "attrNew" : "abc" }
{ "docId" : 279, "version" : 5, "attr7" : "xxx279", 
   "attrCounter" : 2, "attr9" : 1, "attrArray" : [ "xxx" ], "attrNewReplacement" : "abc" }
{ "docId" : 279, "version" : 6, "attr7" : "xxx279", 
   "attrCounter" : 3, "attr9" : 1, "attrArray" : [ ], "attrNewReplacement" : "abc" }
{ "docId" : 279, "version" : 7 }
{ "docId" : 279, "version" : 8, "attrCounter" : 1, "a" : 1 }
{ "docId" : 279, "version" : 9, "attrCounter" : 1, "a" : 1 }

There you have it: my preferred way to isolate an infrequently used collection and keep it updated based on every write that happens in the main DB. I hope you can see how this can be extended for many different pub/sub needs, since you can adapt the code to watch for different types of events on different collections.

Hope you found this educational and keep those questions coming!


  1. Because the oplog must write change operations in "idempotent" form, all update operators are transformed into their equivalent $set operations. 

  2. Note that if I used a separate docId field, I would have to do extra work: since replication identifies the document by _id, the docId field would not be in the oplog update entries, so I would need to store the original collection's _id field in my documents; otherwise I wouldn't know which docId was being updated. 

  3. See footnote 1. 



May 24, 2014

Question:

Consider two separate shapes of data like this in a single collection:

{   type: "A",
    level: 0,
    color: "red",
    locale: "USA"
}
{   type: "A",
    level: 1,
    color: "blue"
}

The goal is to present a merged shape to the application with the level n data overridden by level n+1 if level n+1 data exists for type A, starting with n = 0. In other words, the app wants to see this shape:

{   type: "A",
    level: 1, 
    color: "blue",
    locale: "USA"
}

If no level 1 data exists, the app would see the default (level 0) shape. Think of it as a layered merge.

Answer:

In the previous "AskAsya" on tracking versions we looked at different ways of tracking all versions of changing objects, and this happens to be a complex variant of that problem that we considered as "schema 4" - it's a possible approach to versioning, but it presents an interesting challenge returning the "full" current object back to the client.

Merging Different Shapes

This problem would be easily solved with an aggregation framework query, except that we need to know the names of all the keys/fields, and we might not know all of the possible fields that could exist in our documents. Without this information, the only way we have of merging documents is MapReduce, which is both more complex and slower. I will show both solutions, and I'll leave it up to you to determine which will be more performant in your scenario (or whether you want to switch to a different versioning schema).

Aggregation Framework

This will be the fastest way if you either have all the possible attribute names that your documents could have, or can get them via a scan of the entire collection (note that the latter immediately becomes stale, as new documents with new attributes could show up as soon as you start querying, but that's inherently an issue in any system that doesn't provide repeatable-read isolation).

Get the possible attribute names (I'm assuming type and level are your 'id' and 'version'):

var att = { };
var attrs = [ ];
db.coll.find( {}, {_id:0, type:0, level:0} ).forEach( function(d) {
    for ( i in d)
         if ( !att.hasOwnProperty(i) ) {
             att[i]=1;
             attrs.push(i);
         }
} );                   

You now have an array attrs which holds all the strings representing different attributes in your collection.

We now programmatically generate the $project stage that turns each attribute into a subdocument with its level first and the attribute itself second.

proj1={$project:{type:1, level:1}};
attrs.forEach(function(attr) { 
    _a="_"+attr; 
    a="$"+attr;   
    proj1["$project"][_a]={}; 
    proj1["$project"][_a]["l"]={"$cond":{}};
    proj1["$project"][_a]["l"]["$cond"]={if:{"$gt":[a,null]},then:"$level",else:-1};
    proj1["$project"][_a][attr]=a;
} );

Since levels are increasing, this sets us up to $group using the $max operator to keep the highest level for each attribute.

group={$group:{_id:"$type",lvl:{$max:"$level"}}};
attrs.forEach(function(attr) { 
    a="$_"+attr;
    group["$group"][attr]={"$max":a};
} )

The last $project just transforms the fields of our document back into the same key names they had before.

proj2={$project:{_id:0,type:"$_id", level:"$lvl"}}
attrs.forEach(function(attr) {
    a="$"+attr;  
    proj2["$project"][attr]=a+"."+attr;
} )

We are now all set to run the aggregation with your programmatically generated stages:

db.coll.aggregate( proj1, group, proj2 );

To recap, proj1 is the stage where we converted every attribute into a subdocument which includes the "level" (first) and the attribute value (second). If a given attribute didn't exist in a document, it went in with level: -1 and a null value.

group is where we grouped by type, which is our docId, and kept the highest ($max) "subdocument" for each possible attribute. This works because MongoDB allows you to compare values of any types (including subdocuments), and level: -1 is always going to "lose" to a higher level.

proj2 is where we turned all the fields back into a readable format, or at least a format resembling our initial documents.

This returned the merged documents to us.

If we had original documents like these:

> db.coll.find({},{_id:0}).sort({type:1,level:1})
{ "type" : "A", "level" : 0, "color" : "red", "locale" : "USA" }
{ "type" : "A", "level" : 1, "color" : "blue" }
{ "type" : "A", "level" : 2, "priority" : 5 }
{ "type" : "A", "level" : 3, "locale" : "EMEA" }
{ "type" : "B", "level" : 0, "priority" : 1 }
{ "type" : "B", "level" : 1, "color" : "purple", "locale" : "Canada" }
{ "type" : "B", "level" : 2, "color" : "green" }
{ "type" : "B", "level" : 3, "priority" : 2, "locale" : "USA" }
{ "type" : "B", "level" : 4, "color" : "NONE" }

We got back results that looked like this:

> db.coll.aggregate( proj1, group, proj2 );
{ "color" : "NONE", "locale" : "USA", "priority" : 2, "type" : "B", "level" : 4 }
{ "color" : "blue", "locale" : "EMEA", "priority" : 5, "type" : "A", "level" : 3 }

Note that this is not performant when filtering on attributes, since we can't apply the filter until we have "merged" all the documents, and that means indexes can't be used effectively. While this aggregation may be a good exercise, unless you are saving this output into a new collection that you then index by attributes for querying, it won't be a good schema if you need very fast responses.
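For example, a minimal sketch of that "save and index" approach, using the $out stage (available starting in 2.6) and an assumed target collection name of merged_docs:

    db.coll.aggregate( [ proj1, group, proj2, { "$out" : "merged_docs" } ] );
    db.merged_docs.ensureIndex( { "color" : 1 } );  /* index the attributes you filter on */
    db.merged_docs.find( { "color" : "blue" } );    /* this query can now use the index */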

Here is MapReduce for the same functionality:

map = function () {
    var doc=this;
    delete(doc._id);
    var level=this.level;
    delete(doc.level);
    var t=this.type;
    delete(doc.type);
    for (i in doc) {
         val={level:level};
         val[i]={ l:level, v:doc[i]};
         emit(t, val);
    }
}

reduce = function (key,values) {
  result={level:-1};
  values.forEach(function(val) {
    if (result.level<val.level) result.level=val.level;
    /* a value may be partially reduced already and carry several attributes */
    for (a in val) {
      if (a=="level") continue;
      if (!result.hasOwnProperty(a) || result[a].l<=val[a].l) {
          result[a]=val[a];
      }
    }
  })
  return result;
}
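The MapReduce above stops at the reduce function. As a sketch of the remaining pieces (the finalize function and the inline output are my choices for illustration, not part of the original), a finalize step can unwrap the {l, v} pairs and the whole thing can be run like this:

    finalize = function (key, reduced) {
        // turn { attr: { l: <level>, v: <value> } } back into { attr: <value> }
        var merged = { "type" : key, "level" : reduced.level };
        for (var attr in reduced) {
            if (attr == "level") continue;
            merged[attr] = reduced[attr].v;
        }
        return merged;
    }

    db.coll.mapReduce(map, reduce,
        { "finalize" : finalize, "out" : { "inline" : 1 } });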


May 21, 2014

Question:

Consider the requirement that we be able to recreate/query any version of a document that ever existed in a particular collection. So we start out with:

{   docId: "A",
    version: 1,
    color: "red",
    locale: "USA"
}

If we need to set the color to "blue", instead of updating the "color" field from "red" to "blue" we have to create a new version of the document which now has its full "current" state, and somehow preserve the old version of the document. So we insert:

{   docId: "A",
    version: 2,
    color: "blue",
    locale: "USA"
}

The goal is to preserve every state of each object, but to respond to queries with only the "current" or "latest" version; we just have a requirement to be able to run an audit (which would be very infrequent, so it's okay if it's slow). Is keeping each version as we do in this example the best schema/approach to this problem?

Answer:

Versioning can be tricky to get right if you don't know all of the requirements of the application and the approximate expected loads for various operations. I'll lay out a few possible approaches and point out their strengths and weaknesses.

Problem Statement:

In some systems, rather than updating an existing object and overwriting its various attributes, there is a business requirement to preserve the original document and to create a new version of it instead. This raises the following interesting challenges:

  1. You must correctly generate the new version number in a multithreaded system
  2. You must return only the current version of each document when there is a query
  3. You must "update" correctly by including all current attributes in addition to newly provided attributes
  4. If the system fails at any point, you must either have a consistent state of the data, or it must be possible on restart to infer the state of the data and clean it up, or otherwise bring it to a consistent state.

Possible Schema Approaches:
  1. Store full document each write with monotonically increasing version number.
    • 1a. possibly with a field in the latest version identifying it as such.
  2. Store all document versions inside a single document.
  3. Store current document in your "primary" collection, and keep previous versions in a second collection.
  4. Store only "deltas" with each increasing version.

Generating correct version number

No matter which schema you choose, the issue of generating the correct "higher" version number will come down to one of three possible approaches:

  • maintain a separate collection which hands out the next version for each document. This is probably the worst approach, as it can be prone to contention and edge cases in a multi-shard, multithreaded environment.
  • use optimistic locking to read the current document, increment its version, and save the new document contingent on some constraint keeping you from succeeding simultaneously from two different threads. This can be handled differently for different schemas, and it's definitely a common and feasible approach.
  • use a fine-grained timestamp - the current time to the millisecond may be good enough, depending on your ability to synchronize all the clocks; unfortunately it only works with two of our four schema options.

I like the optimistic locking option, and it works well with all four of our schema options. It involves a "compare-and-swap" technique where you read the current document, do the appropriate calculation of the new version and add the new attributes, and then try the insert or update, contingent on no one having updated the document ahead of you. If it's an update, you include parts of the original document in your query condition, and if it's an insert you must have a unique constraint to prevent multiple simultaneous attempts to version the same document from succeeding. In both cases you must check the result of your write - knowing whether it succeeded or failed is how the thread knows whether it has to try again.

How does each schema:

  • return only the current document
  • generate a new version number to "update" existing attributes and add new ones
    • this includes recovering from failure in the middle of a set of operations (if there is more than one)

Choice 1

Store full document each time there is a write with monotonically increasing version number inside.

    {  "docId" : 174, "v" : 1,  "attr1": 165 }   /*version 1 */
    {  "docId" : 174, "v" : 2,  "attr1": 165, "attr2": "A-1" } 
    {  "docId" : 174, "v" : 3,  "attr1": 184, "attr2" : "A-1" }

For each docId value the document with the highest "v" represents the full current object state. In this example, docId 174 v:3 represents the total current state. There is a unique index on {"docId":1,"v":1}

To return only current document

If the query is for a single docId, the query would be:

db.docs.find({"docId":174}).sort({"v":-1}).limit(-1);

This will find the documents with "docId" 174 and return the one that has the highest "v" only. This will efficiently use our index on docId and v to only scan a single document. The -1 for limit just tells the server to close the cursor when the document is returned as we are done with it (it's what findOne functionally does under the covers).

But what if you want to query for all documents that match a particular condition, expecting that only the latest version of each document will be returned? Now you have to use the aggregation framework to "merge" your document set, keep only those with the highest version, and only then apply your filter:

Careful that you don't do it the wrong way:

db.docs.aggregate( [  
    {"$match":{<your-match-condition>}}, /* WRONG */
    {"$sort":{"docId":-1,"v":-1}},
    {"$group":{"_id":"$docId","doc":{"$first":"$$ROOT"}}}
] )

Note that applying your filter before you filter out all but the latest version of each document may not return the current version of some of the documents! So don't do this!

Instead we have to do this:

sort={"$sort": { "documentId" : 1, "version" : -1 } };
group={"$group" : { "_id" : "$documentId",
                    "doc": { "$first" : "$$ROOT" }
                   } };
match={"$match":{"doc.attrN":<value>}}; /* RIGHT */
db.collection.aggregate( sort, group, match )

This is not efficient, sadly, as the indexes on our collection can only be used effectively by the stages before the first $group operation.

To generate new version and update

Optimistic locking: each thread reads in the most current document (in this case docId:174, v:3), makes attribute changes, increments v by one, and then tries to insert this document:

db.docs.insert({"docId":174,"v":4, "attr1":184,"attr2":"A-1","attr3":"blue"})

If the insert succeeds, it's done, but if it gets a unique constraint violation, it means another thread has already inserted a new version of this document, and this thread needs to try again (making sure to read the new "v":4, or whatever the latest version of the document is, and retrying its change till it succeeds).

Failure does not create an inconsistent state, since there is only a single write.

Choice 1a

In a variant of 1, we might add a field to the "current" version of each documentId:

     {  "docId" : 174, "v" : 1,  "attr1": 165 }
     {  "docId" : 174, "v" : 2,  "attr1": 165, "attr2": "A-1" }
     {  "docId" : 174, "v" : 3,  "attr1": 184, "attr2" : "A-1", current: true }

Fetching the current document should now be easy: just query for { docId: 174, current: true }, and when querying for multiple documents, add { "current": true } to the query predicate, solving that problem.

Updating becomes difficult now, since there is no method to insert one document and update another document "as one". So we would want to first insert a new version, based on the currently highest version, and then update the previously current document to $unset the "current" field.
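As a sketch of those two writes (reusing the field names from the example above and assuming the unique index on docId and v from choice 1 is still in place):

    /* sketch only - claim the next version, then clear the old "current" flag */
    var latest = db.docs.findOne({ "docId" : 174, "current" : true });
    var newDoc = { "docId" : 174, "v" : latest.v + 1,
                   "attr1" : latest.attr1, "attr2" : latest.attr2,
                   "current" : true };
    db.docs.insert(newDoc);   /* fails on the unique {docId, v} index if we lost the race */
    /* second write - if we crash before this, two documents claim current:true */
    db.docs.update({ "docId" : 174, "v" : latest.v },
                   { "$unset" : { "current" : "" } });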

Now if our process fails between those two write operations, we will have two documents for a particular docId that have "current" set to true and that means all of our queries will have to guard against that possibility - that seems unnecessarily complex, so let's hold off on this method for now.

Choice 2

Store all document versions inside a single document.

    {  "_id" : 174, "current" : { "v" :3, "attr1": 184, "attr2" : "A-1" },
        "prev" : [ 
              {  "v" : 1,  "attr1": 165 },
              {  "v" : 2,  "attr1": 165, "attr2": "A-1" }
        ]
    }

For each docId, the current state is represented by its "current" subdocument. Since "docId" is unique it can be stored in the "_id" field.

The merits of multiple possible solutions mostly depend on how many versions of each object you expect to have and how long you have to keep them. If the lifetime of a document usually has a number of versions that are in the single digits or low double digits, you can do well embedding the versions inside of the single document that represents each object.

To return only current document

Since there is only one document per docId, we just search by _id or by other attributes of "current", and we use a projection to exclude the previous array - db.collection.find({"_id": 174}, {"prev": 0}) - except in cases where we want to see it.

To generate new version and update

Even though you only have one document per docId, you really can't create a new version with a single update, since it depends on knowing what the current subdocument is. What you can do is utilize the "compare-and-swap" technique, where you read the current document, move the "current" subdocument to the end of the prev array, set the new "current" field appropriately, and then do the update contingent on no one having updated the document ahead of you:

var doc = db.collection.findOne( { "_id" : 174 });
/* save the current version */
var currVersion = doc.current.v;
/* push the current subdocument to the end of prev array */
doc.prev.push(doc.current);
/* construct the new "current" subdocument */
doc.current = { "v" : currVersion+1, "attr1" : <value>, "attr2" : <value> }
/* remove _id from the doc so it doesn't end up inside $set */
delete doc._id;
var result = db.collection.update( { "_id" : 174, "current.v" : currVersion },  { "$set" : doc } )
if (result.nModified != 1) {
    print("Someone must have gotten there first, re-fetch the new document, try again");
}

Atomicity and maintainability

There are many pros in this approach:

  • atomic updates of the document (a single operation both sets the new current and updates previous)
  • querying only needs to happen on "current" attributes, since you only need to access previous infrequently
  • it's simple to return just the current attributes, or to exclude previous from being returned to the application
  • the _id index can be used for the unique docId, preventing duplicates from being accidentally inserted
  • creating the next version number is simple and thread-safe

The cons are all performance based:

  • when each document grows beyond its previously allocated size, it has to be moved, which makes some updates more time consuming
  • this won't work at all for documents that have thousands of versions over their lifetime (the documents would get too big and unwieldy, and could potentially exceed the 16MB limit)
  • there'll likely be more fragmentation than with the more "naive" approach of inserting new versions as new documents
  • if the system has a very high level of concurrency where multiple threads are each trying to make a different update to a specific document, a single thread could keep getting beat and it might take multiple re-tries to persist its update

The last "con" shouldn't really be a concern if you are genuinely talking about a system where each document is only going to have a handful of versions (or updates) in its lifecycle, and this schema isn't appropriate for systems where a single document will live through hundreds (or thousands) of updates.

Choice 3

Store current document in your "primary" collection, and keep older versions in another collection.

    CurrentCollection:     {  "docId" : 174,  "v" :3, "attr1": 184, "attr2" : "A-1" }
    CollectionOfPrevious:
              {  "docId" : 174, "v" : 1,  "attr1": 165 }
              {  "docId" : 174, "v" : 2,  "attr1": 165, "attr2": "A-1" }

For each documentId, there is only one document which represents its current state.

To return only current document

This one is the simplest of them all. Since the old versions are in another collection, you just query normally when you need to find a single or multiple documents - they will all be the current version.

To generate new version and update

This one may be the hardest. Technically, you only need to do a few things: read the current document, construct out of it the new current document, save the new current document on top of previous one and if successful, then insert the old current document into the "previous" collection. Of course we would use the same compare-and-swap update to make sure that no one changed the document between our read and our write, and only insert into previous collection if we successfully update current.

The problem is that you may fail before that last write and now you'll be missing a version of this document from the "previous" collection.

What if we switch the order of writes to save into the previous collection first? We read the current document, write it to the previous collection, then change it into the "new" current and save it into the "current" collection. This has several advantages:

  • if someone else is trying to update this document, they will also be saving into the "previous" collection, so having a unique index on docId, version will tell us if we lost the race and now have to try again
  • if the thread dies in the middle (after the insert into previous and before the update of current) it's not the end of the world, as your current collection was not affected, but you do need a way to "clean up" your "previous" collection, first because you need to remove the version of the document that never existed in "current", and second because it will block all other "updates" on this document by using an invalid "docId", "version"

Luckily, clean-up may be simple: any time we detect that a docId, version that exists in "current" also exists in "previous", it means either an update is "in progress" or an update "died" and we should clean up. Of course the devil is in the details - it could cause delays in the system, since you have to wait long enough to be sure that the "in progress" update actually died. Or you can have another field that you update after successful writes in both places (making it easier to recover, but requiring three writes before you're done with a single document update!). Let's call this Medium Hard still.
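As a sketch of that detection step (collection names as in the example above; what you do once a duplicate is found is up to your clean-up policy):

    /* sketch only: a current docId/version that also appears in "previous"
       means an update is either in progress or died mid-flight */
    var curr = db.CurrentCollection.findOne({ "docId" : 174 });
    var dup  = db.CollectionOfPrevious.findOne({ "docId" : curr.docId, "v" : curr.v });
    if (dup != null) {
        print("docId " + curr.docId + " v " + curr.v + " needs clean-up or is in-flight");
    }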

Choice 4

Store only "deltas" with increasing versions

    {  "documentId" : 174, "v" : 1,  "attr1": 165 }
    {  "documentId" : 174, "v" : 2,  "attr2": "A-1" }
    {  "documentId" : 174, "v" : 3,  "attr1": 184 }

For each docId, the current state must be derived by "merging" all the documents with matching docId, keeping the "latest" or "highest" version's value of each attribute if it occurs in more than one version.

This one is quite complex, and the answer is getting quite long, so let's call it hard and defer the details of how we accomplish this merge till next time.

Let's Summarize

Here's a table that shows, for each schema choice, how well we can handle reads and writes, and, when an update has to make more than one write, how easy it is to recover or to remain in a relatively "safe" state:

Schema                        Fetch 1          Fetch Many        Update           Recover if fail
1) New doc for each           Easy, Fast       Not easy, Slow    Medium           N/A
1a) New doc with "current"    Easy, Fast       Easy, Fast        Medium           Hard
2) Embedded in single doc     Easy, Fastest    Easy, Fastest     Medium           N/A
3) Sep Collection for prev.   Easy, Fastest    Easy, Fastest     Medium           Medium Hard
4) Deltas only in new doc     TBD/Hard         TBD/Hard          Medium           N/A
?) TBD                        Easy, Fastest    Easy, Fastest     Easy, Fastest    N/A

"N/A" for recovery means there is no inconsistent state possible - if we only have to make one write to create/add a new version, we are safe from any inconsistency. So "N/A" is the "easiest" value there.

I deferred describing what it would take to query the "Store deltas only" option till the next "Ask Asya" but let me foreshadow and tell you that it's not particularly easy - it involves a long and tricky aggregation.

But you can see I filled in a yet-undescribed option that magically makes all our tasks easy, and yet seems to have neither performance issues nor consistency problems. If you've stuck with me this far, I promise that I will describe the magical "winner" for version keeping in the next "Ask Asya" after the one that shows how to aggregate the deltas of a document into one.

Since I just enabled comments and discussion on these pages, if you see a possible schema approach I didn't mention, feel free to suggest it. Free "MongoDB" t-shirt for you if you guess the "TBD" schema I have in mind.



May 19, 2014

Question:

I saw your answer on SO about the difference between "update" and "findAndModify"; could you explain in more detail what the difference is, and why MongoDB's findAndModify is named what it is?

Answer:

What's in a name? that which we call a rose
By any other name would smell as sweet;

- said Juliet 1

As it turns out, a lot is in a name. A poorly chosen name can confuse many users, year after year. I believe findAndModify was probably not the best name for the role that it plays.

update

An update finds an appropriate document (by default it's just one) and then it changes its contents according to your specification.
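
For example (a hypothetical "people" collection - the update changes the document, but only a WriteResult comes back, not the document itself):

    db.people.update(
        { "name": "Asya" },                       // find one matching document (default)
        { "$set": { "lastSeen": new Date() } }    // and change it
    )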

findAndModify

The findAndModify command finds an appropriate document (it's always just one), changes its contents according to your specification, and then returns the exact document that it changed (either the old version or the new version, depending on which you ask for).
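
The same change through findAndModify hands the modified document back (again using a hypothetical "people" collection):

    db.people.findAndModify({
        query:  { "name": "Asya" },
        update: { "$set": { "lastSeen": new Date() } },
        new:    true    // return the post-update document; omit or false for the pre-update one
    })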

What's the Difference?

They both find a document and update it atomically. What that means is that it's not possible for another thread to change part of this document between the time we find it and the time we finish updating it. It also means that no other thread will see this document in a "half-updated" state. That's what "atomic" means - all-or-nothing.

Why do we even need findAndModify then?

What if we need to get the full document that we just updated (like marking an item in a queue "yours" and then working on it)?

What I said on StackOverflow was:

If you fetch an item and then update it, there may be an update by another thread between those two steps. If you update an item first and then fetch it, there may be another update in-between and you will get back a different item than what you updated.

Doing it "atomically" means you are guaranteed that you are getting back the exact same item you are updating - i.e. no other operation can happen in between.

That's why you'll hear people talk about findAndModify in the context of implementing a queue mechanism - findAndModify can update a single document to indicate that you are now working on it, and return that same document to you in one operation.
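
A minimal sketch of that queue pattern (the collection and field names here are assumptions for illustration):

    // atomically claim the oldest pending job and get it back in one step
    var job = db.jobs.findAndModify({
        query:  { "status": "pending" },
        sort:   { "created": 1 },        // oldest first
        update: { "$set": { "status": "working", "owner": "worker-42" } },
        new:    true
    });
    // no other worker can have claimed this exact job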

When Not to Use findAndModify

There are scenarios where findAndModify cannot help you. If you need to update a document based on its existing values, you can often use one of the many atomic update operators that change a field without knowing its current value, such as $inc, $addToSet, $min and $max. They let you modify a field without having to read it first, and they work with a regular update as well as with findAndModify.
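
For instance, a sketch with hypothetical fields - neither operation needs to know the current values:

    db.coll.update(
        { "_id": 174 },
        { "$inc": { "views": 1 },                 // add 1 to whatever "views" currently is
          "$addToSet": { "tags": "featured" } }   // add "featured" only if it's not already there
    )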

But if you need to set field a1 based on the current value of field b2, then you have to read the document first, and when executing your update you have to make the update conditional on no one else having changed the document in the meantime (and/or rely on unique constraints to guarantee it).
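
Here is a rough sketch of that pattern, assuming hypothetical fields a1 and b2 (the field names and derivation are just for illustration):

    var doc = db.coll.findOne({ "_id": 174 });
    var result = db.coll.update(
        { "_id": 174, "b2": doc.b2 },        // only apply if b2 is still what we read
        { "$set": { "a1": doc.b2 } }         // set a1 from the value of b2 we read
    );
    if (result.nModified == 0) {
        // someone changed the document in between - re-read and retry
    }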

There is no way for findAndModify to help you here, because it's limited to the exact set of operators that update uses; all it adds is the ability to return the exact document you modified. Of course, findAndModify has to do more work than update, so for best performance you should only use findAndModify when you must have the document that you just updated back in the application. If you just want to know whether an update succeeded, you can examine the WriteResult that update returns.

Proposal

Let's rename findAndModify to a name that more accurately describes its function. It updates a document and returns it, but to maintain a small connection to its current name, I hereby propose we rename it:

modifyAndReturn

Who's with me? 2


  1. Actually William Shakespeare said it. 

  2. Please vote for SERVER-13979 if you agree. 


Comments

Apr 29, 2014

Question:

Is it possible to use the "Tag aware sharding" feature without having to use a special shard key? The example in the tutorial makes it look like we would have to change our shard key to have a prefix value that we can define tag ranges on, but we're already sharded. We have many collections in this database and we want to limit each collection to a subset of the shards so we can isolate the busy ones from each other.

Answer:

Yes, that is absolutely possible, and it's one of the cool capabilities of tag aware sharding. First, a quick review of the feature.

Tag aware sharding feature

Tags associate specific ranges of shard key values with specific shards for use in managing deployment patterns. What this means is that in your sharded cluster you can assign zero, one or more tags (or labels) to each shard. Then you can assign ranges of shard key values in various sharded collections to these tags. The balancer then moves the appropriate chunks to appropriate shards to keep things the way you "assigned" them.

The Balancer: Diversion into migration details

The whole balancing and migration process is worthy of its own separate write-up, but for now I will simplify most of it and point out, at a high level, that the balancer is a thread that runs on mongos, wakes up periodically, and checks (1) whether it should be running and (2) whether there is anything for it to do.† For the balancer, "something to do" is always about moving chunks between shards. The priorities it considers when deciding which chunks need to be moved are as follows:

  • draining shards: if one of the shards is "draining" - i.e. you plan to decommission it - then this will always be the first priority for all migrations unless it has no data left.

  • tagged shards: if any chunk is on the "wrong" shard for its tag range, it has to be moved to a "correct" shard.

  • balance the remaining chunks: if the shard with the most chunks has nine or more chunks more than the shard with the fewest chunks, the balancer will move chunks to try to keep things in balance.

How do you tag shards and ranges?

All you have to do for tagging to work is mark some shards with "tags" and specify which ranges of shard key values will be associated with each tag. The relevant commands are sh.addShardTag("shardName","tagName") and sh.addTagRange("namespace", { shardKey: minValue }, { shardKey: maxValue }, "tagName").

The MongoDB docs have a great tutorial that you always see used as the example for tag aware sharding - your shard key has to include a prefix field that can be used to figure out which geographical region the user is in, and the ranges of shard key values that start with certain regions are associated with shards in the corresponding data centers.

That's all fine and good, but I'll show you that it doesn't have to be nearly that complex.

How you can use tags to designate which shards a sharded collection can use

Let's walk through an example. I have three shards in my test cluster:

tagdb@mongos(2.6.0) > db.getSiblingDB("config").shards.find()
{ "_id" : "shard0000", "host" : "localhost:30000" }
{ "_id" : "shard0001", "host" : "localhost:30001" }
{ "_id" : "shard0002", "host" : "localhost:30002" }

I will add two tags, each to two shards. Let's say that shards 0000 and 0001 have a lot of RAM, and shards 0001 and 0002 have very fast flash storage and I plan to distribute my data to take advantage of the different physical resources:

tagdb@mongos(2.6.0) > sh.addShardTag("shard0000","HI_MEM")
tagdb@mongos(2.6.0) > sh.addShardTag("shard0002","FLASH")
tagdb@mongos(2.6.0) > sh.addShardTag("shard0001","FLASH")
tagdb@mongos(2.6.0) > sh.addShardTag("shard0001","HI_MEM")

Now that I tagged my shards, I will add tag ranges to two different collections. Note, I don't have these collections yet, and I haven't even sharded them yet, but I want to have the tags ready for them when they get created:

tagdb@mongos(2.6.0) > sh.addTagRange("tagdb.bigidx", {_id:MinKey},{_id:MaxKey},"HI_MEM");
tagdb@mongos(2.6.0) > sh.addTagRange("tagdb.bigdata", {_id:MinKey},{_id:MaxKey},"FLASH");

I have a collection with big indexes (called bigidx) that I want to constrain only to shards tagged "HI_MEM" and I have another collection with a lot of data (called bigdata) that I want to keep on shards that have flash storage because I know the data will be read from disk a lot. Note that I only needed to know what I will be using as my shard key, and I specified MinKey to MaxKey as my range - that means all of the chunks!

I will now shard the collections and take a look at how things are shaping up:

tagdb@mongos(2.6.0) > sh.enableSharding("tagdb")
{ "ok" : 1 }
tagdb@mongos(2.6.0) > sh.shardCollection("tagdb.bigdata", {_id:"hashed"})
{ "collectionsharded" : "tagdb.bigdata", "ok" : 1 }
tagdb@mongos(2.6.0) > sh.shardCollection("tagdb.bigidx", {_id:"hashed"})
{ "collectionsharded" : "tagdb.bigidx", "ok" : 1 }
tagdb@mongos(2.6.0) > sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("535be5d7d5274545e9d01426")
  }
  shards:
    {  "_id" : "shard0000",  "host" : "localhost:30000",  "tags" : [ "HI_MEM" ] }
    {  "_id" : "shard0001",  "host" : "localhost:30001",  "tags" : [ "FLASH", "HI_MEM" ] }
    {  "_id" : "shard0002",  "host" : "localhost:30002",  "tags" : [ "FLASH" ] }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "tagdb",  "partitioned" : true,  "primary" : "shard0001" }
        tagdb.bigdata
            shard key: { "_id" : "hashed" }
            chunks:
                shard0001   3
                shard0002   3
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : -6148914691236517204 } on : shard0001
            { "_id" : -6148914691236517204 } -->> { "_id" : -3074457345618258602 } on : shard0002
            { "_id" : -3074457345618258602 } -->> { "_id" : 0 } on : shard0001
            { "_id" : 0 } -->> { "_id" : 3074457345618258602 } on : shard0001
            { "_id" : 3074457345618258602 } -->> { "_id" : 6148914691236517204 } on : shard0002
            { "_id" : 6148914691236517204 } -->> { "_id" : { "$maxKey" : 1 } } on : shard0002
             tag: FLASH  { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } }
        tagdb.bigidx
            shard key: { "_id" : "hashed" }
            chunks:
                shard0000   3
                shard0001   3
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : -6148914691236517204 } on : shard0000
            { "_id" : -6148914691236517204 } -->> { "_id" : -3074457345618258602 } on : shard0000
            { "_id" : -3074457345618258602 } -->> { "_id" : 0 } on : shard0001
            { "_id" : 0 } -->> { "_id" : 3074457345618258602 } on : shard0001
            { "_id" : 3074457345618258602 } -->> { "_id" : 6148914691236517204 } on : shard0000
            { "_id" : 6148914691236517204 } -->> { "_id" : { "$maxKey" : 1 } } on : shard0001
             tag: HI_MEM  { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } }

How you can use tags to make a collection migrate from one shard to another

What if you have a number of unsharded collections in your sharded database and you don't want all of them to hang out on the primary shard for this DB? Well, you might need unique tags for each shard, but then you can do this to move collection one to shard0002:

tagdb@mongos(2.6.0) > sh.addShardTag("shard0002","shard2")
tagdb@mongos(2.6.0) > sh.addTagRange("tagdb.one", {_id:MinKey},{_id:MaxKey},"shard2")
tagdb@mongos(2.6.0) > sh.shardCollection("tagdb.one",{_id:1})
{ "collectionsharded" : "tagdb.one", "ok" : 1 }
tagdb@mongos(2.6.0) > sh.status()
   ...
        tagdb.one
            shard key: { "_id" : 1 }
            chunks:
                shard0002   1
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } } on : shard0002
             tag: shard2  { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } }

If we peek inside the config database, we should see our tags in the config.tags collection, our shard ranges attached to chunks in config.chunks and we can find evidence of the chunk moves due to tag policy in the config.changelog collection, as well as the mongos and mongod log files.
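
One way to poke around at that metadata from the mongos shell (the changelog filter is just an illustration):

    var conf = db.getSiblingDB("config")
    conf.tags.find()                               // the tag ranges we added
    conf.shards.find()                             // shards, each with its "tags" array
    conf.changelog.find({ "what": /moveChunk/ })   // chunk moves, including tag-driven ones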

To summarize: tag aware sharding can be easily used to distribute a single collection a particular way across all shards, to isolate whole collections on a subset of shards, and even to move an entire collection from one shard to another.


† This is definitely a gross simplification of all the steps the balancer goes through - look for a more detailed write-up demystifying the inner workings of migrations some time soon.


Comments

Apr 15, 2014

Question:

I've heard that MongoDB can be effectively used to model "dynamic attributes" - where you don't know up front all the different attributes and not all attributes apply to all items. Can you describe how that can be done, and in particular how it can be effectively indexed?

Answer:

The problem:

Imagine you are building an e-commerce site and you aspire to be as big as amazon.com some day, which means you will be selling many different types of products. It's easy to see that there will be sets of attributes that will only apply to some of the products you sell.

A product document may look like this:

{
  SKU: "XRD12349",
  type: "book",
  title: "MongoDB, The Definitive Guide",
  ISBN: "xxx",
  author: [ "Kristina Chodorow", "Mike Dieroff"],
  genre: ["computing", "databases"]
}

or this:

{
  SKU: "Y32944EW",
  type: "shoes",
  manufacturer: "ShoesForAll",
  color: "blue",
  style: "comfort",
  size: "7B"
}

You can see how it would be extremely challenging to manage a collection that has an incredibly wide variety of document "shapes". Now, while some people call MongoDB "schemaless", I am not a fan of this designation. The schema of each document is defined by the document itself. To be able to build a robust application you need to decide what the schema of your documents will be; otherwise your application will spend as much time examining documents to learn their schema as it will providing actual functionality.

Possible solutions:

One way to index the attributes you want to search by is to create an index on each such attribute in a schema like the one above. This is not practical, even if you use "sparse" indexes (since many attributes will be set on only a small subset of the products), because you may end up with dozens, if not hundreds, of indexes. In addition, every time a new attribute is introduced, a new index has to be added on the collection. Not very practical.

The other solution, which is a nice generalization of storing attributes which are numerous and not known up-front, is to use an array of key-value pairs.

Our two sample documents might then become:

{
  SKU: "XRD12349",
  type: "book",
  attr: [
      { "k": "title", 
        "v": "MongoDB, The Definitive Guide, 1st Edition"
      },
      { "k": "ISBN",
        "v": "xxx"
      },
      { "k": "author",
        "v": "Kristina Chodorow"
      },
      { "k": "author",
        "v": "Mike Dieroff"
      },
      { "k": "genre",
        "v": ["computing", "databases"] 
      }
  ]
}

and

{
  SKU: "Y32944EW",
  type: "shoes",
  attr: [
      { "k": "manufacturer", 
        "v": "ShoesForAll",
      },
      { "k": "color", 
        "v": "blue",
      },
      { "k": "style", 
        "v": "comfort",
      },
      { "k": "size", 
        "v": "7B"
      }
  ]
}

Note that for an attribute that can have multiple values you have a choice: you can store the values as an array under a single key (as with "genre" above), or you can repeat the key, one element per value (as with "author" above).
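
Either layout can be matched by the same kind of $elemMatch query, because equality against an array-valued "v" matches any element of that array - a quick illustration:

    // matches the repeated-key "author" elements directly
    db.products.find({ "attr": { "$elemMatch": { "k": "author", "v": "Kristina Chodorow" } } })

    // also matches the array-valued "genre" element, since "databases" equals
    // one of the elements of v: ["computing", "databases"]
    db.products.find({ "attr": { "$elemMatch": { "k": "genre", "v": "databases" } } })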

Now we can index all of these attribute values with the following:

PRIMARY(2.6.0) > db.products.ensureIndex( { "attr.k":1, "attr.v":1 } )

Let's take a look at how the queries will execute and use the index, using the "explain()" helper in the MongoDB shell. When filtering on an attribute key-value pair, remember to use the $elemMatch operator to indicate that both conditions must be satisfied by the same element of the array.

PRIMARY(2.6.0) > db.products.find( 
                                { "attr": { "$elemMatch" : { "k":"size", "v":"8B" } }
                     } ).explain()
{
    "cursor" : "BtreeCursor attr.k_1_attr.v_1",
    "isMultiKey" : true,
    "n" : 104,
    "nscannedObjects" : 104,
    "nscanned" : 104,
    "nscannedObjectsAllPlans" : 104,
    "nscannedAllPlans" : 104,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 0,
    "nChunkSkips" : 0,
    "millis" : 2,
    "indexBounds" : {
        "attr.k" : [
            [
                "size",
                "size"
            ]
        ],
        "attr.v" : [
            [
                "8B",
                "8B"
            ]
        ]
    },
    "server" : "asyasmacbook.local:27017",
    "filterSet" : false
}
PRIMARY(2.6.0) > db.products.find(
                  { "attr" :  { "$elemMatch" : { "k":"color", "v":"blue"}}
              } ).explain()
{
    "cursor" : "BtreeCursor attr.k_1_attr.v_1",
    "isMultiKey" : true,
    "n" : 98,
    "nscannedObjects" : 98,
    "nscanned" : 98,
    "nscannedObjectsAllPlans" : 98,
    "nscannedAllPlans" : 98,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 0,
    "nChunkSkips" : 0,
    "millis" : 0,
    "indexBounds" : {
        "attr.k" : [
            [
                "color",
                "color"
            ]
        ],
        "attr.v" : [
            [
                "blue",
                "blue"
            ]
        ]
    },
    "server" : "asyasmacbook.local:27017",
    "filterSet" : false
}

Now I'll use both criteria, and I'll add another one for attribute "style" - since I want to match only when all are true (rather than when any is true) I will use the $all operator. Passing "true" as an argument to explain will show all considered plans and not just the winning plan.

PRIMARY(2.6.0) > db.products.find( { "attr" : { "$all" : [ 
                    { "$elemMatch" : { "k":"style", "v":"comfort" } }, 
                    { "$elemMatch" : { "k":"color", "v":"blue" } },
                    { "$elemMatch" : { "k":"size", "v":"8B" } } 
                  ] } } ).explain(true)
{
    "cursor" : "BtreeCursor attr.k_1_attr.v_1",
    "isMultiKey" : true,
    "n" : 1,
    "nscannedObjects" : 98,
    "nscanned" : 98,
    "nscannedObjectsAllPlans" : 296,
    "nscannedAllPlans" : 298,
    "scanAndOrder" : false,
    "indexOnly" : false,
    "nYields" : 2,
    "nChunkSkips" : 0,
    "millis" : 1,
    "indexBounds" : {
        "attr.k" : [
            [
                "color",
                "color"
            ]
        ],
        "attr.v" : [
            [
                "blue",
                "blue"
            ]
        ]
    },
    "allPlans" : [
        {
            "cursor" : "BtreeCursor attr.k_1_attr.v_1",
            "isMultiKey" : true,
            "n" : 1,
            "nscannedObjects" : 98,
            "nscanned" : 98,
            "scanAndOrder" : false,
            "indexOnly" : false,
            "nChunkSkips" : 0,
            "indexBounds" : {
                "attributes.name" : [
                    [
                        "color",
                        "color"
                    ]
                ],
                "attributes.value" : [
                    [
                        "blue",
                        "blue"
                    ]
                ]
            }
        },
        {
            "cursor" : "BtreeCursor attr.k_1_attr.v_1",
            "isMultiKey" : true,
            "n" : 1,
            "nscannedObjects" : 99,
            "nscanned" : 100,
            "scanAndOrder" : false,
            "indexOnly" : false,
            "nChunkSkips" : 0,
            "indexBounds" : {
                "attributes.name" : [
                    [
                        "style",
                        "style"
                    ]
                ],
                "attributes.value" : [
                    [
                        "comfort",
                        "comfort"
                    ]
                ]
            }
        },
        {
            "cursor" : "BtreeCursor attr.k_1_attr.v_1",
            "isMultiKey" : true,
            "n" : 1,
            "nscannedObjects" : 99,
            "nscanned" : 100,
            "scanAndOrder" : false,
            "indexOnly" : false,
            "nChunkSkips" : 0,
            "indexBounds" : {
                "attributes.name" : [
                    [
                        "size",
                        "size"
                    ]
                ],
                "attributes.value" : [
                    [
                        "8B",
                        "8B"
                    ]
                ]
            }
        }
    ],
    "server" : "asyasmacbook.local:27017",
    "filterSet" : false,
    "stats" : {
        "type" : "KEEP_MUTATIONS",
        "works" : 100,
        "yields" : 2,
        "unyields" : 2,
        "invalidates" : 0,
        "advanced" : 1,
        "needTime" : 97,
        "needFetch" : 0,
        "isEOF" : 1,
        "children" : [
            {
                "type" : "FETCH",
                "works" : 99,
                "yields" : 2,
                "unyields" : 2,
                "invalidates" : 0,
                "advanced" : 1,
                "needTime" : 97,
                "needFetch" : 0,
                "isEOF" : 1,
                "alreadyHasObj" : 0,
                "forcedFetches" : 0,
                "matchTested" : 1,
                "children" : [
                    {
                        "type" : "IXSCAN",
                        "works" : 98,
                        "yields" : 2,
                        "unyields" : 2,
                        "invalidates" : 0,
                        "advanced" : 98,
                        "needTime" : 0,
                        "needFetch" : 0,
                        "isEOF" : 1,
                        "keyPattern" : "{ attr.k: 1.0, attr.v: 1.0 }",
                        "boundsVerbose" : "field #0['attr.k']: [\"color\", \"color\"], field #1['attr.v']: [\"blue\", \"blue\"]",
                        "isMultiKey" : 1,
                        "yieldMovedCursor" : 0,
                        "dupsTested" : 98,
                        "dupsDropped" : 0,
                        "seenInvalidated" : 0,
                        "matchTested" : 0,
                        "keysExamined" : 98,
                        "children" : [ ]
                    }
                ]
            }
        ]
    }
}

What does this mean? If we look at allPlans we see that the optimizer tried our attribute index separately (but in parallel) with each of the clauses inside the $all array. The winning plan was the one for the "color" attribute, because it turned out to be the most selective.

In MongoDB 2.4 this was not possible: the optimizer would simply use the index with the first clause of the $all expression. If that clause happened to have low selectivity, you didn't get as good performance as you might have had you ordered your conditions differently. In 2.6 the order of expressions inside $all does not make a difference, as the most selective one will be the one used by the query optimizer.

Depending on how you need to query your attributes, there are different ways of structuring the attribute array. You can use key-value pairs as I showed, you can use the attribute name itself as the key, or you can even store a single string value "attrname::attrvalue". The best thing is to take a look at the types of queries and updates you will be running, try it different ways, benchmark which one works best, and use that one.
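
As a sketch of that last variant, the shoe document might be flattened like this (just an illustration, not a recommendation):

    {
      SKU: "Y32944EW",
      type: "shoes",
      attr: [ "manufacturer::ShoesForAll", "color::blue", "style::comfort", "size::7B" ]
    }

    // a plain index on the array is enough for exact-match lookups
    db.products.ensureIndex( { "attr": 1 } )
    db.products.find( { "attr": "size::7B" } )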


Comments