Skip to content

Commit

Permalink
nbd netlink refactoring part 2 (#1971)
Browse files Browse the repository at this point in the history
* nbd netlink refactoring part 2

- rename Timeout -> RequestTimeout
- rename DeadConnectionTimeout -> ConnectionTimeout
- move Send method from TNetlinkMessage to TNetlinkSocket
- clarify comments regarding ioctl device timeout
- get rid of ShouldStop atomic
- close client socket regardless of deleteDevice flag
- replace hard-coded 1 day ioctl device timeout with connection timeout
  which serves the same purpose
- make Start idempotent
  • Loading branch information
tpashkin authored Sep 9, 2024
1 parent 7822a12 commit d591ed9
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cloud/blockstore/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ message TServerConfig
// NBD request timeout in seconds, only makes sense if using netlink
optional uint32 NbdRequestTimeout = 112;

// NBD connection timeout in seconds, only makes sense if using netlink
// NBD connection timeout in seconds
optional uint32 NbdConnectionTimeout = 113;

// Endpoint Proxy unix socket path. Triggers proxy device factory usage
Expand Down
5 changes: 4 additions & 1 deletion cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,13 @@ void TBootstrapBase::Init()
EndpointProxyClient);
}

// The only case we want kernel to retry requests is when the socket is dead
// due to nbd server restart. And since we can't configure ioctl device to
// use a new socket, request timeout effectively becomes connection timeout
if (!nbdDeviceFactory) {
nbdDeviceFactory = NBD::CreateDeviceFactory(
Logging,
TDuration::Days(1)); // timeout
Configs->ServerConfig->GetNbdConnectionTimeout()); // timeout
}

EndpointManager = CreateEndpointManager(
Expand Down
10 changes: 5 additions & 5 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,15 @@ struct TServer: IEndpointProxyServer
TDuration::Days(1), // connection timeout
true); // reconfigure device if exists
} else {
// For netlink devices we have to balance request timeout between
// time it takes to fail request for good and resend it if socket
// is dead due to proxy restart. We can't configure ioctl device
// to use a fresh socket, so there is no point configuring it
// The only case we want kernel to retry requests is when the socket
// is dead due to nbd server restart. And since we can't configure
// ioctl device to use a new socket, request timeout effectively
// becomes connection timeout
ep.NbdDevice = NBD::CreateDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
TDuration::Days(1)); // request timeout
TDuration::Days(1)); // timeout
}

auto status = ep.NbdDevice->Start().ExtractValue();
Expand Down
136 changes: 70 additions & 66 deletions cloud/blockstore/libs/nbd/netlink_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ class TNetlinkSocket
nl_socket_free(Socket);
}

operator nl_sock*() const
{
return Socket;
}

int GetFamily() const
{
return Family;
Expand All @@ -78,6 +73,7 @@ class TNetlinkSocket
void SetCallback(nl_cb_type type, F func)
{
auto arg = std::make_unique<TResponseHandler>(std::move(func));

if (int err = nl_socket_modify_cb(
Socket,
type,
Expand All @@ -98,6 +94,20 @@ class TNetlinkSocket

return (*func)(static_cast<genlmsghdr*>(nlmsg_data(nlmsg_hdr(msg))));
}

void Send(nl_msg* message)
{
if (int err = nl_send_auto(Socket, message); err < 0) {
throw TServiceError(E_FAIL)
<< "send error: " << nl_geterror(err);
}
if (int err = nl_wait_for_ack(Socket)) {
// this is either recv error, or an actual error message received
// from the kernel
throw TServiceError(E_FAIL)
<< "recv error: " << nl_geterror(err);
}
}
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -120,9 +130,7 @@ class TNestedAttribute

~TNestedAttribute()
{
if (Attribute) {
nla_nest_end(Message, Attribute);
}
nla_nest_end(Message, Attribute);
}
};

Expand Down Expand Up @@ -156,6 +164,11 @@ class TNetlinkMessage
nlmsg_free(Message);
}

operator nl_msg*() const
{
return Message;
}

template <typename T>
void Put(int attribute, T data)
{
Expand All @@ -169,20 +182,6 @@ class TNetlinkMessage
{
return TNestedAttribute(Message, attribute);
}

void Send(nl_sock* socket)
{
if (int err = nl_send_auto(socket, Message); err < 0) {
throw TServiceError(E_FAIL)
<< "send error: " << nl_geterror(err);
}
if (int err = nl_wait_for_ack(socket)) {
// this is either recv error, or an actual error message received
// from the kernel
throw TServiceError(E_FAIL)
<< "recv error: " << nl_geterror(err);
}
}
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -195,26 +194,25 @@ class TNetlinkDevice final
const ILoggingServicePtr Logging;
const TNetworkAddress ConnectAddress;
const TString DeviceName;
const TDuration Timeout;
const TDuration DeadConnectionTimeout;
const TDuration RequestTimeout;
const TDuration ConnectionTimeout;
const bool Reconfigure;

TLog Log;
IClientHandlerPtr Handler;
TSocket Socket;
ui32 DeviceIndex;
TAtomic ShouldStop = 0;

TPromise<NProto::TError> StartResult = NewPromise<NProto::TError>();
TPromise<NProto::TError> StopResult = NewPromise<NProto::TError>();
TPromise<NProto::TError> StartResult;
TPromise<NProto::TError> StopResult;

public:
TNetlinkDevice(
ILoggingServicePtr logging,
TNetworkAddress connectAddress,
TString deviceName,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure);

~TNetlinkDevice();
Expand Down Expand Up @@ -242,14 +240,14 @@ TNetlinkDevice::TNetlinkDevice(
ILoggingServicePtr logging,
TNetworkAddress connectAddress,
TString deviceName,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure)
: Logging(std::move(logging))
, ConnectAddress(std::move(connectAddress))
, DeviceName(std::move(deviceName))
, Timeout(timeout)
, DeadConnectionTimeout(deadConnectionTimeout)
, RequestTimeout(requestTimeout)
, ConnectionTimeout(connectionTimeout)
, Reconfigure(reconfigure)
{
Log = Logging->CreateLog("BLOCKSTORE_NBD");
Expand All @@ -262,6 +260,11 @@ TNetlinkDevice::~TNetlinkDevice()

TFuture<NProto::TError> TNetlinkDevice::Start()
{
if (StartResult.Initialized()) {
return StartResult.GetFuture();
}
StartResult = NewPromise<NProto::TError>();

try {
ParseIndex();
ConnectSocket();
Expand All @@ -274,25 +277,24 @@ TFuture<NProto::TError> TNetlinkDevice::Start()
<< "unable to configure " << DeviceName << ": " << e.what()));
}

// will be set asynchronously in Connect > HandleStatus > DoConnect
return StartResult.GetFuture();
}

TFuture<NProto::TError> TNetlinkDevice::Stop(bool deleteDevice)
{
if (AtomicSwap(&ShouldStop, 1) == 1) {
return StopResult.GetFuture();
}

if (!deleteDevice) {
StopResult.SetValue(MakeError(S_OK));
if (StopResult.Initialized()) {
return StopResult.GetFuture();
}
StopResult = NewPromise<NProto::TError>();

try {
Disconnect();
DisconnectSocket();
StopResult.SetValue(MakeError(S_OK));

if (deleteDevice) {
Disconnect();
} else {
StopResult.SetValue(MakeError(S_OK));
}

} catch (const TServiceError& e) {
StopResult.SetValue(MakeError(
Expand All @@ -319,7 +321,7 @@ TFuture<NProto::TError> TNetlinkDevice::Resize(ui64 deviceSizeInBytes)
message.Put(NBD_SOCK_FD, static_cast<ui32>(Socket));
}

message.Send(socket);
socket.Send(message);

} catch (const TServiceError& e) {
return MakeFuture(MakeError(
Expand All @@ -336,6 +338,7 @@ void TNetlinkDevice::ParseIndex()
// accept dev/nbd* devices with prefix other than /
TStringBuf l, r;
TStringBuf(DeviceName).RSplit(NBD_DEVICE_SUFFIX, l, r);

if (!TryFromString(r, DeviceIndex)) {
throw TServiceError(E_ARGUMENT) << "unable to parse device index";
}
Expand Down Expand Up @@ -379,7 +382,7 @@ void TNetlinkDevice::Connect()

TNetlinkMessage message(socket.GetFamily(), NBD_CMD_STATUS);
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Send(socket);
socket.Send(message);
}

void TNetlinkDevice::Disconnect()
Expand All @@ -389,7 +392,8 @@ void TNetlinkDevice::Disconnect()
TNetlinkSocket socket;
TNetlinkMessage message(socket.GetFamily(), NBD_CMD_DISCONNECT);
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Send(socket);
socket.Send(message);
StopResult.SetValue(MakeError(S_OK));
}

void TNetlinkDevice::DoConnect(bool connected)
Expand Down Expand Up @@ -417,14 +421,14 @@ void TNetlinkDevice::DoConnect(bool connected)
static_cast<ui64>(info.MinBlockSize));
message.Put(NBD_ATTR_SERVER_FLAGS, static_cast<ui64>(info.Flags));

if (Timeout) {
message.Put(NBD_ATTR_TIMEOUT, Timeout.Seconds());
if (RequestTimeout) {
message.Put(NBD_ATTR_TIMEOUT, RequestTimeout.Seconds());
}

if (DeadConnectionTimeout) {
if (ConnectionTimeout) {
message.Put(
NBD_ATTR_DEAD_CONN_TIMEOUT,
DeadConnectionTimeout.Seconds());
ConnectionTimeout.Seconds());
}

{
Expand All @@ -433,7 +437,7 @@ void TNetlinkDevice::DoConnect(bool connected)
message.Put(NBD_SOCK_FD, static_cast<ui32>(Socket));
}

message.Send(socket);
socket.Send(message);
StartResult.SetValue(MakeError(S_OK));

} catch (const TServiceError& e) {
Expand Down Expand Up @@ -529,19 +533,19 @@ class TNetlinkDeviceFactory final
{
private:
const ILoggingServicePtr Logging;
const TDuration Timeout;
const TDuration DeadConnectionTimeout;
const TDuration RequestTimeout;
const TDuration ConnectionTimeout;
const bool Reconfigure;

public:
TNetlinkDeviceFactory(
ILoggingServicePtr logging,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure)
: Logging(std::move(logging))
, Timeout(std::move(timeout))
, DeadConnectionTimeout(std::move(deadConnectionTimeout))
, RequestTimeout(requestTimeout)
, ConnectionTimeout(connectionTimeout)
, Reconfigure(reconfigure)
{}

Expand All @@ -558,8 +562,8 @@ class TNetlinkDeviceFactory final
Logging,
connectAddress,
std::move(deviceName),
Timeout,
DeadConnectionTimeout,
RequestTimeout,
ConnectionTimeout,
Reconfigure);
}
};
Expand All @@ -572,29 +576,29 @@ IDevicePtr CreateNetlinkDevice(
ILoggingServicePtr logging,
TNetworkAddress connectAddress,
TString deviceName,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure)
{
return std::make_shared<TNetlinkDevice>(
std::move(logging),
std::move(connectAddress),
std::move(deviceName),
timeout,
deadConnectionTimeout,
requestTimeout,
connectionTimeout,
reconfigure);
}

IDeviceFactoryPtr CreateNetlinkDeviceFactory(
ILoggingServicePtr logging,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure)
{
return std::make_shared<TNetlinkDeviceFactory>(
std::move(logging),
std::move(timeout),
std::move(deadConnectionTimeout),
requestTimeout,
connectionTimeout,
reconfigure);
}

Expand Down
8 changes: 4 additions & 4 deletions cloud/blockstore/libs/nbd/netlink_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ IDevicePtr CreateNetlinkDevice(
ILoggingServicePtr logging,
TNetworkAddress connectAddress,
TString deviceName,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure);

IDeviceFactoryPtr CreateNetlinkDeviceFactory(
ILoggingServicePtr logging,
TDuration timeout,
TDuration deadConnectionTimeout,
TDuration requestTimeout,
TDuration connectionTimeout,
bool reconfigure);

} // namespace NCloud::NBlockStore::NBD
Loading

0 comments on commit d591ed9

Please sign in to comment.