Optimizing Report Generation with Redis Task Queues and Caching
In our previous post, we explored the fundamentals of building a basic Redis message queue and running a worker to handle tasks. Today, we will apply that design to a real-world application.
Consider a real-time reporting system: every time a user requests a report, they face a long wait while extensive data processing and calculations run, stuck until the report is ready. By integrating the Redis message queue, we can transition from a synchronous process to an asynchronous one. Users no longer need to remain idle after initiating a report request; they can fetch the result later, and repeat requests return almost instantly because generated reports are cached.
Key Concepts
- Redis Task Queue: Utilizes Redis to manage a queue of tasks, where each task corresponds to a report generation request.
- Caching Middleware: This middleware checks for cached results before processing new tasks. If a result exists in the cache, it is returned immediately.
- Proxy Benefit: Workers can execute outbound requests from a designated host or IP address, providing flexibility in where task traffic originates.
- Asynchronous Processing: Tasks are managed in the background, allowing users to engage in other activities while waiting for their reports.
- Seamless Integration: The middleware efficiently adds tasks to the queue and retrieves results from the cache, making the shift from synchronous to asynchronous operations invisible to users.
Implementation Overview
- API Request Handling: Incoming API requests are captured as an endpoint plus payload and stored in the Redis queue.
- Task Processing: Workers pull tasks from the queue, process them, and store the results as the job's return value and in the cache.
- Middleware: A dedicated middleware handles both caching and task queue management.
Step-by-Step Implementation
1. Install Redis and BullMQ
Begin by ensuring Redis is installed and properly configured. Next, install BullMQ, a Node.js library for queue management.
npm install bullmq ioredis express axios
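If you don't already have a Redis server available, one quick way to start one locally (assuming Docker is installed) is:

docker run -d -p 6379:6379 redis

The examples below assume Redis is reachable on its default port, 6379.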
2. Set Up the Middleware
Create middleware to manage caching and queuing operations.
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');
const express = require('express');
const axios = require('axios');

// BullMQ requires ioredis connections to disable request retries
const connection = new Redis({ maxRetriesPerRequest: null });
const taskQueue = new Queue('tasks', { connection });
// Note: BullMQ versions before 2.0 also needed a QueueScheduler here; current versions do not.

const app = express();
app.use(express.urlencoded({ extended: false }));
app.use(express.json());

// Results are stored in memory for this demo; a Redis-backed variant is sketched later in the post.
const cache = {};
const cacheJobInQueue = {};
const asyncJobList = ['/api/async-print', 'other async jobs ...'];
app.use(async (req, res, next) => {
  const endpoint = req.originalUrl;
  const { isRun, ...payload } = req.body;
  // Skip endpoints that are not registered as async jobs
  if (!asyncJobList.includes(endpoint)) return next();
  const cacheKey = `${endpoint}_${JSON.stringify(payload)}`;
  if (cache[cacheKey]) {
    // Result is ready
    return res.json({ result: cache[cacheKey], source: 'cache' });
  }
  if (cacheJobInQueue[cacheKey]) {
    // Worker re-invocations (isRun: true) pass through to the real handler;
    // duplicate user requests get the already-queued job's info instead.
    if (isRun) return next();
    return res.json(cacheJobInQueue[cacheKey]);
  }
  const job = await taskQueue.add('task', { endpoint, payload });
  req.jobId = job.id;
  cacheJobInQueue[cacheKey] = { jobId: job.id, status: 'queued' };
  next();
});
new Worker('tasks', async job => {
  const { endpoint, payload } = job.data;
  // Re-invoke the original endpoint with isRun: true so the real (synchronous) logic runs
  const result = await syncProcessApiCall(endpoint, payload);
  cache[`${endpoint}_${JSON.stringify(payload)}`] = result;
  return result;
}, { connection });

async function syncProcessApiCall(endpoint, payload) {
  const response = await axios({
    method: 'post',
    url: `http://localhost:${port}${endpoint}`, // endpoint is a relative path
    data: { ...payload, isRun: true },
  });
  return response.data;
}
app.post('/api/async-print', async (req, res) => {
  const { isRun, ...payload } = req.body;
  // Without isRun, the middleware has already queued the job; just acknowledge it
  if (!isRun) return res.json({ jobId: req.jobId, status: 'queued' });
  // ...... original sync process ....
  return res.json({ results: 'reports contents....' });
});
app.get('/api/result/:jobId', async (req, res) => {
  const job = await taskQueue.getJob(req.params.jobId);
  if (job && job.returnvalue) {
    return res.json({ result: job.returnvalue, source: 'worker' });
  }
  res.status(404).json({ error: 'Result not available yet' });
});

const port = 3000;
app.listen(port, () => console.log(`Server running on port ${port}`));
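With the server running, the end-to-end flow can be exercised from the command line. A minimal session might look like this (the payload fields and the jobId value are illustrative):

# 1. Submit a report request; the middleware queues it and responds immediately
curl -X POST http://localhost:3000/api/async-print \
  -H "Content-Type: application/json" \
  -d '{"reportType": "sales", "month": "2024-01"}'
# => {"jobId":"1","status":"queued"}

# 2. Poll for the result; once the worker has finished, the report is returned
curl http://localhost:3000/api/result/1
# => {"result":{"results":"reports contents...."},"source":"worker"}

# 3. Repeating the POST from step 1 is now served straight from the cache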
Benefits of This Approach
- Asynchronous Processing: Users are not hindered while their requests are processed, enhancing user experience.
- Efficient Resource Utilization: Repeated requests for the same information are served from the cache, minimizing server load (a Redis-backed cache is sketched after this list).
- Scalability: More workers can be added to accommodate increased demand without disrupting user experience.
- Proxy Capability: Workers can execute requests from designated hosts or IPs, allowing better control over network traffic.
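The demo keeps results in a plain in-memory object, which disappears on restart and is not shared between API instances. Here is a minimal sketch of a Redis-backed replacement using a separate ioredis client; the cacheResult/getCachedResult helpers and the one-hour TTL are illustrative choices, not part of the original code:

const cacheClient = new Redis(); // separate connection for plain GET/SET traffic

// Replace `cache[cacheKey] = result` with:
async function cacheResult(cacheKey, result) {
  // 'EX', 3600: expire after one hour, an arbitrary choice for this sketch
  await cacheClient.set(`report:${cacheKey}`, JSON.stringify(result), 'EX', 3600);
}

// Replace `cache[cacheKey]` lookups with:
async function getCachedResult(cacheKey) {
  const raw = await cacheClient.get(`report:${cacheKey}`);
  return raw ? JSON.parse(raw) : null;
}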
This example illustrates a fundamental microservice pattern using BullMQ for queuing, Redis for caching, and Express for a straightforward API to submit jobs and retrieve results. The producer (backend instance) sends tasks to the queue, while the consumer (worker) processes these tasks, caching results in Redis for rapid access. As Redis acts as the central communication hub, both the producer and consumer can be scaled as necessary to manage growing workloads.
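To scale the consumer side, the Worker does not need to live in the same process as the Express app. A sketch of a standalone worker follows (worker.js is a hypothetical filename; start several copies to add capacity):

// worker.js - run with `node worker.js`
const { Worker } = require('bullmq');
const Redis = require('ioredis');

const connection = new Redis({ maxRetriesPerRequest: null });

new Worker('tasks', async job => {
  const { endpoint, payload } = job.data;
  // ... generate the report for this endpoint/payload ...
  return { endpoint, payload, generatedAt: new Date().toISOString() };
}, { connection, concurrency: 5 }); // concurrency is per process; 5 is an illustrative value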