Skip to content

Commit

Permalink
Centralize logging in TransportTools
Browse files Browse the repository at this point in the history
Signed-off-by: Maximilian Chrzan <[email protected]>
Signed-off-by: mchrza <[email protected]>
  • Loading branch information
mchrza committed Sep 25, 2024
1 parent 5b7fc70 commit 9fa4f5d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.Feature;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.FeatureCollection;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_GEOJSON;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_JSON_WKB;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Phase.CREATE_TMP_TABLE;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Phase.EXECUTE_IMPORT;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Phase.FILL_TMP_TABLE;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Phase.RESET_SUCCESS_MARKER;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Phase.RETRIEVE_NEW_VERSION;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryTriggerTableName;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.buildDropTemporaryTableQuery;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_EXECUTOR;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;

import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_ON_STATE_CHECK;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_ON_ASYNC_SUCCESS;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_VALIDATE;
import static com.here.xyz.util.web.XyzWebClient.WebClientException;

import com.fasterxml.jackson.annotation.JsonView;
Expand Down Expand Up @@ -65,7 +66,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -214,7 +214,7 @@ public int getEstimatedExecutionSeconds() {
if (estimatedSeconds == -1 && getSpaceId() != null) {
estimatedSeconds = ResourceAndTimeCalculator.getInstance()
.calculateImportTimeInSeconds(getSpaceId(), getUncompressedUploadBytesEstimation(), getExecutionMode());
logger.info("[{}] Import estimatedSeconds {}", getGlobalStepId(), estimatedSeconds);
infoLog(JOB_EXECUTOR, "Calculated estimatedSeconds: "+estimatedSeconds );
}
return estimatedSeconds;
}
Expand All @@ -223,9 +223,9 @@ public int getEstimatedExecutionSeconds() {
@Override
public ExecutionMode getExecutionMode() {
  //CSV is not supported in SYNC mode, so CSV inputs must always run asynchronously
  if (format == CSV_JSON_WKB || format == CSV_GEOJSON)
    return ASYNC;
  //Small GeoJSON uploads may be imported synchronously; larger ones fall back to ASYNC
  return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC;
}

@Override
Expand All @@ -249,7 +249,7 @@ public void deleteOutputs() {
public boolean validate() throws ValidationException {
super.validate();
try {
logAndSetPhase(Phase.VALIDATE);
infoLog(JOB_VALIDATE);
//Check if the space is actually existing
space();

Expand All @@ -276,20 +276,6 @@ public boolean validate() throws ValidationException {
return true;
}

/**
 * Logs a standardized step message and, when {@code newPhase} is non-null, advances this step's
 * current phase before logging. The log line includes the global step id, the (possibly updated)
 * phase and the target space id.
 *
 * @param newPhase the phase to switch to, or {@code null} to log at the current phase
 * @param messages optional message parts appended to the log line; omitted when empty
 */
private void logAndSetPhase(Phase newPhase, String... messages) {
if (newPhase != null)
phase = newPhase;
logger.info("[{}@{}] ON/INTO '{}' {}", getGlobalStepId(), getPhase(), getSpaceId(), messages.length > 0 ? messages : "");
}

/** Returns the most recently set execution phase of this step. */
public Phase getPhase() {
return phase;
}

/** Logs messages at the current phase without changing it. */
private void log(String... messages) {
logAndSetPhase(null, messages);
}

@Override
public void execute()
throws WebClientException, SQLException, TooManyResourcesClaimed, IOException, ParseException, InterruptedException {
Expand All @@ -300,17 +286,15 @@ private void _execute(boolean isResume) throws WebClientException, SQLException,
if (getExecutionMode() == SYNC)
syncExecution();
else {
log("Importing input files for job " + getJobId() + " into space " + getSpaceId() + " ...");

//TODO: Move resume logic into #resume()
if (!isResume) {
logAndSetPhase(Phase.SET_READONLY);
infoLog(STEP_EXECUTE, "Set ReadOnly");
hubWebClient().patchSpace(getSpaceId(), Map.of("readOnly", true));

logAndSetPhase(RETRIEVE_NEW_VERSION);
infoLog(STEP_EXECUTE, "Retrieve new version");
long newVersion = increaseVersionSequence();

logAndSetPhase(Phase.CREATE_TRIGGER); //FIXME: Use owner of the job
infoLog(STEP_EXECUTE, "Create TriggerTable and Trigger");
//Create Temp-ImportTable to avoid deserialization of JSON and fix missing row count
runBatchWriteQuerySync(buildTemporaryTriggerTableBlock(space.getOwner(), newVersion), db(), 0);
}
Expand All @@ -322,23 +306,21 @@ private void _execute(boolean isResume) throws WebClientException, SQLException,
MAX_DB_THREAD_COUNT);
double neededAcusForOneThread = calculateNeededAcus(1);

logAndSetPhase(EXECUTE_IMPORT);

for (int i = 1; i <= calculatedThreadCount; i++) {
logAndSetPhase(EXECUTE_IMPORT, "Start Import Thread number " + i);
infoLog(STEP_EXECUTE, "Start Import Thread number " + i);
runReadQueryAsync(buildImportQueryBlock(), db(), neededAcusForOneThread, false);
}
}
}

private void syncExecution() throws WebClientException, SQLException, TooManyResourcesClaimed, IOException {
//TODO: Support resume
logAndSetPhase(RETRIEVE_NEW_VERSION);
infoLog(STEP_EXECUTE, "Retrieve new version");
long newVersion = increaseVersionSequence();
long featureCount = 0;

for (Input input : loadInputs()) {
logger.info("[{}] Sync write from {} to {}", getGlobalStepId(), input.getS3Key(), getSpaceId());
infoLog(STEP_EXECUTE, "Sync write of file:"+ input.getS3Key());
featureCount += syncWriteFileToSpace(input, newVersion);
}
registerOutputs(List.of(new FeatureStatistics().withFeatureCount(featureCount).withByteSize(getUncompressedUploadBytesEstimation())),
Expand Down Expand Up @@ -388,14 +370,14 @@ private long increaseVersionSequence() throws SQLException, TooManyResourcesClai

/**
 * Prepares the temporary job table that tracks the import's per-file progress.
 * On resume, only resets the success/running markers of the existing table; otherwise
 * creates the table and fills it with this step's inputs.
 *
 * @throws SQLException if a preparation query fails
 * @throws TooManyResourcesClaimed if the step exceeds its claimed database resources
 * @throws WebClientException if resolving the target database/space fails
 */
private void createAndFillTemporaryJobTable() throws SQLException, TooManyResourcesClaimed, WebClientException {
  if (isResume()) {
    infoLog(STEP_EXECUTE, "Reset SuccessMarker");
    runWriteQuerySync(resetSuccessMarkerAndRunningOnes(getSchema(db)), db, 0);
  }
  else {
    infoLog(STEP_EXECUTE, "Create temporary job table");
    runWriteQuerySync(buildTemporaryTableForImportQuery(getSchema(db)), db, 0);

    infoLog(STEP_EXECUTE, "Fill temporary job table");
    fillTemporaryTableWithInputs(db, loadStepInputs(), bucketRegion());
  }
}
Expand All @@ -414,8 +396,8 @@ protected void onStateCheck() {

getStatus().setEstimatedProgress(progress);

log("Progress[" + progress + "] => "
+ " processedBytes:" + processedBytes + " ,finishedCnt:" + finishedCnt + " ,failedCnt:" + failedCnt);
infoLog(STEP_ON_STATE_CHECK,"Progress[" + progress + "] => " + " processedBytes:"
+ processedBytes + " ,finishedCnt:" + finishedCnt + " ,failedCnt:" + failedCnt);
return progress;
});
}
Expand All @@ -429,20 +411,17 @@ protected void onStateCheck() {
protected void onAsyncSuccess() throws WebClientException,
SQLException, TooManyResourcesClaimed, IOException {
try {

logAndSetPhase(Phase.RETRIEVE_STATISTICS);
FeatureStatistics statistics = runReadQuerySync(buildStatisticDataOfTemporaryTableQuery(), db(),
0, rs -> rs.next()
? new FeatureStatistics().withFeatureCount(rs.getLong("imported_rows")).withByteSize(rs.getLong("imported_bytes"))
: new FeatureStatistics());

log("Statistics: bytes=" + statistics.getByteSize() + " rows=" + statistics.getFeatureCount());
logAndSetPhase(Phase.WRITE_STATISTICS);
infoLog(STEP_ON_ASYNC_SUCCESS, "Job Statistics: bytes=" + statistics.getByteSize() + " rows=" + statistics.getFeatureCount());
registerOutputs(List.of(statistics), true);

cleanUpDbRelatedResources();

logAndSetPhase(Phase.RELEASE_READONLY);
infoLog(STEP_ON_ASYNC_SUCCESS, "Release READONLY");
hubWebClient().patchSpace(getSpaceId(), Map.of(
"readOnly", false,
"contentUpdatedAt", Core.currentTimeMillis()
Expand All @@ -453,15 +432,15 @@ protected void onAsyncSuccess() throws WebClientException,
//relation "*_job_data" does not exist - can happen when we have received twice a SUCCESS_CALLBACK
//TODO: Find out the cases in which that could happen and prevent it from happening
if (e.getSQLState() != null && e.getSQLState().equals("42P01")) {
log("_job_data table got already deleted!");
errorLog(STEP_ON_ASYNC_SUCCESS, "_job_data table got already deleted!", e);
return;
}
throw e;
}
}

private void cleanUpDbRelatedResources() throws TooManyResourcesClaimed, SQLException, WebClientException {
logAndSetPhase(Phase.DROP_TMP_TABLE);
infoLog(STEP_ON_ASYNC_SUCCESS, "Clean up database resources");
runBatchWriteQuerySync(SQLQuery.batchOf(
buildDropTemporaryTableQuery(getSchema(db()), getTemporaryJobTableName(this)),
buildDropTemporaryTableQuery(getSchema(db()), getTemporaryTriggerTableName(this))
Expand Down Expand Up @@ -550,18 +529,6 @@ private void fillTemporaryTableWithInputs(Database db, List<S3DataFile> inputs,
runBatchWriteQuerySync(SQLQuery.batchOf(queryList), db, 0);
}

//NOTE(review): duplicates TransportTools.buildDropTemporaryTableQuery(schema, table) — consider
//delegating to that centralized helper and removing this method once no callers remain.
/** Builds the query that drops this job's temporary import table, if it exists. */
private SQLQuery buildDropTemporaryTableForImportQuery() throws WebClientException {
return new SQLQuery("DROP TABLE IF EXISTS ${schema}.${table};")
.withVariable("table", getTemporaryJobTableName(this))
.withVariable("schema", getSchema(db()));
}

//NOTE(review): duplicates TransportTools.buildDropTemporaryTableQuery(schema, table) — consider
//delegating to that centralized helper and removing this method once no callers remain.
/** Builds the query that drops this job's temporary trigger table, if it exists. */
private SQLQuery buildDropTemporaryTriggerTableForImportQuery() throws WebClientException {
return new SQLQuery("DROP TABLE IF EXISTS ${schema}.${table};")
.withVariable("table", TransportTools.getTemporaryTriggerTableName(this))
.withVariable("schema", getSchema(db()));
}

private SQLQuery buildTemporaryTriggerTableForImportQuery() throws WebClientException {
String tableFields =
"jsondata TEXT, "
Expand Down Expand Up @@ -783,11 +750,19 @@ private double calculateNeededAcus(int threadCount) {
neededACUs = ResourceAndTimeCalculator.getInstance().calculateNeededImportAcus(
getUncompressedUploadBytesEstimation(), fileCount, threadCount);

logAndSetPhase(Phase.CALCULATE_ACUS,
"expectedMemoryConsumption: " + getUncompressedUploadBytesEstimation() + " => neededACUs:" + neededACUs);
infoLog(JOB_EXECUTOR, "Calcualted ACUS: expectedMemoryConsumption: "
+ getUncompressedUploadBytesEstimation() + " => neededACUs:" + neededACUs);
return neededACUs;
}

/** Delegates info-level logging for this step to the centralized {@code TransportTools} logger. */
private void infoLog(Phase logPhase, String... details) {
  TransportTools.infoLog(logPhase.name(), getSpaceId(), getGlobalStepId(), details);
}

/** Delegates error-level logging (including the causing exception) to {@code TransportTools}. */
private void errorLog(Phase logPhase, String message, Exception cause) {
  TransportTools.errorLog(logPhase.name(), getSpaceId(), getGlobalStepId(), message, cause);
}

//TODO: De-duplicate once CSV was removed (see GeoJson format class)
public enum EntityPerLine {
Feature,
Expand All @@ -799,21 +774,4 @@ public enum Format {
CSV_JSON_WKB,
GEOJSON;
}

//NOTE(review): this local enum appears superseded by the imported TransportTools.Phase
//(see the static imports at the top of the file) and the two names clash — candidate for removal.
/** Fine-grained execution phases of the import step, in rough lifecycle order. */
public enum Phase {
VALIDATE,
CALCULATE_ACUS,
SET_READONLY,
RETRIEVE_NEW_VERSION,
CREATE_TRIGGER,
CREATE_TMP_TABLE,
RESET_SUCCESS_MARKER,
FILL_TMP_TABLE,
EXECUTE_IMPORT,
RETRIEVE_STATISTICS,
WRITE_STATISTICS,
DROP_TRIGGER,
DROP_TMP_TABLE,
RELEASE_READONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.here.xyz.jobs.steps.Step;
import com.here.xyz.util.db.SQLQuery;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class TransportTools {
private static final Logger logger = LogManager.getLogger();

private static final String JOB_DATA_PREFIX = "job_data_";
private static final String TRIGGER_TABLE_SUFFIX = "_trigger_tbl";
Expand All @@ -40,4 +43,25 @@ protected static SQLQuery buildDropTemporaryTableQuery(String schema, String tab
.withVariable("table", tableName)
.withVariable("schema", schema);
}

/**
 * Writes a unified info-level log line for transport steps in the format
 * {@code [<stepId>@<phase>] ON '<spaceId>' <messages>}.
 *
 * @param phase the name of the execution phase being logged
 * @param spaceId the id of the space the step operates on
 * @param stepId the global id of the step emitting the message
 * @param messages optional message parts; omitted from the line when empty
 */
protected static void infoLog(String phase, String spaceId, String stepId, String... messages) {
  logger.info("[{}@{}] ON '{}' {}", stepId, phase, spaceId, messages.length > 0 ? messages : "");
}

/**
 * Writes a unified error-level log line for transport steps; the exception is passed as the
 * last argument so Log4j records its stack trace.
 *
 * @param phase the name of the execution phase being logged
 * @param spaceId the id of the space the step operates on
 * @param stepId the global id of the step emitting the message
 * @param message the error description
 * @param e the causing exception
 */
protected static void errorLog(String phase, String spaceId, String stepId, String message, Exception e) {
  logger.error("[{}@{}] ON '{}' {}", stepId, phase, spaceId, message, e);
}

/**
 * Coarse execution phases used as prefixes in the centralized transport log messages.
 * NOTE(review): values appear to correspond to step lifecycle callbacks (execute, resume,
 * cancel, state-check, async success/failure) — confirm against the Step implementations.
 */
protected enum Phase {
GRAPH_TRANSFORMER,
JOB_EXECUTOR,
STEP_EXECUTE,
STEP_RESUME,
STEP_CANCEL,
STEP_ON_STATE_CHECK,
STEP_ON_ASYNC_FAILURE,
STEP_ON_ASYNC_SUCCESS,
JOB_DELETE,
JOB_VALIDATE
}
}

0 comments on commit 9fa4f5d

Please sign in to comment.