ETC

[RabbitMQ] Exchange Type

dev_minpark 2024. 8. 1. 12:06

RabbitMQ 에서 Exchange 는 메시지를 큐로 라우팅하는 핵심 컴포넌트

메시지가 Exchange 로 보내지면 미리 정의된 규칙에 따라 메시지를 적절한 큐에 전달한다.

  • Direct
    • 메시지를 라우팅 키에 따라 정확하게 일치하는 큐로 전달
    • 특정한 수신자가 있는 메시지 전달에 유용
    • ex : 로그 시스템에서 error 레벨의 로그 메시지는 error.queue 에, info 레벨의 로그 메시지는 info.queue에 전달
# Binding example
- Queue: error.queue, Binding Key: error
- Queue: info.queue, Binding Key: info
  • Topic
    • 메시지를 라우팅 키에 기반한 패턴 매칭을 통해 큐로 전달
    • '*' 단일 단어, '#' 0개 이상의 단어 패턴
    • 다양한 조건에 따라 메시지를 구분하고 전달해야 할 때 유용
    • ex : 뉴스 피드나 복잡한 라우팅 요구사항이 있는 경우 사용
# Binding example
- Queue: usa.news.queue, Binding Key: usa.news.*
- Queue: europe.sports.queue, Binding Key: europe.sports.*
  • Fanout
    • 라우팅 키를 무시하고, 바인딩된 모든 큐에 메시지를 브로드캐스트
    • 모든 수신자가 메시지를 받아야하는 경우에 유용
    • ex : 시스템 상태 변경 이벤트, 관련 서비스가 구독할 때
# Binding example
- Queue: service1.queue
- Queue: service2.queue
- Queue: service3.queue
  • Headers
    • 라우팅 키 대신 메시지 헤더의 속성에 기반하여 큐로 메시지를 전달
    • 복잡한 라우팅 로직이 필요할 경우 유용, 메시지 속성에 따라 더욱 세밀하게 제어가 필요할 때 주로 사용
    • ex : 이메일 시스템에서 중요도나 형식에따라 이메일을 다른 큐로 전달할 때 사용
# Binding example
- Queue: important.queue, Headers: { "importance": "high" }
- Queue: text.queue, Headers: { "format": "text" }
- Queue: html.queue, Headers: { "format": "html" }

 

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// RabbitMQ Config (Queue, Exchange, Binding Bean 생성)
@Configuration
public class RabbitMQConfig {

    // Direct Exchange example
    @Bean
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("directExchange").durable(true).build();
    }

    @Bean
    public Queue directQueue() {
        return new Queue("direct.queue", true);
    }

    @Bean
    public Binding directBinding(Queue directQueue, Exchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("direct.key").noargs();
    }

    // Topic Exchange example
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange("topicExchange").durable(true).build();
    }

    @Bean
    public Queue topicQueue() {
        return new Queue("topic.queue", true);
    }

    @Bean
    public Binding topicBinding(Queue topicQueue, Exchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.*").noargs();
    }

    // Fanout Exchange example
    @Bean
    public Exchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanoutExchange").durable(true).build();
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1", true);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2", true);
    }

    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, Exchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange).with("").noargs();
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, Exchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange).with("").noargs();
    }

    // Headers Exchange example (optional)
    @Bean
    public Exchange headersExchange() {
        return ExchangeBuilder.headersExchange("headersExchange").durable(true).build();
    }

    @Bean
    public Queue headersQueue() {
        return new Queue("headers.queue", true);
    }

    @Bean
    public Binding headersBinding(Queue headersQueue, Exchange headersExchange) {
        return BindingBuilder.bind(headersQueue).to(headersExchange).where("format").matches("json").and("importance").matches("high").noargs();
    }
}

 

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// 메시지 발행 서비스
@Service
public class MessagePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Direct Exchange message publish
    public void sendToDirectExchange(String message) {
        rabbitTemplate.convertAndSend("directExchange", "direct.key", message);
    }

    // Topic Exchange message publish
    public void sendToTopicExchange(String message, String routingKey) {
        rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
    }

    // Fanout Exchange message publish
    public void sendToFanoutExchange(String message) {
        rabbitTemplate.convertAndSend("fanoutExchange", "", message);
    }

    // Headers Exchange message publish
    public void sendToHeadersExchange(String message) {
        rabbitTemplate.convertAndSend("headersExchange", "", message, msg -> {
            msg.getMessageProperties().setHeader("format", "json");
            msg.getMessageProperties().setHeader("importance", "high");
            return msg;
        });
    }
}

 

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

// 메시지 수신 서비스
@Service
public class MessageListener {

    @RabbitListener(queues = "direct.queue")
    public void handleDirectMessage(String message) {
        System.out.println("Received message from direct.queue: " + message);
    }

    @RabbitListener(queues = "topic.queue")
    public void handleTopicMessage(String message) {
        System.out.println("Received message from topic.queue: " + message);
    }

    @RabbitListener(queues = {"fanout.queue1", "fanout.queue2"})
    public void handleFanoutMessage(String message) {
        System.out.println("Received message from fanout queue: " + message);
    }

    @RabbitListener(queues = "headers.queue")
    public void handleHeadersMessage(String message) {
        System.out.println("Received message from headers.queue: " + message);
    }
}

'ETC' 카테고리의 다른 글

[Helm] Chart 기본 명령어  (0) 2024.12.31
[Hey] HTTP 로드 테스트 도구  (0) 2024.12.13
VertexAI API 사용하기 위해 GoogleCloud 설정  (1) 2024.10.11
[Intellij] 환경 변수 로컬 세팅  (0) 2024.07.17