MongoDB’s aggregation pipeline is the most powerful tool in its query arsenal — a multi-stage data processing engine that can transform, group, join, and reshape documents in a single database operation. While find() is sufficient for simple data retrieval, real applications need computed dashboards, user activity summaries, search-ranked results, and cross-collection joins — all use cases that require the aggregation pipeline. Mastering the pipeline stages and their ordering is the difference between one optimised database query and ten sequential application-level operations.
Core Aggregation Stages
| Stage | Purpose | Performance Note |
|---|---|---|
| $match | Filter documents (like a find() query) | Always first — uses indexes |
| $group | Group and accumulate — $sum, $avg, $count, $push | Requires a full scan of the matched docs |
| $project | Reshape — include/exclude/compute fields | Reduces document size for subsequent stages |
| $sort | Order results | Place after $match; follow with $limit so MongoDB keeps only the top N in memory |
| $limit / $skip | Pagination | Place immediately after $sort so the optimiser coalesces sort + limit |
| $lookup | Left outer join to another collection | Use a pipeline sub-stage to filter joined docs |
| $unwind | Deconstruct array — one document per array element | Increases document count — limit before unwinding |
| $facet | Multiple sub-pipelines on the same input simultaneously | One query for multiple aggregations |
| $addFields | Add computed fields without removing existing ones | Use instead of $project when keeping all fields |
| $bucket | Categorise documents into ranges | Better than client-side binning |
MongoDB's aggregation optimiser can move a $match before a $sort if it detects the match does not depend on any fields added by a preceding stage, and it can push $limit through $sort to avoid sorting more documents than needed. However, you should not rely on this optimisation. Always write the pipeline in the most logical order: $match first to narrow the dataset, then compute, then sort, then limit.
Use $facet to replace multiple sequential aggregation queries with a single pipeline. A dashboard that needs task counts by status, counts by priority, and recent activity can be computed in one query — MongoDB processes the initial $match once, then fans out into the sub-pipelines simultaneously. This eliminates N round-trips to the database and ensures all facets reflect the same consistent snapshot of data.
The $lookup stage performs a join, which can be expensive — it fetches documents from the foreign collection for each input document. Always add a pipeline sub-stage with $match and $project to filter and limit the fields fetched from the joined collection. Without this, MongoDB fetches entire documents from the foreign collection and discards most fields after the join — wasting I/O. The pipeline option in $lookup runs the sub-pipeline on the joined collection, pushing the filter to the database.
Complete Aggregation Pipeline Examples
const mongoose = require('mongoose');
const Task = require('../models/task.model');
// ── 1. Dashboard stats — $facet for multi-metric in one query ─────────────
async function getTaskDashboard(userId) {
const objectId = new mongoose.Types.ObjectId(userId);
const [result] = await Task.aggregate([
// Stage 1: filter to current user's non-deleted tasks
{ $match: { user: objectId, deletedAt: { $exists: false } } },
// Stage 2: fan out into multiple sub-pipelines simultaneously
{ $facet: {
byStatus: [
{ $group: { _id: '$status', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
],
byPriority: [
{ $group: { _id: '$priority', count: { $sum: 1 } } },
],
overdueTasks: [
{ $match: {
dueDate: { $lt: new Date() },
status: { $ne: 'completed' },
}},
{ $count: 'total' },
],
completedThisWeek: [
{ $match: {
status: 'completed',
completedAt: { $gte: new Date(Date.now() - 7 * 86400000) },
}},
{ $count: 'total' },
],
recentActivity: [
{ $sort: { updatedAt: -1 } },
{ $limit: 5 },
{ $project: { title: 1, status: 1, priority: 1, updatedAt: 1 } },
],
tagCloud: [
{ $unwind: '$tags' },
{ $group: { _id: '$tags', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 20 },
],
}},
]);
// Normalise the facet output
return {
byStatus: Object.fromEntries(result.byStatus.map(s => [s._id, s.count])),
byPriority: Object.fromEntries(result.byPriority.map(p => [p._id, p.count])),
overdue: result.overdueTasks[0]?.total ?? 0,
completedThisWeek: result.completedThisWeek[0]?.total ?? 0,
recentActivity: result.recentActivity,
tagCloud: result.tagCloud,
};
}
// ── 2. $lookup — join tasks with their assignees ───────────────────────────
async function getTasksWithAssignees(workspaceId) {
return Task.aggregate([
{ $match: { workspace: new mongoose.Types.ObjectId(workspaceId) } },
{ $lookup: {
from: 'users',
localField: 'assignee', // field in tasks
foreignField: '_id', // field in users
as: 'assigneeInfo',
// ✅ pipeline sub-stage — only fetch needed fields
// (combining localField/foreignField with pipeline requires MongoDB 5.0+)
pipeline: [
{ $project: { name: 1, email: 1, avatarUrl: 1 } },
],
}},
// $unwind converts assigneeInfo array to single object (null if no assignee)
{ $unwind: { path: '$assigneeInfo', preserveNullAndEmptyArrays: true } },
{ $project: {
title: 1,
status: 1,
priority: 1,
dueDate: 1,
assigneeName: '$assigneeInfo.name',
assigneeEmail: '$assigneeInfo.email',
}},
]);
}
// ── 3. Time-series aggregation — tasks completed per day ──────────────────
async function getCompletionTrend(userId, days = 30) {
const since = new Date(Date.now() - days * 86400000);
return Task.aggregate([
{ $match: {
user: new mongoose.Types.ObjectId(userId),
status: 'completed',
completedAt: { $gte: since },
}},
{ $group: {
_id: {
$dateToString: { format: '%Y-%m-%d', date: '$completedAt' }
},
count: { $sum: 1 },
}},
{ $sort: { _id: 1 } },
{ $project: {
_id: 0,
date: '$_id',
count: 1,
}},
]);
}
// ── 4. $bucket — task age distribution ────────────────────────────────────
async function getTaskAgeDistribution(userId) {
const now = new Date();
return Task.aggregate([
{ $match: {
user: new mongoose.Types.ObjectId(userId),
status: { $ne: 'completed' },
}},
{ $addFields: {
ageInDays: {
$divide: [{ $subtract: [now, '$createdAt'] }, 86400000],
},
}},
{ $bucket: {
groupBy: '$ageInDays',
boundaries: [0, 1, 7, 30, 90, 180],
default: '180+',
output: {
count: { $sum: 1 },
tasks: { $push: { id: '$_id', title: '$title' } },
},
}},
]);
}
// ── 5. Full-text search with relevance ranking ─────────────────────────────
// Requires: taskSchema.index({ title: 'text', description: 'text', tags: 'text' }, { weights: { title: 10, tags: 5 } })
async function searchTasks(userId, query, page = 1, limit = 10) {
return Task.aggregate([
{ $match: {
user: new mongoose.Types.ObjectId(userId),
$text: { $search: query },
}},
{ $addFields: { score: { $meta: 'textScore' } } },
{ $sort: { score: { $meta: 'textScore' }, createdAt: -1 } },
{ $skip: (page - 1) * limit },
{ $limit: limit },
{ $project: { title: 1, status: 1, priority: 1, score: 1, tags: 1 } },
]);
}
How It Works
Step 1 — $match Uses Indexes and Must Come First
A $match stage at the start of a pipeline runs against the collection’s indexes — identical to a find() query — and reduces the number of documents entering subsequent stages. If $match follows $group, it runs against the in-memory grouped output with no index access. The rule is simple: if you can filter before computing, always filter first.
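A sketch of the contrast (the userId value is a placeholder): the first pipeline can use an index on user; the second runs $match against grouped output, which is only legitimate when filtering on a computed field.

```javascript
const userId = 'someUserId'; // placeholder — would be an ObjectId in practice

// Index-backed: $match is the first stage, so it behaves like find({ user: userId }).
const indexedFirst = [
  { $match: { user: userId } },
  { $group: { _id: '$status', count: { $sum: 1 } } },
];

// No index access: this $match filters $group's in-memory output.
// Only correct when the filter targets a computed field (a HAVING-style query).
const afterGroup = [
  { $group: { _id: '$status', count: { $sum: 1 } } },
  { $match: { count: { $gt: 5 } } },
];
```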
Step 2 — $facet Parallelises Multiple Aggregations
Each sub-pipeline in $facet operates on the same set of documents produced by the stages before $facet. The sub-pipelines run concurrently and their results are collected into an output document with one field per sub-pipeline. A single MongoDB round-trip replaces N sequential queries — each with its own network latency and connection overhead.
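One detail worth noting from the dashboard example: a $count sub-pipeline emits no document at all when nothing matches, so its facet array comes back empty rather than containing { total: 0 }. A small helper capturing that normalisation (the name facetTotal is ours):

```javascript
// $count produces an empty array when no documents survive the sub-pipeline,
// so facet consumers must default a missing total to 0.
function facetTotal(facetArray) {
  return facetArray[0]?.total ?? 0;
}
facetTotal([{ total: 4 }]); // → 4
facetTotal([]);             // → 0  (empty facet: nothing matched)
```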
Step 3 — $lookup pipeline Filters at the Source
Without the pipeline option, $lookup fetches complete documents from the foreign collection and transmits them to the aggregation pipeline. For a users collection with 50-field documents, joining 1000 tasks means 1000 full user documents transferred internally. Adding a { $project: { name: 1, email: 1 } } in the pipeline sub-stage filters on the foreign collection before transfer — dramatically reducing data movement.
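One caveat: combining localField/foreignField with a pipeline in the same $lookup, as example 2 does, requires MongoDB 5.0 or newer. On older servers the same join is expressed with let and $expr — a sketch assuming the tasks/users shapes from the examples above:

```javascript
// Pre-5.0 equivalent: bind the local field with `let`, then apply the
// join condition inside the sub-pipeline with $expr.
const lookupStage = {
  $lookup: {
    from: 'users',
    let: { assigneeId: '$assignee' },                            // bind task.assignee
    pipeline: [
      { $match: { $expr: { $eq: ['$_id', '$$assigneeId'] } } },  // join condition
      { $project: { name: 1, email: 1, avatarUrl: 1 } },         // fetch only needed fields
    ],
    as: 'assigneeInfo',
  },
};
```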
Step 4 — $bucket Creates Histogram Bins
$bucket assigns each document to a range (bucket) based on a numeric field value. The boundaries array defines the bin edges — [0, 1, 7, 30] creates buckets [0,1), [1,7), [7,30). The default bucket catches values outside the defined boundaries. The output option is equivalent to $group's accumulator operators — computing counts, sums, averages, and arrays per bucket.
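The edges are half-open: each boundary is inclusive on the left and exclusive on the right. A pure-JavaScript sketch of the assignment rule — not MongoDB code, just the semantics of boundaries and default:

```javascript
// Mirrors $bucket's assignment: value v lands in [boundaries[i], boundaries[i+1]),
// with the bucket's _id equal to its lower edge; anything outside every
// range falls into the `default` bucket.
function bucketId(v, boundaries, dflt) {
  for (let i = 0; i < boundaries.length - 1; i++) {
    if (v >= boundaries[i] && v < boundaries[i + 1]) return boundaries[i];
  }
  return dflt;
}
const bounds = [0, 1, 7, 30, 90, 180];
bucketId(0.5, bounds, '180+'); // → 0      (bucket [0,1))
bucketId(7,   bounds, '180+'); // → 7      (bucket [7,30) — lower edge inclusive)
bucketId(365, bounds, '180+'); // → '180+' (outside all boundaries)
```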
Step 5 — textScore Sorts by Relevance
After a $text match, MongoDB computes a relevance score for each document based on how well it matches the search terms — weighted by the index weights (title: 10 means title matches count 10x more than unweighted fields). { $meta: 'textScore' } in $addFields projects this score into the document, and { $meta: 'textScore' } in $sort orders by it. Higher-scoring (more relevant) documents appear first.
Quick Reference
| Task | Stage |
|---|---|
| Filter documents | { $match: { field: value } } |
| Count per group | { $group: { _id: '$field', count: { $sum: 1 } } } |
| Reshape document | { $project: { field: 1, computed: '$other' } } |
| Add computed field | { $addFields: { fullName: { $concat: ['$first', ' ', '$last'] } } } |
| Join collections | { $lookup: { from, localField, foreignField, as, pipeline } } |
| Flatten array | { $unwind: { path: '$tags', preserveNullAndEmptyArrays: true } } |
| Multi-metric dashboard | { $facet: { metric1: [...], metric2: [...] } } |
| Histogram bins | { $bucket: { groupBy, boundaries, default, output } } |
| Text search score | { $addFields: { score: { $meta: 'textScore' } } } |