diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2b703ec..75414d2c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: runner: - - os: macos-14 # macos sonoma on m1; 3 vcpu, 7 GB memory + - os: macos-26 # macos tahoe on m1; 3 vcpu, 7 GB memory base-nixpkgs-channel: nixpkgs-25.05-darwin - os: ubuntu-22.04 # linux x86_64; 4 vcpu, 16 GB memory base-nixpkgs-channel: nixos-25.05 @@ -24,33 +24,19 @@ jobs: - ghc-9-12 runs-on: ${{ matrix.runner.os }} steps: - # see write-up @ https://github.com/marketplace/actions/maximize-build-disk-space + # GitHub-hosted ubuntu runners don't have enough free disk for the nix + # store; this removes preinstalled software to make room. - name: Reclaim space if: matrix.runner.os == 'ubuntu-22.04' - uses: easimon/maximize-build-space@v10 + uses: jlumbroso/free-disk-space@v1.3.1 with: - # you can try to increase this if the ubuntu runner runs out of space. - # prob can't push it too far above 30GB though. the github actions runners - # have a small disk size, and if we want more we'll have to look into - # larger runners or self-hosted runners. caveat - larger runners aren't - # included in our enterprise plan and _will_ cost extra! - # https://docs.github.com/en/actions/using-github-hosted-runners/using-larger-runners/about-larger-runners#machine-sizes-for-larger-runners - # - # alternately, we can think about moving the nix store off of root and onto - # the larger `/dev/sdb1` partition. - # - # all of the above - including moving the nix store, should you choose - # to do that - is ultimately very hacky and unfortunately dependent - # on lower-level implementation that we have very little control over. - # nothing is stopping github from e.g. changing disk layout, software - # contents, os packages, etc. this is especially true if we use newer - # `ubuntu-xx.yy` runners. - root-reserve-mb: 30720 - remove-dotnet: 'true' - remove-android: 'true' - remove-haskell: 'true' # we use nix to provide this anyway - remove-codeql: 'true' - remove-docker-images: 'true' + tool-cache: false + android: true + dotnet: true + haskell: true + large-packages: false + docker-images: true + swap-storage: true - name: Install Nix uses: cachix/install-nix-action@v30 diff --git a/nri-kafka/docs/known-issues.md b/nri-kafka/docs/known-issues.md new file mode 100644 index 00000000..32045663 --- /dev/null +++ b/nri-kafka/docs/known-issues.md @@ -0,0 +1,72 @@ +# Known issues + +## `sendSync` hangs forever on delivery failure + +### Where + +`src/Kafka.hs`, `mkHandler` → `Internal.sendSync` (around line 199–208) and +`sendHelperAsync` (around line 248–275). + +### Symptom + +A caller of `Kafka.sendSync` blocks indefinitely if the message ultimately +fails to deliver (e.g. exceeded `delivery.timeout.ms`, retries exhausted, +non-retriable broker error, partition leader unavailable). The caller never +gets an error and never returns. + +### Why + +`sendSync` blocks on a `TMVar` that is signalled by an `onDeliveryCallback`. 
+The callback installed in `sendHelperAsync` only signals on the success branch: + +```haskell +\deliveryReport -> do + log <- Platform.silentHandler + Task.perform log <| + case deliveryReport of + Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback + _ -> Task.succeed () -- <-- failure path is a silent no-op +``` + +So when librdkafka emits a `DeliveryFailure` (or any other non-success report), +the TMVar is never written and `TMVar.readTMVar terminator` in `sendSync` +parks forever. + +### Why this matters + +`acks=all` and `enable.idempotence=true` are both hardcoded, so the success +path is robust — but failure paths exist (timeout, retries exhausted, broker +returning a non-retriable error). In production these are rare but not +impossible, and when they hit, the calling request handler hangs until killed +or its surrounding timeout fires (if any). It also mutes errors from +observability — the caller doesn't get to log/report the failure. + +### What a fix looks like + +The callback should signal the TMVar in *both* branches and the `Internal.Msg` +path should carry the error to the caller. Roughly: + +1. Make the terminator carry a `Result Internal.Error ()` instead of a unit + `Terminate`, or use a second TMVar for the error. +2. In the failure branch, package the `DeliveryReport` (which contains the + `KafkaError`) and write it to the terminator. +3. `sendSync` reads the result and returns `Task.fail` on the error case. + +Whatever shape the fix takes, it must preserve the existing success-path +contract (TMVar signalled exactly once after broker confirmation). + +### What we're NOT doing yet + +This was discovered while diagnosing the unrelated 100ms-poll-interval latency +problem (see `pollEvents` rewrite in `src/Kafka.hs`). It is **not fixed** in +that change because the latency fix is purely a polling-loop change and we +wanted to land it without expanding scope. The hang-forever issue should be +its own follow-up so the test surface stays small and the change is +auditable on its own. + +### Links + +- librdkafka delivery-report semantics: +- hw-kafka-client `DeliveryReport` constructors: see + `Kafka.Producer.Types` in hw-kafka-client (hidden in 4.x but the type is + re-exported). diff --git a/nri-kafka/nri-kafka.cabal b/nri-kafka/nri-kafka.cabal index 97899901..45a62622 100644 --- a/nri-kafka/nri-kafka.cabal +++ b/nri-kafka/nri-kafka.cabal @@ -1,6 +1,6 @@ cabal-version: 1.12 --- This file has been generated from package.yaml by hpack version 0.37.0. +-- This file has been generated from package.yaml by hpack version 0.38.3. 
-- -- see: https://github.com/sol/hpack @@ -31,6 +31,10 @@ flag pause-resume-bug manual: False default: False +flag sync-write-benchmark + manual: False + default: False + library exposed-modules: Kafka @@ -183,6 +187,53 @@ executable pause-resume-bug-producer else buildable: False +executable sync-write-benchmark + main-is: Main.hs + other-modules: + Paths_nri_kafka + hs-source-dirs: + scripts/sync-write-benchmark + default-extensions: + DataKinds + DeriveGeneric + ExtendedDefaultRules + FlexibleContexts + FlexibleInstances + GeneralizedNewtypeDeriving + MultiParamTypeClasses + NamedFieldPuns + NoImplicitPrelude + NumericUnderscores + OverloadedStrings + PartialTypeSignatures + ScopedTypeVariables + Strict + TypeOperators + ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wpartial-fields -Wredundant-constraints -Wincomplete-uni-patterns -fno-warn-type-defaults -fplugin=NriPrelude.Plugin -threaded -rtsopts "-with-rtsopts=-N -T" -O2 -main-is Main + build-depends: + aeson >=2.0 && <2.3 + , async >=2.2.2 && <2.3 + , base >=4.18 && <4.22 + , bytestring >=0.10.8.2 && <0.13 + , conduit >=1.3.0 && <1.4 + , containers >=0.6.0.1 && <0.8 + , hw-kafka-client >=4.0.3 && <5.0 + , nri-env-parser >=0.1.0.0 && <0.5 + , nri-kafka + , nri-observability >=0.1.1.1 && <0.5 + , nri-prelude >=0.1.0.0 && <0.7 + , safe-exceptions >=0.1.7.0 && <1.3 + , stm >=2.4 && <2.6 + , text >=1.2.3.1 && <2.2 + , time >=1.8.0.2 && <2 + , unix >=2.7.2.2 && <2.9 + , uuid >=1.3.0 && <1.4 + default-language: Haskell2010 + if flag(sync-write-benchmark) + buildable: True + else + buildable: False + test-suite tests type: exitcode-stdio-1.0 main-is: Main.hs diff --git a/nri-kafka/package.yaml b/nri-kafka/package.yaml index 639a9ac0..f6cf290f 100644 --- a/nri-kafka/package.yaml +++ b/nri-kafka/package.yaml @@ -73,6 +73,21 @@ executables: buildable: true else: buildable: false + sync-write-benchmark: + source-dirs: scripts/sync-write-benchmark + main: Main + dependencies: + - nri-kafka + ghc-options: + - -threaded + - -rtsopts "-with-rtsopts=-N -T" + - -O2 + when: + - condition: flag(sync-write-benchmark) + then: + buildable: true + else: + buildable: false default-extensions: - DataKinds - DeriveGeneric @@ -103,3 +118,6 @@ flags: pause-resume-bug: default: false manual: false + sync-write-benchmark: + default: false + manual: false diff --git a/nri-kafka/scripts/sync-write-benchmark/Main.hs b/nri-kafka/scripts/sync-write-benchmark/Main.hs new file mode 100644 index 00000000..cfccd6ca --- /dev/null +++ b/nri-kafka/scripts/sync-write-benchmark/Main.hs @@ -0,0 +1,132 @@ +{-# LANGUAGE DisambiguateRecordFields #-} + +-- | Benchmark for `Kafka.sendSync` per-message latency. +-- +-- Measures wall time of N sequential synchronous produces against the broker +-- configured by 'KAFKA_BROKER_ADDRESSES'. Run with the same env on master +-- (baseline) and on the fix branch (after) to compare the polling-induced +-- latency floor. 
+-- +-- > cabal run sync-write-benchmark -fsync-write-benchmark +-- +-- Tunables: +-- BENCHMARK_TOPIC (default: nri-kafka-sync-benchmark) +-- BENCHMARK_MESSAGE_COUNT (default: 1000) +-- BENCHMARK_WARMUP (default: 50) -- excluded from stats +module Main where + +import qualified Conduit +import Data.Aeson (FromJSON, ToJSON) +import qualified Data.List +import qualified Data.Word +import qualified Environment +import GHC.Clock (getMonotonicTimeNSec) +import qualified Kafka +import qualified System.Environment +import Text.Printf (printf) +import Prelude (IO, String, error, fromIntegral, pure, putStrLn) +import qualified Prelude + +newtype Sample = Sample {idx :: Int} + deriving (Generic) + +instance FromJSON Sample + +instance ToJSON Sample + +main :: IO () +main = do + topic <- envText "BENCHMARK_TOPIC" "nri-kafka-sync-benchmark" + count <- envInt "BENCHMARK_MESSAGE_COUNT" 1000 + warmup <- envInt "BENCHMARK_WARMUP" 50 + + settings <- Environment.decode Kafka.decoder + logHandler <- Platform.silentHandler + + putStrLn + ( printf + "Benchmarking sync writes: warmup=%d count=%d topic=%s" + warmup + count + (Text.toList topic) + ) + + Conduit.withAcquire (Kafka.handler settings) <| \handler -> do + putStrLn "Warming up..." + Prelude.mapM_ (sendOne logHandler handler topic) [1 .. warmup] + + putStrLn (printf "Measuring %d sends..." count) + samples <- + Prelude.traverse + (timeOne logHandler handler topic) + [warmup + 1 .. warmup + count] + + putStrLn (formatStats samples) + +envText :: String -> Text -> IO Text +envText name def = do + v <- System.Environment.lookupEnv name + pure + ( case v of + Prelude.Just s -> Text.fromList s + Prelude.Nothing -> def + ) + +envInt :: String -> Int -> IO Int +envInt name def = do + v <- System.Environment.lookupEnv name + pure + ( case v of + Prelude.Just s -> case Prelude.reads s of + [(i, "")] -> i + _ -> def + Prelude.Nothing -> def + ) + +mkMsg :: Text -> Int -> Kafka.Msg +mkMsg topic i = + Kafka.emptyMsg topic + |> Kafka.addPayload (Sample {idx = i}) + -- Pin all messages to one partition so partition variability doesn't + -- pollute the latency distribution. + |> Kafka.addKey "benchmark" + +sendOne :: Platform.LogHandler -> Kafka.Handler -> Text -> Int -> IO () +sendOne logHandler handler topic i = do + result <- Task.attempt logHandler (Kafka.sendSync handler (mkMsg topic i)) + case result of + Ok _ -> pure () + Err err -> error (Text.toList err) + +timeOne :: Platform.LogHandler -> Kafka.Handler -> Text -> Int -> IO Data.Word.Word64 +timeOne logHandler handler topic i = do + let msg = mkMsg topic i + t0 <- getMonotonicTimeNSec + result <- Task.attempt logHandler (Kafka.sendSync handler msg) + t1 <- getMonotonicTimeNSec + case result of + Ok _ -> pure (t1 - t0) + Err err -> error (Text.toList err) + +formatStats :: [Data.Word.Word64] -> String +formatStats samples = + let sorted = Data.List.sort samples + n = Prelude.length sorted + total = Prelude.sum sorted + avg = nsToMs (total `Prelude.div` fromIntegral n) + pct :: Prelude.Double -> Prelude.Double + pct p = + let i = Prelude.min (n - 1) (Prelude.floor (Prelude.fromIntegral n Prelude.* p / 100)) + in nsToMs (sorted Prelude.!! 
i) + in printf + "count=%d min=%.1fms avg=%.1fms p50=%.1fms p95=%.1fms p99=%.1fms max=%.1fms" + n + (nsToMs (Prelude.head sorted)) + avg + (pct 50) + (pct 95) + (pct 99) + (nsToMs (Prelude.last sorted)) + +nsToMs :: Data.Word.Word64 -> Prelude.Double +nsToMs ns = fromIntegral ns / 1_000_000 diff --git a/nri-kafka/src/Kafka.hs b/nri-kafka/src/Kafka.hs index f9ffc639..83d309bd 100644 --- a/nri-kafka/src/Kafka.hs +++ b/nri-kafka/src/Kafka.hs @@ -30,7 +30,6 @@ module Kafka where import qualified Conduit -import qualified Control.Concurrent import qualified Control.Concurrent.Async as Async import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.STM.TMVar as TMVar @@ -176,13 +175,35 @@ startPollEventLoop producer = do |> Async.async Prelude.pure terminator --- | We use a little trick here to poll events, by sending an empty message batch. --- This will call the internal pollEvent function in hw-kafka-client. +-- | Drains librdkafka's event queue (delivery reports, errors, stats) so +-- per-message delivery callbacks fire promptly. 'Producer.flushProducer' is +-- internally a loop over @rd_kafka_poll(rk, 100)@ that exits when the +-- outbound queue is empty; each underlying poll blocks the OS thread up to +-- 100ms but returns immediately when an event arrives, so latency-sensitive +-- 'sendSync' callers wake on event rather than on a fixed clock tick. The +-- previous implementation used a 100ms threadDelay between non-blocking +-- drains, which added up to a 100ms (~50ms mean) wait per sync send. +-- +-- We can't import 'Kafka.Internal.Shared.pollEvents' directly to pass our own +-- timeout because that module is hidden in hw-kafka-client 4.x. +-- 'flushProducer' is the closest exposed alternative; despite its name and +-- the @closeProducer = flushProducer@ alias, it does not transition the +-- producer to a closed state — it just flushes. It is safe to call +-- repeatedly on a live producer. +-- +-- Shutdown: this loop is cancelled by 'Async.race_' when the terminator +-- TMVar is signalled. GHC can't deliver the async exception while the +-- thread is parked in 'safe' FFI, but 'flushProducer' is not a single FFI +-- call — it's a Haskell loop calling @rd_kafka_poll(rk, 100)@. Each poll +-- returns within 100ms (when idle) or immediately (on event), and control +-- briefly returns to Haskell between iterations, where any pending async +-- exception is delivered and kills the thread. So shutdown is bounded by +-- ~100ms even in the worst case (idle producer, no events flowing). The +-- 'Async.race_' cleanup itself does not wait for the loser to finish dying, +-- so we don't deadlock the release chain. pollEvents :: Producer.KafkaProducer -> Prelude.IO () pollEvents producer = do - Producer.produceMessageBatch producer [] - |> map (\_ -> ()) - Control.Concurrent.threadDelay 100_000 {- 100ms -} + Producer.flushProducer producer pollEvents producer mkHandler :: Settings.Settings -> Producer.KafkaProducer -> Prelude.IO Internal.Handler
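
For reference, the follow-up shape sketched in `nri-kafka/docs/known-issues.md` ("What a fix looks like") could look roughly like the fragment below. This is a sketch only and is not part of this diff: the `Result`-carrying terminator, `onDeliveryReport`, `awaitDelivery`, and `toInternalError` are assumed names for plumbing that does not exist yet, and the real `Internal.Error` conversion may differ.

```haskell
-- Delivery callback installed by sendHelperAsync: signal the terminator on
-- every report, not only on success. (Sketch; `toInternalError` is a
-- hypothetical conversion from a failed DeliveryReport to Internal.Error.)
onDeliveryReport ::
  TMVar.TMVar (Result Internal.Error ()) ->
  Producer.DeliveryReport ->
  Prelude.IO ()
onDeliveryReport terminator deliveryReport =
  case deliveryReport of
    Producer.DeliverySuccess _producerRecord _offset ->
      STM.atomically (TMVar.putTMVar terminator (Ok ()))
    failed ->
      STM.atomically (TMVar.putTMVar terminator (Err (toInternalError failed)))

-- Caller side in Internal.sendSync: block until the broker reports back,
-- then surface failures instead of parking forever.
awaitDelivery :: TMVar.TMVar (Result Internal.Error ()) -> Prelude.IO (Result Internal.Error ())
awaitDelivery terminator =
  STM.atomically (TMVar.readTMVar terminator)
```

`sendSync` would then map an `Err` from `awaitDelivery` to `Task.fail`, keeping the existing contract that the terminator is signalled exactly once after broker confirmation.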