The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
    |
    V
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    |
    V
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
    |
    V
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
        "position to {}", position, partitionRecords.partition, nextOffset);
// This is the so-called "offset + 1" -- the answer to the question posed at the beginning!
subscriptions.position(partitionRecords.partition, nextOffset);
privatelongsendProducerData(long now) { Clustercluster= metadata.fetch(); //result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间 RecordAccumulator.ReadyCheckResultresult=this.accumulator.ready(cluster, now); if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
Iterator<Node> iter = result.readyNodes.iterator(); longnotReadyTimeout= Long.MAX_VALUE; while (iter.hasNext()) { Nodenode= iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
// create produce requests Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }
/** * Check for data, waiting up to the given timeout. * * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative * @return The number of keys ready */ privateintselect(long timeoutMs)throws IOException { if (timeoutMs < 0L) thrownewIllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L) returnthis.nioSelector.selectNow(); else returnthis.nioSelector.select(timeoutMs); }
if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }
// Timestamp taken before the work being measured.
Instant first = Instant.now();
// wait some time while something happens
// Timestamp taken after the work being measured.
Instant second = Instant.now();
// Elapsed time between the two timestamps.
Duration duration = Duration.between(first, second);
public Set<String> listConsumerGroups(String topic) throws InterruptedException, ExecutionException, TimeoutException { final Set<String> filteredGroups = new HashSet<>();