Skip to content

Commit

Permalink
Merge pull request #56 from silviucpp/no_binding_tombstone
Browse files Browse the repository at this point in the history
Avoid undesired tombstone while null binding
  • Loading branch information
silviucpp authored Nov 8, 2022
2 parents 9832cea + a0e658e commit 8eca8f3
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 21 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
### Changelog:

##### v4.0.8

- Update cpp-driver to 2.16.2
- Add support to avoid creating tombstones while inserting data using prepared statements (https://github.com/silviucpp/erlcass/wiki/Null-bindings-on-prepared-statements-and-undesired-tombstone-creation).

##### v4.0.7

- Fix compilation on architectures where char is unsigned by default https://github.com/silviucpp/erlcass/issues/53
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ List of supported features:
- Asynchronous API
- Synchronous API
- Simple, Prepared, and Batch statements
- [Avoid undesired tombstone while null binding][10] (only on protocol 4 or newer).
- Paged queries
- Asynchronous I/O, parallel execution, and request pipelining
- Connection pooling
Expand Down Expand Up @@ -204,6 +205,7 @@ Also this is possible using `{Query, Options}` where options is a proplist with
- `serial_consistency_level` - This consistency can only be either `?CASS_CONSISTENCY_SERIAL` or
`?CASS_CONSISTENCY_LOCAL_SERIAL` and if not present, it defaults to `?CASS_CONSISTENCY_SERIAL`. This option will be
ignored for anything else that a conditional update/insert.
- `null_binding` - Boolean (by default `true`). Provides a way to disable the null values binding. [Binding null values][10] will create undesired tombstone in cassandra.

Example:

Expand Down Expand Up @@ -435,3 +437,4 @@ For mode details about bind by index and name please see: 'Run a prepared statem
[7]:https://github.com/lpgauth/marina
[8]:https://github.com/silviucpp/erlcass
[9]:https://github.com/silviucpp/erlcass/wiki/Todo-list
[10]:https://github.com/silviucpp/erlcass/wiki/Null-bindings-on-prepared-statements-and-undesired-tombstone-creation
17 changes: 14 additions & 3 deletions c_src/cass_binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,21 @@ ERL_NIF_TERM nif_list_to_cass_collection(ErlNifEnv* env, ERL_NIF_TERM list, cons
return ATOMS.atomOk;
}

ERL_NIF_TERM cass_bind_by_index(ErlNifEnv* env, CassStatement* statement, size_t index, const datastax::internal::core::DataType* data_type, ERL_NIF_TERM value)
ERL_NIF_TERM cass_bind_by_index(ErlNifEnv* env, CassStatement* statement, size_t index, const datastax::internal::core::DataType* data_type, ERL_NIF_TERM value, bool null_binding)
{
if(enif_is_identical(value, ATOMS.atomNull))
return cass_error_to_nif_term(env, cass_statement_bind_null(statement, index));
if(enif_is_atom(env, value))
{
if(enif_is_identical(value, ATOMS.atomNull))
{
if(null_binding)
return cass_error_to_nif_term(env, cass_statement_bind_null(statement, index));
else
return ATOMS.atomOk;
}

if(enif_is_identical(value, ATOMS.atomUndefined))
return ATOMS.atomOk;
}

return cass_set_from_nif(env, statement, index, kCassStatementFuns, data_type, value);
}
Expand Down
2 changes: 1 addition & 1 deletion c_src/cass_binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace datastax { namespace internal { namespace core {
class DataType;
}}}

ERL_NIF_TERM cass_bind_by_index(ErlNifEnv* env, CassStatement* statement, size_t index, const datastax::internal::core::DataType* data_type, ERL_NIF_TERM value);
ERL_NIF_TERM cass_bind_by_index(ErlNifEnv* env, CassStatement* statement, size_t index, const datastax::internal::core::DataType* data_type, ERL_NIF_TERM value, bool null_binding);

#endif // C_SRC_CASS_BINDING_H_
2 changes: 2 additions & 0 deletions c_src/constants.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ const char kAtomError[] = "error";
const char kAtomTrue[] = "true";
const char kAtomFalse[] = "false";
const char kAtomNull[] = "null";
const char kAtomUndefined[] = "undefined";
const char kAtomBadArg[] = "badarg";
const char kAtomOptions[] = "options";
const char kAtomConsistencyLevel[] = "consistency_level";
const char kAtomSerialConsistencyLevel[] = "serial_consistency_level";
const char kAtomLogMsgRecord[] = "log_msg";
const char kAtomNullBinding[] = "null_binding";

// events atoms

Expand Down
2 changes: 2 additions & 0 deletions c_src/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ extern const char kAtomError[];
extern const char kAtomTrue[];
extern const char kAtomFalse[];
extern const char kAtomNull[];
extern const char kAtomUndefined[];
extern const char kAtomBadArg[];
extern const char kAtomOptions[];
extern const char kAtomConsistencyLevel[];
extern const char kAtomSerialConsistencyLevel[];
extern const char kAtomLogMsgRecord[];
extern const char kAtomNullBinding[];

// events atoms

Expand Down
2 changes: 2 additions & 0 deletions c_src/erlcass.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ int on_nif_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOMS.atomTrue = make_atom(env, erlcass::kAtomTrue);
ATOMS.atomFalse = make_atom(env, erlcass::kAtomFalse);
ATOMS.atomNull = make_atom(env, erlcass::kAtomNull);
ATOMS.atomUndefined = make_atom(env, erlcass::kAtomUndefined);
ATOMS.atomBadArg = make_atom(env, erlcass::kAtomBadArg);
ATOMS.atomOptions = make_atom(env, erlcass::kAtomOptions);
ATOMS.atomConsistencyLevel = make_atom(env, erlcass::kAtomConsistencyLevel);
ATOMS.atomSerialConsistencyLevel = make_atom(env, erlcass::kAtomSerialConsistencyLevel);
ATOMS.atomLogMsgRecord = make_atom(env, erlcass::kAtomLogMsgRecord);
ATOMS.atomNullBinding = make_atom(env, erlcass::kAtomNullBinding);

// events atoms

Expand Down
2 changes: 2 additions & 0 deletions c_src/erlcass.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ struct atoms
ERL_NIF_TERM atomTrue;
ERL_NIF_TERM atomFalse;
ERL_NIF_TERM atomNull;
ERL_NIF_TERM atomUndefined;
ERL_NIF_TERM atomBadArg;
ERL_NIF_TERM atomOptions;
ERL_NIF_TERM atomConsistencyLevel;
ERL_NIF_TERM atomSerialConsistencyLevel;
ERL_NIF_TERM atomLogMsgRecord;
ERL_NIF_TERM atomNullBinding;

// events atoms

Expand Down
6 changes: 4 additions & 2 deletions c_src/nif_cass_prepared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ struct enif_cass_prepared
{
const CassPrepared* prepared;
ConsistencyLevelOptions consistency;
bool null_binding;
};

ERL_NIF_TERM nif_cass_prepared_new(ErlNifEnv* env, ErlNifResourceType* rs, const CassPrepared* prep, const ConsistencyLevelOptions& consistency)
ERL_NIF_TERM nif_cass_prepared_new(ErlNifEnv* env, ErlNifResourceType* rs, const CassPrepared* prep, const ConsistencyLevelOptions& consistency, bool null_binding)
{
enif_cass_prepared* enif_obj = static_cast<enif_cass_prepared*>(enif_alloc_resource(rs, sizeof(enif_cass_prepared)));

Expand All @@ -18,6 +19,7 @@ ERL_NIF_TERM nif_cass_prepared_new(ErlNifEnv* env, ErlNifResourceType* rs, const

enif_obj->prepared = prep;
enif_obj->consistency = consistency;
enif_obj->null_binding = null_binding;

ERL_NIF_TERM term = enif_make_resource(env, enif_obj);
enif_release_resource(enif_obj);
Expand All @@ -42,7 +44,7 @@ ERL_NIF_TERM nif_cass_prepared_bind(ErlNifEnv* env, int argc, const ERL_NIF_TERM
if(!enif_get_resource(env, argv[0], data->resCassPrepared, reinterpret_cast<void**>(&enif_prep)))
return make_badarg(env);

ERL_NIF_TERM term = nif_cass_statement_new(env, data->resCassStatement, enif_prep->prepared, enif_prep->consistency);
ERL_NIF_TERM term = nif_cass_statement_new(env, data->resCassStatement, enif_prep->prepared, enif_prep->consistency, enif_prep->null_binding);

if(enif_is_tuple(env, term))
return term;
Expand Down
2 changes: 1 addition & 1 deletion c_src/nif_cass_prepared.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "cassandra.h"
#include "nif_utils.h"

ERL_NIF_TERM nif_cass_prepared_new(ErlNifEnv* env, ErlNifResourceType* rs, const CassPrepared* prep, const ConsistencyLevelOptions& consistency);
ERL_NIF_TERM nif_cass_prepared_new(ErlNifEnv* env, ErlNifResourceType* rs, const CassPrepared* prep, const ConsistencyLevelOptions& consistency, bool null_binding);
ERL_NIF_TERM nif_cass_prepared_bind(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
void nif_cass_prepared_free(ErlNifEnv* env, void* obj);

Expand Down
4 changes: 3 additions & 1 deletion c_src/nif_cass_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct callback_statement_info
ERL_NIF_TERM arguments;
ErlNifResourceType* prepared_res;
ConsistencyLevelOptions consistency;
bool null_binding;
CassSession* session;
};

Expand Down Expand Up @@ -115,7 +116,7 @@ void on_statement_prepared(CassFuture* future, void* user_data)
{
const CassPrepared* prep = cass_future_get_prepared(future);

ERL_NIF_TERM term = nif_cass_prepared_new(cb->env, cb->prepared_res, prep, cb->consistency);
ERL_NIF_TERM term = nif_cass_prepared_new(cb->env, cb->prepared_res, prep, cb->consistency, cb->null_binding);

if(enif_is_tuple(cb->env, term))
{
Expand Down Expand Up @@ -292,6 +293,7 @@ ERL_NIF_TERM nif_cass_session_prepare(ErlNifEnv* env, int argc, const ERL_NIF_TE
callback->arguments = enif_make_copy(callback->env, argv[3]);
callback->consistency = q.consistency;
callback->session = enif_session->session;
callback->null_binding = q.null_binding;

CassFuture* future = cass_session_prepare_n(enif_session->session, BIN_TO_STR(q.query.data), q.query.size);

Expand Down
19 changes: 13 additions & 6 deletions c_src/nif_cass_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@
#define BIND_BY_INDEX 1
#define BIND_BY_NAME 2

namespace {

struct enif_cass_statement
{
CassStatement* statement;
bool null_binding;
};

ERL_NIF_TERM bind_prepared_statement_params(ErlNifEnv* env, CassStatement* statement, int type, ERL_NIF_TERM list)
ERL_NIF_TERM bind_prepared_statement_params(ErlNifEnv* env, enif_cass_statement* enif_stm, int type, ERL_NIF_TERM list)
{
ERL_NIF_TERM head;

datastax::internal::core::Statement* stm = static_cast<datastax::internal::core::Statement*>(statement);
datastax::internal::core::Statement* stm = static_cast<datastax::internal::core::Statement*>(enif_stm->statement);
const datastax::internal::core::ResultResponse* result = static_cast<datastax::internal::core::ExecuteRequest*>(stm)->prepared()->result().get();

if(type == BIND_BY_NAME)
Expand All @@ -42,7 +45,7 @@ ERL_NIF_TERM bind_prepared_statement_params(ErlNifEnv* env, CassStatement* state
size_t index = indices[0];

const datastax::internal::core::DataType* data_type = result->metadata()->get_column_definition(index).data_type.get();
ERL_NIF_TERM nif_result = cass_bind_by_index(env, statement, index, data_type, items[1]);
ERL_NIF_TERM nif_result = cass_bind_by_index(env, enif_stm->statement, index, data_type, items[1], enif_stm->null_binding);

if(!enif_is_identical(nif_result, ATOMS.atomOk))
return nif_result;
Expand All @@ -66,7 +69,7 @@ ERL_NIF_TERM bind_prepared_statement_params(ErlNifEnv* env, CassStatement* state

const datastax::internal::core::DataType* data_type = def.data_type.get();

ERL_NIF_TERM nif_result = cass_bind_by_index(env, statement, index, data_type, head);
ERL_NIF_TERM nif_result = cass_bind_by_index(env, enif_stm->statement, index, data_type, head, enif_stm->null_binding);

if(!enif_is_identical(nif_result, ATOMS.atomOk))
return nif_result;
Expand All @@ -78,6 +81,8 @@ ERL_NIF_TERM bind_prepared_statement_params(ErlNifEnv* env, CassStatement* state
return ATOMS.atomOk;
}

}

CassStatement* get_statement(ErlNifEnv* env, ErlNifResourceType* resource_type, ERL_NIF_TERM arg)
{
enif_cass_statement* enif_stm = NULL;
Expand Down Expand Up @@ -120,21 +125,23 @@ ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM
return make_error(env, erlcass::kFailedToAllocResourceMsg);

enif_obj->statement = stm;
enif_obj->null_binding = q.null_binding;

ERL_NIF_TERM term = enif_make_resource(env, enif_obj);
enif_release_resource(enif_obj);

return enif_make_tuple2(env, ATOMS.atomOk, term);
}

ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, ErlNifResourceType* resource_type, const CassPrepared* prep, const ConsistencyLevelOptions& consistency)
ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, ErlNifResourceType* resource_type, const CassPrepared* prep, const ConsistencyLevelOptions& consistency, bool null_binding)
{
enif_cass_statement* enif_obj = static_cast<enif_cass_statement*>(enif_alloc_resource(resource_type, sizeof(enif_cass_statement)));

if(enif_obj == NULL)
return make_error(env, erlcass::kFailedToAllocResourceMsg);

enif_obj->statement = cass_prepared_bind(prep);
enif_obj->null_binding = null_binding;

CassError cass_result = cass_statement_set_consistency(enif_obj->statement, consistency.cl);

Expand Down Expand Up @@ -177,7 +184,7 @@ ERL_NIF_TERM nif_cass_statement_bind_parameters(ErlNifEnv* env, int argc, const
if(!enif_get_int(env, argv[1], &bind_type) || (bind_type != BIND_BY_INDEX && bind_type != BIND_BY_NAME))
return make_badarg(env);

return bind_prepared_statement_params(env, enif_stm->statement, bind_type, argv[2]);
return bind_prepared_statement_params(env, enif_stm, bind_type, argv[2]);
}

ERL_NIF_TERM nif_cass_statement_set_paging_size(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
Expand Down
2 changes: 1 addition & 1 deletion c_src/nif_cass_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

CassStatement* get_statement(ErlNifEnv* env, ErlNifResourceType* resource_type, ERL_NIF_TERM arg);
ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, ErlNifResourceType* resource_type, const CassPrepared* prep, const ConsistencyLevelOptions& consistency);
ERL_NIF_TERM nif_cass_statement_new(ErlNifEnv* env, ErlNifResourceType* resource_type, const CassPrepared* prep, const ConsistencyLevelOptions& consistency, bool null_binding);
ERL_NIF_TERM nif_cass_statement_bind_parameters(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM nif_cass_statement_set_paging_size(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
void nif_cass_statement_free(ErlNifEnv* env, void* obj);
Expand Down
56 changes: 55 additions & 1 deletion c_src/nif_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,60 @@

#include <string.h>

namespace {

ERL_NIF_TERM parse_query_options(ErlNifEnv* env, ERL_NIF_TERM options_list, QueryTerm* q)
{
ERL_NIF_TERM head;
const ERL_NIF_TERM* items;
int arity;

while(enif_get_list_cell(env, options_list, &head, &options_list))
{
if(!enif_get_tuple(env, head, &arity, &items) || arity != 2)
return make_bad_options(env, head);

ERL_NIF_TERM key = items[0];
ERL_NIF_TERM value = items[1];

if(enif_is_identical(key, ATOMS.atomConsistencyLevel))
{
int c_level;

if(!enif_get_int(env, value, &c_level))
return make_bad_options(env, head);

q->consistency.cl = static_cast<CassConsistency>(c_level);
}
else if(enif_is_identical(key, ATOMS.atomSerialConsistencyLevel))
{
int c_level;

if(!enif_get_int(env, value, &c_level))
return make_bad_options(env, head);

q->consistency.serial_cl = static_cast<CassConsistency>(c_level);
}
else if(enif_is_identical(key, ATOMS.atomNullBinding))
{
cass_bool_t bool_value;

if(!get_boolean(value, &bool_value))
return make_badarg(env);

q->null_binding = static_cast<bool>(bool_value);
}
else
{
return make_bad_options(env, head);
}
}

return ATOMS.atomOk;
}

}

ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name)
{
ERL_NIF_TERM ret;
Expand Down Expand Up @@ -143,7 +197,7 @@ ERL_NIF_TERM parse_query_term(ErlNifEnv* env, ERL_NIF_TERM qterm, QueryTerm* q)

if(enif_is_list(env, items[1]))
{
return parse_consistency_level_options(env, items[1], &q->consistency);
return parse_query_options(env, items[1], q);
}
else
{
Expand Down
1 change: 1 addition & 0 deletions c_src/nif_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct QueryTerm

ErlNifBinary query;
ConsistencyLevelOptions consistency;
bool null_binding = true;
};

ERL_NIF_TERM make_atom(ErlNifEnv* env, const char* name);
Expand Down
2 changes: 1 addition & 1 deletion src/erlcass.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{description, "ErlCass - Erlang Cassandra Driver"},
{licenses, ["MIT"]},
{links,[{"Github","https://github.com/silviucpp/erlcass"}]},
{vsn, "4.0.7"},
{vsn, "4.0.8"},
{registered, []},
{applications, [kernel, stdlib, lager]},
{mod, {erlcass_app, []}},
Expand Down
Loading

0 comments on commit 8eca8f3

Please sign in to comment.