diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java index e0edd09f7..b6ded7cba 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java @@ -24,6 +24,7 @@ import com.here.xyz.jobs.steps.impl.SpaceBasedStep; import com.here.xyz.jobs.steps.outputs.DownloadUrl; import com.here.xyz.jobs.steps.outputs.FeatureStatistics; +import com.here.xyz.jobs.steps.outputs.FileStatistics; import com.here.xyz.jobs.steps.resources.IOResource; import com.here.xyz.jobs.steps.resources.Load; import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed; @@ -179,6 +180,17 @@ protected void onAsyncSuccess() throws Exception { //TODO super.onAsyncSuccess(); + FileStatistics statistics = runReadQuerySync(buildStatisticDataOfTemporaryTableQuery(), db(), + 0, rs -> rs.next() + ? new FileStatistics() + .withBytesUploaded(rs.getLong("bytes_uploaded")) + .withRowsUploaded(rs.getLong("rows_uploaded")) + .withFilesUploaded(rs.getInt("files_uploaded")) + : new FileStatistics()); + + infoLog(STEP_ON_ASYNC_SUCCESS, "Job Statistics: bytes=" + statistics.getBytesUploaded() + " files=" + statistics.getFilesUploaded()); + registerOutputs(List.of(statistics), true); + infoLog(STEP_ON_ASYNC_SUCCESS, "Cleanup temporary table"); runWriteQuerySync(buildDropTemporaryTableQuery(getSchema(db()), getTemporaryJobTableName(this)), db(), 0); } @@ -238,6 +250,19 @@ public SQLQuery buildExportQuery(int threadNumber) throws WebClientException { .withNamedParameter("content_query", exportSelectString.substitute().text()); } + private SQLQuery buildStatisticDataOfTemporaryTableQuery() throws WebClientException { + return new SQLQuery(""" + SELECT sum((data->'export_statistics'->'rows_uploaded')::bigint) as rows_uploaded, + sum((data->'export_statistics'->'files_uploaded')::bigint) as files_uploaded, + sum((data->'export_statistics'->'bytes_uploaded')::bigint) as bytes_uploaded + FROM ${schema}.${tmpTable} + WHERE POSITION('SUCCESS_MARKER' in state) = 0; + """) + .withVariable("schema", getSchema(db())) + .withVariable("tmpTable", getTemporaryJobTableName(this)) + .withVariable("triggerTable", TransportTools.getTemporaryTriggerTableName(this)); + } + private Map getQueryContext() throws WebClientException { String superTable = space().getExtension() != null ? getRootTableName(superSpace()) : null; return createQueryContext(getId(), getSchema(db()), getRootTableName(space()), (space().getVersionsToKeep() > 1), superTable); diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FileStatistics.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FileStatistics.java new file mode 100644 index 000000000..26915ae9f --- /dev/null +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FileStatistics.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2017-2024 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package com.here.xyz.jobs.steps.outputs; + +public class FileStatistics extends ModelBasedOutput { + private long rowsUploaded; + private long bytesUploaded; + private int filesUploaded; + + public long getRowsUploaded() { + return rowsUploaded; + } + + public void setRowsUploaded(long rowsUploaded) { + this.rowsUploaded = rowsUploaded; + } + + public FileStatistics withRowsUploaded(long rowsUploaded) { + setRowsUploaded(rowsUploaded); + return this; + } + + public long getBytesUploaded() { + return bytesUploaded; + } + + public void setBytesUploaded(long bytesUploaded) { + this.bytesUploaded = bytesUploaded; + } + + public FileStatistics withBytesUploaded(long bytesUploaded) { + setBytesUploaded(bytesUploaded); + return this; + } + + public int getFilesUploaded() { + return filesUploaded; + } + + public void setFilesUploaded(int filesUploaded) { + this.filesUploaded = filesUploaded; + } + + public FileStatistics withFilesUploaded(int filesUploaded) { + setFilesUploaded(filesUploaded); + return this; + } +} diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/ModelBasedOutput.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/ModelBasedOutput.java index 17309827d..d7d58d30f 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/ModelBasedOutput.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/ModelBasedOutput.java @@ -28,7 +28,8 @@ @JsonTypeInfo(use = Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(value = FeatureStatistics.class, name = "FeatureStatistics") + @JsonSubTypes.Type(value = FeatureStatistics.class, name = "FeatureStatistics"), + @JsonSubTypes.Type(value = FileStatistics.class, name = "FileStatistics") }) public abstract class ModelBasedOutput extends Output { @Override