Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions pkg/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,32 @@ func (c *chunk) process(ctx context.Context) {
myChunk, err := io.ReadAll(io.LimitReader(c.reader, backupChunk))
if err != nil {
if err != io.EOF {
c.errChan <- fmt.Errorf("failed to read file: %s", err)
select {
case c.errChan <- fmt.Errorf("failed to read file: %s", err):
case <-ctx.Done():
}
return
}
}
if len(myChunk) == 0 {
// stop further reading, no data
c.contChan <- false
select {
case c.contChan <- false:
case <-ctx.Done():
}
return
} else if err == io.EOF {
// EOF, but we still need to process some data
c.contChan <- false
select {
case c.contChan <- false:
case <-ctx.Done():
}
} else {
// allow next go routine to process the input
c.contChan <- true
select {
case c.contChan <- true:
case <-ctx.Done():
}
}

chunkPath := fmt.Sprintf("%s-%05d", c.path, c.i)
Expand All @@ -190,17 +202,26 @@ func (c *chunk) process(ctx context.Context) {
rb := new(bytes.Buffer)
zf, err := zlib.NewWriterLevel(rb, compressionLevel)
if err != nil {
c.errChan <- fmt.Errorf("failed to set zlib %d compression level: %s", compressionLevel, err)
select {
case c.errChan <- fmt.Errorf("failed to set zlib %d compression level: %s", compressionLevel, err):
case <-ctx.Done():
}
return
}
_, err = zf.Write(myChunk)
if err != nil {
c.errChan <- fmt.Errorf("failed to write zlib compressed data: %s", err)
select {
case c.errChan <- fmt.Errorf("failed to write zlib compressed data: %s", err):
case <-ctx.Done():
}
return
}
err = zf.Close()
if err != nil {
c.errChan <- fmt.Errorf("failed to flush and close zlib compressed data: %s", err)
select {
case c.errChan <- fmt.Errorf("failed to flush and close zlib compressed data: %s", err):
case <-ctx.Done():
}
return
}
// free up the compressor
Expand Down Expand Up @@ -228,7 +249,10 @@ func (c *chunk) process(ctx context.Context) {
rb.Reset()

if err != nil {
c.errChan <- fmt.Errorf("failed to upload %s/%s data: %s", c.containerName, chunkPath, err)
select {
case c.errChan <- fmt.Errorf("failed to upload %s/%s data: %s", c.containerName, chunkPath, err):
case <-ctx.Done():
}
return
}

Expand Down Expand Up @@ -264,7 +288,7 @@ func uploadBackup(ctx context.Context, srcImgClient, srcObjClient, dstObjClient,
progressReader := progress.NewReader(imageData.readCloser)
go func() {
var s int64
for p := range progress.NewTicker(context.Background(), progressReader, imageData.size, 1*time.Second) {
for p := range progress.NewTicker(ctx, progressReader, imageData.size, 1*time.Second) {
s = p.N() - s
speed := s / (1024 * 1024)
s = p.N()
Expand Down
4 changes: 3 additions & 1 deletion pkg/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func (r *volumeMeta) MarshalJSON() ([]byte, error) {
if r.VolumeGlanceMetadata == nil {
r.VolumeGlanceMetadata = make(map[string]string)
}
return json.Marshal(r)
// Use type alias to avoid infinite recursion
type t volumeMeta
return json.Marshal((*t)(r))
}

type volumeBaseMeta struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,11 @@ func migrateImage(ctx context.Context, srcImageClient, dstImageClient, srcObject
if err != nil {
return nil, fmt.Errorf("error getting the source image reader: %s", err)
}
defer imageReader.Close()

progressReader := progress.NewReader(imageReader)
go func() {
for p := range progress.NewTicker(context.Background(), progressReader, srcImg.SizeBytes, 1*time.Second) {
for p := range progress.NewTicker(ctx, progressReader, srcImg.SizeBytes, 1*time.Second) {
log.Printf("Image size: %d/%d (%.2f%%), remaining: %s", p.N(), p.Size(), p.Percent(), p.Remaining().Round(time.Second))
}
}()
Expand All @@ -359,7 +360,6 @@ func migrateImage(ctx context.Context, srcImageClient, dstImageClient, srcObject
if err != nil {
return nil, fmt.Errorf("failed to upload an image: %s", err)
}
imageReader.Close()

dstImg, err = waitForImage(ctx, dstImageClient, dstObjectClient, dstImg.ID, srcImg.SizeBytes, waitForImageSec)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func migrateSecret(ctx context.Context, srcSecretClient, dstSecretClient *gopher
expiration := now.AddDate(0, 1, 0)
// if source expiration is not expired, use its expiration date
if srcSecret.Expiration.After(now) {
log.Printf("The expiration date for the source secret (%s) has already passed", srcSecret.Expiration)
log.Printf("The source secret has not expired yet (expiration: %s), using its expiration date", srcSecret.Expiration)
expiration = srcSecret.Expiration
}
// if custom expiration is set, enforce it
Expand Down
15 changes: 13 additions & 2 deletions pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,12 +1002,23 @@ func createServerRetry(ctx context.Context, dstServerClient *gophercloud.Service

dstServer, err = waitForServer(ctx, dstServerClient, dstServerID, waitForServerSec)
if err != nil {
faultMsg := ""
if dstServer != nil && dstServer.Fault.Message != "" {
faultMsg = dstServer.Fault.Message
}
if i == tries {
// no further tries, fail right away
return nil, fmt.Errorf("failed to wait for %q target server: %s: %s", dstServerID, err, dstServer.Fault.Message)
if faultMsg != "" {
return nil, fmt.Errorf("failed to wait for %q target server: %s: %s", dstServerID, err, faultMsg)
}
return nil, fmt.Errorf("failed to wait for %q target server: %s", dstServerID, err)
}
// nil an error and don't delete the port
log.Printf("Failed to wait for %q target server: %s: %s", dstServerID, err, dstServer.Fault.Message)
if faultMsg != "" {
log.Printf("Failed to wait for %q target server: %s: %s", dstServerID, err, faultMsg)
} else {
log.Printf("Failed to wait for %q target server: %s", dstServerID, err)
}
log.Printf("Deleting the failed %q server", dstServerID)
if err := servers.Delete(ctx, dstServerClient, dstServerID).ExtractErr(); err != nil {
log.Printf("Error deleting the failed %q server: %s", dstServerID, err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func newOpenStackClient(ctx context.Context, loc Location) (*gophercloud.Provide
return
}

if err := applicationcredentials.Delete(ctx, identityClient, userID, ac.ID).ExtractErr(); err != nil {
// Use background context for cleanup since parent ctx may be cancelled
cleanupCtx := context.Background()
if err := applicationcredentials.Delete(cleanupCtx, identityClient, userID, ac.ID).ExtractErr(); err != nil {
if !gophercloud.ResponseCodeIs(err, http.StatusNotFound) {
log.Printf("Failed to delete a %q temp application credential: %s", acName, err)
}
Expand Down Expand Up @@ -190,7 +192,7 @@ func reauthClient(ctx context.Context, client *gophercloud.ServiceClient, funcNa
// reauth the client before the long running action to avoid openstack internal auth issues
if client.ReauthFunc != nil {
if err := client.Reauthenticate(ctx, client.TokenID); err != nil {
log.Printf("Failed to re-authenticate the provider client in the %s func: %v", err, funcName)
log.Printf("Failed to re-authenticate the provider client in the %s func: %v", funcName, err)
}
}
}
Expand Down