diff --git a/lib/presto/client/statement_client.rb b/lib/presto/client/statement_client.rb index c394d497..00887a24 100644 --- a/lib/presto/client/statement_client.rb +++ b/lib/presto/client/statement_client.rb @@ -15,6 +15,7 @@ # module Presto::Client + require 'bigdecimal' require 'json' require 'msgpack' require 'presto/client/models' @@ -42,7 +43,8 @@ def initialize(faraday, query, options, next_uri=nil) if next_uri response = faraday_get_with_retry(next_uri) - @results = @models::QueryResults.decode(parse_body(response)) + decoded = @models::QueryResults.decode(parse_body(response)) + @results = cast_results(decoded) else post_query_request! end @@ -69,7 +71,8 @@ def post_query_request! raise PrestoHttpError.new(response.status, "Failed to start query: #{response.body} (#{response.status})") end - @results = decode_model(uri, parse_body(response), @models::QueryResults) + decoded = decode_model(uri, parse_body(response), @models::QueryResults) + @results = cast_results(decoded) end private :post_query_request! @@ -113,7 +116,8 @@ def advance uri = @results.next_uri response = faraday_get_with_retry(uri) - @results = decode_model(uri, parse_body(response), @models::QueryResults) + decoded = decode_model(uri, parse_body(response), @models::QueryResults) + @results = cast_results(decoded) return true end @@ -124,6 +128,26 @@ def query_info decode_model(uri, parse_body(response), @models::QueryInfo) end + def cast_results(query_results) + data = query_results.data + columns = query_results.columns + return query_results if data.nil? || columns.nil? + + decimal_indices = columns.each_with_index.select { |c, i| c.type.start_with?('decimal(') }.map(&:last) + if decimal_indices.empty? + query_results + else + new_data = data.map do |row| + copy = row.dup + decimal_indices.each do |index| + copy[index] = BigDecimal(copy[index]) + end + copy + end + query_results.class.new(query_results.to_h.merge(data: new_data)) + end + end + def decode_model(uri, hash, body_class) begin body_class.decode(hash) diff --git a/spec/statement_client_spec.rb b/spec/statement_client_spec.rb index a529af9b..f6e395f3 100644 --- a/spec/statement_client_spec.rb +++ b/spec/statement_client_spec.rb @@ -43,6 +43,36 @@ StatementClient.new(faraday, query, options) end + it "returns results correctly" do + query = 'select 1 as i, 1.0 as d' + stub_request(:post, "localhost/v1/statement"). + with(body: query, + headers: { + "User-Agent" => "presto-ruby/#{VERSION}", + "X-Presto-Catalog" => options[:catalog], + "X-Presto-Schema" => options[:schema], + "X-Presto-User" => options[:user], + "X-Presto-Language" => options[:language], + "X-Presto-Time-Zone" => options[:time_zone], + }).to_return(body: <<-JSON) + {"id":"queryid","infoUri":"http://localhost/ui/query.html?queryid","nextUri":"http://localhost/v1/statement/queryid/1","stats":{"state":"QUEUED","queued":true,"scheduled":false,"nodes":0,"totalSplits":0,"queuedSplits":0,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":0,"elapsedTimeMillis":0,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0}} + JSON + client = StatementClient.new(faraday, query, options) + client.current_results.data.should be_nil + + stub_request(:get, "http://localhost/v1/statement/queryid/1").to_return(body: <<-JSON) + {"id":"20180927_190533_00177_setx5","infoUri":"http://localhost/ui/query.html?queryid","partialCancelUri":"http://192.168.128.77:8091/v1/stage/queryid.0","nextUri":"http://localhost/v1/statement/queryid/2","columns":[{"name":"i","type":"integer","typeSignature":{"rawType":"integer","typeArguments":[],"literalArguments":[],"arguments":[]}},{"name":"d","type":"decimal(2,1)","typeSignature":{"rawType":"decimal","typeArguments":[],"literalArguments":[],"arguments":[{"kind":"LONG_LITERAL","value":2},{"kind":"LONG_LITERAL","value":1}]}}],"data":[[1,"1.0"]],"stats":{"state":"RUNNING","queued":false,"scheduled":true,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":121,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0,"rootStage":{"stageId":"0","state":"RUNNING","done":false,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"subStages":[]},"progressPercentage":0.0}} + JSON + client.advance.should be_true + client.current_results.data.should == [[1, BigDecimal('1.0')]] + + stub_request(:get, "http://localhost/v1/statement/queryid/2").to_return(body: <<-JSON) + {"id":"queryid","infoUri":"http://localhost/ui/query.html?queryid","columns":[{"name":"i","type":"integer","typeSignature":{"rawType":"integer","typeArguments":[],"literalArguments":[],"arguments":[]}},{"name":"d","type":"decimal(2,1)","typeSignature":{"rawType":"decimal","typeArguments":[],"literalArguments":[],"arguments":[{"kind":"LONG_LITERAL","value":2},{"kind":"LONG_LITERAL","value":1}]}}],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"userTimeMillis":4,"cpuTimeMillis":5,"wallTimeMillis":109,"queuedTimeMillis":1,"elapsedTimeMillis":128,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"userTimeMillis":4,"cpuTimeMillis":5,"wallTimeMillis":109,"processedRows":1,"processedBytes":0,"subStages":[]},"progressPercentage":100.0}} + JSON + client.advance.should be_true + client.current_results.data.should be_nil + end + let :response_json2 do { id: "queryid", @@ -402,4 +432,3 @@ StatementClient.new(faraday, query, options) end end -