blob: 4c322a023428a913e90535bb7cba935acc7b3cdc [file] [log] [blame]
//
// Copyright 2018 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#import "Service/Sources/EDOExecutor.h"
#import "Channel/Sources/EDOBlockingQueue.h"
#import "Service/Sources/EDOExecutorMessage.h"
#import "Service/Sources/EDOServiceError.h"
@interface EDOExecutor ()
// The message queues to process the requests that are attached to this executor.
@property(nonatomic, readonly)
NSMutableArray<EDOBlockingQueue<EDOExecutorMessage *> *> *messageQueueStack;
// The isolation queue for synchronization.
@property(nonatomic, readonly) dispatch_queue_t isolationQueue;
@end
@implementation EDOExecutor
+ (instancetype)executorWithQueue:(dispatch_queue_t)queue {
return [[self alloc] initWithQueue:queue];
}
- (instancetype)initWithQueue:(dispatch_queue_t)queue {
self = [super init];
if (self) {
NSString *queueName = [NSString stringWithFormat:@"com.google.edo.executor[%p]", self];
_isolationQueue = dispatch_queue_create(queueName.UTF8String, DISPATCH_QUEUE_SERIAL);
_executionQueue = queue;
_messageQueueStack = [[NSMutableArray alloc] init];
}
return self;
}
- (void)loopWithBlock:(void (^)(void))executeBlock {
// Create the waited queue so it can also process the requests while waiting for the response
// when the incoming request is dispatched to the same queue.
EDOBlockingQueue<EDOExecutorMessage *> *messageQueue = [[EDOBlockingQueue alloc] init];
// Set the message queue to process the request that will be received and dispatched to this
// queue while waiting for the response to come back.
dispatch_sync(self.isolationQueue, ^{
[self.messageQueueStack addObject:messageQueue];
});
// Schedule the handler in the background queue so it won't block the current thread. After the
// handler closes the messageQueue, before or after the while loop starts, it will trigger the
// while loop to exit.
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
executeBlock();
[messageQueue close];
});
while (true) {
// If the dispatch_async above is not invoked before this loop is hit, we block the current
// queue in this loop and wait for any incoming requests, say if another request was triggered
// by the execution of a message. The dispatch_async above will close (unset) the messageQueue.
// This prevents a race condition where any incoming requests (messages) might still be left to
// process after the queue is closed (unset) above.
EDOExecutorMessage *message = [messageQueue firstObjectWithTimeout:DISPATCH_TIME_FOREVER];
if (!message) {
break;
}
[message executeBlock];
}
dispatch_sync(self.isolationQueue, ^{
// If messageQueue has not been popped out from stack yet, pop it out here to avoid memleak.
NSMutableArray<EDOBlockingQueue<EDOExecutorMessage *> *> *stack = self.messageQueueStack;
if (stack.lastObject == messageQueue) {
[stack removeLastObject];
}
});
NSAssert(messageQueue.empty, @"The message queue contains stale requests.");
}
- (BOOL)handleBlock:(void (^)(void))executeBlock error:(NSError *_Nullable *_Nullable)errorOrNil {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
// TODO(haowoo): Replace with dispatch_assert_queue once the minimum support is iOS 10+.
NSAssert(dispatch_get_current_queue() != self.executionQueue,
@"Only enqueue a request from a non-tracked queue.");
#pragma clang diagnostic pop
EDOExecutorMessage *message = [[EDOExecutorMessage alloc] initWithBlock:executeBlock];
if (![self enqueueMessage:message]) {
dispatch_queue_t executionQueue = self.executionQueue;
if (executionQueue) {
dispatch_async(executionQueue, ^{
[message executeBlock];
});
} else {
if (errorOrNil) {
NSString *reason =
@"The message is not handled because the execution queue is already released.";
*errorOrNil = [NSError errorWithDomain:EDOServiceErrorDomain
code:EDOServiceErrorRequestNotHandled
userInfo:@{@"reason" : reason}];
}
return NO;
}
}
[message waitForCompletion];
return YES;
}
#pragma mark - Private
/**
* Check if a message can be enqueued to a message queue that will be executed on the
* @c executionQueue. The message is appended to the message queue if this passes.
*
* @param message The message to be enqueued.
*
* @return @c YES if message is enqueued to a message queue; @c NO if no message queue is
* available.
*/
- (BOOL)enqueueMessage:(EDOExecutorMessage *)message {
__block BOOL messageEnqueued = NO;
dispatch_sync(self.isolationQueue, ^{
NSMutableArray<EDOBlockingQueue<EDOExecutorMessage *> *> *stack = self.messageQueueStack;
while (stack.count > 0 && ![stack.lastObject appendObject:message]) {
[stack removeLastObject];
}
messageEnqueued = stack.count > 0;
});
return messageEnqueued;
}
@end