Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent uploads #64

Open
wants to merge 6 commits into
base: v2.x-beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ConnectionKit/CKUploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ typedef NSUInteger CKUploadingOptions;
@private
NSURLRequest *_request;
unsigned long _permissions;
NSUInteger _maxConcurrentOperationCount;
CKUploadingOptions _options;

CK2FileManager *_fileManager;
CK2FileOperation *_currentOperation;
NSMutableArray *_queue;

CKTransferRecord *_rootRecord;
Expand All @@ -47,6 +47,7 @@ typedef NSUInteger CKUploadingOptions;
filePosixPermissions:(NSNumber *)customPermissions
options:(CKUploadingOptions)options;

@property (nonatomic) NSUInteger maxConcurrentOperationCount; // defaults to 1
@property (nonatomic, assign, readonly) CKUploadingOptions options;
@property (nonatomic, assign) id <CKUploaderDelegate> delegate;

Expand Down
164 changes: 130 additions & 34 deletions ConnectionKit/CKUploader.m
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@
#import "CKUploader.h"


@interface CKUploaderOperation : NSOperation
{
CK2FileOperation* (^_block)(void);
CK2FileOperation *_fileOp;
}

- initWithBlock:(CK2FileOperation* (^)(void))block __attribute((nonnull));

@end


#pragma mark -


@implementation CKUploader

#pragma mark Lifecycle
Expand All @@ -19,6 +33,7 @@ - (id)initWithRequest:(NSURLRequest *)request filePosixPermissions:(unsigned lon
{
_request = [request copy];
_permissions = customPermissions;
_maxConcurrentOperationCount = 1;
_options = options;

if (!(_options & CKUploadingDryRun))
Expand Down Expand Up @@ -62,6 +77,34 @@ - (void)dealloc
@synthesize delegate = _delegate;

@synthesize options = _options;

@synthesize maxConcurrentOperationCount = _maxConcurrentOperationCount;
- (NSUInteger)effectiveConcurrentOperationCount;
{
NSUInteger result = self.maxConcurrentOperationCount;

// Reign in count when:
// * Using CURLHandle sync backend
// * Deleting files before uploading; have no dependency system to handle that
if (result > 1)
{
if (self.options & CKUploadingDeleteExistingFileFirst)
{
result = 1;
}
else
{
NSString *scheme = _request.URL.scheme;
if ([scheme caseInsensitiveCompare:@"ftp"] == NSOrderedSame || [scheme caseInsensitiveCompare:@"sftp"] == NSOrderedSame)
{
result = 1;
}
}
}

return result;
}

@synthesize rootTransferRecord = _rootRecord;
@synthesize baseTransferRecord = _baseRecord;

Expand All @@ -88,18 +131,21 @@ - (void)removeFileAtPath:(NSString *)path;

- (void)removeFileAtPath:(NSString *)path reportError:(BOOL)reportError;
{
[self addOperationWithBlock:^{
__block CKUploaderOperation *op = [[CKUploaderOperation alloc] initWithBlock:^{

return [_fileManager removeItemAtURL:[self URLForPath:path] completionHandler:^(NSError *error) {

[self operationDidFinish:(reportError ? error : nil)];
[self operation:op didFinish:(reportError ? error : nil)];
}];
}];

[self addOperation:op];
[op release];
}

- (CKTransferRecord *)uploadData:(NSData *)data toPath:(NSString *)path;
{
return [self uploadToPath:path size:data.length usingBlock:^CK2FileOperation* (CKTransferRecord *record) {
__block CKTransferRecord *record = [self transferRecordWithPath:path size:data.length usingBlock:^{

NSDictionary *attributes = @{ NSFilePosixPermissions : @([self posixPermissionsForPath:path isDirectory:NO]) };

Expand All @@ -115,7 +161,7 @@ - (CKTransferRecord *)uploadData:(NSData *)data toPath:(NSString *)path;
[record transferDidFinish:record error:error];
});

[self operationDidFinish:error];
[self operation:[record propertyForKey:@"uploadOperation"] didFinish:error];
}];

NSAssert(op, @"Failed to create upload operation");
Expand All @@ -128,14 +174,17 @@ - (CKTransferRecord *)uploadData:(NSData *)data toPath:(NSString *)path;

return op;
}];

[self addOperation:[record propertyForKey:@"uploadOperation"]];
return record;
}

- (CKTransferRecord *)uploadFileAtURL:(NSURL *)localURL toPath:(NSString *)path;
{
NSNumber *size;
if (![localURL getResourceValue:&size forKey:NSURLFileSizeKey error:NULL]) size = nil;

return [self uploadToPath:path size:size.unsignedLongLongValue usingBlock:^CK2FileOperation* (CKTransferRecord *record) {
__block CKTransferRecord *record = [self transferRecordWithPath:path size:size.unsignedLongLongValue usingBlock:^{

NSDictionary *attributes = @{ NSFilePosixPermissions : @([self posixPermissionsForPath:path isDirectory:NO]) };

Expand All @@ -151,7 +200,7 @@ - (CKTransferRecord *)uploadFileAtURL:(NSURL *)localURL toPath:(NSString *)path;
[record transferDidFinish:record error:error];
});

[self operationDidFinish:error];
[self operation:[record propertyForKey:@"uploadOperation"] didFinish:error];
}];

NSAssert(op, @"Failed to create upload operation");
Expand All @@ -164,9 +213,12 @@ - (CKTransferRecord *)uploadFileAtURL:(NSURL *)localURL toPath:(NSString *)path;

return op;
}];

[self addOperation:[record propertyForKey:@"uploadOperation"]];
return record;
}

- (CKTransferRecord *)uploadToPath:(NSString *)path size:(unsigned long long)size usingBlock:(CK2FileOperation* (^)(CKTransferRecord *record))block;
- (CKTransferRecord *)transferRecordWithPath:(NSString *)path size:(unsigned long long)size usingBlock:(CK2FileOperation* (^)(void))block;
{
// Create transfer record
if (_options & CKUploadingDeleteExistingFileFirst)
Expand All @@ -179,9 +231,9 @@ - (CKTransferRecord *)uploadToPath:(NSString *)path size:(unsigned long long)siz


// Enqueue upload
[self addOperationWithBlock:^CK2FileOperation* {
return block(result);
}];
CKUploaderOperation *op = [[CKUploaderOperation alloc] initWithBlock:block];
[result setProperty:op forKey:@"uploadOperation"];
[op release];


// Notify delegate
Expand All @@ -206,58 +258,54 @@ - (NSURL *)URLForPath:(NSString *)path;

- (void)finishUploading;
{
[self addOperationWithBlock:^CK2FileOperation* {

NSAssert(!self.isCancelled, @"Shouldn't be able to finish once cancelled!");
[[self delegate] uploaderDidFinishUploading:self];
[self operationDidFinish:nil];
return nil;
}];
if (_isFinishing || _isCancelled) return;

_isFinishing = YES;
if (_queue.count == 0)
{
NSAssert(!self.isCancelled, @"Shouldn't be able to finish once cancelled!");
[[self delegate] uploaderDidFinishUploading:self];
}
}

#pragma mark Queue

- (void)cancel;
{
_isCancelled = YES;
[self.currentOperation cancel];
[_queue makeObjectsPerformSelector:_cmd];
[_queue release]; _queue = nil;
}

- (BOOL)isCancelled; { return _isCancelled; }

- (CK2FileOperation *)currentOperation; { return _currentOperation; }

- (void)addOperationWithBlock:(CK2FileOperation* (^)(void))block;
{
CKUploaderOperation *operation = [[CKUploaderOperation alloc] initWithBlock:block];
[self addOperation:operation];
[operation release];
}

- (void)addOperation:(CKUploaderOperation *)operation;
{
NSAssert([NSThread isMainThread], @"-addOperation: is only safe to call on the main thread");

// No more operations can go on once finishing up
if (_isFinishing) return;

NSBlockOperation *operation = [NSBlockOperation blockOperationWithBlock:^{

NSAssert(_currentOperation == nil, @"Seems like an op is starting before another has finished");
_currentOperation = [block() retain];
}];

[_queue addObject:operation];
if ([_queue count] == 1)

if ([_queue count] <= self.effectiveConcurrentOperationCount)
{
[operation start];
}
}

- (void)operationDidFinish:(NSError *)error;
- (void)operation:(CKUploaderOperation *)operation didFinish:(NSError *)error;
{
// This method gets called on all sorts of threads, so marshall back to main queue
dispatch_async(dispatch_get_main_queue(), ^{

[_currentOperation release]; _currentOperation = nil;

if (error)
{
if ([self.delegate respondsToSelector:@selector(uploader:shouldProceedAfterError:)])
Expand All @@ -275,10 +323,21 @@ - (void)operationDidFinish:(NSError *)error;
}
}

[_queue removeObjectAtIndex:0];
if ([_queue count])
NSUInteger index = [_queue indexOfObject:operation];
NSAssert(index != NSNotFound, @"Finished operation should be in the queue");
[_queue removeObjectAtIndex:index];

// Dequeue next op
// TODO: Check for ops which should have been started but haven't?
NSUInteger maxOps = self.effectiveConcurrentOperationCount;
if (_queue.count >= maxOps)
{
[[_queue objectAtIndex:(maxOps - 1)] start];
}
else if (_isFinishing && _queue.count == 0)
{
[[_queue objectAtIndex:0] start];
NSAssert(!self.isCancelled, @"Shouldn't be able to finish once cancelled!");
[[self delegate] uploaderDidFinishUploading:self];
}
});
}
Expand Down Expand Up @@ -361,3 +420,40 @@ - (void)fileManager:(CK2FileManager *)manager appendString:(NSString *)info toTr

@end


#pragma mark -


@implementation CKUploaderOperation

- initWithBlock:(CK2FileOperation* (^)(void))block;
{
NSParameterAssert(block);
if (self = [self init])
{
_block = [block copy];
}
return self;
}

- (void)main;
{
if (self.isCancelled) return;
_fileOp = [_block() retain];
}

- (void)cancel;
{
[super cancel];
[_fileOp cancel];
}

- (void)dealloc;
{
[_block release];
[_fileOp release];

[super dealloc];
}

@end