Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PICARD-2743: Add support for custom post tagging actions #374

Merged
merged 6 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
373 changes: 373 additions & 0 deletions plugins/post_tagging_actions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 Giorgio Fontanive (twodoorcoupe)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

PLUGIN_NAME = "Post Tagging Actions"
PLUGIN_AUTHOR = "Giorgio Fontanive"
PLUGIN_DESCRIPTION = """
This plugin lets you set up actions that run with a context menu click.
An action consists in a command line executed for each album or each track along
with a few options to tweak the behaviour.
This can be used to run external programs and pass some variables to it.
"""
PLUGIN_VERSION = "0.1"
PLUGIN_API_VERSIONS = ["2.10", "2.11"]
PLUGIN_LICENSE = "GPL-2.0"
PLUGIN_LICENSE_URL = "https://www.gnu.org/licenses/gpl-2.0.html"
PLUGIN_USER_GUIDE_URL = "https://github.com/twodoorcoupe/picard-plugins/tree/user_guides/user_guides/post_tagging_actions/guide.md"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I note that the documentation is NOT included in this PR - and IMO it is essential that it should be included in order to ensure that the availability and versioning of the documentation matches the plugin.

Also you are creating a new top-level sub-directory and have chosen user_guides. I would personally prefer to see this being docs as this seems to me to be a bit of a defacto standard - however others may think differently.

When you start to include this in the PR I think this URL will need to assume that this PR has been merged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I also saw other plugins like Additional Artist Details and Format Performer Tags have their docs included in the plugin's folder, so I did the same rather than creating a new top-level sub-directory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On this documentation page, I am not sure that the COV integration tool is a good example for this plugin even though it is a valid example - because any Cover Art integration would ideally be done by creating a Cover Art plugin.


from picard.album import Album
from picard.track import Track
from picard.ui.options import OptionsPage, register_options_page
from picard.ui.itemviews import BaseAction, register_album_action, register_track_action
from picard import log, config
from picard.const import sys
from picard.util import thread
from picard.script import parser

from .options_post_tagging_actions import Ui_PostTaggingActions
from PyQt5 import QtWidgets
from PyQt5.QtCore import QObject

from collections import namedtuple
from queue import PriorityQueue
from threading import Thread
from concurrent import futures
from os import path
import re
import shlex
import subprocess

Check warning on line 49 in plugins/post_tagging_actions/__init__.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

plugins/post_tagging_actions/__init__.py#L49

Consider possible security implications associated with the subprocess module.

# Additional special variables.
TRACK_SPECIAL_VARIABLES = {
phw marked this conversation as resolved.
Show resolved Hide resolved
"filepath": lambda file: file,
"folderpath": lambda file: path.dirname(file),

Check warning on line 54 in plugins/post_tagging_actions/__init__.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

plugins/post_tagging_actions/__init__.py#L54

Lambda may not be necessary
"filename": lambda file: path.splitext(path.basename(file))[0],
"filename_ext": lambda file: path.basename(file),

Check warning on line 56 in plugins/post_tagging_actions/__init__.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

plugins/post_tagging_actions/__init__.py#L56

Lambda may not be necessary
"directory": lambda file: path.basename(path.dirname(file))
}
ALBUM_SPECIAL_VARIABLES = {
"get_num_matched_tracks",
"get_num_unmatched_files",
"get_num_total_files",
"get_num_unsaved_files",
"is_complete",
"is_modified"
}

# Settings.
CANCEL = "pta_cancel"
OPTIONS = ["pta_command", "pta_wait_for_exit", "pta_execute_for_tracks", "pta_refresh_tags"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be better as an immutable set (i.e. curly brackets) like the ALBUM_SPECIAL_VARIABLES above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of the options in this list is the same as the order of the columns of the table in the options page. I use the order when saving or loading the options. This way I can just iterate over the columns without having to explicitly save or load the values in each column.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If set doesn't preserve order, this is a sound reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it could be a tuple :)


Options = namedtuple("Options", ("variables", *[option[4:] for option in OPTIONS]))
Action = namedtuple("Action", ("commands", "album", "options"))
PriorityAction = namedtuple("PriorityAction", ("priority", "counter", "action"))
action_queue = PriorityQueue()


class ActionLoader:
"""Adds actions to the execution queue.

Attributes:
action_options (list): Stores the actions' information loaded from the options page.
action_counter (int): The count of actions that have been added to the queue, used for priority.
"""

def __init__(self):
self.action_options = []
self.action_counter = 0
self.load_actions()

def _create_options(self, command, *other_options):
"""Finds the variables in the command and adds the options to the action options list.
"""
variables = [variable[1:-1] for variable in re.findall(r'%.*?%', command)]
variables = [parser.normalize_tagname(variable) for variable in variables]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variables = [parser.normalize_tagname(variable[1:-1]) for variable in re.findall(r'%.*?%', command)]?

command = re.sub(r'%.*?%', '{}', command)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to create a pre-compiled regex as a pre-initialised object?

options = Options(variables, command, *other_options)
self.action_options.append(options)

def _create_action(self, priority, commands, album, options):
"""Adds an action with the given parameters to the execution queue.

If the os is not windows, the command is split as suggested by the subprocess
module documentation.
"""
if not sys.IS_WIN:
commands = [shlex.split(command) for command in commands]
action = Action(commands, album, options)
priority_action = PriorityAction(priority, self.action_counter, action)
action_queue.put(priority_action)
self.action_counter += 1

def _replace_variables(self, variables, item):
"""Returns a list where each variable is replaced with its value.

Item is either an album or a track. For track special variables,
it uses the path of the first file of the given item.
If the variable is not found anywhere, it remains as in the original text.
"""
values = []
album = item.album if isinstance(item, Track) else item
first_file_path = next(item.iterfiles()).filename
for variable in variables:
if variable in ALBUM_SPECIAL_VARIABLES:
values.append(getattr(album, variable)())
elif variable in TRACK_SPECIAL_VARIABLES:
values.append(TRACK_SPECIAL_VARIABLES[variable](first_file_path))
else:
values.append(item.metadata.get(variable, f"%{variable}%"))
return values

def add_actions(self, album, tracks):
"""Adds one action to the execution queue for each tuple in the action options list.

Actions meant to be executed once for each track are considered as a single
action. This way, the other options are more consistent.
"""
for priority, options in enumerate(self.action_options):
if options.execute_for_tracks:
values_list = [self._replace_variables(options.variables, track) for track in tracks]
else:
values_list = [self._replace_variables(options.variables, album)]
commands = [options.command.format(*values) for values in values_list]
self._create_action(priority, commands, album, options)

def load_actions(self):
"""Loads the information from the options and saves it in the action options list.

This gets called when the plugin is loaded or when the user saves the options.
"""
self.action_options = []
loaded_options = zip(*[config.setting[name] for name in OPTIONS])
for options in loaded_options:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not iterate on for name in OPTIONS rather than create a new object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I used a confusing name for options and loaded_options here. I iterate over the options for each "action" rather than each option. Each options is a tuple made of all the options for that action (command, wait_for_exit, etc...), so I need to use the zip function to do this. I changed the name of both options and loaded_options to be hopefully less confusing.

command = options[0]
other_options = [eval(option) for option in options[1:]]

Check warning on line 155 in plugins/post_tagging_actions/__init__.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

plugins/post_tagging_actions/__init__.py#L155

Use of possibly insecure function - consider using safer ast.literal_eval.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The options here seem to be booleans, so avoiding use of eval and the consequent Codacy waarning shouldn't be difficult.

But if use of eval cannot be avoided, if you are able to annotate this to avoid this Codacy warning that would be helpful.

self._create_options(command, *other_options)


class ActionRunner:
"""Runs actions in the execution queue.

Attributes:
action_thread_pool (ThreadPoolExecutor): Pool used to run processes with the subprocess module.
refresh_tags_pool (ThreadPoolExecutor): Pool used to reload tags from files and refresh albums.
worker (Thread): Worker thread that picks actions from the execution queue.
"""

def __init__(self):
self.action_thread_pool = futures.ThreadPoolExecutor()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the Python futures documentation:

If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5

So this is going to be the number of subprocesses that are run - and depending on the commands the user is choosing to run and the specifications of their PC this may be fine or it may cause major issues.

I think there needs to be a UI option with a sensible default to set the max-workers for this thread-pool.

self.refresh_tags_pool = futures.ThreadPoolExecutor(1)
self.worker = Thread(target = self._execute)
self.worker.start()

def _refresh_tags(self, future_objects, album):
"""Reloads tags from the album's files and refreshes the album.

First, it makes sure that the action has finished running. This is used for
when an external process changes a file's tags.
"""
futures.wait(future_objects, return_when = futures.ALL_COMPLETED)
for file in album.iterfiles():
file.set_pending()
file.load(lambda file: None)
thread.to_main(album.load, priority = True, refresh = True)

def _run_process(self, command):
"""Runs the process and waits for it to finish.
"""
process = subprocess.Popen(command, text = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)

Check warning on line 189 in plugins/post_tagging_actions/__init__.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

plugins/post_tagging_actions/__init__.py#L189

subprocess call - check for execution of untrusted input.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The command here is user provided, and is explicitly visible in the UI, so I see no issue of untrusted input here.

If you are able to annotate this to avoid this Codacy warning that would be helpful.

answer = process.communicate()
if answer[0]:
log.info("Action output:\n%s", answer[0])
if answer[1]:
log.error("Action error:\n%s", answer[1])

def _execute(self):
"""Takes actions from the execution queue and runs them.

If it finds an action with priority -1, the loop stops. When the loop
stops, both ThreadPoolExecutors are shutdown.
"""
while True:
priority_action = action_queue.get()
if priority_action.priority == -1:
break
next_action = priority_action.action
commands = next_action.commands
future_objects = {self.action_thread_pool.submit(self._run_process, command) for command in commands}

if next_action.options.wait_for_exit:
futures.wait(future_objects, return_when = futures.ALL_COMPLETED)
if next_action.options.refresh_tags:
self.refresh_tags_pool.submit(self._refresh_tags, future_objects, next_action.album)
action_queue.task_done()

self.action_thread_pool.shutdown(wait = False, cancel_futures = True)
self.refresh_tags_pool.shutdown(wait = False, cancel_futures = True)

def stop(self):
"""Makes the worker thread exit its loop.

This gets called when Picard is closed. It waits for the processes that
are still executing to finish before exiting.
"""
if not config.setting[CANCEL]:
action_queue.join()
action_queue.put(PriorityAction(-1, -1, None))
self.worker.join()


class ExecuteAlbumActions(BaseAction):

NAME = "Run actions for highlighted albums"

def callback(self, objs):
albums = {obj for obj in objs if isinstance(obj, Album)}
for album in albums:
action_loader.add_actions(album, album.tracks)


class ExecuteTrackActions(BaseAction):

NAME = "Run actions for highlighted tracks"

def callback(self, objs):
tracks = {obj for obj in objs if isinstance(obj, Track)}
albums = {track.album for track in tracks}
for album in albums:
album_tracks = tracks.intersection(album.tracks)
action_loader.add_actions(album, album_tracks)


class PostTaggingActionsOptions(OptionsPage):
"""Options page found under the "plugins" page.
"""

NAME = "post_tagging_actions"
TITLE = "Post Tagging Actions"
PARENT = "plugins"

action_options = [config.ListOption("setting", name, []) for name in OPTIONS]
options = [config.BoolOption("setting", CANCEL, True), *action_options]

def __init__(self, parent = None):
super(PostTaggingActionsOptions, self).__init__(parent)
self.ui = Ui_PostTaggingActions()
self.ui.setupUi(self)
self._reset_ui()

header = self.ui.table.horizontalHeader()
header.setSectionResizeMode(0, QtWidgets.QHeaderView.ResizeMode.Stretch)
for column in range(1, header.count()):
header.setSectionResizeMode(column, QtWidgets.QHeaderView.ResizeMode.ResizeToContents)

self.ui.add_file_path.clicked.connect(self._open_file_dialog)
self.ui.add_action.clicked.connect(self._add_action_to_table)
self.ui.remove_action.clicked.connect(self._remove_action_from_table)
self.ui.up.clicked.connect(self._move_action_up)
self.ui.down.clicked.connect(self._move_action_down)

self.get_table_columns_values = [
self.ui.action.text,
self.ui.wait.isChecked,
self.ui.tracks.isChecked,
self.ui.refresh.isChecked
]

def _open_file_dialog(self):
"""Adds the selected file's path to the command line text box.
"""
file = QtWidgets.QFileDialog.getOpenFileName(self)[0]
cursor_position = self.ui.action.cursorPosition()
current_text = self.ui.action.text()
if not sys.IS_WIN:
file = shlex.quote(file)
new_text = current_text[:cursor_position] + file + current_text[cursor_position:]
self.ui.action.setText(new_text)

def _reset_ui(self):
self.ui.action.setText("")
self.ui.wait.setChecked(False)
self.ui.refresh.setChecked(False)
self.ui.albums.setChecked(True)

def _add_action_to_table(self):
if not self.ui.action.text():
return
row_position = self.ui.table.rowCount()
self.ui.table.insertRow(row_position)
for column in range(self.ui.table.columnCount()):
value = self.get_table_columns_values[column]()
value = str(value)
widget = QtWidgets.QTableWidgetItem(value)
self.ui.table.setItem(row_position, column, widget)
self._reset_ui()

def _remove_action_from_table(self):
current_row = self.ui.table.currentRow()
if current_row != -1:
self.ui.table.removeRow(current_row)

def _move_action_up(self):
current_row = self.ui.table.currentRow()
new_row = current_row - 1
if current_row > 0:
self._swap_table_rows(current_row, new_row)
self.ui.table.setCurrentCell(new_row, 0)

def _move_action_down(self):
current_row = self.ui.table.currentRow()
new_row = current_row + 1
if current_row < self.ui.table.rowCount() - 1:
self._swap_table_rows(current_row, new_row)
self.ui.table.setCurrentCell(new_row, 0)

def _swap_table_rows(self, row1, row2):
for column in range(self.ui.table.columnCount()):
item1 = self.ui.table.takeItem(row1, column)
item2 = self.ui.table.takeItem(row2, column)
self.ui.table.setItem(row1, column, item2)
self.ui.table.setItem(row2, column, item1)

def load(self):
"""Puts the plugin's settings into the actions table.
"""
settings = zip(*[config.setting[name] for name in OPTIONS])
for row, values in enumerate(settings):
self.ui.table.insertRow(row)
for column in range(self.ui.table.columnCount()):
widget = QtWidgets.QTableWidgetItem(values[column])
self.ui.table.setItem(row, column, widget)
self.ui.cancel.setChecked(config.setting[CANCEL])

def save(self):
"""Saves the actions table items in the settings.
"""
settings = []
for column in range(self.ui.table.columnCount()):
settings.append([])
for row in range(self.ui.table.rowCount()):
setting = self.ui.table.item(row, column).text()
settings[column].append(setting)
config.setting[OPTIONS[column]] = settings[column]
config.setting[CANCEL] = self.ui.cancel.isChecked()
action_loader.load_actions()


action_loader = ActionLoader()
action_runner = ActionRunner()
register_album_action(ExecuteAlbumActions())
register_track_action(ExecuteTrackActions())
register_options_page(PostTaggingActionsOptions)
QObject.tagger.register_cleanup(action_runner.stop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what this register_cleanup line is for or why it is an initialisation statement.

Can you please annotate this with a comment explaining it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be moved into the initialisation of the ActionRunner object?

Loading
Loading