Skip to content

Commit

Permalink
Implement DI
Browse files Browse the repository at this point in the history
  • Loading branch information
vieirandre committed Jul 20, 2020
1 parent ace6f96 commit b630c54
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Mycenae.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.6" />
<PackageReference Include="NLog" Version="4.7.2" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.4" />
</ItemGroup>

<ItemGroup>
Expand Down
37 changes: 29 additions & 8 deletions Program.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,61 @@
using Mycenae.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Mycenae.Models;
using Mycenae.Tasks;
using NLog;
using NLog.Extensions.Logging;
using System;
using Logger = NLog.Logger;

namespace Mycenae
{
public class Program
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
private static IServiceProvider _serviceProvider;

public static void Main()
{
try
{
Setup();

var migrationTask = GetMigrationTaskInstance();
migrationTask.Execute();
}
catch (Exception ex)
{
Logger.Error(ex);
_logger.Error(ex);
}
finally
{
Logger.Info("Ending application...");
_logger.Info("Ending application...");
}
}

private static void Setup()
{
_serviceProvider = new ServiceCollection()
.AddTransient<Extraction>()
.AddTransient<Insertion>()
.AddTransient<EndToEnd>()
.AddLogging(builder =>
{
builder.ClearProviders();
builder.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Debug);
builder.AddNLog("NLog.config");
})
.BuildServiceProvider();
}

private static MigrationTask GetMigrationTaskInstance()
{
return Settings.Values.TaskToExecute switch
{
TaskToExecute.Extraction => new Extraction(),
TaskToExecute.Insertion => new Insertion(),
TaskToExecute.EndToEnd => new EndToEnd(),
_ => throw new ArgumentException($"Config TaskToPerform is not properly specified.")
TaskToExecute.Extraction => _serviceProvider.GetRequiredService<Extraction>(),
TaskToExecute.Insertion => _serviceProvider.GetRequiredService<Insertion>(),
TaskToExecute.EndToEnd => _serviceProvider.GetRequiredService<EndToEnd>(),
_ => throw new ArgumentException($"Config {nameof(Settings.Values.TaskToExecute)} is not properly specified.")
};
}
}
Expand Down
21 changes: 13 additions & 8 deletions Tasks/EndToEnd.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
using Cassandra;
using Microsoft.Extensions.Logging;
using Mycenae.Aspects;
using Mycenae.Models;
using NLog;
using System;
using System.Collections.Generic;
using Logger = NLog.Logger;

namespace Mycenae.Tasks
{
internal class EndToEnd : MigrationTask
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static ILogger<EndToEnd> _logger;

public EndToEnd(ILogger<EndToEnd> logger) : base(logger)
{
_logger = logger;
}

[ExecutionTimeMeasured]
internal override void Execute()
{
Logger.Info("Starting end-to-end migration...");
_logger.LogInformation("Starting end-to-end migration...");

try
{
Expand All @@ -31,11 +36,11 @@ internal override void Execute()
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.Flatten().InnerExceptions)
Logger.Error(ex);
_logger.LogError(ex.ToString());
}
catch (Exception ex)
{
Logger.Error(ex);
_logger.LogError(ex.ToString());
}
finally
{
Expand All @@ -51,7 +56,7 @@ private static bool IsThereCompliance()

if (sourceColumns.Count != targetColumns.Count)
{
Logger.Error("Tables from source and target have divergent number of columns.");
_logger.LogError("Tables from source and target have divergent number of columns.");
return false;
}

Expand All @@ -71,11 +76,11 @@ private static bool IsThereCompliance()

if (matches.Count == sourceColumns.Count)
{
Logger.Info("Tables are compliant with each other.");
_logger.LogInformation("Tables are compliant with each other.");
return true;
}

Logger.Error($"Tables are not compliant with each other: {sourceColumns.Count - matches.Count} mismatch(es) among {sourceColumns.Count} columns.");
_logger.LogError($"Tables are not compliant with each other: {sourceColumns.Count - matches.Count} mismatch(es) among {sourceColumns.Count} columns.");

return false;
}
Expand Down
18 changes: 11 additions & 7 deletions Tasks/Extraction.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
using Cassandra;
using Microsoft.Extensions.Logging;
using Mycenae.Aspects;
using Mycenae.Models;
using NLog;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Logger = NLog.Logger;

namespace Mycenae.Tasks
{
internal class Extraction : MigrationTask
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static ILogger<Extraction> _logger;

public Extraction(ILogger<Extraction> logger) : base(logger)
{
_logger = logger;
}

[ExecutionTimeMeasured]
internal override void Execute()
{
Logger.Info("Starting extraction phase...");
_logger.LogInformation("Starting extraction phase...");

try
{
Expand All @@ -29,11 +33,11 @@ internal override void Execute()
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.Flatten().InnerExceptions)
Logger.Error(ex);
_logger.LogError(ex.ToString());
}
catch (Exception ex)
{
Logger.Info(ex);
_logger.LogError(ex.ToString());
}
finally
{
Expand All @@ -43,7 +47,7 @@ internal override void Execute()

private static void ProcessRows(RowSet rows)
{
Logger.Info("Processing rows...");
_logger.LogInformation("Processing rows...");

_ = Directory.CreateDirectory(Path.GetDirectoryName(Settings.Values.Files.Extraction.Path));
using var fileWriter = new StreamWriter(Settings.Values.Files.Extraction.Path);
Expand Down
20 changes: 12 additions & 8 deletions Tasks/Insertion.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
using Cassandra;
using CsvHelper;
using Microsoft.Extensions.Logging;
using Mycenae.Aspects;
using Mycenae.Converters;
using Mycenae.Models;
using NLog;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using Logger = NLog.Logger;

namespace Mycenae.Tasks
{
internal class Insertion : MigrationTask
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static ILogger<Insertion> _logger;

public Insertion(ILogger<Insertion> logger) : base(logger)
{
_logger = logger;
}

[ExecutionTimeMeasured]
internal override void Execute()
{
Logger.Info("Starting insertion phase...");
_logger.LogInformation("Starting insertion phase...");

try
{
Expand All @@ -35,11 +39,11 @@ internal override void Execute()
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.Flatten().InnerExceptions)
Logger.Error(ex);
_logger.LogError(ex.ToString());
}
catch (Exception ex)
{
Logger.Error(ex);
_logger.LogError(ex.ToString());
}
finally
{
Expand All @@ -49,7 +53,7 @@ internal override void Execute()

private static IEnumerable<dynamic> ReadRecordsFromFile()
{
Logger.Info("Reading data from file...");
_logger.LogInformation("Reading data from file...");

var reader = new StreamReader(Settings.Values.Files.Insertion.Path);
var csvReader = new CsvReader(reader, CultureInfo.InvariantCulture);
Expand All @@ -61,7 +65,7 @@ private static IEnumerable<dynamic> ReadRecordsFromFile()

private static void ProcessRecords(IEnumerable<dynamic> records)
{
Logger.Info("Processing records...");
_logger.LogInformation("Processing records...");

IList<CColumn> columns = GetColumnsInfo(Settings.Values.Connections.Target.Keyspace, Settings.Values.Connections.Target.Table);
PreparedStatement pStatement = PrepareStatementForInsertion(columns);
Expand Down
30 changes: 16 additions & 14 deletions Tasks/MigrationTask.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,39 @@
using Cassandra;
using Microsoft.Extensions.Logging;
using Mycenae.Aspects;
using Mycenae.Models;
using Mycenae.Policies;
using NLog;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Logger = NLog.Logger;

namespace Mycenae.Tasks
{
internal abstract class MigrationTask
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private static ILogger<MigrationTask> _logger;

#region Clusters & sessions
private static ICluster _sourceCluster, _targetCluster;
private static ISession _sourceSession, _targetSession;
#endregion

public MigrationTask(ILogger<MigrationTask> logger)
{
_logger = logger;
}

internal abstract void Execute();

protected static void BuildSourceClusterAndSession()
{
if (_sourceSession != null)
return;

Logger.Info("Building source cluster and connecting session...");
_logger.LogInformation("Building source cluster and connecting session...");

_sourceCluster =
Cluster.Builder()
_sourceCluster = Cluster.Builder()
.WithPort(Settings.Values.Connections.Source.Port)
.AddContactPoints(Settings.Values.Connections.Source.Endpoints)
.Build();
Expand All @@ -43,10 +46,9 @@ protected static void BuildTargetClusterAndSession()
if (_targetSession != null)
return;

Logger.Info("Building target cluster and connecting session...");
_logger.LogInformation("Building target cluster and connecting session...");

_targetCluster =
Cluster.Builder()
_targetCluster = Cluster.Builder()
.WithPort(Settings.Values.Connections.Target.Port)
.WithRetryPolicy(new RetryPolicy())
.WithPoolingOptions(PoolingOptions.Create())
Expand All @@ -62,7 +64,7 @@ protected static void DisposeSourceSessionAndCluster()
if (_sourceSession == null || _sourceSession.IsDisposed)
return;

Logger.Info("Disposing source's cluster and session...");
_logger.LogInformation("Disposing source's cluster and session...");

_sourceSession.Dispose();
_sourceCluster.Dispose();
Expand All @@ -73,15 +75,15 @@ protected static void DisposeTargetSessionAndCluster()
if (_targetSession == null || _targetSession.IsDisposed)
return;

Logger.Info("Disposing target's cluster and session...");
_logger.LogInformation("Disposing target's cluster and session...");

_targetSession.Dispose();
_targetCluster.Dispose();
}

protected static RowSet RetrieveRowsFromTable()
{
Logger.Info("Retrieving rows from table...");
_logger.LogInformation("Retrieving rows from table...");

string cql = $"SELECT * FROM {Settings.Values.Connections.Source.Keyspace}.{Settings.Values.Connections.Source.Table}";
var statement = new SimpleStatement(cql);
Expand All @@ -104,7 +106,7 @@ protected static PreparedStatement PrepareStatementForInsertion(IList<CColumn> c

protected static IList<CColumn> GetColumnsInfo(string keyspace, string table)
{
Logger.Info($"Getting columns info: [table] {table} [keyspace] {keyspace}");
_logger.LogInformation($"Getting columns info: [table] {table} [keyspace] {keyspace}");

string cql = $"SELECT * FROM {keyspace}.{table} LIMIT 1";
var statement = new SimpleStatement(cql);
Expand All @@ -116,7 +118,7 @@ protected static IList<CColumn> GetColumnsInfo(string keyspace, string table)
[ExecutionTimeMeasured]
protected static async Task ExecuteInsertAsync(IList<BoundStatement> insertStatements)
{
Logger.Info($"Inserting {insertStatements.Count} records into table...");
_logger.LogInformation($"Inserting {insertStatements.Count} records into table...");

var tasks = new ConcurrentQueue<Task>();

Expand Down

0 comments on commit b630c54

Please sign in to comment.