SSE 방식으로 실시간 알림을 구현하는 이유는 지난 포스팅에서 확인하실 수 있습니다.
우선 SSE를 연결하기위해서 컨트롤러부터 구현했습니다.
처음에는 Alarm이라는 도메인명을 사용하려다가 알람! 보다는 알림~ 이 좋을 거 같아서 Notification으로 변경했습니다.
NotificationController - SSE Subscribe 응답하기
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
@Tag(name = "SSE")
@ApiResponses(value = {
@ApiResponse(responseCode = "2000", description = "SSE 연결 성공"),
@ApiResponse(responseCode = "5000", description = "SSE 연결 실패")
})
@Operation(summary = "SSE 연결")
@GetMapping(value="/api/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@AuthenticationPrincipal CustomUserDetails userDetails, @RequestHeader(value="Last-Event-ID", required = false, defaultValue = "") String lastEventId ){
return notificationService.subscribe(userDetails.getMember().getId(), lastEventId);
}
- @GetMapping 어노테이션에 아래와 같이 입력해줘야 합니다.
produces = "text/event-stream"
- SSE 통신을 위한 "text/event-stream"이 표준 MediaType입니다.
- MemberId 값과 "Last-Event-ID"를 받아옵니다.
- Last-Event-ID는 SSE 연결이 끊어졌을 경우, 클라이언트가 수신한 마지막 데이터의 id 값을 의미합니다.
- 항상 있는 것이 아니기 때문에 required = false 로 설정했습니다.
NotificationService - SSE 연결
@Service
@RequiredArgsConstructor
public class NotificationService {
private final EmitterRepository emitterRepository = new EmitterRepositoryImpl();
private final NotificationRepository notificationRepository;
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
public SseEmitter subscribe(Long memberId, String lastEventId) {
String emitterId = memberId + "_" + System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
sendToClient(emitter, emitterId, "EventStream Created. [memberId=" + memberId + "]");
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
return emitter;
}
- DEFAULT_TIMEOUT을 1시간으로 설정해줬습니다. SSE 연결은 1시간동안 지속됩니다.
- SseEmitter 클래스는 SpringFramework에서 버전 4.2 부터 제공합니다.
- emitter는 발신기라는 뜻을 가지고 있습니다.
- String emitterId = memberId_System.currentImeMillis(); 로 한 이유는 Last-Event-ID와 관련이 있습니다.
- Last-Event-ID는 클라이언트가 마지막으로 수신한 데이터의 Id값을 의미합니다. 그러나 Id 값만을 사용한다면 언제 데이터가 보내졌는지, 유실 되었는지 알 수가 없기 때문에 시간을 emitterId에 붙여두면 데이터가 유실된 시점을 알 수 있으므로 저장된 Key값 비교를 통해 유실된 데이터만 재전송 할 수 있습니다.
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
- SSE 연결을 위해서 SseEmitter 객체를 만들어 반환해야합니다.
- 유효시간 DEFAULT_TIMEOUT을 넣어줍니다.
- 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보냅니다.
- emitterId도 함께 저장해줍니다.
- 시간이 초과하거나 비동기요청이 정상동작이 안되면 저장한 SseEmitter를 삭제합니다.
sendToClient(emitter, emitterId, "EventStream Created. [memberId=" + memberId + "]");
- Sse 연결이 이뤄진 후, 데이터가 하나도 전송되지 않았는데 SseEmitter의 유효시간이 끝나면 503 에러가 발생한다고 합니다. 따라서, 최초 연결 시 더미 데이터를 보내줍니다.
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
- lastEventId값이 있는 경우, 저장된 데이터 캐시에서 유실된 데이터들을 다시 전송합니다.
Notification - 객체 생성
@Getter
@Entity
@NoArgsConstructor
public class Notification extends Timestamped {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Embedded
private NotificationContent content;
@Embedded
private RelatedUrl url;
@Column(nullable = false)
private Boolean isRead;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private NotificationType notificationType;
@OnDelete(action = OnDeleteAction.CASCADE)
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "MEMBER_ID")
private Member receiver;
@Builder
public Notification(Member receiver, NotificationType notificationType, String content, String url) {
this.receiver = receiver;
this.notificationType = notificationType;
this.content = new NotificationContent(content);
this.url = new RelatedUrl(url);
this.isRead = false;
}
public String getContent() {
return content.getContent();
}
public String getUrl() {
return url.getUrl();
}
public void read(){
isRead = true;
}
}
- Notification 객체를 생성해줍니다.
- 필요한 값들 (receiver, notificationType, content, url, isRead) 를 넣어줍니다.
- @Embedded 는 객체를 엔티티로 사용하는게 아니라 값 타입으로 사용하기 위해 붙여줍니다. 좀 더 객체지향적이고 각 상황에 따라 수정이 용이합니다.
- isRead 는 조회 여부를 알기 위해 Boolean 타입으로 넣었습니다.
- notificationType은 서비스도메인 별로 enum으로 구분해서 작성했습니다. 저희 프로젝트에서 보내야 하는 알림 종류는 5가지입니다. (콜라보 요청, 콜라보 승인, 게시글 댓글, 게시글 좋아요, 댓글 좋아요)
- @OnDelete 에 대해서는 여기 블로그를 참조해주세요. receiver를 삭제하면 연관관계도 함께 삭제됩니다.
NotificationContent
@Getter
@Embeddable
@NoArgsConstructor
public class NotificationContent {
@Column(nullable = false)
private String content;
public NotificationContent(String content){
this.content = content;
}
}
RelatedUrl
@Getter
@Embeddable
@NoArgsConstructor
public class RelatedUrl {
@Column(nullable = false)
private String url;
public RelatedUrl(String url){
this.url = url;
}
}
Entity를 만들었으니 Repository도 만들어줍니다.
NotificationRepository
public interface NotificationRepository extends JpaRepository<Notification, Long> {
List<Notification> findAllByReceiver(Member member);
}
- JpaRepository를 상속합니다.
- findAllByReceiver는 알림 전체 목록을 조회할 때 사용한 메서드 입니다.
EmitterRepository
public interface EmitterRepository {
SseEmitter save(String emitterId, SseEmitter sseEmitter);
void saveEventCache(String emitterId, Object event);
Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
Map<String,Object> findAllEventCacheStartWithByMemberId(String memberId);
void deleteById(String emitterId);
}
- findAllEmitterStartWithByMemberId 는 해당 member와 관련된 모든 emitter를 찾습니다.
- findAllEventCacheStartWithByMemberId 는 해당 member와 관련된 모든 event를 찾습니다.
EmitterRepositoryImpl
위의 EmitterRepository를 아래와 같이 구현합니다.
public class EmitterRepositoryImpl implements EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId,sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String emitterId, Object event) {
eventCache.put(emitterId,event);
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String emitterId) {
emitters.remove(emitterId);
}
@Override
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key,emitter) -> {
if(key.startsWith(memberId)){
emitters.remove(key);
}
}
);
}
@Override
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key,emitter) -> {
if(key.startsWith(memberId)){
eventCache.remove(key);
}
}
);
}
}
- 동시성을 고려하여 ConcurrentHashmap을 사용합니다.
- HashMap은 멀티스레드 환경에서 동시에 수정을 시도하는 경우 예상하지 못한 결과가 발생할 수 있습니다. 멀티스레드 환경하에서 HashMap을 안전하게 사용하기위해 java에서는 concurrent 패키지를 제공합니다. ConcurrentHashmap을 사용하면 thread-safe가 보장됩니다. ConcurrentHashmap 관련 설명은 여기 블로그를 참조하세요.
- 그냥 사용한다면 ConcurrentModificationException이 발생할 수 있습니다.
Dto와 MapStruct
추가적으로 저희 프로젝트 컨벤션이기 때문에 MapStruct도 작성해주었습니다.
@Mapper
public interface SseMapStruct {
SseMapStruct SSE_MAPPER = Mappers.getMapper(SseMapStruct.class);
ResponseNotificationDto NotificationtoResponseNotificationDto(Notification notification);
}
ResponseDto도 작성해주었습니다.
@Schema(description = "알림 Dto")
@Getter
@Setter
public class ResponseNotificationDto {
private Long id;
private String content;
private String url;
private Boolean isRead;
private String createdAt;
@Builder
public ResponseNotificationDto(Notification notification) {
this.id = notification.getId();
this.content = notification.getContent();
this.url = notification.getUrl();
this.isRead = notification.getIsRead();
this.createdAt = LocalDateTimeConverter.timeToString(notification.getCreatedAt());
}
}
- 프로젝트에서 사용하는 LocalDateTimeConverter를 사용해서 알림 발송시간을 한글로 바꿔줍니다😎
이제 클라이언트에 알림을 보낼 준비가 끝났습니다.
NotificationService - 클라이언트에 데이터 전송하기
public void send(Member receiver, NotificationType notificationType, String content, String url) {
Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url));
String memberId = String.valueOf(receiver.getId());
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByMemberId(memberId);
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendToClient(emitter, key, SSE_MAPPER.NotificationtoResponseNotificationDto(notification));
}
);
}
private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
throw new InvalidRequestException(SSE, SERVICE, UNHANDLED_SERVER_ERROR);
}
}
Notification을 전송하기 위한 메서드 입니다.
public void send(Member receiver, NotificationType notificationType, String content, String url) {
Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url));
String memberId = String.valueOf(receiver.getId());
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByMemberId(memberId);
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendToClient(emitter, key, SSE_MAPPER.NotificationtoResponseNotificationDto(notification));
}
);
}
- Notification 객체를 만들고 해당 Member의 emitter를 다 불러옵니다.(여러 브라우저에서 접속할 수 있기 때문에 emitter가 여러개 일 수 있습니다.)
- 해당 데이터를 EventCache에 저장합니다.
- sendToClient 메서드를 통해 클라이언트에 전송합니다.
- MapStruct를 사용해서 Dto로 변환한 값을 보내줬습니다.
private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
throw new InvalidRequestException(SSE, SERVICE, UNHANDLED_SERVER_ERROR);
}
}
- emitter, emitterId와 함께 알림 내용을 클라이언트에 보냅니다.
- 전송이 안된 경우 IOException을 터트려줍니다. IOException은 스트림, 파일 및 디렉터리를 사용해 정보에 엑세스하는 동안 throw된 예외에 대한 기본 클래스입니다.
알림 만들기 🛎
이제 위에서 만든 알림 메서드를 호출해서 콜라보요청이 승인 되면 콜라보 요청 작성자에게 알림을 보내는 기능을 추가해보겠습니다.
@Transactional
public void approveCollaboRequest(Long collaborequestid, Member member) {
/.../
//요청한 사람한테 승인 완료 알림 - 게시글 상세 조회로 이동
Long postId = post.getId();
Member collaboMember = memberRepository.findByNickname(collaboRequest.getNickname())
.orElseThrow(() -> new NotFoundException(COLLABO_REQUEST, SERVICE, MEMBER_NOT_FOUND));
String url = "/api/post/"+postId;
String content = post.getTitle()+"에 대한 콜라보 요청이 승인되었습니다.";
notificationService.send(collaboMember, NotificationType.COLLABO_APPROVED, content, url);
}
- 콜라보 요청을 한 member가 receiver이고, 해당 Post의 상세조회 페이지의 Url을 넣었습니다. NotificationType으로는 미리 enum으로 저장해 놓은 COLLABO_APPROVED(콜라보 승인)을 넣어줬습니다.
알림 결과는 아래와 같이 Postman 으로 확인할 수 있습니다.
클라이언트 사이드는 구현은 아직 안해서 알림 전체 조회하는 api를 만들어서 조회했습니다.
참조 문서
여러 블로그를 참조해서 작성했습니다. 주요 참고한 블로그는 아래와 같습니다.
Node긴 하지만 개념 정리가 잘 되있어서 아래 블로그도 공유합니다.
'TIL' 카테고리의 다른 글
TIL 스프링 좋아요 복합키로 구현 230113 (0) | 2023.01.16 |
---|---|
TIL 복합키 composite key 230112 (0) | 2023.01.12 |
TIL 알람 기능 구현 SSE(Server-Sent-Events) 230110 (0) | 2023.01.10 |
TIL JPA List 수정하기 230109 (0) | 2023.01.09 |
TIL CRUD 마스터의 길 230108 (0) | 2023.01.09 |