Skip to content

Commit

Permalink
AMBARI-18629. HDFS goes down after installing cluster (aonishuk)
Browse files Browse the repository at this point in the history
  • Loading branch information
aonishuk committed Oct 18, 2016
1 parent ee2a125 commit e68cc10
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
26 changes: 10 additions & 16 deletions ambari-agent/src/main/python/ambari_agent/ActionQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
from ambari_commons.thread_utils import terminate_thread


logger = logging.getLogger()
Expand Down Expand Up @@ -83,7 +84,6 @@ def __init__(self, config, controller):
self.controller = controller
self.configTags = {}
self._stop = threading.Event()
self.hangingStatusCommands = {}
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
self.parallel_execution = config.get_parallel_exec_option()
Expand Down Expand Up @@ -230,22 +230,16 @@ def process_command(self, command):
elif commandType == self.STATUS_COMMAND:
component_name = command['componentName']

if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
del self.hangingStatusCommands[component_name]
thread = threading.Thread(target = self.execute_status_command, args = (command,))
thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
thread.start()
thread.join(timeout=self.status_command_timeout)

if not component_name in self.hangingStatusCommands:
thread = threading.Thread(target = self.execute_status_command, args = (command,))
thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
thread.start()
thread.join(timeout=self.status_command_timeout)

if thread.isAlive():
# Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
PythonReflectiveExecutor.last_context.revert()
logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
self.hangingStatusCommands[component_name] = thread
else:
logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
if thread.isAlive():
terminate_thread(thread)
# Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
PythonReflectiveExecutor.last_context.revert()
logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
Expand Down
43 changes: 43 additions & 0 deletions ambari-common/src/main/python/ambari_commons/thread_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

def terminate_thread(thread):
    """Terminate a Python thread abruptly from another thread.

    This is considered a bad pattern. If possible, handle stopping of the
    thread from inside of it, or run the work as a separate process
    (multiprocessing module).

    The function only schedules SystemExit in the target thread through
    PyThreadState_SetAsyncExc and returns; it does not wait for the thread
    to actually finish. Callers that need that guarantee should join() the
    thread afterwards.

    :param thread: a threading.Thread instance
    :raises ValueError: if the interpreter finds no thread with that id
    :raises SystemError: if more than one thread state was affected
    """
    import ctypes

    # Thread.isAlive() was removed in Python 3.9; is_alive() exists since 2.6.
    # Fall back to the old spelling only for objects that lack the new one.
    is_alive = getattr(thread, "is_alive", None) or thread.isAlive
    if not is_alive():
        return

    # Wrap the id once and reuse it for both calls: a bare Python int is
    # marshalled by ctypes as a C int, which can truncate a 64-bit thread id.
    thread_id = ctypes.c_long(thread.ident)
    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

0 comments on commit e68cc10

Please sign in to comment.