privatelongsendProducerData(long now) { Clustercluster= metadata.fetch(); //result.nextReadyCheckDelayMs表示下次检查是否ready的时间,也是//selecotr会阻塞的时间 RecordAccumulator.ReadyCheckResultresult=this.accumulator.ready(cluster, now); if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
Iterator<Node> iter = result.readyNodes.iterator(); longnotReadyTimeout= Long.MAX_VALUE; while (iter.hasNext()) { Nodenode= iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
// create produce requests Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }
/** * Check for data, waiting up to the given timeout. * * @param timeoutMs Length of time to wait, in milliseconds, which must be non-negative * @return The number of keys ready */ privateintselect(long timeoutMs)throws IOException { if (timeoutMs < 0L) thrownewIllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L) returnthis.nioSelector.selectNow(); else returnthis.nioSelector.select(timeoutMs); }
if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }