Skip to content

Commit

Permalink
Added support for conditional index
Browse files Browse the repository at this point in the history
  • Loading branch information
giometrix committed May 7, 2020
1 parent 3ff6f93 commit 7d50f0b
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
Expand All @@ -11,6 +12,7 @@ namespace TableStorage.Abstractions.POCO.SecondaryIndexes
public static class PocoStoreIndexer
{
private static readonly Dictionary<string, dynamic> _indexes = new Dictionary<string, dynamic>();
private static readonly Dictionary<string, dynamic> _conditionalIndexFunctions = new Dictionary<string, dynamic>();

private static object _indexLock = new object();

Expand All @@ -26,7 +28,7 @@ public static class PocoStoreIndexer
/// <param name="indexName"></param>
/// <param name="indexStore"></param>
public static void AddIndex<T, TPartitionKey, TRowKey, TIndexPartitionKey, TIndexRowKey>(this IPocoTableStore<T, TPartitionKey, TRowKey> tableStore, string indexName,
IPocoTableStore<T, TIndexPartitionKey, TIndexRowKey> indexStore)
IPocoTableStore<T, TIndexPartitionKey, TIndexRowKey> indexStore, Func<T,bool> indexCondition = null)
{
lock (_indexLock)
{
Expand All @@ -35,10 +37,73 @@ public static void AddIndex<T, TPartitionKey, TRowKey, TIndexPartitionKey, TInde
throw new ArgumentException($"{indexName} has already been added");
}
_indexes[indexName] = indexStore;
tableStore.OnRecordInsertedOrUpdated += indexStore.InsertOrReplace;
tableStore.OnRecordInsertedOrUpdatedAsync += indexStore.InsertOrReplaceAsync;
tableStore.OnRecordsInserted += indexStore.Insert;
tableStore.OnRecordsInsertedAsync += indexStore.InsertAsync;

if (indexCondition == null)
{
tableStore.OnRecordInsertedOrUpdated += indexStore.InsertOrReplace;
tableStore.OnRecordInsertedOrUpdatedAsync += indexStore.InsertOrReplaceAsync;
tableStore.OnRecordsInserted += indexStore.Insert;
tableStore.OnRecordsInsertedAsync += indexStore.InsertAsync;
}
else
{
_conditionalIndexFunctions[indexName] = indexCondition;

tableStore.OnRecordInsertedOrUpdated += record =>
{
if (indexCondition(record))
{
indexStore.InsertOrReplace(record);
}
else
{
try
{
indexStore.Delete(record);
}
catch (StorageException e) when (e.Message == "Not Found")
{
// if the index row wasn't there there is nothing to do
}
}
};

tableStore.OnRecordInsertedOrUpdatedAsync += async record =>
{
if (indexCondition(record))
{
await indexStore.InsertOrReplaceAsync(record).ConfigureAwait(false);
}
else
{
try
{
await indexStore.DeleteAsync(record).ConfigureAwait(false);
}
catch (StorageException e) when (e.Message == "Not Found")
{
// if the index row wasn't there there is nothing to do
}
}
};

tableStore.OnRecordsInserted += records =>
{
var pass = records.Where(indexCondition);
indexStore.Insert(pass);
};

tableStore.OnRecordsInsertedAsync += async records =>
{
var pass = records.Where(indexCondition);
await indexStore.InsertAsync(pass).ConfigureAwait(false);
};
}

tableStore.OnRecordDeleted += obj =>
{
try
Expand All @@ -54,7 +119,7 @@ public static void AddIndex<T, TPartitionKey, TRowKey, TIndexPartitionKey, TInde
{
try
{
await indexStore.DeleteAsync(obj);
await indexStore.DeleteAsync(obj).ConfigureAwait(false);
}
catch (StorageException e) when (e.Message == "Not Found")
{
Expand All @@ -67,6 +132,7 @@ public static void AddIndex<T, TPartitionKey, TRowKey, TIndexPartitionKey, TInde
lock (_indexLock)
{
_indexes.Remove(indexName);
_conditionalIndexFunctions.Remove(indexName);
}
indexStore.DeleteTable();
};
Expand All @@ -75,8 +141,9 @@ public static void AddIndex<T, TPartitionKey, TRowKey, TIndexPartitionKey, TInde
lock (_indexLock)
{
_indexes.Remove(indexName);
_conditionalIndexFunctions.Remove(indexName);
}
await indexStore.DeleteTableAsync();
await indexStore.DeleteTableAsync().ConfigureAwait(false);
};
}

Expand All @@ -98,6 +165,7 @@ public static void RemoveIndex<T, TPartitionKey, TRowKey>(this IPocoTableStore<T
if (_indexes.ContainsKey(indexName))
{
_indexes.Remove(indexName);
_conditionalIndexFunctions.Remove(indexName);
}
}
}
Expand Down Expand Up @@ -476,6 +544,12 @@ public static async Task ReindexAsync<T, TPartitionKey, TRowKey>(this IPocoTable

string pageToken = null;
int count = 0;
dynamic indexCondition = null;
lock (_indexLock)
{
_conditionalIndexFunctions.TryGetValue(indexName, out indexCondition);
}

using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
try
Expand All @@ -486,22 +560,27 @@ public static async Task ReindexAsync<T, TPartitionKey, TRowKey>(this IPocoTable
var result = await tableStore.GetAllRecordsPagedAsync(1000, pageToken);
pageToken = result.ContinuationToken;
var insertOrReplaceAsync = indexStore.GetType().GetMethod("InsertOrReplaceAsync");

if (result.Items.Count > 0)
{
foreach (var record in result.Items)
{
await semaphore.WaitAsync(TimeSpan.FromSeconds(20)).ConfigureAwait(false);
//Task task = indexStore.InsertOrReplaceAsync(record); //this line worked in the unit tests but not in a console app. Not sure why.
var task = (Task) insertOrReplaceAsync.Invoke(indexStore, new object[] { record });
task.ContinueWith(r =>
if (indexCondition == null || indexCondition(record))
{
if (r.IsFaulted)
await semaphore.WaitAsync(TimeSpan.FromSeconds(20)).ConfigureAwait(false);
var task = (Task) insertOrReplaceAsync.Invoke(indexStore, new object[] {record});
task.ContinueWith(r =>
{
failedIndexCallback?.Invoke(record, r.Exception);
}
Interlocked.Increment(ref count);
semaphore.Release(1);
});
if (r.IsFaulted)
{
failedIndexCallback?.Invoke(record, r.Exception);
}
Interlocked.Increment(ref count);
semaphore.Release(1);
});
}

}
}
Expand Down
19 changes: 18 additions & 1 deletion src/TableStorage.Abstractions.POCO.SecondaryIndexes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class Employee
public int Id { get; set; }
public string Name { get; set; }
public Department Department { get; set; }
public bool IsActive {get; set;} = true;
}

public class Department
Expand All @@ -37,7 +38,7 @@ TableStore = new PocoTableStore<Employee, int, int>("IXTestEmployee", "UseDevelo
IndexStore = new PocoTableStore<Employee, int, string>("IXTestEmployeeNameIndex", "UseDevelopmentStorage=true", e => e.CompanyId, e => e.Name);
```

Next we tie them together by using `AddIndex()`. Indexes must be given a name so that you can specify which index to use when querying. Hete we name our index "Name."
Next we tie them together by using `AddIndex()`. Indexes must be given a name so that you can specify which index to use when querying. Here we name our index "Name."

```charp
TableStore.AddIndex("Name", IndexStore);
Expand All @@ -54,7 +55,23 @@ var employee = new Employee
};
TableStore.Insert(employee);
```
### Conditional Indexes
Introduced in 1.1, you can now easily utilize conditional indexes. Conditional indexes allow you to add data to table storage only when a certain condition is true. Effectively this lets you easily place data into "buckets" that you can efficiently query later.

For example, suppose we want to quickly query only active employees.
We can add a new index as described below:
```charp
TableStore.AddIndex("ActiveEmployee", new PocoTableStore<Employee,
int, int>("IXActiveEmployees", "UseDevelopmentStorage=true",
e => e.CompanyId, e => e.Id), e => e.IsActive);
```
Getting all active employees is now as easy as
```charp
var activeEmployees = TableStore.GetByIndexPartitionKey("ActiveEmployee", 99);
```
This query would yield all active employees for company `99`, without penalty of an expensive partition scan at the server.

Note that conditional indexes are kept up to date, such that if a record were to no longer meet the condition (or later meet the condition), they will be removed or added to the index accordingly.
### Fetching Data
To fetch a single data point from the index, we use the `GetRecordByIndex` (or `GetRecordByIndexAsync`) extension method on the entity `PocoTableStore` (note that we are doing this on the main data store, not on the index, as a convenience):
```charp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
<PackageIcon>xtensible-x.png</PackageIcon>
<RepositoryUrl>https://github.com/giometrix/TableStorage.Abstractions.POCO</RepositoryUrl>
<PackageTags>table-storage azure-table-storage poco table-entities tableentity index secondary-index</PackageTags>
<AssemblyVersion>1.0.1.0</AssemblyVersion>
<PackageReleaseNotes>Minor optimizations</PackageReleaseNotes>
<AssemblyVersion>1.1.0.0</AssemblyVersion>
<PackageReleaseNotes>Added conditional indexes</PackageReleaseNotes>
<FileVersion>1.1.0.0</FileVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public class Employee
public int Id { get; set; }
public string Name { get; set; }
public Department Department { get; set; }
public bool IsActive { get; set; }
}
}
Loading

0 comments on commit 7d50f0b

Please sign in to comment.