From a61d4c406681b6ff986763d275b0059f0aa99c33 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Fri, 19 Jun 2026 22:24:12 +0200 Subject: [PATCH] datasetworker: skip orphaned jobs left by deleted preparations Co-Authored-By: JAG-UK --- service/datasetworker/datasetworker.go | 21 +++++++++++------ service/datasetworker/find.go | 8 +++++-- service/datasetworker/find_test.go | 32 ++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/service/datasetworker/datasetworker.go b/service/datasetworker/datasetworker.go index dffc68981..241c42344 100644 --- a/service/datasetworker/datasetworker.go +++ b/service/datasetworker/datasetworker.go @@ -299,13 +299,20 @@ func (w *Thread) run(ctx context.Context) (retErr error) { } w.stateMonitor.AddJob(job.ID, workCancel) - switch job.Type { - case model.Scan: - err = w.scan(workCtx, *job.Attachment) - case model.Pack: - err = w.pack(workCtx, *job) - case model.DagGen: - err = w.ExportDag(workCtx, *job) + // belt-and-suspenders: findJob filters attachment_id IS NOT NULL, but if a job slips through + // without its Attachment preloaded (e.g. row was orphaned between claim and preload) we'd + // otherwise nil-deref *job.Attachment below. Mark it errored and move on. + if job.Attachment == nil { + err = errors.Errorf("job %d has no attachment (orphaned by prep deletion)", job.ID) + } else { + switch job.Type { + case model.Scan: + err = w.scan(workCtx, *job.Attachment) + case model.Pack: + err = w.pack(workCtx, *job) + case model.DagGen: + err = w.ExportDag(workCtx, *job) + } } w.stateMonitor.RemoveJob(job.ID) if workCtx.Err() != nil && ctx.Err() == nil { diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 92a79e92f..d9dce6bc9 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -33,12 +33,16 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo for _, jobType := range typesOrdered { err := database.DoRetry(ctx, func() error { return db.Transaction(func(db *gorm.DB) error { - // First, lock and claim the job without preloading (preload can interfere with locking) + // First, lock and claim the job without preloading (preload can interfere with locking). + // attachment_id IS NOT NULL skips orphans: jobs.attachment_id is ON DELETE SET NULL, so + // deleting a prep leaves its jobs behind with state=ready and a null FK. Claiming one + // would nil-deref *job.Attachment in the run loop. The healthcheck sweep deletes these + // eventually but it races worker pickup -- filter here too. err := db.Clauses(clause.Locking{ Strength: "UPDATE", Options: "SKIP LOCKED", }). - Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). + Where("type = ? AND attachment_id IS NOT NULL AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/service/datasetworker/find_test.go b/service/datasetworker/find_test.go index 8add3e37a..04b2b3e1c 100644 --- a/service/datasetworker/find_test.go +++ b/service/datasetworker/find_test.go @@ -13,6 +13,38 @@ import ( "gorm.io/gorm" ) +// A job whose preparation was deleted has attachment_id NULL (SET NULL cascade) +// and stays Ready until the healthcheck reaper sweeps it. findJob must skip these +// rather than claim them and nil-deref *job.Attachment downstream. +func TestFindWorkSkipsOrphanedJob(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + thread := &Thread{ + dbNoContext: db, + config: Config{EnableScan: true, EnablePack: true, EnableDag: true}, + logger: logger.With("test", true), + id: uuid.New(), + } + + _, err := healthcheck.Register(ctx, thread.dbNoContext, thread.id, model.DatasetWorker, true) + require.NoError(t, err) + + for _, jt := range []model.JobType{model.Scan, model.Pack, model.DagGen} { + err = db.Create(&model.Job{ + AttachmentID: nil, + State: model.Ready, + Type: jt, + }).Error + require.NoError(t, err) + } + + for _, jt := range []model.JobType{model.Scan, model.Pack, model.DagGen} { + found, err := thread.findJob(ctx, []model.JobType{jt}) + require.NoError(t, err) + require.Nil(t, found, "orphaned %s job must not be claimed", jt) + } + }) +} + func TestFindPackWork(t *testing.T) { testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { thread := &Thread{