Nick Dev

[Outstagram] kafka를 활용한 피드 push model 구현 과정 본문

Outstagram

[Outstagram] kafka를 활용한 피드 push model 구현 과정

Nick99 2024. 12. 12. 01:00
반응형

피드 push model?!

push model이란?

  • SNS의 피드를 구성하는 방법 중 하나
  • pull model 도 있음
  • A가 신규 게시물을 작성하면 A 팔로워들의 피드 목록에 신규 게시물 ID를 push하는 방식으로 피드를 구성

⭐ kafka를 도입하는 이유

  1. 신규 게시물 작성팔로워들의 피드 목록에 신규 게시물 ID를 push 로직은 서로 다른 관심사
  2. 동기적으로 처리할 시 팔로워들이 많다면 각 팔로워들의 피드 목록에 push하는 로직이 오래 걸려서 게시물 생성 로직이 너무 오래 걸림
  • 그렇기에 두 로직을 동기적으로 처리하지 말고 kafka의 메시지 큐를 활용해 비동기적으로 처리

메시지 큐는 kafka말고도 RabbitMQ도 있는데 왜 Kafka 써?

  • Kafka가 MQ보다 분산 처리, 시스템에 적합

kafka 설정

1. build.gradle에 kafka 추가

    implementation 'org.springframework.kafka:spring-kafka'

2. application.yml 설정

kafka:
  bootstrap:
    servers: localhost:9092

3. docker-compose로 kafka와 zookeeper 설정

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

zookeeper란?

  • 분산 애플리케이션을 위한 중앙 집중형 서비스로, 구성 정보를 관리하고 분산 환경에서 동기화 및 이름 규명을 제공

kafka - zookeeper 관계

  • 클러스터 관리: Kafka는 클러스터 환경에서 실행되며, Zookeeper는 이 클러스터를 관리

4. KafkaProducer, Consumer 설정하기

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Long> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Long> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Long> feedConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sns-feed");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConsumerFactory<String, Long> postDeleteConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "post");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Long> feedKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(feedConsumerFactory());

        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Long> postDeleteKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postDeleteConsumerFactory());

        return factory;
    }


}

5. FeedUpdateProducer 로직

  • Kafka의 메시지 큐를 통해 메시지 보내는 로직
Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateProducer {
    private final KafkaTemplate<String, Long> kafkaTemplate;

    public void send(String topic, Long userId, Long postId) {
        log.info("sending postId = {} to topic = {}", postId, topic);
        kafkaTemplate.send(topic, userId.toString(), postId);
    }
}
  • send()를 통해 메시지 생성
  • 해당 topic을 처리하는 KafkaConsumer가 메시지 수신해서 로직 처리

6. FeedUpdateConsumer 로직

@Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateConsumer {

    private final RedisTemplate<String, Object> redisTemplate;

    // consumer 설정
    @KafkaListener(topics = "feed", groupId = "sns-feed", containerFactory = "feedKafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, Long> consumerRecord) {
        Long userId = Long.parseLong(consumerRecord.key());
        Long postId = consumerRecord.value();
        log.info("=========== received userID = {}, postID = {}", userId, postId);

        // Redis에서 userId의 팔로워 ID 목록 가져오기
        Set<Object> followerIds = redisTemplate.opsForSet().members("followers:" + userId);
        if (followerIds == null) {
            log.error("====================== userID {}는 팔로워가 없습니다.", userId);
            return;
        }

        log.info("=========== followerIds = {}", followerIds);

        // 내 피드 목록에도 내가 생성한 postId 넣기
        redisTemplate.opsForList().leftPush("feed:" + userId, postId);

        // 각 팔로워의 피드목록에 postId 넣기
        followerIds.forEach(
                id -> {
                    String feedKey = "feed:" + id;
                    redisTemplate.opsForList().leftPush(feedKey, postId);
                });
        log.info("=========== feed push success!");

    }


}
  • feed라는 topic을 처리하는 consumer 로직

로직 설명

  1. Redis에서 userId의 팔로워 ID 리스트를 가져온다
  2. 내 피드 목록에 내가 생성한 postId 넣기
  3. 가져온 팔로워 ID 리스트를 순회하면서 각 팔로워의 피드 목록에 postId 넣기

7. 게시물 생성 시 메시지 발행하기

    @Transactional
    public void insertPost(CreatePostReq createPostReq, Long userId) {
        PostDTO newPost = PostDTO.builder()
            .contents(createPostReq.getContents())
            .userId(userId)
            .createDate(LocalDateTime.now())
            .updateDate(LocalDateTime.now())
            .build();

        // 게시물 내용 저장 (insertPost 정상 실행되면, newPost의 id 속성에 id값이 들어 있다)
        postMapper.insertPost(newPost);

        Long newPostId = newPost.getId();

        // 로컬 디렉토리에 이미지 저장 후, DB에 이미지 정보 저장
        imageService.saveImages(createPostReq.getImgFiles(),
            newPostId);

        // kafka에 메시지 발행 : 팔로워들의 피드목록에 내가 작성한 게시물 ID 넣기
        feedUpdateProducer.send("feed", userId, newPostId);
    }

push model 로직 흐름

1. A가 게시물 작성

  • A user가 게시물을 작성하면 DB에 게시물 저장하고
  • feedUpdateProducer를 통해 topicfeed메시지를 발행

2. Consumer가 메시지 처리

  • FeedUpdateConsumertopicfeed인 메시지 처리
  • A user의 팔로워 ID 리스트 가져와서 각 피드 목록에 신규 postId를 push
  • 내 피드 목록에도 push

피드 조회하기

피드 조회 로직

  1. Redis에 저장되어 있는 피드 목록을 조회 (postId 리스트가 저장되어 있음)
  2. 피드 목록을 순회하면서 각 게시물의 정보를 가져옴
  • 피드 결과물을 캐싱하지 않고 각 게시물 정보를 캐싱함

피드가 아닌 각 게시물 정보를 캐싱하는 이유

  • 피드의 경우, 내가 팔로잉 하는 유저가 게시물 생성 시 내 피드 목록에 해당 게시물 ID가 들어옴
  • 그럼 기존 피드 결과물 캐싱해놓은건 의미 없어짐
  • 그래서 각 게시물을 캐싱해놓으면 피드 결과물 전체를 캐싱해놓는건 보단 좀 느리지만, 충분히 빠름
  • 또한, 다른 유저들의 피드 결과물 생성할 때도 다른 유저로 인해 캐싱해놓은 게시물 정보를 사용할 수 있음
반응형