Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RemoteLogger #94

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
version = "1.11.0"

[deps]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ For controlling other processes via RPC:
For communicating between processes in the style of a channel or stream:

- `RemoteChannel` - a `Channel`-like object that can be `put!` to or `take!` from any process
- `RemoteLogger` - an `AbstractLogger` forwarding logs to a given worker

For controlling multiple processes at once:

Expand Down
1 change: 1 addition & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Distributed.channel_from_id
Distributed.worker_id_from_socket
Distributed.cluster_cookie()
Distributed.cluster_cookie(::Any)
Distributed.RemoteLogger
```

## Cluster Manager Interface
Expand Down
3 changes: 3 additions & 0 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ using Base.Threads: Event
using Serialization, Sockets
import Serialization: serialize, deserialize
import Sockets: connect, wait_connected
import Logging

# NOTE: clusterserialize.jl imports additional symbols from Serialization for use

Expand Down Expand Up @@ -60,6 +61,7 @@ export
WorkerConfig,
RemoteException,
ProcessExitedException,
RemoteLogger,

process_messages,
remoteref_id,
Expand Down Expand Up @@ -107,6 +109,7 @@ include("messages.jl")
include("process_messages.jl") # process incoming messages
include("remotecall.jl") # the remotecall* api
include("macros.jl") # @spawn and friends
include("logger.jl")
include("workerpool.jl")
include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
Expand Down
27 changes: 27 additions & 0 deletions src/logger.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
RemoteLogger(pid=1, min_level=Info)

Logger that forwards all logging to worker `pid` via `remote_do` along with
adding the current worker `id` as a `pid` kwarg.
"""
struct RemoteLogger <: Logging.AbstractLogger
pid::Int
min_level::Logging.LogLevel
end
function RemoteLogger(pid=1)
RemoteLogger(pid, Logging.Info)
end

Logging.min_enabled_level(logger::RemoteLogger) = logger.min_level
Logging.shouldlog(logger::RemoteLogger, level, _module, group, id) = true

Check warning on line 16 in src/logger.jl

View check run for this annotation

Codecov / codecov/patch

src/logger.jl#L16

Added line #L16 was not covered by tests

# TODO: probably should live in base/logging.jl?
function logmsg(level::Logging.LogLevel, message, _module, _group, _id, _file, _line; kwargs...)
Logging.@logmsg level message _module = _module _group = _group _id = _id _file = _file _line = _line kwargs...
end

function Logging.handle_message(logger::RemoteLogger, level::Logging.LogLevel, message, _module, _group, _id,
_file, _line; kwargs...)
@nospecialize
remote_do(logmsg, logger.pid, level, message, _module, _group, _id, _file, _line; pid=myid(), kwargs...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I thought we could directly use handle_message from the targetted worker but I don't know how to fetch the current_logger there...

end
17 changes: 16 additions & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test, Distributed, Random, Serialization, Sockets
using Test, Distributed, Random, Serialization, Sockets, Logging
import Distributed: launch, manage

sharedir = normpath(joinpath(Sys.BINDIR, "..", "share"))
Expand Down Expand Up @@ -1956,6 +1956,21 @@ begin
end
end

# test logging
w = only(addprocs(1))
@everywhere using Logging
@test_logs (:info, "from pid $w") begin
prev_logger = global_logger(current_logger())
try
wait(@spawnat w with_logger(RemoteLogger(1)) do
@info("from pid $(myid())")
end)
theogf marked this conversation as resolved.
Show resolved Hide resolved
finally
global_logger(prev_logger)
end
end
wait(rmprocs([w]))

# Run topology tests last after removing all workers, since a given
# cluster at any time only supports a single topology.
nprocs() > 1 && rmprocs(workers())
Expand Down
Loading