import os
import torch
import time
import numpy as np
from .base_model import BaseModel
from torch.utils.data import DataLoader
[docs]class RNN(torch.nn.Module):
"""
RNN wrapper around RNN implementation `Kuguoglu 2021 <https://towardsdatascience.com/building-rnn-lstm-and-gru-for-time-series-using-pytorch-a46e5b094e7b>`_
:param number_of_features: number of input features
:param hidden_dim: hidden size of the RNN
:param n_layers: number of layers in RNN
:param batch_size: batch size
:param seq_len: TODO: specify this better
:param output_size: Output dimension
"""
USES = ['autoregression','impute', 'fault_pred']
def __init__(self, input_size, batch_size, seq_len, output_size, hidden_dim, n_layers):
super(RNN, self).__init__()
# Defining some parameters
self.hidden_dim = hidden_dim
self.n_layers = n_layers
self.batch_size = batch_size
self.seq_len = seq_len
#Defining the layers
# RNN Layer
self.rnn = torch.nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
# Fully connected layer
self.fc = torch.nn.Linear(hidden_dim, output_size)
[docs] def forward(self, x):
batch_size = x.size(0)
# Initializing hidden state for first input using method defined below
hidden = self.init_hidden(batch_size)
# Passing in the input and hidden state into the model and obtaining outputs
out, hidden = self.rnn(x, hidden)
# Reshaping the outputs such that it can be fit into the fully connected layer
out = out.contiguous().view(self.batch_size, self.seq_len, self.hidden_dim)
out = self.fc(out)
return out, hidden
def init_hidden(self, batch_size):
# This method generates the first hidden state of zeros which we'll use in the forward pass
# We'll send the tensor holding the hidden state to the device we specified earlier as well
hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim)
return hidden
[docs]class VanillaRNN(BaseModel):
USES = ['autoregression', 'fault_pred']
def __init__(self, name, train_iters=10, num_layers=3, hidden_size=1, batch_size=5, learning_rate=.01):
super(VanillaRNN, self).__init__(name)
self.name = name
self.train_iters = train_iters
self.hidden_size = hidden_size
self.num_layers = num_layers
self.learning_rate = learning_rate
self.batch_size = batch_size
[docs] def predict(self, X, **kwargs):
self.multivariate = X.shape[1] > 1
if not self.multivariate:
X_true, X = self.lag_transform(X, lag=self.batch_size, horizon=self.horizon)
X = torch.from_numpy(X).float()
X = self.batch_shape(X, self.batch_size)
test_loader = DataLoader(X,
batch_size = self.batch_size,
shuffle = False,
drop_last=True)
predictions = []
with torch.no_grad():
for t, x in enumerate(test_loader):
output, hidden_size = self.model(x)
predictions.append(output)
predictions = torch.cat(predictions, dim=1)
predictions = predictions.permute(1, 0, 2) # put batch in the middle
predictions = self.batch_reshape(predictions.numpy())
return predictions # should cnversion be handled here?
[docs] def fit(self,X, **kwargs):
# transform X here
orig_shape = X.shape
lag_size = 20
self.multivariate = X.shape[1] > 1
if not self.multivariate:
X_true, X = self.lag_transform(X, lag=self.batch_size, horizon=self.horizon)
else: # we are passed multple features so just reconstruct those instead of lags
X_true = X
X_true, X = torch.from_numpy(X_true), torch.from_numpy(X)
X_true, X = X_true.float(), X.float()
X = self.batch_shape(X, self.batch_size)
X_true = self.batch_shape(X_true, self.batch_size)
seq_len, n_features = X.shape[-2], X.shape[-1]
self.input_size = X.shape[2]
self.output_size = X_true.shape[-1] # number of output features
dataset = torch.utils.data.TensorDataset(X, X_true)
train_loader = DataLoader(dataset,
batch_size = self.batch_size,
shuffle = False,
drop_last=True)
self.model = RNN(self.input_size, self.batch_size, seq_len, self.output_size, self.hidden_size, self.num_layers)
# set optimizer
optimizer = torch.optim.Adam([{'params': self.model.parameters()}, ],
lr=self.learning_rate)
loss_fxn = torch.nn.MSELoss()
for i in range(self.train_iters):
losses = []
for t, (x,x_true) in enumerate(train_loader):
# zero gradients from previous step
optimizer.zero_grad()
# output from model
output, hidden_state = self.model(x)
# calc loss and backprop gradients
loss = loss_fxn(output, x_true)
loss.backward()
losses.append(loss.item())
optimizer.step()
print(np.mean(losses), "RNN Loss")
self.loss.append(np.mean(losses))