from copy import deepcopy
from enum import Enum
import hashlib
import importlib
import logging
import os
from pathlib import Path
from typing import Any, List, Mapping, Tuple, Union
from gymnasium import Env, spaces
import numpy as np
import pandas as pd
from citylearn.base import Environment, EpisodeTracker
from citylearn.building import Building, DynamicsBuilding
from citylearn.cost_function import CostFunction
from citylearn.data import CarbonIntensity, DataSet, ElectricVehicleSimulation, EnergySimulation, LogisticRegressionOccupantParameters, Pricing, Weather
from citylearn.electric_vehicle import ElectricVehicle
from citylearn.energy_model import Battery, PV
from citylearn.reward_function import RewardFunction
from citylearn.utilities import read_json
# Module-level logger. `logging.getLogger()` called without a name returns the root logger.
LOGGER = logging.getLogger()
# Disable the matplotlib font-manager and pyplot loggers.
logging.getLogger('matplotlib.font_manager').disabled = True
logging.getLogger('matplotlib.pyplot').disabled = True
[docs]
class EvaluationCondition(Enum):
    """Evaluation conditions.

    Used in `citylearn.CityLearnEnv.calculate` method. Every member value is a
    string suffix assembled from the soft-private fragments declared first.
    Members that share a value are enum aliases of the first member defined
    with that value, so the declaration order below matters.
    """

    # general (soft private) suffix fragments
    _DEFAULT = ''
    _STORAGE_SUFFIX = '_without_storage'
    _PARTIAL_LOAD_SUFFIX = '_and_partial_load'
    _PV_SUFFIX = '_and_pv'

    # Building type
    WITH_STORAGE_AND_PV = _DEFAULT
    WITHOUT_STORAGE_BUT_WITH_PV = _STORAGE_SUFFIX
    WITHOUT_STORAGE_AND_PV = f'{_STORAGE_SUFFIX}{_PV_SUFFIX}'

    # DynamicsBuilding type
    WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV = _DEFAULT
    WITHOUT_STORAGE_BUT_WITH_PARTIAL_LOAD_AND_PV = _STORAGE_SUFFIX
    WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV = f'{_STORAGE_SUFFIX}{_PARTIAL_LOAD_SUFFIX}'
    WITHOUT_STORAGE_AND_PARTIAL_LOAD_AND_PV = f'{_STORAGE_SUFFIX}{_PARTIAL_LOAD_SUFFIX}{_PV_SUFFIX}'
[docs]
class CityLearnEnv(Environment, Env):
r"""CityLearn environment class.
Parameters
----------
schema: Union[str, Path, Mapping[str, Any]]
Name of CityLearn data set, filepath to JSON representation or :code:`dict` object of a CityLearn schema.
Call :py:meth:`citylearn.data.DataSet.get_names` for list of available CityLearn data sets.
root_directory: Union[str, Path]
Absolute path to directory that contains the data files including the schema.
buildings: Union[List[Building], List[str], List[int]], optional
Buildings to include in environment. If list of :code:`citylearn.building.Building` is provided, will override :code:`buildings` definition in schema.
If list of :str: is provided will include only schema :code:`buildings` keys that are contained in provided list of :code:`str`.
If list of :int: is provided will include only schema :code:`buildings` whose index is contained in provided list of :code:`int`.
simulation_start_time_step: int, optional
Time step to start reading data files contents.
simulation_end_time_step: int, optional
Time step to end reading from data files contents.
episode_time_steps: Union[int, List[Tuple[int, int]]], optional
If type is `int`, it is the number of time steps in an episode. If a `List[Tuple[int, int]]` is provided,
it is a list of episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`.
Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_time_steps` is of type `List[Tuple[int, int]]`.
rolling_episode_split: bool, default: False
True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes in steps of `episode_time_steps`.
random_episode_split: bool, default: False
True if episode splits are to be selected at random during training otherwise, False to select sequentially.
seconds_per_time_step: float
Number of seconds in 1 `time_step` and must be set to >= 1.
reward_function: Union[RewardFunction, str], optional
Reward function class instance or path to function class e.g. 'citylearn.reward_function.IndependentSACReward'.
If provided, will override :code:`reward_function` definition in schema.
reward_function_kwargs: Mapping[str, Any], optional
Parameters to be passed to :py:attr:`reward_function` at initialization.
central_agent: bool, optional
Expect 1 central agent to control all buildings.
shared_observations: List[str], optional
Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.
active_observations: Union[List[str], List[List[str]]], optional
List of observations to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`.
inactive_observations: Union[List[str], List[List[str]]], optional
List of observations to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`.
active_actions: Union[List[str], List[List[str]]], optional
List of actions to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`.
inactive_actions: Union[List[str], List[List[str]]], optional
List of actions to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`.
simulate_power_outage: Union[bool, List[bool]]
Whether to simulate power outages. Can be specified for all buildings as single :code:`bool` or for
each building independently in a :code:`List[bool]`. Will override power outage defined in the :code:`schema`.
solar_generation: Union[bool, List[bool]]
Whether to allow solar generation. Can be specified for all buildings as single :code:`bool` or for
each building independently in a :code:`List[bool]`. Will override :code:`pv` defined in the :code:`schema`.
random_seed: int, optional
Pseudorandom number generator seed for repeatable results.
Other Parameters
----------------
**kwargs : dict
Other keyword arguments used to initialize super classes.
Notes
-----
Parameters passed to `citylearn.citylearn.CityLearnEnv.__init__` that are also defined in `schema` will override their `schema` definition.
"""
def __init__(self,
    schema: Union[str, Path, Mapping[str, Any]], root_directory: Union[str, Path] = None, buildings: Union[List[Building], List[str], List[int]] = None,
    electric_vehicles: Union[List[ElectricVehicle], List[str], List[int]] = None,
    simulation_start_time_step: int = None, simulation_end_time_step: int = None, episode_time_steps: Union[int, List[Tuple[int, int]]] = None, rolling_episode_split: bool = None,
    random_episode_split: bool = None, seconds_per_time_step: float = None, reward_function: Union[RewardFunction, str] = None, reward_function_kwargs: Mapping[str, Any] = None,
    central_agent: bool = None, shared_observations: List[str] = None, active_observations: Union[List[str], List[List[str]]] = None,
    inactive_observations: Union[List[str], List[List[str]]] = None, active_actions: Union[List[str], List[List[str]]] = None,
    inactive_actions: Union[List[str], List[List[str]]] = None, simulate_power_outage: bool = None, solar_generation: bool = None, random_seed: int = None, **kwargs: Any
):
    """Resolve the schema, load the environment members through `_load` and reset the environment.

    Parameters are described in the class docstring. `electric_vehicles` is
    forwarded to `_load` unchanged, like `buildings`.
    """

    # the schema setter resolves a data set name, file path or dict into a schema dict
    self.schema = schema
    self.__rewards = None
    # buildings must exist before the seed is assigned: the random_seed setter iterates over them
    self.buildings = []
    # an explicit random_seed argument takes precedence over the schema's value
    self.random_seed = self.schema.get('random_seed', None) if random_seed is None else random_seed
    # _load receives a deep copy so that the stored schema is not modified by it
    # NOTE(review): `random_episode=` is the only keyword below that does not mirror its parameter
    # name (`random_episode_split`); `_load` is not visible here — confirm which key it reads.
    root_directory, buildings, electric_vehicles, episode_time_steps, rolling_episode_split, random_episode_split, \
        seconds_per_time_step, reward_function, central_agent, shared_observations, episode_tracker = self._load(
        deepcopy(self.schema),
        root_directory=root_directory,
        buildings=buildings,
        electric_vehicles=electric_vehicles,
        simulation_start_time_step=simulation_start_time_step,
        simulation_end_time_step=simulation_end_time_step,
        episode_time_steps=episode_time_steps,
        rolling_episode_split=rolling_episode_split,
        random_episode=random_episode_split,
        seconds_per_time_step=seconds_per_time_step,
        reward_function=reward_function,
        reward_function_kwargs=reward_function_kwargs,
        central_agent=central_agent,
        shared_observations=shared_observations,
        active_observations=active_observations,
        inactive_observations=inactive_observations,
        active_actions=active_actions,
        inactive_actions=inactive_actions,
        simulate_power_outage=simulate_power_outage,
        solar_generation=solar_generation,
        random_seed=self.random_seed,
    )
    self.root_directory = root_directory
    self.buildings = buildings
    self.electric_vehicles = electric_vehicles
    # now call super class initialization and set episode tracker now that buildings are set
    super().__init__(seconds_per_time_step=seconds_per_time_step, random_seed=self.random_seed, episode_tracker=episode_tracker)
    # set other class variables
    self.episode_time_steps = episode_time_steps
    self.rolling_episode_split = rolling_episode_split
    self.random_episode_split = random_episode_split
    self.central_agent = central_agent
    self.shared_observations = shared_observations
    # set reward function
    self.reward_function = reward_function
    # reset environment and initializes episode time steps
    self.reset()
    # reset episode tracker to start after initializing episode time steps during reset
    self.episode_tracker.reset_episode_index()
    # set reward metadata
    self.reward_function.env_metadata = self.get_metadata()
    # reward history tracker
    self.__episode_rewards = []
@property
def schema(self) -> Mapping[str, Any]:
"""`dict` object of CityLearn schema."""
return self.__schema
@property
def root_directory(self) -> Union[str, Path]:
"""Absolute path to directory that contains the data files including the schema."""
return self.__root_directory
@property
def buildings(self) -> List[Building]:
    """Buildings that make up the CityLearn environment.

    Returns
    -------
    buildings : List[Building]
        The stored list of buildings.
    """

    return self.__buildings
@property
def electric_vehicles(self) -> List[ElectricVehicle]:
    """Electric vehicles that belong to the CityLearn environment.

    Returns
    -------
    electric_vehicles : List[ElectricVehicle]
        The stored list of electric vehicles.
    """

    return self.__electric_vehicles
@property
def time_steps(self) -> int:
    """Number of time steps in the current episode split, as reported by the episode tracker."""

    tracker = self.episode_tracker
    return tracker.episode_time_steps
@property
def episode_time_steps(self) -> Union[int, List[Tuple[int, int]]]:
"""If type is `int`, it is the number of time steps in an episode. If type is `List[Tuple[int, int]]]` is provided, it is a list of
episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`. Defaults to (`simulation_end_time_step`
- `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_splits` is of type `List[Tuple[int, int]]]`."""
return self.__episode_time_steps
@property
def rolling_episode_split(self) -> bool:
"""True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise,
False to split episodes in steps of `episode_time_steps`."""
return self.__rolling_episode_split
@property
def random_episode_split(self) -> bool:
"""True if episode splits are to be selected at random during training otherwise, False to select sequentially."""
return self.__random_episode_split
@property
def episode(self) -> int:
    """Index of the current episode, as reported by the episode tracker."""

    tracker = self.episode_tracker
    return tracker.episode
@property
def reward_function(self) -> RewardFunction:
    """Reward function in use.

    Returns
    -------
    reward_function : RewardFunction
        Reward function class instance.
    """

    return self.__reward_function
@property
def rewards(self) -> List[List[float]]:
"""Reward time series"""
return self.__rewards
@property
def episode_rewards(self) -> List[Mapping[str, Union[float, List[float]]]]:
"""Reward summary statistics for elapsed episodes."""
return self.__episode_rewards
@property
def central_agent(self) -> bool:
"""Expect 1 central agent to control all buildings."""
return self.__central_agent
@property
def shared_observations(self) -> List[str]:
"""Names of common observations across all buildings i.e. observations that have the same value irrespective of the building."""
return self.__shared_observations
@property
def terminated(self) -> bool:
    """True when `time_step` is the last index of the episode, i.e. `time_steps - 1`."""

    final_time_step = self.time_steps - 1
    return self.time_step == final_time_step
@property
def truncated(self) -> bool:
"""Check if episode truncates due to a time limit or a reason that is not defined as part of the task MDP."""
return False
@property
def observation_space(self) -> List[spaces.Box]:
    """Controller(s) observation spaces.

    Returns
    -------
    observation_space : List[spaces.Box]
        List of agent(s) observation spaces.

    Notes
    -----
    With `central_agent` False, the buildings' own spaces are returned in `buildings` order.
    With `central_agent` True, a single `spaces.Box` is returned whose limits concatenate the
    buildings' limits in `buildings` order; the limits of a shared observation are taken from
    the first building only, or from the first later building that reports it if the first
    building does not.
    """

    if not self.central_agent:
        return [building.observation_space for building in self.buildings]

    lows = []
    highs = []
    seen_shared = []

    for index, building in enumerate(self.buildings):
        limits = zip(building.observation_space.low, building.observation_space.high, building.active_observations)

        for low, high, name in limits:
            is_shared = name in self.shared_observations
            already_seen = name in seen_shared

            # keep everything from the first building; afterwards drop repeated shared observations
            if index == 0 or not is_shared or not already_seen:
                lows.append(low)
                highs.append(high)

            if is_shared and not already_seen:
                seen_shared.append(name)

    return [spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32)]
@property
def action_space(self) -> List[spaces.Box]:
    """Controller(s) action spaces.

    Returns
    -------
    action_space : List[spaces.Box]
        List of agent(s) action spaces.

    Notes
    -----
    With `central_agent` False, the buildings' own spaces are returned in `buildings` order.
    With `central_agent` True, a single `spaces.Box` is returned whose limits concatenate all
    buildings' limits in `buildings` order.
    """

    if not self.central_agent:
        return [building.action_space for building in self.buildings]

    lows = []
    highs = []

    for building in self.buildings:
        lows.extend(building.action_space.low)

    for building in self.buildings:
        highs.extend(building.action_space.high)

    return [spaces.Box(low=np.array(lows), high=np.array(highs), dtype=np.float32)]
@property
def observations(self) -> List[List[float]]:
"""Observations at current time step.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building observation values is returned in the same order as `buildings`.
The `shared_observations` values are only included in the first building's observation values. If `central_agent` is False, a list of sublists
is returned where each sublist is a list of 1 building's observation values and the sublist in the same order as `buildings`.
"""
if self.central_agent:
observations = []
shared_observations = []
for i, b in enumerate(self.buildings):
for k, v in b.observations(normalize=False, periodic_normalization=False, check_limits=True).items():
if i == 0 or k not in self.shared_observations or k not in shared_observations:
observations.append(v)
else:
pass
if k in self.shared_observations and k not in shared_observations:
shared_observations.append(k)
else:
pass
observations = [observations]
else:
observations = [list(b.observations(normalize=False, periodic_normalization=False, check_limits=True).values()) for b in self.buildings]
return observations
@property
def observation_names(self) -> List[List[str]]:
"""Names of returned observations.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building observation names is returned in the same order as `buildings`.
The `shared_observations` names are only included in the first building's observation names. If `central_agent` is False, a list of sublists
is returned where each sublist is a list of 1 building's observation names and the sublist in the same order as `buildings`.
"""
if self.central_agent:
observation_names = []
for i, b in enumerate(self.buildings):
for k, _ in b.observations(normalize=False, periodic_normalization=False).items():
if i == 0 or k not in self.shared_observations or k not in observation_names:
observation_names.append(k)
else:
pass
observation_names = [observation_names]
else:
observation_names = [list(b.observations().keys()) for b in self.buildings]
return observation_names
@property
def action_names(self) -> List[List[str]]:
"""Names of received actions.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building action names is returned in the same order as `buildings`.
If `central_agent` is False, a list of sublists is returned where each sublist is a list of 1 building's action names and the sublist
in the same order as `buildings`.
"""
if self.central_agent:
action_names = []
for b in self.buildings:
action_names += b.active_actions
action_names = [action_names]
else:
action_names = [b.active_actions for b in self.buildings]
return action_names
@property
def net_electricity_consumption_emission_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
    """Sum over buildings of the emission time series without storage, partial load and PV, in [kg_co2].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_emission_without_storage_and_partial_load_and_pv`;
    other buildings contribute `net_electricity_consumption_emission_without_storage_and_pv`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_emission_without_storage_and_partial_load_and_pv)
        else:
            series.append(building.net_electricity_consumption_emission_without_storage_and_pv)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
    """Sum over buildings of the cost time series without storage, partial load and PV, in [$].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_cost_without_storage_and_partial_load_and_pv`;
    other buildings contribute `net_electricity_consumption_cost_without_storage_and_pv`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_cost_without_storage_and_partial_load_and_pv)
        else:
            series.append(building.net_electricity_consumption_cost_without_storage_and_pv)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
    """Sum over buildings of the net electricity consumption time series without storage, partial load and PV, in [kWh].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_without_storage_and_partial_load_and_pv`;
    other buildings contribute `net_electricity_consumption_without_storage_and_pv`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_without_storage_and_partial_load_and_pv)
        else:
            series.append(building.net_electricity_consumption_without_storage_and_pv)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage_and_partial_load(self) -> np.ndarray:
    """Sum over buildings of the emission time series without storage and partial load, in [kg_co2].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_emission_without_storage_and_partial_load`;
    other buildings contribute `net_electricity_consumption_emission_without_storage`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_emission_without_storage_and_partial_load)
        else:
            series.append(building.net_electricity_consumption_emission_without_storage)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_partial_load(self) -> np.ndarray:
    """Sum over buildings of the cost time series without storage and partial load, in [$].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_cost_without_storage_and_partial_load`;
    other buildings contribute `net_electricity_consumption_cost_without_storage`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_cost_without_storage_and_partial_load)
        else:
            series.append(building.net_electricity_consumption_cost_without_storage)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_partial_load(self) -> np.ndarray:
    """Sum over buildings of the net electricity consumption time series without storage and partial load, in [kWh].

    `DynamicsBuilding` instances contribute `net_electricity_consumption_without_storage_and_partial_load`;
    other buildings contribute `net_electricity_consumption_without_storage`.
    """

    series = []

    for building in self.buildings:
        if isinstance(building, DynamicsBuilding):
            series.append(building.net_electricity_consumption_without_storage_and_partial_load)
        else:
            series.append(building.net_electricity_consumption_without_storage)

    return pd.DataFrame(series).sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage_and_pv` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage_and_pv` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage_and_pv` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage(self) -> np.ndarray:
    """Sum over buildings of `Building.net_electricity_consumption_emission_without_storage`, as a time series in [kg_co2]."""

    # NOTE: a property with this same name is defined again further down in the class;
    # the later definition is the one that takes effect.
    per_building = pd.DataFrame([
        building.net_electricity_consumption_emission_without_storage for building in self.buildings
    ])
    return per_building.sum(axis=0, min_count=1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).tolist()
@property
def net_electricity_consumption_cost_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission(self) -> List[float]:
"""Summed `Building.net_electricity_consumption_emission` time series, in [kg_co2]."""
return self.__net_electricity_consumption_emission
@property
def net_electricity_consumption_cost(self) -> List[float]:
"""Summed `Building.net_electricity_consumption_cost` time series, in [$]."""
return self.__net_electricity_consumption_cost
@property
def net_electricity_consumption(self) -> List[float]:
"""Summed `Building.net_electricity_consumption` time series, in [kWh]."""
return self.__net_electricity_consumption
@property
def cooling_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.cooling_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.cooling_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.heating_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.heating_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.dhw_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.dhw_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def cooling_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.cooling_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.cooling_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.heating_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.heating_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.dhw_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.dhw_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def electrical_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.electrical_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.electrical_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_device_to_cooling_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_device_to_cooling_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_device_to_cooling_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_heating_device_to_heating_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_device_to_heating_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_device_to_heating_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_device_to_dhw_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_device_to_dhw_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_device_to_dhw_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_to_electrical_storage(self) -> np.ndarray:
"""Summed `Building.energy_to_electrical_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_to_electrical_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_device(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_heating_device(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_device(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_to_non_shiftable_load(self) -> np.ndarray:
"""Summed `Building.energy_to_non_shiftable_load` time series, in [kWh]."""
return pd.DataFrame([b.energy_to_non_shiftable_load for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_heating_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_electrical_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_electrical_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_electrical_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def cooling_demand(self) -> np.ndarray:
"""Summed `Building.cooling_demand`, in [kWh]."""
return pd.DataFrame([b.cooling_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_demand(self) -> np.ndarray:
"""Summed `Building.heating_demand`, in [kWh]."""
return pd.DataFrame([b.heating_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_demand(self) -> np.ndarray:
"""Summed `Building.dhw_demand`, in [kWh]."""
return pd.DataFrame([b.dhw_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def non_shiftable_load(self) -> np.ndarray:
"""Summed `Building.non_shiftable_load`, in [kWh]."""
return pd.DataFrame([b.non_shiftable_load for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def solar_generation(self) -> np.ndarray:
"""Summed `Building.solar_generation, in [kWh]`."""
return pd.DataFrame([b.solar_generation for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def power_outage(self) -> np.ndarray:
    """Sum over buildings of `Building.power_outage_signal`, truncated to the first `time_step + 1` entries.

    Per the signal name, each entry is the number of buildings experiencing a power outage at
    that time step.
    """

    signals = pd.DataFrame([building.power_outage_signal for building in self.buildings])
    outage_count = signals.sum(axis=0, min_count=1).to_numpy()
    elapsed = self.time_step + 1
    return outage_count[:elapsed]
@schema.setter
def schema(self, schema: Union[str, Path, Mapping[str, Any]]):
    """Resolve `schema` into a schema `dict` and store it.

    Parameters
    ----------
    schema: Union[str, Path, Mapping[str, Any]]
        One of, checked in this order: a path to an existing JSON file, a name listed by
        `DataSet.get_dataset_names`, or a `dict` (which is deep-copied before being stored).

    Raises
    ------
    UnknownSchemaError
        If `schema` matches none of the accepted forms.

    Notes
    -----
    When the resolved schema has no `root_directory` entry, or the entry is None, it is set
    to the schema file's directory for a file path and to an empty string otherwise.
    """

    dataset = DataSet()

    if isinstance(schema, (str, Path)) and os.path.isfile(schema):
        schema_filepath = Path(schema)
        schema = read_json(schema)
        default_root_directory = os.path.split(schema_filepath.absolute())[0]

    elif isinstance(schema, str) and schema in dataset.get_dataset_names():
        schema = dataset.get_schema(schema)
        default_root_directory = ''

    elif isinstance(schema, dict):
        schema = deepcopy(schema)
        default_root_directory = ''

    else:
        raise UnknownSchemaError()

    # `.get` so that a schema without the key is defaulted instead of raising KeyError
    if schema.get('root_directory') is None:
        schema['root_directory'] = default_root_directory

    self.__schema = schema
@root_directory.setter
def root_directory(self, root_directory: Union[str, Path]):
self.__root_directory = root_directory
@buildings.setter
def buildings(self, buildings: List[Building]):
    """Store the list of `buildings` as given."""

    self.__buildings = buildings
@electric_vehicles.setter
def electric_vehicles(self, electric_vehicles: List[ElectricVehicle]):
    """Store the list of `electric_vehicles` as given."""

    self.__electric_vehicles = electric_vehicles
@Environment.episode_tracker.setter
def episode_tracker(self, episode_tracker: EpisodeTracker):
    """Set the tracker through the `Environment` setter, then assign it to every building."""

    parent_setter = Environment.episode_tracker.fset
    parent_setter(self, episode_tracker)

    for building in self.buildings:
        building.episode_tracker = self.episode_tracker
@episode_time_steps.setter
def episode_time_steps(self, episode_time_steps: Union[int, List[Tuple[int, int]]]):
self.__episode_time_steps = self.episode_tracker.simulation_time_steps if episode_time_steps is None else episode_time_steps
@rolling_episode_split.setter
def rolling_episode_split(self, rolling_episode_split: bool):
self.__rolling_episode_split = False if rolling_episode_split is None else rolling_episode_split
@random_episode_split.setter
def random_episode_split(self, random_episode_split: bool):
self.__random_episode_split = False if random_episode_split is None else random_episode_split
@reward_function.setter
def reward_function(self, reward_function: RewardFunction):
    """Store the `reward_function` instance as given."""

    self.__reward_function = reward_function
@central_agent.setter
def central_agent(self, central_agent: bool):
    """Store whether a single central agent controls all buildings."""

    self.__central_agent = central_agent
@shared_observations.setter
def shared_observations(self, shared_observations: List[str]):
    """Store the shared observation names, using :meth:`get_default_shared_observations`
    when :code:`None` is supplied."""

    if shared_observations is None:
        shared_observations = self.get_default_shared_observations()

    self.__shared_observations = shared_observations
@Environment.random_seed.setter
def random_seed(self, seed: int):
    """Set the random seed on the environment and propagate the resulting seed to every building."""

    # storage is delegated to the parent class' property setter
    Environment.random_seed.fset(self, seed)

    # buildings receive the value read back from the property, not the raw argument
    for building in self.buildings:
        building.random_seed = self.random_seed
[docs]
@staticmethod
def get_default_shared_observations() -> List[str]:
    """Return the names of the observations that are shared by default.

    Shared observations are those that have the same value irrespective of the building.

    Returns
    -------
    names: List[str]
        Calendar observations, then each weather variable followed by its three forecasts,
        then carbon intensity, then electricity pricing followed by its three forecasts.

    Notes
    -----
    May be used to assign the :attr:`shared_observations` value during `CityLearnEnv` object initialization.
    """

    forecast_horizons = (1, 2, 3)

    def with_forecasts(name):
        # the variable itself, immediately followed by its `_predicted_<n>` companions
        return [name] + [f'{name}_predicted_{h}' for h in forecast_horizons]

    names = ['month', 'day_type', 'hour', 'daylight_savings_status']

    for weather_variable in (
        'outdoor_dry_bulb_temperature',
        'outdoor_relative_humidity',
        'diffuse_solar_irradiance',
        'direct_solar_irradiance',
    ):
        names += with_forecasts(weather_variable)

    names.append('carbon_intensity')
    names += with_forecasts('electricity_pricing')

    return names
[docs]
def step(self, actions: List[List[float]]) -> Tuple[List[List[float]], List[float], bool, bool, dict]:
    """Advance to the next time step, apply `actions` to `buildings`, update variables and compute the reward.

    Parameters
    ----------
    actions: List[List[float]]
        Fractions of `buildings` storage devices' capacities to charge/discharge by.
        If `central_agent` is True, `actions` should be a list of 1 list containing all buildings' actions, ordered
        like `buildings`. If `central_agent` is False, `actions` should be a list of sublists where each sublist
        holds the actions for one building, ordered like `buildings`.

    Returns
    -------
    observations: List[List[float]]
        :attr:`observations` current value.
    reward: List[float]
        Value returned by the reward function's `calculate` for the current time step.
    terminated: bool
        :attr:`terminated` current value i.e., whether the episode has ended, in which case further
        :meth:`step` calls will return undefined results.
    truncated: bool
        :attr:`truncated` current value.
    info: dict
        Value returned by :meth:`get_info`. Override :meth:`get_info` to get custom key-value pairs in `info`.
    """

    self.next_time_step()

    parsed_actions = self._parse_actions(actions)

    for building, building_kwargs in zip(self.buildings, parsed_actions):
        building.apply_actions(**building_kwargs)

    self.update_variables()

    # NOTE:
    # Retrieving each building's observation dictionary here is expensive, particularly because the
    # observations are retrieved again below to send to the agent. The dictionary form is needed so that
    # the reward function can easily extract building-level values without being given direct access to
    # the environment, which is avoided for competition integrity's sake.
    reward_observations = []

    for building in self.buildings:
        reward_observations.append(
            building.observations(include_all=True, normalize=False, periodic_normalization=False)
        )

    reward = self.reward_function.calculate(observations=reward_observations)
    self.__rewards.append(reward)

    # store episode reward summary once the episode is over; the first entry of the
    # reward history is skipped
    if self.terminated:
        episode_rewards = np.array(self.__rewards[1:], dtype='float32')
        summary = {
            'min': episode_rewards.min(axis=0).tolist(),
            'max': episode_rewards.max(axis=0).tolist(),
            'sum': episode_rewards.sum(axis=0).tolist(),
            'mean': episode_rewards.mean(axis=0).tolist()
        }
        self.__episode_rewards.append(summary)

    return self.observations, reward, self.terminated, self.truncated, self.get_info()
[docs]
def get_info(self) -> Mapping[Any, Any]:
    """Return auxiliary information for `citylearn.CityLearnEnv.step` to pass back to the caller.

    This implementation always returns a new empty dictionary.
    """

    info = dict()
    return info
def _parse_actions(self, actions: List[List[float]]) -> List[Mapping[str, float]]:
    """Return mapping of action name to action value for each building.

    Parameters
    ----------
    actions: List[List[float]]
        If `central_agent` is True, a list whose first element holds every building's actions
        concatenated in `buildings` order. Otherwise, one sublist of actions per building.

    Returns
    -------
    parsed_actions: List[Mapping[str, float]]
        One dictionary per building. Active non-EV actions are keyed :code:`'<name>_action'`.
        Electric vehicle charger actions are grouped under :code:`'electric_vehicle_storage_actions'`
        as a :code:`{charger_id: action}` dictionary. Non-EV actions in the building's
        `action_metadata` that received no value are set to :code:`numpy.nan`.

    Raises
    ------
    AssertionError
        If the number of supplied actions does not match the relevant action space size.
    """

    ev_action_name = 'electric_vehicle_storage'
    ev_action_prefix = f'{ev_action_name}_'
    actions = list(actions)
    building_actions = []

    if self.central_agent:
        actions = actions[0]
        number_of_actions = len(actions)
        expected_number_of_actions = self.action_space[0].shape[0]
        assert number_of_actions == expected_number_of_actions, \
            f'Expected {expected_number_of_actions} actions but {number_of_actions} were parsed to env.step.'

        # slice the flat action list into per-building chunks in `buildings` order
        for building in self.buildings:
            size = building.action_space.shape[0]
            building_actions.append(actions[0:size])
            actions = actions[size:]

    else:
        building_actions = [list(a) for a in actions]

    # check that appropriate number of building actions have been provided
    for b, a in zip(self.buildings, building_actions):
        number_of_actions = len(a)
        expected_number_of_actions = b.action_space.shape[0]
        assert number_of_actions == expected_number_of_actions,\
            f'Expected {expected_number_of_actions} for {b.name} but {number_of_actions} actions were provided.'

    parsed_actions = []

    for i, building in enumerate(self.buildings):
        active_actions = [k for k, v in building.action_metadata.items() if v]
        action_dict = {}
        electric_vehicle_actions = {}

        for k, action in zip(active_actions, building_actions[i]):
            if ev_action_name in k:
                # Per-charger action keys are built as 'electric_vehicle_storage_<charger name>' and the
                # charger name is used as the charger ID, so the ID is everything after the prefix.
                # Taking only the last '_'-separated token would truncate IDs that contain underscores
                # (e.g. 'charger_1_1' -> '1').
                if k.startswith(ev_action_prefix):
                    charger_id = k[len(ev_action_prefix):]
                else:
                    # unexpected key layout: keep the previous last-token behaviour
                    charger_id = k.split('_')[-1]

                electric_vehicle_actions[charger_id] = action

            else:
                action_dict[f'{k}_action'] = action

        # add EV actions to the action_dict if they exist
        if electric_vehicle_actions:
            action_dict['electric_vehicle_storage_actions'] = electric_vehicle_actions

        # fill missing non-EV actions with NaN
        for k in building.action_metadata:
            if f'{k}_action' not in action_dict and ev_action_name not in k:
                action_dict[f'{k}_action'] = np.nan

        parsed_actions.append(action_dict)

    return parsed_actions
[docs]
def evaluate(self, control_condition: EvaluationCondition = None, baseline_condition: EvaluationCondition = None, comfort_band: float = None) -> pd.DataFrame:
    r"""Evaluate cost functions at current time step.

    Calculates and returns building-level and district-level cost functions normalized w.r.t. the no control scenario.

    Parameters
    ----------
    control_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV`
        Condition for net electricity consumption, cost and emission to use in calculating cost functions for the control/flexible scenario.
        When not provided, the default is resolved separately for each building
        (:code:`WITH_STORAGE_AND_PV` for buildings that are not a `DynamicsBuilding`).
    baseline_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV`
        Condition for net electricity consumption, cost and emission to use in calculating cost functions for the baseline scenario
        that is used to normalize the control_condition scenario. When not provided, the default is resolved separately
        for each building (:code:`WITHOUT_STORAGE_BUT_WITH_PV` for buildings that are not a `DynamicsBuilding`).
    comfort_band: float, optional
        Comfort band above dry_bulb_temperature_cooling_set_point and below dry_bulb_temperature_heating_set_point beyond
        which occupant is assumed to be uncomfortable. Defaults to :py:attr:`citylearn.data.EnergySimulation.DEFUALT_COMFORT_BAND`.

    Returns
    -------
    cost_functions: pd.DataFrame
        Cost function summary with `cost_function`, `value`, `name` and `level` columns. District-level rows
        (mean of each cost function over the district-only and building-level rows) come first, followed by
        the building-level rows.

    Notes
    -----
    The equation for the returned cost function values is :math:`\frac{C_{\textrm{control}}}{C_{\textrm{no control}}}`
    where :math:`C_{\textrm{control}}` is the value when the agent(s) control the environment and :math:`C_{\textrm{no control}}`
    is the value when none of the storages and partial load cooling and heating devices in the environment are actively controlled.

    The caller-supplied conditions are never overwritten while iterating over buildings, so a default picked for
    one building type does not carry over to buildings of another type nor to the district-level calculations.
    """

    def get_series(obj, prefix, condition):
        # the evaluation condition's value is the attribute-name suffix
        return getattr(obj, f'{prefix}{condition.value}')

    def normalized(function, obj, prefix, control, baseline, **kwargs):
        # last value of the cost function under control divided by its last value under baseline
        control_value = function(get_series(obj, prefix, control), **kwargs)[-1]
        baseline_value = function(get_series(obj, prefix, baseline), **kwargs)[-1]
        return control_value/baseline_value

    consumption = 'net_electricity_consumption'
    cost = 'net_electricity_consumption_cost'
    emission = 'net_electricity_consumption_emission'
    comfort_band = EnergySimulation.DEFUALT_COMFORT_BAND if comfort_band is None else comfort_band
    building_level = []

    for b in self.buildings:
        # resolve defaults into locals so that they only apply to this building
        if isinstance(b, DynamicsBuilding):
            default_control = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV
            default_baseline = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV
        else:
            default_control = EvaluationCondition.WITH_STORAGE_AND_PV
            default_baseline = EvaluationCondition.WITHOUT_STORAGE_BUT_WITH_PV

        building_control = default_control if control_condition is None else control_condition
        building_baseline = default_baseline if baseline_condition is None else baseline_condition

        discomfort_kwargs = {
            'indoor_dry_bulb_temperature': b.indoor_dry_bulb_temperature,
            'dry_bulb_temperature_cooling_set_point': b.indoor_dry_bulb_temperature_cooling_set_point,
            'dry_bulb_temperature_heating_set_point': b.indoor_dry_bulb_temperature_heating_set_point,
            # `comfort_band` has already been defaulted above so it is never None here
            'band': comfort_band,
            'occupant_count': b.occupant_count,
        }
        unmet, cold, hot,\
            cold_minimum_delta, cold_maximum_delta, cold_average_delta,\
                hot_minimum_delta, hot_maximum_delta, hot_average_delta =\
                    CostFunction.discomfort(**discomfort_kwargs)
        expected_energy = b.cooling_demand + b.heating_demand + b.dhw_demand + b.non_shiftable_load
        served_energy = b.energy_from_cooling_device + b.energy_from_cooling_storage\
            + b.energy_from_heating_device + b.energy_from_heating_storage\
                + b.energy_from_dhw_device + b.energy_from_dhw_storage\
                    + b.energy_to_non_shiftable_load

        building_values = {
            'electricity_consumption_total': normalized(
                CostFunction.electricity_consumption, b, consumption, building_control, building_baseline
            ),
            'zero_net_energy': normalized(
                CostFunction.zero_net_energy, b, consumption, building_control, building_baseline
            ),
            # ratio is left undefined when the carbon intensity series sums to zero
            'carbon_emissions_total': normalized(
                CostFunction.carbon_emissions, b, emission, building_control, building_baseline
            ) if sum(b.carbon_intensity.carbon_intensity) != 0 else None,
            # ratio is left undefined when the electricity pricing series sums to zero
            'cost_total': normalized(
                CostFunction.cost, b, cost, building_control, building_baseline
            ) if sum(b.pricing.electricity_pricing) != 0 else None,
            'discomfort_proportion': unmet[-1],
            'discomfort_cold_proportion': cold[-1],
            'discomfort_hot_proportion': hot[-1],
            'discomfort_cold_delta_minimum': cold_minimum_delta[-1],
            'discomfort_cold_delta_maximum': cold_maximum_delta[-1],
            'discomfort_cold_delta_average': cold_average_delta[-1],
            'discomfort_hot_delta_minimum': hot_minimum_delta[-1],
            'discomfort_hot_delta_maximum': hot_maximum_delta[-1],
            'discomfort_hot_delta_average': hot_average_delta[-1],
            'one_minus_thermal_resilience_proportion': CostFunction.one_minus_thermal_resilience(
                power_outage=b.power_outage_signal, **discomfort_kwargs
            )[-1],
            'power_outage_normalized_unserved_energy_total': CostFunction.normalized_unserved_energy(
                expected_energy, served_energy, power_outage=b.power_outage_signal
            )[-1],
            'annual_normalized_unserved_energy_total': CostFunction.normalized_unserved_energy(
                expected_energy, served_energy
            )[-1],
        }
        building_level_ = pd.DataFrame([{'cost_function': k, 'value': v} for k, v in building_values.items()])
        building_level_['name'] = b.name
        building_level.append(building_level_)

    building_level = pd.concat(building_level, ignore_index=True)
    building_level['level'] = 'building'

    ## district level
    # set default evaluation conditions
    district_control = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV if control_condition is None else control_condition
    district_baseline = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV if baseline_condition is None else baseline_condition

    district_values = {
        'ramping_average': normalized(
            CostFunction.ramping, self, consumption, district_control, district_baseline
        ),
        'daily_one_minus_load_factor_average': normalized(
            CostFunction.one_minus_load_factor, self, consumption, district_control, district_baseline, window=24
        ),
        'monthly_one_minus_load_factor_average': normalized(
            CostFunction.one_minus_load_factor, self, consumption, district_control, district_baseline, window=730
        ),
        'daily_peak_average': normalized(
            CostFunction.peak, self, consumption, district_control, district_baseline, window=24
        ),
        'all_time_peak_average': normalized(
            CostFunction.peak, self, consumption, district_control, district_baseline, window=self.time_steps
        ),
    }
    district_level = pd.DataFrame([{'cost_function': k, 'value': v} for k, v in district_values.items()])

    # district value of a building-level cost function is the mean across buildings
    district_level = pd.concat([district_level, building_level], ignore_index=True, sort=False)
    district_level = district_level.groupby(['cost_function'])[['value']].mean().reset_index()
    district_level['name'] = 'District'
    district_level['level'] = 'district'
    cost_functions = pd.concat([district_level, building_level], ignore_index=True, sort=False)

    return cost_functions
[docs]
def next_time_step(self):
    r"""Advance all buildings and electric vehicles, then the environment itself, to the next `time_step`."""

    for b in self.buildings:
        b.next_time_step()

    # Electric vehicles are advanced independently of buildings because they exist even when they are
    # not connected to any building (e.g. while being used to commute).
    for ev in self.electric_vehicles:
        ev.next_time_step()

    super().next_time_step()

    # With the new time step reached, the first thing to do is to plug EVs in/out of
    # building chargers according to each EV's own simulation data.
    self.associate_electric_vehicles_to_chargers()
[docs]
def associate_electric_vehicles_to_chargers(self):
    r"""Link each electric vehicle to the charger named in its simulation data for the current `time_step`.

    A charger state of 1 plugs the vehicle into the matching charger while a state of 2 registers
    the vehicle as incoming to it. Vehicles whose charger entry is :code:`""` or :code:`"nan"` are skipped.
    """

    for ev in self.electric_vehicles:
        simulation = ev.electric_vehicle_simulation
        charger_id = simulation.charger[self.time_step]
        state = simulation.electric_vehicle_charger_state[self.time_step]

        # no destination charger at this time step
        if charger_id == "" or charger_id == "nan":
            continue

        for building in self.buildings:
            chargers = building.electric_vehicle_chargers

            if chargers is None:
                continue

            for c in chargers:
                if c.charger_id == charger_id:
                    if state == 1:
                        # EV is connected to the charger
                        c.plug_car(ev)

                    if state == 2:
                        # EV is on its way to the charger
                        c.associate_incoming_car(ev)
[docs]
def reset(self, seed: int = None, options: Mapping[str, Any] = None) -> Tuple[List[List[float]], dict]:
    r"""Reset `CityLearnEnv` to initial state.

    Parameters
    ----------
    seed: int, optional
        Used to update :code:`citylearn.CityLearnEnv.random_seed` if a value is provided.
    options: Mapping[str, Any], optional
        Additional data passed to the environment on reset. Not used in this base class
        but included to conform to the gymnasium interface.

    Returns
    -------
    observations: List[List[float]]
        :attr:`observations`.
    info: dict
        Value returned by :meth:`get_info`. Override :meth:`get_info` to get custom key-value pairs in `info`.
    """

    # object reset
    super().reset()

    # update seed only when one is explicitly provided
    if seed is not None:
        self.random_seed = seed

    # update time steps for time series
    self.episode_tracker.next_episode(
        self.episode_time_steps,
        self.rolling_episode_split,
        self.random_episode_split,
        self.random_seed,
    )

    for b in self.buildings:
        b.reset()

    for electric_vehicle in self.electric_vehicles:
        electric_vehicle.reset()

    self.associate_electric_vehicles_to_chargers()

    # reset reward function (does nothing by default)
    self.reward_function.reset()

    # variable reset; the reward history starts with one empty placeholder entry
    self.__rewards = [[]]
    self.__net_electricity_consumption = []
    self.__net_electricity_consumption_cost = []
    self.__net_electricity_consumption_emission = []
    self.update_variables()

    return self.observations, self.get_info()
[docs]
def update_variables(self):
    """Append the district totals for the current `time_step` to the environment's time series.

    The totals are the sums, over all buildings, of each building's net electricity
    consumption, consumption cost and consumption emission at the current `time_step`.
    """

    t = self.time_step

    # net electricity consumption
    consumption = sum(b.net_electricity_consumption[t] for b in self.buildings)
    self.__net_electricity_consumption.append(consumption)

    # net electricity consumption cost
    cost = sum(b.net_electricity_consumption_cost[t] for b in self.buildings)
    self.__net_electricity_consumption_cost.append(cost)

    # net electricity consumption emission
    emission = sum(b.net_electricity_consumption_emission[t] for b in self.buildings)
    self.__net_electricity_consumption_emission.append(emission)
[docs]
def load_agent(self, agent: Union[str, 'citylearn.agents.base.Agent'] = None, **kwargs) -> Union[Any, 'citylearn.agents.base.Agent']:
    """Return an initialized agent, either as requested through `agent` or as defined by the `schema`.

    Parameters
    ----------
    agent: Union[str, 'citylearn.agents.base.Agent], optional
        Agent class or string describing path to agent class, e.g. 'citylearn.agents.base.BaselineAgent'.
        If a value is not provided, defaults to the agent defined in the schema:agent:type.
    **kwargs : dict
        Agent initialization attributes. When empty, the schema:agent:attributes are used if `agent`
        was not provided, otherwise the agent is constructed without arguments.
        For most agents e.g. CityLearn and Stable-Baselines3 agents, an initialized :py:attr:`env`
        must be passed to the agent :py:meth:`init` function.

    Returns
    -------
    agent: Agent
        Initialized agent.
    """

    use_schema_agent = agent is None

    # resolve the dotted path of the agent class
    if use_schema_agent:
        agent_type = self.schema['agent']['type']
    elif isinstance(agent, str):
        agent_type = agent
    else:
        agent_type = '.'.join([agent.__module__, agent.__name__])

    # resolve the agent init attributes
    if kwargs:
        agent_attributes = kwargs
    elif use_schema_agent:
        agent_attributes = self.schema['agent'].get('attributes', {})
    else:
        agent_attributes = None

    # split '<module path>.<class name>' on the last dot and import the class
    agent_module, _, agent_name = agent_type.rpartition('.')
    agent_constructor = getattr(importlib.import_module(agent_module), agent_name)

    if agent_attributes is None:
        return agent_constructor()

    return agent_constructor(**agent_attributes)
def _load(self, schema: Mapping[str, Any], **kwargs) -> Tuple[Union[Path, str], List[Building], List[ElectricVehicle], Union[int, List[Tuple[int, int]]], bool, bool, float, RewardFunction, bool, List[str], EpisodeTracker]:
    """Build the environment components described by `schema`, with non-None `kwargs` overriding schema values.

    Parameters
    ----------
    schema: Mapping[str, Any]
        :code:`dict` object of a CityLearn schema. It is updated in place with the resolved values
        and with the electric vehicle charger helper entries.
    **kwargs : dict
        Optional overrides: `root_directory`, `random_seed`, `central_agent`, `shared_observations`,
        `episode_time_steps`, `rolling_episode_split`, `random_episode_split`, `seconds_per_time_step`,
        `simulation_start_time_step`, `simulation_end_time_step`, `buildings` (list of `Building` objects,
        building names or building indices), `electric_vehicles_def`, `reward_function` and
        `reward_function_kwargs`. All keyword arguments are also forwarded to :meth:`_load_building`.

    Returns
    -------
    root_directory: Union[Path, str]
        Absolute path to directory that contains the data files including the schema.
    buildings : List[Building]
        Buildings in CityLearn environment.
    electric_vehicles : List[ElectricVehicle]
        Electric Vehicles in CityLearn environment.
    episode_time_steps: Union[int, List[Tuple[int, int]]]
        Number of time steps in an episode. Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1.
    rolling_episode_split: bool
        True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes
        in steps of `episode_time_steps`.
    random_episode_split: bool
        True if episode splits are to be selected at random during training otherwise, False to select sequentially.
    seconds_per_time_step: float
        Number of seconds in 1 `time_step` and must be set to >= 1.
    reward_function : RewardFunction
        Reward function class instance.
    central_agent : bool
        Expect 1 central agent to control all building storage device.
    shared_observations : List[str]
        Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.
    episode_tracker : EpisodeTracker
        Tracker built from the resolved simulation start and end time steps and shared with the loaded objects.

    Raises
    ------
    Exception
        If the first element of `kwargs['buildings']` is not a `Building`, :code:`str` or :code:`int`.
    """

    def resolve(key: str, required: bool = True) -> Any:
        # a non-None keyword argument wins; otherwise fall back to the schema, where a
        # required key raises KeyError when absent and an optional key yields None
        override = kwargs.get(key)

        if override is not None:
            return override

        return schema[key] if required else schema.get(key, None)

    schema['root_directory'] = resolve('root_directory')
    # the keyword argument must take precedence here too; previously both branches read
    # the schema so a `random_seed` keyword argument was silently ignored
    schema['random_seed'] = resolve('random_seed', required=False)
    schema['central_agent'] = resolve('central_agent')

    # Charger observations/actions are separated from the generic ones so that one can be created
    # for each charger in each building based on the ones that are active in the schema.
    schema['chargers_observations_helper'] = {
        key: value for key, value in schema["observations"].items() if key.startswith("electric_vehicle_")
    }
    schema['chargers_actions_helper'] = {
        key: value for key, value in schema["actions"].items() if key.startswith("electric_vehicle_")
    }
    schema['chargers_shared_observations_helper'] = {
        key: value for key, value in schema["observations"].items()
        if key.startswith("electric_vehicle_") and value.get("shared_in_central_agent", True)
    }
    schema['observations'] = {
        key: value for key, value in schema["observations"].items()
        if key not in schema['chargers_observations_helper']
    }
    schema['actions'] = {
        k: v for k, v in schema['actions'].items() if k not in schema['chargers_actions_helper']
    }

    # update shared observations, excluding any keys that start with 'electric_vehicle_'
    if kwargs.get('shared_observations') is not None:
        schema['shared_observations'] = kwargs['shared_observations']
    else:
        schema['shared_observations'] = [
            k for k, v in schema['observations'].items() if
            not k.startswith("electric_vehicle_") and v.get('shared_in_central_agent', False)
        ]

    schema['episode_time_steps'] = resolve('episode_time_steps', required=False)
    schema['rolling_episode_split'] = resolve('rolling_episode_split', required=False)
    schema['random_episode_split'] = resolve('random_episode_split', required=False)
    schema['seconds_per_time_step'] = resolve('seconds_per_time_step')
    schema['simulation_start_time_step'] = resolve('simulation_start_time_step')
    schema['simulation_end_time_step'] = resolve('simulation_end_time_step')
    episode_tracker = EpisodeTracker(schema['simulation_start_time_step'], schema['simulation_end_time_step'])

    # get sizing data to reduce read time
    dataset = DataSet()
    pv_sizing_data = dataset.get_pv_sizing_data()
    battery_sizing_data = dataset.get_battery_sizing_data()

    # get buildings to include
    buildings_to_include = list(schema['buildings'].keys())
    buildings = []

    if kwargs.get('buildings') is not None and len(kwargs['buildings']) > 0:
        if isinstance(kwargs['buildings'][0], Building):
            # ready-made building objects are used as provided; nothing is loaded from the schema
            buildings: List[Building] = kwargs['buildings']

            for b in buildings:
                b.episode_tracker = episode_tracker

            buildings_to_include = []

        elif isinstance(kwargs['buildings'][0], str):
            buildings_to_include = [b for b in buildings_to_include if b in kwargs['buildings']]

        elif isinstance(kwargs['buildings'][0], int):
            buildings_to_include = [buildings_to_include[i] for i in kwargs['buildings']]

        else:
            raise Exception('Unknown buildings type. Allowed types are citylearn.building.Building, int and str.')

    else:
        buildings_to_include = [b for b in buildings_to_include if schema['buildings'][b]['include']]

    # load buildings
    for i, building_name in enumerate(buildings_to_include):
        buildings.append(self._load_building(i, building_name, schema, episode_tracker, pv_sizing_data, battery_sizing_data, **kwargs))

    # load electric vehicles (if present in the keyword arguments or the schema)
    electric_vehicles = []

    if kwargs.get('electric_vehicles_def') is not None and len(kwargs['electric_vehicles_def']) > 0:
        electric_vehicle_schemas = kwargs['electric_vehicles_def']
    else:
        electric_vehicle_schemas = schema.get('electric_vehicles_def', {})

    for electric_vehicle_name, electric_vehicle_schema in electric_vehicle_schemas.items():
        if electric_vehicle_schema['include']:
            electric_vehicles.append(self._load_electric_vehicle(electric_vehicle_name, schema, electric_vehicle_schema, episode_tracker))

    # set reward function
    if kwargs.get('reward_function') is not None:
        reward_function_type = kwargs['reward_function']

        # a class is converted to its dotted import path so both forms are handled alike below
        if not isinstance(reward_function_type, str):
            reward_function_type = [reward_function_type.__module__] + [reward_function_type.__name__]
            reward_function_type = '.'.join(reward_function_type)

    else:
        reward_function_type = schema['reward_function']['type']

    if kwargs.get('reward_function_kwargs') is not None:
        reward_function_attributes = kwargs['reward_function_kwargs']
    else:
        reward_function_attributes = schema['reward_function'].get('attributes', None)
        reward_function_attributes = {} if reward_function_attributes is None else reward_function_attributes

    reward_function_module = '.'.join(reward_function_type.split('.')[0:-1])
    reward_function_name = reward_function_type.split('.')[-1]
    reward_function_constructor = getattr(importlib.import_module(reward_function_module), reward_function_name)
    # the first positional argument is passed as None at construction time
    reward_function = reward_function_constructor(None, **reward_function_attributes)

    return (
        schema['root_directory'], buildings, electric_vehicles, schema['episode_time_steps'], schema['rolling_episode_split'],
        schema['random_episode_split'],
        schema['seconds_per_time_step'], reward_function, schema['central_agent'], schema['shared_observations'],
        episode_tracker
    )
def _load_building(self, index: int, building_name: str, schema: dict, episode_tracker: EpisodeTracker, pv_sizing_data: pd.DataFrame, battery_sizing_data: pd.DataFrame, **kwargs) -> Building:
"""Initializes and returns a building model."""
building_schema = schema['buildings'][building_name]
building_kwargs = {}
# data
energy_simulation = pd.read_csv(os.path.join(schema['root_directory'], building_schema['energy_simulation']))
energy_simulation = EnergySimulation(**energy_simulation.to_dict('list'))
weather = pd.read_csv(os.path.join(schema['root_directory'], building_schema['weather']))
weather = Weather(**weather.to_dict('list'))
if building_schema.get('carbon_intensity', None) is not None:
carbon_intensity = pd.read_csv(os.path.join(schema['root_directory'], building_schema['carbon_intensity']))
carbon_intensity = CarbonIntensity(**carbon_intensity.to_dict('list'))
else:
carbon_intensity = CarbonIntensity(np.zeros(energy_simulation.hour.shape[0], dtype='float32'))
if building_schema.get('pricing', None) is not None:
pricing = pd.read_csv(os.path.join(schema['root_directory'], building_schema['pricing']))
pricing = Pricing(**pricing.to_dict('list'))
else:
pricing = Pricing(
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
)
# observation metadata
observation_metadata = {k: v['active'] for k, v in schema['observations'].items()}
if kwargs.get('active_observations') is not None:
active_observations = kwargs['active_observations']
active_observations = active_observations[index] if isinstance(active_observations[0], list) else active_observations
observation_metadata = {k: True if k in active_observations else False for k in observation_metadata}
else:
pass
if kwargs.get('inactive_observations') is not None:
inactive_observations = kwargs['inactive_observations']
inactive_observations = inactive_observations[index] if isinstance(inactive_observations[0], list) else inactive_observations
elif building_schema.get('inactive_observations') is not None:
inactive_observations = building_schema['inactive_observations']
else:
inactive_observations = []
observation_metadata = {k: False if k in inactive_observations else v for k, v in observation_metadata.items()}
# action metadata
action_metadata = {k: v['active'] for k, v in schema['actions'].items()}
if kwargs.get('active_actions') is not None:
active_actions = kwargs['active_actions']
active_actions = active_actions[index] if isinstance(active_actions[0], list) else active_actions
action_metadata = {k: True if k in active_actions else False for k in action_metadata}
else:
pass
if kwargs.get('inactive_actions') is not None:
inactive_actions = kwargs['inactive_actions']
inactive_actions = inactive_actions[index] if isinstance(inactive_actions[0], list) else inactive_actions
elif building_schema.get('inactive_actions') is not None:
inactive_actions = building_schema['inactive_actions']
else:
inactive_actions = []
action_metadata = {k: False if k in inactive_actions else v for k, v in action_metadata.items()}
# construct building
building_type = 'citylearn.citylearn.Building' if building_schema.get('type', None) is None else building_schema['type']
building_type_module = '.'.join(building_type.split('.')[0:-1])
building_type_name = building_type.split('.')[-1]
building_constructor = getattr(importlib.import_module(building_type_module),building_type_name)
# set dynamics
if building_schema.get('dynamics', None) is not None:
dynamics_type = building_schema['dynamics']['type']
dynamics_module = '.'.join(dynamics_type.split('.')[0:-1])
dynamics_name = dynamics_type.split('.')[-1]
dynamics_constructor = getattr(importlib.import_module(dynamics_module), dynamics_name)
attributes = building_schema['dynamics'].get('attributes', {})
attributes['filepath'] = os.path.join(schema['root_directory'], attributes['filename'])
_ = attributes.pop('filename')
building_kwargs[f'dynamics'] = dynamics_constructor(**attributes)
else:
building_kwargs['dynamics'] = None
# set occupant
if building_schema.get('occupant', None) is not None:
building_occupant = building_schema['occupant']
occupant_type = building_occupant['type']
occupant_module = '.'.join(occupant_type.split('.')[0:-1])
occupant_name = occupant_type.split('.')[-1]
occupant_constructor = getattr(importlib.import_module(occupant_module), occupant_name)
attributes: dict = building_occupant.get('attributes', {})
parameters_filepath = os.path.join(schema['root_directory'], building_occupant['parameters_filename'])
parameters = pd.read_csv(parameters_filepath)
attributes['parameters'] = LogisticRegressionOccupantParameters(**parameters.to_dict('list'))
attributes['episode_tracker'] = episode_tracker
attributes['random_seed'] = schema['random_seed']
for k in ['increase', 'decrease']:
attributes[f'setpoint_{k}_model_filepath'] = os.path.join(schema['root_directory'], attributes[f'setpoint_{k}_model_filename'])
_ = attributes.pop(f'setpoint_{k}_model_filename')
building_kwargs['occupant'] = occupant_constructor(**attributes)
else:
building_kwargs['occupant'] = None
# set power outage model
building_schema_power_outage = building_schema.get('power_outage', {})
simulate_power_outage = kwargs.get('simulate_power_outage')
simulate_power_outage = building_schema_power_outage.get('simulate_power_outage') if simulate_power_outage is None else simulate_power_outage
simulate_power_outage = simulate_power_outage[index] if isinstance(simulate_power_outage,list) else simulate_power_outage
stochastic_power_outage = building_schema_power_outage.get('stochastic_power_outage')
if building_schema_power_outage.get('stochastic_power_outage_model', None) is not None:
stochastic_power_outage_model_type = building_schema_power_outage['stochastic_power_outage_model']['type']
stochastic_power_outage_model_module = '.'.join(stochastic_power_outage_model_type.split('.')[0:-1])
stochastic_power_outage_model_name = stochastic_power_outage_model_type.split('.')[-1]
stochastic_power_outage_model_constructor = getattr(
importlib.import_module(stochastic_power_outage_model_module),
stochastic_power_outage_model_name
)
attributes = building_schema_power_outage.get('stochastic_power_outage_model', {}).get('attributes', {})
stochastic_power_outage_model = stochastic_power_outage_model_constructor(**attributes)
else:
stochastic_power_outage_model = None
#Adding chargers to buildings if they exist
if building_schema.get("chargers", None) is not None:
chargers_list = []
for charger_name, charger_config in building_schema["chargers"].items():
charger_type = charger_config['type']
charger_module = '.'.join(charger_type.split('.')[0:-1])
charger_class_name = charger_type.split('.')[-1]
charger_class = getattr(importlib.import_module(charger_module), charger_class_name)
charger_attributes = charger_config.get('attributes', {})
charger_object = charger_class(charger_id=charger_name, **charger_attributes, seconds_per_time_step=schema['seconds_per_time_step'], )
chargers_list.append(charger_object)
if 'electric_vehicle_storage' not in inactive_actions and "electric_vehicle_storage" in schema['chargers_actions_helper']:
# Add new action for this charger to action_metadata
action_metadata[f'electric_vehicle_storage_{charger_name}'] = True
# Consider that if chargers_observations is not empty we should populate observations for chargers
# Each charger replicates the observations of the original chargers_observations but specific for its own
# If shared observations are active for the specific observation, that observation is added to shared_observations
if schema['chargers_observations_helper'] is not None and 'electric_vehicle_charger_state' in schema['chargers_observations_helper']:
for state_type in ['connected', 'incoming']:
if schema['chargers_observations_helper']['electric_vehicle_charger_state']["active"]:
observation_metadata[f'charger_{charger_name}_{state_type}_state'] = True # Add base case
if "electric_vehicle_charger_state" in schema['chargers_shared_observations_helper']:
schema['shared_observations'].append(f'charger_{charger_name}_{state_type}_state')
for obs in schema['chargers_observations_helper']:
if schema['chargers_observations_helper'][obs]["active"] and obs != 'electric_vehicle_charger_state':
observation_metadata[f'charger_{charger_name}_{state_type}_{obs}'] = True
if obs in schema['chargers_shared_observations_helper']:
schema['shared_observations'].append(f'charger_{charger_name}_{state_type}_{obs}')
else:
chargers_list = []
building: Building = building_constructor(
energy_simulation=energy_simulation,
electric_vehicle_chargers=chargers_list,
weather=weather,
observation_metadata=observation_metadata,
action_metadata=action_metadata,
carbon_intensity=carbon_intensity,
pricing=pricing,
name=building_name,
seconds_per_time_step=schema['seconds_per_time_step'],
random_seed=schema['random_seed'],
episode_tracker=episode_tracker,
simulate_power_outage=simulate_power_outage,
stochastic_power_outage=stochastic_power_outage,
stochastic_power_outage_model=stochastic_power_outage_model,
**building_kwargs,
)
# update devices
device_metadata = {
'cooling_device': {'autosizer': building.autosize_cooling_device},
'heating_device': {'autosizer': building.autosize_heating_device},
'dhw_device': {'autosizer': building.autosize_dhw_device},
'dhw_storage': {'autosizer': building.autosize_dhw_storage},
'cooling_storage': {'autosizer': building.autosize_cooling_storage},
'heating_storage': {'autosizer': building.autosize_heating_storage},
'electrical_storage': {'autosizer': building.autosize_electrical_storage},
'pv': {'autosizer': building.autosize_pv}
}
solar_generation = kwargs.get('solar_generation')
solar_generation = True if solar_generation is None else solar_generation
solar_generation = solar_generation[index] if isinstance(solar_generation, list) else solar_generation
for device_name in device_metadata:
if building_schema.get(device_name, None) is None:
device = None
elif device_name == 'pv' and not solar_generation:
device = None
else:
device_type: str = building_schema[device_name]['type']
device_module = '.'.join(device_type.split('.')[0:-1])
device_type_name = device_type.split('.')[-1]
constructor = getattr(importlib.import_module(device_module), device_type_name)
attributes = building_schema[device_name].get('attributes', {})
attributes['seconds_per_time_step'] = schema['seconds_per_time_step']
# in case device technical specifications are to be randomly sampled, make sure each device per building has a unique seed
md5 = hashlib.md5()
device_random_seed = 0
for string in [building_name, building_type, device_name, device_type]:
md5.update(string.encode())
hash_to_integer_base = 16
device_random_seed += int(md5.hexdigest(), hash_to_integer_base)
device_random_seed = int(str(device_random_seed * (schema['random_seed'] + 1))[:9])
attributes = {
**attributes,
'random_seed': attributes['random_seed'] if attributes.get('random_seed', None) is not None else device_random_seed
}
device = constructor(**attributes)
autosize = False if building_schema[device_name].get('autosize', None) is None else building_schema[device_name]['autosize']
building.__setattr__(device_name, device)
if autosize:
autosizer = device_metadata[device_name]['autosizer']
autosize_kwargs = {} if building_schema[device_name].get('autosize_attributes', None) is None else \
building_schema[device_name]['autosize_attributes']
if isinstance(device, PV):
autosize_kwargs['epw_filepath'] = os.path.join(schema['root_directory'], autosize_kwargs['epw_filepath'])
autosize_kwargs['sizing_data'] = pv_sizing_data
elif isinstance(device, Battery):
autosize_kwargs['sizing_data'] = battery_sizing_data
else:
pass
autosizer(**autosize_kwargs)
else:
pass
# set the random seed back to the building's random seed
device.random_seed = schema['random_seed']
building.observation_space = building.estimate_observation_space()
building.action_space = building.estimate_action_space()
return building
def _load_electric_vehicle(self, electric_vehicle_name: str, schema: dict, electric_vehicle_schema: dict, episode_tracker: EpisodeTracker) -> ElectricVehicle:
    """Build an electric vehicle object from its schema entry.

    Parameters
    ----------
    electric_vehicle_name: str
        Name assigned to the constructed electric vehicle.
    schema: dict
        Environment schema. The keys `root_directory`, `simulation_start_time_step`,
        `simulation_end_time_step`, `chargers_observations_helper`, `seconds_per_time_step`
        and `random_seed` are read from it.
    electric_vehicle_schema: dict
        Schema entry of this electric vehicle. `energy_simulation` and the `battery` attributes
        `capacity`, `nominal_power` and `initial_soc` are required; `inactive_observations`,
        `inactive_actions`, `type` and the battery attribute `min_battery_soc` are optional.
    episode_tracker: EpisodeTracker
        Tracker handed to both the battery and the electric vehicle.

    Returns
    -------
    ev: ElectricVehicle
        Constructed electric vehicle with its observation and action spaces set.
    """

    # read the EV time series and keep only the simulated rows (end time step is inclusive)
    simulation_filepath = os.path.join(schema['root_directory'], electric_vehicle_schema['energy_simulation'])
    simulation_data = pd.read_csv(simulation_filepath)
    simulation_data = simulation_data.iloc[
        schema['simulation_start_time_step']:schema['simulation_end_time_step'] + 1
    ].copy()
    # each CSV column becomes one positional argument
    electric_vehicle_simulation = ElectricVehicleSimulation(*simulation_data.values.T)

    # observation metadata: every charger observation except the charger state, active unless listed as inactive
    inactive_observations = electric_vehicle_schema.get('inactive_observations', [])
    inactive_actions = electric_vehicle_schema.get('inactive_actions', [])
    observation_metadata = {}

    for observation in schema['chargers_observations_helper']:
        if observation == 'electric_vehicle_charger_state':
            continue

        observation_metadata[observation] = observation not in inactive_observations

    # NOTE(review): this iterates over a literal one-item list, not schema['chargers_actions_helper'];
    # confirm that the single 'chargers_actions_helper' key is what the EV model expects.
    action_metadata = {
        action: action not in inactive_actions
        for action in ['chargers_actions_helper']
    }

    # battery: only these attributes are forwarded to the constructor
    battery_attributes = electric_vehicle_schema["battery"]["attributes"]
    capacity = battery_attributes["capacity"]
    nominal_power = battery_attributes["nominal_power"]
    initial_soc = battery_attributes["initial_soc"]
    min_battery_soc = battery_attributes.get("min_battery_soc", None)
    battery = Battery(
        capacity=capacity,
        nominal_power=nominal_power,
        initial_soc=initial_soc,
        seconds_per_time_step=schema['seconds_per_time_step'],
        random_seed=schema['random_seed'],
        episode_tracker=episode_tracker
    )

    # resolve the EV class from its dotted path, falling back to the default when no type is given
    electric_vehicle_type = electric_vehicle_schema.get('type', None)

    if electric_vehicle_type is None:
        electric_vehicle_type = 'citylearn.citylearn.ElectricVehicle'

    module_name, _, class_name = electric_vehicle_type.rpartition('.')
    electric_vehicle_constructor = getattr(importlib.import_module(module_name), class_name)

    # construct the EV, then derive its spaces from the metadata it was given
    ev: ElectricVehicle = electric_vehicle_constructor(
        electric_vehicle_simulation=electric_vehicle_simulation,
        observation_metadata=observation_metadata,
        action_metadata=action_metadata,
        battery=battery,
        name=electric_vehicle_name,
        seconds_per_time_step=schema['seconds_per_time_step'],
        random_seed=schema['random_seed'],
        min_battery_soc=min_battery_soc,
        episode_tracker=episode_tracker
    )
    ev.observation_space = ev.estimate_observation_space()
    ev.action_space = ev.estimate_action_space()

    return ev
[docs]
class Error(Exception):
    """Root exception type from which the other exceptions in this module derive."""
[docs]
class UnknownSchemaError(Error):
    """Error for a schema that is neither a data set name, a `dict` nor a filepath.

    Parameters
    ----------
    message: str, optional
        Custom error message. The built-in message is used when `None`.
    """

    # default text used when the caller supplies no message
    __MESSAGE = (
        'Unknown schema parsed into constructor. Schema must be name of CityLearn data set,'
        ' a filepath to JSON representation or `dict` object of a CityLearn schema.'
        ' Call citylearn.data.DataSet.get_names() for list of available CityLearn data sets.'
    )

    def __init__(self, message=None):
        if message is None:
            message = self.__MESSAGE

        super().__init__(message)