Featured image of post 2. 발행-구독 패턴에서 발행자는 어떤 의미일까?

2. 발행-구독 패턴에서 발행자는 어떤 의미일까?

발행-구독 패턴 개선 일지

저번 글에 간단한 pub/sub 모듈을 구현했던 내용들을 공유했습니다.

만든 모듈을 개선하려고 여러 자료들을 참고해보니 제가 구현한 pub/sub 모듈에서 뭔가 이질감이 느껴졌어요

오늘은 그 이질감을 느꼈던 부분에 대해 말씀드리고 개선해본 내용들을 공유드리려고 합니다.

현재 구조

이전에 구현했던 Broker 인터페이스의 구조를 먼저 살펴보겠습니다.

classDiagram
    class Broker {
        << interface >>
        subscribe(publisherId: string, handler: MessageHandler)
        publish(publisherId: string, message: TMessage)
        addPublisher(publisherId)
        removePublisher(publisherId: string)
    }
  • TMessage: 사용자가 정의하는 처리할 메시지
  • MessageHandler: TMessage로 정의된 메시지를 처리하는 핸들러

처음 의도했던 코드는 Broker의 구현체의 addPublisher를 통해 발행자를 등록하고, 이후 구독자들이 해당 발행자를 구독하는 형태였습니다.

이질감을 느꼈던 부분

이전 글에서 언급했던 내용처럼 제가 pub/sub 패턴을 적용한 이유는 특정 그룹에 대한 웹소켓 요청/응답 브로드캐스팅 처리할 때, 해당 웹소켓 연결에 대한 참조를 알고있어야 한다는 문제로 인해 발생한 결합을 느슨하게 만들기 위함이었는데요

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class MessageBroker<TMessage> implements Broker<TMessage> {
    public async subscribe(publisherId: string, handler: MessageHandler<TMessage>) {
        const subscriptions = this.publishers.get(publisherId);

        if (subscriptions === undefined) {
            throw new Error(`Publisher with ID ${publisherId} does not exist`);
        }

        const subscriptionId = randomUUID();
        this.publishers.set(publisherId, [...subscriptions, {id: subscriptionId, handler}]);

        return () => this.unsubscribe(publisherId, subscriptionId);
    }
}

현재 구현된 subscribe 메서드를 살펴보면, 구독하려고 하는 발행자가 없는 경우 예외를 던지도록 되어있습니다.

이를 해석해보면 구독자가 발행자의 존재 유무에 영향을 받음을 의미하게 된다고 생각하게 되었어요

사실 이러한 구현은 발행-구독 패턴이라고는 말할 수는 있지만, 메시지 브로커(이벤트 버스)가 옵저버 패턴의 주제(Subject)를 모아 관리만 해주는 형태가 되었기 때문에 완벽하게 결합을 느슨하게 만들었다고는 볼 수 없다고 판단하였습니다.

옵저버 패턴

간략하게 옵저버 패턴의 구조를 살펴보면 아래와 같습니다.

classDiagram
    direction LR
    class Subject {
        << interface >>
        attach(observer: Observer)
        detach(observer: Observer)
        notify()
    }
    
    class Observer {
        << interface >>
        +update()
    }
    
    class ConcreteSubject {
        -observers: Observer[]
        +attach(observer: Observer)
        +detach(observer: Observer)
        +notify()
    }
    
    class ConcreteObserver {
        +update()
    }
    
    Subject <|-- ConcreteSubject
    Observer <|-- ConcreteObserver
    Subject "1" --o "*" Observer : observers
    ConcreteObserver --> ConcreteSubject

옵저버 패턴은 주제(Subject)와 관찰자(Observer)로 구성되어, 특정 SubjectObserver 관찰하는 것 처럼 만드는데요

Subject 인터페이스를 구현한 ConcreteSubject 클래스에서 Observer의 구현체들을 가지고 있고, 특정 상태 변경시 notify 메서드를 통해 Observerupdate 메서드를 호출하여, 원하는 기능들을 처리하는 형태로 구현됩니다.

기본적인 구조를 간단히 구현한 예시를 살펴보면

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Subject<TMessage> {
  constructor(private readonly observers: TMessage[] = []) {}

  attach(observer: Observer<TMessage>) {
    this.observers.push(observer);
  }

  notify(data: TMessage) {
    this.observers.forEach(observer => observer.update(data));
  }
}

class Observer<TMessage> {
  constructor(private readonly id: string) {}

  update(data: TMessage) {
    console.log(`ID: ${this.id} : ${data}`);
  }
}

const subject = new Subject<string>();

const observerA = new Observer('A');
const observerB = new Observer('B');

subject.attach(observerA);
subject.attach(observerB);

subject.notify('hi');

// ID: A : hi
// ID: B : hi

위와 같은 형태로 구현될 수 있고, 함수를 반환할 수 있는 JS/TS의 특성을 활용하면 아래와 같이 표현될 수 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Observer = (data: TMessage) => void; 

class Subject {
  constructor(private readonly observers: Observer[] = []) {}
  
  attach(observer: Observer) {
    this.observers.push(observer);
  }

  notify(data: any) {
    this.observers.forEach(observer => observer.update(data));
  }
}

function update(id: string, data: string) {
  console.log(`ID: ${this.id} : ${data}`);
}

const subject = new Subject<string>();

subject.attach(data => update('A', data));
subject.attach(data => update('B', data));

subject.notify('hi');

// ID: A : hi
// ID: B : hi

옵저버 패턴을 적용하면 아래와 같은 장점을 취할 수 있습니다.

  • 관찰자를 언제든 추가할 수 있음
    • Observer 인터페이스를 구현하는 객체 목록에만 의존하므로, 언제든 새로운 옵저버를 추가할 수 있음
    • 실행 중에 하나의 옵저버를 다른 옵저버로 바꿔도 다른 옵저버에게 데이터를 보내는데 영향이 없음
  • 새로운 형식의 옵저버를 추가해도 주제를 변경할 필요가 없음
    • Observer 인터페이스만 구현하면 됨
  • 주제와 관찰자 모두 독립적으로 재사용할 수 있음
  • 주제나 옵저버의 내부 구현이 달라져도 서로에게 영향을 미치지 않음
    • Observer 인터페이스를 구현한다는 조건만 만족한다면!

옵저버 패턴도 주제가 관찰자라는 특정 인터페이스(또는, 특정 입력을 받아 처리하는 함수)를 구현한다는 사실만 알고 있을 뿐이므로, 충분히 느슨한 결합을 만든다고 볼 수 있지만, 여전히 특정 주제의 구현체를 알아야 한다는 사실(Subject 구현체가 존재해야함, 어떻게든 넘겨줘야함)은 남아있게 됩니다.

Topic, Channel

이렇게 뭔가 찝찝한(?) 결과를 구현하게 된 이유는 구독-발행 패턴에서 발행자옵저버 패턴에서의 특정 주제(Subject)가 와 같다고 생각했기 때문이었습니다.

특히 제가 오해한 부분은 바로 Topic 혹은 Channel이라는 개념이었어요

발행/구독 패턴

위 그림처럼 메시지 브로커각 채널에 발행된 메시지를 구독자에게 전달하는 역할을 수행하게됩니다.

저는 이러한 구조를 메시지 브로커발행자들과 각각의 구독자들을 관리하는 형태로 오해했던 것 이었어요

물론 아주 틀렸다고 할 수는 없지만… 조금 더 정확히 표현하면 메시지 브로커옵저버 패턴의 Subject 구현체가 아닌 특정 관심사 자체를 관리하는 것이고, 특정 관심사에 대해 발행된 메시지를 메시지 브로커가 구독자에게 넘겨주는 그룹화된 흐름을 자체Channel 혹은 Topic이라 부른다고 볼 수 있습니다.

비유를 해보자면 TV를 볼 때 특정 번호를 통해 특정 방송국에서 방송이 송출되는 주파수 대역에 맞추고 해당 방송국에서 보내는 영상을 시청하는 것을 채널을 맞춘다라고 표현하는 것 처럼요

공중파 방송으로 제한했을 때, 방송국은 특정 대상에 대해서만 방송을 송출하는 것이 아닌 모든 대상에 대해서 신호를 송출하고, 연결만 되어있다면 채널을 맞춰 시청할 수 있는 것과 같은 맥락이라고 볼 수 있습니다.

  • 관심사에 조금 더 집중해서 표현한 단어가 Topic, 메시지가 전달되는 과정에 집중해서 표현한 단어가 Channel이 아닐까 생각해봅니다.

개선

이러한 내용들을 반영하여 아래와 같은 인터페이스 구조로 변경하게되었습니다.

classDiagram
    class Broker {
        << interface >>
        subscribe(channel: string, handler: MessageHandler)
        publish(channel: string, message: TMessage)
    }

변경된 부분은 아래와 같습니다.

  • 각 함수의 인자 publisherIdchannel로 변경
  • addSubscriber, removeSubscriber 메서드 제거

결과적으로 아래와 같은 결과물이 나왔습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class MessageBroker<TMessage> implements Broker<TMessage> {
  constructor(
    private readonly channels: Map<string, Subscription<TMessage>[]> = new Map(),
  ) {}

  public async subscribe(channel: string, handler: MessageHandler<TMessage>) {
    const subscriptions = this.getSubscriptions(channel)
    const subscriptionId = randomUUID();

    this.channels.set(channel, [...subscriptions, {id: subscriptionId, handler}]);

    return () => this.unsubscribe(channel, subscriptionId);
  }

  public async publish(channel: string, message: TMessage) {
    const subscriptions = this.getSubscriptions(channel);
    await Promise.all(subscriptions.map((subscriber) => subscriber.handler(message)));
  }

  private getSubscriptions(channel: string) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, []);
    }

    return this.channels.get(channel)!;
  }

  private async unsubscribe(channel: string, subscriptionId: string) {
    const subscriptions = this.getSubscriptions(channel);
    this.channels.set(channel, subscriptions.filter((subscription) => subscription.id !== subscriptionId));
  }
}

addPublisher, removePublisher가 제거되며 getSubscriptions 메서드가 추가되었습니다.

보시는 것과 같이 채널 컬렉션에 해당 채널이 없을 경우 빈 채널을 초기화 후 반환하도록 했습니다.

더 고민해봐야할 내용

구독자가 특정 발행자를 발행자가 아닌 관심사를 구독하게 되었는데요, 이로 인해 주의해야하거나 생각해봐야 할 내용들이 더 생기게 되었습니다.

  1. 발행자가 없더라도 구독 상태가 유지될 수 있음
  2. channel을 잘못 입력한 경우 다른 관심사를 구독하게 됨(오류로 처리되지 않음)
  3. channel에 구독자가 없더라도 빈 배열이 유지됨

크게 세 가지 정도가 생각나는군요

1, 2번의 경우는 모듈을 사용할 때 주의해야할 점으로 보이고, 3번 같은 경우는 별도의 메모리 관리가 필요해 보입니다.

마무리

이전 글에 이어서 발행-구독 패턴에 대한 잘못된 오해를 바로 잡아, 제가 직접 구현한 pub/sub 패키지를 개선해보는 시간을 가졌습니다.

다음 주제는 구독자가 없는 채널에 대한 관리를 어떻게 해야할 지 고민해보는 과정이 될 것 같네요

끝까지 읽어주셔서 감사합니다😁