diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex
index 444d79fc..c2faae38 100644
--- a/lib/kafka_ex/server.ex
+++ b/lib/kafka_ex/server.ex
@@ -486,12 +486,13 @@ defmodule KafkaEx.Server do
             |> client_request(updated_state)
             |> module.create_request
 
-            response = broker
-            |> NetworkClient.send_sync_request(
+            response = NetworkClient.send_sync_request(
+              broker,
               wire_request,
-              config_sync_timeout()
-            )
-            |> module.parse_response
+              config_sync_timeout())
+            response = if response != nil,
+                          do: module.parse_response(response),
+                          else: nil
 
             state_out = State.increment_correlation_id(updated_state)
 
diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex
index d10bbbe4..73479920 100644
--- a/lib/kafka_ex/server_0_p_8_p_2.ex
+++ b/lib/kafka_ex/server_0_p_8_p_2.ex
@@ -17,9 +17,7 @@ defmodule KafkaEx.Server0P8P2 do
   alias KafkaEx.Protocol.ConsumerMetadata
   alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse
   alias KafkaEx.Protocol.Fetch
-  alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest
   alias KafkaEx.Protocol.Metadata.Broker
-  alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse
   alias KafkaEx.Protocol.OffsetFetch
   alias KafkaEx.Protocol.OffsetCommit
   alias KafkaEx.Server.State
@@ -102,9 +100,13 @@ defmodule KafkaEx.Server0P8P2 do
         Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available")
         {:topic_not_found, state}
       _ ->
-        response = broker
-          |> NetworkClient.send_sync_request(offset_fetch_request, config_sync_timeout())
-          |> OffsetFetch.parse_response
+        response = NetworkClient.send_sync_request(
+          broker,
+          offset_fetch_request,
+          config_sync_timeout())
+        response = if response != nil,
+                      do: OffsetFetch.parse_response(response),
+                      else: nil
         {response, %{state | correlation_id: state.correlation_id + 1}}
     end
 
@@ -155,40 +157,22 @@ defmodule KafkaEx.Server0P8P2 do
     end
   end
 
-  defp fetch(fetch_request, state) do
-    true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state)
-    fetch_data = Fetch.create_request(%FetchRequest{
-      fetch_request |
-      client_id: @client_id,
-      correlation_id: state.correlation_id,
-    })
-    {broker, state} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition) do
-      nil    ->
-        updated_state = update_metadata(state)
-        {MetadataResponse.broker_for_topic(updated_state.metadata, updated_state.brokers, fetch_request.topic, fetch_request.partition), updated_state}
-      broker -> {broker, state}
-    end
-
-    case broker do
-      nil ->
-        Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available")
-        {:topic_not_found, state}
-      _ ->
-        response = broker
-          |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout())
-          |> Fetch.parse_response
-        state = %{state | correlation_id: state.correlation_id + 1}
+  defp fetch(request, state) do
+    true = consumer_group_if_auto_commit?(request.auto_commit, state)
+    case network_request(request, Fetch, state) do
+      {{:error, error}, state_out} -> {error, state_out}
+      {response, state_out} ->
         last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset)
-        if last_offset != nil && fetch_request.auto_commit do
+        if last_offset != nil && request.auto_commit do
           offset_commit_request = %OffsetCommit.Request{
-            topic: fetch_request.topic,
+            topic: request.topic,
             offset: last_offset,
-            partition: fetch_request.partition,
-            consumer_group: state.consumer_group}
-          {_, state} = offset_commit(state, offset_commit_request)
+            partition: request.partition,
+            consumer_group: state_out.consumer_group}
+          {_, state} = offset_commit(state_out, offset_commit_request)
           {response, state}
         else
-          {response, state}
+          {response, state_out}
         end
     end
   end