class NeuralNetworkRegressor(Generic[IFT, IPT]):
"""
    A NeuralNetworkRegressor is a neural network that is used for regression tasks.

    Parameters
    ----------
    input_conversion:
        The input conversion used to transform the input data for the neural network.
    layers:
        A list of layers for the neural network to learn.

    Raises
    ------
    InvalidModelStructureError
        If the defined model structure is invalid.
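
    Examples
    --------
    A minimal construction sketch. `InputConversionTable` and the `neuron_count`
    parameter are assumptions about the surrounding library and may differ
    between versions:

    >>> from safeds.ml.nn.converters import InputConversionTable
    >>> from safeds.ml.nn.layers import ForwardLayer
    >>> regressor = NeuralNetworkRegressor(
    ...     input_conversion=InputConversionTable(),
    ...     layers=[ForwardLayer(neuron_count=1)],
    ... )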
"""
def __init__(
self,
input_conversion: InputConversion[IFT, IPT],
layers: list[Layer],
):
if len(layers) == 0:
raise InvalidModelStructureError("You need to provide at least one layer to a neural network.")
if isinstance(input_conversion, _InputConversionImage):
# TODO: why is this limitation needed? we might want to output the probability that an image shows a certain
# object, which would be a 1-dimensional output.
if isinstance(input_conversion, InputConversionImageToColumn | InputConversionImageToTable):
raise InvalidModelStructureError(
"A NeuralNetworkRegressor cannot be used with images as input and 1-dimensional data as output.",
)
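            # Track the dimensionality of the data as it flows through the layers:
            # image data starts 2-dimensional, and a FlattenLayer reduces it to 1.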
data_dimensions = 2
for layer in layers:
if data_dimensions == 2 and (isinstance(layer, Convolutional2DLayer | _Pooling2DLayer)):
continue
elif data_dimensions == 2 and isinstance(layer, FlattenLayer):
data_dimensions = 1
elif data_dimensions == 1 and isinstance(layer, ForwardLayer):
continue
else:
raise InvalidModelStructureError(
(
"The 2-dimensional data has to be flattened before using a 1-dimensional layer."
if data_dimensions == 2
else "You cannot use a 2-dimensional layer with 1-dimensional data."
),
)
if data_dimensions == 1 and isinstance(input_conversion, InputConversionImageToImage):
raise InvalidModelStructureError(
"The output data would be 1-dimensional but the provided output conversion uses 2-dimensional data.",
)
else:
for layer in layers:
if isinstance(layer, Convolutional2DLayer | FlattenLayer | _Pooling2DLayer):
raise InvalidModelStructureError("You cannot use a 2-dimensional layer with 1-dimensional data.")
self._input_conversion: InputConversion[IFT, IPT] = input_conversion
self._model: Module | None = None
self._layers: list[Layer] = layers
self._input_size: int | ModelImageSize | None = None
self._batch_size = 1
self._is_fitted = False
self._total_number_of_batches_done = 0
self._total_number_of_epochs_done = 0
@staticmethod
def from_pretrained_model(huggingface_repo: str) -> NeuralNetworkRegressor: # pragma: no cover
"""
        Load a pretrained model from a [Hugging Face repository](https://huggingface.co/models).

        Parameters
        ----------
        huggingface_repo:
            The name of the Hugging Face repository.

        Returns
        -------
        pretrained_model:
            The pretrained model as a NeuralNetworkRegressor.

        Raises
        ------
        ValueError
            If the model in the repository is not a supported swin2sr model.
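
        Examples
        --------
        A usage sketch; the repository name below is illustrative and must point
        to a swin2sr model:

        >>> regressor = NeuralNetworkRegressor.from_pretrained_model(
        ...     "caidas/swin2SR-classical-sr-x2-64",
        ... )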
"""
from transformers import (
AutoConfig,
AutoImageProcessor,
AutoModelForImageToImage,
PretrainedConfig,
Swin2SRForImageSuperResolution,
Swin2SRImageProcessor,
)
_init_default_device()
config: PretrainedConfig = AutoConfig.from_pretrained(huggingface_repo)
if config.model_type != "swin2sr":
raise ValueError("This model is not supported")
model: Swin2SRForImageSuperResolution = AutoModelForImageToImage.from_pretrained(huggingface_repo)
image_processor: Swin2SRImageProcessor = AutoImageProcessor.from_pretrained(huggingface_repo)
if hasattr(config, "num_channels"):
input_size = VariableImageSize(image_processor.pad_size, image_processor.pad_size, config.num_channels)
else: # Should never happen due to model check
raise ValueError("This model is not supported") # pragma: no cover
in_conversion = InputConversionImageToImage(input_size)
network = NeuralNetworkRegressor.__new__(NeuralNetworkRegressor)
network._input_conversion = in_conversion
network._model = model
network._input_size = input_size
network._batch_size = 1
network._is_fitted = True
network._total_number_of_epochs_done = 0
network._total_number_of_batches_done = 0
return network
def fit(
self,
train_data: IFT,
epoch_count: int = 25,
batch_size: int = 1,
learning_rate: float = 0.001,
callback_on_batch_completion: Callable[[int, float], None] | None = None,
callback_on_epoch_completion: Callable[[int, float], None] | None = None,
) -> Self:
"""
        Train the neural network with the given training data.

        The original model is not modified.

        Parameters
        ----------
        train_data:
            The data the network should be trained on.
        epoch_count:
            The number of times the training cycle should be done.
        batch_size:
            The size of data batches that should be loaded at one time.
        learning_rate:
            The learning rate of the neural network.
        callback_on_batch_completion:
            Function used to view metrics while training. Gets called after a batch is completed with the index of the
            last batch and the overall loss average.
        callback_on_epoch_completion:
            Function used to view metrics while training. Gets called after an epoch is completed with the index of the
            last epoch and the overall loss average.

        Returns
        -------
        trained_model:
            The trained model.

        Raises
        ------
        OutOfBoundsError
            If epoch_count or batch_size is less than 1.
        FittingWithChoiceError
            If the model contains hyperparameter choices.
        FeatureDataMismatchError
            If the training data does not match the model's input conversion.
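
        Examples
        --------
        A minimal training sketch, assuming `regressor` was constructed as in the
        class example; the table contents are illustrative:

        >>> from safeds.data.tabular.containers import Table
        >>> dataset = Table({"a": [1.0, 2.0, 3.0], "b": [2.0, 4.0, 6.0]}).to_tabular_dataset(target_name="b")
        >>> fitted = regressor.fit(dataset, epoch_count=10, batch_size=2)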
"""
import torch
from torch import nn
from ._internal_model import _InternalModel # Slow import on global level
_init_default_device()
if self._contains_choices():
raise FittingWithChoiceError
if not self._input_conversion._is_fit_data_valid(train_data):
raise FeatureDataMismatchError
_check_bounds("epoch_count", epoch_count, lower_bound=_ClosedBound(1))
_check_bounds("batch_size", batch_size, lower_bound=_ClosedBound(1))
copied_model = copy.deepcopy(self)
# TODO: How is this supposed to work with pre-trained models? Should the old weights be kept or discarded?
copied_model._model = _InternalModel(self._input_conversion, self._layers, is_for_classification=False)
copied_model._input_size = copied_model._model.input_size
copied_model._batch_size = batch_size
# TODO: Re-enable or remove depending on how the above TODO is resolved
# if copied_model._input_conversion._data_size != copied_model._input_size:
# raise InputSizeError(copied_model._input_conversion._data_size, copied_model._input_size)
dataloader = copied_model._input_conversion._data_conversion_fit(train_data, copied_model._batch_size)
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(copied_model._model.parameters(), lr=learning_rate)
        for _ in range(epoch_count):
            loss_sum = 0.0
            loss_value_count = 0
            for x, y in dataloader:
                optimizer.zero_grad()
                pred = copied_model._model(x)
                loss = loss_fn(pred, y)
                loss_sum += loss.item()
                loss_value_count += 1
                loss.backward()
                optimizer.step()
                copied_model._total_number_of_batches_done += 1
                if callback_on_batch_completion is not None:
                    # The reported loss is the running average over the current epoch.
                    callback_on_batch_completion(
                        copied_model._total_number_of_batches_done,
                        loss_sum / loss_value_count,
                    )
            copied_model._total_number_of_epochs_done += 1
            if callback_on_epoch_completion is not None:
                callback_on_epoch_completion(
                    copied_model._total_number_of_epochs_done,
                    loss_sum / loss_value_count,
                )
copied_model._is_fitted = True
copied_model._model.eval()
return copied_model
# TODO: Does not work if tensorflow with CUDA is used
# def fit_by_exhaustive_search(
# self,
# train_data: IFT,
# optimization_metric: Literal[
# "mean_squared_error",
# "mean_absolute_error",
# "median_absolute_deviation",
# "coefficient_of_determination",
# ],
# epoch_count: int = 25,
# batch_size: int = 1,
# learning_rate: float = 0.001,
# ) -> Self:
# """
# Use the hyperparameter choices to create multiple models and fit them.
#
# **Note:** This model is not modified.
#
# Parameters
# ----------
# train_data:
# The data the network should be trained on.
# optimization_metric:
# The metric that should be used for determining the performance of a model.
# epoch_count:
# The number of times the training cycle should be done.
# batch_size:
# The size of data batches that should be loaded at one time.
# learning_rate:
# The learning rate of the neural network.
#
# Returns
# -------
# best_model:
# The model that performed the best out of all possible models given the Choices of hyperparameters.
#
# Raises
# ------
# FittingWithoutChoiceError
# When calling this method on a model without hyperparameter choices.
# LearningError
    #         If the training data contains invalid values or if the training failed. Currently also raised when
    #         calling this on RNNs or CNNs.
# """
# _init_default_device()
#
# if not self._contains_choices():
# raise FittingWithoutChoiceError
#
# if isinstance(train_data, ImageDataset):
# raise LearningError(
# "Hyperparameter optimization is currently not supported for CNN Regression Tasks.",
# ) # pragma: no cover
#
# _check_bounds("epoch_count", epoch_count, lower_bound=_ClosedBound(1))
# _check_bounds("batch_size", batch_size, lower_bound=_ClosedBound(1))
#
# list_of_models = self._get_models_for_all_choices()
# list_of_fitted_models: list[Self] = []
# if isinstance(train_data, TabularDataset):
# (train_set, test_set) = self._data_split_table(train_data)
# else:
# (train_set, test_set) = self._data_split_time_series(train_data) # type: ignore[assignment]
#
# with ProcessPoolExecutor(max_workers=len(list_of_models), mp_context=mp.get_context("spawn")) as executor:
# futures = []
# for model in list_of_models:
# futures.append(
# executor.submit(
# model.fit,
# train_set, # type: ignore[arg-type]
# epoch_count,
# batch_size,
# learning_rate,
# ),
# ) # type: ignore[arg-type]
# [done, _] = wait(futures, return_when=ALL_COMPLETED)
# for future in done:
# list_of_fitted_models.append(future.result())
# executor.shutdown()
#
# if isinstance(train_data, TabularDataset):
# return self._get_best_fnn_model(list_of_fitted_models, test_set, optimization_metric)
# else: # train_data is TimeSeriesDataset
# return self._get_best_rnn_model(
# list_of_fitted_models,
# train_set, # type: ignore[arg-type]
# test_set, # type: ignore[arg-type]
# optimization_metric,
# )
def _data_split_table(self, data: TabularDataset) -> tuple[TabularDataset, TabularDataset]:
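        # Split the data 75/25 and rebuild datasets that keep the original target
        # and extra columns.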
[train_split, test_split] = data.to_table().split_rows(0.75)
train_data = train_split.to_tabular_dataset(
target_name=data.target.name,
extra_names=data.extras.column_names,
)
test_dataset = test_split.to_tabular_dataset(
target_name=train_data.target.name,
extra_names=train_data.extras.column_names,
)
return train_data, test_dataset
def _get_best_fnn_model(
self,
list_of_fitted_models: list[Self],
test_data: TabularDataset,
optimization_metric: Literal[
"mean_squared_error",
"mean_absolute_error",
"median_absolute_deviation",
"coefficient_of_determination",
],
) -> Self:
        test_features = test_data.features
        test_target = test_data.target

        # Dispatch on the metric once: error metrics are minimized, the
        # coefficient of determination is maximized.
        metric_function, higher_is_better = {
            "mean_squared_error": (RegressionMetrics.mean_squared_error, False),
            "mean_absolute_error": (RegressionMetrics.mean_absolute_error, False),
            "median_absolute_deviation": (RegressionMetrics.median_absolute_deviation, False),
            "coefficient_of_determination": (RegressionMetrics.coefficient_of_determination, True),
        }[optimization_metric]

        best_model = None
        best_metric_value = None
        for fitted_model in list_of_fitted_models:
            metric_value = metric_function(
                predicted=fitted_model.predict(test_features),  # type: ignore[arg-type]
                expected=test_target,  # type: ignore[arg-type]
            )
            if best_metric_value is None or (
                metric_value > best_metric_value if higher_is_better else metric_value < best_metric_value
            ):
                best_model = fitted_model
                best_metric_value = metric_value
        assert best_model is not None  # just for linter
        best_model._is_fitted = True
        return best_model
def _data_split_time_series(self, data: TimeSeriesDataset) -> tuple[TimeSeriesDataset, Table]:
(train_split, test_split) = data.to_table().split_rows(0.75)
train_data = train_split.to_time_series_dataset(
target_name=data.target.name,
window_size=data.window_size,
extra_names=data.extras.column_names,
continuous=data.continuous,
forecast_horizon=data.forecast_horizon,
)
return train_data, test_split
def _get_best_rnn_model(
self,
list_of_fitted_models: list[Self],
train_data: TimeSeriesDataset,
test_data: Table,
optimization_metric: Literal[
"mean_squared_error",
"mean_absolute_error",
"median_absolute_deviation",
"coefficient_of_determination",
],
) -> Self:
        test_target = test_data.get_column(train_data.target.name)

        # Rebuild the expected values the same way the windowed predictions are
        # generated: each window of `window_size` values is paired with the
        # value(s) `forecast_horizon` steps after it.
        size = test_target.row_count
        expected_values = []
        for i in range(size - (train_data.forecast_horizon + train_data.window_size)):
            if train_data.continuous:
                label = test_target[
                    i + train_data.window_size : i + train_data.window_size + train_data.forecast_horizon
                ]
            else:
                label = test_target[i + train_data.window_size + train_data.forecast_horizon]
            expected_values.append(label)
        expected_values_as_col = Column("expected", expected_values)

        # Dispatch on the metric once: error metrics are minimized, the
        # coefficient of determination is maximized.
        metric_function, higher_is_better = {
            "mean_squared_error": (RegressionMetrics.mean_squared_error, False),
            "mean_absolute_error": (RegressionMetrics.mean_absolute_error, False),
            "median_absolute_deviation": (RegressionMetrics.median_absolute_deviation, False),
            "coefficient_of_determination": (RegressionMetrics.coefficient_of_determination, True),
        }[optimization_metric]

        best_model = None
        best_metric_value = None
        for fitted_model in list_of_fitted_models:
            metric_value = metric_function(
                predicted=fitted_model.predict(test_data),  # type: ignore[arg-type]
                expected=expected_values_as_col,  # type: ignore[arg-type]
            )
            if best_metric_value is None or (
                metric_value > best_metric_value if higher_is_better else metric_value < best_metric_value
            ):
                best_model = fitted_model
                best_metric_value = metric_value
        assert best_model is not None  # just for linter
        best_model._is_fitted = True
        return best_model
    # def _data_split_image(self, train_data: ImageDataset) -> tuple[ImageDataset, ImageDataset]:
    #     return train_data.split(0.75)
def _get_models_for_all_choices(self) -> list[Self]:
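        # Build the cartesian product of all layer variants: a layer without
        # choices is appended to every partial combination, while a layer with
        # choices forks each partial combination once per variant.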
all_possible_layer_combinations: list[list] = [[]]
for layer in self._layers:
if not layer._contains_choices():
for item in all_possible_layer_combinations:
item.append(layer)
else:
updated_combinations = []
versions_of_one_layer = layer._get_layers_for_all_choices()
for version in versions_of_one_layer:
copy_of_all_current_possible_combinations = copy.deepcopy(all_possible_layer_combinations)
for combination in copy_of_all_current_possible_combinations:
combination.append(version)
updated_combinations.append(combination)
all_possible_layer_combinations = updated_combinations
models = []
for combination in all_possible_layer_combinations:
new_model = NeuralNetworkRegressor(input_conversion=self._input_conversion, layers=combination)
models.append(new_model)
return models # type: ignore[return-value]
def predict(self, test_data: IPT) -> IFT:
"""
        Make a prediction for the given test data.

        The original model is not modified.

        Parameters
        ----------
        test_data:
            The data the network should predict.

        Returns
        -------
        prediction:
            The given test_data with an added "prediction" column at the end.

        Raises
        ------
        ModelNotFittedError
            If the model has not been fitted yet.
        FeatureDataMismatchError
            If the test data does not match the features the model expects.
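
        Examples
        --------
        A usage sketch, assuming `fitted` is a fitted model and `test_table`
        contains the same feature columns the model was trained on:

        >>> prediction = fitted.predict(test_table)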
"""
import torch
_init_default_device()
if not self._is_fitted or self._model is None:
raise ModelNotFittedError
if not self._input_conversion._is_predict_data_valid(test_data):
raise FeatureDataMismatchError
dataloader = self._input_conversion._data_conversion_predict(test_data, self._batch_size)
predictions = []
with torch.no_grad():
for x in dataloader:
elem = self._model(x)
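                # Hugging Face image-to-image models (e.g. Swin2SR) wrap their output
                # in an object whose `reconstruction` attribute holds the tensor.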
if not isinstance(elem, torch.Tensor) and hasattr(elem, "reconstruction"):
elem = elem.reconstruction # pragma: no cover
elif not isinstance(elem, torch.Tensor):
raise ValueError(f"Output of model has unsupported type: {type(elem)}") # pragma: no cover
predictions.append(elem.squeeze(dim=1))
return self._input_conversion._data_conversion_output(
test_data,
torch.cat(predictions, dim=0),
)
@property
def is_fitted(self) -> bool:
"""Whether the model is fitted."""
return self._is_fitted
@property
def input_size(self) -> int | ModelImageSize | None:
"""The input size of the model."""
# TODO: raise if not fitted, don't return None
return self._input_size
def _contains_choices(self) -> bool:
"""Whether the model contains choices in any layer."""
return any(layer._contains_choices() for layer in self._layers)