From 22385b107b61e818d69478d9f3de918c170afa06 Mon Sep 17 00:00:00 2001 From: Oleg Rakitskiy Date: Wed, 9 Dec 2015 19:57:52 +0200 Subject: [PATCH 1/3] Implement batch runner for postgresql. In general it's almost copy/paste of MySqlBatchRunner class. Slight modifications: - changed queries to postgresql syntax UPDATE SET FROM ( ) - slighly modified regular expression with self updates. ColumAlias is optional group now. Alias is not generated every time. - When in TransactionScope it means we have open transaction, really, it is. Postgre treats such transaction as distributed, that's why opening new transaction fails. --- .../Batch/PgSqlBatchRunner.cs | 452 ++++++++++++++++++ .../EntityFramework.Extended.net45.csproj | 2 + 2 files changed, 454 insertions(+) create mode 100644 Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs diff --git a/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs new file mode 100644 index 0000000..4f05e6e --- /dev/null +++ b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs @@ -0,0 +1,452 @@ +using System; +using System.Data; +using System.Data.Common; +using System.Data.Entity.Core.EntityClient; +using System.Data.Entity.Core.Objects; +using System.Linq; +using System.Linq.Dynamic; +using System.Linq.Expressions; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using EntityFramework.Extensions; +using EntityFramework.Mapping; +using EntityFramework.Reflection; + +namespace EntityFramework.Batch +{ + /// + /// A batch execution runner for PostgreSQL Server. + /// + public class PgSqlBatchRunner : IBatchRunner + { + /// + /// Create and run a batch delete statement. + /// + /// The type of the entity. + /// The to get connection and metadata information from. + /// The for . + /// The query to create the where clause from. + /// + /// The number of rows deleted. + /// + public int Delete(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query) + where TEntity : class + { +#if NET45 + return InternalDelete(objectContext, entityMap, query, false).Result; +#else + return InternalDelete(objectContext, entityMap, query); +#endif + } + +#if NET45 + /// + /// Create and run a batch delete statement asynchronously. + /// + /// The type of the entity. + /// The to get connection and metadata information from. + /// The for . + /// The query to create the where clause from. + /// + /// The number of rows deleted. + /// + public Task DeleteAsync(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query) where TEntity : class + { + return InternalDelete(objectContext, entityMap, query, true); + } +#endif + +#if NET45 + private async Task InternalDelete(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query, bool async = false) + where TEntity : class +#else + private int InternalDelete(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query) + where TEntity : class +#endif + { + DbConnection deleteConnection = null; + DbTransaction deleteTransaction = null; + DbCommand deleteCommand = null; + bool ownConnection = false; + bool ownTransaction = false; + + try + { + // get store connection and transaction + var store = GetStore(objectContext); + deleteConnection = store.Item1; + deleteTransaction = store.Item2; + + if (deleteConnection.State != ConnectionState.Open) + { + deleteConnection.Open(); + ownConnection = true; + } + + // When in TransactionScope PostgreSQL does not allow to create nested transaction + if (deleteTransaction == null && System.Transactions.Transaction.Current == null) + { + deleteTransaction = deleteConnection.BeginTransaction(); + ownTransaction = true; + } + + + deleteCommand = deleteConnection.CreateCommand(); + deleteCommand.Transaction = deleteTransaction; + if (objectContext.CommandTimeout.HasValue) + deleteCommand.CommandTimeout = objectContext.CommandTimeout.Value; + + var innerSelect = GetSelectSql(query, entityMap, deleteCommand); + + var sqlBuilder = new StringBuilder(innerSelect.Length * 2); + + sqlBuilder.AppendFormat("DELETE FROM {0} AS j0", entityMap.TableName); + sqlBuilder.AppendLine(); + sqlBuilder.AppendLine("USING ("); + sqlBuilder.AppendLine(innerSelect); + sqlBuilder.AppendLine(") AS j1"); + sqlBuilder.Append("WHERE ("); + + bool wroteKey = false; + foreach (var keyMap in entityMap.KeyMaps) + { + if (wroteKey) + sqlBuilder.Append(" AND "); + + sqlBuilder.AppendFormat("j0.{0} = j1.{0}", keyMap.ColumnName); + wroteKey = true; + } + sqlBuilder.Append(")"); + + deleteCommand.CommandText = sqlBuilder.ToString().Replace("[", "").Replace("]", ""); + +#if NET45 + int result = async + ? await deleteCommand.ExecuteNonQueryAsync().ConfigureAwait(false) + : deleteCommand.ExecuteNonQuery(); +#else + int result = deleteCommand.ExecuteNonQuery(); +#endif + + // only commit if created transaction + if (ownTransaction) + deleteTransaction.Commit(); + + return result; + } + finally + { + if (deleteCommand != null) + deleteCommand.Dispose(); + + if (deleteTransaction != null && ownTransaction) + deleteTransaction.Dispose(); + + if (deleteConnection != null && ownConnection) + deleteConnection.Close(); + } + } + + /// + /// Create and run a batch update statement. + /// + /// The type of the entity. + /// The to get connection and metadata information from. + /// The for . + /// The query to create the where clause from. + /// The update expression. + /// + /// The number of rows updated. + /// + public int Update(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query, Expression> updateExpression) where TEntity : class + { +#if NET45 + return InternalUpdate(objectContext, entityMap, query, updateExpression, false).Result; +#else + return InternalUpdate(objectContext, entityMap, query, updateExpression); +#endif + } + +#if NET45 + /// + /// Create and run a batch update statement asynchronously. + /// + /// The type of the entity. + /// The to get connection and metadata information from. + /// The for . + /// The query to create the where clause from. + /// The update expression. + /// + /// The number of rows updated. + /// + public Task UpdateAsync(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query, Expression> updateExpression) where TEntity : class + { + return InternalUpdate(objectContext, entityMap, query, updateExpression, true); + } +#endif + +#if NET45 + private async Task InternalUpdate(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query, Expression> updateExpression, bool async = false) + where TEntity : class +#else + private int InternalUpdate(ObjectContext objectContext, EntityMap entityMap, ObjectQuery query, Expression> updateExpression, bool async = false) + where TEntity : class +#endif + { + DbConnection updateConnection = null; + DbTransaction updateTransaction = null; + DbCommand updateCommand = null; + bool ownConnection = false; + bool ownTransaction = false; + + try + { + // get store connection and transaction + var store = GetStore(objectContext); + updateConnection = store.Item1; + updateTransaction = store.Item2; + + if (updateConnection.State != ConnectionState.Open) + { + updateConnection.Open(); + ownConnection = true; + } + + // use existing transaction or create new + // When in TransactionScope PostgreSQL does not allow to create nested transaction + if (updateTransaction == null && System.Transactions.Transaction.Current != null) + { + updateTransaction = updateConnection.BeginTransaction(); + ownTransaction = true; + } + + updateCommand = updateConnection.CreateCommand(); + updateCommand.Transaction = updateTransaction; + if (objectContext.CommandTimeout.HasValue) + updateCommand.CommandTimeout = objectContext.CommandTimeout.Value; + + var innerSelect = GetSelectSql(query, entityMap, updateCommand); + var sqlBuilder = new StringBuilder(innerSelect.Length * 2); + + sqlBuilder.AppendFormat("UPDATE {0} AS j0", entityMap.TableName); + sqlBuilder.AppendLine(); + sqlBuilder.AppendLine("SET "); + + var memberInitExpression = updateExpression.Body as MemberInitExpression; + if (memberInitExpression == null) + throw new ArgumentException("The update expression must be of type MemberInitExpression.", "updateExpression"); + + int nameCount = 0; + bool wroteSet = false; + foreach (MemberBinding binding in memberInitExpression.Bindings) + { + if (wroteSet) + sqlBuilder.AppendLine(", "); + + string propertyName = binding.Member.Name; + string columnName = entityMap.PropertyMaps + .Where(p => p.PropertyName == propertyName) + .Select(p => p.ColumnName) + .FirstOrDefault(); + + + var memberAssignment = binding as MemberAssignment; + if (memberAssignment == null) + throw new ArgumentException("The update expression MemberBinding must only by type MemberAssignment.", "updateExpression"); + + Expression memberExpression = memberAssignment.Expression; + + ParameterExpression parameterExpression = null; + memberExpression.Visit((ParameterExpression p) => + { + if (p.Type == entityMap.EntityType) + parameterExpression = p; + + return p; + }); + + + if (parameterExpression == null) + { + object value; + + if (memberExpression.NodeType == ExpressionType.Constant) + { + var constantExpression = memberExpression as ConstantExpression; + if (constantExpression == null) + throw new ArgumentException( + "The MemberAssignment expression is not a ConstantExpression.", "updateExpression"); + + value = constantExpression.Value; + } + else + { + LambdaExpression lambda = Expression.Lambda(memberExpression, null); + value = lambda.Compile().DynamicInvoke(); + } + + if (value != null) + { + string parameterName = "p__update__" + nameCount++; + var parameter = updateCommand.CreateParameter(); + parameter.ParameterName = parameterName; + parameter.Value = value; + updateCommand.Parameters.Add(parameter); + + sqlBuilder.AppendFormat("{0} = :{1}", columnName, parameterName); + } + else + { + sqlBuilder.AppendFormat("{0} = NULL", columnName); + } + } + else + { + // create clean objectset to build query from + var objectSet = objectContext.CreateObjectSet(); + + Type[] typeArguments = new[] { entityMap.EntityType, memberExpression.Type }; + + ConstantExpression constantExpression = Expression.Constant(objectSet); + LambdaExpression lambdaExpression = Expression.Lambda(memberExpression, parameterExpression); + + MethodCallExpression selectExpression = Expression.Call( + typeof(Queryable), + "Select", + typeArguments, + constantExpression, + lambdaExpression); + + // create query from expression + var selectQuery = objectSet.CreateQuery(selectExpression, entityMap.EntityType); + string sql = selectQuery.ToTraceString(); + + // parse select part of sql to use as update + string regex = @"SELECT\s*\r\n(?.+)(\s+AS\s+(?\w+))?\r\nFROM\s+(?\w+\.\w+|\w+)(\s*AS\s*(?("")?\w+("")?))?"; + Match match = Regex.Match(sql, regex); + if (!match.Success) + throw new ArgumentException("The MemberAssignment expression could not be processed.", "updateExpression"); + + string value = match.Groups["ColumnValue"].Value; + string alias = match.Groups["TableAlias"].Value; + + value = value.Replace(alias + ".", ""); + + foreach (ObjectParameter objectParameter in selectQuery.Parameters) + { + string parameterName = "p__update__" + nameCount++; + + var parameter = updateCommand.CreateParameter(); + parameter.ParameterName = parameterName; + parameter.Value = objectParameter.Value; + updateCommand.Parameters.Add(parameter); + + value = value.Replace(objectParameter.Name, parameterName); + } + sqlBuilder.AppendFormat("{0} = {1}", columnName, value); + } + wroteSet = true; + } + + sqlBuilder.AppendLine(); + sqlBuilder.AppendLine("FROM ("); + sqlBuilder.AppendLine(innerSelect); + sqlBuilder.AppendLine(") as j1"); + sqlBuilder.Append("WHERE ("); + bool wroteKey = false; + foreach (var keyMap in entityMap.KeyMaps) + { + if (wroteKey) + sqlBuilder.Append(" AND "); + + sqlBuilder.AppendFormat("j0.{0} = j1.{0}", keyMap.ColumnName); + wroteKey = true; + } + sqlBuilder.AppendLine(")"); + + updateCommand.CommandText = sqlBuilder.ToString().Replace("[", "").Replace("]", ""); + +#if NET45 + int result = async + ? await updateCommand.ExecuteNonQueryAsync().ConfigureAwait(false) + : updateCommand.ExecuteNonQuery(); +#else + int result = updateCommand.ExecuteNonQuery(); +#endif + + // only commit if created transaction + if (ownTransaction) + updateTransaction.Commit(); + + return result; + } + finally + { + if (updateCommand != null) + updateCommand.Dispose(); + if (updateTransaction != null && ownTransaction) + updateTransaction.Dispose(); + if (updateConnection != null && ownConnection) + updateConnection.Close(); + } + } + + private static Tuple GetStore(ObjectContext objectContext) + { + DbConnection dbConnection = objectContext.Connection; + var entityConnection = dbConnection as EntityConnection; + + // by-pass entity connection + if (entityConnection == null) + return new Tuple(dbConnection, null); + + DbConnection connection = entityConnection.StoreConnection; + + // get internal transaction + dynamic connectionProxy = new DynamicProxy(entityConnection); + dynamic entityTransaction = connectionProxy.CurrentTransaction; + if (entityTransaction == null) + return new Tuple(connection, null); + + DbTransaction transaction = entityTransaction.StoreTransaction; + return new Tuple(connection, transaction); + } + + private static string GetSelectSql(ObjectQuery query, EntityMap entityMap, DbCommand command) + where TEntity : class + { + // changing query to only select keys + var selector = new StringBuilder(50); + selector.Append("new("); + foreach (var propertyMap in entityMap.KeyMaps) + { + if (selector.Length > 4) + selector.Append((", ")); + + selector.Append(propertyMap.PropertyName); + } + selector.Append(")"); + + var selectQuery = DynamicQueryable.Select(query, selector.ToString()); + var objectQuery = selectQuery as ObjectQuery; + + if (objectQuery == null) + throw new ArgumentException("The query must be of type ObjectQuery.", "query"); + + string innerJoinSql = objectQuery.ToTraceString(); + + // create parameters + foreach (var objectParameter in objectQuery.Parameters) + { + var parameter = command.CreateParameter(); + parameter.ParameterName = objectParameter.Name; + parameter.Value = objectParameter.Value; + + command.Parameters.Add(parameter); + } + + return innerJoinSql; + } + } +} \ No newline at end of file diff --git a/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj b/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj index ddf8af6..b2e0032 100644 --- a/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj +++ b/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj @@ -57,6 +57,7 @@ + @@ -79,6 +80,7 @@ + From 8601958c39e2d79133c9f7befed0ed4c9f81f6f0 Mon Sep 17 00:00:00 2001 From: Oleg Rakitskiy Date: Fri, 4 Mar 2016 13:34:31 +0200 Subject: [PATCH 2/3] Remove detecting outside transaction. - Previously we detected outside transactions using System.Transaction, but recently we overcame the issue using specific configuration of the PgSql devart provider. It's redundant code now and should be removed to make everything consistent with the rest of the code. --- Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs | 8 ++------ .../EntityFramework.Extended.net45.csproj | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs index 4f05e6e..9d2e744 100644 --- a/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs +++ b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs @@ -84,14 +84,12 @@ private int InternalDelete(ObjectContext objectContext, EntityMap entit ownConnection = true; } - // When in TransactionScope PostgreSQL does not allow to create nested transaction - if (deleteTransaction == null && System.Transactions.Transaction.Current == null) + if (deleteTransaction == null) { deleteTransaction = deleteConnection.BeginTransaction(); ownTransaction = true; } - deleteCommand = deleteConnection.CreateCommand(); deleteCommand.Transaction = deleteTransaction; if (objectContext.CommandTimeout.HasValue) @@ -213,9 +211,7 @@ private int InternalUpdate(ObjectContext objectContext, EntityMap entit ownConnection = true; } - // use existing transaction or create new - // When in TransactionScope PostgreSQL does not allow to create nested transaction - if (updateTransaction == null && System.Transactions.Transaction.Current != null) + if (updateTransaction == null) { updateTransaction = updateConnection.BeginTransaction(); ownTransaction = true; diff --git a/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj b/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj index b2e0032..1d9ad15 100644 --- a/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj +++ b/Source/EntityFramework.Extended/EntityFramework.Extended.net45.csproj @@ -57,7 +57,6 @@ - From f43974ee86d3344e20a35274d94963e10a323499 Mon Sep 17 00:00:00 2001 From: Oleg Rakitskiy Date: Fri, 4 Mar 2016 15:09:10 +0200 Subject: [PATCH 3/3] Put schema and table name in quotes - Table and column names that are not lowercase are caused the error. So it's necessary to put them in quotes. More info here http://www.postgresql.org/docs/current/interactive/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS --- Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs index 9d2e744..b44ffbf 100644 --- a/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs +++ b/Source/EntityFramework.Extended/Batch/PgSqlBatchRunner.cs @@ -117,7 +117,7 @@ private int InternalDelete(ObjectContext objectContext, EntityMap entit } sqlBuilder.Append(")"); - deleteCommand.CommandText = sqlBuilder.ToString().Replace("[", "").Replace("]", ""); + deleteCommand.CommandText = sqlBuilder.ToString().Replace("[", "\"").Replace("]", "\""); #if NET45 int result = async @@ -361,7 +361,7 @@ private int InternalUpdate(ObjectContext objectContext, EntityMap entit } sqlBuilder.AppendLine(")"); - updateCommand.CommandText = sqlBuilder.ToString().Replace("[", "").Replace("]", ""); + updateCommand.CommandText = sqlBuilder.ToString().Replace("[", "\"").Replace("]", "\""); #if NET45 int result = async