Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#191: add main logic for EidGeneratorStrategy #192

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.simple.SimpleRequestFactory;
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
import org.zalando.nakadiproducer.eventlog.EidGeneratorStrategy;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.eventlog.impl.EventLogMapper;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepositoryImpl;
import org.zalando.nakadiproducer.eventlog.impl.EventLogWriterImpl;
Expand Down Expand Up @@ -121,6 +123,12 @@ public FlowIdComponent flowIdComponent() {
}
}

@Bean
@ConditionalOnMissingBean(EidGeneratorStrategy.class)
public EidGeneratorStrategy eidGeneratorStrategy() {
return EidGeneratorStrategy.noop();
}

@Bean
@ConditionalOnMissingBean
public SnapshotEventCreationEndpoint snapshotEventCreationEndpoint(
Expand All @@ -139,9 +147,9 @@ public SnapshotCreationService snapshotCreationService(
}

@Bean
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper,
FlowIdComponent flowIdComponent, List<CompactionKeyExtractor> extractorList) {
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent, extractorList);
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, EventLogMapper eventLogMapper,
List<CompactionKeyExtractor> extractorList) {
return new EventLogWriterImpl(eventLogRepository, eventLogMapper, extractorList);
}

@Bean
Expand Down Expand Up @@ -176,4 +184,10 @@ public EventTransmissionService eventTransmissionService(
public FlywayMigrator flywayMigrator() {
return new FlywayMigrator();
}

@Bean
public EventLogMapper eventLogMapper(ObjectMapper objectMapper, FlowIdComponent flowIdComponent,
EidGeneratorStrategy eidGeneratorStrategy) {
return new EventLogMapper(objectMapper, flowIdComponent, eidGeneratorStrategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,18 @@ public void persist(Collection<EventLog> eventLogs) {
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey());
namedParameterMap.addValue("eid", eventLog.getEid());
return namedParameterMap;
})
.toArray(MapSqlParameterSource[]::new);

jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key, eid)" +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)",
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey, " +
" COALESCE(:eid, CAST(LPAD(TO_HEX(CAST(CURRVAL('nakadi_events.event_log_id_seq') as BIGINT)), 32, '0') AS UUID)))",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep the "convert on reading" fallback, then we can just put NULL here when using the noop strategy, right?
No need to do the complicated conversion on insert.

Suggested change
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey, " +
" COALESCE(:eid, CAST(LPAD(TO_HEX(CAST(CURRVAL('nakadi_events.event_log_id_seq') as BIGINT)), 32, '0') AS UUID)))",
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey, " +
" :eid)",

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but decided that if we provide a new column it would be better to provide a value too.
For me it's better to check data in the DB than to try to understand what the value will be send

But as you wish, in general I agree to remove this complex function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't change this, I think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I left the function for now.
If you still think that better to remove it, I'll do it

namedParameterMaps
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE nakadi_events.event_log ADD COLUMN eid UUID;
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.io.IOException;

import javax.sql.DataSource;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
ePaul marked this conversation as resolved.
Show resolved Hide resolved

@Configuration
public class EmbeddedDataSourceConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package org.zalando.nakadiproducer.eventlog.impl;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.samePropertyValuesAs;
import static org.hamcrest.core.Is.is;

import javax.transaction.Transactional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT;

import java.util.List;

@Transactional
public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {

@Autowired
Expand All @@ -24,6 +30,9 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@Autowired
private JdbcTemplate jdbcTemplate;

@Autowired
private TransactionTemplate transactionTemplate;

private static final String WAREHOUSE_EVENT_BODY_DATA =
("{'self':'http://WAREHOUSE_DOMAIN',"
+ "'code':'WH-DE-EF',"
Expand All @@ -47,22 +56,18 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
@BeforeEach
public void setUp() {
eventLogRepository.deleteAll();

persistTestEvent("FLOW_ID");
jdbcTemplate.execute("alter sequence nakadi_events.event_log_id_seq restart with 1");
}

private void persistTestEvent(String flowId) {
final EventLog eventLog = EventLog.builder()
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId(flowId)
.build();
final EventLog eventLog = buildEventLog(flowId);
eventLogRepository.persist(eventLog);
}

@Test
@Transactional
public void testDeleteMultipleEvents() {
persistTestEvent("FLOW_ID");
persistTestEvent("second_Flow-ID");
persistTestEvent("third flow-ID");
persistTestEvent("fourth flow-ID");
Expand All @@ -77,8 +82,7 @@ public void testDeleteMultipleEvents() {

List<EventLog> remaining = findAllEventsInDB();
assertThat(remaining, hasSize(1));
assertThat(remaining.get(0).getId(), is(notDeleted.getId()));
assertThat(remaining.get(0).getFlowId(), is(notDeleted.getFlowId()));
assertThat(remaining.get(0), equalTo(notDeleted));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line worked when you wrote it, but now it fails, as EventLog doesn't have the generated equals method anymore, so it just does the == comparison.

I'd just revert this line of the change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed equalTo to Matchers.samePropertyValuesAs

}

private List<EventLog> findAllEventsInDB() {
ePaul marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -88,18 +92,140 @@ private List<EventLog> findAllEventsInDB() {
}

@Test
@Transactional
public void testFindEventInRepositoryById() {
persistTestEvent("FLOW_ID");
Integer id = jdbcTemplate.queryForObject(
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
Integer.class);
final EventLog eventLog = eventLogRepository.findOne(id);
compareWithPersistedEvent(eventLog);
}

@Test
@Transactional
public void testInsertEventWithDefaultEid() {
persistTestEvent("FLOW_ID");
EventLog actual = findAllEventsInDB().get(0);

EventLog expected = buildEventLog("FLOW_ID", 1, buildEid(1));
assertEvent(actual, expected);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded the test to make sure it's also working when inserting multiple events:

Suggested change
public void testInsertEventWithDefaultEid() {
persistTestEvent("FLOW_ID");
EventLog actual = findAllEventsInDB().get(0);
EventLog expected = buildEventLog("FLOW_ID", 1, buildEid(1));
assertEvent(actual, expected);
}
public void testInsertSingleEventsWithDefaultEid() {
persistTestEvent("FLOW_ID_1");
persistTestEvent("FLOW_ID_2");
List<EventLog> eventLogs = findAllEventsInDB();
EventLog actual1 = eventLogs.get(0);
EventLog actual2 = eventLogs.get(1);
EventLog expected1 = buildEventLog("FLOW_ID_1", 1, buildEid(1));
EventLog expected2 = buildEventLog("FLOW_ID_2", 2, buildEid(2));
assertEvent(actual1, expected1);
assertEvent(actual2, expected2);
}
@Test
@Transactional
public void testBulkInsertEventWithDefaultEid() {
List<EventLog> eventLogsToPersist = Arrays.asList(
buildEventLog("FLOW_ID_1"),
buildEventLog("FLOW_ID_2")
);
eventLogRepository.persist(eventLogsToPersist);
List<EventLog> eventLogsFound = findAllEventsInDB();
EventLog actual1 = eventLogsFound.get(0);
EventLog actual2 = eventLogsFound.get(1);
EventLog expected1 = buildEventLog("FLOW_ID_1", 1, buildEid(1));
EventLog expected2 = buildEventLog("FLOW_ID_2", 2, buildEid(2));
assertEvent(actual1, expected1);
assertEvent(actual2, expected2);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Test
@Transactional
public void testInsertEventWithDefinedEid() {
EventLog testEvent = buildEventLog("flow-id", 1, UUID.randomUUID());
eventLogRepository.persist(testEvent);

EventLog actual = findAllEventsInDB().get(0);
assertEvent(actual, testEvent);
}

@Test
@Transactional
public void testInsertEventWithNegativeId() {
jdbcTemplate.execute("ALTER SEQUENCE nakadi_events.event_log_id_seq " +
" MINVALUE " + Integer.MIN_VALUE +
" START " + Integer.MIN_VALUE +
" RESTART " + Integer.MIN_VALUE);
persistTestEvent("FLOW_ID");
EventLog actual = findAllEventsInDB().get(0);

EventLog expected = buildEventLog("FLOW_ID", Integer.MIN_VALUE, buildEid(Integer.MIN_VALUE));
assertEvent(actual, expected);
}

/**
* This test checks that the default eid is generated correctly when multiple transactions are running in parallel.
* The test creates three events in two parallel transactions.
* Execution order of the transactions is the following:
* -- / T1 starts -- Event1 ----------------------------------------- Event3 --- T1 ends -/--
* -------------------------- / T2 starts --- Event2 --- T2 ends -/--------------------------
*/
@Test
public void testInsertEventWithDefaultEidWithParallelTransactions() throws Exception {
EventLog firstExpected = buildEventLog("first flow-id", 1, buildEid(1));
EventLog secondExpected = buildEventLog("second flow-id", 2, buildEid(2));
EventLog thirdExpected = buildEventLog("third flow-id", 3, buildEid(3));

CountDownLatch latchInsideTransaction = new CountDownLatch(1);
CountDownLatch latchOutsideTransaction = new CountDownLatch(1);

CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
transactionTemplate.executeWithoutResult(
(s) -> {
// We persist first event with default eid in the start of first transaction
// It should be first event in the table.
persistTestEvent("first flow-id");

latchInsideTransaction.countDown();
// Waiting for the second transaction to complete
try {
latchOutsideTransaction.await(2, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}

// We persist second event in the end of first transaction.
// It should be third event in the table.
persistTestEvent("third flow-id");
}
)
);

// Waiting for the first transaction to start
latchInsideTransaction.await(1, TimeUnit.SECONDS);

// We persist third event in the second transaction.
transactionTemplate.executeWithoutResult(
(s) -> persistTestEvent("second flow-id")
);
// We check that the first and third events haven't visible yet
List<EventLog> events = findAllEventsInDB();
assertThat(events, hasSize(1));
assertEvent(events.get(0), secondExpected);

// Waiting for the first transaction to complete
latchOutsideTransaction.countDown();
future.get(1, TimeUnit.SECONDS);

// We check that the all three events are in the table
events = findAllEventsInDB();
assertThat(events, hasSize(3));
assertEvent(events.get(0), firstExpected);
assertEvent(events.get(1), secondExpected);
assertEvent(events.get(2), thirdExpected);
}

private void assertEvent(EventLog actual, EventLog expected) {
assertThat(actual,
samePropertyValuesAs(expected, "created", "lastModified")
);
}

private void compareWithPersistedEvent(final EventLog eventLog) {
assertThat(eventLog.getEventBodyData(), is(WAREHOUSE_EVENT_BODY_DATA));
assertThat(eventLog.getEventType(), is(WAREHOUSE_EVENT_TYPE));
assertThat(eventLog.getCompactionKey(), is(COMPACTION_KEY));
}

private EventLog buildEventLog(String flowId) {
return buildEventLog(flowId, null, null);
}

private EventLog buildEventLog(String flowId, Integer id, UUID eid) {
return EventLog.builder()
.id(id)
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId(flowId)
.eid(eid)
.build();
}

private UUID buildEid(int id) {
return new UUID(0, id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.zalando.nakadiproducer.eventlog;

import java.util.UUID;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;

/**
* Strategy for generating EIDs.
* <p>
* EID - is unique identifier for nakadi events and we expect that implementation will generate unique EID for each event.
*/
public interface EidGeneratorStrategy {

static EidGeneratorStrategy noop() {
return (EventLog eventLog) -> null;
}

static EidGeneratorStrategy random() {
return (EventLog eventLog) -> UUID.randomUUID();
}

UUID generateEid(EventLog eventLog);
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.zalando.nakadiproducer.eventlog.impl;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

import com.fasterxml.jackson.annotation.JsonProperty;

@Getter
@Setter
@EqualsAndHashCode
@AllArgsConstructor
class DataChangeEventEnvelope {
@JsonProperty("data_op")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package org.zalando.nakadiproducer.eventlog.impl;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

import java.time.Instant;

@ToString
@Getter
@Setter
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class EventLog {

public EventLog(Integer id, String eventType, String eventBodyData, String flowId,
Instant created,
Instant lastModified, String lockedBy, Instant lockedUntil,
String compactionKey) {
this.id = id;
this.eventType = eventType;
this.eventBodyData = eventBodyData;
this.flowId = flowId;
this.created = created;
this.lastModified = lastModified;
this.lockedBy = lockedBy;
this.lockedUntil = lockedUntil;
this.compactionKey = compactionKey;
}

private Integer id;
private String eventType;
private String eventBodyData;
Expand All @@ -26,4 +38,18 @@ public class EventLog {
private String lockedBy;
private Instant lockedUntil;
private String compactionKey;
private UUID eid;

/**
* This is only needed for backward compatibility.
*
* <p>For instance 213 will be converted to "00000000-0000-0000-0000-0000000000d5"</p>
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* This is only needed for backward compatibility.
*
* <p>For instance 213 will be converted to "00000000-0000-0000-0000-0000000000d5"</p>
*/
/**
* Returns the eid to be used for submitting the event. If none was stored, we'll convert it from the DB-ID.
*
* <p>For instance 213 will be converted to "00000000-0000-0000-0000-0000000000d5"</p>
*/

public UUID getEid() {
if (eid == null) {
return new UUID(0, id);
}

return eid;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For performance reasons, we likely want to only do the conversion once.

Suggested change
if (eid == null) {
return new UUID(0, id);
}
return eid;
if (eid == null) {
eid = new UUID(0, id);
}
return eid;

Currently we do it once before submitting, and once again after checking the failure results from Nakadi.
(It's been like that in the old code, but if we can improve this now, even better.)

This makes also the lookup a tiny bit faster, as it'll now be an == comparison before comparing the content.

}
}
Loading