[댕댕어디가] MSA 아키텍처 전환기(5) - 서비스간 비동기 통신 (kafka 연결)

2025. 1. 11. 00:52·댕댕어디가 프로젝트/MSA

이번 시간에는, 지역별 방문 횟수 증가 로직에 필요한 통신을 연결해 보겠습니다.

 

지역별 방문 횟수 증가 로직 

: 유저가 A라는 지역의 A1 시설을 방문하면, 이를 인증하기 위해 실시간 리뷰를 작성하여 방문 횟수를 증가시킵니다. 이때, A 지역에서 방문 횟수가 가장 많은 유저가 땅의 주인이됩니다.

 

service별 역할

  • 유저 실시간 리뷰 작성 요청 -> project-service 에서 실시간 리뷰 저장 -> region-service에서 방문 횟수 증가 및 새로운 땅의 주인 판별

 

통신이 필요한 부분

projdect-service에서 실시간 리뷰를 저장한 후, 방문 횟수를 증가시키기 위해 region-service에 요청을 전달해야합니다.

 

1. Kafka 클러스터 구성

kafka 클러스터 구성 과정에 대해서는 간략히 아래에 첨부해두겠습니다.

 

 [ 더보기 + 확인 ] Kafka 클러스터 구성 🔽

더보기

kafka 클러스터 구성

 

로컬에 띄우기 위해 도커를 이용하여 클러스터를 구성하였습니다.

 

1. docker-compose.yml 파일 구성

* kraft를 이용하였습니다. 

* 3개의 카프카 브로커와 카프카UI 서비스를 구성하였습니다.

 

2. docker-compose.yml 파일 실행, 설치

docker-compose up -d

 

 

3. 설치 완료 확인 및 kafkaWebUiService 확인

 

 

 

2. Producer 와 Consumer

카프카에는 메세지를 생성해 브로커에 전달하는 producer와, 브로커에 있는 메세지를 읽는 consumer가 존재합니다.

이번 통신에서는, 방문 횟수 증가 요청에 대한 정보를 메세지로 전달할 예정입니다. 그렇기에 project-service가 producer가 되어 메세지를 브로커에 전달하면 region-service가 consumer로써 메세지를 읽어 로직을 처리합니다.

 

3. Project- Service : Producer 구성

 

1) gralde 의존성을 추가합니다.

implementation 'org.springframework.kafka:spring-kafka'

 

2) ProducerConfig를 정의합니다.

@Configuration
public class KafkaProducerConfig {
	@Bean
	public ProducerFactory<String, Object> producerFactory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:10000");
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

		return new DefaultKafkaProducerFactory<>(config);
	}

	@Bean
	public KafkaTemplate<String, Object> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

 

현재 카프카 브로커는 10000 ~ 10002 포트를 사용 중입니다.

 

3) kafka 메세지 전송 코드를 작성합니다.

private final KafkaTemplate<String, Object> kafkaTemplate;

private void addCountVisitRegion(User user, Place place, Review review) {
    if(review.getReviewtype().equals("REVIEW_TYP_02")){
        VisitRegionInfo visitRegionInfo = new VisitRegionInfo(place.getCity(), place.getCityDetail(), user.getUserId());
        kafkaTemplate.send("addVisitRegion", visitRegionInfo);
    }
}

 

kafka는 토픽으로 메세지를 분류하는데,  "addVisitRegion"라는 토픽으로 방문 횟수 증가 로직에 필요한 정보를 전달합니다.

 

4. Region- Service : Consumer 구성

1) gralde 의존성을 추가합니다.

implementation 'org.springframework.kafka:spring-kafka'

 

2)  ConsumerConfig를 정의합니다.

@Configuration
public class KafkaConsumerConfig {

	@Bean
	public ConsumerFactory<String, Object> consumerFactory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:10000");
		config.put(ConsumerConfig.GROUP_ID_CONFIG, "region_visit");
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);

		return new DefaultKafkaConsumerFactory<>(config);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());

		return factory;
	}
}

 

  • 해당 consumer는 "region_visit"이라는 consumer 그룹에 속해있습니다.
  • consumer는 그룹을 단위로 메세지를 읽을 수 있으며, 하나 이상의 토픽을 구독할 수 있습니다.
  • consumer 그룹 내의 컨슈머들은 토픽의 파티션을 서로 나눠서 읽습니다. 따라서 같은 컨슈머 그룹에 속한 컨슈머들은 각 파티션을 독점적으로 처리하여, 한 메시지가 그룹 내에서 중복 처리되지 않도록 보장합니다.
  • 서로 다른 consumer 그룹은 동일한 토픽에서 메시지를 읽어도 상관없습니다. 각 그룹은 독립적으로 오프셋을 관리하므로, 같은 메시지를 서로 다른 그룹에서 개별적으로 처리할 수 있습니다.

3) listener 생성

"addVisitiRegion" 토픽 메세지에 대해 region_visit 컨슈머 그룹이 메세지를 읽는 listener를 생성합니다.

메세지를 읽으면 방문 횟수 로직을 증가하는 로직을 실행합니다.

	@KafkaListener(topics = "addVisitRegion", groupId = "region_visit")
	public void addRegionVisitListener(ConsumerRecord data) throws JsonProcessingException {
		VisitRegionInfo visitRegionInfo = objectMapper.readValue((String)data.value(), VisitRegionInfo.class);
		regionService.addCountVisitRegionForDB(visitRegionInfo.city(), visitRegionInfo.cityDetail(), visitRegionInfo.userId());
	}

 

 

5. 통신 연결 확인

아래는 연결이 성공적으로 되었을 때 진행되어야 하는 흐름입니다.

  • postman으로 실시간 리뷰 작성 API 요청
  • project-service에서 실시간 리뷰 저장, 방문 횟수 증가 로직 요청(카프카)
  • region-service에서 요청 메세지를 읽음 , 방문 횟수 증가 로직 실행
  • DB 반영

 

테스트

 

카프카 브로커에 메세지 성공적으로 전달

 

카프카 컨슈머도 성공적으로 메세지를 읽은 것을 확인할 수 있습니다.

consumer lag가 0임으로, 읽지 않은 메세지가 없음을 알 수 있습니다.

 

DB에도 유저의 방문 횟수가 증가한 것을 확인할 수 있습니다.

 

연결 성공

'댕댕어디가 프로젝트 > MSA' 카테고리의 다른 글

[댕댕어디가] MSA 아키텍처 전환기(7) - 인증서버 연결  (0) 2025.01.14
[댕댕어디가] MSA 아키텍처 전환기(6) - 서비스간 동기 통신(Spring Cloud Open Feign)  (0) 2025.01.12
[댕댕어디가] MSA 아키텍처 전환기(4) - 서비스간 비동기 통신하기  (0) 2025.01.08
[댕댕어디가] MSA 아키텍처 전환기(3) - API Gateway 구현  (0) 2025.01.07
[댕댕어디가] MSA 아키텍처 전환기(2) - Service Discovery 패턴/ 서비스 분리  (0) 2025.01.07
'댕댕어디가 프로젝트/MSA' 카테고리의 다른 글
  • [댕댕어디가] MSA 아키텍처 전환기(7) - 인증서버 연결
  • [댕댕어디가] MSA 아키텍처 전환기(6) - 서비스간 동기 통신(Spring Cloud Open Feign)
  • [댕댕어디가] MSA 아키텍처 전환기(4) - 서비스간 비동기 통신하기
  • [댕댕어디가] MSA 아키텍처 전환기(3) - API Gateway 구현
gani+
gani+
꾸준히 기록할 수 있는 사람이 되자 !
  • gani+
    Gani_Dev :)
    gani+
  • 전체
    오늘
    어제
    • 분류 전체보기 (43)
      • 당장 프로젝트 (2)
        • 트러블슈팅 (0)
      • 댕댕어디가 프로젝트 (11)
        • 트러블슈팅 (3)
        • MSA (8)
      • 개발일지 (2)
      • BOOK (12)
        • SQL 레벨업 (10)
      • 프로젝트 (0)
      • ELK (5)
      • 알고리즘 (9)
      • CS (2)
        • 디자인패턴 (2)
  • 블로그 메뉴

    • 홈
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    14기
    SWMaestro14
    이것이 코딩 테스트다
    백준4963
    완전탐색
    순차탐색
    4673
    이것이코딩테스트다
    섬의개수
    다이나믹프로그래밍
    해쉬
    이진탐색
    9095
    dfs
    후기
    정렬
    SW마에스트로
    4963
    플로이드워셔
    최단경로
    DP
    소마
    백준
    다익스트라
    알고리즘
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
gani+
[댕댕어디가] MSA 아키텍처 전환기(5) - 서비스간 비동기 통신 (kafka 연결)
상단으로

티스토리툴바