-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#191: add main logic for EidGeneratorStrategy #192
base: master
Are you sure you want to change the base?
Changes from 3 commits
e57ac5f
7eafb87
2eb8d38
203304a
61e04d7
911e388
59f396a
882e60d
793c58a
79421d6
1a49e50
6da7987
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
ALTER TABLE nakadi_events.event_log ADD COLUMN eid UUID; |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,21 +1,27 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package org.zalando.nakadiproducer.eventlog.impl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import static org.hamcrest.MatcherAssert.assertThat; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import static org.hamcrest.Matchers.equalTo; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import static org.hamcrest.Matchers.hasSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import static org.hamcrest.Matchers.samePropertyValuesAs; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import static org.hamcrest.core.Is.is; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import javax.transaction.Transactional; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.UUID; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.CompletableFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.CountDownLatch; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.TimeUnit; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.junit.jupiter.api.BeforeEach; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.junit.jupiter.api.Test; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.springframework.beans.factory.annotation.Autowired; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.springframework.jdbc.core.BeanPropertyRowMapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.springframework.jdbc.core.JdbcTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.springframework.transaction.annotation.Transactional; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.springframework.transaction.support.TransactionTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Autowired | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -24,6 +30,9 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Autowired | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private JdbcTemplate jdbcTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Autowired | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private TransactionTemplate transactionTemplate; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static final String WAREHOUSE_EVENT_BODY_DATA = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
("{'self':'http://WAREHOUSE_DOMAIN'," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ "'code':'WH-DE-EF'," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -47,22 +56,18 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@BeforeEach | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void setUp() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
eventLogRepository.deleteAll(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("FLOW_ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
jdbcTemplate.execute("alter sequence nakadi_events.event_log_id_seq restart with 1"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private void persistTestEvent(String flowId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final EventLog eventLog = EventLog.builder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.eventType(WAREHOUSE_EVENT_TYPE) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.compactionKey(COMPACTION_KEY) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.flowId(flowId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final EventLog eventLog = buildEventLog(flowId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
eventLogRepository.persist(eventLog); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testDeleteMultipleEvents() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("FLOW_ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("second_Flow-ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("third flow-ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("fourth flow-ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -77,8 +82,7 @@ public void testDeleteMultipleEvents() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<EventLog> remaining = findAllEventsInDB(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(remaining, hasSize(1)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(remaining.get(0).getId(), is(notDeleted.getId())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(remaining.get(0).getFlowId(), is(notDeleted.getFlowId())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(remaining.get(0), equalTo(notDeleted)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this line worked when you wrote it, but now it fails, as EventLog doesn't have the generated equals method anymore, so it just does the I'd just revert this line of the change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private List<EventLog> findAllEventsInDB() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ePaul marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -88,18 +92,140 @@ private List<EventLog> findAllEventsInDB() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testFindEventInRepositoryById() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("FLOW_ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Integer id = jdbcTemplate.queryForObject( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Integer.class); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final EventLog eventLog = eventLogRepository.findOne(id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
compareWithPersistedEvent(eventLog); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testInsertEventWithDefaultEid() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("FLOW_ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog actual = findAllEventsInDB().get(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog expected = buildEventLog("FLOW_ID", 1, buildEid(1)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(actual, expected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've expanded the test to make sure it's also working when inserting multiple events:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testInsertEventWithDefinedEid() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog testEvent = buildEventLog("flow-id", 1, UUID.randomUUID()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
eventLogRepository.persist(testEvent); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog actual = findAllEventsInDB().get(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(actual, testEvent); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Transactional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testInsertEventWithNegativeId() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
jdbcTemplate.execute("ALTER SEQUENCE nakadi_events.event_log_id_seq " + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
" MINVALUE " + Integer.MIN_VALUE + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
" START " + Integer.MIN_VALUE + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
" RESTART " + Integer.MIN_VALUE); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("FLOW_ID"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog actual = findAllEventsInDB().get(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog expected = buildEventLog("FLOW_ID", Integer.MIN_VALUE, buildEid(Integer.MIN_VALUE)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(actual, expected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* This test checks that the default eid is generated correctly when multiple transactions are running in parallel. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* The test creates three events in two parallel transactions. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Execution order of the transactions is the following: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* -- / T1 starts -- Event1 ----------------------------------------- Event3 --- T1 ends -/-- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* -------------------------- / T2 starts --- Event2 --- T2 ends -/-------------------------- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Test | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void testInsertEventWithDefaultEidWithParallelTransactions() throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog firstExpected = buildEventLog("first flow-id", 1, buildEid(1)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog secondExpected = buildEventLog("second flow-id", 2, buildEid(2)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventLog thirdExpected = buildEventLog("third flow-id", 3, buildEid(3)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CountDownLatch latchInsideTransaction = new CountDownLatch(1); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CountDownLatch latchOutsideTransaction = new CountDownLatch(1); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
transactionTemplate.executeWithoutResult( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(s) -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We persist first event with default eid in the start of first transaction | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// It should be first event in the table. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("first flow-id"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
latchInsideTransaction.countDown(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Waiting for the second transaction to complete | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
latchOutsideTransaction.await(2, TimeUnit.SECONDS); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We persist second event in the end of first transaction. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// It should be third event in the table. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
persistTestEvent("third flow-id"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Waiting for the first transaction to start | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
latchInsideTransaction.await(1, TimeUnit.SECONDS); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We persist third event in the second transaction. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
transactionTemplate.executeWithoutResult( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
(s) -> persistTestEvent("second flow-id") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We check that the first and third events haven't visible yet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<EventLog> events = findAllEventsInDB(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(events, hasSize(1)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(events.get(0), secondExpected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Waiting for the first transaction to complete | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
latchOutsideTransaction.countDown(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
future.get(1, TimeUnit.SECONDS); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// We check that the all three events are in the table | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
events = findAllEventsInDB(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(events, hasSize(3)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(events.get(0), firstExpected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(events.get(1), secondExpected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertEvent(events.get(2), thirdExpected); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private void assertEvent(EventLog actual, EventLog expected) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(actual, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
samePropertyValuesAs(expected, "created", "lastModified") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private void compareWithPersistedEvent(final EventLog eventLog) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(eventLog.getEventBodyData(), is(WAREHOUSE_EVENT_BODY_DATA)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(eventLog.getEventType(), is(WAREHOUSE_EVENT_TYPE)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
assertThat(eventLog.getCompactionKey(), is(COMPACTION_KEY)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private EventLog buildEventLog(String flowId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return buildEventLog(flowId, null, null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private EventLog buildEventLog(String flowId, Integer id, UUID eid) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return EventLog.builder() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.id(id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.eventType(WAREHOUSE_EVENT_TYPE) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.compactionKey(COMPACTION_KEY) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.flowId(flowId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.eid(eid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private UUID buildEid(int id) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new UUID(0, id); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package org.zalando.nakadiproducer.eventlog; | ||
|
||
import java.util.UUID; | ||
import org.zalando.nakadiproducer.eventlog.impl.EventLog; | ||
|
||
/** | ||
* Strategy for generating EIDs. | ||
* <p> | ||
* EID - is unique identifier for nakadi events and we expect that implementation will generate unique EID for each event. | ||
*/ | ||
public interface EidGeneratorStrategy { | ||
|
||
static EidGeneratorStrategy noop() { | ||
return (EventLog eventLog) -> null; | ||
} | ||
|
||
static EidGeneratorStrategy random() { | ||
return (EventLog eventLog) -> UUID.randomUUID(); | ||
} | ||
|
||
UUID generateEid(EventLog eventLog); | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,22 +1,34 @@ | ||||||||||||||||||||||
package org.zalando.nakadiproducer.eventlog.impl; | ||||||||||||||||||||||
|
||||||||||||||||||||||
import java.util.UUID; | ||||||||||||||||||||||
import lombok.AllArgsConstructor; | ||||||||||||||||||||||
import lombok.Builder; | ||||||||||||||||||||||
import lombok.Getter; | ||||||||||||||||||||||
import lombok.Data; | ||||||||||||||||||||||
import lombok.NoArgsConstructor; | ||||||||||||||||||||||
import lombok.Setter; | ||||||||||||||||||||||
import lombok.ToString; | ||||||||||||||||||||||
|
||||||||||||||||||||||
import java.time.Instant; | ||||||||||||||||||||||
|
||||||||||||||||||||||
@ToString | ||||||||||||||||||||||
@Getter | ||||||||||||||||||||||
@Setter | ||||||||||||||||||||||
@Data | ||||||||||||||||||||||
@NoArgsConstructor | ||||||||||||||||||||||
@AllArgsConstructor | ||||||||||||||||||||||
@Builder(toBuilder = true) | ||||||||||||||||||||||
public class EventLog { | ||||||||||||||||||||||
|
||||||||||||||||||||||
public EventLog(Integer id, String eventType, String eventBodyData, String flowId, | ||||||||||||||||||||||
Instant created, | ||||||||||||||||||||||
Instant lastModified, String lockedBy, Instant lockedUntil, | ||||||||||||||||||||||
String compactionKey) { | ||||||||||||||||||||||
this.id = id; | ||||||||||||||||||||||
this.eventType = eventType; | ||||||||||||||||||||||
this.eventBodyData = eventBodyData; | ||||||||||||||||||||||
this.flowId = flowId; | ||||||||||||||||||||||
this.created = created; | ||||||||||||||||||||||
this.lastModified = lastModified; | ||||||||||||||||||||||
this.lockedBy = lockedBy; | ||||||||||||||||||||||
this.lockedUntil = lockedUntil; | ||||||||||||||||||||||
this.compactionKey = compactionKey; | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
private Integer id; | ||||||||||||||||||||||
private String eventType; | ||||||||||||||||||||||
private String eventBodyData; | ||||||||||||||||||||||
|
@@ -26,4 +38,18 @@ public class EventLog { | |||||||||||||||||||||
private String lockedBy; | ||||||||||||||||||||||
private Instant lockedUntil; | ||||||||||||||||||||||
private String compactionKey; | ||||||||||||||||||||||
private UUID eid; | ||||||||||||||||||||||
|
||||||||||||||||||||||
/** | ||||||||||||||||||||||
* This is only needed for backward compatibility. | ||||||||||||||||||||||
* | ||||||||||||||||||||||
* <p>For instance 213 will be converted to "00000000-0000-0000-0000-0000000000d5"</p> | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||
public UUID getEid() { | ||||||||||||||||||||||
if (eid == null) { | ||||||||||||||||||||||
return new UUID(0, id); | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
return eid; | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For performance reasons, we likely want to only do the conversion once.
Suggested change
Currently we do it once before submitting, and once again after checking the failure results from Nakadi. This makes also the lookup a tiny bit faster, as it'll now be an == comparison before comparing the content. |
||||||||||||||||||||||
} | ||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we keep the "convert on reading" fallback, then we can just put NULL here when using the noop strategy, right?
No need to do the complicated conversion on insert.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that, but decided that if we provide a new column it would be better to provide a value too.
For me it's better to check data in the DB than to try to understand what the value will be send
But as you wish, in general I agree to remove this complex function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't change this, I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I left the function for now.
If you still think that better to remove it, I'll do it