Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions service/datasetworker/datasetworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,20 @@ func (w *Thread) run(ctx context.Context) (retErr error) {
}

w.stateMonitor.AddJob(job.ID, workCancel)
switch job.Type {
case model.Scan:
err = w.scan(workCtx, *job.Attachment)
case model.Pack:
err = w.pack(workCtx, *job)
case model.DagGen:
err = w.ExportDag(workCtx, *job)
// belt-and-suspenders: findJob filters attachment_id IS NOT NULL, but if a job slips through
// without its Attachment preloaded (e.g. row was orphaned between claim and preload) we'd
// otherwise nil-deref *job.Attachment below. Mark it errored and move on.
if job.Attachment == nil {
err = errors.Errorf("job %d has no attachment (orphaned by prep deletion)", job.ID)
} else {
switch job.Type {
case model.Scan:
err = w.scan(workCtx, *job.Attachment)
case model.Pack:
err = w.pack(workCtx, *job)
case model.DagGen:
err = w.ExportDag(workCtx, *job)
}
}
w.stateMonitor.RemoveJob(job.ID)
if workCtx.Err() != nil && ctx.Err() == nil {
Expand Down
8 changes: 6 additions & 2 deletions service/datasetworker/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ func (w *Thread) findJob(ctx context.Context, typesOrdered []model.JobType) (*mo
for _, jobType := range typesOrdered {
err := database.DoRetry(ctx, func() error {
return db.Transaction(func(db *gorm.DB) error {
// First, lock and claim the job without preloading (preload can interfere with locking)
// First, lock and claim the job without preloading (preload can interfere with locking).
// attachment_id IS NOT NULL skips orphans: jobs.attachment_id is ON DELETE SET NULL, so
// deleting a prep leaves its jobs behind with state=ready and a null FK. Claiming one
// would nil-deref *job.Attachment in the run loop. The healthcheck sweep deletes these
// eventually but it races worker pickup -- filter here too.
err := db.Clauses(clause.Locking{
Strength: "UPDATE",
Options: "SKIP LOCKED",
}).
Where("type = ? AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing).
Where("type = ? AND attachment_id IS NOT NULL AND (state = ? OR (state = ? AND worker_id IS NULL))", jobType, model.Ready, model.Processing).
First(&job).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down
32 changes: 32 additions & 0 deletions service/datasetworker/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,38 @@ import (
"gorm.io/gorm"
)

// TestFindWorkSkipsOrphanedJob checks that findJob never hands out a job whose
// AttachmentID is nil. Such rows are left behind when a preparation is deleted
// (the FK is SET NULL) and remain Ready until the healthcheck reaper removes
// them; claiming one would lead to a nil dereference of job.Attachment later.
func TestFindWorkSkipsOrphanedJob(t *testing.T) {
	testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
		worker := &Thread{
			dbNoContext: db,
			config:      Config{EnableScan: true, EnablePack: true, EnableDag: true},
			logger:      logger.With("test", true),
			id:          uuid.New(),
		}

		// The worker has to be registered before it is allowed to look for jobs.
		_, regErr := healthcheck.Register(ctx, worker.dbNoContext, worker.id, model.DatasetWorker, true)
		require.NoError(t, regErr)

		jobTypes := []model.JobType{model.Scan, model.Pack, model.DagGen}

		// Seed one Ready job of every type, each without an attachment.
		for _, jobType := range jobTypes {
			orphan := model.Job{
				AttachmentID: nil,
				State:        model.Ready,
				Type:         jobType,
			}
			require.NoError(t, db.Create(&orphan).Error)
		}

		// Ask for each type on its own so a miss is attributed to the right type.
		for _, jobType := range jobTypes {
			claimed, findErr := worker.findJob(ctx, []model.JobType{jobType})
			require.NoError(t, findErr)
			require.Nil(t, claimed, "orphaned %s job must not be claimed", jobType)
		}
	})
}

func TestFindPackWork(t *testing.T) {
testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
thread := &Thread{
Expand Down
Loading