Skip to content

Commit

Permalink
Merge pull request #4 from emc-mongoose/concurrency-drop-close
Browse files Browse the repository at this point in the history
Concurrency drop close
  • Loading branch information
dlarge authored Oct 2, 2024
2 parents 60debd3 + 20d0d83 commit ca8b618
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ enum Transport {

AttributeKey<Operation> ATTR_KEY_OPERATION = AttributeKey.valueOf("op");

AttributeKey<Boolean> ATTR_KEY_RELEASED = AttributeKey.valueOf("released");

void complete(final Channel channel, final O op);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -317,6 +319,7 @@ protected boolean submit(final O op) throws IllegalStateException {
ThreadContext.put(KEY_STEP_ID, stepId);
ThreadContext.put(KEY_CLASS_NAME, CLS_NAME);

Channel conn = null;
if (!isStarted()) {
throw new IllegalStateException();
}
Expand All @@ -331,9 +334,10 @@ protected boolean submit(final O op) throws IllegalStateException {
op.startResponse();
complete(null, op);
} else {
final var conn = connPool.lease();
if (conn == null) {
return false;
conn = connPool.lease();
conn.attr(ATTR_KEY_RELEASED).set(Boolean.FALSE);
if (!conn.isActive()) {
throw new ConnectException("Connection is not active");
}
conn.attr(ATTR_KEY_OPERATION).set(op);
op.nodeAddr(conn.attr(ATTR_KEY_NODE).get());
Expand All @@ -343,12 +347,14 @@ protected boolean submit(final O op) throws IllegalStateException {
} catch (final ConnectException e) {
LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation");
op.status(Operation.Status.FAIL_IO);
complete(null, op);
complete(conn, op);
return false;
} catch (final Throwable thrown) {
throwUncheckedIfInterrupted(thrown);
LogUtil.exception(Level.WARN, thrown, "Failed to submit the load operation");
op.status(Operation.Status.FAIL_UNKNOWN);
complete(null, op);
complete(conn, op);
return false;
}
return true;
} else {
Expand Down Expand Up @@ -380,7 +386,7 @@ protected int submit(final List<O> ops, final int from, final int to)
ThreadContext.put(KEY_STEP_ID, stepId);
ThreadContext.put(KEY_CLASS_NAME, CLS_NAME);

Channel conn;
Channel conn = null;
O nextOp = null;
var n = 0;
try {
Expand All @@ -396,8 +402,9 @@ protected int submit(final List<O> ops, final int from, final int to)
complete(null, nextOp);
} else {
conn = connPool.lease();
if (conn == null) {
return n;
conn.attr(ATTR_KEY_RELEASED).set(Boolean.FALSE);
if (!conn.isActive()) {
throw new ConnectException("Connection is not active");
}
conn.attr(ATTR_KEY_OPERATION).set(nextOp);
nextOp.nodeAddr(conn.attr(ATTR_KEY_NODE).get());
Expand All @@ -409,15 +416,15 @@ protected int submit(final List<O> ops, final int from, final int to)
} catch (final ConnectException e) {
LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation");
nextOp.status(Operation.Status.FAIL_IO);
complete(null, nextOp);
complete(conn, nextOp);
if (permits - n > 1) {
concurrencyThrottle.release(permits - n - 1);
}
} catch (final Throwable thrown) {
throwUncheckedIfInterrupted(thrown);
LogUtil.exception(Level.WARN, thrown, "Failed to submit the load operations");
nextOp.status(Operation.Status.FAIL_UNKNOWN);
complete(null, nextOp);
complete(conn, nextOp);
if (permits - n > 1) {
concurrencyThrottle.release(permits - n - 1);
}
Expand Down Expand Up @@ -586,8 +593,11 @@ public void complete(final Channel channel, final O op) {
} catch (final IllegalStateException e) {
LogUtil.exception(Level.DEBUG, e, "{}: invalid load operation state", op.toString());
}
concurrencyThrottle.release();
if (channel != null) {
if (op.status() != Operation.Status.SUCC) {
channel.close();
}
if (!channel.attr(ATTR_KEY_RELEASED).getAndSet(Boolean.TRUE)) {
concurrencyThrottle.release();
connPool.release(channel);
}
handleCompleted(op);
Expand Down

0 comments on commit ca8b618

Please sign in to comment.