Skip to content

Commit

Permalink
feat: enhance task runner with improved logging and dependency support
Browse files Browse the repository at this point in the history
- Added support for new dependency file types: 'go.mod' and 'pom.xml' in dependency.go.
- Refactored command configuration in runner.go to improve logging and error handling.
- Introduced a new method to configure Node.js paths, enhancing environment setup for tasks.
- Enhanced IPC message handling with detailed logging for better traceability.
- Updated service logging to remove unnecessary prefixes for cleaner output.
- Improved command execution handling in process.go for better compatibility across platforms.
  • Loading branch information
tikazyq committed Dec 31, 2024
1 parent 8916194 commit 7b6805a
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 40 deletions.
2 changes: 2 additions & 0 deletions core/constants/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
// Dependency file type identifiers. Each value is the file name of a
// dependency manifest.
// NOTE(review): how these names are matched (exact base name vs. suffix) is
// decided by the callers, which are not visible here — confirm before relying on it.
const (
DependencyFileTypeRequirementsTxt = "requirements.txt" // Python (pip) requirements file
DependencyFileTypePackageJson = "package.json" // Node.js package manifest
DependencyFileTypeGoMod = "go.mod" // Go module definition file
DependencyFileTypePomXml = "pom.xml" // Maven project descriptor
)
const (
DependencyActionSync = "sync"
Expand Down
212 changes: 179 additions & 33 deletions core/task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (r *Runner) GetTaskId() (id primitive.ObjectID) {
func (r *Runner) configureCmd() (err error) {
var cmdStr string

// customized spider
// command
if r.t.Cmd == "" {
cmdStr = r.s.Cmd
} else {
Expand All @@ -225,20 +225,31 @@ func (r *Runner) configureCmd() (err error) {
// set working directory
r.cmd.Dir = r.cwd

// Configure pipes for IPC
// Configure pipes for IPC and logs
r.stdinPipe, err = r.cmd.StdinPipe()
if err != nil {
r.Errorf("error creating stdin pipe: %v", err)
return err
}

// Add stdout pipe for IPC
// Add stdout pipe for IPC and logs
r.stdoutPipe, err = r.cmd.StdoutPipe()
if err != nil {
r.Errorf("error creating stdout pipe: %v", err)
return err
}

// Add stderr pipe for error logs
stderrPipe, err := r.cmd.StderrPipe()
if err != nil {
r.Errorf("error creating stderr pipe: %v", err)
return err
}

// Create buffered readers
r.scannerStdout = bufio.NewReader(r.stdoutPipe)
r.scannerStderr = bufio.NewReader(stderrPipe)

// Initialize IPC channel
r.ipcChan = make(chan entity.IPCMessage)

Expand Down Expand Up @@ -272,24 +283,51 @@ func (r *Runner) startHealthCheck() {
}
}

// configureNodePath sets up the Node.js environment paths, handling both nvm
// and default installations.
//
// It picks a global node_modules directory and then, in the current process
// environment:
//   - prepends that directory to PATH unless it is already one of its entries;
//   - sets NODE_PATH to that directory.
//
// Directory selection:
//  1. If <home>/.nvm/versions/node exists and NVM_BIN is set, the directory is
//     <parent of NVM_BIN>/lib/node_modules.
//  2. Otherwise (no nvm directory, or nvm present but NVM_BIN unset) the
//     default /usr/lib/node_modules is used, so NODE_PATH is always defined.
//
// Failures are logged through r.Errorf and never abort the caller.
func (r *Runner) configureNodePath() {
	// Get user's home directory
	home, err := os.UserHomeDir()
	if err != nil {
		r.Errorf("error getting user home directory: %v", err)
		home = "/root" // fallback to root if can't get home dir
	}

	// Default global node_modules path; replaced below when an active nvm
	// version can be resolved.
	nodePath := "/usr/lib/node_modules"

	// Check if nvm is being used
	nvmPath := filepath.Join(home, ".nvm", "versions", "node")
	if utils.Exists(nvmPath) {
		// NVM_BIN points at the bin directory of the active node version; its
		// parent is the version root that holds lib/node_modules.
		if nvmBin := os.Getenv("NVM_BIN"); nvmBin != "" {
			nodePath = filepath.Join(filepath.Dir(nvmBin), "lib", "node_modules")
		}
	}

	// Compare whole PATH entries rather than substrings so that a longer
	// entry (e.g. ".../node_modules/.bin") is not mistaken for nodePath.
	envPath := os.Getenv("PATH")
	inPath := false
	for _, entry := range filepath.SplitList(envPath) {
		if entry == nodePath {
			inPath = true
			break
		}
	}
	if !inPath {
		newPath := nodePath
		if envPath != "" {
			newPath += string(os.PathListSeparator) + envPath
		}
		if err := os.Setenv("PATH", newPath); err != nil {
			r.Errorf("error setting PATH: %v", err)
		}
	}

	if err := os.Setenv("NODE_PATH", nodePath); err != nil {
		r.Errorf("error setting NODE_PATH: %v", err)
	}
}

// configureEnv sets up the environment variables for the task process, including:
// - Node.js paths
// - Crawlab-specific variables
// - Global environment variables from the system
func (r *Runner) configureEnv() {
// By default, add Node.js's global node_modules to PATH
envPath := os.Getenv("PATH")
nodePath := "/usr/lib/node_modules"
if !strings.Contains(envPath, nodePath) {
_ = os.Setenv("PATH", nodePath+":"+envPath)
}
_ = os.Setenv("NODE_PATH", nodePath)
// Configure Node.js paths
r.configureNodePath()

// Default envs
r.cmd.Env = os.Environ()
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_TASK_ID="+r.tid.Hex())
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_ADDRESS="+utils.GetGrpcAddress())
r.cmd.Env = append(r.cmd.Env, "CRAWLAB_GRPC_AUTH_KEY="+utils.GetAuthKey())
r.cmd.Env = append(r.cmd.Env, "PYENV_ROOT="+utils.PyenvRoot)
r.cmd.Env = append(r.cmd.Env, "PATH="+os.Getenv("PATH")+":"+utils.PyenvRoot+"/shims:"+utils.PyenvRoot+"/bin")

Expand Down Expand Up @@ -341,12 +379,16 @@ func (r *Runner) createHttpRequest(method, path string) (*http.Response, error)
// 3. Downloads new/modified files
// 4. Deletes files that no longer exist on master
func (r *Runner) syncFiles() (err error) {
r.Infof("starting file synchronization for spider: %s", r.s.Id.Hex())

workingDir := ""
if !r.s.GitId.IsZero() {
workingDir = r.s.GitRootPath
r.Debugf("using git root path: %s", workingDir)
}

// get file list from master
r.Infof("fetching file list from master node")
resp, err := r.createHttpRequest("GET", "/scan?path="+workingDir)
if err != nil {
r.Errorf("error getting file list from master: %v", err)
Expand Down Expand Up @@ -441,11 +483,14 @@ func (r *Runner) syncFiles() (err error) {
// wait for all files and directories to be synchronized
wg.Wait()

r.Infof("file synchronization completed successfully")
return err
}

// downloadFile downloads a file from the master node and saves it to the local file system
func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsFileInfo) error {
r.Debugf("downloading file: %s -> %s", path, filePath)

resp, err := r.createHttpRequest("GET", "/download?path="+path)
if err != nil {
r.Errorf("error getting file response: %v", err)
Expand Down Expand Up @@ -480,6 +525,8 @@ func (r *Runner) downloadFile(path string, filePath string, fileInfo *entity.FsF
r.Errorf("error copying file: %v", err)
return err
}

r.Debugf("successfully downloaded file: %s (size: %d bytes)", path, fileInfo.FileSize)
return nil
}

Expand Down Expand Up @@ -555,12 +602,28 @@ func (r *Runner) wait() (err error) {
return err
}

// log according to status
switch status {
case constants.TaskStatusFinished:
r.Infof("task[%s] finished", r.tid.Hex())
case constants.TaskStatusCancelled:
r.Infof("task[%s] cancelled", r.tid.Hex())
case constants.TaskStatusError:
r.Errorf("task[%s] error: %v", r.tid.Hex(), err)
default:
r.Errorf("invalid task status: %s", status)
}

return nil
}

// updateTask updates the task status and related statistics in the database
// If running on a worker node, updates are sent to the master
func (r *Runner) updateTask(status string, e error) (err error) {
if status != "" {
r.Debugf("updating task status to: %s", status)
}

if r.t != nil && status != "" {
// update task status
r.t.Status = status
Expand Down Expand Up @@ -588,8 +651,10 @@ func (r *Runner) updateTask(status string, e error) (err error) {
}

// get task
r.Debugf("fetching updated task from database")
r.t, err = r.svc.GetTaskById(r.tid)
if err != nil {
r.Errorf("failed to get updated task: %v", err)
return err
}

Expand Down Expand Up @@ -628,11 +693,18 @@ func (r *Runner) writeLogLines(lines []string) {
// - For running tasks: sets start time and wait duration
// - For completed tasks: sets end time and calculates durations
func (r *Runner) _updateTaskStat(status string) {
if status != "" {
r.Debugf("updating task statistics for status: %s", status)
}

ts, err := client.NewModelService[models.TaskStat]().GetById(r.tid)
if err != nil {
r.Errorf("error getting task stat: %v", err)
return
}

r.Debugf("current task statistics - wait_duration: %dms, runtime_duration: %dms", ts.WaitDuration, ts.RuntimeDuration)

switch status {
case constants.TaskStatusPending:
// do nothing
Expand Down Expand Up @@ -772,45 +844,68 @@ func (r *Runner) SetIPCHandler(handler func(entity.IPCMessage)) {
// startIPCReader continuously reads IPC messages from the child process's stdout
// Messages are parsed and either handled by the IPC handler or written to logs
func (r *Runner) startIPCReader() {
r.wg.Add(1)
defer r.wg.Done()
r.wg.Add(2) // Add 2 to wait group for both stdout and stderr readers

scanner := bufio.NewScanner(r.stdoutPipe)
// Start stdout reader
go func() {
defer r.wg.Done()
r.readOutput(r.scannerStdout, true) // true for stdout
}()

// Start stderr reader
go func() {
defer r.wg.Done()
r.readOutput(r.scannerStderr, false) // false for stderr
}()
}

func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) {
for {
select {
case <-r.ctx.Done():
return
default:
if !scanner.Scan() {
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
r.Errorf("error reading from %s: %v",
map[bool]string{true: "stdout", false: "stderr"}[isStdout],
err)
}
return
}
line := scanner.Text()

var ipcMsg entity.IPCMessage
err := json.Unmarshal([]byte(line), &ipcMsg)
if err == nil && ipcMsg.IPC {
// Only handle as IPC if it's valid JSON AND has IPC flag set
if r.ipcHandler != nil {
r.ipcHandler(ipcMsg)
} else {
// Default handler (insert data)
if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData {
r.handleIPCInsertDataMessage(ipcMsg)

// Trim the line
line = strings.TrimRight(line, "\n\r")

// For stdout, try to parse as IPC message first
if isStdout {
var ipcMsg entity.IPCMessage
if err := json.Unmarshal([]byte(line), &ipcMsg); err == nil && ipcMsg.IPC {
if r.ipcHandler != nil {
r.ipcHandler(ipcMsg)
} else {
r.Warnf("no IPC handler set for message: %v", ipcMsg)
// Default handler (insert data)
if ipcMsg.Type == "" || ipcMsg.Type == constants.IPCMessageData {
r.handleIPCInsertDataMessage(ipcMsg)
} else {
r.Warnf("no IPC handler set for message: %v", ipcMsg)
}
}
continue
}
} else {
// Everything else is treated as logs
r.writeLogLines([]string{line})
}

// If not an IPC message or from stderr, treat as log
r.writeLogLines([]string{line})
}
}
}

// handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node
func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
// Validate message
r.Debugf("processing IPC data message")

if ipcMsg.Payload == nil {
r.Errorf("empty payload in IPC message")
return
Expand Down Expand Up @@ -887,6 +982,8 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) {
return
}
}

r.Infof("successfully sent %d records to master node", len(records))
}

// newTaskRunner creates a new task runner instance with the specified task ID
Expand Down Expand Up @@ -939,3 +1036,52 @@ func newTaskRunner(id primitive.ObjectID, svc *Service) (r *Runner, err error) {

return r, errs.ErrorOrNil()
}

// logInternally sends internal runner logs to the same logging system as the
// task and mirrors them to the runner's standard logger.
//
// level is expected to be one of "ERROR", "WARN", "INFO" or "DEBUG". Any other
// value is still written to the task log (when a connection is available) but
// is not forwarded to the standard logger.
// message is the already formatted log text.
func (r *Runner) logInternally(level string, message string) {
	// Format the internal log with a prefix
	timestamp := time.Now().Local().Format("2006-01-02 15:04:05")

	// Pad the level to a fixed width for the task log line only. The padded
	// form must not overwrite level: "WARN" and "INFO" would gain a trailing
	// space and stop matching the switch below, silently dropping those
	// messages from the standard logger.
	paddedLevel := fmt.Sprintf("%-5s", level)

	// Format the log message
	internalLog := fmt.Sprintf("%s [%s] [%s] %s", paddedLevel, timestamp, "Crawlab", message)

	// Send to the same log system as task logs
	if r.conn != nil {
		r.writeLogLines([]string{internalLog})
	}

	// Also log through the standard logger
	switch level {
	case "ERROR":
		r.Logger.Error(message)
	case "WARN":
		r.Logger.Warn(message)
	case "INFO":
		r.Logger.Info(message)
	case "DEBUG":
		r.Logger.Debug(message)
	}
}

// Errorf renders format with args using fmt.Sprintf and records the result at
// the "ERROR" level through logInternally.
func (r *Runner) Errorf(format string, args ...interface{}) {
	r.logInternally("ERROR", fmt.Sprintf(format, args...))
}

// Warnf renders format with args using fmt.Sprintf and records the result at
// the "WARN" level through logInternally.
func (r *Runner) Warnf(format string, args ...interface{}) {
	r.logInternally("WARN", fmt.Sprintf(format, args...))
}

// Infof renders format with args using fmt.Sprintf and records the result at
// the "INFO" level through logInternally.
func (r *Runner) Infof(format string, args ...interface{}) {
	r.logInternally("INFO", fmt.Sprintf(format, args...))
}

// Debugf renders format with args using fmt.Sprintf and records the result at
// the "DEBUG" level through logInternally.
func (r *Runner) Debugf(format string, args ...interface{}) {
	r.logInternally("DEBUG", fmt.Sprintf(format, args...))
}
6 changes: 3 additions & 3 deletions core/task/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (svc *Service) getRunnerCount() (count int) {
}

func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunner, err error) {
svc.Debugf("[TaskHandlerService] getRunner: taskId[%v]", taskId)
svc.Debugf("get runner: taskId[%v]", taskId)
v, ok := svc.runners.Load(taskId)
if !ok {
err = fmt.Errorf("task[%s] not exists", taskId.Hex())
Expand All @@ -242,12 +242,12 @@ func (svc *Service) getRunner(taskId primitive.ObjectID) (r interfaces.TaskRunne
}

func (svc *Service) addRunner(taskId primitive.ObjectID, r interfaces.TaskRunner) {
svc.Debugf("[TaskHandlerService] addRunner: taskId[%s]", taskId.Hex())
svc.Debugf("add runner: taskId[%s]", taskId.Hex())
svc.runners.Store(taskId, r)
}

func (svc *Service) deleteRunner(taskId primitive.ObjectID) {
svc.Debugf("[TaskHandlerService] deleteRunner: taskId[%v]", taskId)
svc.Debugf("delete runner: taskId[%v]", taskId)
svc.runners.Delete(taskId)
}

Expand Down
Loading

0 comments on commit 7b6805a

Please sign in to comment.