diff --git a/src/log4stash.Tests/Integration/ElasticSearchAppenderTests.cs b/src/log4stash.Tests/Integration/ElasticSearchAppenderTests.cs index e433ce1..5cdb3e7 100644 --- a/src/log4stash.Tests/Integration/ElasticSearchAppenderTests.cs +++ b/src/log4stash.Tests/Integration/ElasticSearchAppenderTests.cs @@ -7,6 +7,7 @@ using log4net; using log4net.Core; using log4stash.Filters; +using Nest; using Newtonsoft.Json.Linq; using NUnit.Framework; @@ -29,6 +30,41 @@ public void Can_create_an_event_from_log4net() Assert.AreEqual(1, searchResults.Total); } + [Test] + public void log_async_message() + { + bool originIndexAsync = false; + QueryConfiguration(appender => + { + originIndexAsync = appender.IndexAsync; + appender.IndexAsync = true; + }); + + _log.Info("dummy async"); + + int tries = 5; + ISearchResponse searchResults = null; + while (--tries >= 0) + { + Client.Refresh(TestIndex); + + searchResults = Client.Search(s => s.AllTypes().AllIndices()); + + if (searchResults.Total > 0) + { + break; + } + Thread.Sleep(100); + } + Assert.AreEqual(1, searchResults.Total); + + QueryConfiguration(appender => + { + appender.IndexAsync = originIndexAsync; + }); + + } + [Test] public void Log_exception_string_without_object() { diff --git a/src/log4stash/ElasticClient/ElasticClient.cs b/src/log4stash/ElasticClient/ElasticClient.cs index 3b867c1..0567084 100644 --- a/src/log4stash/ElasticClient/ElasticClient.cs +++ b/src/log4stash/ElasticClient/ElasticClient.cs @@ -59,6 +59,18 @@ public class WebElasticClient : AbstractWebElasticClient { private readonly string _credentials; + class RequestDetails + { + public RequestDetails(WebRequest webRequest, string content) + { + WebRequest = webRequest; + Content = content; + } + + public WebRequest WebRequest { get; private set; } + public string Content { get; private set; } + } + public WebElasticClient(string server, int port) : this(server, port, false, false, string.Empty, string.Empty) { @@ -86,23 +98,34 @@ public override void PutTemplateRaw(string templateName, string rawBody) webRequest.ContentType = "text/json"; webRequest.Method = "PUT"; SetBasicAuthHeader(webRequest); - SendRequest(webRequest, rawBody); - using (var httpResponse = (HttpWebResponse)webRequest.GetResponse()) + if (SafeSendRequest(new RequestDetails(webRequest, rawBody), webRequest.GetRequestStream)) { - CheckResponse(httpResponse); + SafeGetAndCheckResponse(webRequest.GetResponse); } } public override void IndexBulk(IEnumerable bulk) { - var webRequest = PrepareBulkAndSend(bulk); - SafeGetAndCheckResponse(webRequest.GetResponse); + var request = PrepareRequest(bulk); + if (SafeSendRequest(request, request.WebRequest.GetRequestStream)) + { + SafeGetAndCheckResponse(request.WebRequest.GetResponse); + } } public override IAsyncResult IndexBulkAsync(IEnumerable bulk) { - var webRequest = PrepareBulkAndSend(bulk); - return webRequest.BeginGetResponse(FinishGetResponse, webRequest); + var request = PrepareRequest(bulk); + return request.WebRequest.BeginGetRequestStream(FinishGetRequest, request); + } + + private void FinishGetRequest(IAsyncResult result) + { + var request = (RequestDetails)result.AsyncState; + if (SafeSendRequest(request, () => request.WebRequest.EndGetRequestStream(result))) + { + request.WebRequest.BeginGetResponse(FinishGetResponse, request.WebRequest); + } } private void FinishGetResponse(IAsyncResult result) @@ -111,7 +134,7 @@ private void FinishGetResponse(IAsyncResult result) SafeGetAndCheckResponse(() => webRequest.EndGetResponse(result)); } - private WebRequest PrepareBulkAndSend(IEnumerable bulk) + private RequestDetails PrepareRequest(IEnumerable bulk) { var requestString = PrepareBulk(bulk); @@ -120,8 +143,7 @@ private WebRequest PrepareBulkAndSend(IEnumerable bulk) webRequest.Method = "POST"; webRequest.Timeout = 10000; SetBasicAuthHeader(webRequest); - SendRequest(webRequest, requestString); - return webRequest; + return new RequestDetails(webRequest, requestString); } private static string PrepareBulk(IEnumerable bulk) @@ -142,12 +164,21 @@ private static string PrepareBulk(IEnumerable bulk) return sb.ToString(); } - private static void SendRequest(WebRequest webRequest, string requestString) + private bool SafeSendRequest(RequestDetails request, Func getRequestStream) { - using (var stream = new StreamWriter(webRequest.GetRequestStream())) + try { - stream.Write(requestString); + using (var stream = new StreamWriter(getRequestStream())) + { + stream.Write(request.Content); + } + return true; } + catch (Exception ex) + { + LogLog.Error(GetType(), "Invalid request to ElasticSearch", ex); + } + return false; } private void SafeGetAndCheckResponse(Func getResponse) @@ -161,7 +192,7 @@ private void SafeGetAndCheckResponse(Func getResponse) } catch (Exception ex) { - LogLog.Error(GetType(), "Invalid request to ElasticSearch", ex); + LogLog.Error(GetType(), "Got error while reading response from ElasticSearch", ex); } }