Skip to content

Optimize auto-select data loading process and update dataset classes #1110

New issue

Have a question about this project? Sign up for a free account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ads/opctl/operator/lowcode/common/data.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,13 +19,16 @@


class AbstractData(ABC):
def __init__(self, spec: dict, name="input_data"):
def __init__(self, spec, name="input_data", data=None):
self.Transformations = Transformations
self.data = None
self._data_dict = dict()
self.name = name
self.spec = spec
self.load_transform_ingest_data(spec)
if data is not None:
self.data = data
else:
self.load_transform_ingest_data(spec)

def get_raw_data_by_cat(self, category):
mapping = self._data_transformer.get_target_category_columns_map()
Expand Down
2 changes: 0 additions & 2 deletions ads/opctl/operator/lowcode/common/transformations.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -31,7 +31,6 @@ def __init__(self, dataset_info, name="historical_data"):
dataset_info : ForecastOperatorConfig
"""
self.name = name
self.has_artificial_series = False
self.dataset_info = dataset_info
self.target_category_columns = dataset_info.target_category_columns
self.target_column_name = dataset_info.target_column
Expand DownExpand Up@@ -136,7 +135,6 @@ def _set_series_id_column(self, df):
self._target_category_columns_map = {}
if not self.target_category_columns:
df[DataColumns.Series] = "Series 1"
self.has_artificial_series = True
else:
df[DataColumns.Series] = merge_category_columns(
df, self.target_category_columns
Expand Down
15 changes: 7 additions & 8 deletions ads/opctl/operator/lowcode/forecast/model/base_model.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -120,7 +120,7 @@ def generate_report(self):

# Generate metrics
summary_metrics = None
test_data = None
test_data = self.datasets.test_data
self.eval_metrics = None

if self.spec.generate_report or self.spec.generate_metrics:
Expand All@@ -130,12 +130,11 @@ def generate_report(self):
{"Series 1": self.original_target_column}, axis=1, inplace=True
)

if self.spec.test_data:
if self.datasets.test_data is not None:
try:
(
self.test_eval_metrics,
summary_metrics,
test_data,
summary_metrics
) = self._test_evaluate_metrics(
elapsed_time=elapsed_time,
)
Expand DownExpand Up@@ -361,7 +360,7 @@ def generate_report(self):
def _test_evaluate_metrics(self, elapsed_time=0):
total_metrics = pd.DataFrame()
summary_metrics = pd.DataFrame()
data = TestData(self.spec)
data = self.datasets.test_data

# Generate y_pred and y_true for each series
for s_id in self.forecast_output.list_series_ids():
Expand DownExpand Up@@ -398,7 +397,7 @@ def _test_evaluate_metrics(self, elapsed_time=0):
total_metrics = pd.concat([total_metrics, metrics_df], axis=1)

if total_metrics.empty:
return total_metrics, summary_metrics, data
return total_metrics, summary_metrics

summary_metrics = pd.DataFrame(
{
Expand DownExpand Up@@ -464,7 +463,7 @@ def _test_evaluate_metrics(self, elapsed_time=0):
]
summary_metrics = summary_metrics[new_column_order]

return total_metrics, summary_metrics, data
return total_metrics, summary_metrics

def _save_report(
self,
Expand DownExpand Up@@ -548,7 +547,7 @@ def _save_report(
)

# test_metrics csv report
if self.spec.test_data is not None:
if self.datasets.test_data is not None:
if test_metrics_df is not None:
test_metrics_df_formatted = test_metrics_df.reset_index().rename(
{"index": "metrics", "Series 1": metrics_col_name}, axis=1
Expand Down
30 changes: 19 additions & 11 deletions ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -23,8 +23,8 @@


class HistoricalData(AbstractData):
def __init__(self, spec: dict):
super().__init__(spec=spec, name="historical_data")
def __init__(self, spec, historical_data = None):
super().__init__(spec=spec, name="historical_data", data=historical_data)

def _ingest_data(self, spec):
try:
Expand DownExpand Up@@ -52,8 +52,11 @@ def _verify_dt_col(self, spec):


class AdditionalData(AbstractData):
def __init__(self, spec, historical_data):
if spec.additional_data is not None:
def __init__(self, spec, historical_data, additional_data=None):
if additional_data is not None:
super().__init__(spec=spec, name="additional_data", data=additional_data)
self.additional_regressors = list(self.data.columns)
elif spec.additional_data is not None:
super().__init__(spec=spec, name="additional_data")
add_dates = self.data.index.get_level_values(0).unique().tolist()
add_dates.sort()
Expand DownExpand Up@@ -110,14 +113,15 @@ def _ingest_data(self, spec):


class TestData(AbstractData):
def __init__(self, spec):
super().__init__(spec=spec, name="test_data")
def __init__(self, spec, test_data):
if test_data is not None or spec.test_data is not None:
super().__init__(spec=spec, name="test_data", data=test_data)
self.dt_column_name = spec.datetime_column.name
self.target_name = spec.target_column


class ForecastDatasets:
def __init__(self, config: ForecastOperatorConfig):
def __init__(self, config: ForecastOperatorConfig, historical_data=None, additional_data=None, test_data=None):
"""Instantiates the DataIO instance.

Properties
Expand All@@ -127,11 +131,15 @@ def __init__(self, config: ForecastOperatorConfig):
"""
self.historical_data: HistoricalData = None
self.additional_data: AdditionalData = None

self._horizon = config.spec.horizon
self._datetime_column_name = config.spec.datetime_column.name
self._target_col = config.spec.target_column
self._load_data(config.spec)
if historical_data is not None:
self.historical_data = HistoricalData(config.spec, historical_data)
self.additional_data = AdditionalData(config.spec, self.historical_data, additional_data)
else:
self._load_data(config.spec)
self.test_data = TestData(config.spec, test_data)

def _load_data(self, spec):
"""Loads forecasting input data."""
Expand DownExpand Up@@ -200,7 +208,7 @@ def get_horizon_at_series(self, s_id):
return self.get_data_at_series(s_id)[-self._horizon :]

def has_artificial_series(self):
return self.historical_data._data_transformer.has_artificial_series
return bool(self.historical_data.spec.target_category_columns)

def get_earliest_timestamp(self):
return self.historical_data.get_min_time()
Expand DownExpand Up@@ -251,7 +259,7 @@ def __init__(
target_column: str,
dt_column: str,
):
"""Forecast Output contains all of the details required to generate the forecast.csv output file.
"""Forecast Output contains all the details required to generate the forecast.csv output file.

init
-------
Expand Down
27 changes: 12 additions & 15 deletions ads/opctl/operator/lowcode/forecast/model_evaluator.py
Original file line numberDiff line numberDiff line change
Expand Up@@ -91,20 +91,13 @@ def create_operator_config(self, operator_config, backtest, model, historical_da
output_dir = operator_config.spec.output_directory.url
output_file_path = f'{output_dir}/back_testing/{model}/{backtest}'
Path(output_file_path).mkdir(parents=True, exist_ok=True)
historical_data_url = f'{output_file_path}/historical.csv'
additional_data_url = f'{output_file_path}/additional.csv'
test_data_url = f'{output_file_path}/test.csv'
historical_data.to_csv(historical_data_url, index=False)
additional_data.to_csv(additional_data_url, index=False)
test_data.to_csv(test_data_url, index=False)
backtest_op_config_draft = operator_config.to_dict()
backtest_spec = backtest_op_config_draft["spec"]
backtest_spec["historical_data"]["url"] = historical_data_url
backtest_spec["datetime_column"]["format"] = None
if backtest_spec["additional_data"]:
backtest_spec["additional_data"]["url"] = additional_data_url
backtest_spec["test_data"] = {}
backtest_spec["test_data"]["url"] = test_data_url
backtest_spec.pop("test_data")
backtest_spec.pop("additional_data")
backtest_spec.pop("historical_data")
backtest_spec["generate_report"] = False
backtest_spec["model"] = model
backtest_spec['model_kwargs'] = None
backtest_spec["output_directory"] = {"url": output_file_path}
Expand All@@ -119,19 +112,23 @@ def create_operator_config(self, operator_config, backtest, model, historical_da
def run_all_models(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig):
cut_offs, train_sets, additional_data, test_sets = self.generate_k_fold_data(datasets, operator_config)
metrics = {}
date_col = operator_config.spec.datetime_column.name
for model in self.models:
from .model.factory import ForecastOperatorModelFactory
metrics[model] = {}
for i in range(len(cut_offs)):
try:
backtest_historical_data = train_sets[i]
backtest_additional_data = additional_data[i]
backtest_test_data = test_sets[i]
backtest_historical_data = train_sets[i].set_index([date_col, DataColumns.Series])
backtest_additional_data = additional_data[i].set_index([date_col, DataColumns.Series])
backtest_test_data = test_sets[i].set_index([date_col, DataColumns.Series])
backtest_operator_config = self.create_operator_config(operator_config, i, model,
backtest_historical_data,
backtest_additional_data,
backtest_test_data)
datasets = ForecastDatasets(backtest_operator_config)
datasets = ForecastDatasets(backtest_operator_config,
backtest_historical_data,
backtest_additional_data,
backtest_test_data)
ForecastOperatorModelFactory.get_model(
backtest_operator_config, datasets
).generate_report()
Expand Down
Loading