SSE 방식으로 실시간 알림을 구현하는 이유는 지난 포스팅에서 확인하실 수 있습니다.

 

TIL 알람 기능 구현 SSE(Server-Sent-Events) 230110

오늘부터 MVP 2차를 개발하기 시작했습니다. 알람 기능 구현이 제가 맡은 부분입니다. 프론트엔드가 2명밖에 없어서 알람까지 할 수 있을지는 모르겠지만 일단은 만들고 생각하기로 했습니다. 프

pizzathedeveloper.tistory.com

 

 

우선 SSE를 연결하기위해서 컨트롤러부터 구현했습니다.

처음에는 Alarm이라는 도메인명을 사용하려다가 알람! 보다는 알림~ 이 좋을 거 같아서 Notification으로 변경했습니다.

 

NotificationController - SSE Subscribe 응답하기

@RestController
@RequiredArgsConstructor
public class NotificationController {

    private final NotificationService notificationService;

    @Tag(name = "SSE")
    @ApiResponses(value = {
            @ApiResponse(responseCode = "2000", description = "SSE 연결 성공"),
            @ApiResponse(responseCode = "5000", description = "SSE 연결 실패")
    })
    @Operation(summary = "SSE 연결")
    @GetMapping(value="/api/subscribe", produces = "text/event-stream")
    public SseEmitter subscribe(@AuthenticationPrincipal CustomUserDetails userDetails, @RequestHeader(value="Last-Event-ID", required = false, defaultValue = "") String lastEventId ){

        return notificationService.subscribe(userDetails.getMember().getId(), lastEventId);
    }
  • @GetMapping 어노테이션에 아래와 같이 입력해줘야 합니다.
produces = "text/event-stream"
  • SSE 통신을 위한 "text/event-stream"이 표준 MediaType입니다.
  • MemberId 값과 "Last-Event-ID"를 받아옵니다.
  • Last-Event-ID는 SSE 연결이 끊어졌을 경우, 클라이언트가 수신한 마지막 데이터의 id 값을 의미합니다.
  • 항상 있는 것이 아니기 때문에 required = false 로 설정했습니다.

 

NotificationService - SSE 연결

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final EmitterRepository emitterRepository = new EmitterRepositoryImpl();
    private final NotificationRepository notificationRepository;

    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    public SseEmitter subscribe(Long memberId, String lastEventId) {
        String emitterId = memberId + "_" + System.currentTimeMillis();
        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));

        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

        sendToClient(emitter, emitterId, "EventStream Created. [memberId=" + memberId + "]");

        if (!lastEventId.isEmpty()) {
            Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
            events.entrySet().stream()
                    .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                    .forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
        }

        return emitter;
    }
  • DEFAULT_TIMEOUT을 1시간으로 설정해줬습니다. SSE 연결은 1시간동안 지속됩니다.
  • SseEmitter 클래스는 SpringFramework에서 버전 4.2 부터 제공합니다.
  • emitter는 발신기라는 뜻을 가지고 있습니다. 
  • String emitterId = memberId_System.currentImeMillis(); 로 한 이유는 Last-Event-ID와 관련이 있습니다.
    • Last-Event-ID는 클라이언트가 마지막으로 수신한 데이터의 Id값을 의미합니다. 그러나 Id 값만을 사용한다면 언제 데이터가 보내졌는지, 유실 되었는지 알 수가 없기 때문에 시간을 emitterId에 붙여두면 데이터가 유실된 시점을 알 수 있으므로 저장된 Key값 비교를 통해 유실된 데이터만 재전송 할 수 있습니다.

 

SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));

emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
  • SSE 연결을 위해서 SseEmitter 객체를 만들어 반환해야합니다.
    • 유효시간 DEFAULT_TIMEOUT을 넣어줍니다.
    • 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보냅니다.
    • emitterId도 함께 저장해줍니다.
  • 시간이 초과하거나 비동기요청이 정상동작이 안되면 저장한 SseEmitter를 삭제합니다.

 

sendToClient(emitter, emitterId, "EventStream Created. [memberId=" + memberId + "]");
  • Sse 연결이 이뤄진 후, 데이터가 하나도 전송되지 않았는데 SseEmitter의 유효시간이 끝나면 503 에러가 발생한다고 합니다. 따라서, 최초 연결 시 더미 데이터를 보내줍니다.

 

if (!lastEventId.isEmpty()) {
    Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
    events.entrySet().stream()
            .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
            .forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
  • lastEventId값이 있는 경우, 저장된 데이터 캐시에서 유실된 데이터들을 다시 전송합니다.

 

Notification - 객체 생성

@Getter
@Entity
@NoArgsConstructor
public class Notification extends Timestamped {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Embedded
    private NotificationContent content;

    @Embedded
    private RelatedUrl url;

    @Column(nullable = false)
    private Boolean isRead;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private NotificationType notificationType;

    @OnDelete(action = OnDeleteAction.CASCADE)
    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "MEMBER_ID")
    private Member receiver;

    @Builder
    public Notification(Member receiver, NotificationType notificationType, String content, String url) {
        this.receiver = receiver;
        this.notificationType = notificationType;
        this.content = new NotificationContent(content);
        this.url = new RelatedUrl(url);
        this.isRead = false;
    }

    public String getContent() {
        return content.getContent();
    }

    public String getUrl() {
        return url.getUrl();
    }

    public void read(){
        isRead = true;
    }
}
  • Notification 객체를 생성해줍니다.
  • 필요한 값들 (receiver, notificationType, content, url, isRead) 를 넣어줍니다.
    • @Embedded 는 객체를 엔티티로 사용하는게 아니라 값 타입으로 사용하기 위해 붙여줍니다. 좀 더 객체지향적이고 각 상황에 따라 수정이 용이합니다.
    • isRead 는 조회 여부를 알기 위해 Boolean 타입으로 넣었습니다.
    • notificationType은 서비스도메인 별로 enum으로 구분해서 작성했습니다. 저희 프로젝트에서 보내야 하는 알림 종류는 5가지입니다. (콜라보 요청, 콜라보 승인, 게시글 댓글, 게시글 좋아요, 댓글 좋아요)
    • @OnDelete 에 대해서는 여기 블로그를 참조해주세요. receiver를 삭제하면 연관관계도 함께 삭제됩니다.  

 

NotificationContent

@Getter
@Embeddable
@NoArgsConstructor
public class NotificationContent {
    @Column(nullable = false)
    private String content;

    public NotificationContent(String content){
        this.content = content;
    }
}

 

RelatedUrl

@Getter
@Embeddable
@NoArgsConstructor
public class RelatedUrl {
    @Column(nullable = false)
    private String url;

    public RelatedUrl(String url){
        this.url = url;
    }
}

 

 

Entity를 만들었으니 Repository도 만들어줍니다.

 

NotificationRepository

public interface NotificationRepository extends JpaRepository<Notification, Long> {
    List<Notification> findAllByReceiver(Member member);
}
  • JpaRepository를 상속합니다.
  • findAllByReceiver는 알림 전체 목록을 조회할 때 사용한 메서드 입니다.

 

EmitterRepository

public interface EmitterRepository {
    SseEmitter save(String emitterId, SseEmitter sseEmitter);
    void saveEventCache(String emitterId, Object event);
    
    Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
    Map<String,Object> findAllEventCacheStartWithByMemberId(String memberId);
    
    void deleteById(String emitterId);
}
  • findAllEmitterStartWithByMemberId 는 해당 member와 관련된 모든 emitter를 찾습니다.
  • findAllEventCacheStartWithByMemberId 는 해당 member와 관련된 모든 event를 찾습니다.

 

EmitterRepositoryImpl

위의 EmitterRepository를 아래와 같이 구현합니다.

public class EmitterRepositoryImpl implements EmitterRepository {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId,sseEmitter);
        return sseEmitter;
    }

    @Override
    public void saveEventCache(String emitterId, Object event) {
        eventCache.put(emitterId,event);
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String emitterId) {
        emitters.remove(emitterId);

    }

    @Override
    public void deleteAllEmitterStartWithId(String memberId) {
        emitters.forEach(
                (key,emitter) -> {
                    if(key.startsWith(memberId)){
                        emitters.remove(key);
                    }
                }
        );
    }

    @Override
    public void deleteAllEventCacheStartWithId(String memberId) {
        eventCache.forEach(
                (key,emitter) -> {
                    if(key.startsWith(memberId)){
                        eventCache.remove(key);
                    }
                }
        );

    }
}
  • 동시성을 고려하여 ConcurrentHashmap을 사용합니다.
    • HashMap은 멀티스레드 환경에서 동시에 수정을 시도하는 경우 예상하지 못한 결과가 발생할 수 있습니다. 멀티스레드 환경하에서 HashMap을 안전하게 사용하기위해 java에서는 concurrent 패키지를 제공합니다. ConcurrentHashmap을 사용하면 thread-safe가 보장됩니다. ConcurrentHashmap 관련 설명은 여기 블로그를 참조하세요.
    • 그냥 사용한다면 ConcurrentModificationException이 발생할 수 있습니다. 

 

Dto와 MapStruct

추가적으로 저희 프로젝트 컨벤션이기 때문에 MapStruct도 작성해주었습니다. 

@Mapper
public interface SseMapStruct {
    SseMapStruct SSE_MAPPER = Mappers.getMapper(SseMapStruct.class);

    ResponseNotificationDto NotificationtoResponseNotificationDto(Notification notification);

}

ResponseDto도 작성해주었습니다.

@Schema(description = "알림 Dto")
@Getter
@Setter
public class ResponseNotificationDto {
    private Long id;
    private String content;
    private String url;
    private Boolean isRead;
    private String createdAt;

    @Builder
    public ResponseNotificationDto(Notification notification) {
        this.id = notification.getId();
        this.content = notification.getContent();
        this.url = notification.getUrl();
        this.isRead = notification.getIsRead();
        this.createdAt = LocalDateTimeConverter.timeToString(notification.getCreatedAt());
    }

}
  • 프로젝트에서 사용하는 LocalDateTimeConverter를 사용해서 알림 발송시간을 한글로 바꿔줍니다😎

 

 

이제 클라이언트에 알림을 보낼 준비가 끝났습니다.

 

 

NotificationService - 클라이언트에 데이터 전송하기

public void send(Member receiver, NotificationType notificationType, String content, String url) {
    Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url));
    String memberId = String.valueOf(receiver.getId());

    Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByMemberId(memberId);
    sseEmitters.forEach(
            (key, emitter) -> {
                emitterRepository.saveEventCache(key, notification);
                sendToClient(emitter, key, SSE_MAPPER.NotificationtoResponseNotificationDto(notification));
            }
    );
}

private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                .id(emitterId)
                .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
        throw new InvalidRequestException(SSE, SERVICE, UNHANDLED_SERVER_ERROR);
    }
}

 

Notification을 전송하기 위한 메서드 입니다.

public void send(Member receiver, NotificationType notificationType, String content, String url) {
    Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url));
    String memberId = String.valueOf(receiver.getId());

    Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByMemberId(memberId);
    sseEmitters.forEach(
            (key, emitter) -> {
                emitterRepository.saveEventCache(key, notification);
                sendToClient(emitter, key, SSE_MAPPER.NotificationtoResponseNotificationDto(notification));
            }
    );
}
  • Notification 객체를 만들고 해당 Member의 emitter를 다 불러옵니다.(여러 브라우저에서 접속할 수 있기 때문에 emitter가 여러개 일 수 있습니다.)
  • 해당 데이터를 EventCache에 저장합니다.
  • sendToClient 메서드를 통해 클라이언트에 전송합니다.
    • MapStruct를 사용해서 Dto로 변환한 값을 보내줬습니다.

 

private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                .id(emitterId)
                .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
        throw new InvalidRequestException(SSE, SERVICE, UNHANDLED_SERVER_ERROR);
    }
}
  • emitter, emitterId와 함께 알림 내용을 클라이언트에 보냅니다.
  • 전송이 안된 경우 IOException을 터트려줍니다. IOException은 스트림, 파일 및 디렉터리를 사용해 정보에 엑세스하는 동안 throw된 예외에 대한 기본 클래스입니다. 

 

 

알림 만들기 🛎

이제 위에서 만든 알림 메서드를 호출해서 콜라보요청이 승인 되면 콜라보 요청 작성자에게 알림을 보내는 기능을 추가해보겠습니다.

@Transactional
public void approveCollaboRequest(Long collaborequestid, Member member) {
    /.../

    //요청한 사람한테 승인 완료 알림 - 게시글 상세 조회로 이동
    Long postId = post.getId();
    Member collaboMember = memberRepository.findByNickname(collaboRequest.getNickname())
            .orElseThrow(() -> new NotFoundException(COLLABO_REQUEST, SERVICE, MEMBER_NOT_FOUND));
    String url = "/api/post/"+postId;
    String content = post.getTitle()+"에 대한 콜라보 요청이 승인되었습니다.";
    notificationService.send(collaboMember, NotificationType.COLLABO_APPROVED, content, url);
}
  • 콜라보 요청을 한 member가 receiver이고, 해당 Post의 상세조회 페이지의 Url을 넣었습니다. NotificationType으로는 미리 enum으로 저장해 놓은 COLLABO_APPROVED(콜라보 승인)을 넣어줬습니다.

 

알림 결과는 아래와 같이 Postman 으로 확인할 수 있습니다.

클라이언트 사이드는 구현은 아직 안해서 알림 전체 조회하는 api를 만들어서 조회했습니다. 

 

 

 

 

참조 문서

여러 블로그를 참조해서 작성했습니다. 주요 참고한 블로그는 아래와 같습니다.

 

 

[Spring + SSE] Server-Sent Events를 이용한 실시간 알림

코드리뷰 매칭 플랫폼 개발 중 알림 기능이 필요했다. 리뷰어 입장에서는 새로운 리뷰 요청이 생겼을 때 모든 리뷰가 끝나고 리뷰이의 피드백이 도착했을 때 리뷰이 입장에서는 리뷰 요청이 거

velog.io

 

알림 기능을 구현해보자 - SSE(Server-Sent-Events)!

시작하기에 앞서 이번에 개발을 진행하면서 알림에 대한 요구사항을 만족시켜야하는 상황이 발생했다. 여기서 말하는 알림이 무엇인지 자세하게 살펴보자. A라는 사람이 스터디를 생성했고 B라

gilssang97.tistory.com

 

Node긴 하지만 개념 정리가 잘 되있어서 아래 블로그도 공유합니다.

 

[NODE] 📚 Server Sent Events 💯 정리 (+사용법)

SSE - Server Sent Events 란? SSE는 서버의 데이터를 실시간으로, 지속적으로 Streaming 하는 기술 이다. SSE는 웹 표준으로써 IE를 제외한 모든 브라우저에서 지원되며, IE역시 polyfill을 통해 지원이 가능하

inpa.tistory.com

 

 

 

+ Recent posts