@@ -14,38 +14,43 @@ import (
1414)
1515
1616type Update struct {
17- IndexFile * IndexedFile
18- Progress float64
19- Done bool
17+ IndexFile * IndexedFile
18+ Retry bool
19+ RemoveTakenFlag bool
20+ Progress float64
21+ Bytes int64
22+ Done bool
2023}
2124
2225type Downloader struct {
23- RateLimit * int
24- state * InstallerState
25- ctx context.Context
26- cancel context.CancelFunc
27- client * grab.Client
28- reqch chan * grab.Request
29- respch chan * grab.Response
30- updatech chan * Update
31- workerWg sync.WaitGroup
32- responderWg sync.WaitGroup
33- updaterWg sync.WaitGroup
34- running bool
35- started bool
36- installPath string
26+ RateLimit * int
27+ state * InstallerState
28+ ctx context.Context
29+ cancel context.CancelFunc
30+ client * grab.Client
31+ reqch chan * grab.Request
32+ respch chan * grab.Response
33+ updatech chan * Update
34+ workerWg sync.WaitGroup
35+ responderWg sync.WaitGroup
36+ updaterWg sync.WaitGroup
37+ newRequestWg sync.WaitGroup
38+ running bool
39+ started bool
40+ installPath string
3741}
3842
3943func NewDownloader (state * InstallerState ) * Downloader {
4044 d := & Downloader {
41- RateLimit : nil ,
42- state : state ,
43- client : grab .NewClient (),
44- workerWg : sync.WaitGroup {},
45- responderWg : sync.WaitGroup {},
46- updaterWg : sync.WaitGroup {},
47- running : false ,
48- started : false ,
45+ RateLimit : nil ,
46+ state : state ,
47+ client : grab .NewClient (),
48+ workerWg : sync.WaitGroup {},
49+ responderWg : sync.WaitGroup {},
50+ updaterWg : sync.WaitGroup {},
51+ newRequestWg : sync.WaitGroup {},
52+ running : false ,
53+ started : false ,
4954 }
5055
5156 return d
@@ -97,21 +102,48 @@ func (d *Downloader) Resume() error {
97102 d .updatech <- & Update {
98103 IndexFile : f ,
99104 Progress : resp .Progress (),
105+ Bytes : resp .BytesComplete (),
100106 Done : false ,
101107 }
102108 case <- resp .Done :
103109 // Done, check for error
110+ f := resp .Request .Tag .(* IndexedFile )
104111 err := resp .Err ()
105112 if err != nil {
106113 fmt .Println (err .Error ())
107- // Bad download
114+ if err .Error () == "context canceled" {
115+ d .updatech <- & Update {
116+ IndexFile : f ,
117+ Retry : false ,
118+ RemoveTakenFlag : true ,
119+ Progress : 1 ,
120+ Bytes : 0 ,
121+ Done : true ,
122+ }
123+ return
124+ } else {
125+ // Bad download, retry if below 3 retries
126+ if f .RetryCount < 3 {
127+ f .RetryCount += 1
128+ d .updatech <- & Update {
129+ IndexFile : f ,
130+ Retry : true ,
131+ RemoveTakenFlag : false ,
132+ Progress : 1 ,
133+ Bytes : 0 ,
134+ Done : true ,
135+ }
136+ }
137+ }
108138 } else {
109139 // Successful download, notify UI updater
110- f := resp .Request .Tag .(* IndexedFile )
111140 d .updatech <- & Update {
112- IndexFile : f ,
113- Progress : 1 ,
114- Done : true ,
141+ IndexFile : f ,
142+ Retry : false ,
143+ RemoveTakenFlag : false ,
144+ Progress : 1 ,
145+ Bytes : resp .BytesComplete (),
146+ Done : true ,
115147 }
116148 }
117149 return
@@ -127,6 +159,7 @@ func (d *Downloader) Resume() error {
127159 d .updaterWg .Add (1 )
128160 go func () {
129161 defer d .updaterWg .Done ()
162+
130163 // Create UI files
131164 uifiles := make ([]* UiFile , 4 )
132165 for i := 0 ; i < 4 ; i ++ {
@@ -137,14 +170,117 @@ func (d *Downloader) Resume() error {
137170 }
138171 }
139172
173+ // Create speed handler
174+ speedch := make (chan int64 , 4 )
175+ defer close (speedch )
176+ // Set up download speed handler to store records and update average
177+ d .updaterWg .Add (1 )
178+ go func () {
179+ defer d .updaterWg .Done ()
180+ // Set UI speed value to 0 when downloader stops
181+ defer func () { _ = d .state .formatDownloadSpeed .Set ("0.0B/s" ) }()
182+ // Track 6 speed records to get average
183+ bytePerSecondRecords := make ([]float64 , 0 )
184+
185+ queue := func (bytesPerSecond float64 ) {
186+ if len (bytePerSecondRecords ) > 6 {
187+ // Remove first element
188+ bytePerSecondRecords = bytePerSecondRecords [1 :]
189+ }
190+ // Add new record
191+ bytePerSecondRecords = append (bytePerSecondRecords , bytesPerSecond )
192+ }
193+
194+ averageSpeed := func () float64 {
195+ if len (bytePerSecondRecords ) == 0 {
196+ return 0
197+ }
198+
199+ sum := float64 (0 )
200+ for _ , num := range bytePerSecondRecords {
201+ sum += num
202+ }
203+
204+ return sum / float64 (len (bytePerSecondRecords ))
205+ }
206+
207+ totalBytes := int64 (0 )
208+ lastByteRecord := int64 (0 )
209+ lastUpdate := time .Now ()
210+
211+ for bytes := range speedch {
212+ totalBytes += bytes
213+
214+ curTime := time .Now ()
215+ secondsDiff := curTime .Sub (lastUpdate ).Seconds ()
216+ if secondsDiff > 0.5 {
217+ byteDiff := totalBytes - lastByteRecord
218+ lastByteRecord = totalBytes
219+ lastUpdate = curTime
220+ // Push record
221+ queue (float64 (byteDiff ) / secondsDiff )
222+ // Update UI
223+ d .state .downloadSpeed = averageSpeed ()
224+ _ = d .state .formatDownloadSpeed .Set (FormatBytes (int64 (d .state .downloadSpeed )) + "/s" )
225+ }
226+ }
227+ }()
228+
229+ _ = d .state .fileTitle1 .Set ("None" )
230+ _ = d .state .fileProgress1 .Set (0 )
231+ _ = d .state .fileTitle2 .Set ("None" )
232+ _ = d .state .fileProgress2 .Set (0 )
233+ _ = d .state .fileTitle3 .Set ("None" )
234+ _ = d .state .fileProgress3 .Set (0 )
235+ _ = d .state .fileTitle4 .Set ("None" )
236+ _ = d .state .fileProgress4 .Set (0 )
237+
140238 for update := range d .updatech {
141- fmt .Println (fmt .Sprintf ("path: %s, progress: %f" , update .IndexFile .Filepath , update .Progress ))
239+ // Failed download because of context cancel, remove taken flag instead, ignore ui update
240+ if update .RemoveTakenFlag {
241+ err = d .state .Repo .ClearTaken (update .IndexFile )
242+ if err != nil {
243+ dialog .NewError (& DatabaseError {err }, d .state .window ).Show ()
244+ }
245+ continue
246+ }
247+
248+ // Immediately retry file if asked, ignore ui update
249+ if update .Retry {
250+ d .newRequestWg .Add (1 )
251+ go func () {
252+ defer d .newRequestWg .Done ()
253+ select {
254+ case <- d .ctx .Done ():
255+ {
256+ // Kill request channel since we are sole sender
257+ close (d .reqch )
258+ return
259+ }
260+ default :
261+ {
262+ req , err := d .NewRequest (update .IndexFile )
263+ if err != nil {
264+ dialog .NewError (err , d .state .window ).Show ()
265+ }
266+ d .reqch <- req
267+ }
268+ }
269+ }()
270+ continue
271+ }
272+
142273 // Update UI element
143274 updateIdx := - 1
144275 for idx , f := range uifiles {
145276 if f .Filepath == update .IndexFile .Filepath {
277+ // Send bytes update to speed handler
278+ bytesDiff := f .Bytes - update .Bytes
279+ speedch <- bytesDiff
280+ // Update ui file
146281 f .Progress = update .Progress
147282 f .Done = update .Done
283+ f .Bytes = update .Bytes
148284 updateIdx = idx
149285 break
150286 }
@@ -153,9 +289,14 @@ func (d *Downloader) Resume() error {
153289 // Didn't find existing entry, find an older one to replace
154290 for idx , f := range uifiles {
155291 if f .Done == true {
292+ // Send bytes update to speed handler
293+ bytesDiff := f .Bytes
294+ speedch <- bytesDiff
295+ // Update ui file
156296 f .Filepath = update .IndexFile .Filepath
157297 f .Progress = update .Progress
158298 f .Done = update .Done
299+ f .Bytes = update .Bytes
159300 updateIdx = idx
160301 break
161302 }
@@ -179,7 +320,6 @@ func (d *Downloader) Resume() error {
179320 _ = d .state .fileTitle4 .Set (update .IndexFile .Filepath )
180321 _ = d .state .fileProgress4 .Set (update .Progress )
181322 }
182-
183323 }
184324
185325 if update .Done {
@@ -189,13 +329,13 @@ func (d *Downloader) Resume() error {
189329 dialog .NewError (& DatabaseError {err }, d .state .window ).Show ()
190330 }
191331
192- // Add new request if still running
332+ d . newRequestWg . Add ( 1 )
193333 go func () {
334+ defer d .newRequestWg .Done ()
194335 select {
195336 case <- d .ctx .Done ():
196337 {
197- // Kill request channel since we are sole sender
198- close (d .reqch )
338+ // Context dead, assume channel closed already by parent
199339 return
200340 }
201341 default :
@@ -225,7 +365,6 @@ func (d *Downloader) Resume() error {
225365 dialog .NewError (err , d .state .window ).Show ()
226366 }
227367 progress := float64 (d .state .downloadedSize ) / float64 (d .state .totalSize )
228- fmt .Println (progress )
229368 err = d .state .progressBarTotal .Set (progress )
230369 if err != nil {
231370 dialog .NewError (err , d .state .window ).Show ()
@@ -262,8 +401,10 @@ func (d *Downloader) Stop() {
262401
263402 // Stop new and current requests
264403 d .cancel ()
404+ d .newRequestWg .Wait ()
405+ close (d .reqch )
265406
266- // Wait for all workers to exit
407+ // Let workers try to trigger exit first
267408 d .workerWg .Wait ()
268409 close (d .respch )
269410
0 commit comments