Menangani CPU Intensive Task di Node.js dengan RabbitMQ

31 May 2024
·
7 min read
Menangani CPU Intensive Task di Node.js dengan RabbitMQ

Latar Belakang

Javascript secara umum bisa dibilang cukup bagus dalam menangani operasi yang bersifat I/O (interaksi ke disk/network) misalnya HTTP server, mengakses database, membaca atau menulis file.

Tetapi, Javascript relatif kurang bagus untuk menangani operasi CPU yang intensif, hal ini karena ketika kita menjalankan operasi CPU yang intensive, operasi tersebut akan membuat event loop sibuk dan tidak bisa memproses task yang lain, selain itu by default Javascript/Node.js akan menggunakan 1 core saja untuk 1 instance/process.

Sebagai contoh, saat kita menjalankan sebuah http server, request baru tidak bisa dilayani selama operasi CPU intensive tersebut berlangsung.

Perhatikan contoh kode berikut.

const express = require("express");
const app = express();
const port = 5000;
 
function generateReport() {
  const endTime = new Date().getTime() + 30 * 1000; // 30 seconds
 
  //cpu intensive task in 30 seconds
  while (new Date().getTime() < endTime) {
    console.log("processing...");
  }
}
 
app.get("/home", (req, res) => {
  res.send("Hello World");
});
 
app.get("/report", (req, res) => {
  generateReport();
  res.send("done");
});
``;
app.listen(port, () => {
  console.log(`Example app listening on port ${port}`);
});

Dari kode diatas, kita bisa melihat ada 2 api endpoint yang tersedia yaitu /home dan /report. jika kita membuat request ke /report dimana api tersebut akan menjalan proses CPU intensive selama 30 detik, hal itu akan membuat endpoint http lain semisal /home tidak bisa melayani request sementara sampai task tadi selesai.

Lalu bagaimana caranya untuk mengatasi hal tersebut ? secara umum kita mempunya 2 opsi berikut.

  1. Menggunakan worker thread, dimana Node.js akan mengeksekusi sebuah task di thread yang berbeda.
  2. Membuat service queueu/antrian, di mana suatu message atau data akan dikirim melalui message broker (misalnya RabbitMQ) yang kemudian akan diproses oleh suatu service secara bergantian sehingga tidak mengganggu service lain.

Di artikel kali ini kita akan menggunakan solusi yang kedua.

Gambaran Arsitektur

Untuk lebih jelasnya berikut gambaran arsitektur solusi yang akan kita pakai.

Worker Pool

Dari gambaran arsitektur di atas bisa dijelaskan beberapa point berikut.

  • Sebuah task akan dikirim ke sebuah worker dengan mekanisme FIFO (first in first out).
  • By default sebuah task hanya dikerjakan sekali oleh suatu consumer/worker.
  • Kita bisa mempunyai lebih dari 1 instance worker, baik diserver yang sama ataupun berbeda.
  • Kita juga bisa membuat worker dengan bahasa lain selama bisa terkoneksi dengan RabbitMQ.

Pra syarat

Sebelum melanjutkan tutorial ini, pastikan kita telah memenuhi syarat berikut

  • Pengetahuan dasar Node.js dan Express
  • Node.js dan NPM telah terinstall dikomputermu.

Instalasi RabbitMQ

Selain Node.js, kita juga memerlukan RabbitMQ disistem kita. Jika kamu mempunyai docker yang sudah terinstall, kita bisa menggunakan perintah berikut untuk menjalankan program RabbitMQ di port 5672 dan 15672 untuk web management.

docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Jika tidak menggunakan docker kamu bisa menggunakan beberapa pilihan instalasi di dokumentasi resmi RabbitMQ melalui link Berikut.

Membuat Project Node.js

Pertama buat folder baru misalnya "nodejs-rabbitmq", kemudian didalam folder tersebut jalankan perintah berikut

npm init -y

selanjutnya kita install express sebagai HTTP server dan amqplib yang berfungsi sebagai driver untuk membuat koneksi dengan RabbitMQ.

npm install express amqplib --save

setelah itu kita buat file server.js untuk membuat http server dan rabbitmq.js untuk membuat koneksi dengan RabbitMQ.

server.js

const express = require("express");
const app = express();
const port = 5000;
 
app.get("/", (req, res) => {
  res.send("Hello World");
});
 
app.listen(port, () => {
  console.log(`Example app listening on port ${port}`);
});

rabbitmq.js

const amqp = require("amqplib");
 
let connection = null;
 
async function connect() {
  if (connection === null) {
    connection = await amqp.connect({
      protocol: "amqp",
      hostname: "localhost",
      port: 5672,
      username: "guest",
      password: "guest",
    });
    console.log("connected to rabbitmq")
  }
 
  return connection;
}
 
module.exports = connect;

untuk memastikan http server berjalan baik, kita bisa coba pastikan dengan command berikut.

node server.js

Kemudian kunjungi URL berikut http://localhost:5000, jika sukses kita bisa melanjutkan ke langkah berikutnya.

Publish sebuah message ke Queue/Antrian

Untuk mempublish sebuah message ke queue, pertama kita coba tambahkan kode berikut di endpoint "/generate-report" di server.js yang berfungsi sebagai pemicu untuk mempublish message ke queue RabbitMQ.

const express = require("express");
const app = express();
const port = 5000;
//import rabbitmq
const rabbitmq = require("./rabbitmq");
 
app.get("/home", (req, res) => {
  res.send("Hello World");
});
 
app.get("/generate-report", async (req, res) => {
  try {
    // ambil koneksi RabbitMQ
    const connection = await rabbitmq();
    // Buat channel
    const channel = await connection.createChannel();
    const queueName = "generate-report";
    // Create queue dengan nama 'generate-report' dan opsi durable aktif
    await channel.assertQueue(queueName, { durable: true });
 
    // Kirim sebuah message ke queue 'generate-report' dengan opsi persisten aktif
    const message = JSON.stringify({ month: "01" });
    channel.sendToQueue(queueName, Buffer.from(message), {
      persistent: true,
    });
 
    return res.send({ message: "Success" });
  } catch (error) {
    console.error(error);
    return res.send({ message: error.message });
  }
});
 
app.listen(port, () => {
  console.log(`Example app listening on port ${port}`);
});

Untuk membuat sebuah message tidak hilang ketika RabbitMQ terhenti kita perlu menambahkan opsi durable pada queue dan opsi persistent pada method sendToQueue ketika mengirim message.

Membuat Consumer/Worker untuk mengkonsumsi sebuah messaga dari Queue

Setelah membuat kode yang berfungsi untuk mengirim message ke Queue, selanjutnya kita perlu membuat worker yang akan standby menerima message yang akan kemudian akan diproses.

Buat file bernama report-worker.js dengan kode sebagai berikut.

const rabbitmq = require("./rabbitmq");
 
(async () => {
  try {
   
    const connection = await rabbitmq();
    const channel = await connection.createChannel();
    const queueName = "generate-report";
  
    await channel.assertQueue(queueName, { durable: true });
    
    channel.consume(
      queueName,
      (message) => {
        if (message) {
          const data = JSON.parse(msg.content.toString());
          console.log("Received");
          console.log(data);
          //tandai sebuah message sudah diterima dengan baik
          channel.ack(msg)
        }
      },
      { noAck: false }
    );
 
    console.log("Waiting for messages. To exit press CTRL+C");
  } catch (err) {
    console.warn(err);
  }
})();
 

Dari kode diatas kita bisa melihat ada opsi noAck,opsi tersebut berfungsi untuk menentukan apakah message yang dikonsumsi perlu dikonfirmasi secara manual atau tidak.

  • jika noAck = false, message yang diterima dari queueu harus dikonfirmasi secara eksplisit oleh consumer menggunakan metode channel.ack(msg). Jika tidak dikonfirmasi, message tersebut akan tetap berada di queue dan akan dikirim ulang ke consumer lain jika koneksi consumer saat ini terputus atau mengalami kesalahan. Hal ini agar memastikan message tidak hilang dan bisa di proses dengan benar.

  • Jika noAck = true, message yang diterima dari queue dianggap sudah dikonfirmasi secara otomatis begitu diterima oleh consumer, consumer tidak perlu memanggil channel.ack(msg). Ini dapat meningkatkan performa dalam situasi di mana konfirmasi manual tidak diperlukan, tetapi memiliki risiko bahwa message bisa hilang jika consumer gagal memprosesnya setelah diterima.

Untuk menjalankan worker tersebut kita bisa menggunakan command berikut

node report-worker.js

Setelah command tersebut dijalankan, worker tersebut akan standby menunggu message yang masuk. Selain itu kita juga bisa menjalankan instance dari worker tersebut lebih dari sekali baik di server yang sama ataupun server yang berbeda.

Sebagai contoh, jika kita mempunyai 2 server yang masing-masing mempunya 4 core, kita bisa saja menjalankan 4 worker atau lebih di masing-masing server tersebut, Hal ini sangat berguna jika aplikasi yang kita buat berpotensi mempunyai banyak background proses.

Tidak hanya worker untuk satu fitur saja, kita juga bisa membuat worker untuk fitur lain, selama nama queue yang dibuat berbeda dengan yang sudah ada.

Kesimpulan

Pada artikel ini, kita telah membahas bagaimana menangani operasi CPU intensif pada aplikasi Node.js menggunakan worker queue yang diimplementasikan dengan RabbitMQ.

  1. Mengapa Node.js kurang cocok untuk operasi CPU intensif.
  2. Instalasi RabbitMQ.
  3. Cara mengirimkan message ke queue RabbitMQ dari endpoint HTTP.
  4. Cara membuat worker/consumer yang akan memproses message dari queue RabbitMQ.
  5. Fungsi acknowledgment (ack) di RabbitMQ.

Dengan menggunakan worker queue, kita dapat memproses tugas-tugas CPU intensif di background tanpa mengganggu kinerja HTTP server. Hal ini membuat aplikasi kita tetap responsif dan scalable.

Implementasi ini juga memungkinkan kita untuk memanfaatkan arsitektur distributed, di mana kita bisa menjalankan banyak worker di server yang terpisah dengan mudah.

Semoga bermanfaat.