public FileRecords slice(int position, int size) throws IOException {
    // validation code omitted
    int end = this.start + position + size;
    // handle integer overflow or if end is beyond the end of the file
    if (end < 0 || end >= start + sizeInBytes())
        end = start + sizeInBytes();
    return new FileRecords(file, channel, this.start + position, end, true);
}
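The end < 0 branch is there because this.start + position + size is plain int arithmetic and can wrap around to a negative value. Below is a minimal, self-contained sketch of the same clamping idea; the class and method names are made up for illustration and are not Kafka code.

// Hypothetical standalone demo of the clamping logic used in slice():
// if the requested end overflows int or runs past the file, clamp it to the file end.
public class SliceClampDemo {
    static int clampEnd(int start, int position, int size, int sizeInBytes) {
        int end = start + position + size;           // may overflow and go negative
        if (end < 0 || end >= start + sizeInBytes)   // overflow, or past the end of the file
            end = start + sizeInBytes;
        return end;
    }

    public static void main(String[] args) {
        // an Integer.MAX_VALUE-sized request overflows, so the result is clamped to start + sizeInBytes
        System.out.println(clampEnd(0, 100, Integer.MAX_VALUE, 4096)); // prints 4096
    }
}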
private Record nextFetchedRecord() {
    while (true) {
        if (records == null || !records.hasNext()) {
            // omitted
        } else {
            Record record = records.next();
            // skip any records out of range
            if (record.offset() >= nextFetchOffset) {
                // we only do validation when the message should not be skipped.
                maybeEnsureValid(record);

                // control records are not returned to the user
                if (!currentBatch.isControlBatch()) {
                    return record;
                } else {
                    // Increment the next fetch offset when we skip a control batch.
                    nextFetchOffset = record.offset() + 1;
                }
            }
        }
    }
}
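Note the record.offset() >= nextFetchOffset check above: the broker returns whole batches, so a fetched batch can start before the offset the consumer actually wants, and everything below that watermark is silently dropped. A small standalone sketch of the same skip-below-a-watermark pattern (names are made up for illustration, not Kafka code):

import java.util.List;

// Illustrative sketch: entries below the watermark are skipped,
// and the watermark advances past each entry that is returned.
public class SkipBelowWatermarkDemo {
    public static void main(String[] args) {
        long nextFetchOffset = 3L;                               // offsets 0..2 were already consumed
        List<Long> batchOffsets = List.of(1L, 2L, 3L, 4L, 5L);   // the batch starts before the watermark
        for (long offset : batchOffsets) {
            if (offset >= nextFetchOffset) {                     // same check as record.offset() >= nextFetchOffset
                System.out.println("returned offset " + offset);
                nextFetchOffset = offset + 1;                    // advance past what was just returned
            }
        }
    }
}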
The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
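A minimal sketch of application code that follows this rule; the broker address, topic name and group id are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

// Sketch: commit "offset of the last processed record + 1" via commitSync(offsets).
public class CommitPlusOneDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    // commit the offset of the NEXT record to read, i.e. last processed + 1
                    consumer.commitSync(Collections.singletonMap(tp,
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + " -> " + record.value());
    }
}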
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
        |
        V
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        |
        V
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
        |
        V
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
        "position to {}", position, partitionRecords.partition, nextOffset);
// This is the so-called "offset + 1" -- the answer to the question raised at the start!
subscriptions.position(partitionRecords.partition, nextOffset);
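The same +1 is visible from the public API: after poll() hands back a record at offset N, position() for that partition already reports N + 1. A short sketch, reusing a KafkaConsumer configured as in the earlier commit example (plus java.util.List in the imports):

// Reusing a KafkaConsumer<String, String> set up as in the previous sketch.
static void showPositionIsNextOffset(KafkaConsumer<String, String> consumer) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> partRecords = records.records(tp);
        long lastOffset = partRecords.get(partRecords.size() - 1).offset();
        // position() is the offset of the next record to fetch: last returned offset + 1
        System.out.println(tp + ": last=" + lastOffset + ", position=" + consumer.position(tp));
    }
}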
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // result.nextReadyCheckDelayMs is the delay until the next readiness check,
    // which is also how long the selector will block
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
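As far as I know, guaranteeMessageOrder comes from the producer configuration: when max.in.flight.requests.per.connection is 1 the Sender mutes a partition while one of its batches is in flight, so ordering is preserved even across retries. A hedged config sketch (the broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch: with max.in.flight.requests.per.connection = 1 the Sender runs with
// guaranteeMessageOrder = true and drains one in-flight batch per partition at a time.
public class OrderedProducerConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) calls are now delivered strictly in order per partition
        }
    }
}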
/**
 * Check for data, waiting up to the given timeout.
 *
 * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative
 * @return The number of keys ready
 */
private int select(long timeoutMs) throws IOException {
    if (timeoutMs < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (timeoutMs == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(timeoutMs);
}
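The wrapper simply maps timeoutMs == 0 to the non-blocking selectNow() and everything else to the blocking select(timeout). A minimal java.nio sketch of those two calls, independent of Kafka:

import java.io.IOException;
import java.nio.channels.Selector;

// Minimal java.nio sketch: selectNow() never blocks, select(timeout) blocks up to timeout ms.
public class SelectDemo {
    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            int readyNow = selector.selectNow();     // returns immediately
            int readyLater = selector.select(500L);  // blocks up to 500 ms (no channels registered here)
            System.out.println(readyNow + " / " + readyLater);
        }
    }
}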
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
            record.topic(), partition);
    this.sender.wakeup();
}
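How often batchIsFull / newBatchCreated fire for a given workload is mostly a function of batch.size and linger.ms. A hedged config sketch (broker address and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: a larger batch.size plus a small linger.ms lets the accumulator fill batches
// before the Sender is woken up; send() itself only appends to the accumulator,
// the Sender thread does the actual network I/O.
public class BatchingConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);  // 32 KB per partition batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);           // wait up to 5 ms for more records
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}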