Skip to content

Commit

Permalink
Improve pgxn_consumer logging
Browse files Browse the repository at this point in the history
Add the `--log-file` option for logging (defaults to STDOUT) and add a
timestamp and log level to every message. It also respects various
verbosity levels. Update and expand the tests, and ensure that UTF-8
characters are properly handled.
  • Loading branch information
theory committed Feb 18, 2023
1 parent b587aea commit 2d55aed
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 84 deletions.
3 changes: 3 additions & 0 deletions Build.PL
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ $class->new(
'Encode' => '2.40',
'Encode::Locale' => '1.05',
'Exception::Class::DBI' => '1.00',
'Fcntl' => 0,
'File::Basename' => '2.77',
'File::Copy' => '2.14',
'File::Path' => '2.08',
Expand All @@ -103,6 +104,7 @@ $class->new(
'HTML::Entities' => '3.68',
'HTTP::Negotiate' => '5.835',
'I18N::LangTags::Detect' => '1.03',
'IO::File' => 0,
'JSON::XS' => '2.3',
'Locale::Maketext' => '1.13',
'LWP::Protocol::https' => 0,
Expand All @@ -125,6 +127,7 @@ $class->new(
'Plack::Middleware::MethodOverride' => '0.10',
'Proc::Daemon' => 0,
'Pod::Usage' => '1.33',
'POSIX' => 0,
'namespace::autoclean' => '0.11',
'Router::Resource' => '0.11',
'SemVer' => '0.10.0',
Expand Down
1 change: 1 addition & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Revision history for Perl extension PGXN::Manager

0.30.1
- Added the --log-file option to `pgxn_consumer`.

0.30.0 2023-02-12T00:33:38Z
- Upgraded jQuery to v3.6.0 and the jquery.validate plugin to 1.19.3.
Expand Down
1 change: 1 addition & 0 deletions bin/pgxn_consumer
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on the PostgreSQL LISTEN/NOTIFY queue.
-p --pid PIDFILE PID file path; used with --daemonize.
-D --daemonize Run in the background as a daemon.
-i --interval SECONDS Interval to sleep between consuming events.
-l --log-file FILE Path to a log file.
-V --verbose Incremental verbosity to STDOUT.
-h --help Print a usage statement and exit.
-m --man Print the complete documentation and exit.
Expand Down
97 changes: 73 additions & 24 deletions lib/PGXN/Manager/Consumer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,29 @@ use Carp;
use Time::HiRes qw(sleep);
use PGXN::Manager;
use Proc::Daemon;
use IO::File;
use POSIX ();
use Fcntl qw(:flock);
use Cwd;
use namespace::autoclean;

our $VERSION = v0.30.1;
use constant CHANNELS => qw(release new_user new_mirror);
sub _emit($) { say encode_utf8 $_[0] }

has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has interval => (is => 'ro', isa => 'Num', required => 1, default => 5);
has continue => (is => 'rw', isa => 'Bool', required => 1, default => 1);
has log_fh => (is => 'ro', isa => 'IO::Handle', required => 1, default => sub {
_log_fh()
});
has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub {
# Use our own connetion instead of $pgxn->conn in order to add the callback.
my $cb = shift->verbose ? sub {
_emit "Listening on " . join(', ', CHANNELS);
my $self = shift;
my $cb = $self->verbose ? sub {
$_[0]->do("LISTEN pgxn_$_") for CHANNELS;
$self->log("INFO: Listening on ", join ', ', map { s/^pgxn_//r } @{
$_[0]->selectcol_arrayref('SELECT * FROM pg_listening_channels()')
});
return;
} : sub {
$_[0]->do("LISTEN pgxn_$_") for CHANNELS;
Expand All @@ -36,6 +44,15 @@ has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub
PGXN::Manager->instance->_connect( Callbacks => { connected => $cb } );
});

sub _log_fh {
my $fn = shift;
my $fh = $fn ? IO::File->new($fn, '>>:utf8')
: IO::Handle->new_from_fd(fileno STDOUT, 'w');
binmode $fh, ":utf8";
$fh->autoflush(1);
$fh;
}

sub go {
my $class = shift;
my $cfg = $class->_config;
Expand All @@ -47,12 +64,13 @@ sub go {
pid_file => $cfg->{pid},
);
if (my $pid = $daemon->Init) {
_emit $pid unless delete $cfg->{pid};
_log(_log_fh($cfg->{'log-file'}), "INFO: Forked PID $pid");
return 0;
}
}

# In the child process.
# In the child process. Set up log file handle and go.
$cfg->{log_fh} = _log_fh delete $cfg->{'log-file'};
my $cmd = $class->new( $cfg );
$SIG{TERM} = sub { $cmd->continue(0) };
$cmd->run(@ARGV);
Expand All @@ -62,8 +80,8 @@ sub run {
my $self = shift;
my $pgxn = PGXN::Manager->instance;
my $cfg = $pgxn->config->{consumers} || do {
_emit "No consumers configured; messages will be dropped";
[];
$self->log("WARN: No consumers configured; messages will be dropped");
undef
};

# Load the map from events to consumers.
Expand All @@ -75,6 +93,7 @@ sub run {
sleep($self->interval);
}

$self->log("INFO: Shutting down");
return 0;
}

Expand All @@ -85,7 +104,7 @@ sub load_consumers {
my $type = delete $cfg->{type}
or die "No type specified for event consumer\n";
my $pkg = __PACKAGE__ . "::$type";
_emit "Loading $pkg" if $self->verbose > 1;
$self->log("INFO: Loading $pkg") if $self->verbose > 1;
eval "use $pkg";
die "Error loading $pkg: $@\n" if $@;
my $events = delete $cfg->{events};
Expand All @@ -95,7 +114,7 @@ sub load_consumers {
);

for my $e (@{ $events }) {
_emit "Configuring $pkg for $e" if $self->verbose > 1;
$self->log("INFO: Configuring $pkg for $e") if $self->verbose > 1;
push @{ $consumers{$e} ||= [] } => $consumer;
}
}
Expand All @@ -110,43 +129,58 @@ sub consume {
# Notify payload treated as UTF-8 text, so already decoded from UTF-8 bytes.
my $json = JSON::XS->new->utf8(0);
my $dbh = shift;
_emit "Consuming" if $self->verbose > 1;
$self->log("INFO: Consuming") if $self->verbose > 2;
while (my $notify = $dbh->pg_notifies) {
my ($name, $pid, $msg) = @{ $notify };
_emit "Received “$name” event from PID $pid"
$self->log("INFO: Received “$name” event from PID $pid")
if $self->verbose;
unless ($name =~ s/^pgxn_//) {
_emit "Unknown channel “$name”; skipping";
$self->log("WARN: Unknown channel “$name”; skipping");
next;
}
my $handlers = $consumers_for->{$name} || do {
_emit "No handlers configured for pgxn_$name channel; skipping"
if $self->verbose;
$self->log(
"INFO: No handlers configured for ",
"pgxn_$name channel; skipping",
)if $self->verbose;
next;
};

# Decode the JSON payload;
my $meta = try {
$json->decode($msg);
} catch {
_emit "Cannot decode JSON: $_";
$self->log("ERORR: Cannot decode JSON: $_");
undef;
};
next unless $meta;

# Run all the handlers.
for my $h (@{ $handlers }) {
_emit "Sending to " . $h->name . " handler" if $self->verbose > 1;
try { $h->handle($name, $meta) } catch { _emit $_ };
$self->log("INFO: Sending to ", $h->name, " handler")
if $self->verbose;
try { $h->handle($name, $meta) }
catch { $self->log("ERROR: $_") };
}
}
});
} catch {
_emit $_;
$self->log("ERROR: $_");
};
return 1;
}

sub log {
_log(shift->log_fh, @_);
}

sub _log {
my $fh = shift;
flock $fh, LOCK_EX;
say {$fh} POSIX::strftime('%Y-%m-%dT%H:%M:%SZ - ', gmtime), join '', @_;
flock $fh, LOCK_UN;
}

sub _config {
my $self = shift;
require Getopt::Long;
Expand All @@ -163,6 +197,7 @@ sub _config {
'env|E=s',
'daemonize|D',
'pid=s',
'log-file|l=s',
'interval|i=s',
'verbose|V+',
'help|h',
Expand Down Expand Up @@ -242,7 +277,7 @@ Sent when a new mirror has been added.
=head3 C<new>
my $listener = PGXN::Manager::Consumer->new(%params);
my $consumer = PGXN::Manager::Consumer->new(%params);
Creates and returns a new PGXN::Manager::Consumer object. The supported
parameters are:
Expand All @@ -261,6 +296,10 @@ A decimal value specifying how many seconds to sleep between calls to consume
C<NOTIFY> events. Defaults to C<5>, meaning it will pause for 5 seconds after
consuming messages before making the call to consume more.
=item C<log_fh>
An IO::Handle for logging. Defaults to C<STDOUT>.
=back
=head2 Class Method
Expand All @@ -285,7 +324,7 @@ work.
=head3 C<run>
$listener->run($task, @args);
$consumer->run($task, @args);
Calls C<load_consumers> to load all the configured consumers, then runs the
listener, consuming events and passing them off to handlers configure to
Expand All @@ -295,7 +334,7 @@ termination signal handler installed by C<go()>).
=head3 C<load_consumers>
my $consumers_for_event = $listener->load_consumers($cfg);
my $consumers_for_event = $consumer->load_consumers($cfg);
Loads all of the consumers present in C<$cfg> and returns a hash reference
mapping channel names to each consumer configured to handle those channel's
Expand All @@ -304,18 +343,24 @@ instance created.
=head3 C<consume>
$listener->consume($manager, $consumers);
$consumer->consume($manager, $consumers);
Called by C<run()>, this method performs a single check for events in all
PGXN C<NOTIFY> channels, dispatching any it consumes to the handlers
configured to handle them, if any.
=head3 C<log>
$consumer->log("INFO: Hello");
Write a message to the log.
=head2 Instance Accessors
=head3 C<verbosity>
my $verbosity = $listener->verbosity;
$listener->verbosity($verbosity);
my $verbosity = $consumer->verbosity;
$consumer->verbosity($verbosity);
Get or set an incremental verbosity. The higher the integer specified, the
more verbose the sync.
Expand All @@ -332,6 +377,10 @@ signal handler installed by C<go> in C<--daemonize> mode, which then causes
the service to gracefully shut down after its next C<interval> consuming
events.
=head3 C<log_fh>
Logging file handle.
=head1 Author
David E. Wheeler <[email protected]>
Expand Down
Loading

0 comments on commit 2d55aed

Please sign in to comment.