Skip to content

Commit

Permalink
Add final statistics.
Browse files Browse the repository at this point in the history
Signed-off-by: Maximilian Chrzan <[email protected]>
Signed-off-by: mchrza <[email protected]>
  • Loading branch information
mchrza committed Sep 27, 2024
1 parent b0d6dbd commit 8eb824b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.outputs.DownloadUrl;
import com.here.xyz.jobs.steps.outputs.FeatureStatistics;
import com.here.xyz.jobs.steps.outputs.FileStatistics;
import com.here.xyz.jobs.steps.resources.IOResource;
import com.here.xyz.jobs.steps.resources.Load;
import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed;
Expand Down Expand Up @@ -179,6 +180,17 @@ protected void onAsyncSuccess() throws Exception {
//TODO
super.onAsyncSuccess();

FileStatistics statistics = runReadQuerySync(buildStatisticDataOfTemporaryTableQuery(), db(),
0, rs -> rs.next()
? new FileStatistics()
.withBytesUploaded(rs.getLong("bytes_uploaded"))
.withRowsUploaded(rs.getLong("rows_uploaded"))
.withFilesUploaded(rs.getInt("files_uploaded"))
: new FileStatistics());

infoLog(STEP_ON_ASYNC_SUCCESS, "Job Statistics: bytes=" + statistics.getBytesUploaded() + " files=" + statistics.getFilesUploaded());
registerOutputs(List.of(statistics), true);

infoLog(STEP_ON_ASYNC_SUCCESS, "Cleanup temporary table");
runWriteQuerySync(buildDropTemporaryTableQuery(getSchema(db()), getTemporaryJobTableName(this)), db(), 0);
}
Expand Down Expand Up @@ -238,6 +250,19 @@ public SQLQuery buildExportQuery(int threadNumber) throws WebClientException {
.withNamedParameter("content_query", exportSelectString.substitute().text());
}

private SQLQuery buildStatisticDataOfTemporaryTableQuery() throws WebClientException {
return new SQLQuery("""
SELECT sum((data->'export_statistics'->'rows_uploaded')::bigint) as rows_uploaded,
sum((data->'export_statistics'->'files_uploaded')::bigint) as files_uploaded,
sum((data->'export_statistics'->'bytes_uploaded')::bigint) as bytes_uploaded
FROM ${schema}.${tmpTable}
WHERE POSITION('SUCCESS_MARKER' in state) = 0;
""")
.withVariable("schema", getSchema(db()))
.withVariable("tmpTable", getTemporaryJobTableName(this))
.withVariable("triggerTable", TransportTools.getTemporaryTriggerTableName(this));
}

private Map<String, Object> getQueryContext() throws WebClientException {
String superTable = space().getExtension() != null ? getRootTableName(superSpace()) : null;
return createQueryContext(getId(), getSchema(db()), getRootTableName(space()), (space().getVersionsToKeep() > 1), superTable);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

package com.here.xyz.jobs.steps.outputs;

public class FileStatistics extends ModelBasedOutput {
private long rowsUploaded;
private long bytesUploaded;
private int filesUploaded;

public long getRowsUploaded() {
return rowsUploaded;
}

public void setRowsUploaded(long rowsUploaded) {
this.rowsUploaded = rowsUploaded;
}

public FileStatistics withRowsUploaded(long rowsUploaded) {
setRowsUploaded(rowsUploaded);
return this;
}

public long getBytesUploaded() {
return bytesUploaded;
}

public void setBytesUploaded(long bytesUploaded) {
this.bytesUploaded = bytesUploaded;
}

public FileStatistics withBytesUploaded(long bytesUploaded) {
setBytesUploaded(bytesUploaded);
return this;
}

public int getFilesUploaded() {
return filesUploaded;
}

public void setFilesUploaded(int filesUploaded) {
this.filesUploaded = filesUploaded;
}

public FileStatistics withFilesUploaded(int filesUploaded) {
setFilesUploaded(filesUploaded);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

@JsonTypeInfo(use = Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = FeatureStatistics.class, name = "FeatureStatistics")
@JsonSubTypes.Type(value = FeatureStatistics.class, name = "FeatureStatistics"),
@JsonSubTypes.Type(value = FileStatistics.class, name = "FileStatistics")
})
public abstract class ModelBasedOutput extends Output<ModelBasedOutput> {
@Override
Expand Down

0 comments on commit 8eb824b

Please sign in to comment.