Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prerelease 2024-01-10 #14

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Core/SmartIOT.Connector.Core/Scheduler/ITagScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public interface ITagScheduler
public IDeviceDriver DeviceDriver { get; }
public Device Device { get; }

void Start();
Task StartAsync();

void Stop();
Task StopAsync();

/// <summary>
/// Questo metodo consente di eseguire una action di inizializzazione,
Expand Down
148 changes: 84 additions & 64 deletions Core/SmartIOT.Connector.Core/Scheduler/TagScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ public class TagScheduler : ITagScheduler
private readonly SchedulerConfiguration _configuration;
private readonly CancellationTokenSource _terminatingToken = new CancellationTokenSource();
private readonly Thread _schedulerThread;
private readonly SemaphoreSlim _schedulerTerminated = new SemaphoreSlim(0, 1);
private readonly Thread _monitorThread;
private readonly SemaphoreSlim _monitorThreadTerminated = new SemaphoreSlim(0, 1);
private DateTime _terminatingInstant;
private DateTime _lastWriteOnDevice;
private readonly ConcurrentDictionary<Device, DeviceStatusEvent> _lastDeviceStatusEvents = new ConcurrentDictionary<Device, DeviceStatusEvent>();
Expand Down Expand Up @@ -96,116 +98,134 @@ private void OnEngineExceptionHandler(object? sender, ExceptionEventArgs e)

private void SchedulerThreadRun()
{
while (true)
try
{
if (!_terminatingToken.IsCancellationRequested && IsPaused)
while (true)
{
try
{
_terminatingToken.Token.WaitHandle.WaitOne(1000);
}
catch (OperationCanceledException)
if (!_terminatingToken.IsCancellationRequested && IsPaused)
{
// we must verify if stopping application is possible or if there is something more to write to tags
}

continue;
}

if (_terminatingToken.IsCancellationRequested)
{
var lastWriteOrTerminateInstant = _terminatingInstant < _lastWriteOnDevice ? _lastWriteOnDevice : _terminatingInstant;
var now = _timeService.Now;

if (_timeService.IsTimeoutElapsed(lastWriteOrTerminateInstant, now, _configuration.TerminateAfterNoWriteRequestsDelay)
&& _timeService.IsTimeoutElapsed(_terminatingInstant, now, _configuration.TerminateMinimumDelay))
break;
}

try
{
try
{
var schedule = _tagSchedulerEngine.ScheduleNextTag(_terminatingToken.IsCancellationRequested);
if (schedule != null)
try
{
if (schedule.Type == TagScheduleType.WRITE)
_lastWriteOnDevice = _timeService.Now;
_terminatingToken.Token.WaitHandle.WaitOne(1000);
}
else
catch (OperationCanceledException)
{
Thread.Sleep(1000);
// we must verify if stopping application is possible or if there is something more to write to tags
}

continue;
}
catch (TagSchedulerWaitException ex)

if (_terminatingToken.IsCancellationRequested)
{
TagSchedulerWaitExceptionEvent?.Invoke(this, new TagSchedulerWaitExceptionEventArgs(ex));
var lastWriteOrTerminateInstant = _terminatingInstant < _lastWriteOnDevice ? _lastWriteOnDevice : _terminatingInstant;
var now = _timeService.Now;

Thread.Sleep(ex.WaitTime);
if (_timeService.IsTimeoutElapsed(lastWriteOrTerminateInstant, now, _configuration.TerminateAfterNoWriteRequestsDelay)
&& _timeService.IsTimeoutElapsed(_terminatingInstant, now, _configuration.TerminateMinimumDelay))
break;
}
}
catch (Exception ex)
{

try
{
ExceptionHandler?.Invoke(this, new ExceptionEventArgs(ex));
try
{
var schedule = _tagSchedulerEngine.ScheduleNextTag(_terminatingToken.IsCancellationRequested);
if (schedule != null)
{
if (schedule.Type == TagScheduleType.WRITE)
_lastWriteOnDevice = _timeService.Now;
}
else
{
Thread.Sleep(1000);
}
}
catch (TagSchedulerWaitException ex)
{
TagSchedulerWaitExceptionEvent?.Invoke(this, new TagSchedulerWaitExceptionEventArgs(ex));

Thread.Sleep(ex.WaitTime);
}
}
catch
catch (Exception ex)
{
// ignore this
try
{
ExceptionHandler?.Invoke(this, new ExceptionEventArgs(ex));
}
catch
{
// ignore this
}
}
}
}
finally
{
_schedulerTerminated.Release();
}
}

private void MonitorThreadRun()
{
while (true)
try
{
try
{
_tagSchedulerEngine.RestartDriver();

if (!_terminatingToken.IsCancellationRequested)
_terminatingToken.Token.WaitHandle.WaitOne(500);
else
break;
}
catch (OperationCanceledException)
{
// stop request: exit
}
catch (Exception ex)
while (true)
{
try
{
ExceptionHandler?.Invoke(this, new ExceptionEventArgs(ex));
_tagSchedulerEngine.RestartDriver();

if (!_terminatingToken.IsCancellationRequested)
_terminatingToken.Token.WaitHandle.WaitOne(500);
else
break;
}
catch (OperationCanceledException)
{
// stop request: exit
}
catch
catch (Exception ex)
{
// ignore this
try
{
ExceptionHandler?.Invoke(this, new ExceptionEventArgs(ex));
}
catch
{
// ignore this
}
}
}
}
finally
{
_monitorThreadTerminated.Release();
}
}

public void Start()
public Task StartAsync()
{
SchedulerStarting?.Invoke(this, new SchedulerStartingEventArgs(this));

_schedulerThread.Start();
_monitorThread.Start();

return Task.CompletedTask;
}

public void Stop()
public async Task StopAsync()
{
SchedulerStopping?.Invoke(this, new SchedulerStoppingEventArgs(this));

_terminatingToken.Cancel();
_terminatingInstant = _timeService.Now;

_schedulerThread.Join();
_monitorThread.Join();
await _schedulerTerminated.WaitAsync();
await _monitorThreadTerminated.WaitAsync();

_terminatingToken.Dispose();

_tagSchedulerEngine.ExceptionHandler -= OnEngineExceptionHandler;
_tagSchedulerEngine.DeviceStatusEvent -= OnEngineDeviceStatusEvent;
Expand Down
38 changes: 26 additions & 12 deletions Core/SmartIOT.Connector.Core/SmartIotConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task StartAsync()

foreach (var scheduler in _schedulers)
{
scheduler.Start();
await scheduler.StartAsync();
}

IsStarted = true;
Expand All @@ -95,17 +95,14 @@ public async Task StopAsync()
{
Stopping?.Invoke(this, new EventArgs());

foreach (var scheduler in _schedulers)
await Task.WhenAll(_connectors.Select(async x =>
{
scheduler.Stop();
}
await x.StopAsync();

foreach (var connector in _connectors)
{
await connector.StopAsync();
RemoveConnectorEvents(x);
}));

RemoveConnectorEvents(connector);
}
await Task.WhenAll(_schedulers.Select(x => x.StopAsync()));

Stopped?.Invoke(this, new EventArgs());
}
Expand Down Expand Up @@ -199,17 +196,34 @@ public void AddScheduler(ITagScheduler scheduler)

scheduler.SchedulerStarting += OnSchedulerStarting;
scheduler.SchedulerStopping += OnSchedulerStopping;
}

public async Task AddSchedulerAsync(ITagScheduler scheduler)
{
_schedulers.Add(scheduler);

scheduler.TagReadEvent += OnSchedulerTagReadEvent;
scheduler.TagWriteEvent += OnSchedulerTagWriteEvent;
scheduler.DeviceStatusEvent += OnSchedulerDeviceStatusEvent;
scheduler.ExceptionHandler += OnSchedulerException;

scheduler.EngineRestartingEvent += OnSchedulerRestartingEvent;
scheduler.EngineRestartedEvent += OnSchedulerRestartedEvent;
scheduler.TagSchedulerWaitExceptionEvent += OnSchedulerWaitExceptionEvent;

scheduler.SchedulerStarting += OnSchedulerStarting;
scheduler.SchedulerStopping += OnSchedulerStopping;

if (IsStarted)
scheduler.Start();
await scheduler.StartAsync();
}

public void RemoveScheduler(ITagScheduler scheduler)
public async Task RemoveSchedulerAsync(ITagScheduler scheduler)
{
_schedulers.Remove(scheduler);

if (IsStarted)
scheduler.Stop();
await scheduler.StopAsync();

scheduler.TagReadEvent -= OnSchedulerTagReadEvent;
scheduler.TagWriteEvent -= OnSchedulerTagWriteEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ public static IServiceCollection AddSmartIOTConnector(this IServiceCollection se
var builder = new SmartIotConnectorBuilder();

configure?.Invoke(builder);
ArgumentNullException.ThrowIfNull(builder.Configuration);

// add main stuffs
services.AddSingleton<SmartIotConnectorBuilder>(builder);
services.AddSingleton<SmartIotConnectorConfiguration>(builder.Configuration);
services.AddSingleton<SmartIotConnector>(builder.Build);

// expose more things on DI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SmartIOT.Connector.Core\SmartIOT.Connector.Core.csproj" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SmartIOT.Connector.Core\SmartIOT.Connector.Core.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public IActionResult GetDeviceConfiguration(string deviceId)
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[Route("configuration")]
public IActionResult AddDevice([FromBody] DeviceConfiguration deviceConfiguration)
public async Task<IActionResult> AddDevice([FromBody] DeviceConfiguration deviceConfiguration)
{
try
{
_deviceService.AddDevice(deviceConfiguration);
await _deviceService.AddDeviceAsync(deviceConfiguration);
return Ok();
}
catch (DeviceException ex)
Expand All @@ -85,11 +85,11 @@ public IActionResult AddDevice([FromBody] DeviceConfiguration deviceConfiguratio
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[Route("configuration/{deviceId}")]
public IActionResult RemoveDevice(string deviceId)
public async Task<IActionResult> RemoveDevice(string deviceId)
{
try
{
_deviceService.RemoveDevice(deviceId);
await _deviceService.RemoveDeviceAsync(deviceId);
return Ok();
}
catch (DeviceException ex)
Expand Down
8 changes: 4 additions & 4 deletions Core/SmartIOT.Connector.RestApi/Services/DeviceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,24 @@ public IList<Device> GetDevices()
.FirstOrDefault();
}

public void AddDevice(DeviceConfiguration deviceConfiguration)
public async Task AddDeviceAsync(DeviceConfiguration deviceConfiguration)
{
var driver = _deviceDriverFactory.CreateDriver(deviceConfiguration);
if (driver == null)
throw new DeviceException($"DeviceConfiguration not valid {deviceConfiguration.ConnectionString}");

_smartIotConnector.AddScheduler(_schedulerFactory.CreateScheduler(driver.Name, driver, _timeService, _smartIotConnector.SchedulerConfiguration));
await _smartIotConnector.AddSchedulerAsync(_schedulerFactory.CreateScheduler(driver.Name, driver, _timeService, _smartIotConnector.SchedulerConfiguration));
}

public void RemoveDevice(string deviceId)
public async Task RemoveDeviceAsync(string deviceId)
{
var scheduler = _smartIotConnector.Schedulers
.FirstOrDefault(x => x.Device.DeviceId.Equals(deviceId, StringComparison.InvariantCultureIgnoreCase));

if (scheduler == null)
throw new DeviceException($"Device {deviceId} does not exists");

_smartIotConnector.RemoveScheduler(scheduler);
await _smartIotConnector.RemoveSchedulerAsync(scheduler);
}

public TagData? GetTagData(string deviceId, string tagId)
Expand Down
4 changes: 2 additions & 2 deletions Core/SmartIOT.Connector.RestApi/Services/IDeviceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public interface IDeviceService

public Device? GetDevice(string deviceId);

public void AddDevice(DeviceConfiguration deviceConfiguration);
public Task AddDeviceAsync(DeviceConfiguration deviceConfiguration);

public void RemoveDevice(string deviceId);
public Task RemoveDeviceAsync(string deviceId);

public TagData? GetTagData(string deviceId, string tagId);

Expand Down
Loading
Loading