Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temp fix for domain randomization #217

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 106 additions & 56 deletions bundle/markov/domain_randomizations/visual/light_randomizer.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,126 @@
import logging
import numpy as np
from enum import Enum

from markov.rospy_wrappers import ServiceProxyWrapper
from markov.domain_randomizations.abs_randomizer import AbstractRandomizer
from markov.log_handler.logger import Logger
from markov.domain_randomizations.constants import (GazeboServiceName,
RangeType,
RANGE_MIN, RANGE_MAX,
Color, Attenuation)
from markov.domain_randomizations.constants import RangeType, RANGE_MIN, RANGE_MAX, Color

import rospy
from std_msgs.msg import ColorRGBA
from gazebo_msgs.srv import SetLightProperties, SetLightPropertiesRequest

from gazebo_msgs.srv import SpawnModel, DeleteLight
from geometry_msgs.msg import Pose, Point, Quaternion

logger = Logger(__name__, logging.INFO).get_logger()


class LightRandomizer(AbstractRandomizer):
"""Light Randomizer class"""
def __init__(self, light_name, color_range=None, attenuation_range=None):
"""
Constructor

Args:
light_name (str): name of the light
color_range (dict): min-max of each color component (r, g, b).
Valid format: {'r': {'min': 0.0, 'max': 1.0},
'g': {'min': 0.0, 'max': 1.0},
'b': {'min': 0.0, 'max': 1.0}}
attenuation_range (dict): min-max of each attenuation component (constant, linear, quadratic).
Valid format: {'constant': {'min': 0.0, 'max':1.0},
'linear': {'min': 0.0, 'max':1.0},
'quadratic': {'min': 0.0, 'max':1.0}}
"""

def __init__(self, light_name, color_range=None):

super(LightRandomizer, self).__init__()
self.light_name = light_name

self.range = {RangeType.COLOR: {Color.R.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Color.G.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Color.B.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0}},
RangeType.ATTENUATION: {Attenuation.CONSTANT.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Attenuation.LINEAR.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Attenuation.QUADRATIC.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0}}}
# Set default ranges
self.range = {
RangeType.COLOR: {
Color.R.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Color.G.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
Color.B.value: {RANGE_MIN: 0.0, RANGE_MAX: 1.0},
}
}

if color_range:
self.range[RangeType.COLOR].update(color_range)
if attenuation_range:
self.range[RangeType.ATTENUATION].update(attenuation_range)

# ROS Services
rospy.wait_for_service(GazeboServiceName.SET_LIGHT_PROPERTIES.value)
self.set_light_prop = ServiceProxyWrapper(GazeboServiceName.SET_LIGHT_PROPERTIES.value, SetLightProperties)
# ROS Service proxies
rospy.wait_for_service("/gazebo/spawn_sdf_model")
self.spawn_model = rospy.ServiceProxy("/gazebo/spawn_sdf_model", SpawnModel)

rospy.wait_for_service("/gazebo/delete_light")
self.delete_light = rospy.ServiceProxy("/gazebo/delete_light", DeleteLight)

def _delete_light(self):
try:
self.delete_light(self.light_name)
logger.info(f"Deleted light: {self.light_name}")
except rospy.ServiceException as ex:
logger.warning(f"Failed to delete light {self.light_name}: {str(ex)}")

def _generate_light_sdf(self):

# Generate the standalone light SDF
if self.light_name == "Light 1":

# Randomize color
color_range = self.range[RangeType.COLOR]
r = np.random.uniform(color_range[Color.R.value][RANGE_MIN], color_range[Color.R.value][RANGE_MAX])
g = np.random.uniform(color_range[Color.G.value][RANGE_MIN], color_range[Color.G.value][RANGE_MAX])
b = np.random.uniform(color_range[Color.B.value][RANGE_MIN], color_range[Color.B.value][RANGE_MAX])

# Randomize position
x = np.random.uniform(-5.0, 5.0)
y = np.random.uniform(-5.0, 5.0)
z = np.random.uniform(2.0, 5.0)

return f"""<sdf version="1.6">
<light name="{self.light_name}" type="point">
<pose>{x} {y} {z} 0 0 0</pose>
<diffuse>{r} {g} {b} 1</diffuse>
<specular>.1 .1 .1 1</specular>
<attenuation>
<constant>0.4</constant>
<linear>0.01</linear>
<quadratic>0.00</quadratic>
<range>50</range>
</attenuation>
<direction>0 0 -1</direction>
<cast_shadows>false</cast_shadows>
</light>
</sdf>"""
elif self.light_name == "sun":

# Randomize attenuation values
constant = np.random.uniform(0.0, 1.0)
linear = np.random.uniform(0.0, 1.0)
quadratic = np.random.uniform(0.0, 1.0)

# Slightly randomize brightness
brightness = np.random.uniform(0.5, 1)
r, g, b = brightness, brightness, brightness
return f"""<sdf version="1.6">
<light name="{self.light_name}" type="directional">
<pose>0.0 0.0 15.0 0 0 0</pose>
<diffuse>{r} {g} {b} 1</diffuse>
<cast_shadows>false</cast_shadows>
<attenuation>
<constant>{constant}</constant>
<linear>{linear}</linear>
<quadratic>{quadratic}</quadratic>
<range>100</range>
</attenuation>
</light>
</sdf>"""

def _spawn_light(self, sdf_content):
initial_pose = Pose(position=Point(0, 0, 0), orientation=Quaternion(0, 0, 0, 1)) # Placeholder pose
try:
res = self.spawn_model(self.light_name, sdf_content, "", initial_pose, "world")
if not res.success:
logger.error(f"Failed to spawn light {self.light_name}: {res.status_message}")
else:
logger.info(f"Spawned new light: {self.light_name}")
except rospy.ServiceException as ex:
logger.error(f"Service call to spawn light failed: {str(ex)}")

def _randomize(self):
req = SetLightPropertiesRequest()
req.light_name = self.light_name

color_range = self.range[RangeType.COLOR]
req.diffuse = ColorRGBA(*[np.random.uniform(color_range[Color.R.value][RANGE_MIN],
color_range[Color.R.value][RANGE_MAX]),
np.random.uniform(color_range[Color.G.value][RANGE_MIN],
color_range[Color.G.value][RANGE_MAX]),
np.random.uniform(color_range[Color.B.value][RANGE_MIN],
color_range[Color.B.value][RANGE_MAX]),
1.0])

attenuation_range = self.range[RangeType.ATTENUATION]
req.attenuation_constant = np.random.uniform(attenuation_range[Attenuation.CONSTANT.value][RANGE_MIN],
attenuation_range[Attenuation.CONSTANT.value][RANGE_MAX])
req.attenuation_linear = np.random.uniform(attenuation_range[Attenuation.LINEAR.value][RANGE_MIN],
attenuation_range[Attenuation.LINEAR.value][RANGE_MAX])
req.attenuation_quadratic = np.random.uniform(attenuation_range[Attenuation.QUADRATIC.value][RANGE_MIN],
attenuation_range[Attenuation.QUADRATIC.value][RANGE_MAX])
res = self.set_light_prop(req)
if self.light_name not in ["sun", "Light 1"]:
logger.info(f"Skipping randomization for light: {self.light_name}")
return # Skip randomization for lights other than "sun" and "Light 1"

# Delete the existing light
self._delete_light()

# Generate the new light SDF
sdf_content = self._generate_light_sdf()

# Spawn the new light
self._spawn_light(sdf_content)
2 changes: 1 addition & 1 deletion docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pytest-cov==4.1.0
redis>=4.4.4
retrying==1.3.4
rl-coach-slim==1.0.0
sagemaker
sagemaker==2.232.2
sagemaker-training
scikit-image==0.21.0
scipy
Expand Down