Dataset with sliding window of multiple csv files

I’m trying to build a data pipeline using tf.data that builds a time series from the past 5 rows. Since I have more than 6,000 different CSV files, they cannot fit in memory, and they cannot be preprocessed and saved to disk due to their size.

To read the 6K CSVs, I’m trying to use tf.data.experimental.make_csv_dataset and apply an overlapping window with the window function. The closest I’ve gotten so far is:

dataset = tf.data.experimental.make_csv_dataset(
    file_pattern="/path/stock/*1min*.csv",
    batch_size=1,
    num_epochs=1,
    shuffle=False,
    header=False,
    column_names=['timestamp','open','high', 'low', 'close', 'volume'],
    column_defaults=[tf.string, tf.float32, tf.float32, tf.float32, tf.float32, tf.float32]
).window(
    size=5,  # Number of rows per window
    shift=1,  # Stride for overlapping windows
    stride=1
)

Ideally I should end up with a shape of (M, 5, 4): M windows of 5 timesteps, each with 4 features [open, low, high, close].

I’m trying to write a map function that would take the dataset of datasets and convert it so that I can feed that to the model.

The issues I’m facing are:

  • I’m not able to write the map function that builds and returns the arrays for each timestep.
  • I haven’t found a way to group the timesteps by date; I don’t want to build timesteps spanning multiple days.
  • Even if I attach a shuffle to the windowed dataset, it seems to shuffle only within a single file and not across the whole dataset.

How can I solve this issue? Is there a better way to accomplish this task? Any help is greatly appreciated.


To address your data pipeline issues for handling multiple CSV files with TensorFlow, follow these steps:

  1. Preprocess CSV files: Add a ‘day’ column to each CSV, if not already present, based on the timestamp, to ensure windows don’t span multiple days.
  2. Read and window data:
  • Use tf.data.experimental.make_csv_dataset to read the CSV files.
  • Apply a custom map function to preprocess the data and ensure windows are within the same day.
  • Use window, flat_map, and batch to create overlapping windows without crossing day boundaries.
  3. Shuffling:
  • Use Dataset.shuffle with a sufficiently large buffer to shuffle data across all windows, ensuring a mix of data from different files.
  4. Batching and prefetching:
  • Batch the dataset according to your model requirements and use prefetch to improve pipeline efficiency.

This approach should help you manage the large dataset efficiently, respecting the time series nature and memory constraints.
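
Here is a minimal sketch of that approach, as an illustration rather than a drop-in solution. It assumes each timestamp begins with a 'YYYY-MM-DD' prefix (so no separate ‘day’ column is needed), and it uses tf.data.experimental.CsvDataset instead of make_csv_dataset, since CsvDataset accepts a filename tensor inside interleave:

import tensorflow as tf

RECORD_DEFAULTS = [tf.string, tf.float32, tf.float32,
                   tf.float32, tf.float32, tf.float32]

def make_file_dataset(path):
    # One dataset per file, so windows never cross file boundaries.
    rows = tf.data.experimental.CsvDataset(path, RECORD_DEFAULTS, header=False)

    def to_features(ts, o, h, l, c, v):
        day = tf.strings.substr(ts, 0, 10)   # 'YYYY-MM-DD' prefix of the timestamp
        return day, tf.stack([o, h, l, c])   # (4,) feature vector

    rows = rows.map(to_features)

    # Overlapping 5-row windows; each window is a pair of inner datasets.
    windows = rows.window(5, shift=1, drop_remainder=True)
    windows = windows.flat_map(
        lambda days, feats: tf.data.Dataset.zip((days.batch(5), feats.batch(5))))

    # Drop windows whose rows do not all share the same day.
    windows = windows.filter(
        lambda days, feats: tf.reduce_all(tf.equal(days, days[0])))
    return windows.map(lambda days, feats: feats)  # one (5, 4) tensor per window

files = tf.data.Dataset.list_files("/path/stock/*1min*.csv", shuffle=True)
dataset = (files
           .interleave(make_file_dataset, cycle_length=8,
                       num_parallel_calls=tf.data.AUTOTUNE)
           .shuffle(10_000)   # large buffer mixes windows from many files
           .batch(32)
           .prefetch(tf.data.AUTOTUNE))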

Hi @Tim_Wolfe, thanks for your reply.

I agree with your approach, and it’s what I have been trying. However, it’s the transformations that I’m stuck on.
The window function returns the following structure:

-WindowDataset
--OrderedDict
---VariantDataset
----Tensor (single element)
----Tensor...

And I have failed to perform any transformation.

Additionally,

dataset = tf.data.experimental.make_csv_dataset(
    file_pattern="/path/stock/*1min*.csv",
    batch_size=1,
    num_epochs=1,
    shuffle=False,
    header=False,
    column_names=['timestamp','open','high', 'low', 'close', 'volume'],
    column_defaults=[tf.string, tf.float32, tf.float32, tf.float32, tf.float32, tf.float32]
).window(
    size=5,  # Number of rows per window
    shift=1,  # Stride for overlapping windows
    stride=1
).flat_map(lambda window: window.batch(5))

This does not work because of the following error:

AttributeError                            Traceback (most recent call last)

<ipython-input-47-46d1550f08a0> in <cell line: 1>()
     11     shift=1,  # Stride for overlapping windows
     12     stride=1
---> 13 ).flat_map(lambda window: window.batch(5))

19 frames

/tmp/__autograph_generated_filersrgq3km.py in <lambda>(lscope)
      3 
      4     def inner_factory(ag__):
----> 5         tf__lam = lambda window: ag__.with_function_scope(lambda lscope: ag__.converted_call(window.batch, (5,), None, lscope), 'lscope', ag__.STD)
      6         return tf__lam
      7     return inner_factory

AttributeError: in user code:

    File "<ipython-input-47-46d1550f08a0>", line 13, in None  *
        lambda window: window.batch(5)

    AttributeError: 'collections.OrderedDict' object has no attribute 'batch'

If I try to batch the datasets inside the OrderedDict, it also gives me this error:

AttributeError                            Traceback (most recent call last)

<ipython-input-59-16e14170c082> in <cell line: 25>()
     23 
     24 
---> 25 data = dataset.map(extract)
     26 
     27 

35 frames

/usr/local/lib/python3.10/dist-packages/tensorflow/python/framework/tensor.py in __getattr__(self, name)
    259         tf.experimental.numpy.experimental_enable_numpy_behavior()
    260       """)
--> 261     self.__getattribute__(name)
    262 
    263   @property

AttributeError: in user code:

    File "<ipython-input-59-16e14170c082>", line 3, in extract  *
        opens = data.get('open').flat_map(lambda x: x.batch(5))

    AttributeError: 'SymbolicTensor' object has no attribute 'batch'

[Best Guess DeepMind Assist]

It looks like you’re encountering issues with transforming a windowed dataset in TensorFlow, specifically when dealing with the structure returned by the window function and attempting to apply transformations like batch on it. The root of your problem lies in how the windowed dataset is structured and the nature of operations like batch and map when applied to it.

The window method creates a dataset of windows, where each window contains a dataset of elements. When you use flat_map, it expects a transformation that converts each window (which is itself a dataset) into a new dataset. However, the error shows that .batch(5) is being called on an OrderedDict, which is not valid: .batch() is a method on datasets, not on OrderedDict objects.

To resolve this issue, you need to first transform the inner datasets (contained within each window) into the desired structure before you can apply operations like batch. Here’s a revised approach:

  1. Understand the structure of your windowed dataset: each window is a dataset containing an OrderedDict of columns, where each column is a dataset.
  2. Define a transformation function: this function should take a window (a dataset of OrderedDicts) and apply transformations to the inner datasets (the columns in the OrderedDict). You might want to convert these inner datasets into tensors, and then stack or batch them as needed.
  3. Apply the transformation using flat_map: the transformation function should return a dataset, which flat_map can then flatten.

Here’s an example of how you can define such a transformation function and apply it:

def transform_window(window):
    # Assuming the window is a dataset of OrderedDicts, each representing a row in the window
    def transform_row(row):
        # Convert the OrderedDict values (which are datasets) into tensors
        return tf.stack([tf.convert_to_tensor(value) for value in row.values()], axis=-1)

    # Apply the transformation to each row in the window, and then batch the results
    return window.map(transform_row).batch(5)

# Apply the transformation to each window in the dataset
transformed_dataset = dataset.flat_map(transform_window)

In this example, transform_row converts each row (an OrderedDict of column datasets) into a tensor by stacking the column values. Then, transform_window applies this transformation to each row in the window and batches the transformed rows. Finally, flat_map is used to apply transform_window to each window in the original dataset, effectively flattening the result into a single dataset of batched windows.

This approach should resolve the errors you’re encountering and allow you to perform the desired transformations on your windowed dataset.

There is one thing that it’s missing: the VariantDataset contains a list of size-1 tensors. For that reason I don’t think the proposed solution works:

def transform_row(row):
  return tf.stack([tf.convert_to_tensor(value) for value in row.values()], axis=-1)
d = dataset.map(transform_row)

Throws the following error:

---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

<ipython-input-145-faf21917a924> in <cell line: 3>()
      1 def transform_row(row):
      2   return tf.stack([tf.convert_to_tensor(value) for value in row.values()], axis=-1)
----> 3 d = dataset.map(transform_row)

22 frames

/usr/local/lib/python3.10/dist-packages/tensorflow/python/framework/tensor_util.py in make_tensor_proto(values, dtype, shape, verify_shape, allow_broadcast)
    611       str_values = [compat.as_bytes(x) for x in proto_values]
    612     except TypeError:
--> 613       raise TypeError(f"Failed to convert elements of {values} to Tensor. "
    614                       "Consider casting elements to a supported type. See "
    615                       "https://www.tensorflow.org/api_docs/python/tf/dtypes "

TypeError: in user code:

    File "<ipython-input-142-e1687e4812a4>", line 2, in transform_row  *
        return tf.stack([tf.convert_to_tensor(value) for value in row.values()], axis=-1)

    TypeError: Failed to convert elements of <_VariantDataset element_spec=TensorSpec(shape=(None,), dtype=tf.string, name=None)> to Tensor. Consider casting elements to a supported type. See https://www.tensorflow.org/api_docs/python/tf/dtypes for supported TF dtypes.

The structure of the Window dataset is as follows:

take = dataset.take(1)
print (f'{type(take)}')
for data in take:
  print (f'\t{type(data)}')
  for key, value in data.items():
    print (f'\t\t{type(value)} --> {key}')
    for tensor in value:
      print(f'\t\t\t{type(tensor)} --> {tf.size(tensor)} --> {tensor[0]}')

Results in

<class 'tensorflow.python.data.ops.take_op._TakeDataset'>
	<class 'collections.OrderedDict'>
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> timestamp
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> b'2005-01-03 08:00:00'
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> b'2005-01-03 08:02:00'
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> b'2005-01-03 08:03:00'
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> b'2005-01-03 08:04:00'
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> b'2005-01-03 08:07:00'
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> open
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9979000091552734
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9902999997138977
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9994999766349792
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0003000497817993
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0011999607086182
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> high
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9983999729156494
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9902999997138977
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9995999932289124
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0025999546051025
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0011999607086182
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> low
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9979000091552734
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9902999997138977
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9994999766349792
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0003000497817993
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0010000467300415
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> close
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9983999729156494
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9902999997138977
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 0.9995999932289124
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0025999546051025
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 1.0010000467300415
		<class 'tensorflow.python.data.ops.dataset_ops._VariantDataset'> --> volume
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 45594.0
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 354001.0
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 19540.0
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 187845.0
			<class 'tensorflow.python.framework.ops.EagerTensor'> --> 1 --> 58620.0
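
Given that structure, each window is an OrderedDict mapping a column name to a small inner dataset, so the inner datasets have to be batched and zipped back together before the columns can be stacked. Here is a sketch of that idea, assuming batch_size=1 in make_csv_dataset (which produces the extra size-1 dimension that gets squeezed out below):

def flatten_window(window):
    # window: OrderedDict mapping column name -> dataset of 5 shape-(1,) tensors.
    # Batch each inner dataset into one tensor, then zip the columns back
    # together into a dataset holding a single dict element per window.
    batched = {key: ds.batch(5) for key, ds in window.items()}
    return tf.data.Dataset.zip(batched)

def to_features(row):
    # row: dict of shape-(5, 1) tensors. Stack the OHLC columns and squeeze
    # away the inner size-1 batch dimension, giving a (5, 4) tensor.
    cols = [row[key] for key in ('open', 'high', 'low', 'close')]
    return tf.squeeze(tf.stack(cols, axis=-1), axis=1)

windows = dataset.flat_map(flatten_window).map(to_features)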