Feature / Expose job logs in the platform API (#489)
* Move logging into a separate _impl module

* Start implementing job logging

* Add LogProvider interface, use it to record engine processing logs

* Use log provider mechanism to set up trac context and model logs

* Fix print job log statement

* Handle saving job results at the engine level

* Take result spec and save results out of graph processing

* Metadata for job results

* Validator updates for job results

* Update test data to include job result

* Validation fixes for jobs submitted as requests to the orchestrator

* Fix duplicate registration for job def validator

* Fix building result metadata for import model job

* Add a test case for job result and log file outputs in run flow e2e

* Helper for preallocated IDs

* Support results and log files in orch job processor

* Support for RESULT and log file outputs in the runtime

* Filter search results for outputs of the FILE IO model in the run model test (we want the model output, not the job log)

* Fixes found during testing
martin-traverse authored Jan 7, 2025
1 parent 4d703ce commit 59c6c79
Showing 49 changed files with 935 additions and 408 deletions.
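
Taken together, the changes below introduce a RESULT object type that links a job to its outcome and to a captured log file. As a rough orientation (a sketch, not part of the commit; the class and method names are invented, and the metadata-service calls needed to fetch each object are omitted), this is the navigation path the new metadata enables:

import org.finos.tracdap.metadata.*;

public class JobLogNavigationSketch {

    // Given a job definition and its saved result, locate the job's log file
    public static TagSelector findJobLogFile(JobDefinition job, ResultDefinition result) {

        // JobDefinition.resultId (new in this commit) identifies the RESULT object for the job;
        // reading that object from the metadata store yields the ResultDefinition used here,
        // so job.getResultId() is where the chain starts (the metadata read itself is omitted)

        // The RESULT object records the job outcome and points at the log FILE object
        System.out.printf("Job finished with status %s: %s%n",
                result.getStatusCode(), result.getStatusMessage());

        // The returned selector can be used to read the log file contents back
        return result.getLogFileId();
    }
}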
@@ -104,6 +104,21 @@ message JobDefinition {
ExportDataJob exportData = 6;
JobGroup jobGroup = 7;
}

TagSelector resultId = 8;
}

/**
* Define the result of a job after it has completed
*/
message ResultDefinition {

TagSelector jobId = 1;

JobStatusCode statusCode = 2;
string statusMessage = 3;

TagSelector logFileId = 4;
}

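For orientation, here is a minimal sketch of how a ResultDefinition might be populated, using the Java classes generated from this proto. It is illustrative only and not part of the diff; the class name, IDs and status message are invented.

import java.util.UUID;
import org.finos.tracdap.metadata.*;

public class ResultDefinitionSketch {

    public static ResultDefinition exampleResult() {

        // Selector pointing back at the job this result belongs to
        var jobId = TagSelector.newBuilder()
                .setObjectType(ObjectType.JOB)
                .setObjectId(UUID.randomUUID().toString())
                .setObjectVersion(1)
                .setLatestTag(true);

        // Selector for the FILE object holding the captured job log
        var logFileId = TagSelector.newBuilder()
                .setObjectType(ObjectType.FILE)
                .setObjectId(UUID.randomUUID().toString())
                .setObjectVersion(1)
                .setLatestTag(true);

        return ResultDefinition.newBuilder()
                .setJobId(jobId)
                .setStatusCode(JobStatusCode.SUCCEEDED)
                .setStatusMessage("Job completed successfully")
                .setLogFileId(logFileId)
                .build();
    }
}

The corresponding job carries a resultId selector pointing at the RESULT object, while ResultDefinition.jobId points back at the job, so the link can be followed in either direction.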
@@ -77,6 +77,7 @@ message ObjectDefinition {
CustomDefinition custom = 7;
StorageDefinition storage = 8;
SchemaDefinition schema = 9;
ResultDefinition result = 10;
}

map<string, Value> objectProps = 100;
@@ -39,6 +39,7 @@ enum ObjectType {
CUSTOM = 6;
STORAGE = 7;
SCHEMA = 8;
RESULT = 9;
}


@@ -85,6 +85,16 @@ public static TagHeader nextTagVersion(TagHeader header, Instant timestamp) {
.build();
}

public static TagSelector preallocated(TagHeader selector) {

return TagSelector.newBuilder()
.setObjectType(selector.getObjectType())
.setObjectId(selector.getObjectId())
.setObjectVersion(0)
.setTagVersion(0)
.build();
}

public static TagSelector preallocated(TagSelector selector) {

return TagSelector.newBuilder()
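Stepping outside the diff for a moment: the new TagHeader overload above produces a version-0 selector, which is the form JobDefinition.resultId can take while the RESULT object is still only preallocated. A hedged sketch of that usage follows; the wrapper class is invented, and the conversion is written out inline with the same builder chain the helper uses, since the helper's containing class is not visible in this hunk.

import org.finos.tracdap.metadata.*;

public class PreallocatedResultSketch {

    public static JobDefinition attachResultId(JobDefinition job, TagHeader preallocatedResultId) {

        // Same conversion the new preallocated(TagHeader) helper performs:
        // object version 0 and tag version 0 mark the selector as referring to a preallocated ID
        var resultSelector = TagSelector.newBuilder()
                .setObjectType(preallocatedResultId.getObjectType())
                .setObjectId(preallocatedResultId.getObjectId())
                .setObjectVersion(0)
                .setTagVersion(0)
                .build();

        return job.toBuilder()
                .setResultId(resultSelector)
                .build();
    }
}

The idea is that the ResultDefinition is later saved against the preallocated ID, at which point the version-0 selector resolves to the stored object.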
@@ -58,6 +58,7 @@ public static ObjectDefinition dummyDefinitionForType(ObjectType objectType) {
case CUSTOM: return dummyCustomDef();
case STORAGE: return dummyStorageDef();
case SCHEMA: return dummySchemaDef();
case RESULT: return dummyResultDef();

default:
throw new RuntimeException("No dummy data available for object type " + objectType.name());
@@ -88,6 +89,7 @@ public static ObjectDefinition dummyVersionForType(ObjectDefinition definition)

case FLOW:
case JOB:
case RESULT:

return definition;

@@ -435,12 +437,43 @@ public static ObjectDefinition dummyJobDef() {
.setObjectVersion(1)
.setLatestTag(true);

var resultSelector = TagSelector.newBuilder()
.setObjectType(ObjectType.RESULT)
.setObjectId(UUID.randomUUID().toString())
.setObjectVersion(1)
.setLatestTag(true);

return ObjectDefinition.newBuilder()
.setObjectType(ObjectType.JOB)
.setJob(JobDefinition.newBuilder()
.setJobType(JobType.RUN_MODEL)
.setRunModel(RunModelJob.newBuilder()
.setModel(targetSelector)))
.setModel(targetSelector))
.setResultId(resultSelector))
.build();
}

public static ObjectDefinition dummyResultDef() {

var jobSelector = TagSelector.newBuilder()
.setObjectType(ObjectType.JOB)
.setObjectId(UUID.randomUUID().toString())
.setObjectVersion(1)
.setLatestTag(true);

var logFileSelector = TagSelector.newBuilder()
.setObjectType(ObjectType.FILE)
.setObjectId(UUID.randomUUID().toString())
.setObjectVersion(1)
.setLatestTag(true);

return ObjectDefinition.newBuilder()
.setObjectType(ObjectType.RESULT)
.setResult(ResultDefinition.newBuilder()
.setJobId(jobSelector)
.setStatusCode(JobStatusCode.SUCCEEDED)
.setStatusMessage("Job completed in [42] seconds")
.setLogFileId(logFileSelector))
.build();
}

@@ -74,8 +74,7 @@ private static ValidationContext validateOrSubmit(JobRequest msg, ValidationCont

ctx = ctx.push(JR_JOB)
.apply(CommonValidators::required)
.applyRegistered()
.apply(JobValidator::outputsMustBeEmpty, JobDefinition.class)
.apply(JobValidator::jobRequest, JobDefinition.class)
.pop();

ctx = ctx.pushRepeated(JR_JOB_ATTRS)
@@ -43,6 +43,7 @@ public class JobValidator {
private static final Descriptors.Descriptor JOB_DEFINITION;
private static final Descriptors.FieldDescriptor JD_JOB_TYPE;
private static final Descriptors.OneofDescriptor JD_JOB_DETAILS;
private static final Descriptors.FieldDescriptor JD_RESULT_ID;

private static final Descriptors.Descriptor IMPORT_MODEL_JOB;
private static final Descriptors.FieldDescriptor IMJ_LANGUAGE;
@@ -71,6 +72,7 @@ public class JobValidator {
JOB_DEFINITION = JobDefinition.getDescriptor();
JD_JOB_TYPE = field(JOB_DEFINITION, JobDefinition.JOBTYPE_FIELD_NUMBER);
JD_JOB_DETAILS = field(JOB_DEFINITION, JobDefinition.RUNMODEL_FIELD_NUMBER).getContainingOneof();
JD_RESULT_ID = field(JOB_DEFINITION, JobDefinition.RESULTID_FIELD_NUMBER);

IMPORT_MODEL_JOB = ImportModelJob.getDescriptor();
IMJ_LANGUAGE = field(IMPORT_MODEL_JOB, ImportModelJob.LANGUAGE_FIELD_NUMBER);
@@ -95,10 +97,23 @@ public class JobValidator {
RFJ_PRIOR_OUTPUTS = field(RUN_FLOW_JOB, RunFlowJob.PRIOROUTPUTS_FIELD_NUMBER);
}


@Validator
public static ValidationContext job(JobDefinition msg, ValidationContext ctx) {

return ctx.apply(JobValidator::job, JobDefinition.class, /* isClientRequest = */ false);
}

// Do not register two validators for the same object type
// This method is called directly from the orch API validator
public static ValidationContext jobRequest(JobDefinition msg, ValidationContext ctx) {

return ctx
.apply(JobValidator::job, JobDefinition.class, /* isClientRequest = */ true)
.apply(JobValidator::outputsMustBeEmpty, JobDefinition.class);
}

public static ValidationContext job(JobDefinition msg, boolean isClientRequest, ValidationContext ctx) {

ctx = ctx.push(JD_JOB_TYPE)
.apply(CommonValidators::required)
.apply(CommonValidators::nonZeroEnum, JobType.class)
@@ -110,6 +125,17 @@ public static ValidationContext job(JobDefinition msg, ValidationContext ctx) {
.applyRegistered()
.pop();

// Jobs submitted through the API must not contain a result ID (it is added later by the orchestrator)

var clientRequestQualifier = "a job is submitted from the client";

ctx = ctx.push(JD_RESULT_ID)
.apply(CommonValidators.ifAndOnlyIf(!isClientRequest, clientRequestQualifier, true))
.apply(ObjectIdValidator::tagSelector, TagSelector.class)
.apply(ObjectIdValidator::selectorType, TagSelector.class, ObjectType.RESULT)
.apply(ObjectIdValidator::fixedObjectVersion, TagSelector.class)
.pop();

return ctx;
}
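
To make the ifAndOnlyIf rule above concrete, here is a sketch of the two shapes of JobDefinition the validator distinguishes (not part of the diff; selectors and IDs are invented): the client-submitted form, where resultId must be absent, and the orchestrator-internal form, where it must be present.

import java.util.UUID;
import org.finos.tracdap.metadata.*;

public class JobResultIdSketch {

    public static void example() {

        var modelSelector = TagSelector.newBuilder()
                .setObjectType(ObjectType.MODEL)
                .setObjectId(UUID.randomUUID().toString())
                .setObjectVersion(1)
                .setLatestTag(true);

        // Shape 1: a job as submitted by a client - resultId must NOT be set,
        // which is what JobValidator::jobRequest enforces (isClientRequest = true)
        var clientJob = JobDefinition.newBuilder()
                .setJobType(JobType.RUN_MODEL)
                .setRunModel(RunModelJob.newBuilder().setModel(modelSelector))
                .build();

        // Shape 2: the same job once the orchestrator has attached a preallocated RESULT ID -
        // resultId is now required and must be a RESULT selector with a fixed object version
        var preallocatedResult = TagSelector.newBuilder()
                .setObjectType(ObjectType.RESULT)
                .setObjectId(UUID.randomUUID().toString())
                .setObjectVersion(0)
                .setTagVersion(0);

        var internalJob = clientJob.toBuilder()
                .setResultId(preallocatedResult)
                .build();

        System.out.println("Client job has result ID: " + clientJob.hasResultId());
        System.out.println("Internal job has result ID: " + internalJob.hasResultId());
    }
}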

@@ -41,7 +41,8 @@ public class ObjectValidator {
Map.entry(ObjectDefinition.DefinitionCase.FILE, ObjectType.FILE),
Map.entry(ObjectDefinition.DefinitionCase.CUSTOM, ObjectType.CUSTOM),
Map.entry(ObjectDefinition.DefinitionCase.STORAGE, ObjectType.STORAGE),
Map.entry(ObjectDefinition.DefinitionCase.SCHEMA, ObjectType.SCHEMA));
Map.entry(ObjectDefinition.DefinitionCase.SCHEMA, ObjectType.SCHEMA),
Map.entry(ObjectDefinition.DefinitionCase.RESULT, ObjectType.RESULT));

private static final Descriptors.Descriptor OBJECT_DEFINITION;
private static final Descriptors.FieldDescriptor OD_OBJECT_TYPE;
@@ -0,0 +1,75 @@
/*
* Licensed to the Fintech Open Source Foundation (FINOS) under one or
* more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* FINOS licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.finos.tracdap.common.validation.static_;

import com.google.protobuf.Descriptors;
import org.finos.tracdap.common.validation.core.ValidationContext;
import org.finos.tracdap.common.validation.core.ValidationType;
import org.finos.tracdap.common.validation.core.Validator;
import org.finos.tracdap.metadata.*;

import static org.finos.tracdap.common.validation.core.ValidatorUtils.field;


@Validator(type = ValidationType.STATIC)
public class ResultValidator {

private static final Descriptors.Descriptor RESULT_DEFINITION;
private static final Descriptors.FieldDescriptor RD_JOB_ID;
private static final Descriptors.FieldDescriptor RD_STATUS_CODE;
private static final Descriptors.FieldDescriptor RD_STATUS_MESSAGE;
private static final Descriptors.FieldDescriptor RD_LOG_FILE_ID;

static {

RESULT_DEFINITION = ResultDefinition.getDescriptor();
RD_JOB_ID = field(RESULT_DEFINITION, ResultDefinition.JOBID_FIELD_NUMBER);
RD_STATUS_CODE = field(RESULT_DEFINITION, ResultDefinition.STATUSCODE_FIELD_NUMBER);
RD_STATUS_MESSAGE = field(RESULT_DEFINITION, ResultDefinition.STATUSMESSAGE_FIELD_NUMBER);
RD_LOG_FILE_ID = field(RESULT_DEFINITION, ResultDefinition.LOGFILEID_FIELD_NUMBER);
}

@Validator
public static ValidationContext job(ResultDefinition msg, ValidationContext ctx) {

ctx = ctx.push(RD_JOB_ID)
.apply(CommonValidators::required)
.apply(ObjectIdValidator::tagSelector, TagSelector.class)
.apply(ObjectIdValidator::selectorType, TagSelector.class, ObjectType.JOB)
.apply(ObjectIdValidator::fixedObjectVersion, TagSelector.class)
.pop();

ctx = ctx.push(RD_STATUS_CODE)
.apply(CommonValidators::required)
.apply(CommonValidators::nonZeroEnum, JobStatusCode.class)
.pop();

ctx = ctx.push(RD_STATUS_MESSAGE)
.apply(CommonValidators::optional)
.pop();

ctx = ctx.push(RD_LOG_FILE_ID)
.apply(CommonValidators::required)
.apply(ObjectIdValidator::tagSelector, TagSelector.class)
.apply(ObjectIdValidator::selectorType, TagSelector.class, ObjectType.FILE)
.apply(ObjectIdValidator::fixedObjectVersion, TagSelector.class)
.pop();

return ctx;
}
}
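
As a quick illustration of what this validator enforces (again a sketch, not part of the commit): the definition below breaks two of the rules above and would be rejected.

import java.util.UUID;
import org.finos.tracdap.metadata.*;

public class ResultValidationSketch {

    // This definition breaks two of the rules above:
    //  - jobId selects a FILE object, but the validator requires ObjectType.JOB
    //  - logFileId is never set, although the validator marks it as required
    public static ResultDefinition invalidResult() {

        var wrongTypeSelector = TagSelector.newBuilder()
                .setObjectType(ObjectType.FILE)
                .setObjectId(UUID.randomUUID().toString())
                .setObjectVersion(1)
                .setLatestTag(true);

        return ResultDefinition.newBuilder()
                .setJobId(wrongTypeSelector)
                .setStatusCode(JobStatusCode.SUCCEEDED)
                .setStatusMessage("Looks plausible, but will not pass validation")
                .build();
    }
}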
9 changes: 5 additions & 4 deletions tracdap-runtime/python/src/tracdap/rt/_exec/actors.py
@@ -25,7 +25,8 @@
import queue
import time

import tracdap.rt._impl.util as util # noqa
import tracdap.rt._impl.logging as _logging # noqa
import tracdap.rt._impl.util as _util # noqa
import tracdap.rt._impl.validation as _val # noqa
import tracdap.rt.exceptions as _ex

@@ -235,7 +236,7 @@ def __init__(self):
self.__shutdown = False
self.__shutdown_now = False
self.__done = False
self.__log = util.logger_for_object(self)
self.__log = _logging.logger_for_object(self)

def post_message(self, msg: _T_MSG, processor: tp.Callable[[_T_MSG], None]) -> bool:
with self.__msg_lock:
@@ -365,7 +366,7 @@ def lookup_message_function(cls, actor_class: Actor.__class__, message: str):

class ActorNode:

_log = util.logger_for_class(Actor)
_log = _logging.logger_for_class(Actor)

def __init__(
self, actor_id: ActorId, actor: Actor,
@@ -904,7 +905,7 @@ def __init__(

super().__init__()

self._log = util.logger_for_object(self)
self._log = _logging.logger_for_object(self)

# self.__actors: tp.Dict[ActorId, ActorNode] = {self.__ROOT_ID: ActorNode("", self.__ROOT_ID, None)}
# self.__message_queue: tp.List[Msg] = list()
36 changes: 26 additions & 10 deletions tracdap-runtime/python/src/tracdap/rt/_exec/context.py
@@ -26,9 +26,10 @@
import tracdap.rt.api.experimental as _eapi
import tracdap.rt.metadata as _meta
import tracdap.rt.exceptions as _ex
import tracdap.rt._impl.type_system as _types # noqa
import tracdap.rt._impl.data as _data # noqa
import tracdap.rt._impl.logging as _logging # noqa
import tracdap.rt._impl.storage as _storage # noqa
import tracdap.rt._impl.type_system as _types # noqa
import tracdap.rt._impl.util as _util # noqa
import tracdap.rt._impl.validation as _val # noqa

@@ -63,10 +64,15 @@ def __init__(self,
model_class: _api.TracModel.__class__,
local_ctx: tp.Dict[str, tp.Any],
dynamic_outputs: tp.List[str] = None,
checkout_directory: pathlib.Path = None):
checkout_directory: pathlib.Path = None,
log_provider: _logging.LogProvider = None):

# If no log provider is supplied, use the default (system logs only)
if log_provider is None:
log_provider = _logging.LogProvider()

self.__ctx_log = _util.logger_for_object(self)
self.__model_log = _util.logger_for_class(model_class)
self.__ctx_log = log_provider.logger_for_object(self)
self.__model_log = log_provider.logger_for_class(model_class)

self.__model_def = model_def
self.__model_class = model_class
@@ -368,9 +374,9 @@ def __init__(
self, model_def: _meta.ModelDefinition, model_class: _api.TracModel.__class__,
local_ctx: tp.Dict[str, tp.Any], dynamic_outputs: tp.List[str],
storage_map: tp.Dict[str, tp.Union[_eapi.TracFileStorage, _eapi.TracDataStorage]],
checkout_directory: pathlib.Path = None):
checkout_directory: pathlib.Path = None, log_provider: _logging.LogProvider = None):

super().__init__(model_def, model_class, local_ctx, dynamic_outputs, checkout_directory)
super().__init__(model_def, model_class, local_ctx, dynamic_outputs, checkout_directory, log_provider)

self.__model_def = model_def
self.__local_ctx = local_ctx
Expand Down Expand Up @@ -460,7 +466,9 @@ def set_schema(self, dataset_name: str, schema: _meta.SchemaDefinition):

class TracFileStorageImpl(_eapi.TracFileStorage):

def __init__(self, storage_key: str, storage_impl: _storage.IFileStorage, write_access: bool, checkout_directory):
def __init__(
self, storage_key: str, storage_impl: _storage.IFileStorage,
write_access: bool, checkout_directory, log_provider: _logging.LogProvider):

self.__storage_key = storage_key

@@ -481,7 +489,11 @@ def __init__(self, storage_key: str, storage_impl: _storage.IFileStorage, write_
self.__rmdir = None
self.__write_byte_stream = None

self.__log = _util.logger_for_object(self)
# If no log provider is supplied, use the default (system logs only)
if log_provider is None:
log_provider = _logging.LogProvider()

self.__log = log_provider.logger_for_object(self)
self.__val = TracStorageValidator(self.__log, checkout_directory, self.__storage_key)

def get_storage_key(self) -> str:
@@ -602,7 +614,7 @@ class TracDataStorageImpl(_eapi.TracDataStorage[_eapi.DATA_API]):
def __init__(
self, storage_key: str, storage_impl: _storage.IDataStorageBase[_data.T_INTERNAL_DATA, _data.T_INTERNAL_SCHEMA],
data_converter: _data.DataConverter[_eapi.DATA_API, _data.T_INTERNAL_DATA, _data.T_INTERNAL_SCHEMA],
write_access: bool, checkout_directory):
write_access: bool, checkout_directory, log_provider: _logging.LogProvider):

self.__storage_key = storage_key
self.__converter = data_converter
@@ -619,7 +631,11 @@ def __init__(
self.__create_table = None
self.__write_table = None

self.__log = _util.logger_for_object(self)
# If no log provider is supplied, use the default (system logs only)
if log_provider is None:
log_provider = _logging.LogProvider()

self.__log = log_provider.logger_for_object(self)
self.__val = TracStorageValidator(self.__log, checkout_directory, self.__storage_key)

def has_table(self, table_name: str) -> bool: