diff --git a/pack/pack.go b/pack/pack.go index 9e063702..0fdc8dbf 100644 --- a/pack/pack.go +++ b/pack/pack.go @@ -88,6 +88,11 @@ func Pack( job model.Job, ) (*model.Car, error) { db = db.WithContext(ctx) + // Attachment is nil when the job was orphaned by a deleted preparation; guard + // against the SET NULL cascade racing the reaper rather than panicking. + if job.Attachment == nil || job.Attachment.Preparation == nil { + return nil, errors.Errorf("job %d has no attachment, likely orphaned by a deleted preparation", job.ID) + } pieceSize := job.Attachment.Preparation.GetMinPieceSize() // storageWriter can be nil for inline preparation storageID, storageWriter, err := storagesystem.GetRandomOutputWriter(ctx, job.Attachment.Preparation.OutputStorages) diff --git a/service/datasetworker/find.go b/service/datasetworker/find.go index 92a79e92..abb83b61 100644 --- a/service/datasetworker/find.go +++ b/service/datasetworker/find.go @@ -38,7 +38,9 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo Strength: "UPDATE", Options: "SKIP LOCKED", }). - Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). + // attachment_id IS NOT NULL skips jobs orphaned by a deleted + // preparation (SET NULL cascade); the reaper deletes those. + Where("type = ? AND attachment_id IS NOT NULL AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing). First(&job).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/service/datasetworker/find_test.go b/service/datasetworker/find_test.go index 8add3e37..52757797 100644 --- a/service/datasetworker/find_test.go +++ b/service/datasetworker/find_test.go @@ -67,3 +67,30 @@ func TestFindPackWork(t *testing.T) { require.Equal(t, thread.id.String(), *existing.WorkerID) }) } + +// A job orphaned by a deleted preparation (attachment_id NULL via SET NULL +// cascade) must be skipped, not claimed, so it never reaches pack.Pack. +func TestFindPackWorkSkipsOrphanedJob(t *testing.T) { + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + thread := &Thread{ + dbNoContext: db, + config: Config{EnablePack: true}, + logger: logger.With("test", true), + id: uuid.New(), + } + + _, err := healthcheck.Register(ctx, thread.dbNoContext, thread.id, model.DatasetWorker, true) + require.NoError(t, err) + + err = db.Create(&model.Job{ + AttachmentID: nil, + State: model.Ready, + Type: model.Pack, + }).Error + require.NoError(t, err) + + found, err := thread.findJob(ctx, []model.JobType{model.Pack}) + require.NoError(t, err) + require.Nil(t, found) + }) +}