Advanced Features
Aggregation Pipelines
Analyze and transform data with MongoDB aggregation
Aggregation Pipelines
Aggregation pipelines are MongoDB's framework for data aggregation and transformation.
What is Aggregation?
Aggregation operations process data records and return computed results. Think of it as a data processing pipeline where documents enter, pass through stages, and emerge transformed.
Documents → [Stage 1] → [Stage 2] → [Stage 3] → Results
Basic Syntax
-- `pipeline` is an array of stage tables; `results` is nil when the call fails.
local results = collection:Aggregate(pipeline)
Parameters
pipeline (table): Array of aggregation stages, applied in order
Returns
table: Array of result documents
nil: On failure
Aggregation Stages
$match - Filter Documents
Filter documents early in the pipeline for efficiency:
-- Keep only documents with level >= 10 whose `banned` field is not true.
local matchStage = {
    ["$match"] = {
        level = { ["$gte"] = 10 },
        banned = { ["$ne"] = true },
    },
}
local results = collection:Aggregate({ matchStage })
$group - Group Documents
Group documents by a field and perform calculations:
-- Group by class and compute per-class statistics.
local byClass = collection:Aggregate({
    {
        ["$group"] = {
            _id = "$class",                           -- grouping key: the class field
            count = { ["$sum"] = 1 },                 -- adds 1 per document in the group
            avgLevel = { ["$avg"] = "$level" },
            totalCredits = { ["$sum"] = "$credits" },
            maxLevel = { ["$max"] = "$level" },
            minLevel = { ["$min"] = "$level" }
        }
    }
})

-- Aggregate returns nil on failure; ipairs(nil) would raise, so check first.
if byClass then
    for _, group in ipairs(byClass) do
        -- tostring() keeps the %s conversion safe when _id is not a string.
        print(string.format("%s: %d players, avg level %.1f",
            tostring(group._id), group.count, group.avgLevel))
    end
else
    print("Aggregation failed")
end
Use _id = nil (or omit _id entirely; in a Lua table constructor the two are equivalent) to aggregate across all documents without grouping.
$sort - Sort Results
-- First 10 documents with banned == false, ordered by score and level.
-- NOTE(review): Lua tables do not preserve key order, so which of score/level
-- acts as the primary sort key depends on how the driver serializes this
-- table -- confirm before relying on multi-key sorts.
local sortStage = { ["$sort"] = { score = -1, level = -1 } } -- -1 = descending
local sorted = collection:Aggregate({
    { ["$match"] = { banned = false } },
    sortStage,
    { ["$limit"] = 10 },
})
$limit - Limit Results
-- Order by score (descending) and keep the first 10 documents.
local topPipeline = {
    { ["$sort"] = { score = -1 } },
    { ["$limit"] = 10 },
}
local top10 = collection:Aggregate(topPipeline)
$skip - Skip Results
-- Pagination: page 2 with 10 per page
local page, perPage = 2, 10
local page2 = collection:Aggregate({
    { ["$sort"] = { score = -1 } },
    { ["$skip"] = (page - 1) * perPage },   -- skip the documents of earlier pages (10 here)
    { ["$limit"] = perPage },
})
$project - Reshape Documents
Control which fields appear in output:
local projected = collection:Aggregate({
    {
        ["$project"] = {
            username = 1,  -- Include
            level = 1,
            score = 1,
            _id = 0,       -- Exclude
            -- Computed field. $divide fails the whole aggregation when the
            -- divisor is 0, so only divide for levels above 0 and fall back
            -- to 0 otherwise.
            scorePerLevel = {
                ["$cond"] = {
                    ["if"] = { ["$gt"] = { "$level", 0 } },
                    ["then"] = { ["$divide"] = { "$score", "$level" } },
                    ["else"] = 0
                }
            }
        }
    }
})
$unwind - Deconstruct Arrays
Flatten array fields:
-- If players have inventory arrays: count inventory entries by their type.
local unwindInventory = { ["$unwind"] = "$inventory" }  -- flatten the array field
local countByType = {
    ["$group"] = {
        _id = "$inventory.type",
        count = { ["$sum"] = 1 },
    },
}
local items = collection:Aggregate({ unwindInventory, countByType })
$lookup - Join Collections
Join data from another collection:
-- Join each document with the "guilds" collection: guild_id is matched
-- against guilds._id and the matches are stored in the `guild` field.
local lookupGuild = {
    ["$lookup"] = {
        from = "guilds",
        localField = "guild_id",
        foreignField = "_id",
        as = "guild",
    },
}
-- NOTE(review): presumably a plain $unwind drops documents that have no
-- matching guild -- confirm that this is the intended result.
local withGuild = collection:Aggregate({
    lookupGuild,
    { ["$unwind"] = "$guild" },  -- Flatten the array
})
$bucket - Group into Ranges
-- Distribute documents into level ranges defined by `boundaries`; values that
-- fit no range land in the bucket labelled by `default`.
local levelDist = collection:Aggregate({
    {
        ["$bucket"] = {
            groupBy = "$level",
            boundaries = { 0, 10, 20, 30, 40, 50 },
            default = "50+",
            output = {
                count = { ["$sum"] = 1 },
                avgCredits = { ["$avg"] = "$credits" }
            }
        }
    }
})

-- Aggregate returns nil on failure; ipairs(nil) would raise, so check first.
if levelDist then
    for _, bucket in ipairs(levelDist) do
        -- _id is a number for regular buckets and the string "50+" for the default one.
        print(string.format("Level %s: %d players",
            tostring(bucket._id), bucket.count))
    end
else
    print("Aggregation failed")
end
Accumulator Operators
Use these in $group stages:
| Operator | Description | Example |
|---|---|---|
| $sum | Sum values | { ["$sum"] = "$credits" } |
| $avg | Average | { ["$avg"] = "$level" } |
| $min | Minimum | { ["$min"] = "$score" } |
| $max | Maximum | { ["$max"] = "$score" } |
| $first | First value | { ["$first"] = "$name" } |
| $last | Last value | { ["$last"] = "$name" } |
| $push | Array of values | { ["$push"] = "$name" } |
| $addToSet | Unique array | { ["$addToSet"] = "$class" } |
Complete Examples
Player Statistics Dashboard
--- Compute server-wide statistics over all players whose `banned` is not true.
-- @return the first result document (fields: totalPlayers, avgLevel,
--         avgCredits, totalCredits, maxLevel, activeToday), or nil when the
--         aggregation fails or yields no document
function GetServerStats()
    local players = db:Collection("players")
    -- Logins at or after this timestamp count as "active today" (last 86400 s).
    local activeSince = os.time() - 86400

    local stats = players:Aggregate({
        -- Filter active players
        {
            ["$match"] = {
                banned = { ["$ne"] = true }
            }
        },
        -- Calculate statistics
        {
            ["$group"] = {
                _id = nil,  -- no grouping key: one summary across all matched documents
                totalPlayers = { ["$sum"] = 1 },
                avgLevel = { ["$avg"] = "$level" },
                avgCredits = { ["$avg"] = "$credits" },
                totalCredits = { ["$sum"] = "$credits" },
                maxLevel = { ["$max"] = "$level" },
                -- Adds 1 for each player whose last_login is recent enough, else 0.
                activeToday = {
                    ["$sum"] = {
                        ["$cond"] = {
                            ["if"] = {
                                ["$gte"] = { "$last_login", activeSince }
                            },
                            ["then"] = 1,
                            ["else"] = 0
                        }
                    }
                }
            }
        }
    })

    -- Aggregate returns nil on failure; indexing nil would raise an error.
    if not stats then
        return nil
    end
    return stats[1]
end
local stats = GetServerStats()
-- stats is nil when there is no result document (or the aggregation failed),
-- so check it before reading fields.
if stats then
    print("Total Players:", stats.totalPlayers)
    print("Average Level:", string.format("%.1f", stats.avgLevel))
    print("Active Today:", stats.activeToday)
else
    print("No player statistics available")
end
Class Distribution
--- Per-class summary of players with banned == false, largest class first.
-- Each result document carries the class as _id, the player count, the
-- average level and the list of usernames in that class.
-- @return whatever players:Aggregate returns for the pipeline
function GetClassDistribution()
    local onlyUnbanned = { ["$match"] = { banned = false } }
    local perClass = {
        ["$group"] = {
            _id = "$class",
            count = { ["$sum"] = 1 },
            avgLevel = { ["$avg"] = "$level" },
            players = { ["$push"] = "$username" },
        },
    }
    local largestFirst = { ["$sort"] = { count = -1 } }

    return db:Collection("players"):Aggregate({ onlyUnbanned, perClass, largestFirst })
end
local classes = GetClassDistribution()
-- Aggregate returns nil on failure; ipairs(nil) would raise, so check first.
if classes then
    for _, class in ipairs(classes) do
        -- tostring() keeps the %s conversion safe when _id is not a string.
        print(string.format("%s: %d players (avg level %.1f)",
            tostring(class._id), class.count, class.avgLevel))
    end
else
    print("Aggregation failed")
end
Top Players Leaderboard
--- Fetch the highest-scoring players whose `banned` is not true.
-- @param limit maximum number of entries; 10 is used when limit is nil or false
-- @return whatever players:Aggregate returns for the pipeline; each document
--         is projected down to username, score, level and class
function GetLeaderboard(limit)
    local maxEntries = limit or 10

    -- Filter out banned players
    local notBanned = { ["$match"] = { banned = { ["$ne"] = true } } }
    -- Sort by score, highest first
    local byScoreDesc = { ["$sort"] = { score = -1 } }
    -- Limit results
    local cap = { ["$limit"] = maxEntries }
    -- Project only needed fields
    local publicFields = {
        ["$project"] = { username = 1, score = 1, level = 1, class = 1, _id = 0 },
    }

    return db:Collection("players"):Aggregate({ notBanned, byScoreDesc, cap, publicFields })
end
local top10 = GetLeaderboard(10)
-- Aggregate returns nil on failure; ipairs(nil) would raise, so check first.
if top10 then
    print("=== Top 10 Players ===")
    for i, player in ipairs(top10) do
        print(string.format("%d. %s - %d points (Level %d %s)",
            i, player.username, player.score, player.level, player.class))
    end
else
    print("Leaderboard unavailable")
end
Time-Based Analytics
--- Summarize login activity per day over the most recent `days` days.
-- Log entries are bucketed by floor(timestamp / 86400); each result document
-- carries the bucket as `day`, the number of logins and the number of
-- distinct steamids seen in that bucket.
-- NOTE(review): assumes `timestamp` holds seconds comparable to os.time() -- confirm.
-- @param days number of days to look back
-- @return whatever logs:Aggregate returns for the pipeline
function GetDailyActivity(days)
    local SECONDS_PER_DAY = 86400
    local logs = db:Collection("login_logs")
    local since = os.time() - (days * SECONDS_PER_DAY)

    local recentOnly = {
        ["$match"] = { timestamp = { ["$gte"] = since } },
    }
    local perDay = {
        ["$group"] = {
            _id = { ["$floor"] = { ["$divide"] = { "$timestamp", SECONDS_PER_DAY } } },
            logins = { ["$sum"] = 1 },
            uniquePlayers = { ["$addToSet"] = "$steamid" },
        },
    }
    local summary = {
        ["$project"] = {
            day = "$_id",
            logins = 1,
            uniqueCount = { ["$size"] = "$uniquePlayers" },
        },
    }
    local oldestFirst = { ["$sort"] = { day = 1 } }

    return logs:Aggregate({ recentOnly, perDay, summary, oldestFirst })
end
Async Aggregation
-- Callback for the asynchronous aggregation: prints the error if there is
-- one, otherwise prints one "class : count" line per result document.
local function onClassCounts(err, results)
    if not err then
        for _, result in ipairs(results) do
            print(result._id, ":", result.count)
        end
    else
        print("Aggregation error:", err)
    end
end

-- Count documents per class without blocking; the callback receives (err, results).
local countPerClass = { ["$group"] = { _id = "$class", count = { ["$sum"] = 1 } } }
collection:AggregateAsync({ countPerClass }, onClassCounts)
Performance Tips
- Use $match early: Filter documents before expensive operations
- Use indexes: Ensure $match and $sort use indexed fields
- Limit fields: Use $project to reduce data size
- Consider $limit: Don't process more data than needed
-- Good: Filter first, so the $group stage only receives matching documents
{
{ ["$match"] = { active = true } },
{ ["$group"] = { ... } }
}
-- Bad: Group all then filter
-- NOTE(review): this $match filters on `count`, which presumably is produced
-- by the $group stage itself and therefore cannot be moved earlier. The
-- "filter first" advice applies to filters on fields already present on the
-- input documents -- consider a clearer counter-example here.
{
{ ["$group"] = { ... } },
{ ["$match"] = { count = { ["$gt"] = 10 } } }
}
Next Steps
- Index Management - Optimize aggregation performance
- Examples - More aggregation examples