File Uploads, Search, Socket.io Redis Adapter, and Final Production Integration

File uploads, search, and the final integration bring together the most complex multi-layer interactions in the capstone. File uploads involve three layers simultaneously: the Angular drag-and-drop component with progress tracking, the Multer middleware on Express with cloud storage, and the MongoDB task document that stores attachment metadata. Full-text search uses Atlas Search for fuzzy matching and relevance ranking. The final integration wires all the pieces together — authentication, real-time updates, caching, rate limiting, and the Angular application — into a deployable, production-ready whole.

Integration Points Summary

| Feature | Frontend | Backend | Infrastructure |
|---|---|---|---|
| File uploads | FileUploadComponent (drag-drop, progress) | Multer + Cloudinary | Cloudinary CDN |
| Search | SearchBarComponent (debounced, live results) | Atlas Search $search aggregation | MongoDB Atlas Search index |
| Notifications | NotificationBellComponent (unread count badge) | Bull email queue + Socket.io emit | Redis queue + SMTP |
| Real-time updates | TaskStore socket subscriptions | Change Stream → Socket.io broadcast | MongoDB replica set |
| Caching | X-Cache response header display | Redis getOrSet + tag invalidation | Redis |
| Rate limiting | Retry-After toast on 429 | Sliding window Lua in Redis | Redis |
Note: The Change Stream → Socket.io pipeline is the backbone of real-time updates. MongoDB Change Streams fire whenever a task document changes (from any source — API, background job, admin script). The change stream handler broadcasts the change to the relevant workspace’s Socket.io room. Angular clients subscribed to the workspace room receive the update and the TaskStore applies it to the signal. This decoupled architecture means real-time updates work regardless of which server instance handled the write — the change stream runs on one instance, but the Socket.io server uses Redis adapter for cross-instance broadcasting.
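As a sketch of this decoupling, the mapping from a change stream event to a room broadcast can be kept as a small pure function, which also makes it easy to unit test. The room naming (workspace:&lt;id&gt;) and event names here are illustrative assumptions, not the capstone's actual conventions:

```javascript
// Illustrative sketch (not the capstone's actual handler): map a MongoDB
// change stream event to a Socket.io room broadcast target.
// Room and event names are assumed conventions.
function toSocketEvent(change) {
    const doc = change.fullDocument;
    // Deletes carry no fullDocument; recovering the workspace for a delete
    // requires pre-images (changeStreamPreAndPostImages) or a lookup cache.
    if (!doc) return null;

    const room = `workspace:${doc.workspace}`;
    switch (change.operationType) {
        case 'insert':  return { room, event: 'task:created', payload: doc };
        case 'update':
        case 'replace': return { room, event: 'task:updated', payload: doc };
        default:        return null; // ignore drop/rename/invalidate, etc.
    }
}

module.exports = { toSocketEvent };
```

The change stream handler then reduces to calling io.to(evt.room).emit(evt.event, evt.payload) for each non-null mapping; because io uses the Redis adapter, that emit reaches clients on every instance.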
Tip: Use the Socket.io Redis adapter (@socket.io/redis-adapter) when running multiple Express instances (e.g. with PM2 cluster or Docker Swarm replicas). Without it, Socket.io rooms are per-instance — if a task is created by a request handled by Instance A, only Instance A’s Socket.io broadcasts the event, missing all clients connected to Instance B. The Redis adapter publishes events to a Redis channel that all instances subscribe to, ensuring cross-instance broadcasting.
Warning: Never deploy the capstone with default or weak secrets. Before the first production deployment, generate all secrets with openssl rand -hex 32: JWT_SECRET, REFRESH_SECRET, and any webhook signing secrets. The .env.example file contains placeholder values that are publicly visible in the repository — any deployment that keeps those placeholders is trivially compromised. Use a secrets manager (AWS Secrets Manager, HashiCorp Vault, or the hosting platform’s environment variable management) for production credentials.
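For example, the secrets can be generated straight from the shell (the variable names match the .env keys mentioned above):

```shell
# Generate 256-bit secrets for the .env file (requires openssl).
JWT_SECRET=$(openssl rand -hex 32)
REFRESH_SECRET=$(openssl rand -hex 32)

# 32 random bytes encode to 64 hex characters each.
echo "JWT_SECRET=${JWT_SECRET}"
echo "REFRESH_SECRET=${REFRESH_SECRET}"
```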

File Upload and Search Integration

// ── apps/api/src/modules/tasks/task-attachment.controller.js ─────────────
const asyncHandler = require('express-async-handler');
const multer       = require('multer');
const cloudinary   = require('cloudinary').v2;
const crypto       = require('crypto');
const Task         = require('./task.model');
const { NotFoundError } = require('../../errors/app-errors');

cloudinary.config({
    cloud_name: process.env.CLOUDINARY_CLOUD_NAME,
    api_key:    process.env.CLOUDINARY_API_KEY,
    api_secret: process.env.CLOUDINARY_API_SECRET,
});

const ALLOWED_TYPES = [
    'image/jpeg','image/png','image/webp','image/gif',
    'application/pdf','text/plain','text/csv',
];

const upload = multer({
    storage:    multer.memoryStorage(),
    limits:     { fileSize: 10 * 1024 * 1024 },   // 10 MB cap per file
    // cb(null, false) silently drops disallowed files; the controller then
    // sees req.file as undefined and responds 400.
    fileFilter: (req, file, cb) =>
        cb(null, ALLOWED_TYPES.includes(file.mimetype)),
});

exports.uploadMiddleware = upload.single('file');

exports.upload = asyncHandler(async (req, res) => {
    if (!req.file) return res.status(400).json({ message: 'No valid file provided' });

    const task = await Task.findOne({
        _id:       req.params.id,
        workspace: req.workspace._id,
        deletedAt: { $exists: false },
    });
    if (!task) throw new NotFoundError('Task', req.params.id);
    if (task.attachments.length >= 10) {
        return res.status(400).json({ message: 'Maximum 10 attachments per task' });
    }

    // Upload to Cloudinary
    const uploadResult = await new Promise((resolve, reject) => {
        const stream = cloudinary.uploader.upload_stream(
            {
                folder:       `taskmanager/${req.workspace._id}/${req.params.id}`,
                public_id:    crypto.randomUUID(),
                resource_type:'auto',
            },
            (err, result) => err ? reject(err) : resolve(result)
        );
        stream.end(req.file.buffer);
    });

    const attachment = {
        filename:   req.file.originalname,
        url:        uploadResult.secure_url,
        size:       req.file.size,
        mimeType:   req.file.mimetype,
        uploadedBy: req.user.sub,
    };

    task.attachments.push(attachment);
    await task.save();

    res.status(201).json({ success: true, data: attachment });
});

// ── apps/api/src/modules/search/search.service.js ────────────────────────
const mongoose = require('mongoose');
const Task     = require('../tasks/task.model');

exports.searchTasks = async (workspaceId, query, { page = 1, limit = 20 } = {}) => {
    if (!query?.trim()) return { results: [], total: 0 };

    const workspaceObjectId = new mongoose.Types.ObjectId(workspaceId);

    // Try Atlas Search first; fall back to $text if not available
    try {
        const [result] = await Task.aggregate([
            {
                $search: {
                    index: 'task_search',
                    compound: {
                        must:   [{ text: { query, path: ['title','description','tags'],
                                           fuzzy: { maxEdits: 1, prefixLength: 3 } } }],
                        filter: [{ equals: { path: 'workspace', value: workspaceObjectId } }],
                    },
                },
            },
            { $match: { deletedAt: { $exists: false } } },
            { $facet: {
                results: [
                    { $addFields: { _score: { $meta: 'searchScore' } } },
                    { $project: { title: 1, status: 1, priority: 1, _score: 1, tags: 1 } },
                    { $skip:  (page - 1) * limit },
                    { $limit: limit },
                ],
                total: [{ $count: 'count' }],
            }},
            { $project: {
                results: 1,
                total:   { $ifNull: [{ $arrayElemAt: ['$total.count', 0] }, 0] },
            }},
        ]);
        return { results: result.results, total: result.total };

    } catch (err) {
        // Atlas Search unavailable (e.g. the task_search index is not
        // configured) — fall back to native $text. Logging keeps genuine
        // query errors from being swallowed silently.
        console.warn('Atlas Search unavailable, using $text fallback:', err.message);
        const filter = {
            workspace: workspaceObjectId,
            deletedAt: { $exists: false },
            $text:     { $search: query },
        };
        const [results, total] = await Promise.all([
            Task.find(filter, { score: { $meta: 'textScore' } })
                .sort({ score: { $meta: 'textScore' } })
                .skip((page - 1) * limit).limit(limit).lean(),
            Task.countDocuments(filter),
        ]);
        return { results, total };
    }
};
// ── Socket.io Redis adapter setup ─────────────────────────────────────────
// apps/api/src/config/socket.js
const { createClient }      = require('redis');
const { createAdapter }     = require('@socket.io/redis-adapter');
const { Server }            = require('socket.io');

async function createSocketServer(httpServer) {
    const io = new Server(httpServer, {
        cors: { origin: process.env.CORS_ORIGINS?.split(','), credentials: true },
    });

    // Redis adapter for multi-instance support
    const pubClient = createClient({ url: process.env.REDIS_URL });
    const subClient = pubClient.duplicate();
    await Promise.all([pubClient.connect(), subClient.connect()]);
    io.adapter(createAdapter(pubClient, subClient));

    return io;
}

// ── Final production deployment checklist ────────────────────────────────
// .github/workflows/deploy.yml trigger on push to main:
//
// 1. CI: lint + typecheck + unit tests + integration tests
// 2. Docker: build API image (tag with git SHA) + build Angular nginx image
// 3. Push to GHCR
// 4. Deploy to staging: docker compose up with new images + smoke test
// 5. Manual approval gate (GitHub Environment protection rule)
// 6. Deploy to production: docker compose up + health check

// ── Angular environment configuration ────────────────────────────────────
// apps/client/src/environments/environment.prod.ts
export const environment = {
    production: true,
    apiUrl:     'https://api.taskmanager.io/api/v1',
    wsUrl:      'wss://api.taskmanager.io',
};

// apps/client/src/environments/environment.ts
export const environment = {
    production: false,
    apiUrl:     'http://localhost:3000/api/v1',
    wsUrl:      'ws://localhost:3000',
};

How It Works

Step 1 — Cloudinary Stream Upload Avoids Double Buffering

Multer stores the file in memory (memoryStorage()). Rather than writing to disk then uploading, cloudinary.uploader.upload_stream() creates a writable stream that pipes directly to the Cloudinary API. The file buffer from Multer is pushed to this stream with stream.end(req.file.buffer). The upload resolves with the Cloudinary response including the CDN URL, which is stored in the task’s attachments array. No temporary files are created.

Step 2 — Atlas Search with $text Fallback Handles Both Configurations

The search service tries Atlas Search first (the preferred production path, with fuzzy matching and relevance scoring) and falls back to MongoDB’s native $text search if the Atlas Search index is not configured. This makes local development easier — Atlas Search is only available on MongoDB Atlas (or a local Atlas deployment) with the task_search index created — while production gets the better search experience. The Atlas attempt is wrapped in a try/catch, so a missing or misconfigured index falls through to the $text query instead of failing the request. Note that $text has its own prerequisite: a text index on the collection.
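The $text fallback depends on that text index existing. A schema-level declaration could look like the fragment below (the field weights and index name are illustrative; MongoDB allows only one text index per collection):

```javascript
// apps/api/src/modules/tasks/task.model.js — text index for the $text fallback.
// Weights and name are illustrative choices, not from the capstone code.
taskSchema.index(
    { title: 'text', description: 'text', tags: 'text' },
    { weights: { title: 10, tags: 5, description: 1 }, name: 'task_text_fallback' }
);
```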

Step 3 — Socket.io Redis Adapter Enables Multi-Instance Broadcasting

When multiple server instances are running (PM2 cluster, Docker replicas), a Socket.io room is local to one instance. Without the Redis adapter, Instance A’s change stream broadcasts to Instance A’s connected clients only — clients on Instance B miss the update. The Redis adapter uses Redis pub/sub: when Instance A broadcasts to a room, it publishes to Redis; all instances subscribe and broadcast to their local connections in that room. Every client receives the update regardless of which instance they are connected to.

Step 4 — Environment Files Separate Dev and Production Configuration

Angular’s build system replaces environment.ts with environment.prod.ts when building with --configuration production. The same codebase imports environment.apiUrl everywhere — in development it resolves to http://localhost:3000/api/v1, in production to https://api.taskmanager.io/api/v1. No conditional logic, no if (isDev) branches, no hardcoded URLs in components; the Angular CLI handles the substitution at build time.
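The substitution is driven by the fileReplacements option in angular.json. A fragment of the build target's configuration could look like this (paths assume the apps/client layout; exact nesting varies by Angular CLI version):

```json
{
  "configurations": {
    "production": {
      "fileReplacements": [
        {
          "replace": "src/environments/environment.ts",
          "with": "src/environments/environment.prod.ts"
        }
      ]
    }
  }
}
```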

Step 5 — The Complete Stack Is Production-Ready

Every layer of the Task Manager applies the patterns covered across the course: bcrypt + JWT auth (Ch17), Mongoose schemas with indexes (Ch5–9), Express middleware stack (Ch3–7), Angular reactive forms and signals (Ch10–15), Socket.io real-time (Ch18), Redis caching and rate limiting (Ch5), MongoDB aggregation for dashboards (Ch13), Atlas Search for search (Ch13), file uploads to Cloudinary (Ch18), Bull email queue (Ch5), Docker containerisation (Ch20), GitHub Actions CI/CD (Ch21), and Prometheus monitoring (Ch22). The capstone is not a new application — it is the sum of everything in the course.

Quick Reference

| Task | Code / Config |
|---|---|
| Cloudinary stream upload | cloudinary.uploader.upload_stream(opts, cb).end(buffer) |
| Atlas Search with fallback | try $search aggregation, catch → $text query |
| Socket.io Redis adapter | io.adapter(createAdapter(pubClient, subClient)) |
| Angular environment | environment.apiUrl → replaced at build time per config |
| Production build | ng build --configuration production |
| Generate secrets | openssl rand -hex 32 |
| Multi-instance WebSocket | @socket.io/redis-adapter with Redis pub/sub |
| Attachment limit | Check task.attachments.length >= 10 before upload |

🧠 Test Yourself

The Task Manager runs on two server instances. A task is created via Instance A. A client connected to Instance B does not receive the real-time update. What is missing and what does it fix?