Skip to content

Commit

Permalink
Merge pull request #21993 from abpframework/auto-merge/rel-9-1/3426
Browse files Browse the repository at this point in the history
Merge branch dev with rel-9.1
  • Loading branch information
maliming authored Jan 24, 2025
2 parents 8de1476 + dafcfb3 commit 6591713
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ private async Task ProcessEventAsync(ServiceBusReceivedMessage message)

public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -111,8 +113,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, outgoingEvent.GetCorrelationId(), outgoingEvent.Id);
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand Down Expand Up @@ -141,7 +141,12 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
throw new AbpException(
"The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number");
}
}

await publisher.SendMessagesAsync(messageBatch);

foreach (var outgoingEvent in outgoingEventArray)
{
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -152,8 +157,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
});
}
}

await publisher.SendMessagesAsync(messageBatch);
}

public async override Task ProcessFromInboxAsync(IncomingEventInfo incomingEvent, InboxConfig inboxConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFac

public async override Task PublishFromOutboxAsync(OutgoingEventInfo outgoingEvent, OutboxConfig outboxConfig)
{
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -162,8 +164,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -172,6 +172,8 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI

foreach (var outgoingEvent in outgoingEventArray)
{
await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -181,8 +183,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishToDaprAsync(outgoingEvent.EventName, Serializer.Deserialize(outgoingEvent.EventData, GetEventType(outgoingEvent.EventName)), outgoingEvent.Id, outgoingEvent.GetCorrelationId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,6 @@ public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}

var headers = new Headers
{
{ "messageId", System.Text.Encoding.UTF8.GetBytes(outgoingEvent.Id.ToString("N")) }
Expand All @@ -222,6 +212,16 @@ await PublishAsync(
outgoingEvent.EventData,
headers
);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Outbox,
EventName = outgoingEvent.EventName,
EventData = outgoingEvent.EventData
});
}
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -242,6 +242,15 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
headers.Add(EventBusConsts.CorrelationIdHeaderName, System.Text.Encoding.UTF8.GetBytes(outgoingEvent.GetCorrelationId()!));
}

producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -251,15 +260,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

producer.Produce(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = outgoingEvent.EventName,
Value = outgoingEvent.EventData,
Headers = headers
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public async override Task PublishFromOutboxAsync(
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -216,8 +218,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(outgoingEvent.EventName, outgoingEvent.EventData, eventId: outgoingEvent.Id, correlationId: outgoingEvent.GetCorrelationId());
}

public async override Task PublishManyFromOutboxAsync(
Expand All @@ -231,6 +231,13 @@ public async override Task PublishManyFromOutboxAsync(

foreach (var outgoingEvent in outgoingEventArray)
{
await PublishAsync(
channel,
outgoingEvent.EventName,
outgoingEvent.EventData,
eventId: outgoingEvent.Id,
correlationId: outgoingEvent.GetCorrelationId());

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -240,13 +247,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishAsync(
channel,
outgoingEvent.EventName,
outgoingEvent.EventData,
eventId: outgoingEvent.Id,
correlationId: outgoingEvent.GetCorrelationId());
}

channel.WaitForConfirmsOrDie();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ public async override Task PublishFromOutboxAsync(
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!;
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);

var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!);
}

await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent() {
Expand All @@ -261,14 +269,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent() {
EventData = outgoingEvent.EventData
});
}

var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!);
}

await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);
}

public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
Expand All @@ -279,6 +279,8 @@ public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventI
{
foreach (var outgoingEvent in outgoingEventArray)
{
await PublishFromOutboxAsync(outgoingEvent, outboxConfig);

using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
{
await TriggerDistributedEventSentAsync(new DistributedEventSent()
Expand All @@ -288,8 +290,6 @@ await TriggerDistributedEventSentAsync(new DistributedEventSent()
EventData = outgoingEvent.EventData
});
}

await PublishFromOutboxAsync(outgoingEvent, outboxConfig);
}

await scope.CompleteAsync();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -85,14 +85,14 @@ public async Task PublishAsync(
}
}

await PublishToEventBusAsync(eventType, eventData);

await TriggerDistributedEventSentAsync(new DistributedEventSent()
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
});

await PublishToEventBusAsync(eventType, eventData);
}

public abstract Task PublishFromOutboxAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,89 +57,116 @@ public virtual IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> ha
return Subscribe(typeof(TEvent), handler);
}

public virtual IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
return _localEventBus.Subscribe(action);
}

public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
public IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
{
return _localEventBus.Subscribe(handler);
}

public virtual IDisposable Subscribe<TEvent, THandler>() where TEvent : class where THandler : IEventHandler, new()
public IDisposable Subscribe<TEvent, THandler>() where TEvent : class where THandler : IEventHandler, new()
{
return _localEventBus.Subscribe<TEvent, THandler>();
}

public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
public IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return _localEventBus.Subscribe(eventType, handler);
}

public virtual IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
public IDisposable Subscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
return _localEventBus.Subscribe<TEvent>(factory);
}

public virtual IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
public IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
return _localEventBus.Subscribe(eventType, factory);
}

public virtual void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
public void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
{
_localEventBus.Unsubscribe(action);
}

public virtual void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
public void Unsubscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
{
_localEventBus.Unsubscribe(handler);
}

public virtual void Unsubscribe(Type eventType, IEventHandler handler)
public void Unsubscribe(Type eventType, IEventHandler handler)
{
_localEventBus.Unsubscribe(eventType, handler);
}

public virtual void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
public void Unsubscribe<TEvent>(IEventHandlerFactory factory) where TEvent : class
{
_localEventBus.Unsubscribe<TEvent>(factory);
}

public virtual void Unsubscribe(Type eventType, IEventHandlerFactory factory)
public void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
_localEventBus.Unsubscribe(eventType, factory);
}

public virtual void UnsubscribeAll<TEvent>() where TEvent : class
public void UnsubscribeAll<TEvent>() where TEvent : class
{
_localEventBus.UnsubscribeAll<TEvent>();
}

public virtual void UnsubscribeAll(Type eventType)
public void UnsubscribeAll(Type eventType)
{
_localEventBus.UnsubscribeAll(eventType);
}

public virtual Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
}

public virtual Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}

public virtual Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
public async Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true) where TEvent : class
{
return _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(typeof(TEvent), eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventData, onUnitOfWorkComplete);
}

public virtual Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
public async Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true, bool useOutbox = true)
{
return _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
await PublishDistributedEventSentReceivedAsync(eventType, eventData, onUnitOfWorkComplete);
await _localEventBus.PublishAsync(eventType, eventData, onUnitOfWorkComplete);
}
}

private async Task PublishDistributedEventSentReceivedAsync(Type eventType, object eventData, bool onUnitOfWorkComplete)
{
if (eventType != typeof(DistributedEventSent))
{
await _localEventBus.PublishAsync(new DistributedEventSent
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);
}

if (eventType != typeof(DistributedEventReceived))
{
await _localEventBus.PublishAsync(new DistributedEventReceived
{
Source = DistributedEventSource.Direct,
EventName = EventNameAttribute.GetNameOrDefault(eventType),
EventData = eventData
}, onUnitOfWorkComplete);
}
}
}
Loading

0 comments on commit 6591713

Please sign in to comment.