From 2654f63eaecbf2479f900e53491c8150445202c8 Mon Sep 17 00:00:00 2001 From: JAG-UK Date: Mon, 29 Jun 2026 09:20:07 +0100 Subject: [PATCH] datasetworker: skip pack jobs orphaned by deleted preparation A deleted preparation sets jobs.attachment_id to NULL (SET NULL cascade) and leaves the reaper to delete the rows later. findJob did not exclude those, so an orphaned pack job got claimed and pack.Pack dereferenced the nil Attachment, panicking the whole worker in a crash loop. Skip orphans when claiming, and guard pack.Pack defensively. Co-Authored-By: Claude Opus 4.8 --- pack/pack.go | 5 +++++ service/datasetworker/find.go | 4 +++- service/datasetworker/find_test.go | 27 +++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pack/pack.go b/pack/pack.go index 9e0637029..0fdc8dbfe 100644 --- a/pack/pack.go +++ b/pack/pack.go @@ -88,6 +88,11 @@ func Pack( job model.Job, ) (*model.Car, error) { db = db.WithContext(ctx) + // Attachment is nil when the job was orphaned by a deleted preparation; guard + // against the SET NULL cascade racing the reaper rather than panicking. + if job.Attachment == nil || job.Attachment.Preparation == nil { + return nil, errors.Errorf("job %d has no attachment, likely orphaned by a deleted preparation", job.ID) + } pieceSize := job.Attachment.Preparation.GetMinPieceSize() // storageWriter can be nil for inline preparation storageID, storageWriter, err := storagesystem.GetRandomOutputWriter(ctx, job.Attachment.Preparation.OutputStorages) diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 92a79e92f..abb83b61e 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -38,7 +38,9 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo Strength: "UPDATE", Options: "SKIP LOCKED", }). - Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). + // attachment_id IS NOT NULL skips jobs orphaned by a deleted + // preparation (SET NULL cascade); the reaper deletes those. + Where("type = ? AND attachment_id IS NOT NULL AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/service/datasetworker/find_test.go b/service/datasetworker/find_test.go index 8add3e37a..527577975 100644 --- a/service/datasetworker/find_test.go +++ b/service/datasetworker/find_test.go @@ -67,3 +67,30 @@ func TestFindPackWork(t *testing.T) { require.Equal(t, thread.id.String(), *existing.WorkerID) }) } + +// A job orphaned by a deleted preparation (attachment_id NULL via SET NULL +// cascade) must be skipped, not claimed, so it never reaches pack.Pack. +func TestFindPackWorkSkipsOrphanedJob(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + thread := &Thread{ + dbNoContext: db, + config: Config{EnablePack: true}, + logger: logger.With("test", true), + id: uuid.New(), + } + + _, err := healthcheck.Register(ctx, thread.dbNoContext, thread.id, model.DatasetWorker, true) + require.NoError(t, err) + + err = db.Create(&model.Job{ + AttachmentID: nil, + State: model.Ready, + Type: model.Pack, + }).Error + require.NoError(t, err) + + found, err := thread.findJob(ctx, []model.JobType{model.Pack}) + require.NoError(t, err) + require.Nil(t, found) + }) +}