Skip to content

Commit

Permalink
Allow configuring PostgreSQL using an Npgsql data source (#1496)
Browse files Browse the repository at this point in the history
* Allow configuring the Npgsql data source directly

Add an optional DataSource property in the PostgreSqlOptions.

Keep ConnectionString for backwards compatibility, removing it is
a breaking change that would require a major version bump.

* Add DataSource property in PostgreSQL documentation

* Make CreateConnection helper internal

* Mark PostgreSqlOptions.ConnectionString as obsolete

---------

Co-authored-by: Jon Ekdahl <[email protected]>
  • Loading branch information
jonekdahl and Jon Ekdahl authored Mar 7, 2024
1 parent dac69cf commit f0646a3
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 20 deletions.
9 changes: 5 additions & 4 deletions docs/content/user-guide/en/storage/postgresql.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public void ConfigureServices(IServiceCollection services)

#### PostgreSqlOptions

NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---|---|:---
Schema | Database schema | string | cap
ConnectionString | Database connection string | string |
NAME | DESCRIPTION | TYPE | DEFAULT
:---|:---------------------------|----------------------|:---
Schema | Database schema | string | cap
ConnectionString | Database connection string | string |
DataSource | [Data source](https://www.npgsql.org/doc/basic-usage.html#data-source) | [NpgsqlDataSource](https://www.npgsql.org/doc/api/Npgsql.NpgsqlDataSource.html) |

## Publish with transaction

Expand Down
15 changes: 15 additions & 0 deletions src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Npgsql;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP;
Expand All @@ -15,7 +16,21 @@ public class PostgreSqlOptions : EFOptions
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
[Obsolete("Use .DataSource = NpgsqlDataSource.Create(<connectionString>) for same behavior.")]
public string ConnectionString { get; set; } = default!;

/// <summary>
/// Gets or sets the Npgsql data source that will be used to store database entities.
/// </summary>
public NpgsqlDataSource? DataSource { get; set; }

/// <summary>
/// Creates an Npgsql connection from the configured data source.
/// </summary>
internal NpgsqlConnection CreateConnection()
{
return DataSource != null ? DataSource.CreateConnection() : new NpgsqlConnection(ConnectionString);
}
}

internal class ConfigurePostgreSqlOptions : IConfigureOptions<PostgreSqlOptions>
Expand Down
20 changes: 10 additions & 10 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task<bool> AcquireLockAsync(string key, TimeSpan ttl, string instan
{
var sql =
$"UPDATE {_lockName} SET \"Instance\"=@Instance,\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"LastLockTime\" < @TTL;";
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
object[] sqlParams =
{
Expand All @@ -67,7 +67,7 @@ public async Task ReleaseLockAsync(string key, string instance, CancellationToke
{
var sql =
$"UPDATE {_lockName} SET \"Instance\"='',\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"Instance\"=@Instance;";
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
object[] sqlParams =
{
Expand All @@ -82,7 +82,7 @@ public async Task RenewLockAsync(string key, TimeSpan ttl, string instance, Canc
{
var sql =
$"UPDATE {_lockName} SET \"LastLockTime\"=\"LastLockTime\"+interval '{ttl.TotalSeconds}' second WHERE \"Key\"=@Key AND \"Instance\"=@Instance;";
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
object[] sqlParams =
{
Expand All @@ -96,7 +96,7 @@ public async Task ChangePublishStateToDelayedAsync(string[] ids)
{
var sql =
$"UPDATE {_pubName} SET \"StatusName\"='{StatusName.Delayed}' WHERE \"Id\" IN ({string.Join(',', ids)});";
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
}
Expand Down Expand Up @@ -140,7 +140,7 @@ public async Task<MediumMessage> StoreMessageAsync(string name, Message content,

if (transaction == null)
{
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}
Expand Down Expand Up @@ -205,7 +205,7 @@ public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string g
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default)
{
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteNonQueryAsync(
$"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND (\"StatusName\"='{StatusName.Succeeded}' OR \"StatusName\"='{StatusName.Failed}') LIMIT @batchCount);",
Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
new NpgsqlParameter("@OneMinutesAgo", QueuedMessageFetchTime())
};

await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await using var connection = _options.Value.CreateConnection();
await connection.OpenAsync(token);
await using var transaction = await connection.BeginTransactionAsync(token);
var messageList = await connection.ExecuteReaderAsync(sql, async reader =>
Expand Down Expand Up @@ -296,7 +296,7 @@ private async Task ChangeMessageStateAsync(string tableName, MediumMessage messa
}
else
{
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString);
await using var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}
Expand All @@ -308,7 +308,7 @@ private async Task StoreReceivedMessage(object[] sqlParams)
$"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";";

var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}
Expand All @@ -327,7 +327,7 @@ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(strin
new NpgsqlParameter("@Added", fourMinAgo)
};

var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
var result = await connection.ExecuteReaderAsync(sql, async reader =>
{
Expand Down
10 changes: 5 additions & 5 deletions src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SELECT COUNT(""Id"") FROM {_recName} WHERE ""StatusName"" = N'Failed'
SELECT COUNT(""Id"") FROM {_pubName} WHERE ""StatusName"" = N'Delayed'
) AS ""PublishedDelayed"";";

var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
var statistics = await connection.ExecuteReaderAsync(sql, async reader =>
{
Expand Down Expand Up @@ -98,7 +98,7 @@ public async Task<PagedQueryResult<MessageDto>> GetMessagesAsync(MessageQueryDto
var sqlQuery =
$"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";

var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);

var count = await connection.ExecuteScalarAsync<int>($"select count(1) from {tableName} where 1=1 {where}",
Expand Down Expand Up @@ -182,7 +182,7 @@ private async ValueTask<int> GetNumberOfMessage(string tableName, string statusN
var sqlQuery =
$"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)";

var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
return await connection.ExecuteScalarAsync<int>(sqlQuery, new NpgsqlParameter("@state", statusName))
.ConfigureAwait(false);
Expand Down Expand Up @@ -227,7 +227,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH')
};

Dictionary<string, int> valuesMap;
var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = _options.CreateConnection();
await using (connection.ConfigureAwait(false))
{
valuesMap = await connection.ExecuteReaderAsync(sqlQuery, async reader =>
Expand Down Expand Up @@ -264,7 +264,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH')
var sql =
$@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED";

var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = _options.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
var mediumMessage = await connection.ExecuteReaderAsync(sql, async reader =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task InitializeAsync(CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested) return;

var sql = CreateDbTablesScript(_options.Value.Schema);
var connection = new NpgsqlConnection(_options.Value.ConnectionString);
var connection = _options.Value.CreateConnection();
await using var _ = connection.ConfigureAwait(false);
object[] sqlParams =
{
Expand Down

0 comments on commit f0646a3

Please sign in to comment.