이전 글에서 발행-구독 패턴의 채널에 대한 오해를 바로 잡고 이를 개선해봤습니다.
이 과정을 통해 특정 관심사인 채널을 구독하게되어 이전 보다 더 느슨한 결합을 달성할 수 있었는데요
이와 반대로 Broker
에서 발행자를 관리하지 않게 되어 아래와 같은 문제들이 예상되었습니다.
- 발행자가 더 이상 메시지를 발행하지 않아도 채널이 유지됨
- 구독자가 없어도 채널이 항상 유지됨
발행자가 더 이상 메시지를 발행하지 않는 상태에 대한 처리는 활용하는 구현에서 관리해야하는 문제이지만, 구독자가 없는 상태에서 채널이 유지되는 것은 메모리를 낭비 할 여지가 있어보이죠?
그래서 이번 글에서는 더 이상 구독자가 없는 채널에 대한 정리를 추가해보려고 합니다.
어떤 방법들이 있을까?
이러한 문제를 개선하기위해 2가지 방법 정도가 떠올랐습니다.
- 구독 해제할 때, 더 이상 구독자가 없다면 정리
- 일정 시간 간격으로 구독자가 없는 채널을 정리
장단점을 살펴볼까요?
구독 해제할 때, 더 이상 구독자가 없다면 정리
간단한 방법으로 구독자가 해당 채널을 구독 해제할 때, 현재 남아있는 구독자가 자신 뿐이라면 채널을 정리하는 방식을 고려할 수 있습니다.
활용하고 있는 Node.js는 싱글 스레드로 동작하는 특성으로 인해, 구독자 목록을 저장하는 Map
인스턴스에 여러 처리가 접근할 수 없는 구조입니다. 때문에 인스턴스 자체에 접근하는 동시성 문제를 고려하지 않아도 괜찮아 구현이 쉽습니다.
다만, 각각의 구독자의 구독 해제 처리에서 구독했던 채널의 구독자 목록을 계속해서 확인해야하기 때문에 코드가 조금 많아지고, 메서드가 많은 역할을 담당하게 되겠네요
그리고, 구독자 목록을 정리한 직후에 구독 요청이 들어온다면 구독자 목록을 다시 할당해줘야합니다. (어쩔수 없어 보이긴 하네요 😂)
일정 시간 간격으로 구독자가 없는 채널을 정리
조금 더 복잡하게 처리하려면, Timmer
API를 이용하여 일정 시간 간격으로 정리해주는 방법도 고려할 수 있습니다.
구독자 목록을 저장하는 Map
인스턴스를 별도로 관리하기 때문에 구독 해제 처리에서 채널을 관리하는 역할을 하지 않게되어 구독 해제 한가지 일만 하게됩니다.
하지만, 채널 정리 주기가 너무 짧다면 오버헤드로 이어질 수 있고, 너무 길다면 정리하는 이득을 보기 어렵기 때문에 사용자가 직접 주기를 조절할 수 있도록 해야합니다.
다른 좋은 방법들도 있겠지만, 당장은 떠오르지 않네요 🤣
저는 첫 번째와 두 번째 모두를 고려해보겠습니다. 구독자 목록 관리를 별도로 분리하여 독립적으로 개선 가능한 형태로 구성해보려고 합니다.
더 고민해야할 부분은?
단순히 일정 시간을 주기로 채널을 정리한다고 해도, 위에서 살짝 언급했던 내용 처럼 금방 정리된 채널에 다시 구독이 발생할 수 있습니다. 저는 이런 문제를 시간 지역성(Temporal Locality)을 고려하여 처리해보려고 합니다.
시간 지역성(Temporal Locality)?
“최근에 접근한 데이터나 코드는 가까운 미래에 다시 접근될 가능성이 높다.“는 프로그램의 특성
시간 지역성은 캐시, RDBMS의 버퍼풀, OS의 페이지 교체 알고리즘 등에 활용되고 있는데, 이를 조금 다르게 해석하면 메시지가 발행된 시간이 오래되었다면 앞으로도 활용하지 않는 채널이라는 가설을 세울 수 있지 않을까요?
그래서 저는 구독자가 없는 채널의 마지막으로 메시지가 발행된 시간을 기준으로 정리될 수 있도록 해보겠습니다.
- 제가 구현한 pub/sub은 일반적인 활용을 가정하고 만든 모듈이라 사용하는 환경의 특성마다 다르기 때문에 큰 이득이 없을 수도 있고, 오히려 오버헤드가 될 수 있을 여지도 있겠지만요 🤣
전체 아키텍처
앞서 말씀드렸던 것 처럼 일정 주기로 정해진 기간동안 사용하지 않은 채널을 정리하는 기능을 도입했습니다.
잘 구현하려고 하다보니 생각보다 많은 변경이 있었네요
일단 변경된 전체 아키텍처를 먼저 살펴보겠습니다.
classDiagram class MessageBroker~TMessage~ { -repository: ChannelRepository -scheduler: Scheduler -tasks: ScheduledTask[] +subscribe(topic: string, handler: MessageHandler) +publish(topic: string, message: TMessage) } class ChannelRepository~TMessage~ { << interface >> +findChannelByTopic(topic: string) +findAllChannels() +deleteChannelByTopic(topic: string) } class Cleanable { << interface >> +topic: string +lastUpdatedAt: Date +subscriberLength: number } class Subjection~TMessage~ { << interface >> +publish(message: TMessage) +subscribe(handler: MessageHandler) } class ScheduledTask { << interface >> +name: string +interval: number +execute() } class CleanableChannelRepository { << interface >> +findAllChannels() +deleteChannelByTopic(topic: string) } class Channel~TMessage~ { -subscriptions: Subscription[] +topic: string +lastUpdatedAt: Date +subscriberLength: number +publish(message: TMessage) +subscribe(handler: MessageHandler) -unsubscribe(subscriptionId: string) } class MemoryChannelRepository~TMessage~ { -channels: Map +findChannelByTopic(topic: string) +findAllChannels() +deleteChannelByTopic(topic: string) } class ChannelCleaner { +name: string +interval: number -staleThreshold: number +execute() -cleanupChannel(channel: Cleanable) -isExpiredChannel(channel: Cleanable) } class Scheduler { -timers: Map -runningTasks: Set -tasks: ScheduledTask[] +registerTask(task: ScheduledTask) +unregisterTask(taskName: string) +start(taskName: string) +startAll() +stop(taskName: string) +stopAll() -executeTask(task: ScheduledTask) } MessageBroker --> ChannelRepository : uses MessageBroker --> Scheduler : uses MessageBroker --> ChannelCleaner : uses MemoryChannelRepository ..|> ChannelRepository : implements Channel ..|> Cleanable : implements Channel ..|> Subjection : implements ChannelCleaner ..|> ScheduledTask : implements MemoryChannelRepository --> Channel : manages ChannelCleaner --> CleanableChannelRepository : uses
추가된 요소들이 많다보니 클래스 다이어그램이 너무 작게 보이네요 😂 큰 변경 사항들을 통해 어떠한 변화들이 있는지 자세히 살펴보겠습니다.
Channel과 Repository
중요한 변경사항으로 Channel
을 도메인으로 분리하였습니다.
classDiagram class MessageBroker~TMessage~ { -repository: ChannelRepository -scheduler: Scheduler -tasks: ScheduledTask[] +subscribe(topic: string, handler: MessageHandler) +publish(topic: string, message: TMessage) } class ChannelRepository~TMessage~ { << interface >> +findChannelByTopic(topic: string) +findAllChannels() +deleteChannelByTopic(topic: string) } class Subjection~TMessage~ { << interface >> +publish(message: TMessage) +subscribe(handler: MessageHandler) } class Channel~TMessage~ { -subscriptions: Subscription[] +topic: string +lastUpdatedAt: Date +subscriberLength: number +publish(message: TMessage) +subscribe(handler: MessageHandler) -unsubscribe(subscriptionId: string) } class MemoryChannelRepository~TMessage~ { -channels: Map +findChannelByTopic(topic: string) +findAllChannels() +deleteChannelByTopic(topic: string) } MessageBroker --> ChannelRepository : uses MemoryChannelRepository ..|> ChannelRepository : implements Channel ..|> Subjection : implements MemoryChannelRepository --> Channel : manages
이전의 MessageBroker
는 subscribe
와 publish
인터페이스를 구현하도록 설정한 Broker
인터페이스를 구현한 클래스였는데요
MessageBroker
역시 구독과 발행을 처리하지만, 실제 구독/발행의 주체 Channel
이라고 판단되었습니다.
그래서 구독/발행에 대한 역할을 Subjection
인터페이스로 분리하고 Channel
클래스가 해당 인터페이스를 구현하도록 변경하였어요
|
|
Channel
의 CRUD에 대해 역할 분리와, 이후 쉽게 변경 가능하고 테스트 가능한 구조를 만들기 위해 ChannelRepository
인터페이스 추가하였습니다.
그리고 MessageBroker
가 해당 인터페이스에 의존하도록 하여 인터페이스의 구현체를 주입받도록 했습니다.
- 기존 구현은
ChannelRepository
의 구현체인MemoryChannelRepository
로 옮겼습니다.
|
|
세부 구현들을 주입 받는 방식으로 변경되었기 때문에 MessageBroker
의 구현은 크게 바뀔것이 없을 것으로 예상되어 Broker
인터페이스는 제거하였습니다.
Scheduler와 ScheduleTask
다음은 특정 작업을 일정 시간 간격으로 실행하는 역할을 담당하는 클래스인 Scheduler
입니다.
classDiagram direction LR class MessageBroker~TMessage~ { -repository: ChannelRepository -scheduler: Scheduler -tasks: ScheduledTask[] +subscribe(topic: string, handler: MessageHandler) +publish(topic: string, message: TMessage) } class ScheduledTask { << interface >> +name: string +interval: number +execute() } class Scheduler { -timers: Map -runningTasks: Set -tasks: ScheduledTask[] +registerTask(task: ScheduledTask) +unregisterTask(taskName: string) +start(taskName: string) +startAll() +stop(taskName: string) +stopAll() -executeTask(task: ScheduledTask) } MessageBroker --> Scheduler : uses Scheduler --> ScheduledTask : manages
Scheduler
는 ScheduledTask
인터페이스의 구현체의 목록을 이용하여 설정된 주기로 실행하는 역할을 담당하고 있는데요
ScheduledTask
는 해당 작업에 대한 식별을 위한 name
과 실행할 주기인 interval
을 포함하도록 하고, 실제 실행될 동작인 excute()
를 구현하도록 하고 있습니다.
MessageBroker
의 초기화 과정에서 주입받은 ScheduledTask
들을 Scheduler
에 등록하고 실행시키게됩니다.
|
|
Scheduler
가 ScheduledTask
들을 주입받도록 구현했습니다만 MessageBroker
에서도 ScheduledTask
들을 주입받아 초기화하도록 구현했습니다.
테스트 코드를 작성하기 편하게 변경하다보니 해당 구조를 만들게 되었는데, 초기화 방법이 여러개로 분산되어서 오히려 복잡하게 느껴질 수 있다는 생각이 드네요
더 좋은 방법이 있는지 다시 고민해봐야겠습니다 😁
ChannelCleaner와 Cleanable
다음은 채널 정리를 담당하고 있는 ChannelCleaner
입니다.
classDiagram class Cleanable { << interface >> +topic: string +lastUpdatedAt: Date +subscriberLength: number } class ScheduledTask { << interface >> +name: string +interval: number +execute() } class CleanableChannelRepository { << interface >> +findAllChannels() Promise~Cleanable[]~ +deleteChannelByTopic(topic: string) } class Channel~TMessage~ { -subscriptions: Subscription[] +topic: string +lastUpdatedAt: Date +subscriberLength: number +publish(message: TMessage) +subscribe(handler: MessageHandler) -unsubscribe(subscriptionId: string) } class ChannelCleaner { -repository: CleanableChannelRepository +name: string +interval: number -staleThreshold: number +execute() -cleanupChannel(channel: Cleanable) -isExpiredChannel(channel: Cleanable) } ChannelCleaner ..|> ScheduledTask : implements Channel ..|> Cleanable : implements CleanableChannelRepository --> Cleanable : manages ChannelCleaner --> CleanableChannelRepository : uses
ChannelCleaner
는 앞서 언급했던 ScheduledTask
의 구현체로 Scheduler
가 해당 클래스를 통해 설정된 간격으로 채널 정리 작업을 실행할 수 있습니다.
|
|
ChannelCleanerOption
을 통해 실행 간격과 채널의 수명을 지정할 수 있도록 했습니다.
isExpiredChannel
메서드를 통해 삭제 되어야 할 조건들을 확인하도록 구현했는데, 조건이 변경될 경우 해당 메서드의 변경이 불가피해 보이네요 😅
미들웨어 같은 책임 연쇄 패턴을 적용해보면 좋지 않을까 생각이 들긴 하지만, 아직 이른 감이 있어 보여서 다음 기회에 적용해보면 좋겠습니다.
이 코드에서 조금 특이한 점은 ChannelRepository
가 아닌 CleanableChannelRepository
의존성을 주입받도록 구성했다는 점 인데요
CleanableChannelRepository
는 Channel
이 아닌 Cleanable
을 처리하도록 하는 인터페이스 입니다.
앞서 Subjection
인터페이스를 구현하도록 한 Channel
클래스는 Cleanable
도 구현하도록 하고 있는데, 이는 ChannelCleaner
가 해당 인스턴스를 삭제하기 위한 기준들을 포함하도록 하기 위함입니다.
ChannelCleaner
에서 관리하게 될 Channal
에 대해 현재 삭제 기준인 마지막 업데이트 시점을 의미하는 lastUpdatedAt
과 현재 구독자들의 수를 의미하는 subcriberLength
, 그리고 해당 채널을 고유하게 식별할 수 있는 topic
을 통해 정리될 수 있도록 구현하였습니다.
그리고 ChannelCleaner
가 Channel
을 직접 활용하는 것이 아니어서 테스트 코드가 조금 더 간결해지는 효과도 얻을 수 있었어요
|
|
위 코드는 Channel
을 활용할 때 테스트 코드입니다.
이 테스트 코드에서 제가 생각한 문제는 활용하지 않는 메서드와 요소까지 모킹해야한다는 점 이었어요
보시는 것 처럼 ChannelRepository
를 모킹하기 위해 사용하지 않는 findChannelByTopic
메서드를 모킹해야했고,
Channel
을 생성하는 핼퍼 함수 createMockChannel
또한 불필요한 요소들을 모두 구현하지 않기 위해 타입 단언을 활용해야 했습니다.
이러한 부분이 불필요한 부분에 의존되는 테스트를 만든다고 생각했어요
|
|
이러한 이유로 Channel
클래스가 Cleanable
인터페이스를 구현하도록 하는 동시에 CleanableChannelRepository
를 통해 필요한 메서드만 호출되도록 하여 타입 단언이나 불필요한 의존을 제거할 수 있도록 했습니다.
이러한 구조로 인해 관리해야할 인터페이스가 늘어나긴 하지만 Channel
에 덜 의존적인 코드가 더 중요하다고 생각하여 적용했습니다.
마무리
이번에는 기존 pub/sub 기능에 구독자가 없는 상태에서 오랜 시간 유지된 채널은 앞으로도 구독자가 없을 것 이라는 가정을 기반으로 메모리 관리 기능을 도입해봤습니다.
이를 위해 일정 주기로 작업을 실행하는 역할을 담당하는 Schduler
과 검증 조건을 통해 채널을 삭제하는 역할을 ChannelCleaner
로 구현하여 책임을 최대한 분리해보려 시도하였습니다.
이 과정에서 주요 관심사인 Channel
을 분리하여 처리 흐름이 더욱 명확히 보이도록 했고, Subjection
과 Cleanable
인터페이스를 구현하도록 하여 동작을 예측 가능하게 만들려 해봤네요
Scheduler
에서 관리될 작업들을 SchduledTask
인터페이스로 정의하여 일정 주기로 원하는 기능을 수행할 수 있도록 했고, ChannelCleaner
가 SchduledTask
를 구현하도록 하여 설정한 시간 간격으로 수행될 수 있도록 했습니다.
다 만들고 정리하다보니 기능에 비해 너무 비대한 코드가 된 것이 아닌가 싶기도 하고, 아쉬운 점도 조금씩 보이네요😅
아직 완성된 것은 아니니 계속해서 조금씩 개선해 나가도록 하겠습니다! 끝까지 읽어주셔서 읽어주셔서 감사합니다.😊 (다음 순서는 실패 메시지 재발송 관련 개선을 해보려고 해요)