blob: 7537bb3b62461a15a54a2258953f32b4e130fed4 [file]
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "platform/impl/socket_handle_waiter_posix.h"
#include <sys/socket.h>
#include <unistd.h> // For pipe() and close()
#include <array>
#include <cerrno> // For errno
#include <chrono>
#include <iostream>
#include <thread>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "platform/impl/socket_handle_posix.h"
#include "platform/impl/socket_handle_waiter.h"
#include "platform/impl/timeval_posix.h"
#include "platform/test/fake_clock.h"
using ::testing::_;
using ::testing::ByMove;
using ::testing::Gt;
using ::testing::IsEmpty;
using ::testing::Return;
namespace openscreen {
namespace {
class MockSubscriber : public SocketHandleWaiter::Subscriber {
public:
using SocketHandleRef = SocketHandleWaiter::SocketHandleRef;
MOCK_METHOD(void, ProcessReadyHandle, (SocketHandleRef, uint32_t));
MOCK_METHOD(bool, HasPendingWrite, (SocketHandleRef));
};
class TestingSocketHandleWaiter : public SocketHandleWaiter {
public:
// These protected fields need to be public for testing below.
using SocketHandleRef = SocketHandleWaiter::SocketHandleRef;
using HandleWithFlags = SocketHandleWaiter::HandleWithFlags;
TestingSocketHandleWaiter() : SocketHandleWaiter(&FakeClock::now) {}
MOCK_METHOD(ErrorOr<std::vector<HandleWithFlags>>,
AwaitSocketsReady,
(const std::vector<HandleWithFlags>&, const Clock::duration&),
(override));
FakeClock fake_clock{Clock::time_point{Clock::duration{1234567}}};
};
} // namespace
// Test fixture for tests that need an instance of SocketHandleWaiterPosix with
// an actual pipe.
class SocketHandleWaiterPosixInstanceTest : public ::testing::Test {
protected:
SocketHandleWaiterPosixInstanceTest()
: clock_(Clock::time_point{Clock::duration{1234567}}),
waiter_(&clock_.now) {}
void TearDown() override {
// Clean up any FDs created in tests if they weren't closed properly.
// This is more of a safeguard.
for (int fd : fds_to_close_) {
close(fd);
}
fds_to_close_.clear();
}
// Helper to create a pipe and register fds for cleanup.
void CreatePipe(int pipe_fds[2]) {
ASSERT_NE(-1, pipe(pipe_fds))
<< "Failed to create pipe: " << strerror(errno);
fds_to_close_.push_back(pipe_fds[0]);
fds_to_close_.push_back(pipe_fds[1]);
}
void ClosePipe(int pipe_fds[2]) {
auto remove_fd = [this](int fd_to_remove) {
fds_to_close_.erase(
std::remove(fds_to_close_.begin(), fds_to_close_.end(), fd_to_remove),
fds_to_close_.end());
};
close(pipe_fds[0]);
remove_fd(pipe_fds[0]);
close(pipe_fds[1]);
remove_fd(pipe_fds[1]);
}
FakeClock clock_;
SocketHandleWaiterPosix waiter_; // The actual class under test
MockSubscriber subscriber_;
std::vector<int> fds_to_close_;
};
TEST(SocketHandleWaiterBaseTest, BubblesUpAwaitSocketsReadyErrors) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
SocketHandle handle1(1);
SocketHandle handle2(2);
const SocketHandle& handle0_ref = handle0;
const SocketHandle& handle1_ref = handle1;
const SocketHandle& handle2_ref = handle2;
waiter.Subscribe(&subscriber, std::cref(handle0_ref),
SocketHandleWaiter::kReadWriteFlags);
waiter.Subscribe(&subscriber, std::cref(handle1_ref),
SocketHandleWaiter::kReadWriteFlags);
waiter.Subscribe(&subscriber, std::cref(handle2_ref),
SocketHandleWaiter::kReadWriteFlags);
Error::Code response = Error::Code::kAgain;
EXPECT_CALL(subscriber, ProcessReadyHandle(_, _)).Times(0);
EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
.WillOnce(Return(ByMove(response)));
waiter.ProcessHandles(Clock::duration{0});
}
TEST(SocketHandleWaiterBaseTest, WatchedSocketsReturnedToCorrectSubscribers) {
MockSubscriber subscriber;
MockSubscriber subscriber2;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
SocketHandle handle1(1);
SocketHandle handle2(2);
SocketHandle handle3(3);
const SocketHandle& handle0_ref = handle0;
const SocketHandle& handle1_ref = handle1;
const SocketHandle& handle2_ref = handle2;
const SocketHandle& handle3_ref = handle3;
waiter.Subscribe(&subscriber, std::cref(handle0_ref),
SocketHandleWaiter::kReadWriteFlags);
waiter.Subscribe(&subscriber, std::cref(handle2_ref),
SocketHandleWaiter::kReadWriteFlags);
waiter.Subscribe(&subscriber2, std::cref(handle1_ref),
SocketHandleWaiter::kReadWriteFlags);
waiter.Subscribe(&subscriber2, std::cref(handle3_ref),
SocketHandleWaiter::kReadWriteFlags);
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle0_ref),
SocketHandleWaiter::kReadable))
.Times(1);
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle2_ref),
SocketHandleWaiter::kWritable))
.Times(1);
EXPECT_CALL(subscriber2, ProcessReadyHandle(std::cref(handle1_ref),
SocketHandleWaiter::kReadable))
.Times(1);
EXPECT_CALL(subscriber2,
ProcessReadyHandle(std::cref(handle3_ref),
SocketHandleWaiter::kReadWriteFlags))
.Times(1);
EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
.WillOnce(
Return(ByMove(std::vector<TestingSocketHandleWaiter::HandleWithFlags>{
{std::cref(handle0_ref), SocketHandleWaiter::kReadable},
{std::cref(handle1_ref), SocketHandleWaiter::kReadable},
{std::cref(handle2_ref), SocketHandleWaiter::kWritable},
{std::cref(handle3_ref), SocketHandleWaiter::kReadWriteFlags}})));
waiter.ProcessHandles(Clock::duration{0});
}
TEST(SocketHandleWaiterBaseTest, HandlesNoSubscriptions) {
TestingSocketHandleWaiter waiter;
EXPECT_CALL(waiter, AwaitSocketsReady(_, _)).Times(0);
Error result = waiter.ProcessHandles(Clock::duration{0});
EXPECT_EQ(result.code(), Error::Code::kAgain);
}
TEST(SocketHandleWaiterBaseTest, UnsubscribeRemovesHandle) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle(123);
waiter.Subscribe(&subscriber, handle, SocketHandleWaiter::Flags::kReadable);
waiter.Unsubscribe(&subscriber, handle);
EXPECT_CALL(waiter, AwaitSocketsReady(_, _)).Times(0);
Error result = waiter.ProcessHandles(Clock::duration{0});
EXPECT_EQ(result.code(), Error::Code::kAgain);
}
TEST(SocketHandleWaiterBaseTest,
UnsubscribeAllRemovesOnlySubscriptionsForProvidedSubscriber) {
MockSubscriber subscriber1;
MockSubscriber subscriber2;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
SocketHandle handle1(1);
SocketHandle handle2(2);
waiter.Subscribe(&subscriber1, handle0, SocketHandleWaiter::kReadable);
waiter.Subscribe(&subscriber1, handle1, SocketHandleWaiter::kReadable);
waiter.Subscribe(&subscriber2, handle2, SocketHandleWaiter::kReadable);
waiter.UnsubscribeAll(&subscriber1);
EXPECT_CALL(subscriber1, ProcessReadyHandle(_, _)).Times(0);
EXPECT_CALL(subscriber2, ProcessReadyHandle(std::cref(handle2),
SocketHandleWaiter::kReadable))
.Times(1);
EXPECT_CALL(
waiter,
AwaitSocketsReady(
testing::Truly(
[&](const std::vector<TestingSocketHandleWaiter::HandleWithFlags>&
handle_list) {
return handle_list.size() == 1 &&
handle_list[0].handle.get() == handle2 &&
handle_list[0].flags == SocketHandleWaiter::kReadable;
}),
_))
.WillOnce(
Return(ByMove(std::vector<TestingSocketHandleWaiter::HandleWithFlags>{
{handle2, SocketHandleWaiter::kReadable}})));
waiter.ProcessHandles(Clock::duration{0});
}
TEST(SocketHandleWaiterBaseTest,
AwaitSocketsReadyCalledWithCorrectSubscribedFlags) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
constexpr uint32_t subscribed_flags = SocketHandleWaiter::Flags::kReadable;
// AwaitSocketsReady might report more flags if the underlying mechanism
// does.
constexpr uint32_t reported_flags = SocketHandleWaiter::Flags::kReadable |
SocketHandleWaiter::Flags::kWritable;
waiter.Subscribe(&subscriber, handle0, subscribed_flags);
EXPECT_CALL(
waiter,
AwaitSocketsReady(
testing::Truly(
[&](const std::vector<TestingSocketHandleWaiter::HandleWithFlags>&
handle_list) {
return handle_list.size() == 1 &&
handle_list[0].handle.get() == handle0 &&
handle_list[0].flags == subscribed_flags;
}),
_))
.WillOnce(
Return(ByMove(std::vector<TestingSocketHandleWaiter::HandleWithFlags>{
{handle0, reported_flags}})));
// The subscriber receives the flags reported by AwaitSocketsReady.
EXPECT_CALL(subscriber,
ProcessReadyHandle(std::cref(handle0), reported_flags))
.Times(1);
waiter.ProcessHandles(Clock::duration{0});
}
TEST(SocketHandleWaiterBaseTest, SubsequentSubscribeForSameHandleIsIgnored) {
MockSubscriber subscriber1;
MockSubscriber subscriber2;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
waiter.Subscribe(&subscriber1, handle0, SocketHandleWaiter::Flags::kReadable);
// This second subscribe for the same handle should be ignored.
waiter.Subscribe(&subscriber2, handle0, SocketHandleWaiter::Flags::kWritable);
EXPECT_CALL(
waiter,
AwaitSocketsReady(
testing::Truly(
[&](const std::vector<TestingSocketHandleWaiter::HandleWithFlags>&
handle_list) {
return handle_list.size() == 1 &&
handle_list[0].handle.get() == handle0 &&
handle_list[0].flags ==
SocketHandleWaiter::Flags::kReadable;
}),
_))
.WillOnce(Return(Error::Code::kAgain));
EXPECT_CALL(subscriber1, ProcessReadyHandle(_, _)).Times(0);
EXPECT_CALL(subscriber2, ProcessReadyHandle(_, _)).Times(0);
waiter.ProcessHandles(Clock::duration{0});
}
TEST(SocketHandleWaiterBaseTest, OnHandleDeletionRemovesHandle) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0);
waiter.Subscribe(&subscriber, handle0, SocketHandleWaiter::Flags::kReadable);
waiter.OnHandleDeletion(&subscriber, handle0, true);
EXPECT_CALL(waiter, AwaitSocketsReady(IsEmpty(), _)).Times(0);
const Error result = waiter.ProcessHandles(Clock::duration{0});
EXPECT_EQ(result.code(), Error::Code::kAgain);
}
TEST(SocketHandleWaiterBaseTest,
ProcessHandlesIgnoresUnmappedHandlesFromAwaitSocketsReady) {
MockSubscriber subscriber;
TestingSocketHandleWaiter waiter;
SocketHandle handle0(0); // Subscribed
SocketHandle handle1(1); // Not subscribed, but returned by AwaitSocketsReady
waiter.Subscribe(&subscriber, handle0, SocketHandleWaiter::kReadable);
EXPECT_CALL(waiter, AwaitSocketsReady(_, _))
.WillOnce(
Return(ByMove(std::vector<TestingSocketHandleWaiter::HandleWithFlags>{
{handle0, SocketHandleWaiter::kReadable}, // Expected
{handle1,
SocketHandleWaiter::kReadable} // Unexpected by subscription
})));
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle0),
SocketHandleWaiter::kReadable))
.Times(1);
EXPECT_CALL(subscriber, ProcessReadyHandle(std::cref(handle1), _))
.Times(0); // Should be ignored
waiter.ProcessHandles(Clock::duration{0});
}
TEST_F(SocketHandleWaiterPosixInstanceTest, WriteNotProcessedIfNoPendingWrite) {
int pipe_fds[2];
CreatePipe(pipe_fds);
SocketHandle write_end_handle(pipe_fds[1]);
// Subscribe for write events on the write-end of the pipe.
waiter_.Subscribe(&subscriber_, std::cref(write_end_handle),
SocketHandleWaiter::Flags::kWritable);
// Simulate that the subscriber has no data to write.
EXPECT_CALL(subscriber_, HasPendingWrite(std::cref(write_end_handle)))
.WillRepeatedly(Return(false));
// Expect ProcessReadyHandle NOT to be called with the kWritable flag.
// Since we only subscribed for kWritable, and HasPendingWrite is false,
// it should not be called at all for this handle's write event.
EXPECT_CALL(subscriber_, ProcessReadyHandle(std::cref(write_end_handle), _))
.Times(0);
const Error result = waiter_.ProcessHandles(std::chrono::milliseconds(10));
// If no FDs were ready (or ready but filtered out), ProcessHandles returns
// kAgain.
EXPECT_TRUE(result.ok() || result.code() == Error::Code::kAgain)
<< "ProcessHandles returned: " << result;
waiter_.Unsubscribe(&subscriber_, std::cref(write_end_handle));
ClosePipe(pipe_fds);
}
TEST_F(SocketHandleWaiterPosixInstanceTest,
ReadProcessedWriteIgnoredIfNoPendingWrite) {
int pipe_fds[2];
CreatePipe(pipe_fds);
SocketHandle read_end_handle(pipe_fds[0]);
// Subscribe for read and write events on the read-end of the pipe.
waiter_.Subscribe(&subscriber_, std::cref(read_end_handle),
SocketHandleWaiter::Flags::kReadable |
SocketHandleWaiter::Flags::kWritable);
// Simulate that the subscriber has no data to write.
EXPECT_CALL(subscriber_, HasPendingWrite(std::cref(read_end_handle)))
.WillRepeatedly(Return(false));
// Make the read-end readable.
constexpr const char kTestBuf[] = "test";
ASSERT_THAT(write(pipe_fds[1], kTestBuf, sizeof(kTestBuf) - 1),
Gt(ssize_t{0}));
// Expect ProcessReadyHandle to be called only with kReadable.
EXPECT_CALL(subscriber_,
ProcessReadyHandle(std::cref(read_end_handle),
SocketHandleWaiter::Flags::kReadable))
.Times(1);
const Error result = waiter_.ProcessHandles(std::chrono::milliseconds(10));
EXPECT_TRUE(result.ok()) << "ProcessHandles returned: " << result;
std::array<uint8_t, 16> drain;
read(pipe_fds[0], drain.data(), drain.size());
waiter_.Unsubscribe(&subscriber_, std::cref(read_end_handle));
ClosePipe(pipe_fds);
}
} // namespace openscreen