Skip to content
This repository has been archived by the owner on Mar 21, 2020. It is now read-only.

Commit

Permalink
saga jdbc repository implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinChenYan committed Nov 7, 2017
1 parent 1572fc9 commit 765a6b6
Show file tree
Hide file tree
Showing 10 changed files with 615 additions and 555 deletions.
19 changes: 19 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
target/
bower_components/
node_modules/
dist/
.idea/
.tmp/
.project
.classpath
.settings
.metadata/
*.iml
*.log
*.tmp
*.zip
*.bak
*.versionsBackup

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
901 changes: 376 additions & 525 deletions .idea/workspace.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.networknt.saga.core.producer.CommandProducer;
import com.networknt.saga.participant.SagaLockManager;
import com.networknt.saga.repository.AggregateInstanceSubscriptionsDAO;
import com.networknt.saga.repository.EnlistedAggregatesDao;
import com.networknt.saga.repository.SagaInstanceRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class SagaManagerImpl<Data>

private AggregateInstanceSubscriptionsDAO aggregateInstanceSubscriptionsDAO;

//private EnlistedAggregatesDao enlistedAggregatesDao;
private EnlistedAggregatesDao enlistedAggregatesDao;


private ChannelMapping channelMapping;
Expand Down Expand Up @@ -89,7 +90,9 @@ public SagaInstance create(Data sagaData, Optional<String> resource) {

String sagaId = sagaInstance.getId();

// resource.ifPresent( r -> Assert.isTrue(sagaLockManager.claimLock(getSagaType(), sagaId, r), "Cannot claim lock for resource"));
if (resource.isPresent() && !sagaLockManager.claimLock(getSagaType(), sagaId, resource.get())) {
throw new IllegalArgumentException("Cannot claim lock for resource");
}

SagaActions<Data> actions = getStateDefinition().getStartingHandler().get().apply(sagaData);

Expand Down Expand Up @@ -127,10 +130,10 @@ private void publishEvents(String sagaId, Set<EventToPublish> eventsToPublish, O
for (EventToPublish event : eventsToPublish) {
Map<String, String> headers = new HashMap<>();
if (isEndState) {
// Set<String> sagaIds = enlistedAggregatesDao.findSagas(event.getAggregateType(), event.getAggregateId());
Set<String> sagaIds = enlistedAggregatesDao.findSagas(event.getAggregateType(), event.getAggregateId());

// sagaIds.remove(sagaId);
// headers.put("participating-saga-ids", sagaIds.stream().collect(joining(",")));
sagaIds.remove(sagaId);
headers.put("participating-saga-ids", sagaIds.stream().collect(joining(",")));
}
domainEventPublisher.publish(event.getAggregateType().getName(),
event.getAggregateId(),
Expand Down Expand Up @@ -175,7 +178,7 @@ private String makeSagaReplyChannel() {

private void updateEventInstanceSubscriptions(Data sagaData, String sagaId, String stateName) {
List<EventClassAndAggregateId> instanceEvents = getStateDefinition().findEventHandlers(saga, stateName, sagaData);
// aggregateInstanceSubscriptionsDAO.update(getSagaType(), sagaId, instanceEvents);
aggregateInstanceSubscriptionsDAO.update(getSagaType(), sagaId, instanceEvents);
}

private String sendCommands(String sagaId, List<CommandWithDestination> commands) {
Expand Down Expand Up @@ -206,10 +209,9 @@ public void handleMessage(Message message) {
String aggregateId = message.getRequiredHeader(Message.PARTITION_ID);
String eventType = message.getRequiredHeader(EventMessageHeaders.EVENT_TYPE);
// TODO query the saga event routing table: (at, aId, et) -> [(sagaType, sagaId)]
// for (SagaTypeAndId sagaTypeAndId : aggregateInstanceSubscriptionsDAO.findSagas(aggregateType, aggregateId, eventType)) {
// handleAggregateInstanceEvent(sagaTypeAndId.getSagaType(), sagaTypeAndId.getSagaId(), message, aggregateType, aggregateId, eventType);
// }
;
for (SagaTypeAndId sagaTypeAndId : aggregateInstanceSubscriptionsDAO.findSagas(aggregateType, aggregateId, eventType)) {
handleAggregateInstanceEvent(sagaTypeAndId.getSagaType(), sagaTypeAndId.getSagaId(), message, aggregateType, aggregateId, eventType);
}


} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.networknt.saga.repository;

import com.networknt.saga.orchestration.EnlistedAggregate;
import com.networknt.saga.orchestration.EventClassAndAggregateId;
import com.networknt.saga.orchestration.SagaTypeAndId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;


public class AggregateInstanceSubscriptionsDAOImpl implements AggregateInstanceSubscriptionsDAO{

private Logger logger = LoggerFactory.getLogger(getClass());
private DataSource dataSource;


public AggregateInstanceSubscriptionsDAOImpl(DataSource dataSource) {
this.dataSource = dataSource;
}

public void setDataSource(DataSource dataSource) {this.dataSource = dataSource;}

@Override
public void update(String sagaType, String sagaId, List<EventClassAndAggregateId> eventHandlers) {

String psDelete = "DELETE FROM aggregate_instance_subscriptions WHERE saga_type = ? AND saga_id =?";
String psInsert = "INSERT INTO aggregate_instance_subscriptions(aggregate_id, event_type, saga_type, saga_id) values(?, ?, ?, ?)";
try (final Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
PreparedStatement stmt = connection.prepareStatement(psDelete);
stmt.executeUpdate();
PreparedStatement ps = connection.prepareStatement(psInsert);
for (EventClassAndAggregateId eventClassAndAggregateId : eventHandlers) {
ps.setString(1, Long.toString(eventClassAndAggregateId.getAggregateId()));
ps.setString(2, eventClassAndAggregateId.getEventClass().getName());
ps.setString(3, sagaType);
ps.setString(4, sagaId);
ps.addBatch();
}
ps.executeBatch();
connection.commit();
} catch (SQLException e) {
logger.error("SqlException:", e);
}

}

@Override
public List<SagaTypeAndId> findSagas(String aggregateType, String aggregateId, String eventType){
String psSelect = "Select saga_type, saga_id from aggregate_instance_subscriptions where aggregate_id = ? and event_type = ?";

List<SagaTypeAndId> sagas = new ArrayList<>();
try (final Connection connection = dataSource.getConnection()) {

PreparedStatement ps = connection.prepareStatement(psSelect);
ps.setString(1, aggregateId);
ps.setString(2, eventType);

ResultSet rs = ps.executeQuery();
while (rs.next()) {
sagas.add(new SagaTypeAndId(rs.getString("saga_type"), rs.getString("saga_id") ));
}
} catch (SQLException e) {
logger.error("SqlException:", e);
}
return sagas;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.networknt.saga.repository;



import com.networknt.saga.orchestration.EnlistedAggregate;


import java.util.Set;

public interface EnlistedAggregatesDao {


void save(String sagaId, Set<EnlistedAggregate> enlistedAggregates) ;


Set<EnlistedAggregate> findEnlistedAggregates(String sagaId);

Set<String> findSagas(Class aggregateType, String aggregateId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.networknt.saga.repository;


import com.networknt.saga.orchestration.EnlistedAggregate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;

public class EnlistedAggregatesDaoImpl implements EnlistedAggregatesDao{

private Logger logger = LoggerFactory.getLogger(getClass());
private DataSource dataSource;


public EnlistedAggregatesDaoImpl(DataSource dataSource) {
this.dataSource = dataSource;
}

public void setDataSource(DataSource dataSource) {this.dataSource = dataSource;}


@Override
public void save(String sagaId, Set<EnlistedAggregate> enlistedAggregates) {

String psInsert = "INSERT INTO saga_enlisted_aggregates(saga_id, aggregate_type, aggregate_id) values(?,?,?)";

for (EnlistedAggregate ela : enlistedAggregates) {

try (final Connection connection = dataSource.getConnection()) {

PreparedStatement stmt = connection.prepareStatement(psInsert);
stmt.setString(1, sagaId);
stmt.setString(2, ela.getAggregateClass().getName());
stmt.setString(3, ela.getAggregateId().toString());
stmt.executeUpdate();
} catch (SQLException e) {
logger.error("SqlException:", e);
}
}
}

@Override
public Set<EnlistedAggregate> findEnlistedAggregates(String sagaId) {
return null;
/* return new HashSet<>(jdbcTemplate.query("Select aggregate_type, aggregate_id from saga_enlisted_aggregates where saga_id = ?",
(rs, rowNum) -> {
try {
return new EnlistedAggregate((Class) ClassUtils.forName(rs.getString("aggregate_type"), getClass().getClassLoader()), rs.getString("aggregate_id"));
} catch (ClassNotFoundException e) {
throw new RuntimeException();
}
},
sagaId));*/
}

@Override
public Set<String> findSagas(Class aggregateType, String aggregateId) {

String psSelect = "Select saga_id from saga_enlisted_aggregates where aggregate_type = ? AND aggregate_id = ?";

Set<String> sagas = new HashSet<>();
try (final Connection connection = dataSource.getConnection()) {

PreparedStatement ps = connection.prepareStatement(psSelect);
ps.setString(1, aggregateType.getName());
ps.setString(1, aggregateId);

ResultSet rs = ps.executeQuery();
while (rs.next()) {
sagas.add(rs.getString("saga_id"));
}
} catch (SQLException e) {
logger.error("SqlException:", e);
}
return sagas;

/*
return new HashSet<>(jdbcTemplate.query("Select saga_id from saga_enlisted_aggregates where aggregate_type = ? AND aggregate_id = ?",
(rs, rowNum) -> {
return rs.getString("aggregate_type");
},
aggregateType, aggregateId));*/
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MessageProducerJdbcImplTest {
ds = (DataSource) SingletonServiceFactory.getBean(DataSource.class);
try (Connection connection = ds.getConnection()) {
// Runscript doesn't work need to execute batch here.
String schemaResourceName = "/queryside_ddl.sql";
String schemaResourceName = "/saga_repository_ddl.sql";
InputStream in = MessageProducerJdbcImplTest.class.getResourceAsStream(schemaResourceName);

if (in == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package com.networknt.saga.repository;


import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.saga.core.command.common.CommandMessageHeaders;
import com.networknt.saga.core.message.common.Message;
import com.networknt.saga.core.message.producer.MessageBuilder;
import com.networknt.saga.core.message.producer.MessageProducer;
import com.networknt.saga.orchestration.DestinationAndResource;
import com.networknt.saga.orchestration.SagaInstance;
import com.networknt.saga.orchestration.SerializedSagaData;
Expand All @@ -19,9 +14,7 @@
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertNotNull;
Expand All @@ -39,7 +32,7 @@ public class SagaInstanceRepositoryTest {
ds = (DataSource) SingletonServiceFactory.getBean(DataSource.class);
try (Connection connection = ds.getConnection()) {
// Runscript doesn't work need to execute batch here.
String schemaResourceName = "/queryside_ddl.sql";
String schemaResourceName = "/saga_repository_ddl.sql";
InputStream in = SagaInstanceRepositoryTest.class.getResourceAsStream(schemaResourceName);

if (in == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package com.networknt.saga.repository;


import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.saga.core.command.common.CommandMessageHeaders;
import com.networknt.saga.core.message.common.Message;
import com.networknt.saga.core.message.producer.MessageBuilder;
import com.networknt.saga.core.message.producer.MessageProducer;
import com.networknt.saga.participant.SagaLockManager;
import com.networknt.service.SingletonServiceFactory;
import org.h2.tools.RunScript;
Expand All @@ -17,8 +12,8 @@
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertTrue;


/**
Expand All @@ -33,7 +28,7 @@ public class SagaLockManagerImplTest {
ds = (DataSource) SingletonServiceFactory.getBean(DataSource.class);
try (Connection connection = ds.getConnection()) {
// Runscript doesn't work need to execute batch here.
String schemaResourceName = "/queryside_ddl.sql";
String schemaResourceName = "/saga_repository_ddl.sql";
InputStream in = SagaLockManagerImplTest.class.getResourceAsStream(schemaResourceName);

if (in == null) {
Expand All @@ -58,8 +53,12 @@ public static void setUp() {

@Test
public void testClaimLock() {
sagaLockManager.claimLock("order.service","22222", "target");

boolean firstLock = sagaLockManager.claimLock("order.service","22222", "target");
assertTrue(firstLock);
boolean lockSameSagaId = sagaLockManager.claimLock("order.service","22222", "target");
assertTrue(lockSameSagaId);
boolean lockWithDiffSaga = sagaLockManager.claimLock("order.service","23456", "target");
assertTrue(lockWithDiffSaga);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ DROP Table IF Exists saga_instance_participants;
DROP Table IF Exists aggregate_instance_subscriptions;
DROP Table IF Exists saga_lock_table;
DROP Table IF Exists saga_stash_table;
DROP Table IF Exists saga_enlisted_aggregates;

CREATE TABLE message (
ID VARCHAR(120) PRIMARY KEY,
Expand Down Expand Up @@ -50,7 +51,8 @@ CREATE TABLE aggregate_instance_subscriptions(
create table saga_lock_table(
target VARCHAR(100) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
saga_Id VARCHAR(100) NOT NULL
saga_Id VARCHAR(100) NOT NULL,
PRIMARY KEY(target)
);

create table saga_stash_table(
Expand All @@ -61,3 +63,10 @@ create table saga_stash_table(
message_headers VARCHAR(1000) NOT NULL,
message_payload VARCHAR(1000) NOT NULL
);

create table saga_enlisted_aggregates(
saga_id VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
aggregate_type VARCHAR(100) DEFAULT NULL,
PRIMARY KEY(aggregate_id, saga_id)
);

0 comments on commit 765a6b6

Please sign in to comment.