BullMQ
BullMQ는 Redis를 기반으로 하고, kafka나 RabbitMQ보다 훨씬 가볍고 편리한 Message queue입니다. 현재 기준 Github Star 11.8K 정도 되고, 거의 매일같이 커밋이 올라오는 뜨거운 라이브러리입니다.
제가 BullMQ를 도입하게 된 이유는 잔량을 파악해서 순차적으로 작업을 실행해야 하는 일이 있었는데, 테이블 구조가 DB 락(Select... for update)을 걸 수가 없는 상황이었습니다. 그래서, 대기열을 따라서 순차적인 처리를 위해 도입했습니다.
BullMQ를 도입하기 전에 kafka 도입을 검토했었는데, 앱 서버만해도 2개가 추가되고 (Producer, Consumer)랑 또 추가로 Zookeeper, kafka 등 띄울게 너무 많고 무거웠어요. 그래서, 한 가지 작업만을 수행하기 위해 Kafka를 도입하기에는 비적합하다고 생각해서 BullMQ를 도입했습니다 !
github: https://github.com/OptimalBits/bull
npm: https://www.npmjs.com/package/bull
설치
npm install bull --save
bull 라이브러리를 설치합니다.
npm install @types/bull --save-dev
Typescript를 사용하고 있따면, 타입스크립트용 라이브러리도 설치합니다 !
구현
1. Producer
import Bull from "bull";
const createBookingQueue = new Bull("booking");
먼저 Bull 라이브러리를 import합니다..! 그리고, Bull 객체를 생성하는데, 이 때 이름은 각 작업마다 다르게 적어주면 됩니다. 그러면, Redis에 넣거나 꺼낼 떄 key값으로 bull:booking:id 이런 식으로 저장되기 때문에, 여러 개의 작업을 분리할 수 있습니다..!
create = async (createBookingDto: CreateBookingDto): Promise<void> => {
try {
await createBookingQueue.add(createBookingDto);
} catch(error) {}
};
그리고, 작업마다 하나씩 Bull MQ에 메시지로 전달해줍니다 !
보통 MQ를 사용하면 요청은 성공했지만, 대기열이 진행되어야 하기 때문에 바로 응답을 줄 수 없다는 뜻에서 202 응답을 보냅니다.
만약 그게 아니라, 대기열을 기다렸다가 어떻게든 응답을 보내고 싶다고 한다면, 아래와 같이 Consumer가 결과를 던져주는 값을 받아서 response 할 수 있습니다.
const job = await createBookingQueue.add(createBookingDto);
const result = job.finished();
2. Consumer
import Bull from "bull";
const createBookingQueue = new Bull("booking");
// new Bull("booking", {defaultJobOptions: {removeOnComplete: true, removeOnFail: true} });
Producer와 동일한 이름의 Bull queue를 생성합니다. 주석 부분을 보시면, defaultJobOptions에 removeOnComplete와 removeOnFail이 있는데 각각 성공된 Job(작업)과 실패된 Job(작업)을 지울 지 여부를 지정할 수 있습니다.
createBookingQueue.process(async function (job, done) {
const booking: Booking = await Booking.create(job.data);
done();
});
큐객체.process를 사용하면, redis에 bull:booking:id이 쌓일 때마다 꺼내서 지정한 동작을 수행할 수 있습니다.
done(new Error('not found')); 같이 done에 파라미터를 하나 넘기면 Producer에게 에러를 알려줄 수도 있습니다.
done(null, booking); 파라미터를 두개 넘기면 Producer에게 결과를 전송해줄 수 있습니다. 만약 producer가 앞서 말씀드렸던 job을 구해와서 finished() 메서드를 통해 결과를 받을 수 있습니다.
createBookingQueue.on('completed', function (job, result) {
// Job이 성공했을 때의 처리 로직
})
bull.on('completed', .... ) {} 을 사용하면 Job이 성공했을 때마다 특정한 알고리즘을 실행시킬 수도 있습니다 !
3. redis 설정
만약 redis에 비밀번호가 걸려있거나, 포트가 기본 포트와 다르다면 Bull에게 알려줘야합니다. 그럴 땐 Bull을 생성할 떄 포트와 ip, password를 지정해야 합니다.
import Bull from "bull";
// const createBookingQueue = new Bull("booking");
const createBookingQueue = new Bull("booking", {
redis: {
port: 6379,
host: '127.0.0.1',
password: 'password'
}
});
물론, dotenv를 사용하고 있다면, port: process.env.REDISPORT 처럼 지정하시면 됩니다.
참고
Queue가 하나밖에 안돌아가면 대기열에 병목현상이 일어나지 않나요?
네. 큐를 여러개 만드시면 됩니다. 서비스별로 큐를 분리하거나, 해시를 해서 3개의 큐, 5개의 큐 등으로 파티셔닝할 수도 있습니다. Bull Queue는 가볍고 자유롭기 떄문에 원하는 대로 가공해서 쓰실 수 있습니다 !
Consumer 서버를 따로 띄워야 하나요?
서버가 1대라면, Producer와 Consumer를 따로 띄우지 않아도 전혀 문제 되지 않습니다. 하지만 만약 다수의 앱(Node) 서버를 사용하고 있고, 이를 로드 밸런싱 해서 사용하고 있다면 Consumer는 따로 띄워야 합니다. 동시성 제어를 위해서 Queue를 도입했는데, Queue 프로세스가 각각 다른 앱서버에서 각각 돌아간다면 순서가 엉켜버릴 수 있기 때문입니다.
감사합니다.
'Server > Node.js' 카테고리의 다른 글
NodeJS - 전역 예외 핸들러 구현 (Custom global exception handler) (0) | 2021.12.29 |
---|---|
NodeJS - API Response를 snake case로 변환하기(DTO, Entity 필드는 camel case) (0) | 2021.12.27 |
NodeJS - 모델매퍼(ModelMapper) 사용 [Class transformer] (0) | 2021.12.24 |
NodeJS - JSON null필드를 Response에서 생략하는 방법! (0) | 2021.12.21 |
Sequelize - 자식 테이블까지 한 번에 Insert 치는 방법 (Join Insert) + 상황에 따라 다른 쿼리를 날리는 방법 (0) | 2021.12.19 |