文章目錄
  1. 1. Kafka 新版Producer Java版代码阅读
    1. 1.1. 如果我来做,怎么做?
    2. 1.2. 带着这几个问题,我们来看人家怎么做的
      1. 1.2.1. 基本逻辑
      2. 1.2.2. Accumulator
      3. 1.2.3. Buffered pool
      4. 1.2.4. MemoryRecords && Compressor && RecordBatch
      5. 1.2.5. MemoryRecords
        1. 1.2.5.1. RecordBatch
        2. 1.2.5.2. Compressor
      6. 1.2.6. callback && Future && ProduceRequestResult && FutureRecordMetadata
      7. 1.2.7. ProduceRequestResult (short for PRResult)
        1. 1.2.7.1. FutureRecordMetadata &&RecordMetadata
      8. 1.2.8. 前部分小结
      9. 1.2.9. Sender
      10. 1.2.10. KafkaProducer

Kafka 新版Producer Java版代码阅读

Kafka在0.8.2.1出了新版Producer,支持ack(仅Java版,因为通过JavaClient实现的)。因此对代码进行了简单阅读,并记录如下:

接口如下:

public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) 

封装一个Record之后,每次调用同时传入一个callback。该函数在Kafka返回结果时被调用。

根据官方example的调用方式:

1
2
3
4
5
6
7
8
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
//means success
} else {
//means fail
exception.printStackTrace();
}
}

如果我来做,怎么做?

我觉得如果我来设计,至少需要考虑如下几个问题:

  • 发送的时候callback是否跟着发到Kafka Server?

  • Kafka支持了batch send,ack的时候是一个一个ack还是batch ack?同样,如果是batch ack,call back怎么调用?

  • 每次callback都会单独使用一个线程调用么?还是共享一个线程?

  • 如果Callback不发送到KafkaServer,在客户端是怎样存储的?进程fail掉的时候是否会丢ack?

带着这几个问题,我们来看人家怎么做的

基本逻辑

先从Kafka Producer的send方法看起,

send的全部代码就是这样,简单来说做了这样几件事

  1. 判断partition
  2. 序列化消息
  3. 判断消息大小是否符合格式
  4. 重点是第四步:

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
    

accumulator 是每个Producer单独持有唯一一个的,每次调用appen之后返会一个包含有(FutureRecordMetadata)的执行result.

追进去看一下这个append方法,注释是这样说的

Add a record to the accumulator, return the append resultThe append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created.

简单来说这个方法就是把一个message序列化之后加入到accumulator的发送队列里,等会再详细介绍Acculator。

  1. 第五步,调用sender的awake方法

    if (result.batchIsFull || result.newBatchCreated) {
        this.sender.wakeup();
    }
    

看到这里感觉啥都没干啊,所以我们需要进一步看一下Acculator以及Sender究竟在做什么。

Accumulator

在Producer里通过Accumulator的append方法把消息加入异步发送队列,我们先看看Accumulator的实现。

private final BufferPool free;
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;

Accumulator里有三个结构必须要说一下,BufferPool用于管理batch发送的缓存,等会细说,batches显然是一个以partition为Key的map,value是一个double-ended-queue,每个queue里的元素是一个RecordBatch,显然是用来做发送缓冲的。最后还有一个Incoplete,用于记录未完成的所有batch。

Accumulator的append方法代码比较长,简要说一下做了这样几个事情

  1. 根据partition从batches里找到deque,然后peeklast().tryappend(),也就是调用了RecordBatch的tryappend方法

    Deque<RecordBatch> dq = dequeFor(tp);
    synchronized (dq) {
        RecordBatch last = dq.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(key, value, callback);
            //you.meng   futrue==true means no more room for the new coming message
            if (future != null)
                return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
        }
    }
    
这个tryappend方法比较简单,就是看看recordbatch里面地方够不够,不够就返回null,够就加上,Recordbatch里用一个List<Thunk>  来存储每个msg的callback。但整个BatchRecord封装成一个future返回。代码如下:

    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            this.records.append(0L, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

由此可知Kafka对Batch消息的确认是一次批量确认,但callback应该是一次批量确认之后一个一个发送的。

  1. 第一步如果成功吧msg加入batch显然后面啥都不用做了,如果返回是null,则需要从新来一个RecordBatch。然后先申请空间

    ByteBuffer buffer = free.allocate(size);
    

    注意,这个申请空间是有可能block的(当然也要看用户设置),所以在申请空间之后,可能已经过了很久,物是人非了,所以代码很小心的从新调用了一遍batches.get(partition).peeklast.tryappend。

    //哈哈 自从用了scala 妈妈再也不担心我读不懂长代码了。

    如果这个时候tryappend发现有地方了,这时候释放空间,加进去拉倒。

    free.deallocate(buffer);
    
当然也可能依然坑爹的tryappend返回null,即表示notEnoughRoom for new msg,那么进入第三步
  1. 最后只有两种情况没有讨论了,要不就是partition下面 d-e-queue是空的,要不就是现有的空间都不够了。所以第二部申请的空间(buffer)必须用了啊,然后我们新来做一个RecordMessage吧。
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
dq.addLast(batch);
incomplete.add(batch);

我们用新申请的空间buffer生成了新的MemoryRecord,然后做出来batch,加入d-e-queue,加入未完成队列。

看到这里已经相对清晰了,我们捋一捋几个悬而未解的问题。

  • free.allocate(size) 还有free.deallocate(buffer)是咋做的?
  • MemoryRecords怎么使用的buffer?
  • 那个坑爹的Sender怎么awake的?
  • callback什么时候被调用的?

根据这几个问题,我们逐一分析一下:

Buffered pool

MemoryRecords && Compressor && RecordBatch

MemoryRecords

在accumulator中我们看到,MemoryRecord 对 bytebuffer进行了封装,而后RecordBatch对MemoeryRecord进行封装

MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

先看Memory Recored,Memory Record继承自Record接口,其定义4byte的size,8byte的offest所以每个Record size<2^32 位 文件小于2^64 位。

/**
* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
*/
public interface Records extends Iterable<LogEntry> {

int SIZE_LENGTH = 4;
int OFFSET_LENGTH = 8;
int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;

MemoryRecord中还持有Compressor以及buffer,主要被调用append方法将buffer中的数据写入compressor

/**
 * Append the given record and offset to the buffer
 */
public void append(long offset, Record record) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    int size = record.size();
    compressor.putLong(offset);
    compressor.putInt(size);
    compressor.put(record.buffer());
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    record.buffer().rewind();
}

RecordBatch

先跳过Compressor,我们看一下MemoryRecord继续被向上封装成了RecordBatch。因为MemoryRecord只有对IO的操作,并没有对Kafka逻辑的支持,因此RecordBatch在其基础之上封装了一些计数参数之外还增加了几个变量:

public final MemoryRecords records;
public final TopicPartition topicPartition;
public final ProduceRequestResult produceFuture;
private final List<Thunk> thunks;

即对一次producer的batch提交过程进行了封装,包括发送的topicPartition,提交batch返回的produceFutrue以及存储这个batch里所有msg对应callback的thunks。

RecordBatch的tryAppend方法已经在2.2节介绍,除此之外,RecordBatch还有一个Done方法,看名字也知道用于对batch的返回结果进行确认:如果没有exception就直接调用thunks list里所有的callback,异常就按异常格式调用。

值得一提的就是recordbatch里是有个thunks的,里面用于存放所有的callback信息。

1
private final List<Thunk> thunks;
1
2
3
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
thunks.add(new Thunk(callback, future));

因此可以看到,callback信息是全部存在客户端的recordbatch里的,等到返回时再从thunks里取出来。

再看一下被确认的done方法,当被确认时,recordbatch里所有信息被一一确认。所以也可以看出来是批量确认的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void done(long baseOffset, RuntimeException exception) {
this.produceFuture.done(topicPartition, baseOffset, exception);
// execute callbacks
for (int i = 0; i < this.thunks.size(); i++) {
try {
Thunk thunk = this.thunks.get(i);
if (exception == null)
thunk.callback.onCompletion(thunk.future.get(), null);
else
thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
}
}
}

Compressor

//TBD

callback && Future && ProduceRequestResult && FutureRecordMetadata

Futrue FutureTask Callable 的概念就不再复述了,请自行查阅。

ProduceRequestResult (short for PRResult)

ProduceRequestResult是象征意义上的返回结果,但事实上该Result是在Client端生成的,其构造函数只有一种空构造函数,参数只有这几个。

private final CountDownLatch latch = new CountDownLatch(1);
private volatile TopicPartition topicPartition;
private volatile long baseOffset = -1L;
private volatile RuntimeException error;

public ProduceRequestResult() {
}

看到

latch = new CountDownLatch(1);

就很容易明白这个PRResult的作用了,因为支持message.get()的阻塞等待,因此需要对产生的结果进行阻塞控制,只有当Server端回复结果之后才能让message.get()方法进入结束阻塞。而这个过程的实现,就是使用PRResult的latch实现的。

public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
    this.topicPartition = topicPartition;
    this.baseOffset = baseOffset;
    this.error = error;
    this.latch.countDown();
}

这是PRResult的Done方法,可以看到,该Result的latch在初始化就自动生成,直到Done方法被调用才能解除阻塞,其他任何wait在latch上的方法都将被阻塞。

我们看看有哪些方法调用了latch.await吧: 追到根 发现使用了该方法的包括:

FutureRecordMetadata.get 
KakfaProducer.flush

第一个是官方定义的清清楚楚的接口,第二个是flush,简单易懂,不再介绍。

FutureRecordMetadata &&RecordMetadata

如果说PRResult只是具有了一个阻塞功能的结果存储器,那么FutureRecordMetaData就是在其基础上有封装了执行过程。

    public final class FutureRecordMetadata implements Future<RecordMetadata> {
    private final ProduceRequestResult result;
private final long relativeOffset;

FRMetaData持有result的同时,继承了Future,所以调用FRMetaData的get方法时,通过实现其封装的PRResult.await()进行阻塞。直到PRResult被Done,latch.countDown()被调用为止。

对多线程以及阻塞感兴趣更多的可以参考FutrueTask。

@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await();
    return valueOrError();
}

RecordMetadata value() {
    return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
}

======================

前部分小结

简要总结一下

  1. Accumulator:消息的总控端,负责对发送,接收进行实际控制,但并非线程类
  2. 消息的封装,Batch封装(RecordBatch/MemoryRecords/Comporessor)
  3. 消息对内存的使用(Buffer Pool)
  4. 调用结果返回(FutureRecordMetadata/ProduceRequestResult/Callback)

基本涉及了发送过程的全部静态实现,唯独缺少了多线程控制。当然,在这上面的实现中,也多次涉及到了sender.run(time) 以及sender.wakeup()等方法。

所以在最后,我们来看看Sender的实现。

Sender

Sender 主要持有如下对象

public class Sender implements Runnable {


    /* the state of each nodes connection */
private final KafkaClient client;

/* the record accumulator that batches records */
private final RecordAccumulator accumulator;

/* the metadata for the client */
private final Metadata metadata;

/* the maximum request size to attempt to send to the server */
private final int maxRequestSize;

/* the number of acknowledgements to request from the server */
private final short acks;

/* the max time in ms for the server to wait for acknowlegements */
private final int requestTimeout;

/* the number of times to retry a failed request before giving up */
private final int retries;

/* the clock instance used for getting the time */
private final Time time;

/* true while the sender thread is still running */
private volatile boolean running;

/* true when the caller wants to ignore all unsent/inflight messages and force close.  */
private volatile boolean forceClose;

/* metrics */
private final SenderMetrics sensors;

/* param clientId of the client */
private String clientId;

看半天鸡毛用没有,对吧。值得一提的是,sender持有的accumulator和KafkaProducer持有的accumulator是同一个。而且Sender是继承自线程的,其唯一的一次初始化是在new KafkaProducer的时候,且被KafkaProducer持有,被KafkaProducer的iothread装了起来。

this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

首先找一下sender怎么被使用的吧,ioThread只有在KProducer.close调用了一下。而sender在KProducer的send,flush等方法里多次被调用wakeup()方法.

下面我们仔细看一下他的run.() , run.(time)以及wakeup()方法吧。

//是不是俺的写法带点scala风格拉?

run方法内部调用了run(time)方法,二者不分家。先看run

// main loop, runs until close is called
while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

带停止标志位的循环执行,后面就剩下关闭相关操作了。

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}
if (forceClose) {
    // We need to fail all the incomplete batches and wake up the threads waiting on
    // the futures.
    this.accumulator.abortIncompleteBatches();
}
try {
    this.client.close();
} catch (Exception e) {
    log.error("Failed to close network client", e);
}

关闭过程其实很值得看看,没啥可说的。看看run(time)和wakeup()吧。

    /**
 * Wake up the selector associated with this send thread
 */
public void wakeup() {
    this.client.wakeup();
}

然后就没了,client是NetworkClient,封装了网络的NIOSelector的wakeup,我觉得这个问题还是开一篇单讲了。读者就就按照注释的意思理解吧。

KafkaProducer

经历了这一切,我们从新回到KafkaProducer 来。似乎只剩下了一个close方法。
没啥好说的,就到这里吧。

文章目錄
  1. 1. Kafka 新版Producer Java版代码阅读
    1. 1.1. 如果我来做,怎么做?
    2. 1.2. 带着这几个问题,我们来看人家怎么做的
      1. 1.2.1. 基本逻辑
      2. 1.2.2. Accumulator
      3. 1.2.3. Buffered pool
      4. 1.2.4. MemoryRecords && Compressor && RecordBatch
      5. 1.2.5. MemoryRecords
        1. 1.2.5.1. RecordBatch
        2. 1.2.5.2. Compressor
      6. 1.2.6. callback && Future && ProduceRequestResult && FutureRecordMetadata
      7. 1.2.7. ProduceRequestResult (short for PRResult)
        1. 1.2.7.1. FutureRecordMetadata &&RecordMetadata
      8. 1.2.8. 前部分小结
      9. 1.2.9. Sender
      10. 1.2.10. KafkaProducer