본문 바로가기

spring

spring rabbitmq 무작정 따라하기

목표

  1. RabbitMQ server 구성
  2. consumer(receiver) 구현
  3. producer(sender) 구현
  4. 메시지 발행 및 수신 테스트

풀어서 설명하자면, rabbitmq 라는 메시지 브로커 서버가 존재하면, consumer는 어떤 메시지를 받을 것인지 구현하고, producer는 특정 메시지를 발행하는 것이다.

RabbitMQ server 구성

macOS 환경에서 간단하게 homebrew를 통해 설치하였다. `brew info rabbitmq` 명령어를 통해 확인하면 Dependencies > required erlang 이라고 나오는데, 윈도우에서 설치할 경우 erlang을 버전에 맞게 설치해주어야 한다. homebrew를 통해 설치하였으므로 erlang도 자동으로 설치된다.

rabbitmq 공식 docs에서 권장하는 바대로 enable_feature_flag=all 설정을 해준다. admin web은 15672 포트가 기본으로 설정되어 있는데, 해당 포트로 접속해보면 admin 사이트에 접근할 수 있다. default 생성되는 admin은 guest/guest이다. feature flag 설정을 해주지 않으면 admin 사이트 접속이 불가하다.

client - server configuration

consumer든 producer든 rabbitmq 서버와 connection을 맺어야 하므로 서버 설정을 해줘야 한다. 예제는 별도의 프로세스가 아니므로 하나의 spring boot process에서 application.yml에 rabbitmq와의 연결 설정을 넣어준다.

//application.yml
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest

 

consumer(receiver) 구현

@Component
@Getter
@Slf4j
public class Receiver {

    public void receiveMessage(String message) {
        log.info("Received <" + message + ">");
    }
}
@SpringBootApplication
public class RabbitmqApplication {

	static final String topicExchangeName = "spring-boot-exchange";
	static final String queueName = "spring-boot";

	public static void main(String[] args) {
		SpringApplication.run(RabbitmqApplication.class, args);
	}

	@Bean
	Queue queue() {
		return new Queue(queueName, false);
	}

	@Bean
	TopicExchange exchange() {
		return new TopicExchange(topicExchangeName);
	}

	@Bean
	Binding binding(Queue queue, TopicExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
	}

	@Bean
	MessageListenerAdapter listenerAdapter(Receiver receiver) {
		return new MessageListenerAdapter(receiver, "receiveMessage");
	}

	@Bean
	SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.setQueueNames(queueName);
		container.setMessageListener(listenerAdapter);
		return container;
	}

}

사실 queue, exchange, binding은 receiver 측에서 구현할 필요가 없다. 어떤 queueName을 가진 큐를 생성하고, 어떤 topicExchange를 만들고, 이를 어떤 topic pattern을 가지고 binding 할지 결정하는 것은 rabbitmq 서버가 가진 책임이기 때문이다. spring boot tutorial에선 편의상 설정도 모두 같이 넣은 것으로 보인다.

receiver 구현 시 중요한 것은 listener를 등록하는 것이다. MessageListenerAdapter로 메시지를 수신할 어댑터를 생성한다. "receiveMessage"라는 메서드를 메시지 수신 시 호출하게 된다. adapter의 의미처럼 해당 메시지 리스너는 뺐다 꼈다 할 수 있다. receiver 입장에서는 특정 topic으로 발행된 메시지를 수신할 때 callback으로 receiver 시스템에서 receiveMessage라는 메서드가 호출되게 한 것 뿐이다.

receiver는 여러 메시지에 대해서 수신할 수 있어야 한다. 예제에서는 'foo.bar'로 시작하는 모든 토픽을 수신 받는 consumer에게 모두 메시지를 전송하는데, 추후 producer가 'foo.bar.baz'라는 토픽으로 메시지를 발행할 것이므로 receiver 시스템에서 메시지를 수신받게 된다.

producer(sender) 구현

이미 설정이 모두 끝났으므로 producer의 구현은 매우 간단하다. 예제에서는 CommandLineRunner를 구현해서 처리했지만, spring web을 보통 사용하므로 restController로 구현해보았다. receiver 코드도 runner를 통해 실행되므로 프로세스가 종료되기 전에 message를 수신할 수 있도록 CountDownLatch를 추가로 구현해두었지만, restController로 구현하면 메인 프로세스는 종료되지 않으므로 countDownLatch 코드도 삭제하였다.

@RestController
@RequiredArgsConstructor
@Slf4j
public class Controller {

    private final RabbitTemplate rabbitTemplate;

    @GetMapping("/rabbit-message")
    public void sendMessage() {
        log.info("Sending message...");
        rabbitTemplate.convertAndSend(topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
    }
}

간단하다. RabbitTemplate을 이용해 특정 exchange(위에서 spring-boot-exchange로 이미 등록해두었다)에 특정 topic(foo.bar.baz)으로 메시지(Hello from RabbitMQ!)를 발행한다.

메시지 발행 및 수신 테스트

15672 port로 admin 사이트에 접속해보면 spring-boot-exchange 라는 exchange가 생성되어 있는 것이 보인다. /rabbit-message로 요청을 보내면 다음과 같이 로그가 정상적으로 출력되는 것을 확인할 수 있다.

admin 사이트에서 확인하면 메시지 발행이 정상적으로 이루어진 것을 overview에서 확인할 수 있는데, 해당 메시지가 정상적으로 잘 처리되고 있는지는 메시지 수신이 너무 빨리 끝나서 제대로 확인할 수 없다.

실제로는 jmeter를 이용해 트래픽을 주고 테스트해보고, receiver callback(receiveMessage) 메서드에서 thread sleep time을 길게 주고 부하를 줘보기도 했다. 이 때 rabbitmq 서버에서 메시지 처리가 매우 지연되는 현상을 발견했다. queue에 최대로 쌓을 수 있는 메시지 갯수에도 한계가 있었다. 특정 queue에 쌓인 메시지들을 수신하는 consumer의 성능이 안 좋을 경우 이슈가 생길 수 있을 듯하다.

(추가) RabbitMQ 메시지 처리 방식

RabbitMQ 공식사이트의 tutorials를 확인하면 producing - consuming 처리를 어떻게 하는지에 따라 나눠서 설명해두고 있는데, spring boot 측에서 예제로 넣어둔 것은 topic 별 messaging에 해당한다. 추가적인 메시징 처리 방식은 tutorials에서 확인할 수 있다.

마치며

messaging 처리에 대한 필요성에 대해서는 계속 인지하고 있었으나 공부하거나 실습해보는 노력은 하지 않아왔었다. 실무에서 사용할 일이 없기 때문이었다. 이번에 글을 쓰면서 제목을 '무작정 따라하기'로 잡은 이유도 너무 큰 hurdle을 내 앞에 만들어내어 시작조차 하지 못하는 것에 대한 반성 때문이었다. 정확히 이해하지 못하더라도 내가 일단 한 번 따라해보면서 시작하는 데 의의가 있는 상태였기 때문이다.(그런데 역시나 rabbitmq를 검색하는 과정에서 amqp, kafka 등에 대한 이론적인 배경을 계속 읽고 찾아버리고 말았다..)

나의 학습법의 문제는 실습 보다는 이론에 치우쳐져 있다는 데 있다. 실제로 코드를 짜보면서 부딪혀보면 동작 원리나 개념들은 자연스럽게 따라오는 경우가 많다. 물론 이건 학습 방법론의 차이나 개개인의 편차가 있는, 그리고 trade-off의 문제이지만 말이다.

다시 말하면 나의 학습법의 문제는 이론 학습의 hurdle 때문에 시작조차 하지 않는 데 있다. '시작해보기'에서 중요한 것은 내가 무엇을 시작하는 데 대한 '동기부여가 되어 있는 상태인가?'에 있는 듯하다. 이미 동기부여가 되어 있는 상태면 이론 학습을 먼저 시작하더라도 실습까지 무난하게 가는 듯하다. 그런데 동기부여가 되어 있지 않으면 금방 질려버리곤 한다. '무작정 따라하기'가 필요했던 순간이었다. 동기부여되어 있지 않아도 가볍고 즐겁게 할 수 있으니까.

중요한 것은 '나의 문제로 만드는 것'이다. 동기부여되지 않은 것은 위에서 말한 것처럼 실무에서 사용할 일이 없기 때문이다. 필요성은 계속 인지하고 있고 개발 공부 같이 하는 동료들이 메시징에 대한 이야기를 계속 해도, 그건 마음 속 깊은 곳에서 정의한 '남의 문제'였던 것이다. 이번에 rabbitmq를 간단히 구현해보면서 스스로 아쉬웠던 점은 언제쯤 이 기술이 나의 문제가 될 것인가?였다.