36 changes: 11 additions & 25 deletions .github/workflows/ci.yml
@@ -16,41 +16,27 @@ jobs:
strategy:
matrix:
runner:
- os: macos-14 # macos sonoma on m1; 3 vcpu, 7 GB memory
- os: macos-26 # macos tahoe on m1; 3 vcpu, 7 GB memory
base-nixpkgs-channel: nixpkgs-25.05-darwin
- os: ubuntu-22.04 # linux x86_64; 4 vcpu, 16 GB memory
base-nixpkgs-channel: nixos-25.05
ghc:
- ghc-9-12
runs-on: ${{ matrix.runner.os }}
steps:
# see write-up @ https://github.com/marketplace/actions/maximize-build-disk-space
# GitHub-hosted ubuntu runners don't have enough free disk for the nix
# store; this removes preinstalled software to make room.
- name: Reclaim space
if: matrix.runner.os == 'ubuntu-22.04'
uses: easimon/maximize-build-space@v10
uses: jlumbroso/free-disk-space@v1.3.1
with:
# you can try to increase this if the ubuntu runner runs out of space.
# prob can't push it too far above 30GB though. the github actions runners
# have a small disk size, and if we want more we'll have to look into
# larger runners or self-hosted runners. caveat - larger runners aren't
# included in our enterprise plan and _will_ cost extra!
# https://docs.github.com/en/actions/using-github-hosted-runners/using-larger-runners/about-larger-runners#machine-sizes-for-larger-runners
#
# alternately, we can think about moving the nix store off of root and onto
# the larger `/dev/sdb1` partition.
#
# all of the above - including moving the nix store, should you choose
# to do that - is ultimately very hacky and unfortunately dependent
# on lower-level implementation that we have very little control over.
# nothing is stopping github from e.g. changing disk layout, software
# contents, os packages, etc. this is especially true if we use newer
# `ubuntu-xx.yy` runners.
root-reserve-mb: 30720
remove-dotnet: 'true'
remove-android: 'true'
remove-haskell: 'true' # we use nix to provide this anyway
remove-codeql: 'true'
remove-docker-images: 'true'
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true

- name: Install Nix
uses: cachix/install-nix-action@v30
72 changes: 72 additions & 0 deletions nri-kafka/docs/known-issues.md
@@ -0,0 +1,72 @@
# Known issues

## `sendSync` hangs forever on delivery failure

### Where

`src/Kafka.hs`, `mkHandler` → `Internal.sendSync` (around line 199–208) and
`sendHelperAsync` (around line 248–275).

### Symptom

A caller of `Kafka.sendSync` blocks indefinitely if the message ultimately
fails to deliver (e.g. `delivery.timeout.ms` exceeded, retries exhausted,
non-retriable broker error, partition leader unavailable). The call never
returns, and the caller never sees an error.

### Why

`sendSync` blocks on a `TMVar` that is signalled by an `onDeliveryCallback`.
The callback installed in `sendHelperAsync` only signals on the success branch:

```haskell
\deliveryReport -> do
log <- Platform.silentHandler
Task.perform log <|
case deliveryReport of
Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback
_ -> Task.succeed () -- <-- failure path is a silent no-op
```

So when librdkafka emits a `DeliveryFailure` (or any other non-success report),
the TMVar is never written and `TMVar.readTMVar terminator` in `sendSync`
parks forever.
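
For context, the blocking side looks roughly like this (a paraphrase of the
`sendSync` shape, not the exact source; `signalTerminator` is an illustrative
name for the success-only signal installed above):

```haskell
-- Paraphrased: the only path out of readTMVar is the success branch
-- of the delivery callback installed by sendHelperAsync.
sendSync handler msg = do
  terminator <- TMVar.newEmptyTMVarIO
  sendHelperAsync handler msg (signalTerminator terminator) -- success-only
  STM.atomically (TMVar.readTMVar terminator) -- parks forever on failure
```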

### Why this matters

`acks=all` and `enable.idempotence=true` are both hardcoded, so the success
path is robust, but failure paths still exist (timeout, retries exhausted,
broker returning a non-retriable error). In production these are rare but not
impossible, and when they hit, the calling request handler hangs until it is
killed or a surrounding timeout fires (if any). The failure is also invisible
to observability: the caller never gets a chance to log or report it.
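
For reference, pinning those two properties with hw-kafka-client typically
looks like the sketch below (illustrative only; the broker address and the
standalone `producerProps` value are assumptions, not the nri-kafka code):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Map.Strict as Map
import Kafka.Producer (BrokerAddress (..), ProducerProperties, brokersList, extraProps)

-- acks=all + idempotent produce, expressed as librdkafka config properties.
producerProps :: ProducerProperties
producerProps =
  brokersList [BrokerAddress "localhost:9092"]
    <> extraProps
      ( Map.fromList
          [ ("acks", "all"),
            ("enable.idempotence", "true")
          ]
      )
```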

### What a fix looks like

The callback should signal the TMVar in *both* branches and the `Internal.Msg`
path should carry the error to the caller. Roughly:

1. Make the terminator carry a `Result Internal.Error ()` instead of a unit
`Terminate`, or use a second TMVar for the error.
2. In the failure branch, package the `DeliveryReport` (which contains the
`KafkaError`) and write it to the terminator.
3. `sendSync` reads the result and returns `Task.fail` on the error case.

Whatever shape the fix takes, it must preserve the existing success-path
contract (TMVar signalled exactly once after broker confirmation). A
standalone sketch of the pattern follows.
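
The model below is runnable on its own and uses stand-in types
(`KafkaError`, `DeliveryReport`, and the simulated producer thread are
illustrative, not the real nri-kafka internals):

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar (TMVar, newEmptyTMVarIO, putTMVar, readTMVar)

-- Stand-ins for the real types.
newtype KafkaError = KafkaError String deriving (Show)

data DeliveryReport
  = DeliverySuccess
  | DeliveryFailure KafkaError

-- The callback writes the terminator on *every* branch, exactly once.
onDelivery :: TMVar (Either KafkaError ()) -> DeliveryReport -> IO ()
onDelivery terminator report =
  atomically
    ( putTMVar terminator
        ( case report of
            DeliverySuccess -> Right ()
            DeliveryFailure err -> Left err
        )
    )

main :: IO ()
main = do
  terminator <- newEmptyTMVarIO
  -- Simulate librdkafka posting a failure report after the produce.
  _ <-
    forkIO
      ( do
          threadDelay 10000 -- 10 ms
          onDelivery terminator (DeliveryFailure (KafkaError "delivery.timeout.ms exceeded"))
      )
  -- The synchronous caller now always wakes up and can surface the error.
  result <- atomically (readTMVar terminator)
  print result
```

In the real fix, the `Left` branch would carry the `KafkaError` from the
`DeliveryFailure` report into an `Internal.Error` so `sendSync` can
`Task.fail` with it.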

### What we're NOT doing yet

This was discovered while diagnosing the unrelated 100ms-poll-interval latency
problem (see `pollEvents` rewrite in `src/Kafka.hs`). It is **not fixed** in
that change because the latency fix is purely a polling-loop change and we
wanted to land it without expanding scope. The hang-forever issue should be
its own follow-up so the test surface stays small and the change is
auditable on its own.

### Links

- librdkafka delivery-report semantics: <https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#producer-api>
- hw-kafka-client `DeliveryReport` constructors: see
`Kafka.Producer.Types` in hw-kafka-client (hidden in 4.x but the type is
re-exported).
53 changes: 52 additions & 1 deletion nri-kafka/nri-kafka.cabal
@@ -1,6 +1,6 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.37.0.
-- This file has been generated from package.yaml by hpack version 0.38.3.
--
-- see: https://github.com/sol/hpack

@@ -31,6 +31,10 @@ flag pause-resume-bug
manual: False
default: False

flag sync-write-benchmark
manual: False
default: False

library
exposed-modules:
Kafka
@@ -183,6 +187,53 @@ executable pause-resume-bug-producer
else
buildable: False

executable sync-write-benchmark
main-is: Main.hs
other-modules:
Paths_nri_kafka
hs-source-dirs:
scripts/sync-write-benchmark
default-extensions:
DataKinds
DeriveGeneric
ExtendedDefaultRules
FlexibleContexts
FlexibleInstances
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
NoImplicitPrelude
NumericUnderscores
OverloadedStrings
PartialTypeSignatures
ScopedTypeVariables
Strict
TypeOperators
ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wpartial-fields -Wredundant-constraints -Wincomplete-uni-patterns -fno-warn-type-defaults -fplugin=NriPrelude.Plugin -threaded -rtsopts "-with-rtsopts=-N -T" -O2 -main-is Main
build-depends:
aeson >=2.0 && <2.3
, async >=2.2.2 && <2.3
, base >=4.18 && <4.22
, bytestring >=0.10.8.2 && <0.13
, conduit >=1.3.0 && <1.4
, containers >=0.6.0.1 && <0.8
, hw-kafka-client >=4.0.3 && <5.0
, nri-env-parser >=0.1.0.0 && <0.5
, nri-kafka
, nri-observability >=0.1.1.1 && <0.5
, nri-prelude >=0.1.0.0 && <0.7
, safe-exceptions >=0.1.7.0 && <1.3
, stm >=2.4 && <2.6
, text >=1.2.3.1 && <2.2
, time >=1.8.0.2 && <2
, unix >=2.7.2.2 && <2.9
, uuid >=1.3.0 && <1.4
default-language: Haskell2010
if flag(sync-write-benchmark)
buildable: True
else
buildable: False

test-suite tests
type: exitcode-stdio-1.0
main-is: Main.hs
18 changes: 18 additions & 0 deletions nri-kafka/package.yaml
@@ -73,6 +73,21 @@ executables:
buildable: true
else:
buildable: false
sync-write-benchmark:
source-dirs: scripts/sync-write-benchmark
main: Main
dependencies:
- nri-kafka
ghc-options:
- -threaded
- -rtsopts "-with-rtsopts=-N -T"
- -O2
when:
- condition: flag(sync-write-benchmark)
then:
buildable: true
else:
buildable: false
default-extensions:
- DataKinds
- DeriveGeneric
@@ -103,3 +118,6 @@ flags:
pause-resume-bug:
default: false
manual: false
sync-write-benchmark:
default: false
manual: false
132 changes: 132 additions & 0 deletions nri-kafka/scripts/sync-write-benchmark/Main.hs
@@ -0,0 +1,132 @@
{-# LANGUAGE DisambiguateRecordFields #-}

-- | Benchmark for `Kafka.sendSync` per-message latency.
--
-- Measures wall time of N sequential synchronous produces against the broker
-- configured by 'KAFKA_BROKER_ADDRESSES'. Run with the same env on master
-- (baseline) and on the fix branch (after) to compare the polling-induced
-- latency floor.
--
-- > cabal run sync-write-benchmark -fsync-write-benchmark
--
-- Tunables:
-- BENCHMARK_TOPIC (default: nri-kafka-sync-benchmark)
-- BENCHMARK_MESSAGE_COUNT (default: 1000)
-- BENCHMARK_WARMUP (default: 50) -- excluded from stats
module Main where

import qualified Conduit
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.List
import qualified Data.Word
import qualified Environment
import GHC.Clock (getMonotonicTimeNSec)
import qualified Kafka
import qualified System.Environment
import Text.Printf (printf)
import Prelude (IO, String, error, fromIntegral, pure, putStrLn)
import qualified Prelude

newtype Sample = Sample {idx :: Int}
deriving (Generic)

instance FromJSON Sample

instance ToJSON Sample

main :: IO ()
main = do
topic <- envText "BENCHMARK_TOPIC" "nri-kafka-sync-benchmark"
count <- envInt "BENCHMARK_MESSAGE_COUNT" 1000
warmup <- envInt "BENCHMARK_WARMUP" 50

settings <- Environment.decode Kafka.decoder
logHandler <- Platform.silentHandler

putStrLn
( printf
"Benchmarking sync writes: warmup=%d count=%d topic=%s"
warmup
count
(Text.toList topic)
)

Conduit.withAcquire (Kafka.handler settings) <| \handler -> do
putStrLn "Warming up..."
Prelude.mapM_ (sendOne logHandler handler topic) [1 .. warmup]

putStrLn (printf "Measuring %d sends..." count)
samples <-
Prelude.traverse
(timeOne logHandler handler topic)
[warmup + 1 .. warmup + count]

putStrLn (formatStats samples)

envText :: String -> Text -> IO Text
envText name def = do
v <- System.Environment.lookupEnv name
pure
( case v of
Prelude.Just s -> Text.fromList s
Prelude.Nothing -> def
)

envInt :: String -> Int -> IO Int
envInt name def = do
v <- System.Environment.lookupEnv name
pure
( case v of
Prelude.Just s -> case Prelude.reads s of
[(i, "")] -> i
_ -> def
Prelude.Nothing -> def
)

mkMsg :: Text -> Int -> Kafka.Msg
mkMsg topic i =
Kafka.emptyMsg topic
|> Kafka.addPayload (Sample {idx = i})
-- Pin all messages to one partition so partition variability doesn't
-- pollute the latency distribution.
|> Kafka.addKey "benchmark"

sendOne :: Platform.LogHandler -> Kafka.Handler -> Text -> Int -> IO ()
sendOne logHandler handler topic i = do
result <- Task.attempt logHandler (Kafka.sendSync handler (mkMsg topic i))
case result of
Ok _ -> pure ()
Err err -> error (Text.toList err)

timeOne :: Platform.LogHandler -> Kafka.Handler -> Text -> Int -> IO Data.Word.Word64
timeOne logHandler handler topic i = do
let msg = mkMsg topic i
t0 <- getMonotonicTimeNSec
result <- Task.attempt logHandler (Kafka.sendSync handler msg)
t1 <- getMonotonicTimeNSec
case result of
Ok _ -> pure (t1 - t0)
Err err -> error (Text.toList err)

formatStats :: [Data.Word.Word64] -> String
formatStats samples =
let sorted = Data.List.sort samples
n = Prelude.length sorted
total = Prelude.sum sorted
avg = nsToMs (total `Prelude.div` fromIntegral n)
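-- Percentile by index into the sorted samples (floor rank,
-- clamped to the last element).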
pct :: Prelude.Double -> Prelude.Double
pct p =
let i = Prelude.min (n - 1) (Prelude.floor (Prelude.fromIntegral n Prelude.* p / 100))
in nsToMs (sorted Prelude.!! i)
in printf
"count=%d min=%.1fms avg=%.1fms p50=%.1fms p95=%.1fms p99=%.1fms max=%.1fms"
n
(nsToMs (Prelude.head sorted))
avg
(pct 50)
(pct 95)
(pct 99)
(nsToMs (Prelude.last sorted))

nsToMs :: Data.Word.Word64 -> Prelude.Double
nsToMs ns = fromIntegral ns / 1_000_000