Skip to content

Commit

Permalink
[#9683] Automatic updating of Cassandra client (ycqlsh) internal meta…
Browse files Browse the repository at this point in the history
…data cache.

Summary:
Implemented automatic internal cache update in case of an internal 'not found' error.

The cache updating is automatic & invisible.
To see when the cache is updated -  start the tool with `--debug` argument:
```
ycqlsh 127.0.0.3 --debug
```

Generic requests (like `DESC TABLES`, `DESC <keyspace>`) will NOT show updated information till e.g. the info will be updated via the `DESC <table>` request, which initiates the internal cache refreshing.
The new command line argument `--refresh_on_describe` (or `-r`) can be used to force the cache refreshing on every `DESC` command.
```
ycqlsh 127.0.0.3 --refresh_on_describe
```

The new function  `refresh_schema_metadata(self, fn_to_rerun, args, ex)` after the metadata refreshing must automatically rerun the function-caller: `fn_to_rerun(*args)`. If a recursive call detected, the exception will be raised (actually it's an old exception from the original implementation before the fix.) Automatic rerun of the caller helps too keep main part of the code unchanged.

Test Plan:
In terminal 1:
  CREATE KEYSPACE a;
  CREATE TABLE t (h INT PRIMARY KEY);
In terminal 2:
  SELECT * FROM a.t;
or
  INSERT INTO a.t (h) VALUES (1);
or
  DROP TABLE a.t;
or
  DESC a.t
or
  DESC a

Reviewers: mihnea, pjain

Reviewed By: pjain

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D12972
  • Loading branch information
OlegLoginov committed Sep 15, 2021
1 parent fc375bf commit 4e00848
Showing 1 changed file with 57 additions and 16 deletions.
73 changes: 57 additions & 16 deletions bin/ycqlsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def find_zip(libprefix):
from cqlshlib import cql3handling, cqlhandling, pylexotron, sslhandling, cqlshhandling
from cqlshlib.copyutil import ExportTask, ImportTask
from cqlshlib.displaying import (ANSI_RESET, BLUE, COLUMN_NAME_COLORS, CYAN,
RED, WHITE, FormattedValue, colorme)
RED, WHITE, YELLOW, FormattedValue, colorme)
from cqlshlib.formatting import (DEFAULT_DATE_FORMAT, DEFAULT_NANOTIME_FORMAT,
DEFAULT_TIMESTAMP_FORMAT, CqlType, DateTimeFormat,
format_by_type, formatter_for)
Expand Down Expand Up @@ -237,6 +237,8 @@ def find_zip(libprefix):
help='Specify the default request timeout in seconds (default: %default seconds).')
parser.add_option("-t", "--tty", action='store_true', dest='tty',
help='Force tty mode (command prompt).')
parser.add_option("-r", "--refresh_on_describe", action='store_true',
help='Force refreshing of the schema metadata on DESCRIBE command.')

optvalues = optparse.Values()
(options, arguments) = parser.parse_args(sys.argv[1:], values=optvalues)
Expand Down Expand Up @@ -420,6 +422,7 @@ class Shell(cmd.Cmd):
last_hist = None
shunted_query_out = None
use_paging = True
in_refresh_schema_metadata = False

default_page_size = 100

Expand All @@ -439,7 +442,8 @@ def __init__(self, hostname, port, color=False,
single_statement=None,
request_timeout=DEFAULT_REQUEST_TIMEOUT_SECONDS,
protocol_version=DEFAULT_PROTOCOL_VERSION,
connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS):
connect_timeout=DEFAULT_CONNECT_TIMEOUT_SECONDS,
refresh_on_describe=False):
cmd.Cmd.__init__(self, completekey=completekey)
self.hostname = hostname
self.port = port
Expand Down Expand Up @@ -525,6 +529,8 @@ def __init__(self, hostname, port, color=False,
self.statement_error = False
self.single_statement = single_statement

self.refresh_on_describe = refresh_on_describe

@property
def is_using_utf8(self):
# utf8 encodings from https://docs.python.org/{2,3}/library/codecs.html
Expand All @@ -538,6 +544,23 @@ def check_windows_encoding(self):
"If you experience encoding problems, change your console"
" codepage with 'chcp 65001' before starting ycqlsh.\n".format(self.encoding))

def internal_refresh_schema_metadata(self):
if self.debug:
self.printerr("Refreshing schema metadata cache...", color=YELLOW)
self.conn.refresh_schema_metadata(-1)

def refresh_schema_metadata(self, fn_to_rerun, args, ex):
if self.in_refresh_schema_metadata:
raise ex

self.internal_refresh_schema_metadata()

try:
self.in_refresh_schema_metadata = True
return fn_to_rerun(*args)
finally:
self.in_refresh_schema_metadata = False

def set_expanded_cql_version(self, ver):
ver, vertuple = full_cql_version(ver)
self.cql_version = ver
Expand Down Expand Up @@ -647,7 +670,8 @@ def get_usertype_layout(self, ksname, typename):
try:
user_type = ks_meta.user_types[typename]
except KeyError:
raise UserTypeNotFound("User type %r not found" % typename)
return self.refresh_schema_metadata(self.get_usertype_layout, (ksname, typename),
UserTypeNotFound("User type %r not found" % typename))

return list(zip(user_type.field_names, user_type.field_types))

Expand All @@ -671,7 +695,8 @@ def get_partitioner(self):

def get_keyspace_meta(self, ksname):
if ksname not in self.conn.metadata.keyspaces:
raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
return self.refresh_schema_metadata(self.get_keyspace_meta, (ksname,),
KeyspaceNotFound('Keyspace %r not found.' % ksname))
return self.conn.metadata.keyspaces[ksname]

def get_keyspaces(self):
Expand All @@ -690,7 +715,8 @@ def get_table_meta(self, ksname, tablename):
if ksname == 'system_auth' and tablename in ['roles', 'role_permissions']:
self.get_fake_auth_table_meta(ksname, tablename)
else:
raise ColumnFamilyNotFound("Column family %r not found" % tablename)
return self.refresh_schema_metadata(self.get_table_meta, (ksname, tablename),
ColumnFamilyNotFound("Column family %r not found" % tablename))
else:
return ksmeta.tables[tablename]

Expand All @@ -711,15 +737,17 @@ def get_fake_auth_table_meta(self, ksname, tablename):
table_meta.columns['resource'] = ColumnMetadata(table_meta, 'resource', cassandra.cqltypes.UTF8Type)
table_meta.columns['permission'] = ColumnMetadata(table_meta, 'permission', cassandra.cqltypes.UTF8Type)
else:
raise ColumnFamilyNotFound("Column family %r not found" % tablename)
return self.refresh_schema_metadata(self.get_fake_auth_table_meta, (ksname, tablename),
ColumnFamilyNotFound("Column family %r not found" % tablename))

def get_index_meta(self, ksname, idxname):
if ksname is None:
ksname = self.current_keyspace
ksmeta = self.get_keyspace_meta(ksname)

if idxname not in ksmeta.indexes:
raise IndexNotFound("Index %r not found" % idxname)
return self.refresh_schema_metadata(self.get_index_meta, (ksname, idxname),
IndexNotFound("Index %r not found" % idxname))

return ksmeta.indexes[idxname]

Expand All @@ -729,15 +757,17 @@ def get_view_meta(self, ksname, viewname):
ksmeta = self.get_keyspace_meta(ksname)

if viewname not in ksmeta.views:
raise MaterializedViewNotFound("Materialized view %r not found" % viewname)
return self.refresh_schema_metadata(self.get_view_meta, (ksname, viewname),
MaterializedViewNotFound("Materialized view %r not found" % viewname))
return ksmeta.views[viewname]

def get_object_meta(self, ks, name):
if name is None:
if ks and ks in self.conn.metadata.keyspaces:
return self.conn.metadata.keyspaces[ks]
elif self.current_keyspace is None:
raise ObjectNotFound("%r not found in keyspaces" % (ks))
return self.refresh_schema_metadata(self.get_object_meta, (ks, name),
ObjectNotFound("%r not found in keyspaces" % (ks)))
else:
name = ks
ks = self.current_keyspace
Expand All @@ -754,7 +784,8 @@ def get_object_meta(self, ks, name):
elif name in ksmeta.views:
return ksmeta.views[name]

raise ObjectNotFound("%r not found in keyspace %r" % (name, ks))
return self.refresh_schema_metadata(self.get_object_meta, (ks, name),
ObjectNotFound("%r not found in keyspace %r" % (name, ks)))

def get_usertypes_meta(self):
data = self.session.execute("select * from system.schema_usertypes")
Expand Down Expand Up @@ -1005,7 +1036,8 @@ def parse_for_select_meta(self, query_string):
try:
return self.get_view_meta(ks, name)
except MaterializedViewNotFound:
raise ObjectNotFound("%r not found in keyspace %r" % (name, ks))
return self.refresh_schema_metadata(self.parse_for_select_meta, (query_string,),
ObjectNotFound("%r not found in keyspace %r" % (name, ks)))

def parse_for_update_meta(self, query_string):
try:
Expand Down Expand Up @@ -1321,7 +1353,8 @@ def describe_function(self, ksname, functionname):
ksmeta = self.get_keyspace_meta(ksname)
functions = [f for f in list(ksmeta.functions.values()) if f.name == functionname]
if len(functions) == 0:
raise FunctionNotFound("User defined function %r not found" % functionname)
return self.refresh_schema_metadata(self.describe_function, (ksname, functionname),
FunctionNotFound("User defined function %r not found" % functionname))
print("\n\n".join(func.export_as_string() for func in functions))
print('')

Expand All @@ -1346,7 +1379,8 @@ def describe_aggregate(self, ksname, aggregatename):
ksmeta = self.get_keyspace_meta(ksname)
aggregates = [f for f in list(ksmeta.aggregates.values()) if f.name == aggregatename]
if len(aggregates) == 0:
raise FunctionNotFound("User defined aggregate %r not found" % aggregatename)
return self.refresh_schema_metadata(self.describe_aggregate, (ksname, aggregatename),
FunctionNotFound("User defined aggregate %r not found" % aggregatename))
print("\n\n".join(aggr.export_as_string() for aggr in aggregates))
print('')

Expand All @@ -1372,7 +1406,8 @@ def describe_usertype(self, ksname, typename):
try:
usertype = ksmeta.user_types[typename]
except KeyError:
raise UserTypeNotFound("User type %r not found" % typename)
return self.refresh_schema_metadata(self.describe_usertype, (ksname, typename),
UserTypeNotFound("User type %r not found" % typename))
print(usertype.export_as_string())

def _columnize_unicode(self, name_list, quote=False):
Expand Down Expand Up @@ -1498,6 +1533,9 @@ def do_describe(self, parsed):
where object can be either a keyspace or a table or an index or a materialized
view (in this order).
"""
if self.refresh_on_describe:
self.internal_refresh_schema_metadata()

what = parsed.matched[1][1].lower()
if what == 'functions':
self.describe_functions(self.current_keyspace)
Expand Down Expand Up @@ -1747,7 +1785,8 @@ def do_source(self, parsed):
display_timezone=self.display_timezone,
max_trace_wait=self.max_trace_wait, ssl=self.ssl,
request_timeout=self.session.default_timeout,
connect_timeout=self.conn.connect_timeout)
connect_timeout=self.conn.connect_timeout,
refresh_on_describe=self.refresh_on_describe)
subshell.cmdloop()
f.close()

Expand Down Expand Up @@ -2217,6 +2256,7 @@ def read_options(cmdlineargs, environment):
optvalues.max_trace_wait = option_with_default(configs.getfloat, 'tracing', 'max_trace_wait',
DEFAULT_MAX_TRACE_WAIT)
optvalues.timezone = option_with_default(configs.get, 'ui', 'timezone', None)
optvalues.refresh_on_describe = option_with_default(configs.getboolean, 'ui', 'refresh_on_describe', False)

optvalues.debug = False
optvalues.file = None
Expand Down Expand Up @@ -2390,7 +2430,8 @@ def main(options, hostname, port):
single_statement=options.execute,
request_timeout=options.request_timeout,
connect_timeout=options.connect_timeout,
encoding=options.encoding)
encoding=options.encoding,
refresh_on_describe=options.refresh_on_describe)
except KeyboardInterrupt:
sys.exit('Connection aborted.')
except CQL_ERRORS as e:
Expand Down

0 comments on commit 4e00848

Please sign in to comment.