Skip to content

Commit

Permalink
Update usage of deprecated transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Mar 25, 2024
1 parent f2c2b3a commit 2c6fd74
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 37 deletions.
17 changes: 8 additions & 9 deletions basic_example/src/main/java/tech/ydb/example/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.slf4j.LoggerFactory;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcTransport;
Expand All @@ -24,7 +25,7 @@
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListType;
import tech.ydb.table.values.ListValue;
Expand Down Expand Up @@ -303,6 +304,7 @@ private void scanQueryWithParams(long seriesID, long seasonID) {

private void multiStepTransaction(long seriesID, long seasonID) {
retryCtx.supplyStatus(session -> {
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
String query1
= "DECLARE $seriesId AS Uint64; "
+ "DECLARE $seasonId AS Uint64; "
Expand All @@ -312,8 +314,7 @@ private void multiStepTransaction(long seriesID, long seasonID) {
// Execute first query to get the required values to the client.
// Transaction control settings don't set CommitTx flag to keep transaction active
// after query execution.
TxControl<?> tx1 = TxControl.serializableRw().setCommitTx(false);
DataQueryResult res1 = session.executeDataQuery(query1, tx1, Params.of(
DataQueryResult res1 = transaction.executeDataQuery(query1, Params.of(
"$seriesId", PrimitiveValue.newUint64(seriesID),
"$seasonId", PrimitiveValue.newUint64(seasonID)
)).join().getValue();
Expand All @@ -327,7 +328,7 @@ private void multiStepTransaction(long seriesID, long seasonID) {
LocalDate toDate = fromDate.plusDays(15);

// Get active transaction id
String txId = res1.getTxId();
logger.info("got transaction id {}", transaction.getId());

// Construct next query based on the results of client logic
String query2
Expand All @@ -340,8 +341,7 @@ private void multiStepTransaction(long seriesID, long seasonID) {
// Execute second query.
// Transaction control settings continues active transaction (tx) and
// commits it at the end of second query execution.
TxControl<?> tx2 = TxControl.id(txId).setCommitTx(true);
DataQueryResult res2 = session.executeDataQuery(query2, tx2, Params.of(
DataQueryResult res2 = transaction.executeDataQueryAndCommit(query2, Params.of(
"$seriesId", PrimitiveValue.newUint64(seriesID),
"$fromDate", PrimitiveValue.newDate(fromDate),
"$toDate", PrimitiveValue.newDate(toDate)
Expand All @@ -362,7 +362,7 @@ private void multiStepTransaction(long seriesID, long seasonID) {

private void tclTransaction() {
retryCtx.supplyStatus(session -> {
Transaction transaction = session.beginTransaction(Transaction.Mode.SERIALIZABLE_READ_WRITE)
TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW)
.join().getValue();

String query
Expand All @@ -373,8 +373,7 @@ private void tclTransaction() {

// Execute data query.
// Transaction control settings continues active transaction (tx)
TxControl<?> txControl = TxControl.id(transaction).setCommitTx(false);
DataQueryResult result = session.executeDataQuery(query, txControl, params)
DataQueryResult result = transaction.executeDataQuery(query, params)
.join().getValue();

logger.info("get transaction {}", result.getTxId());
Expand Down
32 changes: 16 additions & 16 deletions query-example/src/main/java/tech/ydb/example/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
import org.slf4j.LoggerFactory;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.QueryTx;
import tech.ydb.query.tools.QueryReader;
import tech.ydb.query.tools.SessionRetryContext;
import tech.ydb.table.query.Params;
Expand Down Expand Up @@ -73,7 +73,7 @@ private void createTables() {
+ " series_info Text,"
+ " release_date Date,"
+ " PRIMARY KEY(series_id)"
+ ")", QueryTx.NONE).execute()
+ ")", TxMode.NONE).execute()
).join().getStatus().expectSuccess("Can't create table series");

retryCtx.supplyResult(session -> session.createQuery(""
Expand All @@ -84,7 +84,7 @@ private void createTables() {
+ " first_aired Date,"
+ " last_aired Date,"
+ " PRIMARY KEY(series_id, season_id)"
+ ")", QueryTx.NONE).execute()
+ ")", TxMode.NONE).execute()
).join().getStatus().expectSuccess("Can't create table seasons");

retryCtx.supplyResult(session -> session.createQuery(""
Expand All @@ -95,7 +95,7 @@ private void createTables() {
+ " title Text,"
+ " air_date Date,"
+ " PRIMARY KEY(series_id, season_id, episode_id)"
+ ")", QueryTx.NONE).execute()
+ ")", TxMode.NONE).execute()
).join().getStatus().expectSuccess("Can't create table episodes");
}

Expand All @@ -120,7 +120,7 @@ private void upsertTablesData() {
// Upsert list of series to table
retryCtx.supplyResult(session -> session.createQuery(
"UPSERT INTO series SELECT * FROM AS_TABLE($values)",
QueryTx.SERIALIZABLE_RW,
TxMode.SERIALIZABLE_RW,
Params.of("$values", seriesData)
).execute()).join().getStatus().expectSuccess("upsert problem");

Expand All @@ -147,7 +147,7 @@ private void upsertTablesData() {
// Upsert list of seasons to table
retryCtx.supplyResult(session -> session.createQuery(
"UPSERT INTO seasons SELECT * FROM AS_TABLE($values)",
QueryTx.SERIALIZABLE_RW,
TxMode.SERIALIZABLE_RW,
Params.of("$values", seasonsData)
).execute()).join().getStatus().expectSuccess("upsert problem");

Expand All @@ -174,7 +174,7 @@ private void upsertTablesData() {
// Upsert list of series to episodes
retryCtx.supplyResult(session -> session.createQuery(
"UPSERT INTO episodes SELECT * FROM AS_TABLE($values)",
QueryTx.SERIALIZABLE_RW,
TxMode.SERIALIZABLE_RW,
Params.of("$values", episodesData)
).execute()).join().getStatus().expectSuccess("upsert problem");
}
Expand All @@ -185,7 +185,7 @@ private void upsertSimple() {
+ "VALUES (2, 6, 1, \"TBD\");";

// Executes data query with specified transaction control settings.
retryCtx.supplyResult(session -> session.createQuery(query, QueryTx.SERIALIZABLE_RW).execute())
retryCtx.supplyResult(session -> session.createQuery(query, TxMode.SERIALIZABLE_RW).execute())
.join().getValue();
}

Expand All @@ -196,7 +196,7 @@ private void selectSimple() {

// Executes data query with specified transaction control settings.
QueryReader result = retryCtx.supplyResult(
session -> QueryReader.readFrom(session.createQuery(query, QueryTx.SERIALIZABLE_RW))
session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW))
).join().getValue();

logger.info("--[ SelectSimple ]--");
Expand Down Expand Up @@ -226,7 +226,7 @@ private void selectWithParams(long seriesID, long seasonID) {
);

QueryReader result = retryCtx.supplyResult(
session -> QueryReader.readFrom(session.createQuery(query, QueryTx.SNAPSHOT_RO, params))
session -> QueryReader.readFrom(session.createQuery(query, TxMode.SNAPSHOT_RO, params))
).join().getValue();

logger.info("--[ SelectWithParams ] -- ");
Expand Down Expand Up @@ -258,7 +258,7 @@ private void asyncSelectRead(long seriesID, long seasonID) {

logger.info("--[ ExecuteAsyncQueryWithParams ]--");
retryCtx.supplyResult(session -> {
QueryStream asyncQuery = session.createQuery(query, QueryTx.SNAPSHOT_RO, params);
QueryStream asyncQuery = session.createQuery(query, TxMode.SNAPSHOT_RO, params);
return asyncQuery.execute(part -> {
ResultSetReader rs = part.getResultSetReader();
logger.info("read {} rows of result set {}", rs.getRowCount(), part.getResultSetIndex());
Expand All @@ -275,7 +275,7 @@ private void asyncSelectRead(long seriesID, long seasonID) {

private void multiStepTransaction(long seriesID, long seasonID) {
retryCtx.supplyStatus(session -> {
QueryTransaction transaction = session.createNewTransaction(QueryTx.SNAPSHOT_RO);
QueryTransaction transaction = session.createNewTransaction(TxMode.SNAPSHOT_RO);
String query1
= "DECLARE $seriesId AS Uint64; "
+ "DECLARE $seasonId AS Uint64; "
Expand Down Expand Up @@ -329,7 +329,7 @@ private void multiStepTransaction(long seriesID, long seasonID) {

private void tclTransaction() {
retryCtx.supplyResult(session -> {
QueryTransaction transaction = session.beginTransaction(QueryTx.SERIALIZABLE_RW)
QueryTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW)
.join().getValue();

String query
Expand All @@ -351,11 +351,11 @@ private void tclTransaction() {
}

private void dropTables() {
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE episodes;", QueryTx.NONE).execute())
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE episodes;", TxMode.NONE).execute())
.join().getStatus().expectSuccess("drop table episodes problem");
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", QueryTx.NONE).execute())
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute())
.join().getStatus().expectSuccess("drop table seasons problem");
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", QueryTx.NONE).execute())
retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute())
.join().getStatus().expectSuccess("drop table series problem");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package tech.ydb.examples.simple;

import java.time.Duration;
import tech.ydb.core.grpc.GrpcTransport;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.PrimitiveType;

Expand All @@ -24,7 +25,7 @@ protected void run(GrpcTransport transport, String pathPrefix) {

try (
TableClient tableClient = TableClient.newClient(transport).build();
Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue();
Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()
) {

session.dropTable(tablePath)
Expand All @@ -40,22 +41,17 @@ protected void run(GrpcTransport transport, String pathPrefix) {
.join()
.expectSuccess("cannot create table");

Transaction transaction = session.beginTransaction(Transaction.Mode.SERIALIZABLE_READ_WRITE)
.join()
.getValue();
TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW)
.join().getValue();

String query1 = "UPSERT INTO [" + tablePath + "] (key, value) VALUES (1, 'one');";
DataQueryResult result1 = session.executeDataQuery(query1, TxControl.id(transaction))
.join()
.getValue();
DataQueryResult result1 = transaction.executeDataQuery(query1).join().getValue();
System.out.println("--[insert1]-------------------");
DataQueryResults.print(result1);
System.out.println("------------------------------");

String query2 = "UPSERT INTO [" + tablePath + "] (key, value) VALUES (2, 'two');";
DataQueryResult result2 = session.executeDataQuery(query2, TxControl.id(transaction))
.join()
.getValue();
DataQueryResult result2 = transaction.executeDataQuery(query2).join().getValue();
System.out.println("--[insert2]-------------------");
DataQueryResults.print(result2);
System.out.println("------------------------------");
Expand Down

0 comments on commit 2c6fd74

Please sign in to comment.