Source code for citylearn.reward_function

from typing import Any, List, Mapping, Tuple, Union
import numpy as np
from citylearn.building import Building
from citylearn.data import ZERO_DIVISION_PLACEHOLDER
import logging

LOGGER = logging.getLogger()
[docs] class RewardFunction: r"""Base and default reward function class. The default reward is the electricity consumption from the grid at the current time step returned as a negative value. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. **kwargs : dict Other keyword arguments for custom reward calculation. """ def __init__(self, env_metadata: Mapping[str, Any], exponent: float = None, **kwargs): penalty_coefficient = kwargs.pop('charging_constraint_penalty_coefficient', None) self.env_metadata = env_metadata self.exponent = exponent self.charging_constraint_penalty_coefficient = penalty_coefficient @property def env_metadata(self) -> Mapping[str, Any]: return self._env_metadata @env_metadata.setter def env_metadata(self, env_metadata: Mapping[str, Any]): self._env_metadata = env_metadata @property def central_agent(self) -> bool: """Expect 1 central agent to control all buildings.""" return self.env_metadata['central_agent'] @property def exponent(self) -> float: return self.__exponent @exponent.setter def exponent(self, exponent: float): self.__exponent = 1.0 if exponent is None else exponent @property def charging_constraint_penalty_coefficient(self) -> float: return getattr(self, '_charging_constraint_penalty_coefficient', 1.0) @charging_constraint_penalty_coefficient.setter def charging_constraint_penalty_coefficient(self, coefficient: float): if coefficient is None: self._charging_constraint_penalty_coefficient = 1.0 else: self._charging_constraint_penalty_coefficient = float(coefficient)
[docs] def reset(self): """Use to reset variables at the start of an episode.""" pass
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: r"""Calculates reward. Parameters ---------- observations: List[Mapping[str, Union[int, float]]] List of all building observations at current :py:attr:`citylearn.citylearn.CityLearnEnv. time_step` that are got from calling :py:meth:`citylearn.building.Building.observations`. Returns ------- reward: List[float] Reward for transition to current timestep. """ net_electricity_consumption = [o['net_electricity_consumption'] for o in observations] reward_list = [-(max(o, 0)**self.exponent) for o in net_electricity_consumption] if self.central_agent: reward = [sum(reward_list)] else: reward = reward_list return reward
[docs] class MultiBuildingRewardFunction(RewardFunction): def __init__(self, env, reward_functions: dict[str, RewardFunction]): self.env = env self.reward_functions = reward_functions super().__init__(env)
[docs] def calculate(self, observations: list[dict]) -> list[float]: rewards = [] for obs, (building_name, rf) in zip(observations, self.reward_functions.items()): if rf is None: raise ValueError(f"No reward function for building '{building_name}'") rewards.append(rf.calculate([obs])) return rewards
[docs] def reset(self): for rf in self.reward_functions.values(): rf.reset()
@property def env_metadata(self): return self._env_metadata @env_metadata.setter def env_metadata(self, env_metadata: Mapping[str, Any]): self._env_metadata = env_metadata for rf in self.reward_functions.values(): rf.env_metadata = env_metadata
[docs] class MARL(RewardFunction): """MARL reward function class. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. """ def __init__(self, env_metadata: Mapping[str, Any]): super().__init__(env_metadata)
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: net_electricity_consumption = [o['net_electricity_consumption'] for o in observations] district_electricity_consumption = sum(net_electricity_consumption) building_electricity_consumption = np.array(net_electricity_consumption, dtype=float)*-1 reward_list = np.sign(building_electricity_consumption)*0.01*building_electricity_consumption**2*np.nanmax([0, district_electricity_consumption]) if self.central_agent: reward = [reward_list.sum()] else: reward = reward_list.tolist() return reward
[docs] class IndependentSACReward(RewardFunction): """Recommended for use with the `SAC` controllers. Returned reward assumes that the building-agents act independently of each other, without sharing information through the reward. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. """ def __init__(self, env_metadata: Mapping[str, Any]): super().__init__(env_metadata)
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: net_electricity_consumption = [o['net_electricity_consumption'] for o in observations] reward_list = [min(v*-1**3, 0) for v in net_electricity_consumption] if self.central_agent: reward = [sum(reward_list)] else: reward = reward_list return reward
[docs] class SolarPenaltyReward(RewardFunction): """The reward is designed to minimize electricity consumption and maximize solar generation to charge energy storage systems. The reward is calculated for each building, i and summed to provide the agent with a reward that is representative of all the building or buildings (in centralized case)it controls. It encourages net-zero energy use by penalizing grid load satisfaction when there is energy in the energy storage systems as well as penalizing net export when the energy storage systems are not fully charged through the penalty term. There is neither penalty nor reward when the energy storage systems are fully charged during net export to the grid. Whereas, when the energy storage systems are charged to capacity and there is net import from the grid the penalty is maximized. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. """ def __init__(self, env_metadata: Mapping[str, Any]): super().__init__(env_metadata)
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: reward_list = [] for o, m in zip(observations, self.env_metadata['buildings']): e = o['net_electricity_consumption'] cc = m['cooling_storage']['capacity'] hc = m['heating_storage']['capacity'] dc = m['dhw_storage']['capacity'] ec = m['electrical_storage']['capacity'] cs = o.get('cooling_storage_soc', 0.0) hs = o.get('heating_storage_soc', 0.0) ds = o.get('dhw_storage_soc', 0.0) es = o.get('electrical_storage_soc', 0.0) reward = 0.0 reward += -(1.0 + np.sign(e)*cs)*abs(e) if cc > ZERO_DIVISION_PLACEHOLDER else 0.0 reward += -(1.0 + np.sign(e)*hs)*abs(e) if hc > ZERO_DIVISION_PLACEHOLDER else 0.0 reward += -(1.0 + np.sign(e)*ds)*abs(e) if dc > ZERO_DIVISION_PLACEHOLDER else 0.0 reward += -(1.0 + np.sign(e)*es)*abs(e) if ec > ZERO_DIVISION_PLACEHOLDER else 0.0 reward_list.append(reward) if self.central_agent: reward = [sum(reward_list)] else: reward = reward_list return reward
[docs] class ComfortReward(RewardFunction): """Reward for occupant thermal comfort satisfaction. The reward is calculated as the negative difference between the setpoint and indoor dry-bulb temperature raised to some exponent if outside the comfort band. If within the comfort band, the reward is the negative difference when in cooling mode and temperature is below the setpoint or when in heating mode and temperature is above the setpoint. The reward is 0 if within the comfort band and above the setpoint in cooling mode or below the setpoint and in heating mode. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. band: float, default: 2.0 Setpoint comfort band (+/-). If not provided, the comfort band time series defined in the building file, or the default time series value of 2.0 is used. lower_exponent: float, default = 2.0 Penalty exponent for when in cooling mode but temperature is above setpoint upper boundary or heating mode but temperature is below setpoint lower boundary. higher_exponent: float, default = 2.0 Penalty exponent for when in cooling mode but temperature is below setpoint lower boundary or heating mode but temperature is above setpoint upper boundary. """ def __init__(self, env_metadata: Mapping[str, Any], band: float = None, lower_exponent: float = None, higher_exponent: float = None): super().__init__(env_metadata) self.band = band self.lower_exponent = lower_exponent self.higher_exponent = higher_exponent @property def band(self) -> float: return self.__band @property def lower_exponent(self) -> float: return self.__lower_exponent @property def higher_exponent(self) -> float: return self.__higher_exponent @band.setter def band(self, band: float): self.__band = band @lower_exponent.setter def lower_exponent(self, lower_exponent: float): self.__lower_exponent = 2.0 if lower_exponent is None else lower_exponent @higher_exponent.setter def higher_exponent(self, higher_exponent: float): self.__higher_exponent = 2.0 if higher_exponent is None else higher_exponent
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: reward_list = [] for o in observations: heating_demand = o.get('heating_demand', 0.0) cooling_demand = o.get('cooling_demand', 0.0) heating = heating_demand > cooling_demand hvac_mode = o['hvac_mode'] indoor_dry_bulb_temperature = o['indoor_dry_bulb_temperature'] if hvac_mode in [1, 2]: set_point = o['indoor_dry_bulb_temperature_cooling_set_point'] if hvac_mode == 1 else o['indoor_dry_bulb_temperature_heating_set_point'] band = self.band if self.band is not None else o['comfort_band'] lower_bound_comfortable_indoor_dry_bulb_temperature = set_point - band upper_bound_comfortable_indoor_dry_bulb_temperature = set_point + band delta = abs(indoor_dry_bulb_temperature - set_point) if indoor_dry_bulb_temperature < lower_bound_comfortable_indoor_dry_bulb_temperature: exponent = self.lower_exponent if hvac_mode == 2 else self.higher_exponent reward = -(delta**exponent) elif lower_bound_comfortable_indoor_dry_bulb_temperature <= indoor_dry_bulb_temperature < set_point: reward = 0.0 if heating else -delta elif set_point <= indoor_dry_bulb_temperature <= upper_bound_comfortable_indoor_dry_bulb_temperature: reward = -delta if heating else 0.0 else: exponent = self.higher_exponent if heating else self.lower_exponent reward = -(delta**exponent) else: cooling_set_point = o['indoor_dry_bulb_temperature_cooling_set_point'] heating_set_point = o['indoor_dry_bulb_temperature_heating_set_point'] band = self.band if self.band is not None else o['comfort_band'] lower_bound_comfortable_indoor_dry_bulb_temperature = heating_set_point - band upper_bound_comfortable_indoor_dry_bulb_temperature = cooling_set_point + band cooling_delta = indoor_dry_bulb_temperature - cooling_set_point heating_delta = indoor_dry_bulb_temperature - heating_set_point if indoor_dry_bulb_temperature < lower_bound_comfortable_indoor_dry_bulb_temperature: exponent = self.higher_exponent if not heating else self.lower_exponent reward = -(abs(heating_delta)**exponent) elif lower_bound_comfortable_indoor_dry_bulb_temperature <= indoor_dry_bulb_temperature < heating_set_point: reward = -(abs(heating_delta)) elif heating_set_point <= indoor_dry_bulb_temperature <= cooling_set_point: reward = 0.0 elif cooling_set_point < indoor_dry_bulb_temperature < upper_bound_comfortable_indoor_dry_bulb_temperature: reward = -(abs(cooling_delta)) else: exponent = self.higher_exponent if heating else self.lower_exponent reward = -(abs(cooling_delta)**exponent) reward_list.append(reward) if self.central_agent: reward = [sum(reward_list)] else: reward = reward_list return reward
[docs] class SolarPenaltyAndComfortReward(RewardFunction): """Addition of :py:class:`citylearn.reward_function.SolarPenaltyReward` and :py:class:`citylearn.reward_function.ComfortReward`. Parameters ---------- env_metadata: Mapping[str, Any]: General static information about the environment. band: float, default = 2.0 Setpoint comfort band (+/-). If not provided, the comfort band time series defined in the building file, or the default time series value of 2.0 is used. lower_exponent: float, default = 2.0 Penalty exponent for when in cooling mode but temperature is above setpoint upper boundary or heating mode but temperature is below setpoint lower boundary. higher_exponent: float, default = 3.0 Penalty exponent for when in cooling mode but temperature is below setpoint lower boundary or heating mode but temperature is above setpoint upper boundary. coefficients: Tuple, default = (1.0, 1.0) Coefficents for `citylearn.reward_function.SolarPenaltyReward` and :py:class:`citylearn.reward_function.ComfortReward` values respectively. """ def __init__(self, env_metadata: Mapping[str, Any], band: float = None, lower_exponent: float = None, higher_exponent: float = None, coefficients: Tuple = None): self.__functions: List[RewardFunction] = [ SolarPenaltyReward(env_metadata), ComfortReward(env_metadata, band=band, lower_exponent=lower_exponent, higher_exponent=higher_exponent) ] super().__init__(env_metadata) self.coefficients = coefficients @property def coefficients(self) -> Tuple: return self.__coefficients @RewardFunction.env_metadata.setter def env_metadata(self, env_metadata: Mapping[str, Any]) -> Mapping[str, Any]: RewardFunction.env_metadata.fset(self, env_metadata) for f in self.__functions: f.env_metadata = self.env_metadata @coefficients.setter def coefficients(self, coefficients: Tuple): coefficients = [1.0]*len(self.__functions) if coefficients is None else coefficients assert len(coefficients) == len(self.__functions), f'{type(self).__name__} needs {len(self.__functions)} coefficients.' self.__coefficients = coefficients
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float]]]) -> List[float]: reward = np.array([f.calculate(observations) for f in self.__functions], dtype='float32') reward = reward*np.reshape(self.coefficients, (len(self.coefficients), 1)) reward = reward.sum(axis=0).tolist() return reward
[docs] class Electric_Vehicles_Reward_Function(MARL): """ Reward function for electric vehicle charging behavior in V2G settings. Only affects EV-related behavior; other building logic comes from the superclass. """ def __init__(self, env_metadata: Mapping[str, Any], weights: Mapping[str, float] = None): super().__init__(env_metadata) # Default tunable weights for EV-related reward components self.weights = weights or { "no_car_charging": -5.0, "battery_limits": -2.0, "soc_impossible": -10.0, "soc_under": -5.0, "close_soc": 10.0, "self_ev_consumption": 5.0, "extra_self_production": 5.0, } self._last_base_reward_total = 0.0 self._last_penalty_total = 0.0 self._last_base_rewards_per_building: List[float] = [] self._last_penalties_per_building: List[float] = []
[docs] def calculate(self, observations: List[Mapping[str, Union[int, float, dict]]]) -> List[float]: current_reward = super().calculate(observations) reward_list = [] base_rewards = [] penalty_values = [] for i, o in enumerate(observations): ev_info = o.get("electric_vehicles_chargers_dict", {}) if not ev_info: reward = 0 else: if self.central_agent: reward_value = current_reward[0] if isinstance(current_reward, list) else current_reward reward = self.calculate_ev_penalty(o, reward_value) else: reward = self.calculate_ev_penalty(o, current_reward[i]) base_rewards.append(reward) violation = float(o.get("charging_constraint_violation_kwh", 0.0) or 0.0) penalty = violation * self.charging_constraint_penalty_coefficient if violation > 0.0 else 0.0 penalty_values.append(penalty) reward -= penalty reward_list.append(reward) self._last_base_rewards_per_building = base_rewards self._last_penalties_per_building = penalty_values total_reward = [sum(reward_list)] if self.central_agent else reward_list self._last_base_reward_total = sum(base_rewards) if self.central_agent else base_rewards self._last_penalty_total = sum(penalty_values) if self.central_agent else penalty_values LOGGER.debug(f"Calculated EV reward: {total_reward}") return total_reward
[docs] def calculate_ev_penalty(self, o: Mapping[str, Union[int, float, dict]], current_reward: float) -> float: penalty_total = 0.0 ev_chargers: dict = o.get("electric_vehicles_chargers_dict", {}) net_energy_before = o.get("net_electricity_consumption", 0) # Bounding the multiplier to avoid extreme scaling penalty_multiplier = 1.0 / (1.0 + abs(current_reward)) for charger_id, data in ev_chargers.items(): contributions = {k: 0.0 for k in self.weights.keys()} if not data["connected"]: if data["last_charged_kwh"] and abs(data["last_charged_kwh"]) > 0.1: contributions["no_car_charging"] += self.weights["no_car_charging"] * penalty_multiplier LOGGER.debug(f"Charger {charger_id} | EV not connected | Contributions: {contributions}") continue # Extract values soc_prev = data.get("previous_battery_soc") soc_now = data.get("battery_soc") capacity = data.get("battery_capacity") min_capacity = data.get("min_capacity") last_charged_kwh = data.get("last_charged_kwh", 0) if last_charged_kwh is None: last_charged_kwh = 0.0 required_soc = data.get("required_soc") hours_until_departure = data.get("hours_until_departure", 0) max_charging_power = data.get("max_charging_power", 0) max_discharging_power = data.get("max_discharging_power", 0) if soc_prev is None or soc_now is None or capacity is None: raise ValueError("Something went wrong, this values should not be none") continue # Battery limits current_energy = soc_prev * capacity + last_charged_kwh if current_energy > capacity or current_energy < min_capacity: contributions["battery_limits"] += self.weights["battery_limits"] * penalty_multiplier # SoC penalties/rewards if required_soc is not None: soc_diff = soc_now - required_soc soc_diff_kWh = soc_diff * capacity max_possible_charge = max_charging_power * hours_until_departure max_possible_discharge = max_discharging_power * hours_until_departure if soc_diff_kWh > max_possible_charge: contributions["soc_impossible"] += self.weights["soc_impossible"] * penalty_multiplier if hours_until_departure == 0: if -0.25 < soc_diff <= -0.10: contributions["soc_under"] += 2 * self.weights["soc_under"] * penalty_multiplier elif soc_diff <= -0.25: contributions["soc_under"] += (self.weights["soc_under"] ** 2) * penalty_multiplier elif -0.10 < soc_diff <= 0.10: contributions["close_soc"] += self.weights["close_soc"] * penalty_multiplier if abs(soc_diff_kWh) <= max(max_possible_charge, max_possible_discharge): reward_multiplier = 1.0 / (hours_until_departure + 0.1) contributions["close_soc"] += self.weights["close_soc"] * penalty_multiplier * reward_multiplier # Self-production reward if last_charged_kwh > 0 and net_energy_before < 0: contributions["extra_self_production"] += self.weights["extra_self_production"] * penalty_multiplier elif last_charged_kwh < 0 and net_energy_before < 0: contributions["extra_self_production"] += -0.5 * self.weights["extra_self_production"] * penalty_multiplier # Self-consumption reward if last_charged_kwh < 0 and net_energy_before > 0: contributions["self_ev_consumption"] += self.weights["self_ev_consumption"] * penalty_multiplier elif last_charged_kwh > 0 and net_energy_before > 0: contributions["self_ev_consumption"] += -0.5 * self.weights["self_ev_consumption"] * penalty_multiplier LOGGER.debug(f"Charger {charger_id} | Contributions: {contributions}") penalty_total += sum(contributions.values()) return penalty_total