안녕하세요😀 최근 부스트캠프 멤버십 과정을 마치고, 이전해 수행했던 프로젝트를 리팩토링 하는 시간을 가지고 있는데요, 리팩토링을 수행하면서 발행-구독 패턴을 통해 프로젝트를 개선한 경험을 공유하려고 합니다.
프로젝트에서 어떠한 문제가 있었는지 알아보고, 구현한 결과물을 확인해보겠습니다.
문제의 시작
저희 팀은 여러 인원들이 동시에 참여하는 실시간 퀴즈 플랫폼 BooQuiz를 Nest를 이용해 구현했는데요.
서버에서 퀴즈의 진행을 전적으로 담당하는 설계로 사용자의 정답 제출, 채팅 등의 지속적인 상호작용이 필요했기 때문에 웹소켓을 활용했습니다.
Nest에는 웹소켓을 ws
을 활용하는 WsAdapter
, socket.io
를 활용하는 IoAdapter
두 가지 방식으로 제공하는데, 저희는 많은 사용자가 동시에 여러 퀴즈에 참여하는 상황에서 서버측의 지연은 모든 퀴즈 진행에 영향을 주는 구조여서 빠른 성능을 제공하는 ws
를 활용했습니다.
ws
는 최소한의 헤더만을 사용하고, 별도의 추상화 레이어가 없기 때문에 빠른 성능을 보여주지만, 클라이언트에게 메시지를 보내는 것 외의 기능은 제공하지 않아 모든 기능들을 직접 구현해야했어요🥲
특히 서버는 퀴즈 진행을 위해 퀴즈존이라는 각각의 상태 변화에 따라 해당 퀴즈존에 참여하고 있는 사용자에 메시지를 보내주는 Broadcast 기능이 필요합니다. socket.io
같은 경우 room
이라는 개념을 이용해서 제공하지만, ws
는 제공하지 않았기 때문에 역시 직접 구현해야했어요
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| @WebSocketGateway({ path: '/play' })
export class PlayGateway implements OnGatewayInit {
constructor(
@Inject('ClientInfoStorage')
private readonly clients: Map<String, WebSocketWithSession>,
) {}
private broadcast(clientIds: string[], event: string, data?: any) {
clientIds.forEach((clientId) => {
this.sendToClient(clientId, event, data);
});
}
private sendToClient(clientId: string, event: string, data?: any) {
const { socket } = this.getClientInfo(clientId);
socket.send(JSON.stringify({ event, data }));
}
}
|
WebSocketWithSession
은 단순히 websocket 인스턴스에 사용자 세션을 바인딩한 것 입니다.
Map
을 이용하여 클라이언트의 고유한 ID를 key로 Websocket 인스턴스를 참조할 수 있게 저장하고, 클라이언트의 ID들을 받아 순회하며 메시지를 보내는 구조입니다.
이러한 구현을 기반으로 socket.io
의 room
개념을 대체할 두 가지 방식을 고려했습니다.
참여중인 사용자들을 별도로 관리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| @WebSocketGateway({ path: '/play' })
export class PlayGateway implements OnGatewayInit {
constructor(
@Inject('ClientInfoStorage')
private readonly clients: Map<String, ClientInfo>,
@Inject('PlayInfoStorage')
private readonly players: Map<string, string[]>,
/* 혹은
private readonly players: Map<string, {id: string, socket: WebSocketWithSession}[]>,
private readonly players: Map<string, Map<string, WebSocketWithSession>>,
*/
) {}
/* 생략 */
@SubscribeMessage('join')
async join(
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() quizJoinDto: QuizJoinDto,
) {
const { id, quizZoneId } = client.session;
const player = await this.playService.joinQuizZone(quizZoneId, id);
/* 생략 */
this.clients.set(id, { quizZoneId, socket: client });
this.broadcast(players.get(quizZoneId), 'someone_join', player);
/* 생략 */
}
}
|
첫 번째로 참여중인 사용자들을 Gateway
에서 추가적으로 관리하는 방법입니다.
각 퀴즈존에 참여중인 사용자들의 id를 통해 각 연결을 참조하거나, 직접 참조해서 broadcast 처리를 수행하도록 하는 방법으로 아래와 같은 특징이 있습니다.
- 장점
- Gateway에서 외부 도움 없이 응답을 보낼 수 있음
- 이로인해 웹소켓 응답에만 집중할 수 있음
- 단점
- 퀴즈존에 이미 존재하는 상태를 중복해서 관리해야함
- 이로인해 참여, 퇴장 등이 발생했을 때 동기화가 필요함
- 용량이 추가적으로 필요함
요약하면 웹소켓 처리에만 집중할수 있지만, 참여자 정보는 다른 서비스에서 API 응답에 활용하고 있기 때문에 별도 관리가 필요하고, 동일한 상태를 별도로 관리하기 때문에 동기화 등의 처리에 신경써야합니다.
응답이 필요할 때마다 사용자들의 정보를 조회
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @WebSocketGateway({ path: '/play' })
export class PlayGateway implements OnGatewayInit {
constructor(
@Inject('ClientInfoStorage')
private readonly clients: Map<String, ClientInfo>,
private readonly playService: PlayService,
) {}
/* 생략 */
@SubscribeMessage('join')
async join(
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() quizJoinDto: QuizJoinDto,
): Promise<SendEventMessage<ResponsePlayerDto[]>> {
const { id, quizZoneId } = client.session;
const { currentPlayer, playerIds } = await this.playService.joinQuizZone(quizZoneId, id);
/* 혹은
const currentPlayer = await this.playService.joinQuizZone(quizZoneId, sessionId);
const playerIds = await this.playService.getQuizZonePlayerIds(quizZoneId);
*/
/* 생략 */
this.clients.set(id, { quizZoneId, socket: client });
this.broadcast(playerIds, 'someone_join', currentPlayer);
/* 생략 */
}
}
|
두 번째로 필요할때마다 참여중인 사용자의 정보를 조회하여 넘겨주는 방법입니다.
예시에서 주입받은 PlayService
는 별도로 분리되어있는 QuizZoneService
를 통해 퀴즈존에 대한 처리를 수행하는데요
이를 통해서 필요한 처리마다 참여중인 사용자를 조회할 수 있습니다.
- 장점
- 별도의 참여자 정보들을 관리하지 않기 때문에 동기화가 필요없음
- 단점
PlayGateway
와 PlayService
의 종단 관심사 결합PlayService
와 QuizZoneService
의 횡단 관심사 결합 강화QuizZoneService
의 부하가 많아짐- 그래도 웹소켓 연결에 대한 정보들은 관리해야함
요약하자면, broadcast가 필요할 때마다 퀴즈존에서 관리하는 참여자 정보를 조회하기 때문에 관심사의 결합이 발생할 수 있지만, 상태가 이원화되지 않아 관리 지점이 줄어듭니다.
두 선택지 모두 단점이 많아 고민이 많았는데요. 결과적으로 저희는 두 번째 방식을 통해 처리하기로 했습니다.
퀴즈 진행에 따라 사용자의 요청 처리 때문에 PlayService
와 QuizZoneService
의 의존은 필연적으로 발생하는 구조였고, QuizZoneService
의 부하는 캐싱 등의 처리로 개선할 수 있다고 판단했습니다.
그리고 서비스를 분리를 고려했을 때 현재 아키텍처에서 이원화된 상태 동기화 문제가 더 해결하기 어려운 문제라고 생각했어요
발견한 문제
처음에는 괜찮았지만 시간이 지남에 따라 여러 문제들이 부각되기 시작했어요😂
제 판단하기에는 크게 세 가지 정도 문제가 있었습니다.
Gateway에서 모든 웹소켓 연결에 대해 알아야함
이전에 확인했던 것 처럼 Gateway에서 메시지를 보낼 때 클라이언트의 연결을 직접 활용하고 있기 때문에, broadcast를 위해 클라이언트의 연결 정보를 유지해야합니다.
이는 추가적인 저장 공간을 필요로 할 뿐만 아니라, 서비스에서 활용되는 사용자의 식별자와 웹소켓 인스턴스를 연결해야 하기 때문에, 서비스 상태와 결합이 생겼다고 볼 수 있다고 생각했어요
Service 레이어의 많은 책임
Service 에서 참여중인 사용자의 정보를 받아오는 구조이기 때문에 서비스에서 관련 처리들이 추가되어야하고, 이는 많은 책임과 더불어 코드가 비대해지는 문제가 있었습니다.
Gateway와 Service 레이어의 모호한 경계
위와 같은 이유로 응답을 위한 데이터 가공하는 처리가 Gateway로 조금씩 넘어오는 상황이 발생했습니다.
이로 인해 일부 비즈니스 로직들도 Gateway로 넘어오게 되면서, 사용자에게 응답을 담당해야하는 Gateway의 책임이 오염되고 있었어요
근본적인 원인은?
저는 이러한 문제가 발생한 근본적인 원인이 웹소켓 연결을 통한 응답 처리에서 퀴즈존에 참여중인 사용자의 정보를 알아야하는 것 자체 라고 판단했습니다.
이러한 이유로 Gateway에서 웹소켓 연결을 통해 처리 결과를 응답할 때 참여자의 정보를 아예 격리시켜야할 필요성을 느끼게 되었어요
사용자의 정보를 어떻게 격리할까?
서비스는 기본적으로 N개의 퀴즈존 각각에 M명의 사용자가 참여하는 구조입니다. 그리고 서버가 사용자들의 메시지를 받아 퀴즈존의 상태를 변경하고, 진행 상태에 따라 참여중인 사용자들에게 응답 메시지를 보내게됩니다.
정리하면, 다수의 퀴즈존에 대한 진행을 위해 사용자는 서버에게만 요청을 보내고 서버는 참여중인 사용자에게 broadcast 하는 구조이죠
이러한 상황에서 서버에서 관리하는 여러 퀴즈존과 참여중인 사용자가 서로 누구인지 몰라도 되게 만드는 대표적인 방법으로 발행-구독 패턴을 고려할 수 있었습니다.
발행-구독 패턴

발행-구독(Publish-Subscribe) 패턴(이하 pub/sub)은 발행자(Publisher)가 메시지를 직접 특정 구독자(Subscriber)에게 보내지 않고, 중간의 메시지 브로커(이벤트 버스)를 통해 특정 주제(Topic)나 채널에 메시지를 발행하면, 해당 주제를 구독하는 모든 구독자들이 자동으로 메시지를 받는 디자인 패턴입니다.
구독자는 주제만 알아도 보내는 모든 메시지를 받을 수 있기 때문에 컴포넌트 간 느슨한 결합(Loose Coupling)을 가능하게 하여 시스템의 확장성과 유연성이 향상되고, 이는 서비스의 안정적인 확장과 유지보수가 용이해지는 결과로 이어집니다.
결과적으로 근본적인 원인인 웹소켓 연결을 통한 응답 처리에서 퀴즈존에 참여중인 사용자의 정보를 알아야하는 문제를 개선할 수 있다고 판단하여 적용하게 되었습니다.
인터페이스와 타입 설계
1
2
3
4
5
6
7
8
9
| export type MessageHandler<TMessage> = (message: TMessage) => void | Promise<void>;
export type Unsubscribe = () => Promise<void>;
export interface Broker<TMessage> {
subscribe(publisherId: string, handler: MessageHandler<TMessage>): Promise<Unsubscribe>;
addPublisher(publisherId: string): Promise<void>;
removePublisher(publisherId: string): Promise<void>;
publish(publisherId: string, message: TMessage): Promise<void>;
}
|
처음 제가 생각했을때 필요하다고 생각되는 기능을 바탕으로 인터페이스와 타입을 설계 했습니다.
그리고 공통적으로 제네릭을 활용해 메시지 타입을 활용할 수 있도록 해봤습니다.
MessageHandler
타입: 구독자가 발생자가 보낼 메시지를 처리하는 함수를 정의Unsubscibe
타입: 구독 취소를 처리하는 함수 정의Broker
인터페이스: 메시지 브로커의 필수적인 기능을 정의addPublisher
: 새 퍼블리셔 추가removePublisher
: 퍼블리셔 제거publish
: 메시지 발행subscribe
: 구독자 등록 및 구독 해제 함수 반환
기본적으로 아래와 같은 흐름으로 처리가 진행됩니다.
- 발행자가 있다고 가정할 때
subscribe
를 통해 구독할 때 제네릭으로 선언한 형식의 메시지를 처리할 수 있는 핸들러 함수를 입력으로 받음 publish
메서드를 통해 메시지를 발행. 내부적으로 저장된 핸들러를 통해 클라이언트가 원했던 처리를 수행
최초 설계에서 조금 차이가 있는 부분은 subscribe
메소드가 Unsubsribe
함수를 반환한다는 점 인데요
사용하는 영역에서 Broker
를 직접 활용하여 구독 해제를 처리하는 것 보다, Clean-Up 함수를 별도로 제공하여 처리하는 것이 이후 자유도가 더 높은 방식이라 판단되어 선택하게 되었습니다.
구현
위 인터페이스를 기반으로 간단한 MessageBroker
를 구현하였습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
| import { Broker } from './interfaces/broker.interface';
import { MessageHandler } from './types';
import { v4 as uuidv4 } from 'uuid';
export interface Subscription<TMessage> {
readonly id: string;
readonly handler: MessageHandler<TMessage>;
}
export class MessageBroker<TMessage> implements Broker<TMessage> {
constructor(
private readonly publishers: Map<string, Subscription<TMessage>[]> = new Map(),
) {}
public async addPublisher(id: string) {
if (this.publishers.has(id)) {
throw new Error(`Publisher with ID ${id} already exists`);
}
this.publishers.set(id, []);
}
public async removePublisher(id: string) {
if (!this.publishers.has(id)) {
throw new Error(`Publisher with ID ${id} does not exist`);
}
this.publishers.delete(id);
}
public async publish(id: string, message: TMessage) {
const subscriptions = this.publishers.get(id);
if (subscriptions === undefined) {
throw new Error(`Publisher with ID ${id} does not exist`);
}
await Promise.all(subscriptions.map((subscription) => subscription.handler(message)));
}
public async subscribe(publisherId: string, handler: MessageHandler<TMessage>) {
const subscriptions = this.publishers.get(publisherId);
if (subscriptions === undefined) {
throw new Error(`Publisher with ID ${publisherId} does not exist`);
}
const subscribtionId = uuidv4();
this.publishers.set(publisherId, [...subscriptions, {id: subscriptionId, handler}]);
return () => this.unsubscribe(publisherId, subscriptionId);
}
private async unsubscribe(publisherId: string, subscriptionId: string) {
const subscriptions = this.publishers.get(publisherId);
if (subscriptions === undefined) {
throw new Error(`Publisher with ID ${publisherId} does not exist`);
}
this.publishers.set(publisherId, subscriptions.filter((subscription) => subscription.id !== subscriptionId));
}
}
|
주요 요소들을 자세히 살펴보겠습니다.
Subscription 인터페이스
1
2
3
4
| export interface Subscription<TMessage> {
readonly id: string;
readonly handler: MessageHandler<TMessage>;
}
|
Subscription
는 구독을 의미하는 인터페이스입니다.
id
: 구독의 식별자handler
: 발행한 메시지를 처리하는 함수
구독 해제 등을 처리해야하기 때문에 구독자가 등록한 핸들러에 대해 식별할 수 있도록 id
를 포함하도록 했습니다.
subscribe 메서드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| export class MessageBroker<TMessage> implements Broker<TMessage> {
public async subscribe(publisherId: string, handler: MessageHandler<TMessage>) {
const subscriptions = this.publishers.get(publisherId);
if (subscriptions === undefined) {
throw new Error(`Publisher with ID ${publisherId} does not exist`);
}
const subscribtionId = uuidv4();
this.publishers.set(publisherId, [...subscriptions, {id: subscriptionId, handler}]);
return () => this.unsubscribe(publisherId, subscriptionId);
}
}
|
구독자가 발행 받은 메시지에 대해서 처리해야할 핸들러를 등록하는 과정입니다.
이 과정에서 Subscription
을 만들게 되는데, 구독 자체는 내부적으로 관리되어야 한다고 판단하여, 식별자는 UUID
를 생성하여 설정하도록 했습니다.
메서드가 반환하는 익명 함수는 구독의 ID를 통해 구독 해제를 수행하는 함수입니다.
publish 메서드
1
2
3
4
5
6
7
8
9
10
11
| export class MessageBroker<TMessage> implements Broker<TMessage> {
public async publish(id: string, message: TMessage) {
const subscriptions = this.publishers.get(id);
if (subscriptions === undefined) {
throw new Error(`Publisher with ID ${id} does not exist`);
}
await Promise.all(subscriptions.map((subscription) => subscription.handler(message)));
}
}
|
발행자가 특정 메시지를 발행합니다.
해당 발행자의 구독들을 통해 등록된 핸들러를 실행하게됩니다.
프로젝트 적용 예시
글 맨 앞에 문제의 원인에서 PlayGateway
예시를 간략히 보여드렸었는데요. 해당 예시가 Broker
를 적용하면서 어떻게 바뀌었는지 살펴보겠습니다.
현재 pub/sub 모듈을 구현하는게 목표여서 간략하고 빠르게 적용해봤어요
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @WebSocketGateway({ path: '/play' })
export class PlayGateway implements OnGatewayInit {
constructor(
private readonly playService: PlayService,
@Inject('Broker')
private readonly broker: Broker<SendEventMessage<BroadcastPlayEvent, any>>,
) {}
@SubscribeMessage('join')
async join(
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() quizJoinDto: QuizJoinDto,
): Promise<SendEventMessage<PlayEvent, ResponsePlayerDto[]>> {
const {id, quizZondId} = client.session.id;
const {nickname} = await this.playService.getPlayer(quizZoneId, id);
await this.broker.publish(quizZoneId, {event: 'someone_join', sender: id, data: {id, nickname}});
await this.subscribePlay(quizZoneId, client);
/* 생략 */
}
}
|
퀴즈존에 참여하고 있는 사용자의 정보를 조회하고, 해당 정보를 바탕으로 기존 참여자들에게 새로운 참여자의 정보를 broadcast 해줍니다.
그리고 새로운 참여자가 해당 퀴즈존을 구독하게 됩니다. 많이 간단해진 모습이네요😊
PlayGateway
의 생성자를 살펴보면 발행하는 이벤트를 SendEventMessage<BroadcastPlayEvent, any>
로 정의했습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| export type PlayEvent = UnicastPlayEvent | BroadcastPlayEvent;
export type UnicastPlayEvent = 'join' | 'changeNickname' | 'submit' | 'leave';
export type BroadcastPlayEvent =
'someone_join' | 'someone_leave' | 'changeNickname' |
'start' | 'nextQuiz' |
'someone_submit' | 'quizTimeOut' |
'finish' | 'summary' | 'close';
export interface SendEventMessage<TEvent, TMessage> {
event: TEvent;
sender: string;
data: TMessage;
}
|
구독 처리시에 인자로 넘겨주는 메시지 핸들러는 위와 같은 형식의 데이터를 처리하는 함수면 됩니다.
이를 구현한 subscribePlay
메서드는 아래와 같이 구현되어있어요
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| private async subscribePlay(quizZoneId: string, client: WebSocketWithSession) {
const clientId = client.session.id;
try {
await this.broker.addPublisher(quizZoneId);
} catch (error) {}
const unsubscribe = await this.broker.subscribe(
quizZoneId,
async (message) => {
const { event, sender, data } = message;
switch (event) {
case "someone_join":
if (sender === clientId) break;
client.send(JSON.stringify(message));
break;
case "leave":
unsubscribe();
break;
/* 생략 */
}
}
);
}
|
실제로는 많은 이벤트들을 처리해야하기 때문에 훨씬 비대한 상태에요🥲 핸들러가 어떤 방식으로 구성되는지만 봐주시면 되겠습니다.
메시지에 선언된 이벤트에 맞게 웹소켓을 이용하여 응답을 보내게 됩니다.
현재 핸들러가 간단하고 지저분하게 구현되어 있긴 하지만, 관심사의 분리가 진행된 만큼 독립적으로 개선될 수 있을것이라 기대하고있습니다😁
마무리
pub/sub을 활용해서 서비스에서 발생할 결합을 느슨하게 만들어 봤는데요, 만든 모듈을 간단하게 적용해보니 Gateway에서 퀴즈존이나 사용자 정보를 전혀 모르는 상태로 응답을 보낼 수 있었습니다.
덕분에 이를 처리할 핸들러 로직이 독립적으로 분리되었고, 메시지 처리하는 핸들러에만 집중해서 개선할 수 있게 되었네요
이러한 관심사의 분리를 통해 의존되는 상태 때문에 시도하지 못했던 방법들을 적극적으로 고려할 수 있게 되었다고 생각합니다. 앞으로 시도해볼 것들이 많겠네요
pub/sub 모듈 자체도 아직 부족한 부분이 많아 이후 글들은 제가 구현한 모듈을 고도화하는 과정들을 담아보려고 합니다.
끝까지 읽어주셔서 감사합니다.😊