Functions used to preprocess time series (both X and y).
dsid = 'NATOPS'
X, y, splits = get_UCR_data(dsid, return_split=False)
tfms = [None, Categorize()]
dsets = TSDatasets(X, y, tfms=tfms, splits=splits)

class ToNumpyCategory[source]

ToNumpyCategory(**kwargs) :: Transform

Categorize a numpy batch

t = ToNumpyCategory()
y_cat = t(y)
y_cat[:10]
array([3, 2, 2, 3, 2, 4, 0, 5, 2, 1])
test_eq(t.decode(tensor(y_cat)), y)
test_eq(t.decode(np.array(y_cat)), y)

class OneHot[source]

OneHot(n_classes=None, **kwargs) :: Transform

One-hot encode/decode a batch

oh_encoder = OneHot()
y_cat = ToNumpyCategory()(y)
oht = oh_encoder(y_cat)
oht[:10]
array([[0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0.],
       [1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1.],
       [0., 0., 1., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0.]])
n_classes = 10
n_samples = 100

t = torch.randint(0, n_classes, (n_samples,))
oh_encoder = OneHot()
oht = oh_encoder(t)
test_eq(oht.shape, (n_samples, n_classes))
test_eq(torch.argmax(oht, dim=-1), t)
test_eq(oh_encoder.decode(oht), t)
n_classes = 10
n_samples = 100

a = np.random.randint(0, n_classes, (n_samples,))
oh_encoder = OneHot()
oha = oh_encoder(a)
test_eq(oha.shape, (n_samples, n_classes))
test_eq(np.argmax(oha, axis=-1), a)
test_eq(oh_encoder.decode(oha), a)

class Nan2Value[source]

Nan2Value(value=0, median=True, by_sample_and_var=False) :: Transform

Replaces any nan values by a predefined value

o = TSTensor(torch.randn(16, 10, 100))
o[0,0] = float('nan')
o[o > .9] = float('nan')
o[[0,1,5,8,14,15], :, -20:] = float('nan')
nan_vals1 = torch.isnan(o).sum()
o2 = Pipeline(Nan2Value(median=True, by_sample_and_var=True), split_idx=0)(o.clone())
nan_vals2 = torch.isnan(o2).sum()
test_ne(nan_vals1, 0)
test_eq(nan_vals2, 0)

class TSStandardize[source]

TSStandardize(mean=None, std=None, by_sample=False, by_var=False, by_step=False, eps=1e-08, verbose=False) :: Transform

Standardizes batch of type TSTensor

Tensor.mul_min[source]

Tensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

TSTensor.mul_min[source]

TSTensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

NumpyTensor.mul_min[source]

NumpyTensor.mul_min(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

Tensor.mul_max[source]

Tensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

TSTensor.mul_max[source]

TSTensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

NumpyTensor.mul_max[source]

NumpyTensor.mul_max(x:(Tensor, TSTensor, NumpyTensor), axes=(), keepdim=False)

class TSNormalize[source]

TSNormalize(min=None, max=None, range=(-1, 1), by_sample=False, by_var=False, by_step=False, verbose=False) :: Transform

Normalizes batch of type TSTensor

batch_tfms=[TSStandardize(by_sample=True, by_var=False, verbose=True)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
TSStandardize mean shape=torch.Size([128, 1, 1]), std shape=torch.Size([128, 1, 1]), by_sample=True, by_var=False, by_step=False

from tsai.data.validation import TimeSplitter
# Build a random 3-d array and inject NaNs in two different patterns.
# NOTE(review): axes are presumably (samples, variables, steps) — confirm.
X_nan = np.random.rand(100, 5, 10)
# Pick half of the samples (without replacement) and blank out index 0 of axis 1 entirely.
idxs = np.random.choice(len(X_nan), int(len(X_nan)*.5), False)
X_nan[idxs, 0] = float('nan')
# Pick another half and blank out the last 10 positions of index 1 of axis 1
# (the last axis has length 10, so this slice covers all of it).
idxs = np.random.choice(len(X_nan), int(len(X_nan)*.5), False)
X_nan[idxs, 1, -10:] = float('nan')
# Standardizing with by_var=True must yield NaN-free mean/std statistics.
batch_tfms = TSStandardize(by_var=True)
dls = get_ts_dls(X_nan, batch_tfms=batch_tfms, splits=TimeSplitter(show_plot=False)(range_of(X_nan)))
test_eq(torch.isnan(dls.after_batch[0].mean).sum(), 0)
test_eq(torch.isnan(dls.after_batch[0].std).sum(), 0)
# The batch itself is expected to still contain some NaNs, but not only NaNs.
xb = first(dls.train)[0]
test_ne(torch.isnan(xb).sum(), 0)
test_ne(torch.isnan(xb).sum(), torch.isnan(xb).numel())
# Chaining Nan2Value after the standardization is expected to leave no NaNs in the batch.
batch_tfms = [TSStandardize(by_var=True), Nan2Value()]
dls = get_ts_dls(X_nan, batch_tfms=batch_tfms, splits=TimeSplitter(show_plot=False)(range_of(X_nan)))
xb = first(dls.train)[0]
test_eq(torch.isnan(xb).sum(), 0)
batch_tfms=[TSStandardize(by_sample=True, by_var=False, verbose=False)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
xb, yb = next(iter(dls.valid))
test_close(xb.mean(), 0, eps=1e-1)
test_close(xb.std(), 1, eps=1e-1)
batch_tfms=[TSNormalize(by_sample=True, by_var=False, verbose=False)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
assert xb.max() <= 1
assert xb.min() >= -1
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)
xb, yb = next(iter(dls.valid))
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)

class TSClipOutliers[source]

TSClipOutliers(min=None, max=None, by_sample=False, by_var=False, verbose=False) :: Transform

Clips outliers in a batch of type TSTensor based on the IQR

batch_tfms=[TSClipOutliers(-1, 1, verbose=True)]
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, bs=128, num_workers=0, after_batch=batch_tfms)
xb, yb = next(iter(dls.train))
assert xb.max() <= 1
assert xb.min() >= -1
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)
xb, yb = next(iter(dls.valid))
test_close(xb.min(), -1, eps=1e-1)
test_close(xb.max(), 1, eps=1e-1)
TSClipOutliers min=-1, max=1

class TSClip[source]

TSClip(min=-6, max=6) :: Transform

Clip batch of type TSTensor

t = TSTensor(torch.randn(10, 20, 100)*10)
test_le(TSClip()(t).max().item(), 6)
test_ge(TSClip()(t).min().item(), -6)

class TSRobustScale[source]

TSRobustScale(median=None, min=None, max=None, by_sample=False, by_var=False, verbose=False) :: Transform

This Scaler removes the median and scales the data according to the quantile range (defaults to IQR: Interquartile Range)

# Apply TSRobustScale per sample to one training batch and check that the
# values actually change; the last line displays the value ranges after and
# before scaling.
dls = TSDataLoaders.from_dsets(dsets.train, dsets.valid, num_workers=0)
xb, yb = next(iter(dls.train))
# `True` (the boolean literal) rather than lowercase `true`, which is not a
# Python boolean and would only work if some star-import happens to define it.
clipped_xb = TSRobustScale(by_sample=True)(xb)
test_ne(clipped_xb, xb)
clipped_xb.min(), clipped_xb.max(), xb.min(), xb.max()
(-0.5988853573799133,
 1.0738041400909424,
 -2.6447908878326416,
 2.5364139080047607)

class TSDiff[source]

TSDiff(lag=1, pad=True) :: Transform

Differences batch of type TSTensor

t = TSTensor(torch.arange(24).reshape(2,3,4))
test_eq(TSDiff()(t)[..., 1:].float().mean(), 1)
test_eq(TSDiff(lag=2, pad=False)(t).float().mean(), 2)

class TSLog[source]

TSLog(enc=None, dec=None, split_idx=None, order=None) :: Transform

Log transforms batch of type TSTensor. For positive values only

t = TSTensor(torch.rand(2,3,4))
enc_t = TSLog()(t)
test_ne(enc_t, t)
test_close(TSLog().decodes(enc_t).data, t.data)

class TSLogReturn[source]

TSLogReturn(lag=1, pad=True) :: Transform

Calculates log-return of batch of type TSTensor. For positive values only

t = TSTensor([1,2,4,8,16,32,64,128,256]).float()
test_eq(TSLogReturn(pad=False)(t).std(), 0)

class TSAdd[source]

TSAdd(add) :: Transform

Add a defined amount to each batch of type TSTensor.

t = TSTensor([1,2,3]).float()
test_eq(TSAdd(1)(t), TSTensor([2,3,4]).float())

y transforms

from sklearn.preprocessing import *

class TargetTransformer():
    """Fit a scikit-learn style preprocessor on targets and apply it while preserving shape.

    `preprocessor` is a class (or any callable) that is instantiated with
    `**kwargs`. The resulting object must expose `fit`, `transform` and
    `inverse_transform`. 1-d inputs are reshaped to a single column before being
    handed to the preprocessor, and outputs are reshaped back to the input shape.
    """

    def __init__(self, preprocessor, **kwargs):
        self.preprocessor = preprocessor(**kwargs)
        self.kwargs = kwargs

    def fit(self, o):
        "Fit the preprocessor on `o` and return whatever the preprocessor's `fit` returns."
        if o.ndim == 1: o = o.reshape(-1,1)
        self.fit_preprocessor = self.preprocessor.fit(o)
        return self.fit_preprocessor

    @staticmethod
    def _accepts_copy(method):
        "Return True when `method` declares a `copy` parameter in its signature."
        import inspect
        try:
            return 'copy' in inspect.signature(method).parameters
        except (TypeError, ValueError):
            # Signature not introspectable (e.g. some builtins): do not pass `copy`.
            return False

    def _apply(self, method, o, copy):
        "Call `method` on `o` as a 2-d array and restore the input shape on the result."
        # Scalars (python or 0-d arrays) are promoted to a (1, 1) array; the
        # result keeps that (1, 1) shape, as the original scalar path did.
        if np.ndim(o) == 0: o = np.array([o]).reshape(-1,1)
        o_shape = o.shape
        if o.ndim == 1: o = o.reshape(-1,1)
        # Only some preprocessors take `copy`; a bound method never has a `copy`
        # attribute, so the signature must be inspected rather than using hasattr.
        if self._accepts_copy(method): out = method(o, copy=copy)
        else: out = method(o)
        return out.reshape(*o_shape)

    def transform(self, o, copy=True):
        """Transform `o` with the fitted preprocessor.

        `copy` is forwarded only when the preprocessor's `transform` accepts it.
        Raises AttributeError if `fit` has not been called.
        """
        return self._apply(self.fit_preprocessor.transform, o, copy)

    def inverse_transform(self, o, copy=True):
        """Undo `transform` on `o` with the fitted preprocessor.

        `copy` is forwarded only when the preprocessor's `inverse_transform`
        accepts it. Raises AttributeError if `fit` has not been called.
        """
        return self._apply(self.fit_preprocessor.inverse_transform, o, copy)
        
# Ready-made preprocessor factories intended to be passed to TargetTransformer.
# Each is a `partial`, so calling it builds the configured scikit-learn object.

# Min-max scaling into the range (-1, 1).
Normalizer = partial(MinMaxScaler, feature_range=(-1, 1))
# Power transform using the Box-Cox method.
BoxCox = partial(PowerTransformer, method='box-cox')
# Power transform using the Yeo-Johnson method.
# NOTE: the name is spelled `YeoJohnshon` (sic); it is a public name used by callers, so it is kept.
YeoJohnshon = partial(PowerTransformer, method='yeo-johnson')
# Quantile transform with 100 quantiles mapped onto a normal output distribution.
Quantile = partial(QuantileTransformer, n_quantiles=100, output_distribution='normal')
from tsai.data.validation import TimeSplitter
y = random_shuffle(np.random.randn(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(StandardScaler)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
y = random_shuffle(np.random.randn(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(RobustScaler)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
y = random_shuffle(np.random.rand(1000) * 3 + .5)
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(Normalizer)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
y = random_shuffle(np.random.rand(1000) * 10 + 5)
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(BoxCox)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
# Round-trip check for the Yeo-Johnson preprocessor on beta-distributed targets:
# fit on the first split only, transform everything, then verify that
# inverse_transform recovers the original values.
# (A normally distributed `y` was previously assigned here and immediately
# overwritten; that dead assignment has been removed.)
y = np.random.beta(.5, .5, size=1000)
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(YeoJohnshon)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y)
# Overlay histograms of the original and transformed targets.
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()
y = - np.random.beta(1, .5, 10000) * 10
splits = TimeSplitter()(y)
preprocessor = TargetTransformer(Quantile)
preprocessor.fit(y[splits[0]])
y_tfm = preprocessor.transform(y)
test_close(preprocessor.inverse_transform(y_tfm), y, 1e-1)
plt.hist(y, 50, label='ori',)
plt.hist(y_tfm, 50, label='tfm')
plt.legend(loc='best')
plt.show()

ReLabeler[source]

ReLabeler(cm)

Changes the labels in a dataset based on a dictionary (class mapping). Args: cm = class mapping dictionary.

vals = {0:'a', 1:'b', 2:'c', 3:'d', 4:'e'}
y = np.array([vals[i] for i in np.random.randint(0, 5, 20)])
labeler = ReLabeler(dict(a='x', b='x', c='y', d='z', e='z'))
y_new = labeler(y)
test_eq(y.shape, y_new.shape)
y, y_new
(array(['e', 'e', 'b', 'd', 'e', 'b', 'a', 'a', 'b', 'b', 'e', 'e', 'a',
        'a', 'd', 'e', 'e', 'd', 'b', 'd'], dtype='<U1'),
 array(['z', 'z', 'x', 'z', 'z', 'x', 'x', 'x', 'x', 'x', 'z', 'z', 'x',
        'x', 'z', 'z', 'z', 'z', 'x', 'z'], dtype='<U1'))