Advanced Features

Aggregation Pipelines

Analyze and transform data with MongoDB aggregation

Aggregation Pipelines

Aggregation pipelines are MongoDB's framework for data aggregation and transformation.

What is Aggregation?

Aggregation operations process data records and return computed results. Think of it as a data processing pipeline where documents enter, pass through stages, and emerge transformed.

Documents → [Stage 1] → [Stage 2] → [Stage 3] → Results

Basic Syntax

-- pipeline: array of aggregation stage tables.
-- results: array of result documents, or nil on failure.
local results = collection:Aggregate(pipeline)

Parameters

  • pipeline (table): Array of aggregation stages

Returns

  • table: Array of result documents
  • nil: On failure

Aggregation Stages

$match - Filter Documents

Filter documents early in the pipeline for efficiency:

-- Keep only documents with level >= 10 whose banned field is not true.
local match_stage = {
    ["$match"] = {
        level = { ["$gte"] = 10 },
        banned = { ["$ne"] = true },
    },
}

local results = collection:Aggregate({ match_stage })

$group - Group Documents

Group documents by a field and perform calculations:

-- Group by class and compute per-class statistics.
local byClass = collection:Aggregate({
    {
        ["$group"] = {
            _id = "$class",
            count = { ["$sum"] = 1 },
            avgLevel = { ["$avg"] = "$level" },
            totalCredits = { ["$sum"] = "$credits" },
            maxLevel = { ["$max"] = "$level" },
            minLevel = { ["$min"] = "$level" }
        }
    }
})

-- Aggregate returns nil on failure; fall back to an empty list so the
-- loop is skipped instead of raising inside ipairs.
for _, group in ipairs(byClass or {}) do
    -- tostring() keeps the format call safe when _id is nil.
    print(string.format("%s: %d players, avg level %.1f",
        tostring(group._id), group.count, group.avgLevel))
end

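Use _id = nil (in a Lua table constructor this is the same as omitting the key) to aggregate across all documents without grouping. Any constant value, such as _id = "all", also places every document in a single group.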

$sort - Sort Results

-- Filter, order by score and level (both descending), keep the first 10.
-- NOTE(review): Lua tables do not preserve key order, so which of
-- score/level ends up as the primary sort key depends on how the
-- library serializes this table — confirm before relying on it.
local sorted = collection:Aggregate({
    { ["$match"] = { banned = false } },
    { ["$sort"] = { score = -1, level = -1 } },  -- -1 = descending
    { ["$limit"] = 10 }
})

$limit - Limit Results

-- Order by score descending, then keep only the first ten documents.
local by_score_desc = { ["$sort"] = { score = -1 } }
local first_ten = { ["$limit"] = 10 }

local top10 = collection:Aggregate({ by_score_desc, first_ten })

$skip - Skip Results

-- Pagination: page 2 with 10 per page
local page, per_page = 2, 10

local page2 = collection:Aggregate({
    { ["$sort"] = { score = -1 } },
    { ["$skip"] = (page - 1) * per_page },  -- skip the earlier pages
    { ["$limit"] = per_page },
})

$project - Reshape Documents

Control which fields appear in output:

-- Reshape each document: keep username, level and score, drop _id, and
-- add a computed scorePerLevel field.
local projected = collection:Aggregate({
    {
        ["$project"] = {
            username = 1,        -- Include
            level = 1,
            score = 1,
            _id = 0,            -- Exclude
            -- Computed field. $divide fails the whole pipeline on a zero
            -- divisor, so level 0 is mapped to 0 instead of being divided.
            scorePerLevel = {
                ["$cond"] = {
                    ["if"] = { ["$eq"] = { "$level", 0 } },
                    ["then"] = 0,
                    ["else"] = { ["$divide"] = { "$score", "$level" } }
                }
            }
        }
    }
})

$unwind - Deconstruct Arrays

Flatten array fields:

-- If players have inventory arrays: unwind them, then count the
-- resulting documents per inventory type.
local unwind_inventory = { ["$unwind"] = "$inventory" }
local count_by_type = {
    ["$group"] = {
        _id = "$inventory.type",
        count = { ["$sum"] = 1 },
    },
}

local items = collection:Aggregate({ unwind_inventory, count_by_type })

$lookup - Join Collections

Join data from another collection:

-- Join each document with the "guilds" collection on guild_id = _id and
-- store the matches in the "guild" field.
local join_guild = {
    ["$lookup"] = {
        from = "guilds",
        localField = "guild_id",
        foreignField = "_id",
        as = "guild",
    },
}
-- Flatten the array
local flatten_guild = { ["$unwind"] = "$guild" }

local withGuild = collection:Aggregate({ join_guild, flatten_guild })

$bucket - Group into Ranges

-- Bucket players by level range and compute per-bucket statistics.
local levelDist = collection:Aggregate({
    {
        ["$bucket"] = {
            groupBy = "$level",
            boundaries = { 0, 10, 20, 30, 40, 50 },
            default = "50+",
            output = {
                count = { ["$sum"] = 1 },
                avgCredits = { ["$avg"] = "$credits" }
            }
        }
    }
})

-- Aggregate returns nil on failure; iterate an empty list in that case
-- rather than passing nil to ipairs.
for _, bucket in ipairs(levelDist or {}) do
    -- _id is printed through tostring() because it can be a number
    -- (a boundary) or the string default.
    print(string.format("Level %s: %d players",
        tostring(bucket._id), bucket.count))
end

Accumulator Operators

Use these in $group stages:

Operator | Description | Example
$sum | Sum values | { ["$sum"] = "$credits" }
$avg | Average | { ["$avg"] = "$level" }
$min | Minimum | { ["$min"] = "$score" }
$max | Maximum | { ["$max"] = "$score" }
$first | First value | { ["$first"] = "$name" }
$last | Last value | { ["$last"] = "$name" }
$push | Array of values | { ["$push"] = "$name" }
$addToSet | Unique array | { ["$addToSet"] = "$class" }

Complete Examples

Player Statistics Dashboard

--- Compute server-wide statistics over all players that are not banned.
-- @treturn table|nil a single document with totalPlayers, avgLevel,
--   avgCredits, totalCredits, maxLevel and activeToday; nil when the
--   aggregation fails or returns no documents.
function GetServerStats()
    local players = db:Collection("players")

    -- Logins at or after this timestamp count as "active today".
    local day_ago = os.time() - 86400

    local stats = players:Aggregate({
        -- Filter active players
        {
            ["$match"] = {
                banned = { ["$ne"] = true }
            }
        },
        -- Calculate statistics
        {
            ["$group"] = {
                -- A Lua table constructor drops nil-valued fields, so
                -- `_id = nil` would send no _id at all. A constant _id
                -- puts every document into one group.
                _id = "all",
                totalPlayers = { ["$sum"] = 1 },
                avgLevel = { ["$avg"] = "$level" },
                avgCredits = { ["$avg"] = "$credits" },
                totalCredits = { ["$sum"] = "$credits" },
                maxLevel = { ["$max"] = "$level" },
                -- Adds 1 for each player whose last_login is recent.
                activeToday = {
                    ["$sum"] = {
                        ["$cond"] = {
                            ["if"] = {
                                ["$gte"] = { "$last_login", day_ago }
                            },
                            ["then"] = 1,
                            ["else"] = 0
                        }
                    }
                }
            }
        }
    })

    -- Aggregate returns nil on failure; do not index it in that case.
    if not stats then
        return nil
    end

    return stats[1]
end

local stats = GetServerStats()
-- GetServerStats can come back empty (failed aggregation or no matching
-- players), so check before reading fields.
if stats then
    print("Total Players:", stats.totalPlayers)
    print("Average Level:", string.format("%.1f", stats.avgLevel))
    print("Active Today:", stats.activeToday)
else
    print("No player statistics available")
end

Class Distribution

--- Group non-banned players by class, largest class first.
-- @treturn table|nil array of { _id = class, count, avgLevel, players }
--   documents as returned by Aggregate (nil on failure).
function GetClassDistribution()
    local players = db:Collection("players")

    return players:Aggregate({
        -- Same "not banned" filter as GetServerStats and GetLeaderboard,
        -- so documents without a banned field are counted here too.
        { ["$match"] = { banned = { ["$ne"] = true } } },
        {
            ["$group"] = {
                _id = "$class",
                count = { ["$sum"] = 1 },
                avgLevel = { ["$avg"] = "$level" },
                players = { ["$push"] = "$username" }
            }
        },
        { ["$sort"] = { count = -1 } }
    })
end

local classes = GetClassDistribution()
-- The aggregation result is nil on failure; iterate an empty list then.
for _, class in ipairs(classes or {}) do
    -- tostring() keeps the format call safe when _id is nil.
    print(string.format("%s: %d players (avg level %.1f)",
        tostring(class._id), class.count, class.avgLevel))
end

Top Players Leaderboard

--- Return the highest-scoring players that are not banned.
-- @tparam[opt=10] number limit maximum number of players to return.
--   Values that are not numbers or are below 1 fall back to 10, and
--   fractional values are rounded down, because $limit only accepts a
--   positive integer.
-- @treturn table|nil array of documents with username, score, level and
--   class, as returned by Aggregate (nil on failure).
function GetLeaderboard(limit)
    local players = db:Collection("players")

    local max_rows = tonumber(limit)
    if not max_rows or max_rows < 1 then
        max_rows = 10
    end
    max_rows = math.floor(max_rows)

    return players:Aggregate({
        -- Filter out banned players
        {
            ["$match"] = {
                banned = { ["$ne"] = true }
            }
        },
        -- Sort by score
        {
            ["$sort"] = { score = -1 }
        },
        -- Limit results
        {
            ["$limit"] = max_rows
        },
        -- Project only needed fields
        {
            ["$project"] = {
                username = 1,
                score = 1,
                level = 1,
                class = 1,
                _id = 0
            }
        }
    })
end

local top10 = GetLeaderboard(10)
print("=== Top 10 Players ===")
-- The aggregation result is nil on failure; iterate an empty list then
-- so only the header is printed.
for i, player in ipairs(top10 or {}) do
    print(string.format("%d. %s - %d points (Level %d %s)",
        i, player.username, player.score, player.level, player.class))
end

Time-Based Analytics

--- Summarize logins per day over the last `days` days.
-- Each log entry is bucketed by floor(timestamp / 86400).
-- NOTE(review): assumes timestamp is stored in Unix seconds — confirm.
-- @tparam number days how many days back to include
-- @return the Aggregate result: documents with day, logins and
--   uniqueCount, sorted by day ascending
function GetDailyActivity(days)
    local logs = db:Collection("login_logs")

    local SECONDS_PER_DAY = 86400
    local cutoff = os.time() - (days * SECONDS_PER_DAY)

    -- Only entries at or after the cutoff.
    local recent_only = {
        ["$match"] = { timestamp = { ["$gte"] = cutoff } },
    }

    -- One group per day number; collect the distinct steamids seen.
    local per_day = {
        ["$group"] = {
            _id = {
                ["$floor"] = { ["$divide"] = { "$timestamp", SECONDS_PER_DAY } },
            },
            logins = { ["$sum"] = 1 },
            uniquePlayers = { ["$addToSet"] = "$steamid" },
        },
    }

    -- Expose the day number and the size of the distinct-player set.
    local reshape = {
        ["$project"] = {
            day = "$_id",
            logins = 1,
            uniqueCount = { ["$size"] = "$uniquePlayers" },
        },
    }

    local oldest_first = { ["$sort"] = { day = 1 } }

    return logs:Aggregate({ recent_only, per_day, reshape, oldest_first })
end

Async Aggregation

-- Count documents per class through the async variant; the callback
-- receives (err, results).
local count_by_class = {
    { ["$group"] = { _id = "$class", count = { ["$sum"] = 1 } } },
}

local function on_aggregated(err, results)
    if err then
        print("Aggregation error:", err)
        return
    end

    for _, result in ipairs(results) do
        print(result._id, ":", result.count)
    end
end

collection:AggregateAsync(count_by_class, on_aggregated)

Performance Tips

  1. Use $match early: Filter documents before expensive operations
  2. Use indexes: Ensure $match and $sort use indexed fields
  3. Limit fields: Use $project to reduce data size
  4. Consider $limit: Don't process more data than needed
-- Good: Filter first, so $group only processes active documents
{
    { ["$match"] = { active = true } },
    { ["$group"] = { ... } }
}

-- Bad: Group all then filter on a field that was available up front
{
    { ["$group"] = { _id = "$active", ... } },
    { ["$match"] = { _id = true } }
}

-- Note: a $match on a computed value (for example
-- count = { ["$gt"] = 10 }) has to come after $group; that is not the
-- anti-pattern shown above.

Next Steps