1111using ManagedCode . Communication ;
1212using ManagedCode . Storage . Core . Helpers ;
1313using ManagedCode . Storage . Core . Models ;
14+ using ManagedCode . Storage . Server ;
15+ using Microsoft . Extensions . Configuration ;
1416
1517namespace ManagedCode . Storage . Client ;
1618
1719public class StorageClient : IStorageClient
1820{
1921 private readonly HttpClient _httpClient ;
22+ private readonly IConfiguration _configuration ;
2023
21- public StorageClient ( HttpClient httpClient )
24+ public StorageClient ( HttpClient httpClient , IConfiguration configuration )
2225 {
2326 _httpClient = httpClient ;
27+ _configuration = configuration ;
2428 }
2529
2630 public async Task < Result < BlobMetadata > > UploadFile ( Stream stream , string apiUrl , string contentName , CancellationToken cancellationToken = default )
@@ -125,130 +129,53 @@ public async Task<Result<LocalFile>> DownloadFile(string fileName, string apiUrl
125129 }
126130 }
127131
128- public async Task < Result > UploadLargeFileUsingStream ( Stream file ,
129- string сreateApiUrl ,
132+ public async Task < Result < uint > > UploadLargeFile ( Stream file ,
130133 string uploadApiUrl ,
131134 string completeApiUrl ,
132135 Action < double > ? onProgressChanged ,
133- CancellationToken cancellationToken )
136+ CancellationToken cancellationToken = default )
134137 {
135- var bufferSize = 4096000 ; //TODO: chunk size get from config
138+ int bufferSize = Int32 . Parse ( _configuration . GetSection ( "ChunkSize" ) . Value ) ;
136139 var buffer = new byte [ bufferSize ] ;
137- int bytesRead ;
138140 int chunkIndex = 1 ;
139- var fileCRC = Crc32Helper . Calculate ( file ) ;
141+ uint fileCRC = 123214 ;
140142 var partOfProgress = file . Length / bufferSize ;
141-
142- var createdFileResponse = await _httpClient . PostAsync ( сreateApiUrl , JsonContent . Create ( file . Length ) , cancellationToken ) ;
143- var createdFile = await createdFileResponse . Content . ReadFromJsonAsync < Result < BlobMetadata > > ( cancellationToken : cancellationToken ) ;
143+ var fileName = "file" + Guid . NewGuid ( ) ;
144144
145145 var semaphore = new SemaphoreSlim ( 0 , 4 ) ;
146146 var tasks = new List < Task > ( ) ;
147+ int bytesRead ;
147148 while ( ( bytesRead = await file . ReadAsync ( buffer , 0 , buffer . Length , cancellationToken ) ) > 0 )
148149 {
149- var task = Task . Run ( ( ) =>
150+ var task = Task . Run ( async ( ) =>
150151 {
151- try
152+ using ( var memoryStream = new MemoryStream ( buffer , 0 , bytesRead ) )
152153 {
153- semaphore . WaitAsync ( cancellationToken ) ;
154- using ( var memoryStream = new MemoryStream ( buffer , 0 , bytesRead ) )
154+ var content = new StreamContent ( memoryStream ) ;
155+ using ( var formData = new MultipartFormDataContent ( ) )
155156 {
156- var content = new StreamContent ( memoryStream ) ;
157-
158- using ( var chunk = new MultipartFormDataContent ( ) )
159- {
160- chunk . Add ( content , "chunk" , createdFile . Value . FullName ) ;
161- chunk . Add ( new StringContent ( createdFile . Value . FullName ) , "Payload.BlobName" ) ;
162- chunk . Add ( new StringContent ( chunkIndex . ToString ( ) ) , "Payload.ChunkIndex" ) ;
163- chunk . Add ( new StringContent ( bufferSize . ToString ( ) ) , "Payload.ChunkSize" ) ;
164- chunk . Add ( new StringContent ( fileCRC . ToString ( ) ) , "Payload.FullCRC" ) ;
165-
166- _httpClient . PostAsync ( uploadApiUrl , chunk , cancellationToken ) ;
167- }
168-
169- onProgressChanged ? . Invoke ( partOfProgress * chunkIndex ) ;
157+ formData . Add ( content , "File" , fileName ) ;
158+ formData . Add ( new StringContent ( chunkIndex . ToString ( ) ) , "Payload.ChunkIndex" ) ;
159+ formData . Add ( new StringContent ( bufferSize . ToString ( ) ) , "Payload.ChunkSize" ) ;
160+ formData . Add ( new StringContent ( fileCRC . ToString ( ) ) , "Payload.FullCRC" ) ;
161+ await _httpClient . PostAsync ( uploadApiUrl , formData , cancellationToken ) ;
170162 }
171163 }
172- finally
173- {
174- semaphore . Release ( ) ;
175- }
164+
165+ semaphore . Release ( ) ;
176166 } , cancellationToken ) ;
177-
167+
168+ await semaphore . WaitAsync ( cancellationToken ) ;
178169 tasks . Add ( task ) ;
179-
170+ onProgressChanged ? . Invoke ( partOfProgress * chunkIndex ) ;
180171 chunkIndex ++ ;
181172 }
182173
183174 await Task . WhenAll ( tasks . ToArray ( ) ) ;
184175
185176 var mergeResult = await _httpClient . PostAsync ( completeApiUrl , JsonContent . Create (
186- new { fileCrc = fileCRC , blobName = createdFile . Value . FullName } ) , cancellationToken ) ;
187-
188- return await mergeResult . Content . ReadFromJsonAsync < Result > ( cancellationToken : cancellationToken ) ;
189- }
190-
191- public async Task < Result > UploadLargeFileUsingMerge ( Stream file ,
192- string uploadApiUrl ,
193- string mergeApiUrl ,
194- Action < double > ? onProgressChanged ,
195- CancellationToken cancellationToken )
196- {
197- var bufferSize = 4096000 ; //TODO: chunk size get from config
198- var buffer = new byte [ bufferSize ] ;
199- int bytesRead ;
200- int chunkIndex = 1 ;
201- var fileCRC = Crc32Helper . Calculate ( file ) ;
202- var partOfProgress = file . Length / bufferSize ;
203-
204- var semaphore = new SemaphoreSlim ( 0 , 4 ) ;
205- var tasks = new List < Task < HttpResponseMessage > > ( ) ;
206- while ( ( bytesRead = await file . ReadAsync ( buffer , 0 , buffer . Length , cancellationToken ) ) > 0 )
207- {
208- var task = Task . Run ( ( ) =>
209- {
210- try
211- {
212- semaphore . WaitAsync ( cancellationToken ) ;
213- using ( var memoryStream = new MemoryStream ( buffer , 0 , bytesRead ) )
214- {
215- var content = new StreamContent ( memoryStream ) ;
216-
217- using ( var chunk = new MultipartFormDataContent ( ) )
218- {
219- chunk . Add ( content , "chunk" ) ;
220- chunk . Add ( new StringContent ( chunkIndex . ToString ( ) ) , "Payload.ChunkIndex" ) ;
221- chunk . Add ( new StringContent ( bufferSize . ToString ( ) ) , "Payload.ChunkSize" ) ;
222- chunk . Add ( new StringContent ( fileCRC . ToString ( ) ) , "Payload.FullCRC" ) ;
223-
224- var result = _httpClient . PostAsync ( uploadApiUrl , chunk , cancellationToken ) ;
225- onProgressChanged ? . Invoke ( partOfProgress * chunkIndex ) ;
226-
227- return result ;
228- }
229- }
230- }
231- finally
232- {
233- semaphore . Release ( ) ;
234- }
235- } , cancellationToken ) ;
236-
237- tasks . Add ( task ) ;
238- chunkIndex ++ ;
239- }
240-
241- var tasksResult = await Task . WhenAll ( tasks . ToArray ( ) ) ;
242- var blobNames = tasksResult
243- . Select ( async x =>
244- {
245- var content = await x . Content . ReadFromJsonAsync < Result < string > > ( cancellationToken : cancellationToken ) ;
246- return content . Value ;
247- } ) ;
248-
249- var mergeResult = await _httpClient . PostAsync ( mergeApiUrl , JsonContent . Create (
250- new { fileCrc = fileCRC , blobNames = blobNames } ) , cancellationToken ) ;
177+ fileName ) , cancellationToken ) ;
251178
252- return await mergeResult . Content . ReadFromJsonAsync < Result > ( cancellationToken : cancellationToken ) ;
179+ return await mergeResult . Content . ReadFromJsonAsync < Result < uint > > ( cancellationToken : cancellationToken ) ;
253180 }
254181}
0 commit comments