반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- Scaffold
- Kotlin
- 코딩테스트
- 코드 트리
- 부하 테스트
- flutter
- Sharding
- 자료구조
- C언어
- 디프만16기
- AOP
- Oidc
- 운영체제
- Kafka
- pub.dev
- dip
- nGrinder
- OAuth
- 디프만
- kakao
- Redis
- depromeet
- java
- 연습문제
- Spring
- c
- exception
- 코딩 테스트
- 코딩
- 코드트리
Archives
- Today
- Total
Nick Dev
[Outstagram] kafka를 활용한 피드 push model 구현 과정 본문
반응형
피드 push model?!
push model이란?
- SNS의 피드를 구성하는 방법 중 하나
- pull model 도 있음
- A가 신규 게시물을 작성하면 A 팔로워들의 피드 목록에 신규 게시물 ID를 push하는 방식으로 피드를 구성
⭐ kafka를 도입하는 이유
신규 게시물 작성
과팔로워들의 피드 목록에 신규 게시물 ID를 push
로직은 서로 다른 관심사임- 동기적으로 처리할 시 팔로워들이 많다면 각 팔로워들의 피드 목록에 push하는 로직이 오래 걸려서 게시물 생성 로직이 너무 오래 걸림
- 그렇기에 두 로직을 동기적으로 처리하지 말고 kafka의 메시지 큐를 활용해 비동기적으로 처리
메시지 큐는 kafka말고도 RabbitMQ도 있는데 왜 Kafka 써?
- Kafka가 MQ보다 분산 처리, 시스템에 적합
kafka 설정
1. build.gradle에 kafka 추가
implementation 'org.springframework.kafka:spring-kafka'
2. application.yml 설정
kafka:
bootstrap:
servers: localhost:9092
3. docker-compose로 kafka와 zookeeper 설정
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
zookeeper란?
- 분산 애플리케이션을 위한 중앙 집중형 서비스로, 구성 정보를 관리하고 분산 환경에서 동기화 및 이름 규명을 제공
kafka - zookeeper 관계
- 클러스터 관리: Kafka는 클러스터 환경에서 실행되며, Zookeeper는 이 클러스터를 관리
4. KafkaProducer, Consumer 설정하기
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Long> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, Long> feedConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sns-feed");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConsumerFactory<String, Long> postDeleteConsumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "post");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> feedKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(feedConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> postDeleteKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postDeleteConsumerFactory());
return factory;
}
}
5. FeedUpdateProducer 로직
- Kafka의 메시지 큐를 통해 메시지 보내는 로직
Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void send(String topic, Long userId, Long postId) {
log.info("sending postId = {} to topic = {}", postId, topic);
kafkaTemplate.send(topic, userId.toString(), postId);
}
}
send()
를 통해 메시지 생성- 해당
topic
을 처리하는KafkaConsumer
가 메시지 수신해서 로직 처리
6. FeedUpdateConsumer 로직
@Component
@RequiredArgsConstructor
@Slf4j
public class FeedUpdateConsumer {
private final RedisTemplate<String, Object> redisTemplate;
// consumer 설정
@KafkaListener(topics = "feed", groupId = "sns-feed", containerFactory = "feedKafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, Long> consumerRecord) {
Long userId = Long.parseLong(consumerRecord.key());
Long postId = consumerRecord.value();
log.info("=========== received userID = {}, postID = {}", userId, postId);
// Redis에서 userId의 팔로워 ID 목록 가져오기
Set<Object> followerIds = redisTemplate.opsForSet().members("followers:" + userId);
if (followerIds == null) {
log.error("====================== userID {}는 팔로워가 없습니다.", userId);
return;
}
log.info("=========== followerIds = {}", followerIds);
// 내 피드 목록에도 내가 생성한 postId 넣기
redisTemplate.opsForList().leftPush("feed:" + userId, postId);
// 각 팔로워의 피드목록에 postId 넣기
followerIds.forEach(
id -> {
String feedKey = "feed:" + id;
redisTemplate.opsForList().leftPush(feedKey, postId);
});
log.info("=========== feed push success!");
}
}
feed
라는 topic을 처리하는 consumer 로직
로직 설명
- Redis에서
userId
의 팔로워 ID 리스트를 가져온다 - 내 피드 목록에 내가 생성한
postId
넣기 - 가져온 팔로워 ID 리스트를 순회하면서 각 팔로워의 피드 목록에
postId
넣기
7. 게시물 생성 시 메시지 발행하기
@Transactional
public void insertPost(CreatePostReq createPostReq, Long userId) {
PostDTO newPost = PostDTO.builder()
.contents(createPostReq.getContents())
.userId(userId)
.createDate(LocalDateTime.now())
.updateDate(LocalDateTime.now())
.build();
// 게시물 내용 저장 (insertPost 정상 실행되면, newPost의 id 속성에 id값이 들어 있다)
postMapper.insertPost(newPost);
Long newPostId = newPost.getId();
// 로컬 디렉토리에 이미지 저장 후, DB에 이미지 정보 저장
imageService.saveImages(createPostReq.getImgFiles(),
newPostId);
// kafka에 메시지 발행 : 팔로워들의 피드목록에 내가 작성한 게시물 ID 넣기
feedUpdateProducer.send("feed", userId, newPostId);
}
push model 로직 흐름
1. A가 게시물 작성
- A user가 게시물을 작성하면 DB에 게시물 저장하고
feedUpdateProducer
를 통해topic
이feed
인 메시지를 발행함
2. Consumer가 메시지 처리
FeedUpdateConsumer
가topic
이feed
인 메시지 처리- A user의 팔로워 ID 리스트 가져와서 각 피드 목록에 신규 postId를 push
- 내 피드 목록에도 push
피드 조회하기
피드 조회 로직
- Redis에 저장되어 있는 피드 목록을 조회 (postId 리스트가 저장되어 있음)
- 피드 목록을 순회하면서 각 게시물의 정보를 가져옴
- 피드 결과물을 캐싱하지 않고 각 게시물 정보를 캐싱함
피드가 아닌 각 게시물 정보를 캐싱하는 이유
- 피드의 경우, 내가 팔로잉 하는 유저가 게시물 생성 시 내 피드 목록에 해당 게시물 ID가 들어옴
- 그럼 기존 피드 결과물 캐싱해놓은건 의미 없어짐
- 그래서 각 게시물을 캐싱해놓으면 피드 결과물 전체를 캐싱해놓는건 보단 좀 느리지만, 충분히 빠름
- 또한, 다른 유저들의 피드 결과물 생성할 때도 다른 유저로 인해 캐싱해놓은 게시물 정보를 사용할 수 있음
반응형
'Outstagram' 카테고리의 다른 글
[Outstagram] 템플릿 메서드 패턴을 실제 프로젝트에 적용해보기 (0) | 2024.12.12 |
---|---|
[Outstagram] 왜 같은 클래스에서 @Cacheable 달린 메서드 호출하면 씹힐까... (0) | 2024.12.12 |
[Outstagram] 무한 스크롤 구현하려다 Snowflake ID 도입한 이야기 (0) | 2024.12.12 |
[Outstagram] 좋아요 동시성 문제 해결 (1) | 2024.12.11 |
[Outstagram] 이미지 처리 추상화 (0) | 2024.12.11 |