Kafka事务的应用场景

参考答案

Kafka中的事务特性,主要用于以下两种场景:

  • 生产者发送多条消息可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败。
  • read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作。在一个流式处理的应用中,常常一个服务需要从上游接收消息,然后经过处理后送达到下游,这就对应着消息的消费和生成。

Kafka producer API 提供了以下接口用于事务操作:

/**
    * 初始化事务
    */
   public void initTransactions();

   /**
    * 开启事务
    */
   public void beginTransaction() throws ProducerFencedException ;

   /**
    * 在事务内提交已经消费的偏移量
    */
   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, 
                                        String consumerGroupId) throws ProducerFencedException ;

   /**
    * 提交事务
    */
   public void commitTransaction() throws ProducerFencedException;

   /**
    * 丢弃事务
    */
   public void abortTransaction() throws ProducerFencedException ;

下面是使用Kafka事务特性的示例,这段代码Producer开启了一个事务,然后在这个事务中发送了两条消息。这两条消息要么都发送成功,要么都失败。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id”, “my-transactional-id");

producer.initTransactions();
producer.beginTransaction();
producer.send("outputTopic", "message1");
producer.send("outputTopic", "message2");
producer.commitTransaction();

下面这段代码为read-process-write模式,在一个Kafka事务中,同时涉及到了生产消息和消费消息。

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");

KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");

consumer.subscribe(singleton("inputTopic"));

producer.initTransactions();

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
  producer.commitTransaction();
}

注意:在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:

void  kakfa_in_tranction(){
  // 1.kafa的操作:读取消息或生产消息
  kafkaOperation();
  // 2.db操作
  dbOperation();
}

这个是有问题的。操作DB数据库的数据源是DB,消息数据源是kfaka,这是完全不同两个数据。

一种数据源(如mysql,kafka)对应一个事务,所以它们是两个独立的事务。kafka事务指kafka一系列 生产、消费消息等操作组成一个原子操作,db事务是指操作数据库的一系列增删改操作组成一个原子操作。

以上,是 Kafka 面试题【Kafka事务的特性,以及应用场景】的参考答案。

输出,是最好的学习方法

欢迎在评论区留下你的问题、笔记或知识点补充~

—end—

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧