Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kylin-5388 Add a REST API to support updating database names and table names without rebuilding data #2071

Open
wants to merge 6 commits into
base: kylin5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,8 @@ public boolean remoteRequest(BroadcastEventReadyNotifier notifier, String projec
}
return true;
}

/**
 * Convenience accessor for the project manager bound to this service's Kylin config.
 *
 * @return the {@link NProjectManager} instance obtained from {@code getConfig()}
 */
public NProjectManager getProjectManager() {
    return NProjectManager.getInstance(getConfig());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.service.update;

import java.io.Serializable;
import java.util.Locale;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/**
 * Describes a requested rename of a source table: a new database name and/or a new
 * table name. A {@code null} or empty field means "unchanged". All resolved names
 * are normalized to upper case with a fixed locale, matching Kylin's convention
 * for table identities.
 *
 * Deserialized from JSON, hence the mutable bean shape with setters.
 */
public class TableSchemaUpdateMapping implements Serializable {

    private static final long serialVersionUID = 1L;

    // New database name; null or empty means the database is not renamed.
    private String database;

    // New table name; null or empty means the table is not renamed.
    private String tableName;

    /** @return true if this mapping renames the database. */
    public boolean isDatabaseChanged() {
        return database != null && !database.isEmpty();
    }

    /**
     * Resolves the database name to use for a table.
     *
     * @param dbName current database name, used when no rename is configured
     * @return the mapped (or original) database name, upper-cased with {@link Locale#ROOT}
     */
    public String getDatabase(String dbName) {
        String resolved = isDatabaseChanged() ? database : dbName;
        return resolved.toUpperCase(Locale.ROOT);
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    /** @return true if this mapping renames the table. */
    public boolean isTableNameChanged() {
        return tableName != null && !tableName.isEmpty();
    }

    /**
     * Resolves the table name to use.
     *
     * @param tblName current table name, used when no rename is configured
     * @return the mapped (or original) table name, upper-cased with {@link Locale#ROOT}
     */
    public String getTableName(String tblName) {
        String resolved = isTableNameChanged() ? tableName : tblName;
        return resolved.toUpperCase(Locale.ROOT);
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    /** @return true if either the database or the table name is renamed. */
    public boolean isTableIdentityChanged() {
        return isDatabaseChanged() || isTableNameChanged();
    }

    /**
     * Applies this mapping to a full table identity of the form {@code DB.TABLE}.
     *
     * @throws IllegalArgumentException if the identity does not split into exactly
     *         a database part and a table part
     */
    public String getTableIdentity(String tableIdentity) {
        String[] parts = tableIdentity.split("\\.");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected identity of form <db>.<table> but got: " + tableIdentity);
        }
        return getTableIdentity(parts[0], parts[1]);
    }

    /** Builds the mapped identity {@code DB.TABLE} from separate database and table names. */
    public String getTableIdentity(String database, String tableName) {
        return getDatabase(database) + "." + getTableName(tableName);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.service.update;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.ComputedColumnDesc;
import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;

/**
 * Rewrites table identities (database and/or table name) inside Kylin metadata
 * objects — {@link TableDesc}, {@link NDataModel} and {@link NDataflow} — according
 * to a map of {@link TableSchemaUpdateMapping}s keyed by the OLD upper-cased table
 * identity. All methods return copies; the input objects are never mutated.
 */
public class TableSchemaUpdater {

    /**
     * Returns a copy of {@code other} with its database/table name replaced per the
     * mapping, or the original instance unchanged when no mapping applies to it.
     */
    public static TableDesc dealWithMappingForTable(TableDesc other, Map<String, TableSchemaUpdateMapping> mappings) {
        TableSchemaUpdateMapping mapping = getTableSchemaUpdateMapping(mappings, other.getIdentity());
        if (mapping == null) {
            return other;
        }

        TableDesc copy = new TableDesc(other);

        copy.setDatabase(mapping.getDatabase(other.getDatabase()));
        copy.setName(mapping.getTableName(other.getName()));
        // The renamed table is treated as a brand-new metadata entry.
        copy.setLastModified(0L);

        return copy;
    }

    /**
     * Returns a copy of the model with its root fact table, join tables and computed
     * columns re-pointed at the renamed table identities.
     *
     * @throws UnsupportedOperationException if the model has a filter condition, or
     *         has computed columns while the config does not allow updating them —
     *         both may embed old table names that this updater cannot safely rewrite
     */
    public static NDataModel dealWithMappingForModel(KylinConfig config, String project, NDataModel other,
            Map<String, TableSchemaUpdateMapping> mappings) {

        // Currently, model with filter condition is not supported.
        if (!Strings.isNullOrEmpty(other.getFilterCondition())) {
            throw new UnsupportedOperationException("Cannot deal with filter condition " + other.getFilterCondition());
        }

        if ((!config.isSupportUpdateComputedColumnMapping()) && !other.getComputedColumnDescs().isEmpty()) {
            throw new UnsupportedOperationException(
                    "Do not support deal with computed column " + other.getComputedColumnDescs());
        }
        NDataModel copy = NDataModelManager.getInstance(config, project).copyForWrite(other);

        // mapping for root fact table identity
        TableSchemaUpdateMapping rootMapping = getTableSchemaUpdateMapping(mappings, other.getRootFactTableName());
        if (rootMapping != null) {
            TableDesc rootFactTable = other.getRootFactTable().getTableDesc();
            copy.setRootFactTableName(
                    rootMapping.getTableIdentity(rootFactTable.getDatabase(), rootFactTable.getName()));
        }

        // mapping for join tables
        List<JoinTableDesc> joinTables = other.getJoinTables();
        List<JoinTableDesc> joinTablesCopy = new ArrayList<>(joinTables.size());
        for (JoinTableDesc joinTable : joinTables) {
            JoinTableDesc joinTableCopy = JoinTableDesc.getCopyOf(joinTable);
            String tableIdentity = joinTable.getTable();
            TableSchemaUpdateMapping mapping = getTableSchemaUpdateMapping(mappings, tableIdentity);
            if (mapping != null && mapping.isTableIdentityChanged()) {
                joinTableCopy.setTable(mapping.getTableIdentity(tableIdentity));
            }
            joinTablesCopy.add(joinTableCopy);
        }
        copy.setJoinTables(joinTablesCopy);

        // mapping for computed columns
        List<ComputedColumnDesc> computedColumns = other.getComputedColumnDescs();
        List<ComputedColumnDesc> computedColumnsCopy = new ArrayList<>(computedColumns.size());
        for (ComputedColumnDesc columnDesc : computedColumns) {
            ComputedColumnDesc columnDescCopy = ComputedColumnDesc.getCopyOf(columnDesc);
            String tableIdentity = columnDesc.getTableIdentity();
            TableSchemaUpdateMapping mapping = getTableSchemaUpdateMapping(mappings, tableIdentity);
            if (mapping != null && mapping.isTableIdentityChanged()) {
                columnDescCopy.setTableIdentity(mapping.getTableIdentity(tableIdentity));
            }
            computedColumnsCopy.add(columnDescCopy);
        }
        copy.setComputedColumnDescs(computedColumnsCopy);

        return copy;
    }

    /**
     * Returns a copy of the dataflow whose per-segment snapshot keys and column
     * source-byte keys reference the renamed table identities. The last-modified
     * timestamp is preserved so the copy is not mistaken for a fresh dataflow.
     */
    public static NDataflow dealWithMappingForDataFlow(KylinConfig config, String project, NDataflow other,
            Map<String, TableSchemaUpdateMapping> mappings) {

        NDataflow copy = NDataflowManager.getInstance(config, project).copy(other);
        copy.setLastModified(other.getLastModified());

        // mapping for segments
        if (other.getSegments() != null && !other.getSegments().isEmpty()) {
            Segments<NDataSegment> segmentsCopy = new Segments<>();
            for (NDataSegment segment : other.getSegments()) {
                NDataSegment segmentCopy = new NDataSegment(segment);
                segmentCopy.setDataflow(copy);
                // mapping for snapshot
                Map<String, String> snapshotCopy =
                        replaceTableIdentityForTableSnapshots(segment.getSnapshots(), mappings);
                // mapping for column source bytes
                Map<String, Long> columnSourceBytesCopy =
                        replaceColumnIdentityForColumnSourceBytes(segment.getColumnSourceBytes(), mappings);

                segmentCopy.setSnapshots(snapshotCopy);
                segmentCopy.setColumnSourceBytes(columnSourceBytesCopy);

                segmentsCopy.add(segmentCopy);
            }
            copy.setSegments(segmentsCopy);
        }
        return copy;
    }

    /**
     * Rewrites the table-identity keys of a snapshot map; the snapshot resource
     * paths (values) are kept so existing snapshot data is reused without rebuild.
     */
    private static Map<String, String> replaceTableIdentityForTableSnapshots(Map<String, String> snapshots,
            Map<String, TableSchemaUpdateMapping> mappings) {
        Map<String, String> snapshotsCopy = Maps.newHashMapWithExpectedSize(snapshots.size());
        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
            String tableIdentity = entry.getKey();
            TableSchemaUpdateMapping mapping = getTableSchemaUpdateMapping(mappings, tableIdentity);
            if (mapping != null && mapping.isTableIdentityChanged()) {
                tableIdentity = mapping.getTableIdentity(tableIdentity);
            }
            snapshotsCopy.put(tableIdentity, entry.getValue());
        }
        return snapshotsCopy;
    }

    /**
     * Rewrites the table part of each {@code DB.TABLE.COLUMN} key in the column
     * source-bytes map; the byte counts (values) are carried over untouched.
     */
    private static Map<String, Long> replaceColumnIdentityForColumnSourceBytes(Map<String, Long> columnSourceBytes,
            Map<String, TableSchemaUpdateMapping> mappings) {
        Map<String, Long> copy = Maps.newHashMapWithExpectedSize(columnSourceBytes.size());
        for (Map.Entry<String, Long> entry : columnSourceBytes.entrySet()) {
            String columnIdentity = entry.getKey();
            int lastDot = columnIdentity.lastIndexOf('.');
            String tableIdentity = columnIdentity.substring(0, lastDot);
            // keeps the leading '.' so it can be re-appended directly below
            String columnName = columnIdentity.substring(lastDot);
            TableSchemaUpdateMapping mapping = getTableSchemaUpdateMapping(mappings, tableIdentity);
            if (mapping != null && mapping.isTableIdentityChanged()) {
                columnIdentity = mapping.getTableIdentity(tableIdentity) + columnName;
            }
            copy.put(columnIdentity, entry.getValue());
        }
        return copy;
    }

    /** Looks up a mapping by table identity, normalized to upper case to match the map's keys. */
    public static TableSchemaUpdateMapping getTableSchemaUpdateMapping(Map<String, TableSchemaUpdateMapping> mappings,
            String key) {
        return mappings.get(key.toUpperCase(Locale.ROOT));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.service.update;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map;
import java.util.Set;

import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;

/**
 * Tests {@link TableSchemaUpdater} by applying the mappings from the JSON fixture
 * to test metadata and comparing against pre-built expected metadata files stored
 * under {@code src/test/resources/update}.
 */
public class TableSchemaUpdaterTest extends NLocalFileMetadataTestCase {

    private static final String PROJECT_NAME = "default";

    private final String mappingRootPath = "src/test/resources/update";
    private final String mappingFileName = "TableSchemaUpdateMapping.json";

    // old table identity -> requested schema change, loaded from the JSON fixture
    private Map<String, TableSchemaUpdateMapping> mappings;

    @Before
    public void setUp() throws IOException {
        this.createTestMetadata();

        File mappingFile = new File(mappingRootPath + "/" + mappingFileName);
        String content = new String(Files.readAllBytes(mappingFile.toPath()), StandardCharsets.UTF_8);
        mappings = JsonUtil.readValue(content, new TypeReference<Map<String, TableSchemaUpdateMapping>>() {
        });
    }

    @Test
    public void testDealWithMappingForTable() throws IOException {
        NTableMetadataManager tableMetaManager = NTableMetadataManager.getInstance(getTestConfig(), PROJECT_NAME);
        ResourceStore store = tableMetaManager.getStore();
        Serializer<TableDesc> serializer = tableMetaManager.getTableMetadataSerializer();

        Set<TableDesc> tables = Sets.newHashSet();
        for (String tableIdentity : mappings.keySet()) {
            tables.add(store.getResource(TableDesc.concatResourcePath(tableIdentity, PROJECT_NAME), serializer));
        }

        for (TableDesc tableDesc : tables) {
            TableDesc updated = TableSchemaUpdater.dealWithMappingForTable(tableDesc, mappings);
            // round-trip through the serializer so derived state matches the expected file
            updated = reInit(updated, serializer);

            try (DataInputStream bis = new DataInputStream(new FileInputStream(
                    new File(mappingRootPath + TableDesc.concatResourcePath(updated.getIdentity(), PROJECT_NAME))))) {
                TableDesc expected = serializer.deserialize(bis);
                Assert.assertEquals(expected, updated);
            } catch (Exception e) {
                // include the cause so a fixture/serialization problem is diagnosable
                Assert.fail("Table identity is not updated correctly: " + e);
            }
        }
    }

    @Test
    public void testDealWithMappingForModel() throws IOException {
        NDataModelManager dataModelManager = NDataModelManager.getInstance(getTestConfig(), PROJECT_NAME);
        NDataModel model = dataModelManager.getDataModelDescByAlias("ut_inner_join_cube_partial");

        NDataModel updated = TableSchemaUpdater.dealWithMappingForModel(getTestConfig(), PROJECT_NAME, model, mappings);
        updated = reInit(updated, dataModelManager.getDataModelSerializer());

        try (DataInputStream bis = new DataInputStream(new FileInputStream(
                new File(mappingRootPath + NDataModel.concatResourcePath(updated.getUuid(), PROJECT_NAME))))) {
            NDataModel expected = dataModelManager.getDataModelSerializer().deserialize(bis);
            Assert.assertTrue(expected.equalsRaw(updated));
        } catch (Exception e) {
            Assert.fail("Model is not updated correctly: " + e);
        }
    }

    @Test
    public void testDealWithMappingForDataflow() throws IOException {
        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), PROJECT_NAME);
        NDataflow dataflow = dataflowManager.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");

        NDataflow updated = TableSchemaUpdater.dealWithMappingForDataFlow(getTestConfig(), PROJECT_NAME, dataflow, mappings);
        updated = reInit(updated, dataflowManager.getDataflowSerializer());

        try (DataInputStream bis = new DataInputStream(new FileInputStream(
                new File(mappingRootPath + NDataflow.concatResourcePath(updated.getUuid(), PROJECT_NAME))))) {
            NDataflow expected = dataflowManager.getDataflowSerializer().deserialize(bis);
            Assert.assertTrue(expected.equalsRaw(updated));
        } catch (Exception e) {
            Assert.fail("Dataflow is not updated correctly: " + e);
        }
    }

    /** Serializes then deserializes the entity to normalize transient/derived state. */
    private <T extends RootPersistentEntity> T reInit(T obj, Serializer<T> serializer) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        serializer.serialize(obj, dos);
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        return serializer.deserialize(dis);
    }
}
Loading