Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support hostname #75

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ obj
*.testsettings
*.vsmdi
TestResult.xml
/.vs/ENode/v15/Server/sqlite3
/.vs/ENode/DesignTimeBuild
125 changes: 74 additions & 51 deletions src/ENode.EQueue/Command/CommandResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,64 @@
using ECommon.Scheduling;
using ECommon.Serializing;
using ENode.Commanding;
using ENode.EQueue.Utils;

namespace ENode.EQueue
{
public class CommandResultProcessor : IRequestHandler
{
private byte[] ByteArray = new byte[0];
private SocketRemotingServer _remotingServer;
private ConcurrentDictionary<string, CommandTaskCompletionSource> _commandTaskDict;
private BlockingCollection<CommandResult> _commandExecutedMessageLocalQueue;
private BlockingCollection<DomainEventHandledMessage> _domainEventHandledMessageLocalQueue;
private Worker _commandExecutedMessageWorker;
private ConcurrentDictionary<string, CommandTaskCompletionSource> _commandTaskDict;
private BlockingCollection<DomainEventHandledMessage> _domainEventHandledMessageLocalQueue;
private Worker _domainEventHandledMessageWorker;
private IJsonSerializer _jsonSerializer;
private ILogger _logger;
private SocketRemotingServer _remotingServer;
private bool _started;

private byte[] ByteArray = new byte[0];
public IPEndPoint BindingAddress { get; private set; }
public string BindingHostname { get; private set; }

public string BindingServerAddress
{
get
{
if (!string.IsNullOrEmpty(BindingHostname))
{
return BindingHostname;
}
return BindingAddress.ToString();
}
}

RemotingResponse IRequestHandler.HandleRequest(IRequestHandlerContext context, RemotingRequest remotingRequest)
{
if (remotingRequest.Code == (int)CommandReturnType.CommandExecuted)
{
var json = Encoding.UTF8.GetString(remotingRequest.Body);
var result = _jsonSerializer.Deserialize<CommandResult>(json);
_commandExecutedMessageLocalQueue.Add(result);
}
else if (remotingRequest.Code == (int)CommandReturnType.EventHandled)
{
var json = Encoding.UTF8.GetString(remotingRequest.Body);
var message = _jsonSerializer.Deserialize<DomainEventHandledMessage>(json);
_domainEventHandledMessageLocalQueue.Add(message);
}
else
{
_logger.ErrorFormat("Invalid remoting request code: {0}", remotingRequest.Code);
}
return null;
}

public CommandResultProcessor Initialize(string hostname, int port)
{
var bindingAddress = SocketUtils.GetIPEndPointFromHostName(hostname, port);
BindingHostname = $"{hostname}:{port}";
return Initialize(bindingAddress);
}

public CommandResultProcessor Initialize(IPEndPoint bindingAddress)
{
Expand All @@ -43,20 +84,29 @@ public CommandResultProcessor Initialize(IPEndPoint bindingAddress)
return this;
}

public void ProcessFailedSendingCommand(ICommand command)
{
if (_commandTaskDict.TryRemove(command.Id, out CommandTaskCompletionSource commandTaskCompletionSource))
{
var commandResult = new CommandResult(CommandStatus.Failed, command.Id, command.AggregateRootId, "Failed to send the command.", typeof(string).FullName);
commandTaskCompletionSource.TaskCompletionSource.TrySetResult(new AsyncTaskResult<CommandResult>(AsyncTaskStatus.Success, commandResult));
}
}

public void RegisterProcessingCommand(ICommand command, CommandReturnType commandReturnType, TaskCompletionSource<AsyncTaskResult<CommandResult>> taskCompletionSource)
{
if (!_commandTaskDict.TryAdd(command.Id, new CommandTaskCompletionSource { CommandReturnType = commandReturnType, TaskCompletionSource = taskCompletionSource }))
{
throw new Exception(string.Format("Duplicate processing command registration, type:{0}, id:{1}", command.GetType().Name, command.Id));
}
}
public void ProcessFailedSendingCommand(ICommand command)

public CommandResultProcessor Shutdown()
{
if (_commandTaskDict.TryRemove(command.Id, out CommandTaskCompletionSource commandTaskCompletionSource))
{
var commandResult = new CommandResult(CommandStatus.Failed, command.Id, command.AggregateRootId, "Failed to send the command.", typeof(string).FullName);
commandTaskCompletionSource.TaskCompletionSource.TrySetResult(new AsyncTaskResult<CommandResult>(AsyncTaskStatus.Success, commandResult));
}
_remotingServer.Shutdown();
_commandExecutedMessageWorker.Stop();
_domainEventHandledMessageWorker.Stop();
return this;
}

public CommandResultProcessor Start()
Expand All @@ -76,33 +126,20 @@ public CommandResultProcessor Start()

return this;
}
public CommandResultProcessor Shutdown()
{
_remotingServer.Shutdown();
_commandExecutedMessageWorker.Stop();
_domainEventHandledMessageWorker.Stop();
return this;
}

RemotingResponse IRequestHandler.HandleRequest(IRequestHandlerContext context, RemotingRequest remotingRequest)
private void ProcessDomainEventHandledMessage(DomainEventHandledMessage message)
{
if (remotingRequest.Code == (int)CommandReturnType.CommandExecuted)
{
var json = Encoding.UTF8.GetString(remotingRequest.Body);
var result = _jsonSerializer.Deserialize<CommandResult>(json);
_commandExecutedMessageLocalQueue.Add(result);
}
else if (remotingRequest.Code == (int)CommandReturnType.EventHandled)
{
var json = Encoding.UTF8.GetString(remotingRequest.Body);
var message = _jsonSerializer.Deserialize<DomainEventHandledMessage>(json);
_domainEventHandledMessageLocalQueue.Add(message);
}
else
if (_commandTaskDict.TryRemove(message.CommandId, out CommandTaskCompletionSource commandTaskCompletionSource))
{
_logger.ErrorFormat("Invalid remoting request code: {0}", remotingRequest.Code);
var commandResult = new CommandResult(CommandStatus.Success, message.CommandId, message.AggregateRootId, message.CommandResult, message.CommandResult != null ? typeof(string).FullName : null);
if (commandTaskCompletionSource.TaskCompletionSource.TrySetResult(new AsyncTaskResult<CommandResult>(AsyncTaskStatus.Success, commandResult)))
{
if (_logger.IsDebugEnabled)
{
_logger.DebugFormat("Command result return, {0}", commandResult);
}
}
}
return null;
}

private void ProcessExecutedCommandMessage(CommandResult commandResult)
Expand Down Expand Up @@ -136,25 +173,11 @@ private void ProcessExecutedCommandMessage(CommandResult commandResult)
}
}
}
private void ProcessDomainEventHandledMessage(DomainEventHandledMessage message)
{
if (_commandTaskDict.TryRemove(message.CommandId, out CommandTaskCompletionSource commandTaskCompletionSource))
{
var commandResult = new CommandResult(CommandStatus.Success, message.CommandId, message.AggregateRootId, message.CommandResult, message.CommandResult != null ? typeof(string).FullName : null);
if (commandTaskCompletionSource.TaskCompletionSource.TrySetResult(new AsyncTaskResult<CommandResult>(AsyncTaskStatus.Success, commandResult)))
{
if (_logger.IsDebugEnabled)
{
_logger.DebugFormat("Command result return, {0}", commandResult);
}
}
}
}

class CommandTaskCompletionSource
private class CommandTaskCompletionSource
{
public TaskCompletionSource<AsyncTaskResult<CommandResult>> TaskCompletionSource { get; set; }
public CommandReturnType CommandReturnType { get; set; }
public TaskCompletionSource<AsyncTaskResult<CommandResult>> TaskCompletionSource { get; set; }
}
}
}
}
133 changes: 70 additions & 63 deletions src/ENode.EQueue/Command/CommandService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,18 @@ namespace ENode.EQueue
{
public class CommandService : ICommandService
{
private ILogger _logger;
private IJsonSerializer _jsonSerializer;
private ITopicProvider<ICommand> _commandTopicProvider;
private ITypeNameProvider _typeNameProvider;
private ICommandRoutingKeyProvider _commandRouteKeyProvider;
private SendQueueMessageService _sendMessageService;
private CommandResultProcessor _commandResultProcessor;
private Producer _producer;
private ICommandRoutingKeyProvider _commandRouteKeyProvider;
private ITopicProvider<ICommand> _commandTopicProvider;
private IOHelper _ioHelper;

private IJsonSerializer _jsonSerializer;
private ILogger _logger;
private Producer _producer;
private SendQueueMessageService _sendMessageService;
private ITypeNameProvider _typeNameProvider;
public string CommandExecutedMessageTopic { get; private set; }
public string DomainEventHandledMessageTopic { get; private set; }

public CommandService InitializeENode()
{
_jsonSerializer = ObjectContainer.Resolve<IJsonSerializer>();
_commandTopicProvider = ObjectContainer.Resolve<ITopicProvider<ICommand>>();
_typeNameProvider = ObjectContainer.Resolve<ITypeNameProvider>();
_commandRouteKeyProvider = ObjectContainer.Resolve<ICommandRoutingKeyProvider>();
_sendMessageService = new SendQueueMessageService();
_logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().FullName);
_ioHelper = ObjectContainer.Resolve<IOHelper>();
return this;
}
public CommandService InitializeEQueue(CommandResultProcessor commandResultProcessor = null, ProducerSetting setting = null)
{
InitializeENode();
_commandResultProcessor = commandResultProcessor;
_producer = new Producer(setting);
return this;
}

public CommandService Start()
{
if (_commandResultProcessor != null)
{
_commandResultProcessor.Start();
}
_producer.Start();
return this;
}
public CommandService Shutdown()
{
_producer.Shutdown();
if (_commandResultProcessor != null)
{
_commandResultProcessor.Shutdown();
}
return this;
}
public void Send(ICommand command)
{
_sendMessageService.SendMessage(_producer, BuildCommandMessage(command, false), _commandRouteKeyProvider.GetRoutingKey(command), command.Id, null);
}
public Task<AsyncTaskResult> SendAsync(ICommand command)
{
try
{
return _sendMessageService.SendMessageAsync(_producer, BuildCommandMessage(command, false), _commandRouteKeyProvider.GetRoutingKey(command), command.Id, null);
}
catch (Exception ex)
{
return Task.FromResult(new AsyncTaskResult(AsyncTaskStatus.Failed, ex.Message));
}
}
public CommandResult Execute(ICommand command, int timeoutMillis)
{
var result = ExecuteAsync(command).WaitResult(timeoutMillis);
Expand All @@ -90,6 +37,7 @@ public CommandResult Execute(ICommand command, int timeoutMillis)
}
return result.Data;
}

public CommandResult Execute(ICommand command, CommandReturnType commandReturnType, int timeoutMillis)
{
var result = ExecuteAsync(command, commandReturnType).WaitResult(timeoutMillis);
Expand All @@ -99,10 +47,12 @@ public CommandResult Execute(ICommand command, CommandReturnType commandReturnTy
}
return result.Data;
}

public Task<AsyncTaskResult<CommandResult>> ExecuteAsync(ICommand command)
{
return ExecuteAsync(command, CommandReturnType.CommandExecuted);
}

public async Task<AsyncTaskResult<CommandResult>> ExecuteAsync(ICommand command, CommandReturnType commandReturnType)
{
try
Expand All @@ -125,22 +75,79 @@ public async Task<AsyncTaskResult<CommandResult>> ExecuteAsync(ICommand command,
}
}

public CommandService InitializeENode()
{
_jsonSerializer = ObjectContainer.Resolve<IJsonSerializer>();
_commandTopicProvider = ObjectContainer.Resolve<ITopicProvider<ICommand>>();
_typeNameProvider = ObjectContainer.Resolve<ITypeNameProvider>();
_commandRouteKeyProvider = ObjectContainer.Resolve<ICommandRoutingKeyProvider>();
_sendMessageService = new SendQueueMessageService();
_logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().FullName);
_ioHelper = ObjectContainer.Resolve<IOHelper>();
return this;
}

public CommandService InitializeEQueue(CommandResultProcessor commandResultProcessor = null, ProducerSetting setting = null)
{
InitializeENode();
_commandResultProcessor = commandResultProcessor;
_producer = new Producer(setting);
return this;
}

public void Send(ICommand command)
{
_sendMessageService.SendMessage(_producer, BuildCommandMessage(command, false), _commandRouteKeyProvider.GetRoutingKey(command), command.Id, null);
}

public Task<AsyncTaskResult> SendAsync(ICommand command)
{
try
{
return _sendMessageService.SendMessageAsync(_producer, BuildCommandMessage(command, false), _commandRouteKeyProvider.GetRoutingKey(command), command.Id, null);
}
catch (Exception ex)
{
return Task.FromResult(new AsyncTaskResult(AsyncTaskStatus.Failed, ex.Message));
}
}

public CommandService Shutdown()
{
_producer.Shutdown();
if (_commandResultProcessor != null)
{
_commandResultProcessor.Shutdown();
}
return this;
}

public CommandService Start()
{
if (_commandResultProcessor != null)
{
_commandResultProcessor.Start();
}
_producer.Start();
return this;
}

private EQueueMessage BuildCommandMessage(ICommand command, bool needReply = false)
{
Ensure.NotNull(command.AggregateRootId, "aggregateRootId");
var commandData = _jsonSerializer.Serialize(command);
var topic = _commandTopicProvider.GetTopic(command);
var replyAddress = needReply && _commandResultProcessor != null ? _commandResultProcessor.BindingAddress.ToString() : null;
var replyAddress = needReply && _commandResultProcessor != null ? _commandResultProcessor.BindingServerAddress : null;
var messageData = _jsonSerializer.Serialize(new CommandMessage
{
CommandData = commandData,
ReplyAddress = replyAddress
});
return new EQueueMessage(
topic,
topic,
(int)EQueueMessageTypeCode.CommandMessage,
Encoding.UTF8.GetBytes(messageData),
_typeNameProvider.GetTypeName(command.GetType()));
}
}
}
}
Loading