Skip to content

Commit

Permalink
added shell to remove dagster dep
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-scott committed Sep 18, 2024
1 parent 4e8625a commit 1fbf8be
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "dagster-mssql-bcp"
version = "0.0.2"
dependencies = [
"dagster",
"dagster-shell",

"pendulum",
"pyodbc",
"polars",
Expand Down
2 changes: 1 addition & 1 deletion src/dagster_mssql_bcp/bcp_core/bcp_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dagster import (
get_dagster_logger,
)
from dagster_shell import execute_shell_command
from .shell import execute as execute_shell_command
from sqlalchemy import Connection, text

from .asset_schema import AssetSchema
Expand Down
183 changes: 183 additions & 0 deletions src/dagster_mssql_bcp/bcp_core/shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import os
import signal
from logging import Logger
from subprocess import PIPE, STDOUT, Popen
from typing import Mapping, Optional, Tuple

import dagster._check as check
from dagster._utils import safe_tempfile_path
from typing_extensions import Final

import sys
OUTPUT_LOGGING_OPTIONS: Final = ["STREAM", "BUFFER", "NONE"]
is_win = sys.platform == "win32"

def execute_script_file(
shell_script_path: str,
output_logging: str,
log: Logger,
cwd: Optional[str] = None,
env: Optional[Mapping[str, str]] = None,
log_shell_command: bool = True,
) -> Tuple[str, int]:
"""Execute a shell script file specified by the argument ``shell_script_path``. The script will be
invoked via ``subprocess.Popen(['bash', shell_script_path], ...)``.
In the Popen invocation, ``stdout=PIPE, stderr=STDOUT`` is used, and the combined stdout/stderr
output is retrieved.
Examples:
.. literalinclude:: ../../../../../../python_modules/libraries/dagster-shell/dagster_shell_tests/example_shell_script_utility.py
:language: python
Args:
shell_script_path (str): The shell script to execute.
output_logging (str): The logging mode to use. Supports STREAM, BUFFER, and NONE.
log (Union[logging.Logger, DagsterLogManager]): Any logger which responds to .info()
cwd (str, optional): Working directory for the shell command to use. Defaults to the
temporary path where we store the shell command in a script file.
env (Dict[str, str], optional): Environment dictionary to pass to ``subprocess.Popen``.
Unused by default.
log_shell_command (bool, optional): Whether to log the shell command before executing it.
Raises:
Exception: When an invalid output_logging is selected. Unreachable from op-based
invocation since the config system will check output_logging against the config
enum.
Returns:
Tuple[str, int]: A tuple where the first element is the combined stdout/stderr output of running the shell
command and the second element is the return code.
"""
check.str_param(shell_script_path, "shell_script_path")
check.str_param(output_logging, "output_logging")
check.opt_str_param(cwd, "cwd", default=os.path.dirname(shell_script_path))
env = check.opt_nullable_dict_param(env, "env", key_type=str, value_type=str)

if output_logging not in OUTPUT_LOGGING_OPTIONS:
raise Exception("Unrecognized output_logging %s" % output_logging)

def pre_exec():
# Restore default signal disposition and invoke setsid
for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
if hasattr(signal, sig):
signal.signal(getattr(signal, sig), signal.SIG_DFL)
os.setsid()

with open(shell_script_path, "rb") as f:
shell_command = f.read().decode("utf-8")

if log_shell_command:
log.info(f"Running command:\n{shell_command}")

sub_process = None
try:
stdout_pipe = PIPE
stderr_pipe = STDOUT
if output_logging == "NONE":
stdout_pipe = stderr_pipe = None

is_win = sys.platform == "win32"
if is_win:
sub_process = Popen(
shell_command,
stdout=stdout_pipe,
stderr=stderr_pipe,
cwd=cwd,
env=env,
preexec_fn=None, # noqa: PLW1509
encoding="UTF-8",
shell=False
)
else:
sub_process = Popen(
["bash", shell_script_path],
stdout=stdout_pipe,
stderr=stderr_pipe,
cwd=cwd,
env=env,
preexec_fn=pre_exec, # noqa: PLW1509
encoding="UTF-8",
)

log.info(f"Command pid: {sub_process.pid}")

output = ""
if output_logging == "STREAM":
assert sub_process.stdout is not None, "Setting stdout=PIPE should always set stdout."
# Stream back logs as they are emitted
lines = []
for line in sub_process.stdout:
log.info(line.rstrip())
lines.append(line)
output = "".join(lines)
elif output_logging == "BUFFER":
# Collect and buffer all logs, then emit
output, _ = sub_process.communicate()
log.info(output)

sub_process.wait()
log.info(f"Command exited with return code {sub_process.returncode}")

return output, sub_process.returncode
finally:
# Always terminate subprocess, including in cases where the run is terminated
if sub_process:
sub_process.terminate()


def execute(
shell_command: str,
output_logging: str,
log: Logger,
cwd: Optional[str] = None,
env: Optional[Mapping[str, str]] = None,
log_shell_command: bool = True,
) -> Tuple[str, int]:
"""This function is a utility for executing shell commands from within a Dagster op (or from Python in general).
It can be used to execute shell commands on either op input data, or any data generated within a generic python op.
Internally, it executes a shell script specified by the argument ``shell_command``. The script will be written
to a temporary file first and invoked via ``subprocess.Popen(['bash', shell_script_path], ...)``.
In the Popen invocation, ``stdout=PIPE, stderr=STDOUT`` is used, and the combined stdout/stderr
output is retrieved.
Examples:
.. literalinclude:: ../../../../../../python_modules/libraries/dagster-shell/dagster_shell_tests/example_shell_command_utility.py
:language: python
Args:
shell_command (str): The shell command to execute
output_logging (str): The logging mode to use. Supports STREAM, BUFFER, and NONE.
log (Union[logging.Logger, DagsterLogManager]): Any logger which responds to .info()
cwd (str, optional): Working directory for the shell command to use. Defaults to the
temporary path where we store the shell command in a script file.
env (Dict[str, str], optional): Environment dictionary to pass to ``subprocess.Popen``.
Unused by default.
log_shell_command (bool, optional): Whether to log the shell command before executing it.
Returns:
Tuple[str, int]: A tuple where the first element is the combined stdout/stderr output of running the shell
command and the second element is the return code.
"""
check.str_param(shell_command, "shell_command")
# other args checked in execute_file

with safe_tempfile_path() as tmp_file_path:
tmp_path = os.path.dirname(tmp_file_path)
log.info("Using temporary directory: %s" % tmp_path)

with open(tmp_file_path, "wb") as tmp_file:
tmp_file.write(shell_command.encode("utf-8"))
tmp_file.flush()
script_location = os.path.abspath(tmp_file.name)
log.info(f"Temporary script location: {script_location}")
return execute_script_file(
shell_script_path=tmp_file.name,
output_logging=output_logging,
log=log,
cwd=(cwd or tmp_path),
env=env,
log_shell_command=log_shell_command,
)

0 comments on commit 1fbf8be

Please sign in to comment.