The aggregation pipeline is MongoDB's most powerful query mechanism: it lets you transform and analyse data with a sequence of processing stages, each stage passing its output as input to the next. Simple find() queries retrieve documents as they are stored. Aggregation transforms them: computing totals, grouping by field, reshaping document structure, joining collections, calculating statistics, and generating summary reports. Mongoose, an ODM (object-document mapper), exposes the full pipeline API via Model.aggregate([...]). Most analytics endpoints, dashboard stats, and reporting features in a MEAN Stack application rely on the aggregation pipeline.
Core Pipeline Stages
| Stage | Purpose | Example |
|---|---|---|
| $match | Filter documents, like find() | { $match: { status: 'pending' } } |
| $group | Group documents and compute aggregations | { $group: { _id: '$status', count: { $sum: 1 } } } |
| $sort | Sort documents in the pipeline | { $sort: { count: -1 } } |
| $project | Reshape documents: include, exclude, or add computed fields | { $project: { title: 1, fullName: { $concat: ['$first', ' ', '$last'] } } } |
| $limit | Restrict the number of documents | { $limit: 10 } |
| $skip | Skip N documents | { $skip: 20 } |
| $unwind | Deconstruct an array: one document per element | { $unwind: '$tags' } |
| $addFields | Add computed fields without removing existing ones | { $addFields: { age: { $subtract: ['$endDate', '$startDate'] } } } |
| $count | Count documents in the pipeline | { $count: 'total' } |
| $replaceRoot | Replace document with a subdocument | { $replaceRoot: { newRoot: '$address' } } |
| $set | Alias for $addFields: add/update fields | { $set: { updatedAt: '$$NOW' } } |
| $unset | Remove fields from documents | { $unset: ['__v', 'password'] } |
$group Accumulator Operators
| Accumulator | Result |
|---|---|
| { $sum: 1 } | Count of documents in the group |
| { $sum: '$field' } | Sum of field values |
| { $avg: '$field' } | Average of field values |
| { $min: '$field' } | Minimum value in the group |
| { $max: '$field' } | Maximum value in the group |
| { $first: '$field' } | First value in the group (meaningful only after a $sort) |
| { $last: '$field' } | Last value in the group (meaningful only after a $sort) |
| { $push: '$field' } | Array of all values in the group |
| { $addToSet: '$field' } | Array of unique values in the group |
| { $count: {} } | Count documents (MongoDB 5.0+) |
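The dependence of $first and $last on sort order is easy to miss, so here is a minimal sketch of a pipeline that sorts before grouping. The helper name buildStatusSummaryPipeline and the title field are assumptions made for illustration; user, status, priority, and createdAt follow the Task examples used throughout this section.

```javascript
// Hypothetical helper: returns a plain pipeline array to pass to Task.aggregate().
function buildStatusSummaryPipeline(userId) {
  return [
    { $match: { user: userId } },
    // Sort first so that $first and $last have a defined meaning in each group
    { $sort: { createdAt: 1 } },
    {
      $group: {
        _id: '$status',
        count: { $sum: 1 },
        oldestTitle: { $first: '$title' },      // earliest createdAt in the group
        newestTitle: { $last: '$title' },       // latest createdAt in the group
        priorities: { $addToSet: '$priority' }, // unique values, order not guaranteed
        titles: { $push: '$title' },            // every value, duplicates included
      },
    },
  ];
}

// Usage (assumes a Mongoose model named Task and an ObjectId userId):
// const summary = await Task.aggregate(buildStatusSummaryPipeline(userId));
```

Because $push collects every value, it can build very large arrays for big groups; prefer $addToSet or a plain count when the full list is not needed.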
Place $match as the first stage in the pipeline. A $match at the start can use indexes, drastically reducing the number of documents that flow through subsequent stages. A $match placed after a $group cannot use a collection index, because it filters the grouped output rather than the stored documents. Similarly, a $sort placed early in the pipeline can use an index; placed after a $group, it must sort in memory.
Use $facet to run multiple aggregation pipelines within a single stage. Instead of making three separate requests for task counts by status, counts by priority, and total overdue, which requires three round-trips to MongoDB, wrap them in a $facet stage and get all three results in a single query. This is ideal for dashboard endpoints that need multiple metrics simultaneously.
Each pipeline stage has a memory limit of 100 MB. If a $sort or $group stage needs to process more data than this and is not allowed to spill to disk, MongoDB throws an error. Enable disk use for large aggregations: Model.aggregate([...]).option({ allowDiskUse: true }). In production, avoid aggregations that require large in-memory sorts by always having appropriate indexes and filtering aggressively with $match early in the pipeline.
Complete Pipeline Examples
const { Types: { ObjectId } } = require('mongoose');
const userId = new ObjectId('64a1f2b3c8e4d5f6a7b8c9d1');
// ── Task statistics dashboard ──────────────────────────────────────────────
const stats = await Task.aggregate([
  // Stage 1: Filter to this user's non-deleted tasks
  { $match: { user: userId, deletedAt: { $exists: false } } },
  // Stage 2: Group by status, count each
  { $group: { _id: '$status', count: { $sum: 1 } } },
  // Stage 3: Sort by count descending
  { $sort: { count: -1 } },
  // Stage 4: Reshape output
  { $project: { _id: 0, status: '$_id', count: 1 } },
]);
// Result: [{ status: 'pending', count: 24 }, { status: 'completed', count: 18 }, ...]
// ── Tasks per priority with average completion time ───────────────────────
const priorityStats = await Task.aggregate([
  { $match: { user: userId, status: 'completed' } },
  {
    $group: {
      _id: '$priority',
      count: { $sum: 1 },
      // Subtracting two dates yields a duration in milliseconds
      avgCompletionMs: {
        $avg: {
          $subtract: ['$completedAt', '$createdAt']
        }
      },
      minCreatedAt: { $min: '$createdAt' },
      maxCreatedAt: { $max: '$createdAt' },
    },
  },
  {
    $addFields: {
      // 3600000 ms per hour, rounded to one decimal place
      avgCompletionHours: {
        $round: [{ $divide: ['$avgCompletionMs', 3600000] }, 1]
      },
    },
  },
  { $sort: { count: -1 } },
]);
// ── Tasks created per day (last 30 days) ──────────────────────────────────
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const dailyActivity = await Task.aggregate([
  {
    $match: {
      user: userId,
      createdAt: { $gte: thirtyDaysAgo },
    },
  },
  {
    // Group by calendar day (the date operators work in UTC unless a timezone is given)
    $group: {
      _id: {
        year: { $year: '$createdAt' },
        month: { $month: '$createdAt' },
        day: { $dayOfMonth: '$createdAt' },
      },
      count: { $sum: 1 },
      completed: {
        $sum: { $cond: [{ $eq: ['$status', 'completed'] }, 1, 0] }
      },
    },
  },
  {
    // Rebuild a single date value from the year/month/day grouping key
    $project: {
      _id: 0,
      date: {
        $dateFromParts: {
          year: '$_id.year',
          month: '$_id.month',
          day: '$_id.day',
        },
      },
      count: 1,
      completed: 1,
    },
  },
  { $sort: { date: 1 } },
]);
// ── $unwind: one document per tag ────────────────────────────────────────
// Find the most-used tags across all tasks
const topTags = await Task.aggregate([
  { $match: { user: userId } },
  { $unwind: '$tags' }, // { tags: ['a','b'] } becomes two docs with tags:'a' and tags:'b'
  { $group: { _id: '$tags', count: { $sum: 1 } } },
  { $sort: { count: -1 } },
  { $limit: 10 },
  { $project: { _id: 0, tag: '$_id', count: 1 } },
]);
// Result: [{ tag: 'urgent', count: 12 }, { tag: 'Q4', count: 8 }, ...]
// ── $unwind with preserveNullAndEmptyArrays ──────────────────────────────
await Task.aggregate([
  { $unwind: { path: '$tags', preserveNullAndEmptyArrays: true } },
  // Keeps documents whose tags field is missing, null, or an empty array (each appears once)
]);
// ── $addFields: computed fields ───────────────────────────────────────────
const now = new Date(); // one timestamp so both computed fields agree
const enriched = await Task.aggregate([
  { $match: { user: userId } },
  {
    $addFields: {
      isOverdue: {
        $and: [
          // A missing dueDate compares as less than any date, so require a real date first
          { $eq: [{ $type: '$dueDate' }, 'date'] },
          { $lt: ['$dueDate', now] },
          { $ne: ['$status', 'completed'] },
        ],
      },
      daysUntilDue: {
        $divide: [
          { $subtract: ['$dueDate', now] },
          86400000, // ms per day
        ],
      },
      // Map priority labels to numbers so they sort high → medium → low
      priorityOrder: {
        $switch: {
          branches: [
            { case: { $eq: ['$priority', 'high'] }, then: 1 },
            { case: { $eq: ['$priority', 'medium'] }, then: 2 },
            { case: { $eq: ['$priority', 'low'] }, then: 3 },
          ],
          default: 4,
        },
      },
    },
  },
  { $sort: { priorityOrder: 1, daysUntilDue: 1 } },
]);
How It Works
Step 1 — Stages Transform a Stream of Documents
The pipeline processes documents as a stream: stage 1 outputs documents to stage 2, stage 2 outputs to stage 3, and so on. Each stage can produce fewer documents (filter), more documents (unwind), or the same count (project, addFields, sort). The final stage's output is returned to your application. Stages such as $match and $project can pass documents along one at a time, so MongoDB does not need to hold the whole dataset in memory. $group, and a $sort that cannot use an index, are exceptions: they must read all of their input before emitting any output.
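As a rough mental model only (this is plain JavaScript over an in-memory array, not how MongoDB executes a pipeline), each stage can be pictured as a function from a list of documents to a new list. The helpers match, unwind, groupCount, and runPipeline are hypothetical names invented for this sketch.

```javascript
// Each "stage" is a function: documents in, documents out.
const match = (predicate) => (docs) => docs.filter(predicate);

// One output document per array element. Missing or empty arrays produce no output,
// like the default $unwind. (Simplification: real $unwind treats a non-array,
// non-null value as a single element; this sketch drops it.)
const unwind = (field) => (docs) =>
  docs.flatMap((doc) =>
    (Array.isArray(doc[field]) ? doc[field] : []).map((value) => ({ ...doc, [field]: value }))
  );

// Collapse documents that share a key into { _id, count } summaries.
const groupCount = (field) => (docs) => {
  const counts = new Map();
  for (const doc of docs) counts.set(doc[field], (counts.get(doc[field]) || 0) + 1);
  return [...counts].map(([_id, count]) => ({ _id, count }));
};

// Feed the output of each stage into the next one.
const runPipeline = (docs, stages) => stages.reduce((current, stage) => stage(current), docs);

const tasks = [
  { status: 'pending', tags: ['urgent', 'Q4'] },
  { status: 'pending', tags: ['urgent'] },
  { status: 'completed', tags: ['Q4'] },
  { status: 'pending', tags: [] },
];

const tagCounts = runPipeline(tasks, [
  match((t) => t.status === 'pending'), // fewer documents: 3 remain
  unwind('tags'),                       // one per tag: 2 + 1 + 0 = 3 documents
  groupCount('tags'),                   // summaries: 2 documents
]);
// tagCounts: [{ _id: 'urgent', count: 2 }, { _id: 'Q4', count: 1 }]
```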
Step 2 — $match Uses Indexes When Placed First
A $match stage is essentially a find() filter. When it is the first pipeline stage, MongoDB can use collection indexes to quickly narrow down documents, the same way a regular find() query does. If $match appears after a $group, no collection index applies, because $group has already turned the data into aggregated documents that no longer look like the original collection. A $match after $group is still the right tool when the filter is on a computed value such as a count.
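One way to check whether a leading $match actually uses an index is to ask for the query plan instead of the results. The sketch below assumes a Mongoose model and uses Aggregate#explain(); the helper name planMentionsIndexScan is hypothetical, and searching the plan for "IXSCAN" is only a rough heuristic, since the shape of the plan differs between server versions.

```javascript
// Hypothetical helper: returns true if the explain output mentions an index scan.
// `Model` is assumed to be a Mongoose model (anything whose aggregate() result has explain()).
async function planMentionsIndexScan(Model, pipeline) {
  const plan = await Model.aggregate(pipeline).explain();
  // An index scan shows up as an "IXSCAN" stage; a full collection scan as "COLLSCAN".
  return JSON.stringify(plan).includes('IXSCAN');
}

// Usage (assumes an index such as taskSchema.index({ user: 1, status: 1 })):
// const indexed = await planMentionsIndexScan(Task, [
//   { $match: { user: userId, status: 'pending' } },
//   { $group: { _id: '$priority', count: { $sum: 1 } } },
// ]);
```

For anything beyond a quick check, read the winning plan in the explain output directly rather than relying on a string search.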
Step 3 — $group Collapses Multiple Documents into Summaries
The _id field in $group defines the grouping key. All documents with the same _id value are collapsed into a single output document. Accumulator operators ($sum, $avg, $push) compute values across all documents in the group. Setting _id: null groups all documents into a single total — useful for computing a single aggregate across the entire filtered set.
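To make the _id: null case concrete, here is a minimal sketch that computes overall totals for one user in a single output document. The helper name buildTotalsPipeline is hypothetical; the field names follow the Task examples in this section.

```javascript
// Hypothetical helper: returns a plain pipeline array to pass to Task.aggregate().
function buildTotalsPipeline(userId) {
  return [
    { $match: { user: userId } },
    {
      $group: {
        _id: null, // constant key: every matched document lands in the same group
        total: { $sum: 1 },
        completed: { $sum: { $cond: [{ $eq: ['$status', 'completed'] }, 1, 0] } },
        firstCreatedAt: { $min: '$createdAt' },
      },
    },
    { $project: { _id: 0 } }, // drop the null _id from the single result document
  ];
}

// Usage (assumes a Mongoose model named Task):
// const [totals] = await Task.aggregate(buildTotalsPipeline(userId));
// `totals` is undefined when no documents match, so fall back to zeros if needed.
```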
Step 4 — $unwind Flattens Arrays for Per-Element Analysis
A document with tags: ['urgent', 'Q4', 'client'] becomes three documents after $unwind: '$tags' — one for each tag. This allows grouping, counting, and filtering on individual array elements. Without unwind, you can check if an array contains a value but you cannot easily count how many documents have each tag value. Unwind is essential for any analysis that treats array elements as individual data points.
Step 5 — $project and $addFields Shape the Output
$project is the aggregation equivalent of projection — include fields with 1, exclude with 0, and compute new fields using expressions. $addFields is like $project but keeps all existing fields and adds or modifies specified ones. Computed fields can use arithmetic ($subtract, $divide), string ($concat, $toUpper), date ($year, $dayOfMonth), conditional ($cond, $switch), and dozens of other expression operators.
Real-World Example: Comprehensive Dashboard Aggregation
// GET /api/v1/tasks/dashboard: all stats in one query using $facet
exports.getDashboard = asyncHandler(async (req, res) => {
  const userId = new ObjectId(req.user.id);
  const now = new Date();
  const [result] = await Task.aggregate([
    // First: filter to this user's active tasks
    { $match: { user: userId, deletedAt: { $exists: false } } },
    // Then: compute multiple metrics from the same input with $facet
    {
      $facet: {
        // Count by status
        byStatus: [
          { $group: { _id: '$status', count: { $sum: 1 } } },
          { $project: { _id: 0, status: '$_id', count: 1 } },
        ],
        // Count by priority
        byPriority: [
          { $group: { _id: '$priority', count: { $sum: 1 } } },
          { $sort: { count: -1 } },
        ],
        // Overdue tasks
        overdue: [
          {
            $match: {
              dueDate: { $lt: now },
              status: { $ne: 'completed' },
            },
          },
          { $count: 'count' },
        ],
        // Due today (midnight to midnight in the server's local time zone)
        dueToday: [
          {
            $match: {
              dueDate: {
                $gte: new Date(now.getFullYear(), now.getMonth(), now.getDate()),
                $lt: new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1),
              },
              status: { $ne: 'completed' },
            },
          },
          { $count: 'count' },
        ],
        // Top 5 tags
        topTags: [
          { $unwind: '$tags' },
          { $group: { _id: '$tags', count: { $sum: 1 } } },
          { $sort: { count: -1 } },
          { $limit: 5 },
          { $project: { _id: 0, tag: '$_id', count: 1 } },
        ],
      },
    },
  ]);
  // Shape the response
  const statusMap = {};
  result.byStatus.forEach(({ status, count }) => { statusMap[status] = count; });
  res.json({
    success: true,
    data: {
      byStatus: statusMap,
      byPriority: result.byPriority,
      // $count emits no document when nothing matches, hence the fallback to 0
      overdue: result.overdue[0]?.count ?? 0,
      dueToday: result.dueToday[0]?.count ?? 0,
      topTags: result.topTags,
    },
  });
});
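The same $facet idea also covers paginated lists, where an endpoint needs one page of items plus the total count. The sketch below is one possible shape, not part of the dashboard handler above; buildPagedTasksPipeline, page, and pageSize are hypothetical names, and it is the only place in this section where $skip and $limit are combined.

```javascript
// Hypothetical helper: returns a plain pipeline array to pass to Task.aggregate().
function buildPagedTasksPipeline(userId, page = 1, pageSize = 20) {
  const safePage = Math.max(1, Math.floor(page));
  const safeSize = Math.max(1, Math.floor(pageSize));
  return [
    { $match: { user: userId, deletedAt: { $exists: false } } },
    { $sort: { createdAt: -1, _id: 1 } }, // _id breaks ties so pages stay stable
    {
      $facet: {
        // One page of documents
        items: [{ $skip: (safePage - 1) * safeSize }, { $limit: safeSize }],
        // Total number of matching documents, ignoring pagination
        total: [{ $count: 'count' }],
      },
    },
  ];
}

// Usage (assumes a Mongoose model named Task):
// const [{ items, total }] = await Task.aggregate(buildPagedTasksPipeline(userId, 3, 20));
// const totalCount = total[0]?.count ?? 0;
```

The $facet result is a single document, so it is subject to the 16 MB BSON document size limit; keep pageSize modest.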
Common Mistakes
Mistake 1 — Placing $match after $group
❌ Wrong: filtering after grouping means every user's documents are grouped before any are discarded:
Task.aggregate([
  { $group: { _id: '$user', total: { $sum: 1 } } },
  { $match: { _id: userId } }, // too late: all users already processed
]);
✅ Correct: filter first with $match so that $group only sees this user's documents:
Task.aggregate([
  { $match: { user: userId } }, // can use an index on user; reduces to only this user's tasks
  { $group: { _id: '$status', total: { $sum: 1 } } },
]);
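The mistake concerns filters on stored fields. A filter on a value that only exists after grouping, such as a count, has to come after $group. A minimal sketch, assuming the same Task fields; buildFrequentTagsPipeline and minCount are hypothetical names:

```javascript
// Hypothetical helper: tags used at least `minCount` times by one user.
function buildFrequentTagsPipeline(userId, minCount) {
  return [
    { $match: { user: userId } },               // stored field: filter first, can use an index
    { $unwind: '$tags' },
    { $group: { _id: '$tags', count: { $sum: 1 } } },
    { $match: { count: { $gte: minCount } } },  // computed field: can only be filtered here
    { $sort: { count: -1 } },
  ];
}

// Usage (assumes a Mongoose model named Task):
// const frequent = await Task.aggregate(buildFrequentTagsPipeline(userId, 5));
```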
Mistake 2 — Forgetting preserveNullAndEmptyArrays on $unwind
❌ Wrong (when every task must appear in the output): documents with empty, null, or missing arrays are silently dropped:
{ $unwind: '$tags' }
// Document { title: 'Task with no tags', tags: [] } is completely removed from results!
✅ Correct: preserve documents whose array is empty, null, or missing:
{ $unwind: { path: '$tags', preserveNullAndEmptyArrays: true } }
Mistake 3 — Using string userId instead of ObjectId in $match
❌ Wrong: the query returns 0 results because the types don't match:
const userId = req.user.id; // string: '64a1f...'
Task.aggregate([{ $match: { user: userId } }]);
// Task.user is an ObjectId. Mongoose does not cast aggregation pipeline values to schema types the way find() does, so a string never matches
✅ Correct: convert to ObjectId yourself before building the pipeline:
const userId = new mongoose.Types.ObjectId(req.user.id);
Task.aggregate([{ $match: { user: userId } }]);
Quick Reference
| Need | Stage | Example |
|---|---|---|
| Filter documents | $match | { $match: { user: userId, status: 'pending' } } |
| Count / aggregate groups | $group | { $group: { _id: '$status', n: { $sum: 1 } } } |
| Sort results | $sort | { $sort: { count: -1 } } |
| Select / compute fields | $project | { $project: { title: 1, _id: 0 } } |
| Add computed field | $addFields | { $addFields: { isOverdue: { $lt: ['$dueDate', now] } } } |
| Flatten array | $unwind | { $unwind: '$tags' } |
| Limit results | $limit | { $limit: 10 } |
| Multiple pipelines | $facet | { $facet: { a: [...], b: [...] } } |
| Count total | $count | { $count: 'total' } |