Kafka 新版Producer Java版代码阅读
更新日期:
- 1. Kafka 新版Producer Java版代码阅读
- 1.1. 如果我来做,怎么做?
- 1.2. 带着这几个问题,我们来看人家怎么做的
Kafka 新版Producer Java版代码阅读
Kafka在0.8.2.1出了新版Producer,支持ack(仅Java版,因为通过JavaClient实现的)。因此对代码进行了简单阅读,并记录如下:
接口如下:
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
封装一个Record之后,每次调用同时传入一个callback。该函数在Kafka返回结果时被调用。
根据官方example的调用方式:
1 | public void onCompletion(RecordMetadata metadata, Exception exception) { |
如果我来做,怎么做?
我觉得如果我来设计,至少需要考虑如下几个问题:
发送的时候callback是否跟着发到Kafka Server?
Kafka支持了batch send,ack的时候是一个一个ack还是batch ack?同样,如果是batch ack,call back怎么调用?
每次callback都会单独使用一个线程调用么?还是共享一个线程?
如果Callback不发送到KafkaServer,在客户端是怎样存储的?进程fail掉的时候是否会丢ack?
带着这几个问题,我们来看人家怎么做的
基本逻辑
先从Kafka Producer的send方法看起,
send的全部代码就是这样,简单来说做了这样几件事
- 判断partition
- 序列化消息
- 判断消息大小是否符合格式
重点是第四步:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
accumulator 是每个Producer单独持有唯一一个的,每次调用appen之后返会一个包含有(FutureRecordMetadata)的执行result.
追进去看一下这个append方法,注释是这样说的
Add a record to the accumulator, return the append result。The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created.
简单来说这个方法就是把一个message序列化之后加入到accumulator的发送队列里,等会再详细介绍Acculator。
第五步,调用sender的awake方法
if (result.batchIsFull || result.newBatchCreated) { this.sender.wakeup(); }
看到这里感觉啥都没干啊,所以我们需要进一步看一下Acculator以及Sender究竟在做什么。
Accumulator
在Producer里通过Accumulator的append方法把消息加入异步发送队列,我们先看看Accumulator的实现。
private final BufferPool free;
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
Accumulator里有三个结构必须要说一下,BufferPool用于管理batch发送的缓存,等会细说,batches显然是一个以partition为Key的map,value是一个double-ended-queue,每个queue里的元素是一个RecordBatch,显然是用来做发送缓冲的。最后还有一个Incoplete,用于记录未完成的所有batch。
Accumulator的append方法代码比较长,简要说一下做了这样几个事情
根据partition从batches里找到deque,然后peeklast().tryappend(),也就是调用了RecordBatch的tryappend方法
Deque<RecordBatch> dq = dequeFor(tp); synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); //you.meng futrue==true means no more room for the new coming message if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } }
这个tryappend方法比较简单,就是看看recordbatch里面地方够不够,不够就返回null,够就加上,Recordbatch里用一个List<Thunk> 来存储每个msg的callback。但整个BatchRecord封装成一个future返回。代码如下:
public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
this.records.append(0L, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
由此可知Kafka对Batch消息的确认是一次批量确认,但callback应该是一次批量确认之后一个一个发送的。
第一步如果成功吧msg加入batch显然后面啥都不用做了,如果返回是null,则需要从新来一个RecordBatch。然后先申请空间
ByteBuffer buffer = free.allocate(size);
注意,这个申请空间是有可能block的(当然也要看用户设置),所以在申请空间之后,可能已经过了很久,物是人非了,所以代码很小心的从新调用了一遍batches.get(partition).peeklast.tryappend。
//哈哈 自从用了scala 妈妈再也不担心我读不懂长代码了。
如果这个时候tryappend发现有地方了,这时候释放空间,加进去拉倒。
free.deallocate(buffer);
当然也可能依然坑爹的tryappend返回null,即表示notEnoughRoom for new msg,那么进入第三步
- 最后只有两种情况没有讨论了,要不就是partition下面 d-e-queue是空的,要不就是现有的空间都不够了。所以第二部申请的空间(buffer)必须用了啊,然后我们新来做一个RecordMessage吧。
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
dq.addLast(batch);
incomplete.add(batch);
我们用新申请的空间buffer生成了新的MemoryRecord,然后做出来batch,加入d-e-queue,加入未完成队列。
看到这里已经相对清晰了,我们捋一捋几个悬而未解的问题。
- free.allocate(size) 还有free.deallocate(buffer)是咋做的?
- MemoryRecords怎么使用的buffer?
- 那个坑爹的Sender怎么awake的?
- callback什么时候被调用的?
根据这几个问题,我们逐一分析一下:
Buffered pool
MemoryRecords && Compressor && RecordBatch
MemoryRecords
在accumulator中我们看到,MemoryRecord 对 bytebuffer进行了封装,而后RecordBatch对MemoeryRecord进行封装
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
先看Memory Recored,Memory Record继承自Record接口,其定义4byte的size,8byte的offest所以每个Record size<2^32 位 文件小于2^64 位。
/**
* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
*/
public interface Records extends Iterable<LogEntry> {
int SIZE_LENGTH = 4;
int OFFSET_LENGTH = 8;
int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
MemoryRecord中还持有Compressor以及buffer,主要被调用append方法将buffer中的数据写入compressor
/**
* Append the given record and offset to the buffer
*/
public void append(long offset, Record record) {
if (!writable)
throw new IllegalStateException("Memory records is not writable");
int size = record.size();
compressor.putLong(offset);
compressor.putInt(size);
compressor.put(record.buffer());
compressor.recordWritten(size + Records.LOG_OVERHEAD);
record.buffer().rewind();
}
RecordBatch
先跳过Compressor,我们看一下MemoryRecord继续被向上封装成了RecordBatch。因为MemoryRecord只有对IO的操作,并没有对Kafka逻辑的支持,因此RecordBatch在其基础之上封装了一些计数参数之外还增加了几个变量:
public final MemoryRecords records;
public final TopicPartition topicPartition;
public final ProduceRequestResult produceFuture;
private final List<Thunk> thunks;
即对一次producer的batch提交过程进行了封装,包括发送的topicPartition,提交batch返回的produceFutrue以及存储这个batch里所有msg对应callback的thunks。
RecordBatch的tryAppend方法已经在2.2节介绍,除此之外,RecordBatch还有一个Done方法,看名字也知道用于对batch的返回结果进行确认:如果没有exception就直接调用thunks list里所有的callback,异常就按异常格式调用。
值得一提的就是recordbatch里是有个thunks的,里面用于存放所有的callback信息。
1 | private final List<Thunk> thunks; |
1 | FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); |
因此可以看到,callback信息是全部存在客户端的recordbatch里的,等到返回时再从thunks里取出来。
再看一下被确认的done方法,当被确认时,recordbatch里所有信息被一一确认。所以也可以看出来是批量确认的。
1 | public void done(long baseOffset, RuntimeException exception) { |
Compressor
//TBD
callback && Future && ProduceRequestResult && FutureRecordMetadata
Futrue FutureTask Callable 的概念就不再复述了,请自行查阅。
ProduceRequestResult (short for PRResult)
ProduceRequestResult是象征意义上的返回结果,但事实上该Result是在Client端生成的,其构造函数只有一种空构造函数,参数只有这几个。
private final CountDownLatch latch = new CountDownLatch(1);
private volatile TopicPartition topicPartition;
private volatile long baseOffset = -1L;
private volatile RuntimeException error;
public ProduceRequestResult() {
}
看到
latch = new CountDownLatch(1);
就很容易明白这个PRResult的作用了,因为支持message.get()的阻塞等待,因此需要对产生的结果进行阻塞控制,只有当Server端回复结果之后才能让message.get()方法进入结束阻塞。而这个过程的实现,就是使用PRResult的latch实现的。
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
this.topicPartition = topicPartition;
this.baseOffset = baseOffset;
this.error = error;
this.latch.countDown();
}
这是PRResult的Done方法,可以看到,该Result的latch在初始化就自动生成,直到Done方法被调用才能解除阻塞,其他任何wait在latch上的方法都将被阻塞。
我们看看有哪些方法调用了latch.await吧: 追到根 发现使用了该方法的包括:
FutureRecordMetadata.get
KakfaProducer.flush
第一个是官方定义的清清楚楚的接口,第二个是flush,简单易懂,不再介绍。
FutureRecordMetadata &&RecordMetadata
如果说PRResult只是具有了一个阻塞功能的结果存储器,那么FutureRecordMetaData就是在其基础上有封装了执行过程。
public final class FutureRecordMetadata implements Future<RecordMetadata> {
private final ProduceRequestResult result;
private final long relativeOffset;
FRMetaData持有result的同时,继承了Future,所以调用FRMetaData的get方法时,通过实现其封装的PRResult.await()进行阻塞。直到PRResult被Done,latch.countDown()被调用为止。
对多线程以及阻塞感兴趣更多的可以参考FutrueTask。
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
this.result.await();
return valueOrError();
}
RecordMetadata value() {
return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
}
======================
前部分小结
简要总结一下
- Accumulator:消息的总控端,负责对发送,接收进行实际控制,但并非线程类
- 消息的封装,Batch封装(RecordBatch/MemoryRecords/Comporessor)
- 消息对内存的使用(Buffer Pool)
- 调用结果返回(FutureRecordMetadata/ProduceRequestResult/Callback)
基本涉及了发送过程的全部静态实现,唯独缺少了多线程控制。当然,在这上面的实现中,也多次涉及到了sender.run(time) 以及sender.wakeup()等方法。
所以在最后,我们来看看Sender的实现。
Sender
Sender 主要持有如下对象
public class Sender implements Runnable {
/* the state of each nodes connection */
private final KafkaClient client;
/* the record accumulator that batches records */
private final RecordAccumulator accumulator;
/* the metadata for the client */
private final Metadata metadata;
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize;
/* the number of acknowledgements to request from the server */
private final short acks;
/* the max time in ms for the server to wait for acknowlegements */
private final int requestTimeout;
/* the number of times to retry a failed request before giving up */
private final int retries;
/* the clock instance used for getting the time */
private final Time time;
/* true while the sender thread is still running */
private volatile boolean running;
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose;
/* metrics */
private final SenderMetrics sensors;
/* param clientId of the client */
private String clientId;
看半天鸡毛用没有,对吧。值得一提的是,sender持有的accumulator和KafkaProducer持有的accumulator是同一个。而且Sender是继承自线程的,其唯一的一次初始化是在new KafkaProducer的时候,且被KafkaProducer持有,被KafkaProducer的iothread装了起来。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
首先找一下sender怎么被使用的吧,ioThread只有在KProducer.close调用了一下。而sender在KProducer的send,flush等方法里多次被调用wakeup()方法.
下面我们仔细看一下他的run.() , run.(time)以及wakeup()方法吧。
//是不是俺的写法带点scala风格拉?
run方法内部调用了run(time)方法,二者不分家。先看run
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
带停止标志位的循环执行,后面就剩下关闭相关操作了。
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
关闭过程其实很值得看看,没啥可说的。看看run(time)和wakeup()吧。
/**
* Wake up the selector associated with this send thread
*/
public void wakeup() {
this.client.wakeup();
}
然后就没了,client是NetworkClient,封装了网络的NIOSelector的wakeup,我觉得这个问题还是开一篇单讲了。读者就就按照注释的意思理解吧。
KafkaProducer
经历了这一切,我们从新回到KafkaProducer 来。似乎只剩下了一个close方法。
没啥好说的,就到这里吧。