Using the MongoDB Aggregation Framework via the MongoDB Asynchronous Java Driver
Introduction
I often use Morphia in projects to map Java objects to and from MongoDB, and I rely on its Query API instead of building complex DBObject queries by hand. However, the current version (v. 0.105) has no support for the aggregate command. Fortunately, the MongoDB Asynchronous Java Driver provides an aggregate builder for constructing complex operator pipelines in a fluent way. The following paragraphs walk through some basic usage of mongodb-async-driver with the aggregation pipeline framework, using an example.
Add MongoDB Asynchronous Java Driver Dependency
Add the current mongodb-async-driver coordinates to pom.xml:
<dependency>
    <groupId>com.allanbank</groupId>
    <artifactId>mongodb-async-driver</artifactId>
    <version>1.2.3</version>
</dependency>
A Simple Example
Suppose I’m using MongoDB to store customers’ orders in a collection named orders, and I want to examine this collection by the hour of the order date. Here’s a sample document, as displayed in MongoHub:
{
    "_id": { "$oid" : "52b474563004023000a51b59" },
    "orderDate": { "$date": 1386863940000 },
    "orderNo": "C13121201131",
    "salePrice": 15900
}
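The $date value above is a count of milliseconds since the Unix epoch, and MongoDB's date operators such as $year and $hour evaluate it in UTC. To see which hour bucket a document should land in before touching the database, a minimal plain-Java sketch like the following can help. It uses only java.time; the class name OrderDateParts and its parts method are hypothetical helpers for illustration, not part of the driver.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class OrderDateParts {

    // Splits epoch milliseconds into {year, month, dayOfMonth, hour}, all in UTC,
    // mirroring what $year, $month, $dayOfMonth and $hour extract from a date.
    public static int[] parts(long epochMillis) {
        ZonedDateTime utc = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return new int[] {
            utc.getYear(), utc.getMonthValue(), utc.getDayOfMonth(), utc.getHour()
        };
    }

    public static void main(String[] args) {
        // orderDate of the sample document
        int[] p = parts(1386863940000L);
        System.out.printf("%d-%02d-%02d hour %d%n", p[0], p[1], p[2], p[3]);
    }
}
```

For the sample document this prints 2013-12-12 hour 15, so the order belongs to the 15:00 UTC bucket regardless of the local time zone of the machine running the query.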
Java Code
The driver provides the AggregationGroupId class to help construct a compound _id field. Here we want to browse the data at hour granularity, so we use AggregationGroupId.id() to combine the fields we projected (year, month, day, and hour) into the group id.
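import com.allanbank.mongodb.MongoClient;
import com.allanbank.mongodb.MongoClientConfiguration;
import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.element.ArrayElement;
import com.allanbank.mongodb.builder.Aggregate;
import com.allanbank.mongodb.builder.AggregationGroupField;
import com.allanbank.mongodb.builder.AggregationGroupId;
import com.allanbank.mongodb.builder.AggregationProjectFields;
import com.allanbank.mongodb.builder.Sort;
import com.allanbank.mongodb.builder.expression.Expressions;

// The method only prints its results, so it returns void.
public static void getOrdersByHour() {
    // * Connect to MongoDB ("server:port" is a placeholder for your host and port).
    MongoClientConfiguration config = new MongoClientConfiguration();
    config.addServer("server:port");
    MongoClient mongoClient = MongoFactory.createClient(config);

    // * Get a reference to a database.
    MongoDatabase database = mongoClient.getDatabase("test");

    // * Get collection from the 'test' database.
    MongoCollection collection = database.getCollection("orders");

    Aggregate.Builder builder = new Aggregate.Builder();

    // * $project: keep orderDate and derive year/month/day/hour from it.
    builder.project(
        AggregationProjectFields.includeWithoutId("orderDate"),
        Expressions.set("year",
            Expressions.year(Expressions.field("orderDate"))),
        Expressions.set("month",
            Expressions.month(Expressions.field("orderDate"))),
        Expressions.set("day",
            Expressions.dayOfMonth(Expressions.field("orderDate"))),
        Expressions.set("hour",
            Expressions.hour(Expressions.field("orderDate")))
    );

    // * $group: one bucket per (year, month, day, hour), counting documents.
    builder.group(
        AggregationGroupId.id().addField("year").addField("month").addField("day").addField("hour"),
        AggregationGroupField.set("total").count()
    );

    // * $sort: order the buckets by their compound _id.
    builder.sort(Sort.asc("_id"));

    // * Build the command once, print its pipeline, then run it.
    Aggregate aggregate = builder.build();
    System.out.println(new ArrayElement("pipeline", aggregate.getPipeline()));

    Iterable<Document> docs = collection.aggregate(aggregate);
    for (Document d : docs) {
        System.out.println(d);
    }
}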
Aggregate Command / Builder / Results
Through the getPipeline method of the built Aggregate command, we can verify the command and the order of its pipeline stages:
pipeline : [
    {
        '$project' : {
            '_id' : 0,
            orderDate : 1,
            year : { '$year' : '$orderDate' },
            month : { '$month' : '$orderDate' },
            day : { '$dayOfMonth' : '$orderDate' },
            hour : { '$hour' : '$orderDate' }
        }
    },
    {
        '$group' : {
            '_id' : {
                year : '$year',
                month : '$month',
                day : '$day',
                hour : '$hour'
            },
            total : { '$sum' : 1 }
        }
    },
    {
        '$sort' : { '_id' : 1 }
    }
]
A portion of the aggregated results is shown below:
...
{
    '_id' : {
        year : 2013,
        month : 12,
        day : 12,
        hour : 13
    },
    total : 23
}
{
    '_id' : {
        year : 2013,
        month : 12,
        day : 12,
        hour : 14
    },
    total : 20
}
{
    '_id' : {
        year : 2013,
        month : 12,
        day : 12,
        hour : 15
    },
    total : 22
}
...
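To sanity-check the numbers the pipeline returns, it can be useful to reproduce the same project, group, and sort steps in memory on a small set of timestamps. The sketch below does that with plain Java streams and no database. The class HourlyOrderCount, its methods, and the two extra timestamps are made up for illustration; only the first timestamp comes from the sample document. A single "yyyy-MM-dd HH" string stands in for the compound _id, which is a simplification of the real group key.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class HourlyOrderCount {

    // "Project" step: derive a sortable UTC hour key from epoch milliseconds.
    public static String hourKey(long epochMillis) {
        ZonedDateTime utc = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT, "%04d-%02d-%02d %02d",
                utc.getYear(), utc.getMonthValue(), utc.getDayOfMonth(), utc.getHour());
    }

    // "Group" and "sort" steps: count timestamps per hour key.
    // The TreeMap keeps keys in ascending order, like sorting on _id.
    public static Map<String, Long> countByHour(List<Long> orderDates) {
        return orderDates.stream()
                .collect(Collectors.groupingBy(
                        HourlyOrderCount::hourKey, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Long> orderDates = Arrays.asList(
                1386863940000L,  // sample document: 2013-12-12 15:59 UTC
                1386863000000L,  // made-up order at 15:43 UTC
                1386860000000L); // made-up order at 14:53 UTC

        for (Map.Entry<String, Long> e : countByHour(orderDates).entrySet()) {
            System.out.println(e.getKey() + " -> total " + e.getValue());
        }
    }
}
```

With these three timestamps the hour-14 bucket holds one order and the hour-15 bucket holds two. Running the same check against a small export of the orders collection is a cheap way to confirm that the server-side totals match expectations.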