KKanging

Spring Data Redis에서 Redis Pub/Sub 사용하기 본문

백엔드/DB

Spring Data Redis에서 Redis Pub/Sub 사용하기

천방지축 개발자 2025. 6. 18. 23:02

Redis Pub/Sub 개요

Redis Pub/Sub 이란

Redis 는 싱글 쓰레드기반 In-memory DB이다.

 

Redis를 많이 사용하는 이유는 다양한 자료구조와 고가용성, 고성능을 짚곤 한다.

 

Redis의 자료구조인 Redis Pub/Sub 에 대해 알아보려고 한다.

Message Queue ?

Redis Pub/Sub 은 Message Queue의 한 종류라고 볼 수 있다.

 

MQ 플랫폼에 항상 등장하는 개념이 있는데 Publisher 와 Subscriber 이다.

 

Publisher 는 MQ에게 data를 보내는 역할을 하며 Subscriber는 MQ로 부터 자기 관심사에 해당하는 데이터를 수신 받게 된다.

Kafka 같은 다른 MQ 플랫폼과의 차별점

사실 Kafka 같은 MQ 플랫폼은 다른 플랫폼과 성격이 달라서 비교하기 애매하지만 글쓴이가 Kafka에 익숙하므로 Kafka와 Redis Pub/Sub 과의 차별점을 설명하겠다.

 

Kafka는 Producer/Consumer 구조로 되어 있으며 Producer 가 보내는 로그(메시지와 같은 개념이라고 생각)를 영속화한다(retention 기간동안)

반면 Redis는 In-memory 기반이라 휘발성이며 한번 Subscribers에게 보낸 데이터는 삭제된다.

 

Kafka는 높은 처리량을 자랑한다 Kafka의 파티션이라는 개념은 컨슈머들의 집합인 컨슈머 그룹 단위로 높은 병렬성을 제공할 수 있기 때문에 유연한 확장과 높은 병렬 처리로 고성능 처리량이 강점이다.

하지만 Redis는 그런 병렬처리를 위해서는 클러스터를 구축해서 연결 단위 노드를 분산해야만 한다.

 

Kafka는 통신간 신뢰성을 보장할 수 있다. At-Least Once, At-Most Once, Exactly Once 수준의 신뢰성을 보장할 수 있다는게 큰 특징이다.

하지만 Redis Pub/Sub은 At-Most Once (신뢰성을 보장하지 않음)만 가능하다는 점이다.

그럼 Redis Pub/Sub의 강점은 없나?

Redis Pub/Sub을 왜쓰나 싶을 수 있다.

 

Redis Pub/Sub은 Kafka 보다 가볍고 In-memory 기반이기 때문에 Kafka 가 아무리 자체적인 페이징 기법을 써서 성능을 향상한다고 해도 Redis가 더 낮은 Latency를 보여준다. (물론 대규모 시스템 혹은 어떤 환경이냐 에 따라 Redis 의 성능은 낮아질 수 있다)

 

또한 Redis Pub/Sub은 Unicasting과 동적 Subscribe 그리고 패턴 매칭에 강점이 있다고 생각한다.

Kafka의 토픽은 보통 브로커를 설정할 때 미리 설계하여 만들어 놓곤 한다.

하지만 Redis Pub/Sub의 Subscribe 명령은 Kafka에 비해 훨씬 동적이다. 다른 Redis 명령이 그렇듯 subscribe로 새로운 Channel 을 만들면 해당 Channel 이 생기는 동적인 구조를 가진다.

Redis Pub/Sub 원리

Redis Pub/Sub 용어

다음은 Redis Pub/Sub의 주요 용어이다.

  • Publisher : 데이터를 생성하는 주체
  • Subscriber : 데이터를 수신하는 주체
  • Channel : 데이터의 단위 ( RDB의 테이블이라고 생각해도 좋을 듯 하다)
  • Message : 데이터

Redis Pub/Sub 동작 흐름

우선 Publish 동작을 수행해보자

PUBLISH 명령 뒤에 [channel] [message] 를 입력한다.

test-channel 에 hi라는 메시지를 보냈는데 응답으로 0을 받았다.

 

Redis Pub/Sub 관련 명령을 수행하고 받는 응답은 정수값인데 해당 데이터와 연결된 세션의 갯수를 의미한다.

 

따라서 위 0은 test-channel 을 수신하는 Subscriber 가 없다는 것을 의미한다.

 

다음은 Subscriber 동작이다

성공적으로 수신이 되면 응답으로 위 설명처럼 해당 채널에 연관된 세션의 수를 응답받는다.

Redis Pub/Sub 동작 원리

Redis Pub/Sub은 Kafka 같은 플랫폼과 다르게 메시지를 수신자에게 Push하는 형태이다.

Redis Pub/Sub은 내부적으로 channel 이름과 클라이언트와 연결되는 TCP 세션을 저장한다.

메시지가 들어온다면 channel 을 O(n) 반복하여 찾고 해당하는 클라이언트들에게 OS 커널로 Push 하는 형태이다.

channel을 찾는 행위는 무조건 n 번을 반복하여 해당하는 클라이언트를 찾기 때문에 channel 의 수가 많아진다면 Redis Pub/Sub의 성능이 저하될 수 있다.

 

[TMI] 클러스터링 레디스에서 Pub/Sub의 동작

key를 기준으로 샤딩되는 클러스터링 레디스에서 Pub/Sub은 Channel 기준으로 샤딩될까?

정답은 아니다. Channel은 key로 판단하지 않기 때문에 Broadcasting 하여 모든 노드에 동일한 메시지 데이터가 존재한다.

7.0+ 버전의 레디스에서는 Sharded Pub/Sub 기능이 추가 됐으며 해당 기능으로 샤딩을 구축할 수 있다.

Spring Data Redis 에서 Redis Pub/Sub 사용하기

Redis Pub/Sub을 사용한다는 건 메시지 유실 같은 보장을 신경쓰지 않겠다는 것과 같다.

그래서 나는 Spring Data Redis에서 Redis Pub/Sub 을 사용법을 익히기 위해 비동기 처리, 커넥션 유지 안정성 위주로 학습을 하였고 구현했다.

따라서 다음 내용도 해당 내용을 담고 있다.

Publisher구현

사실 Producer 는 쉽게 구현할 수 있다.

RedisTemplate 를 사용하면 되는데

redisTemplate.convertAndSend(channel, storeOrderEvent)

위처럼 구현하면 해당 topic 에 데이터를 보낼 수 있다.

물론 저수준으로 RedisConnection 이나 RedisOperation을 사용할 수 있는거 같지만 본인은 아직 그정도의 저수준 API를 사용해야할 이유를 발견하지 못했다

Subscriber구현

Consumer 구현도 RedisTemplate 을 활용할 수 있는거 같지만 RedisTemplate보단 SpringDataRedis에서 제공하는 RedisMessageListenerContainer에 인터페이스인 MessageListener 를 커스텀한 구현체를 등록하면 된다.

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisStoreOrderListener implements MessageListener {
	private final SseEventHandler sseEventHandler;
	private final GenericJackson2JsonRedisSerializer serializer;
	private static final String EVENT_NAME = "StoreOrderEvent";

	@Override
	public void onMessage(Message message, byte[] pattern) {
		String key = new String(message.getChannel());
		StoreOrderEvent storeOrderEvent;
		storeOrderEvent = serializer.deserialize(message.getBody(), StoreOrderEvent.class);
		sseEventHandler.handleEventWithSSE(SseChannelProvider.OWNER_STORE, EVENT_NAME, key, storeOrderEvent);
	}
}

MessageListener 의 구현체는 message를 수신 받았을 때 수행하는 로직을 작성하면 된다


@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumerHandler {
	private final RedisMessageListenerContainer container;
	private final Map<String, MessageListener> listenerMap = new ConcurrentHashMap<>();
	private final RedisStoreOrderListener redisStoreOrderListener;

	public void subscribe(String channel) {
		if (listenerMap.containsKey(channel)) {
			log.info("이미 구독중인 channel {}", channel);
			return;
		}
		listenerMap.put(channel, redisStoreOrderListener);
		container.addMessageListener(redisStoreOrderListener, ChannelTopic.of(channel));
	}

	public void unsubscribe(String channel) {
		if (!listenerMap.containsKey(channel)) {
			log.info("구독중이지 않은 channel {}", channel);
			return;
		}
		container.removeMessageListener(listenerMap.get(channel), ChannelTopic.of(channel));
		listenerMap.remove(channel);
	}
}

그리고 위처럼 동적으로 subscribe 하고 unsubscibe 하는 로직을 담당하는 컴포넌트를 구현했다.

Spring Data Redis TMI

위에서 언급했듯이 Spring Data Redis에서 Redis Pub/Sub 사용법을 익힐 때 중점적으로 봤던것은 다음과 같다

  • 고성능 구현 방법
  • 커넥션의 안정성

성능을 높이는 사용법

고성능 관점에서는 주요하게 본 것은 논블록킹IO 이다. Redis Pub/Sub은 At-Most Once이므로 사실 동기적으로 처리해야하는 필요성이 낮은 도메인에서 사용할 확률이 크다 (본인 또한 그렇다)

그래서 RedisTemplate 또한 그냥 쓰지 않고 ReactiveRedisTemplate를 사용하여 구현했다.

 

그리고 Subscriber 쪽인 RedisMessageListnerContainer 도 블록킹IO 방식일지 궁금해서 디버깅도 해보고 문서도 찾아보면서 알아보았다.

다행이도 Lecttuce 기반은 Netty 이벤트-루프 방식으로 OS커널에 데이터가 푸쉬되면 해당 데이터를 읽고 이벤트를 발생시키는 구조였다.

 

Netty IO Thread → PubSubEndpoint.notifyMessage() (이벤트 트리거)

이후 RedisMessageListenerContainer 에서 등록된 channel 과 MessageListener 를 찾고 DispatchMessageListner 에게 책임을 전가해 DispatchMessageListner 는 찾은 MessageListner Collection Wrapper 를 순회하면서 비동기 실행을 한다.

다음은 핵심 코드이다

커넥션의 안정성은 어떨까

DB와의 커넥션은 언제 끊어질지 모른다. 여기서 걱정인게 channel ,커넥션 단위로 Redis 는 클라이언트를 식별하며, 커넥션이 끊기면 삭제한다.

 

이상황에서 난 그런 의문이 들었다. RedisMessageListenerContainer 에 동적으로 등록한 MessageListner들(Subscriber)이 커넥션이 끝나도 Spring 서버 메모리에는 존재하는데 Redis 서버와 Spring 서버 사이에 이러면 정합성이 깨지지 않나 라는 의문을 품었다.

 

다행이도 RedisMessageListenerContainer 는 back-off라는 기능을 제공한다.

만약 커넥션이 끊긴다면 RedisMessageListenerContainer 안에 Subscriber라는 내부 클래스 객체에 Exception이 발생하고 RedisMessageListenerContainer.handleSubscriptionException() 메서드를

통해 연결 재시도(횟수 제한있음)을 실행한다.


private CompletableFuture<Void> lazyListen(BackOffExecution backOffExecution) {

    // 1) 아직 구독할 채널·패턴이 하나도 없다면 → 바로 끝
    if (!this.hasTopics()) {
        this.logDebug(() -> "Postpone listening for Redis messages until actual listeners are added");
        return CompletableFuture.completedFuture(null);
    }

    // 2) 최소 한 개의 채널/패턴이 있는 경우
    CompletableFuture<Void> containerListenFuture;

    // 3) doSubscribe(...) 가 “구독 절차를 시작했을 때” true 를 반환
    //    false 면 CAS 경합(state 변경 실패) 혹은 재시도 필요 → 다시 루프
    for (containerListenFuture = this.listenFuture;
         !this.doSubscribe(backOffExecution);
         containerListenFuture = this.listenFuture) { }

    // 4) 최종적으로, 구독 완료/실패가 담길 listenFuture 를 반환
    return containerListenFuture;
}

위 코드는 백오프가 실행되면 RedisMessageListenerContainer 에서 실행하는 메서드이다.

다시 subscribe를 맺는 것을 볼 수 있다.