Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#191: add main logic for EidGeneratorStrategy #192

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.simple.SimpleRequestFactory;
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
import org.zalando.nakadiproducer.eventlog.EidGeneratorStrategy;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.eventlog.impl.EventLogMapper;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepositoryImpl;
import org.zalando.nakadiproducer.eventlog.impl.EventLogWriterImpl;
import org.zalando.nakadiproducer.eventlog.impl.eidgenerator.NoOpEidGeneratorStrategy;
import org.zalando.nakadiproducer.flowid.FlowIdComponent;
import org.zalando.nakadiproducer.flowid.NoopFlowIdComponent;
import org.zalando.nakadiproducer.flowid.TracerFlowIdComponent;
Expand Down Expand Up @@ -121,6 +124,12 @@ public FlowIdComponent flowIdComponent() {
}
}

@Bean
@ConditionalOnMissingBean(EidGeneratorStrategy.class)
public EidGeneratorStrategy eidGeneratorStrategy() {
return new NoOpEidGeneratorStrategy();
}

@Bean
@ConditionalOnMissingBean
public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint(
Expand All @@ -139,9 +148,9 @@ public SnapshotCreationService snapshotCreationService(
}

@Bean
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper,
FlowIdComponent flowIdComponent, List<CompactionKeyExtractor> extractorList) {
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent, extractorList);
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, EventLogMapper eventLogMapper,
List<CompactionKeyExtractor> extractorList) {
return new EventLogWriterImpl(eventLogRepository, eventLogMapper, extractorList);
}

@Bean
Expand Down Expand Up @@ -176,4 +185,10 @@ public EventTransmissionService eventTransmissionService(
public FlywayMigrator flywayMigrator() {
return new FlywayMigrator();
}

@Bean
public EventLogMapper eventLogMapper(ObjectMapper objectMapper, FlowIdComponent flowIdComponent,
EidGeneratorStrategy eidGeneratorStrategy) {
return new EventLogMapper(objectMapper, flowIdComponent, eidGeneratorStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,18 @@ public void persist(Collection<EventLog> eventLogs) {
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey());
namedParameterMap.addValue("eid", eventLog.getEid());
return namedParameterMap;
})
.toArray(MapSqlParameterSource[]::new);

jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key, eid)" +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)",
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey, " +
" coalesce(:eid, CAST(LPAD(TO_HEX(currval('nakadi_events.event_log_id_seq')), 32, '0') AS UUID)))",
namedParameterMaps
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.zalando.nakadiproducer.eventlog.impl.eidgenerator;

import java.util.UUID;
import org.zalando.nakadiproducer.eventlog.EidGeneratorStrategy;

public class NoOpEidGeneratorStrategy implements EidGeneratorStrategy {
ePaul marked this conversation as resolved.
Show resolved Hide resolved

@Override
public UUID generateEid() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.zalando.nakadiproducer.eventlog.impl.eidgenerator;

import java.util.UUID;
import org.zalando.nakadiproducer.eventlog.EidGeneratorStrategy;

public class RandomEidGeneratorStrategy implements EidGeneratorStrategy {

@Override
public UUID generateEid() {
return UUID.randomUUID();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE nakadi_events.event_log ADD COLUMN eid uuid DEFAULT CAST(LPAD(TO_HEX(currval('nakadi_events.event_log_id_seq')), 32, '0') AS UUID);
ePaul marked this conversation as resolved.
Show resolved Hide resolved
ePaul marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.io.IOException;

import javax.sql.DataSource;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
ePaul marked this conversation as resolved.
Show resolved Hide resolved

@Configuration
public class EmbeddedDataSourceConfig {
Expand All @@ -23,4 +26,12 @@ public DataSource dataSource() throws IOException {
public EmbeddedPostgres embeddedPostgres() throws IOException {
return EmbeddedPostgres.start();
}

@Bean
public TransactionTemplate requiresNewTransactionTemplate(
PlatformTransactionManager transactionManager) {
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return transactionTemplate;
}
ePaul marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package org.zalando.nakadiproducer.eventlog.impl;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.samePropertyValuesAs;
import static org.hamcrest.core.Is.is;

import javax.transaction.Transactional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT;

import java.util.List;

@Transactional
public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {

@Autowired
Expand All @@ -24,6 +30,9 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@Autowired
private JdbcTemplate jdbcTemplate;

@Autowired
private TransactionTemplate requiresNewTransactionTemplate;

private static final String WAREHOUSE_EVENT_BODY_DATA =
("{'self':'http://WAREHOUSE_DOMAIN',"
+ "'code':'WH-DE-EF',"
Expand All @@ -47,22 +56,18 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@BeforeEach
public void setUp() {
eventLogRepository.deleteAll();

persistTestEvent("FLOW_ID");
jdbcTemplate.execute("alter sequence nakadi_events.event_log_id_seq restart with 1");
}

private void persistTestEvent(String flowId) {
final EventLog eventLog = EventLog.builder()
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId(flowId)
.build();
final EventLog eventLog = buildEventLog(flowId);
eventLogRepository.persist(eventLog);
}

@Test
@Transactional
public void testDeleteMultipleEvents() {
persistTestEvent("FLOW_ID");
persistTestEvent("second_Flow-ID");
persistTestEvent("third flow-ID");
persistTestEvent("fourth flow-ID");
Expand All @@ -77,8 +82,7 @@ public void testDeleteMultipleEvents() {

List<EventLog> remaining = findAllEventsInDB();
assertThat(remaining, hasSize(1));
assertThat(remaining.get(0).getId(), is(notDeleted.getId()));
assertThat(remaining.get(0).getFlowId(), is(notDeleted.getFlowId()));
assertThat(remaining.get(0), equalTo(notDeleted));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line worked when you wrote it, but now it fails, as EventLog doesn't have the generated equals method anymore, so it just does the == comparison.

I'd just revert this line of the change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed equalTo to Matchers.samePropertyValuesAs

}

private List<EventLog> findAllEventsInDB() {
ePaul marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -88,18 +92,125 @@ private List<EventLog> findAllEventsInDB() {
}

@Test
@Transactional
public void testFindEventInRepositoryById() {
persistTestEvent("FLOW_ID");
Integer id = jdbcTemplate.queryForObject(
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
Integer.class);
final EventLog eventLog = eventLogRepository.findOne(id);
compareWithPersistedEvent(eventLog);
}

@Test
@Transactional
public void testInsertEventWithDefaultEid() {
persistTestEvent("FLOW_ID");
EventLog actual = findAllEventsInDB().get(0);

EventLog expected = buildEventLog("FLOW_ID", 1, buildEid(1));
assertEvent(actual, expected);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded the test to make sure it's also working when inserting multiple events:

Suggested change
public void testInsertEventWithDefaultEid() {
persistTestEvent("FLOW_ID");
EventLog actual = findAllEventsInDB().get(0);
EventLog expected = buildEventLog("FLOW_ID", 1, buildEid(1));
assertEvent(actual, expected);
}
public void testInsertSingleEventsWithDefaultEid() {
persistTestEvent("FLOW_ID_1");
persistTestEvent("FLOW_ID_2");
List<EventLog> eventLogs = findAllEventsInDB();
EventLog actual1 = eventLogs.get(0);
EventLog actual2 = eventLogs.get(1);
EventLog expected1 = buildEventLog("FLOW_ID_1", 1, buildEid(1));
EventLog expected2 = buildEventLog("FLOW_ID_2", 2, buildEid(2));
assertEvent(actual1, expected1);
assertEvent(actual2, expected2);
}
@Test
@Transactional
public void testBulkInsertEventWithDefaultEid() {
List<EventLog> eventLogsToPersist = Arrays.asList(
buildEventLog("FLOW_ID_1"),
buildEventLog("FLOW_ID_2")
);
eventLogRepository.persist(eventLogsToPersist);
List<EventLog> eventLogsFound = findAllEventsInDB();
EventLog actual1 = eventLogsFound.get(0);
EventLog actual2 = eventLogsFound.get(1);
EventLog expected1 = buildEventLog("FLOW_ID_1", 1, buildEid(1));
EventLog expected2 = buildEventLog("FLOW_ID_2", 2, buildEid(2));
assertEvent(actual1, expected1);
assertEvent(actual2, expected2);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Test
@Transactional
public void testInsertEventWithDefinedEid() {
EventLog testEvent = buildEventLog("flow-id", 1, UUID.randomUUID());
eventLogRepository.persist(testEvent);

EventLog actual = findAllEventsInDB().get(0);
assertEvent(actual, testEvent);
}

/**
* This test checks that the default eid is generated correctly when multiple transactions are running in parallel.
* The test creates three events in two parallel transactions.
* Execution order of the transactions is the following:
* -- / T1 starts -- Event1 ----------------------------------------- Event3 --- T1 ends -/--
* -------------------------- / T2 starts --- Event2 --- T2 ends -/--------------------------
*/
@Test
public void testInsertEventWithDefaultEidWithParallelTransactions() throws Exception {
EventLog firstExpected = buildEventLog("first flow-id", 1, buildEid(1));
EventLog secondExpected = buildEventLog("second flow-id", 2, buildEid(2));
EventLog thirdExpected = buildEventLog("third flow-id", 3, buildEid(3));

CountDownLatch latchInsideTransaction = new CountDownLatch(1);
CountDownLatch latchOutsideTransaction = new CountDownLatch(1);

CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
requiresNewTransactionTemplate.executeWithoutResult(
(s) -> {
// We persist first event with default eid in the start of first transaction
// It should be first event in the table.
persistTestEvent("first flow-id");

latchInsideTransaction.countDown();
// Waiting for the second transaction to complete
try {
latchOutsideTransaction.await(2, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}

// We persist second event in the end of first transaction.
// It should be third event in the table.
persistTestEvent("third flow-id");
}
)
);

// Waiting for the first transaction to start
latchInsideTransaction.await(1, TimeUnit.SECONDS);

// We persist third event in the second transaction.
requiresNewTransactionTemplate.executeWithoutResult(
(s) -> persistTestEvent("second flow-id")
);
// We check that the first and third events haven't visible yet
List<EventLog> events = findAllEventsInDB();
assertThat(events, hasSize(1));
assertEvent(events.get(0), secondExpected);

// Waiting for the first transaction to complete
latchOutsideTransaction.countDown();
future.get(1, TimeUnit.SECONDS);

// We check that the all three events are in the table
events = findAllEventsInDB();
assertThat(events, hasSize(3));
assertEvent(events.get(0), firstExpected);
assertEvent(events.get(1), secondExpected);
assertEvent(events.get(2), thirdExpected);
}

private void assertEvent(EventLog actual, EventLog expected) {
assertThat(actual,
samePropertyValuesAs(expected, "created", "lastModified")
);
}

private void compareWithPersistedEvent(final EventLog eventLog) {
assertThat(eventLog.getEventBodyData(), is(WAREHOUSE_EVENT_BODY_DATA));
assertThat(eventLog.getEventType(), is(WAREHOUSE_EVENT_TYPE));
assertThat(eventLog.getCompactionKey(), is(COMPACTION_KEY));
}

private EventLog buildEventLog(String flowId) {
return buildEventLog(flowId, null, null);
}
private EventLog buildEventLog(String flowId, Integer id, UUID eid) {
return EventLog.builder()
.id(id)
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId(flowId)
.eid(eid)
.build();
}

private UUID buildEid(int id) {
return new UUID(0, id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.zalando.nakadiproducer.eventlog;

import java.util.UUID;

public interface EidGeneratorStrategy {

UUID generateEid();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we want to be more flexible here to allow for future extensions?
Like generating the UUID as a hash of the content?
Or a different one depending on the event type?

So my suggestion would be to have this method take some parameter(s) – either the EventLog object (though with the current approach this will be tricky, as it's only created afterwards) or the values that go into it (i.e. eventType, payload, compactionKey).

The two implementations we provide can just ignore these parameters, but if someone needs something specialized, we don't have to do another incompatible change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, I would prefer to use payload object for generation eid, but because we have just Object value I decided do nothing with that.

Added in last commit:

  1. Moved generateEid call to the end createEventLog method
  2. Added EventLog as parameter.

Personally I not so much like this solution, because it a little bit tricky and depends on what time we initialise eid field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a deeper thought, this breaks the package layering a bit. The *.impl packages (EventLog is part of it) should not be exposed in the user interface of the library (i.e. the EidGeneratorStrategy).

So that only leaves the second option of "take all the values which go into the EventLog", which makes the signature a bit unwieldly.

I think now that it should be possible to retrofit this also later – we can for now have just the no-argument abstract generateEid method in the interface, and later add a default method with more parameters, which will then be called by the library. The few applications which need a data-specific eid can overwrite this method, applications which have some other non-data-specific need can implement the abstract method, and the big majority of applications can just use one of the pre-made implementations via the factory methods (or even auto-configuration).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I removed EventLog.
I agree that we can add additional method later.

For some extra cases I extracted EventLogBuilder interface and clients can implement this and build EventLog as they needed


}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.zalando.nakadiproducer.eventlog.impl;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

import com.fasterxml.jackson.annotation.JsonProperty;

@Getter
@Setter
@EqualsAndHashCode
@AllArgsConstructor
class DataChangeEventEnvelope {
@JsonProperty("data_op")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package org.zalando.nakadiproducer.eventlog.impl;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;

@ToString
@Getter
@Setter
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class EventLog {

public EventLog(Integer id, String eventType, String eventBodyData, String flowId,
Instant created,
Instant lastModified, String lockedBy, Instant lockedUntil,
String compactionKey) {
this.id = id;
this.eventType = eventType;
this.eventBodyData = eventBodyData;
this.flowId = flowId;
this.created = created;
this.lastModified = lastModified;
this.lockedBy = lockedBy;
this.lockedUntil = lockedUntil;
this.compactionKey = compactionKey;
}

private Integer id;
private String eventType;
private String eventBodyData;
Expand All @@ -26,4 +38,5 @@ public class EventLog {
private String lockedBy;
private Instant lockedUntil;
private String compactionKey;
private UUID eid;
}
Loading