From d1b90df22ce8f9fa1b87d9483f7e833a19eaa86e Mon Sep 17 00:00:00 2001 From: Syphax bouazzouni Date: Fri, 17 Jan 2025 09:38:05 +0100 Subject: [PATCH] Feature: refactor cache and add SPARQL queries logging (#2) * extract cache logic in a separate module * add SPARQL query logging module * rename generate_cache_key to prevent conflicts --- lib/sparql/client.rb | 158 +++++++++++------------------------ lib/sparql/client/cache.rb | 120 ++++++++++++++++++++++++++ lib/sparql/client/logging.rb | 80 ++++++++++++++++++ lib/sparql/client/query.rb | 16 +--- 4 files changed, 252 insertions(+), 122 deletions(-) create mode 100644 lib/sparql/client/cache.rb create mode 100644 lib/sparql/client/logging.rb diff --git a/lib/sparql/client.rb b/lib/sparql/client.rb index 427b4426..aaacb829 100644 --- a/lib/sparql/client.rb +++ b/lib/sparql/client.rb @@ -17,6 +17,8 @@ module SPARQL # @see https://www.w3.org/TR/sparql11-results-csv-tsv/ class Client autoload :Query, 'sparql/client/query' + autoload :Cache, 'sparql/client/cache' + autoload :Logging, 'sparql/client/logging' autoload :Repository, 'sparql/client/repository' autoload :Update, 'sparql/client/update' autoload :VERSION, 'sparql/client/version' @@ -58,6 +60,9 @@ class ServerError < StandardError; end XMLNS = {'sparql' => 'http://www.w3.org/2005/sparql-results#'}.freeze + attr_reader :cache + attr_reader :logger + ## # The SPARQL endpoint URL, or an RDF::Queryable instance, to use the native SPARQL engine. # @@ -94,12 +99,9 @@ class ServerError < StandardError; end # Defaults `User-Agent` header, unless one is specified. # @option options [Hash] :read_timeout def initialize(url, **options, &block) - @logger = options[:logger] ||= Kernel.const_defined?("LOGGER") ? Kernel.const_get("LOGGER") : Logger.new(STDOUT) - @redis_cache = nil - - if options[:redis_cache] - @redis_cache = options[:redis_cache] - end + @cache = SPARQL::Client::Cache.new(redis_cache: options[:redis_cache]) + @logger = Logging.new(redis: @cache.redis_cache, + logger: options[:logger]) case url when RDF::Queryable @@ -331,54 +333,32 @@ def nodes # @raise [IOError] if connection is closed # @see https://www.w3.org/TR/sparql11-protocol/#query-operation def query(query, **options) - unless query.respond_to?(:options) && query.options[:bypass_cache] - if @redis_cache && (query.instance_of?(SPARQL::Client::Query) || options[:graphs]) - - - if options[:graphs] || query.options[:graphs] - cache_key = SPARQL::Client::Query.generate_cache_key(query.to_s, - options[:graphs] || query.options[:graphs]) - else - cache_key = query.cache_key - end + cached_response = nil + @logger.log(query, user: options[:user]) do + cached_response = @cache.get(query, options) + end - cache_response = @redis_cache.get(cache_key[:query]) + return cached_response if cached_response - if options[:reload_cache] and options[:reload_cache] == true - @redis_cache.del(cache_key[:query]) - cache_response = nil - end - - if cache_response - cache_key[:graphs].each do |g| - unless @redis_cache.sismember(g, cache_key[:query]) - @redis_cache.del(cache_key[:query]) - cache_response = nil - break - end - end - if cache_response - return Marshal.load(cache_response) - end - end - options[:cache_key] = cache_key - end - end @op = :query @alt_endpoint = options[:endpoint] - case @url - when RDF::Queryable - require 'sparql' unless defined?(::SPARQL::Grammar) - begin - SPARQL.execute(query, @url, optimize: true, **options) - rescue SPARQL::MalformedQuery - $stderr.puts "error running #{query}: #{$!}" - raise + output = nil + @logger.log(query, user: options[:user], cached: false) do + case @url + when RDF::Queryable + require 'sparql' unless defined?(::SPARQL::Grammar) + begin + output = SPARQL.execute(query, @url, optimize: true, **options) + rescue SPARQL::MalformedQuery + $stderr.puts "error running #{query}: #{$!}" + raise + end + else + output = parse_response(response(query, **options), **options) end - else - parse_response(response(query, **options), **options) end + output end ## @@ -394,17 +374,21 @@ def query(query, **options) # @see https://www.w3.org/TR/sparql11-protocol/#update-operation def update(query, **options) @op = :update - if @redis_cache && !query.options[:bypass_cache] - query_delete_cache(query) + + if @cache.redis_cache && !query.options[:bypass_cache] + raise Exception, "Unsupported cacheable query" if query.options[:graph].nil? + @cache.invalidate(query.options[:graph].to_s) end @alt_endpoint = options[:endpoint] - case @url - when RDF::Queryable - require 'sparql' unless defined?(::SPARQL::Grammar) - SPARQL.execute(query, @url, update: true, optimize: true, **options) - else - response(query, **options) + @logger.log(query, user: options[:user], cached: false) do + case @url + when RDF::Queryable + require 'sparql' unless defined?(::SPARQL::Grammar) + SPARQL.execute(query, @url, update: true, optimize: true, **options) + else + response(query, **options) + end end self end @@ -424,64 +408,20 @@ def response(query, **options) headers['Accept'] = options[:content_type] if options[:content_type] request(query, headers) do |response| case response - when Net::HTTPBadRequest # 400 Bad Request + when Net::HTTPBadRequest # 400 Bad Request raise MalformedQuery.new(response.body + " Processing query #{query}") when Net::HTTPClientError # 4xx raise ClientError.new(response.body + " Processing query #{query}") when Net::HTTPServerError # 5xx raise ServerError.new(response.body + " Processing query #{query}") - when Net::HTTPSuccess # 2xx + when Net::HTTPSuccess # 2xx response + else + # type code here end end end - def query_delete_cache(update) - if update.options[:graph].nil? - raise Exception, "Unsuported cacheable query" - end - cache_invalidate_graph(update.options[:graph].to_s) - end - - def cache_invalidate_graph(graphs) - return if @redis_cache.nil? - graphs = [graphs] unless graphs.instance_of?(Array) - graphs.each do |graph| - attempts = 0 - begin - graph = graph.to_s - graph = "sparql:graph:#{graph}" unless graph.start_with?("sparql:graph:") - if @redis_cache.exists?(graph) - begin - @redis_cache.del(graph) - rescue => exception - puts "warning: error in cache invalidation `#{exception}`" - puts exception.backtrace - end - end - rescue Exception => e - if attempts < 3 - attempts += 1 - sleep(5) - retry - end - end - end - end - - def query_put_cache(keys, entry) - # expiration = 1800 #1/2 hour - data = Marshal.dump(entry) - if data.length > 50e6 # 50MB of marshal object - # avoid large entries to go in the cache - return - end - keys[:graphs].each do |g| - @redis_cache.sadd(g, keys[:query]) - end - @redis_cache.set(keys[:query], data) - #@redis_cache.expire(keys[:query],expiration) - end ## # @param [Net::HTTPSuccess] response @@ -495,9 +435,7 @@ def parse_response(response, **options) response.body == 'true' when RESULT_JSON result_data = self.class.parse_json_bindings(response.body, nodes) - if options[:cache_key] - query_put_cache(options[:cache_key], result_data) - end + @cache.add(options[:cache_key], result_data) if options[:cache_key] return result_data when RESULT_XML self.class.parse_xml_bindings(response.body, nodes) @@ -815,7 +753,13 @@ def inspect end def redis_cache=(redis_cache) - @redis_cache = redis_cache + @cache.redis_cache = redis_cache + @logger.redis = redis_cache + end + + def logger=(logger) + @logger.logger = logger + @logger.redis = @cache.redis_cache end protected diff --git a/lib/sparql/client/cache.rb b/lib/sparql/client/cache.rb new file mode 100644 index 00000000..123ae827 --- /dev/null +++ b/lib/sparql/client/cache.rb @@ -0,0 +1,120 @@ +class SPARQL::Client + class Cache + attr_accessor :redis_cache + + def initialize(redis_cache: nil) + @redis_cache = redis_cache if redis_cache + end + + def add(key, value) + cache_query_response(key, value) + end + + def get(query, options) + cached_query_response(query, options) + end + + def invalidate(graphs) + cache_invalidate_graph(graphs) + end + + def key(query, options) + query_cache_key(query, options) + end + + def self.generate_cache_key(string, from) + from = from.map { |x| x.to_s }.uniq.sort + sorted_graphs = from.join ":" + digest = Digest::MD5.hexdigest(string) + from = from.map { |x| "sparql:graph:#{x}" } + return { graphs: from, query: "sparql:#{sorted_graphs}:#{digest}" } + end + + private + + def cache_invalidate_graph(graphs) + return if @redis_cache.nil? + graphs = [graphs] unless graphs.instance_of?(Array) + graphs.each do |graph| + attempts = 0 + begin + graph = graph.to_s + graph = "sparql:graph:#{graph}" unless graph.start_with?("sparql:graph:") + if @redis_cache.exists?(graph) + begin + @redis_cache.del(graph) + rescue => exception + puts "warning: error in cache invalidation `#{exception}`" + end + end + rescue Exception => e + if attempts < 3 + attempts += 1 + sleep(5) + retry + end + end + end + end + + def cache_query_response(keys, entry) + # expiration = 1800 #1/2 hour + data = Marshal.dump(entry) + if data.length > 50e6 # 50MB of marshal object + # avoid large entries to go in the cache + return + end + keys[:graphs].each do |g| + @redis_cache.sadd(g, keys[:query]) + end + @redis_cache.set(keys[:query], data) + #@redis_cache.expire(keys[:query],expiration) + end + + def cache_key(query) + return nil if query.options[:from].nil? || query.options[:from].empty? + from = query.options[:from] + from = [from] unless from.instance_of?(Array) + SPARQL::Client::Cache.generate_cache_key(query.to_s, from) + end + + def query_cache_key(query, options) + if options[:graphs] || query.options[:graphs] + cache_key = SPARQL::Client::Cache.generate_cache_key(query.to_s, options[:graphs] || query.options[:graphs]) + else + cache_key = cache_key(query) + end + cache_key + end + + def cached_query_response(query, options) + return nil if query.respond_to?(:options) && query.options[:bypass_cache] + + if @redis_cache && (query.instance_of?(SPARQL::Client::Query) || options[:graphs]) + + cache_key = query_cache_key(query, options) + cache_response = @redis_cache.get(cache_key[:query]) + + if options[:reload_cache] and options[:reload_cache] == true + @redis_cache.del(cache_key[:query]) + cache_response = nil + end + + if cache_response + cache_key[:graphs].each do |g| + unless @redis_cache.sismember(g, cache_key[:query]) + @redis_cache.del(cache_key[:query]) + cache_response = nil + break + end + end + + return Marshal.load(cache_response) if cache_response + end + + options[:cache_key] = cache_key + nil + end + end + end +end diff --git a/lib/sparql/client/logging.rb b/lib/sparql/client/logging.rb new file mode 100644 index 00000000..e45fc1fe --- /dev/null +++ b/lib/sparql/client/logging.rb @@ -0,0 +1,80 @@ +require 'benchmark' +require 'securerandom' + +class SPARQL::Client + class Logging + attr_accessor :logger + attr_accessor :redis + attr_accessor :enabled + + REDIS_EXPIRY = 86_400 # 24 hours + + def initialize(redis:, redis_key: 'query_logs', redis_expiry: REDIS_EXPIRY, logger: nil) + @redis = redis + @logger = logger + @redis_key = redis_key + @redis_expiry = redis_expiry + @enabled = !logger.nil? + end + + def log(query, id: SecureRandom.uuid, cached: nil, user: nil ,&block) + return block.call unless @enabled + + time = Benchmark.realtime do + result = block.call + cached = !result.nil? if cached.nil? + end + info(query, id: id, cached: cached, user: user, execution_time: time) + end + + + def info(query, id: SecureRandom.uuid, cached: 'null', user: 'null', execution_time: 0) + timestamp = Time.now.iso8601 + entry = { + id: id, + timestamp: timestamp, + query: query.to_s, + cached: cached, + user: user, + execution_time: execution_time + } + + + @logger&.info("SPARQL: #{query} (#{execution_time}s) | Cached: #{cached} | User: #{user}") + return if @redis.nil? + + key = "#{@redis_key}-#{id}-#{timestamp}" + entry = encode_data(entry) + return if entry.nil? + + @redis.set(key, entry) + @redis.expire(key, @redis_expiry) + end + + def get_logs + keys = @redis.keys("#{@redis_key}-*") + keys.map { |key| Marshal.load(@redis.get(key)) } + end + + def logger=(logger) + @logger = logger + @enabled = !logger.nil? + end + + private + def encode_data(entry) + data = Marshal.dump(entry.to_json) + if data.length > 50e6 # 50MB of marshal object + # avoid large entries to go in the cache + puts "Entry too large to be stored in cache" + return nil + end + data + end + + def decode_data(data) + Marshal.load(data) + end + + end +end diff --git a/lib/sparql/client/query.rb b/lib/sparql/client/query.rb index 1be43e58..a1d36efb 100644 --- a/lib/sparql/client/query.rb +++ b/lib/sparql/client/query.rb @@ -518,20 +518,6 @@ def optional_union_with_bind_as(*pattern_list) self end - def cache_key - return nil if options[:from].nil? || options[:from].empty? - from = options[:from] - from = [from] unless from.instance_of?(Array) - return Query.generate_cache_key(self.to_s, from) - end - - def self.generate_cache_key(string, from) - from = from.map { |x| x.to_s }.uniq.sort - sorted_graphs = from.join ":" - digest = Digest::MD5.hexdigest(string) - from = from.map { |x| "sparql:graph:#{x}" } - return { graphs: from, query: "sparql:#{sorted_graphs}:#{digest}" } - end ## # @example SELECT * WHERE \{ ?book dc:title ?title \} UNION \{ ?book dc11:title ?title \} @@ -1013,8 +999,8 @@ def inspect! def inspect sprintf("#<%s:%#0x(%s)>", self.class.name, __id__, to_s) end - # Allow Filters to be + class Filter < SPARQL::Client::QueryElement def initialize(*args) super