Skip to content

Commit

Permalink
Send the requests async too
Browse files Browse the repository at this point in the history
  • Loading branch information
urielha committed Jul 10, 2016
1 parent 3cb2a18 commit 4224d18
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 14 deletions.
36 changes: 36 additions & 0 deletions src/log4stash.Tests/Integration/ElasticSearchAppenderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using log4net;
using log4net.Core;
using log4stash.Filters;
using Nest;
using Newtonsoft.Json.Linq;
using NUnit.Framework;

Expand All @@ -29,6 +30,41 @@ public void Can_create_an_event_from_log4net()
Assert.AreEqual(1, searchResults.Total);
}

[Test]
public void log_async_message()
{
bool originIndexAsync = false;
QueryConfiguration(appender =>
{
originIndexAsync = appender.IndexAsync;
appender.IndexAsync = true;
});

_log.Info("dummy async");

int tries = 5;
ISearchResponse<JObject> searchResults = null;
while (--tries >= 0)
{
Client.Refresh(TestIndex);

searchResults = Client.Search<JObject>(s => s.AllTypes().AllIndices());

if (searchResults.Total > 0)
{
break;
}
Thread.Sleep(100);
}
Assert.AreEqual(1, searchResults.Total);

QueryConfiguration(appender =>
{
appender.IndexAsync = originIndexAsync;
});

}

[Test]
public void Log_exception_string_without_object()
{
Expand Down
59 changes: 45 additions & 14 deletions src/log4stash/ElasticClient/ElasticClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public class WebElasticClient : AbstractWebElasticClient
{
private readonly string _credentials;

class RequestDetails
{
public RequestDetails(WebRequest webRequest, string content)
{
WebRequest = webRequest;
Content = content;
}

public WebRequest WebRequest { get; private set; }
public string Content { get; private set; }
}

public WebElasticClient(string server, int port)
: this(server, port, false, false, string.Empty, string.Empty)
{
Expand Down Expand Up @@ -86,23 +98,34 @@ public override void PutTemplateRaw(string templateName, string rawBody)
webRequest.ContentType = "text/json";
webRequest.Method = "PUT";
SetBasicAuthHeader(webRequest);
SendRequest(webRequest, rawBody);
using (var httpResponse = (HttpWebResponse)webRequest.GetResponse())
if (SafeSendRequest(new RequestDetails(webRequest, rawBody), webRequest.GetRequestStream))
{
CheckResponse(httpResponse);
SafeGetAndCheckResponse(webRequest.GetResponse);
}
}

public override void IndexBulk(IEnumerable<InnerBulkOperation> bulk)
{
var webRequest = PrepareBulkAndSend(bulk);
SafeGetAndCheckResponse(webRequest.GetResponse);
var request = PrepareRequest(bulk);
if (SafeSendRequest(request, request.WebRequest.GetRequestStream))
{
SafeGetAndCheckResponse(request.WebRequest.GetResponse);
}
}

public override IAsyncResult IndexBulkAsync(IEnumerable<InnerBulkOperation> bulk)
{
var webRequest = PrepareBulkAndSend(bulk);
return webRequest.BeginGetResponse(FinishGetResponse, webRequest);
var request = PrepareRequest(bulk);
return request.WebRequest.BeginGetRequestStream(FinishGetRequest, request);
}

private void FinishGetRequest(IAsyncResult result)
{
var request = (RequestDetails)result.AsyncState;
if (SafeSendRequest(request, () => request.WebRequest.EndGetRequestStream(result)))
{
request.WebRequest.BeginGetResponse(FinishGetResponse, request.WebRequest);
}
}

private void FinishGetResponse(IAsyncResult result)
Expand All @@ -111,7 +134,7 @@ private void FinishGetResponse(IAsyncResult result)
SafeGetAndCheckResponse(() => webRequest.EndGetResponse(result));
}

private WebRequest PrepareBulkAndSend(IEnumerable<InnerBulkOperation> bulk)
private RequestDetails PrepareRequest(IEnumerable<InnerBulkOperation> bulk)
{
var requestString = PrepareBulk(bulk);

Expand All @@ -120,8 +143,7 @@ private WebRequest PrepareBulkAndSend(IEnumerable<InnerBulkOperation> bulk)
webRequest.Method = "POST";
webRequest.Timeout = 10000;
SetBasicAuthHeader(webRequest);
SendRequest(webRequest, requestString);
return webRequest;
return new RequestDetails(webRequest, requestString);
}

private static string PrepareBulk(IEnumerable<InnerBulkOperation> bulk)
Expand All @@ -142,12 +164,21 @@ private static string PrepareBulk(IEnumerable<InnerBulkOperation> bulk)
return sb.ToString();
}

private static void SendRequest(WebRequest webRequest, string requestString)
private bool SafeSendRequest(RequestDetails request, Func<Stream> getRequestStream)
{
using (var stream = new StreamWriter(webRequest.GetRequestStream()))
try
{
stream.Write(requestString);
using (var stream = new StreamWriter(getRequestStream()))
{
stream.Write(request.Content);
}
return true;
}
catch (Exception ex)
{
LogLog.Error(GetType(), "Invalid request to ElasticSearch", ex);
}
return false;
}

private void SafeGetAndCheckResponse(Func<WebResponse> getResponse)
Expand All @@ -161,7 +192,7 @@ private void SafeGetAndCheckResponse(Func<WebResponse> getResponse)
}
catch (Exception ex)
{
LogLog.Error(GetType(), "Invalid request to ElasticSearch", ex);
LogLog.Error(GetType(), "Got error while reading response from ElasticSearch", ex);
}
}

Expand Down

0 comments on commit 4224d18

Please sign in to comment.