KAFKA-20585: Share Fetch latency metric incorrectly updated during ShareAcknowledgeResponse #22296
muralibasani wants to merge 1 commit into
nileshkumar3
left a comment
Thanks for the PR, +1 on the fix.
It looks like there is no dedicated ack latency sensor in ShareFetchMetricsManager today. If ack latency is useful to track, this may be worth a follow-up JIRA rather than reusing fetch metrics.
Also, consider adding a test asserting that fetch-rate/fetch-total do not change after a successful ShareAcknowledge response, to guard against regression.
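A minimal sketch of the kind of regression check suggested above, assuming hypothetical stand-in classes (`FakeFetchMetrics`, `FakeShareHandlers`) rather than the real ShareFetchMetricsManager or the consumer test harness. It only illustrates the shape of the assertion: the fetch counter must be unchanged after an acknowledge response is handled.

```java
// Hypothetical sketch: none of these classes are Kafka's real ones.
class AckMetricsRegressionSketch {
    public static void main(String[] args) {
        FakeFetchMetrics metrics = new FakeFetchMetrics();
        FakeShareHandlers handlers = new FakeShareHandlers(metrics);

        // A fetch response is expected to record latency once.
        handlers.onShareFetchSuccess("node-1", 12L);
        long totalAfterFetch = metrics.fetchTotal();

        // An acknowledge response must leave the fetch metrics untouched.
        handlers.onShareAcknowledgeSuccess("node-1", 7L);

        if (metrics.fetchTotal() != totalAfterFetch) {
            throw new AssertionError("acknowledge response changed fetch metrics");
        }
        System.out.println("fetch-total after ack: " + metrics.fetchTotal());
    }
}

// Stand-in for a metrics manager (assumption, not ShareFetchMetricsManager).
class FakeFetchMetrics {
    private long fetchTotal = 0;
    private long latencySumMs = 0;

    void recordLatency(String node, long latencyMs) {
        fetchTotal++;
        latencySumMs += latencyMs;
    }

    long fetchTotal() { return fetchTotal; }
    long latencySumMs() { return latencySumMs; }
}

// Stand-in for the two success handlers discussed in this PR (assumption).
class FakeShareHandlers {
    private final FakeFetchMetrics metrics;

    FakeShareHandlers(FakeFetchMetrics metrics) { this.metrics = metrics; }

    void onShareFetchSuccess(String node, long latencyMs) {
        metrics.recordLatency(node, latencyMs);
    }

    void onShareAcknowledgeSuccess(String node, long latencyMs) {
        // Intentionally no recordLatency call: ack latency is not fetch latency.
    }
}
```

In the real test, the same before/after comparison would presumably be made against the fetch-rate/fetch-total metrics, or via a verification on the metrics manager, instead of a hand-rolled counter.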
@nileshkumar3 thanks for the review. IMO this was copy-pasted by mistake, and we ended up with this duplicate in the ack method. We might have other metrics in these round-trip situations. If you still feel the test is worth adding, I will add it.
Regarding the follow-up JIRA, I think it makes sense.
final ShareFetchResponse response = (ShareFetchResponse) resp.responseBody();
final ShareSessionHandler handler = sessionHandler(fetchTarget.id());

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
Should we move it to finally?
Moving it to finally would record the metric even if lines 817 or 818 above fail (while getting the response and the handler), and we don't want that, right?
Given the JIRA intent of capturing top-level errors too, IMO finally is a cleaner fit. Curious what @chia7712 / @apoorvmittal10 think.
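To make the trade-off in this thread concrete, here is a small sketch of how the two placements differ when the cast on the response body throws. The method names and the use of a plain list as the "metric" are hypothetical; this is not the consumer code, only an illustration of try/finally semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: compares recording latency inline vs. in a finally block.
class FinallyRecordingSketch {

    // Records only if the cast (standing in for lines 817/818) succeeds.
    static void handleInline(Object body, long latencyMs, List<Long> recorded) {
        String response = (String) body; // throws ClassCastException for a non-String body
        recorded.add(latencyMs);
    }

    // Records even if the cast throws, because finally always runs.
    static void handleWithFinally(Object body, long latencyMs, List<Long> recorded) {
        try {
            String response = (String) body;
        } finally {
            recorded.add(latencyMs);
        }
    }

    public static void main(String[] args) {
        List<Long> inline = new ArrayList<>();
        List<Long> withFinally = new ArrayList<>();

        try {
            handleInline(42, 5L, inline); // Integer body makes the cast fail
        } catch (ClassCastException e) {
            // expected in this sketch
        }
        try {
            handleWithFinally(42, 5L, withFinally);
        } catch (ClassCastException e) {
            // expected in this sketch
        }

        System.out.println("inline recorded: " + inline.size());
        System.out.println("finally recorded: " + withFinally.size());
    }
}
```

Which behavior is preferable depends on whether latency should be counted for responses that fail before the handler is resolved, which is the open question above.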
IMO, adding a regression test for this issue is worthwhile; other metrics can be a follow-up.
final FetchResponse response = (FetchResponse) resp.responseBody();
final FetchSessionHandler handler = sessionHandler(fetchTarget.id());

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
Same as above: moving it to finally would record the metric even if lines 155 or 156 above fail (while getting the response and the handler), and we don't want that, right?
  metrics = new Metrics(metricConfig, time);
  shareFetchMetricsRegistry = new ShareFetchMetricsRegistry(metricConfig.tags().keySet(), "consumer-share" + groupId);
- metricsManager = new ShareFetchMetricsManager(metrics, shareFetchMetricsRegistry);
+ metricsManager = spy(new ShareFetchMetricsManager(metrics, shareFetchMetricsRegistry));
Using a spy here because the existing tests need the real metric registry to work.
final FetchResponse response = (FetchResponse) resp.responseBody();
final FetchSessionHandler handler = sessionHandler(fetchTarget.id());

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
I expect this line to fix the gap for the Async and Classic consumers.
We would just need to add tests to validate it (same as the one added here for the share consumer, but perhaps using an error like FETCH_SESSION_TOPIC_ID_ERROR): one in FetcherTest for the classic consumer and one in FetchRequestManagerTest for the async consumer.
Ref: https://issues.apache.org/jira/browse/KAFKA-20585
The fetch latency metric is recorded twice: in handleShareFetchSuccess and in handleShareAcknowledgeSuccess.