参考答案
Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator。
Transaction Coordinator 主要负责分配pid,记录事务状态等操作。
下面是Kafka开启一个事务,到提交一个事务的运行流程图:
主要步骤如下:
1. 查找Tranaction Corordinator
- Producer 向任意一个 brokers 发送 FindCoordinatorRequest请求,来获取Transaction Coordinator的地址。
2. 初始化事务 initTransaction
- Producer 发送 InitpidRequest 给 Transaction Coordinator,获取pid。Transaction Coordinator在Transaciton Log中记录这<TransactionId,pid>的映射关系。
- 另外,它还会做两件事:恢复(Commit或Abort)之前的Producer未完成的事务;对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样,而epoch是不同的。
3. 开始事务beginTransaction
- 执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。
4. read-process-write流程
- 一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。
- 在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然没有还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。
5. 事务提交或终结 commitTransaction/abortTransaction
在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个两阶段提交:
- 第一阶段,将Transaction Log内的该事务状态设置为
PREPARE_COMMIT
或PREPARE_ABORT
- 第二阶段,将
Transaction Marker
写入该事务涉及到的所有消息(即将消息标记为committed
或aborted
)。这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,会将对应的Transaction Marker
控制信息写入日志。
一旦Transaction Marker
写入完成,Transaction Coordinator会将最终的COMPLETE_COMMIT
或COMPLETE_ABORT
状态写入Transaction Log中以标明该事务结束。
以上,是 Kafka 面试题【Kafka事务的原理】的参考答案。
输出,是最好的学习方法。
欢迎在评论区留下你的问题、笔记或知识点补充~
—end—