diff --git a/pkg/backup.go b/pkg/backup.go index 5ef37e3..3f2df6b 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -161,20 +161,32 @@ func (c *chunk) process(ctx context.Context) { myChunk, err := io.ReadAll(io.LimitReader(c.reader, backupChunk)) if err != nil { if err != io.EOF { - c.errChan <- fmt.Errorf("failed to read file: %s", err) + select { + case c.errChan <- fmt.Errorf("failed to read file: %s", err): + case <-ctx.Done(): + } return } } if len(myChunk) == 0 { // stop further reading, no data - c.contChan <- false + select { + case c.contChan <- false: + case <-ctx.Done(): + } return } else if err == io.EOF { // EOF, but we still need to process some data - c.contChan <- false + select { + case c.contChan <- false: + case <-ctx.Done(): + } } else { // allow next go routine to process the input - c.contChan <- true + select { + case c.contChan <- true: + case <-ctx.Done(): + } } chunkPath := fmt.Sprintf("%s-%05d", c.path, c.i) @@ -190,17 +202,26 @@ func (c *chunk) process(ctx context.Context) { rb := new(bytes.Buffer) zf, err := zlib.NewWriterLevel(rb, compressionLevel) if err != nil { - c.errChan <- fmt.Errorf("failed to set zlib %d compression level: %s", compressionLevel, err) + select { + case c.errChan <- fmt.Errorf("failed to set zlib %d compression level: %s", compressionLevel, err): + case <-ctx.Done(): + } return } _, err = zf.Write(myChunk) if err != nil { - c.errChan <- fmt.Errorf("failed to write zlib compressed data: %s", err) + select { + case c.errChan <- fmt.Errorf("failed to write zlib compressed data: %s", err): + case <-ctx.Done(): + } return } err = zf.Close() if err != nil { - c.errChan <- fmt.Errorf("failed to flush and close zlib compressed data: %s", err) + select { + case c.errChan <- fmt.Errorf("failed to flush and close zlib compressed data: %s", err): + case <-ctx.Done(): + } return } // free up the compressor @@ -228,7 +249,10 @@ func (c *chunk) process(ctx context.Context) { rb.Reset() if err != nil { - c.errChan <- fmt.Errorf("failed to upload %s/%s data: %s", c.containerName, chunkPath, err) + select { + case c.errChan <- fmt.Errorf("failed to upload %s/%s data: %s", c.containerName, chunkPath, err): + case <-ctx.Done(): + } return } @@ -264,7 +288,7 @@ func uploadBackup(ctx context.Context, srcImgClient, srcObjClient, dstObjClient, progressReader := progress.NewReader(imageData.readCloser) go func() { var s int64 - for p := range progress.NewTicker(context.Background(), progressReader, imageData.size, 1*time.Second) { + for p := range progress.NewTicker(ctx, progressReader, imageData.size, 1*time.Second) { s = p.N() - s speed := s / (1024 * 1024) s = p.N() diff --git a/pkg/backup_types.go b/pkg/backup_types.go index 2b4de2a..ace1e4f 100644 --- a/pkg/backup_types.go +++ b/pkg/backup_types.go @@ -138,7 +138,9 @@ func (r *volumeMeta) MarshalJSON() ([]byte, error) { if r.VolumeGlanceMetadata == nil { r.VolumeGlanceMetadata = make(map[string]string) } - return json.Marshal(r) + // Use type alias to avoid infinite recursion + type t volumeMeta + return json.Marshal((*t)(r)) } type volumeBaseMeta struct { diff --git a/pkg/image.go b/pkg/image.go index b4347fb..c2b235b 100644 --- a/pkg/image.go +++ b/pkg/image.go @@ -346,10 +346,11 @@ func migrateImage(ctx context.Context, srcImageClient, dstImageClient, srcObject if err != nil { return nil, fmt.Errorf("error getting the source image reader: %s", err) } + defer imageReader.Close() progressReader := progress.NewReader(imageReader) go func() { - for p := range progress.NewTicker(context.Background(), progressReader, srcImg.SizeBytes, 1*time.Second) { + for p := range progress.NewTicker(ctx, progressReader, srcImg.SizeBytes, 1*time.Second) { log.Printf("Image size: %d/%d (%.2f%%), remaining: %s", p.N(), p.Size(), p.Percent(), p.Remaining().Round(time.Second)) } }() @@ -359,7 +360,6 @@ func migrateImage(ctx context.Context, srcImageClient, dstImageClient, srcObject if err != nil { return nil, fmt.Errorf("failed to upload an image: %s", err) } - imageReader.Close() dstImg, err = waitForImage(ctx, dstImageClient, dstObjectClient, dstImg.ID, srcImg.SizeBytes, waitForImageSec) if err != nil { diff --git a/pkg/secrets.go b/pkg/secrets.go index 589461b..fda52be 100644 --- a/pkg/secrets.go +++ b/pkg/secrets.go @@ -121,7 +121,7 @@ func migrateSecret(ctx context.Context, srcSecretClient, dstSecretClient *gopher expiration := now.AddDate(0, 1, 0) // if source expiration is not expired, use its expiration date if srcSecret.Expiration.After(now) { - log.Printf("The expiration date for the source secret (%s) has already passed", srcSecret.Expiration) + log.Printf("The source secret has not expired yet (expiration: %s), using its expiration date", srcSecret.Expiration) expiration = srcSecret.Expiration } // if custom expiration is set, enforce it diff --git a/pkg/server.go b/pkg/server.go index 0b3c98f..9434d89 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -1002,12 +1002,23 @@ func createServerRetry(ctx context.Context, dstServerClient *gophercloud.Service dstServer, err = waitForServer(ctx, dstServerClient, dstServerID, waitForServerSec) if err != nil { + faultMsg := "" + if dstServer != nil && dstServer.Fault.Message != "" { + faultMsg = dstServer.Fault.Message + } if i == tries { // no further tries, fail right away - return nil, fmt.Errorf("failed to wait for %q target server: %s: %s", dstServerID, err, dstServer.Fault.Message) + if faultMsg != "" { + return nil, fmt.Errorf("failed to wait for %q target server: %s: %s", dstServerID, err, faultMsg) + } + return nil, fmt.Errorf("failed to wait for %q target server: %s", dstServerID, err) } // nil an error and don't delete the port - log.Printf("Failed to wait for %q target server: %s: %s", dstServerID, err, dstServer.Fault.Message) + if faultMsg != "" { + log.Printf("Failed to wait for %q target server: %s: %s", dstServerID, err, faultMsg) + } else { + log.Printf("Failed to wait for %q target server: %s", dstServerID, err) + } log.Printf("Deleting the failed %q server", dstServerID) if err := servers.Delete(ctx, dstServerClient, dstServerID).ExtractErr(); err != nil { log.Printf("Error deleting the failed %q server: %s", dstServerID, err) diff --git a/pkg/utils.go b/pkg/utils.go index aa82bb2..b194996 100644 --- a/pkg/utils.go +++ b/pkg/utils.go @@ -153,7 +153,9 @@ func newOpenStackClient(ctx context.Context, loc Location) (*gophercloud.Provide return } - if err := applicationcredentials.Delete(ctx, identityClient, userID, ac.ID).ExtractErr(); err != nil { + // Use background context for cleanup since parent ctx may be cancelled + cleanupCtx := context.Background() + if err := applicationcredentials.Delete(cleanupCtx, identityClient, userID, ac.ID).ExtractErr(); err != nil { if !gophercloud.ResponseCodeIs(err, http.StatusNotFound) { log.Printf("Failed to delete a %q temp application credential: %s", acName, err) } @@ -190,7 +192,7 @@ func reauthClient(ctx context.Context, client *gophercloud.ServiceClient, funcNa // reauth the client before the long running action to avoid openstack internal auth issues if client.ReauthFunc != nil { if err := client.Reauthenticate(ctx, client.TokenID); err != nil { - log.Printf("Failed to re-authenticate the provider client in the %s func: %v", err, funcName) + log.Printf("Failed to re-authenticate the provider client in the %s func: %v", funcName, err) } } }