Skip to content

Commit

Permalink
refactor: 优化批量新增保存时重复数据的处理逻辑:合并相同的数据.
Browse files Browse the repository at this point in the history
当批量操作时,如果一个批次中存在相同的数据(id相同),新的数据将会与旧的数据进行合并(属性会发生变更).
  • Loading branch information
zhou-hao committed Oct 9, 2023
1 parent ed4e4ad commit 0164ba2
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ public Mono<SaveResult> save(Publisher<E> data) {
public Mono<Integer> insert(Publisher<E> data) {
return Flux
.from(data)
.flatMap(e -> doInsert(e).reactive().as(this::setupLogger))
.reduce(Math::addExact)
.defaultIfEmpty(0);
.buffer(100)
.as(this::insertBatch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,60 @@ protected void initMapping(Class<E> entityType) {
defaultContextKeyValue.add(MappingContextKeys.columnMapping(mapping));
}

/**
 * Merges entities in a batch that share the same id value.
 * <p>
 * When several entities carry the same id, later entities are merged into the
 * earlier ones via {@link #merge(Object, Object)} (newer non-null properties win,
 * missing ones are back-filled from the older entity). Entities whose id is
 * {@code null} are kept as-is.
 * <p>
 * Unlike a {@code HashMap.values()} based implementation, this version preserves
 * the original encounter order of the batch, so the generated SQL statement order
 * is deterministic.
 *
 * @param data the batch of entities, never {@code null}
 * @return a collection with duplicates merged, in original encounter order
 */
protected Collection<E> tryMergeDuplicate(Collection<E> data) {
    if (data.isEmpty()) {
        return data;
    }
    // id -> position of the first entity with that id in `merged`
    Map<Object, Integer> indexById = new HashMap<>(data.size());
    List<E> merged = new ArrayList<>(data.size());
    for (E datum : data) {
        Object id = getProperty(datum, getIdColumn());
        if (id == null) {
            // no id: cannot detect duplicates, keep the entity unchanged
            merged.add(datum);
            continue;
        }
        Integer index = indexById.get(id);
        if (index == null) {
            indexById.put(id, merged.size());
            merged.add(datum);
        } else {
            // duplicate id: merge into the previously seen entity, in place
            merged.set(index, merge(merged.get(index), datum));
        }
    }
    return merged;
}

/**
 * Merges two entities that share the same id: for every mapped property that is
 * absent on {@code newer}, the value from {@code older} is copied over.
 * Properties already present on {@code newer} are left untouched, so the newer
 * entity always wins. Note that {@code newer} is mutated and returned.
 *
 * @param older the previously seen entity with the same id
 * @param newer the later entity; receives back-filled values
 * @return {@code newer}, after back-filling
 */
protected E merge(E older, E newer) {
    ObjectPropertyOperator propertyOperator = GlobalConfig.getPropertyOperator();
    for (String property : getProperties()) {
        boolean presentOnNewer = propertyOperator.getProperty(newer, property).orElse(null) != null;
        if (!presentOnNewer) {
            // back-fill from the older entity only when the newer one has no value
            propertyOperator
                .getProperty(older, property)
                .ifPresent(value -> propertyOperator.setProperty(newer, property, value));
        }
    }
    return newer;
}

/**
 * Reads a single property value from an entity via the global property operator.
 *
 * @param data     the entity to read from
 * @param property the property name
 * @return the property value, or {@code null} when absent
 */
private Object getProperty(E data, String property) {
    ObjectPropertyOperator propertyOperator = GlobalConfig.getPropertyOperator();
    return propertyOperator.getProperty(data, property).orElse(null);
}

protected SaveResultOperator doSave(Collection<E> data) {
Collection<E> _data = tryMergeDuplicate(data);
RDBTableMetadata table = getTable();
UpsertOperator upsert = operator.dml().upsert(table.getFullName());

return EventResultOperator.create(
() -> {
upsert.columns(getProperties());
List<String> ignore = new ArrayList<>();
for (E e : data) {
for (E e : _data) {
upsert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property, (prop, val) -> ignore.add(prop)))
.toArray());
Expand All @@ -134,7 +179,7 @@ protected SaveResultOperator doSave(Collection<E> data) {
table,
MappingEventTypes.save_before,
MappingEventTypes.save_after,
getDefaultContextKeyValue(instance(data),
getDefaultContextKeyValue(instance(_data),
type("batch"),
tableMetadata(table),
upsert(upsert))
Expand Down Expand Up @@ -176,7 +221,7 @@ private Object getInsertColumnValue(E data, String property, BiConsumer<String,
if (value != null) {
whenDefaultValue.accept(property, value);
//回填
if(!(value instanceof NativeSql)){
if (!(value instanceof NativeSql)) {
GlobalConfig.getPropertyOperator().setProperty(data, property, value);
}
}
Expand All @@ -191,14 +236,15 @@ private Object getInsertColumnValue(E data, String property) {
}

protected InsertResultOperator doInsert(Collection<E> batch) {
Collection<E> _data = tryMergeDuplicate(batch);
RDBTableMetadata table = getTable();
InsertOperator insert = operator.dml().insert(table.getFullName());

return EventResultOperator.create(
() -> {
insert.columns(getProperties());

for (E e : batch) {
for (E e : _data) {
insert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property))
.toArray());
Expand All @@ -210,7 +256,7 @@ protected InsertResultOperator doInsert(Collection<E> batch) {
MappingEventTypes.insert_before,
MappingEventTypes.insert_after,
getDefaultContextKeyValue(
instance(batch),
instance(_data),
type("batch"),
tableMetadata(table),
insert(insert))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ public void testReactivePager() {

}

@Test
public void testInsertMerge() {
    // Two entities with the same id in one batch: the repository is expected to
    // merge them before insert, so only ONE row is written and the newer
    // entity's properties win.
    BasicTestEntity first = BasicTestEntity
        .builder()
        .id("test_merge")
        .balance(1000L)
        .name("first")
        .createTime(new Date())
        .tags(Arrays.asList("a", "b", "c", "d"))
        .state((byte) 1)
        .stateEnum(StateEnum.enabled)
        .build();

    BasicTestEntity second = BasicTestEntity
        .builder()
        .id("test_merge")
        .balance(1000L)
        .name("second")
        .createTime(new Date())
        .tags(Arrays.asList("a", "b", "c", "d"))
        .state((byte) 1)
        .stateEnum(StateEnum.enabled)
        .build();

    // Exactly one row inserted for the two duplicate entities.
    repository
        .insert(Flux.just(first, second))
        .as(StepVerifier::create)
        .expectNext(1)
        .verifyComplete();

    // The persisted row carries the newer entity's name.
    repository
        .createQuery()
        .where(BasicTestEntity::getId, first.getId())
        .select("id", "name")
        .fetch()
        .map(BasicTestEntity::getName)
        .as(StepVerifier::create)
        .expectNext(second.getName())
        .verifyComplete();
}

@Test
public void testInsertDuplicate() {
//10次insert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void test() {



sqlExecutor.execute(SqlRequests.of("create index test_index on test_index_parser (age)"));
sqlExecutor.execute(SqlRequests.of("create index test_index_0 on test_index_parser (age)"));

sqlExecutor.execute(SqlRequests.of("create unique index test_index_2 on test_index_parser (name)"));

Expand Down

0 comments on commit 0164ba2

Please sign in to comment.