Source code for citylearn.agents.sac

import logging
from typing import Any, List, Union
import numpy as np
import numpy.typing as npt

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
except (ModuleNotFoundError, ImportError) as e:
    raise Exception("This functionality requires you to install torch. You can install torch by : pip install torch torchvision, or for more detailed instructions please visit https://pytorch.org.")

from citylearn.agents.rbc import RBC
from citylearn.agents.rlc import RLC
from citylearn.citylearn import CityLearnEnv
from citylearn.preprocessing import Encoder, RemoveFeature
from citylearn.rl import PolicyNetwork, ReplayBuffer, SoftQNetwork

[docs] class SAC(RLC): def __init__(self, env: CityLearnEnv, **kwargs: Any): r"""Custom soft actor-critic algorithm. Parameters ---------- env: CityLearnEnv CityLearn environment. Other Parameters ---------------- **kwargs : Any Other keyword arguments used to initialize super class. """ super().__init__(env, **kwargs) # internally defined self.normalized = [False for _ in self.action_space] self.soft_q_criterion = nn.SmoothL1Loss() self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.replay_buffer = [ReplayBuffer(int(self.replay_buffer_capacity)) for _ in self.action_space] self.soft_q_net1 = [None for _ in self.action_space] self.soft_q_net2 = [None for _ in self.action_space] self.target_soft_q_net1 = [None for _ in self.action_space] self.target_soft_q_net2 = [None for _ in self.action_space] self.policy_net = [None for _ in self.action_space] self.soft_q_optimizer1 = [None for _ in self.action_space] self.soft_q_optimizer2 = [None for _ in self.action_space] self.policy_optimizer = [None for _ in self.action_space] self.target_entropy = [None for _ in self.action_space] self.norm_mean = [None for _ in self.action_space] self.norm_std = [None for _ in self.action_space] self.r_norm_mean = [None for _ in self.action_space] self.r_norm_std = [None for _ in self.action_space] self.set_networks()
[docs] def update(self, observations: List[List[float]], actions: List[List[float]], reward: List[float], next_observations: List[List[float]], terminated: bool, truncated: bool): r"""Update replay buffer. Parameters ---------- observations : List[List[float]] Previous time step observations. actions : List[List[float]] Previous time step actions. reward : List[float] Current time step reward. next_observations : List[List[float]] Current time step observations. terminated : bool Indication that episode has ended. truncated : bool If episode truncates due to a time limit or a reason that is not defined as part of the task MDP. """ # Run once the regression model has been fitted # Normalize all the observations using periodical normalization, one-hot encoding, or -1, 1 scaling. It also removes observations that are not necessary (solar irradiance if there are no solar PV panels). for i, (o, a, r, n) in enumerate(zip(observations, actions, reward, next_observations)): o = self.get_encoded_observations(i, o) n = self.get_encoded_observations(i, n) if self.normalized[i]: o = self.get_normalized_observations(i, o) n = self.get_normalized_observations(i, n) r = self.get_normalized_reward(i, r) else: pass self.replay_buffer[i].push(o, a, r, n, terminated) if self.time_step >= self.standardize_start_time_step and self.batch_size <= len(self.replay_buffer[i]): if not self.normalized[i]: # calculate normalized observations and rewards X = np.array([j[0] for j in self.replay_buffer[i].buffer], dtype = float) self.norm_mean[i] = np.nanmean(X, axis=0) self.norm_std[i] = np.nanstd(X, axis=0) + 1e-5 R = np.array([j[2] for j in self.replay_buffer[i].buffer], dtype = float) self.r_norm_mean[i] = np.nanmean(R, dtype = float) self.r_norm_std[i] = np.nanstd(R, dtype = float)/self.reward_scaling + 1e-5 # update buffer with normalization self.replay_buffer[i].buffer = [( np.hstack(self.get_normalized_observations(i, o).reshape(1,-1)[0]), a, self.get_normalized_reward(i, r), np.hstack(self.get_normalized_observations(i, n).reshape(1,-1)[0]), d ) for o, a, r, n, d in self.replay_buffer[i].buffer] self.normalized[i] = True else: pass for _ in range(self.update_per_time_step): o, a, r, n, d = self.replay_buffer[i].sample(self.batch_size) tensor = torch.cuda.FloatTensor if self.device.type == 'cuda' else torch.FloatTensor o = tensor(o).to(self.device) n = tensor(n).to(self.device) a = tensor(a).to(self.device) r = tensor(r).unsqueeze(1).to(self.device) d = tensor(d).unsqueeze(1).to(self.device) with torch.no_grad(): # Update Q-values. First, sample an action from the Gaussian policy/distribution for the current (next) observation and its associated log probability of occurrence. new_next_actions, new_log_pi, _ = self.policy_net[i].sample(n) # The updated Q-value is found by subtracting the logprob of the sampled action (proportional to the entropy) to the Q-values estimated by the target networks. target_q_values = torch.min( self.target_soft_q_net1[i](n, new_next_actions), self.target_soft_q_net2[i](n, new_next_actions), ) - self.alpha*new_log_pi q_target = r + (1 - d)*self.discount*target_q_values # Update Soft Q-Networks q1_pred = self.soft_q_net1[i](o, a) q2_pred = self.soft_q_net2[i](o, a) q1_loss = self.soft_q_criterion(q1_pred, q_target) q2_loss = self.soft_q_criterion(q2_pred, q_target) self.soft_q_optimizer1[i].zero_grad() q1_loss.backward() self.soft_q_optimizer1[i].step() self.soft_q_optimizer2[i].zero_grad() q2_loss.backward() self.soft_q_optimizer2[i].step() # Update Policy new_actions, log_pi, _ = self.policy_net[i].sample(o) q_new_actions = torch.min( self.soft_q_net1[i](o, new_actions), self.soft_q_net2[i](o, new_actions) ) policy_loss = (self.alpha*log_pi - q_new_actions).mean() self.policy_optimizer[i].zero_grad() policy_loss.backward() self.policy_optimizer[i].step() # Soft Updates for target_param, param in zip(self.target_soft_q_net1[i].parameters(), self.soft_q_net1[i].parameters()): target_param.data.copy_(target_param.data*(1.0 - self.tau) + param.data*self.tau) for target_param, param in zip(self.target_soft_q_net2[i].parameters(), self.soft_q_net2[i].parameters()): target_param.data.copy_(target_param.data*(1.0 - self.tau) + param.data*self.tau) else: pass
[docs] def predict(self, observations: List[List[float]], deterministic: bool = None): r"""Provide actions for current time step. Will return randomly sampled actions from `action_space` if :attr:`end_exploration_time_step` <= :attr:`time_step` else will use policy to sample actions. Parameters ---------- observations: List[List[float]] Environment observations deterministic: bool, default: False Wether to return purely exploitatative deterministic actions. Returns ------- actions: List[float] Action values """ deterministic = False if deterministic is None else deterministic if self.time_step > self.end_exploration_time_step or deterministic: actions = self.get_post_exploration_prediction(observations, deterministic) else: actions = self.get_exploration_prediction(observations) self.actions = actions self.next_time_step() return actions
[docs] def get_post_exploration_prediction(self, observations: List[List[float]], deterministic: bool) -> List[List[float]]: """Action sampling using policy, post-exploration time step""" actions = [] for i, o in enumerate(observations): o = self.get_encoded_observations(i, o) o = self.get_normalized_observations(i, o) o = torch.FloatTensor(o).unsqueeze(0).to(self.device) result = self.policy_net[i].sample(o) a = result[2] if deterministic else result[0] actions.append(a.detach().cpu().numpy()[0]) return actions
[docs] def get_exploration_prediction(self, observations: List[List[float]]) -> List[List[float]]: """Return randomly sampled actions from `action_space` multiplied by :attr:`action_scaling_coefficient`.""" # random actions return [list(self.action_scaling_coefficient*s.sample()) for s in self.action_space]
[docs] def get_normalized_reward(self, index: int, reward: float) -> float: return (reward - self.r_norm_mean[index])/self.r_norm_std[index]
[docs] def get_normalized_observations(self, index: int, observations: List[float]) -> npt.NDArray[np.float64]: try: return (np.array(observations, dtype = float) - self.norm_mean[index])/self.norm_std[index] except: # self.time_step >= self.standardize_start_time_step and self.batch_size <= len(self.replay_buffer[i]) logging.debug('obs:',observations) logging.debug('mean:',self.norm_mean[index]) logging.debug('std:',self.norm_std[index]) logging.debug(self.time_step, self.standardize_start_time_step, self.batch_size, len(self.replay_buffer[0])) assert False
[docs] def get_encoded_observations(self, index: int, observations: List[float]) -> npt.NDArray[np.float64]: return np.array([j for j in np.hstack(self.encoders[index]*np.array(observations, dtype=float)) if j != None], dtype = float)
[docs] def set_networks(self, internal_observation_count: int = None): internal_observation_count = 0 if internal_observation_count is None else internal_observation_count for i in range(len(self.action_dimension)): observation_dimension = self.observation_dimension[i] + internal_observation_count # init networks self.soft_q_net1[i] = SoftQNetwork(observation_dimension, self.action_dimension[i], self.hidden_dimension).to(self.device) self.soft_q_net2[i] = SoftQNetwork(observation_dimension, self.action_dimension[i], self.hidden_dimension).to(self.device) self.target_soft_q_net1[i] = SoftQNetwork(observation_dimension, self.action_dimension[i], self.hidden_dimension).to(self.device) self.target_soft_q_net2[i] = SoftQNetwork(observation_dimension, self.action_dimension[i], self.hidden_dimension).to(self.device) for target_param, param in zip(self.target_soft_q_net1[i].parameters(), self.soft_q_net1[i].parameters()): target_param.data.copy_(param.data) for target_param, param in zip(self.target_soft_q_net2[i].parameters(), self.soft_q_net2[i].parameters()): target_param.data.copy_(param.data) # Policy self.policy_net[i] = PolicyNetwork(observation_dimension, self.action_dimension[i], self.action_space[i], self.action_scaling_coefficient, self.hidden_dimension).to(self.device) self.soft_q_optimizer1[i] = optim.Adam(self.soft_q_net1[i].parameters(), lr=self.lr) self.soft_q_optimizer2[i] = optim.Adam(self.soft_q_net2[i].parameters(), lr=self.lr) self.policy_optimizer[i] = optim.Adam(self.policy_net[i].parameters(), lr=self.lr) self.target_entropy[i] = -np.prod(self.action_space[i].shape).item()
[docs] def set_encoders(self) -> List[List[Encoder]]: encoders = super().set_encoders() for i, o in enumerate(self.observation_names): for j, n in enumerate(o): if n == 'net_electricity_consumption': encoders[i][j] = RemoveFeature() else: pass return encoders
[docs] class SACRBC(SAC): r"""Uses :py:class:`citylearn.agents.rbc.RBC` to select actions during exploration before using :py:class:`citylearn.agents.sac.SAC`. Parameters ---------- env: CityLearnEnv CityLearn environment. rbc: RBC :py:class:`citylearn.agents.rbc.RBC` or child class, used to select actions during exploration. Other Parameters ---------------- **kwargs : Any Other keyword arguments used to initialize super class. """ def __init__(self, env: CityLearnEnv, rbc: Union[RBC, str] = None, **kwargs: Any): super().__init__(env, **kwargs) self.__set_rbc(rbc, **kwargs) @property def rbc(self) -> RBC: """:py:class:`citylearn.agents.rbc.RBC` class child class or string path to an RBC class e.g. 'citylearn.agents.rbc.RBC', used to select actions during exploration.""" return self.__rbc def __set_rbc(self, rbc: RBC, **kwargs): if rbc is None: rbc = RBC(self.env, **kwargs) elif isinstance(rbc, RBC): pass elif isinstance(rbc, str): rbc = self.env.load_agent(rbc, env=self.env, **kwargs) else: rbc = rbc(self.env, **kwargs) self.__rbc = rbc
[docs] def get_exploration_prediction(self, observations: List[float]) -> List[float]: """Return actions using :class:`RBC`.""" return self.rbc.predict(observations)