Dataset Count and Group By

I have a dataset defined as follows:

import tensorflow as tf

user_ids = ["user1", "user2", "user3", "user3", "user1", "user2", "user1", "user2", "user3", "user1"]
item_ids = ["item1", "item2", "item1", "item1", "item2", "item3", "item2", "item2", "item1", "item1"]
ratings = [0, 1, 1, 1, 0, 0, 0, 1, 0, 1]
ds = tf.data.Dataset.from_tensor_slices({"user_id": user_ids, "item_id": item_ids, "rating": ratings})

I want to compute a dataset, result_ds, containing each item ID and the number of unique users who rated the item '1'. The solution should handle tens of millions of records, not just this small example.

The result should look something like:

for element in result_ds:
    print(element)

{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item1'>, 'positive_count': <tf.Tensor: shape=(), dtype=int64, numpy=2>}
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item2'>, 'positive_count': <tf.Tensor: shape=(), dtype=int64, numpy=2>}
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item3'>, 'positive_count': <tf.Tensor: shape=(), dtype=int64, numpy=1>}

I tried using ds.group_by_window but couldn’t get it to work.

Hi @rcauvin, I was able to get the desired output using an alternative approach. Please find the workaround below:

I have created a dataset using

user_ids = ['user1', 'user2', 'user3', 'user3', 'user1', 'user2', 'user1', 'user2', 'user3', 'user1']
item_ids = ['item1','item2', 'item1', 'item1', 'item2', 'item3', 'item2', 'item2', 'item1', 'item1']
ratings = [0, 1, 1, 1, 0, 0, 0, 1, 0, 1]
ds = tf.data.Dataset.from_tensor_slices({'user_id': user_ids, 'item_id': item_ids, 'rating': ratings})

After creating the dataset, I extracted all the positive ratings using

positive_ds = ds.filter(lambda x: x["rating"] == 1)

Next, I collected the items and ratings into two separate lists

binary_item_list = []
rating = []
for element in positive_ds.as_numpy_iterator():
    binary_item_list.append(element['item_id'])
    rating.append(element['rating'])

#output
binary_item_list [b'item2', b'item1', b'item1', b'item2', b'item1']
rating [1, 1, 1, 1, 1]

Then I computed the total number of positive ratings for each item using

from collections import defaultdict

result_dict = defaultdict(int)
item_ratings = []

for item_a, item_b in zip(binary_item_list, rating):
    result_dict[item_a] += item_b

for item, count in result_dict.items():
    item_ratings.append((item.decode('utf-8'), count))

#output
item_ratings [('item2', 2), ('item1', 3)]

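I then split those pairs into two lists and created a dataset using

items = []
ratings = []  # Start from an empty list; this replaces the original ratings list.
for item, rating in item_ratings:
    items.append(item)
    ratings.append(rating)
ds = tf.data.Dataset.from_tensor_slices({'item_id': items, 'positive_rating': ratings})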

The elements in the dataset look like

{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item2'>, 'positive_rating': <tf.Tensor: shape=(), dtype=int32, numpy=2>}
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item1'>, 'positive_rating': <tf.Tensor: shape=(), dtype=int32, numpy=3>}
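
One caveat on the counts above: summing the ratings counts every positive rating, so a user who rated the same item positively twice (user3 on item1 here) is counted twice. If the goal is the number of distinct users, a minimal in-memory sketch could collect users into a set per item instead. The names positive_users and item_counts are only illustrative, and this still assumes everything fits in memory.

```python
from collections import defaultdict

user_ids = ['user1', 'user2', 'user3', 'user3', 'user1', 'user2', 'user1', 'user2', 'user3', 'user1']
item_ids = ['item1', 'item2', 'item1', 'item1', 'item2', 'item3', 'item2', 'item2', 'item1', 'item1']
ratings = [0, 1, 1, 1, 0, 0, 0, 1, 0, 1]

# Map each item to the set of users who rated it positively.
# A set keeps each user once, however many positive ratings they gave.
positive_users = defaultdict(set)
for user, item, rating in zip(user_ids, item_ids, ratings):
    if rating == 1:
        positive_users[item].add(user)

# Sort so the result does not depend on the order of the input rows.
item_counts = sorted((item, len(users)) for item, users in positive_users.items())
print(item_counts)  # [('item1', 2), ('item2', 1)]
```

Items with no positive ratings (item3 here) do not appear in this result, just as in the loop above.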

Please refer to this gist for a working code example. Thank you.

Thank you, Kiran. Your solution is elegant and works well on small, in-memory datasets. I want a solution that scales to tens of millions of records that may not all be in memory at once, so I was hoping for a solution that leverages graph execution. The output should also include items that received no positive ratings.

I was able to figure out a solution using group_by_window, though I’m not sure it’s using batching in the optimal way:

user_ids = tf.constant(["user1", "user2", "user3", "user3", "user1", "user2", "user1", "user2", "user3", "user1"])
item_ids = tf.constant(["item1", "item2", "item1", "item1", "item2", "item3", "item2", "item2", "item1", "item1"])
ratings = tf.constant([0, 1, 1, 1, 0, 0, 0, 1, 0, 1])

ds = tf.data.Dataset.from_tensor_slices({"user_id": user_ids, "item_id": item_ids, "rating": ratings})

# tf.data.Dataset.group_by_window uses integer keys, but our item IDs are strings.
# So we define a function that maps an item ID string to its index in an array of unique item IDs.
item_lookup = lambda item_id: tf.squeeze(tf.where(tf.equal(tf.unique(item_ids).y, item_id)))

batch_size = ds.cardinality() # Batches of users and ratings will never exceed the cardinality of the dataset.

# Function to compute the popularity of an item. Item popularity is the
# ratio of the number of users who rated the item positively to the
# number of users who had an opportunity to rate it.
def reduce_fn(
  key,
  window):

  batched_window = window.map(lambda e: (e["item_id"], e["user_id"], e["rating"])).batch(batch_size)

  def count_users(item_id, user_id, rating):
    # tf.boolean_mask documents a boolean mask, so cast the 0/1 ratings explicitly.
    positive_user_count = tf.size(tf.unique(tf.boolean_mask(user_id, tf.cast(rating, tf.bool))).y)
    user_count = tf.size(tf.unique(user_id).y)
    return {
      "item_id": item_id[0], # All item IDs in the window will be the same, so pick the first one.
      "positive_user_count": positive_user_count, # Count users who rated the item positively.
      "user_count": user_count, # Count users who had the opportunity to rate the item.
      "popularity": positive_user_count / user_count # Compute the ratio.
    }

  counted_window = batched_window.map(count_users)
    
  return counted_window

item_count = len(tf.unique(item_ids).y)
print(f"Computing the popularity of {item_count} items using window size {batch_size}.", end = " ")
grouped_ds = ds.group_by_window(
  key_func = lambda elem: item_lookup(elem["item_id"]),
  reduce_func = reduce_fn,
  window_size = batch_size
)
print("Done.")

print('Results:')
for element in grouped_ds:
  print(element)

Output:

Computing the popularity of 3 items using window size 10. Done.
Results:
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item1'>, 'positive_user_count': <tf.Tensor: shape=(), dtype=int32, numpy=2>, 'user_count': <tf.Tensor: shape=(), dtype=int32, numpy=2>, 'popularity': <tf.Tensor: shape=(), dtype=float64, numpy=1.0>}
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item2'>, 'positive_user_count': <tf.Tensor: shape=(), dtype=int32, numpy=1>, 'user_count': <tf.Tensor: shape=(), dtype=int32, numpy=2>, 'popularity': <tf.Tensor: shape=(), dtype=float64, numpy=0.5>}
{'item_id': <tf.Tensor: shape=(), dtype=string, numpy=b'item3'>, 'positive_user_count': <tf.Tensor: shape=(), dtype=int32, numpy=0>, 'user_count': <tf.Tensor: shape=(), dtype=int32, numpy=1>, 'popularity': <tf.Tensor: shape=(), dtype=float64, numpy=0.0>}
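
One part that might be worth revisiting is the key function: item_lookup recomputes tf.unique over every item ID and scans the result for each element. If the item vocabulary is known up front, a hash table lookup is one possible alternative. The sketch below assumes a small, fixed vocabulary (item_vocab), and the helper names item_table and summarize are only illustrative. It keeps the same one-window-per-item batching, so it does not address the memory cost of the window itself.

```python
import tensorflow as tf

user_ids = tf.constant(["user1", "user2", "user3", "user3", "user1", "user2", "user1", "user2", "user3", "user1"])
item_ids = tf.constant(["item1", "item2", "item1", "item1", "item2", "item3", "item2", "item2", "item1", "item1"])
ratings = tf.constant([0, 1, 1, 1, 0, 0, 0, 1, 0, 1])
ds = tf.data.Dataset.from_tensor_slices({"user_id": user_ids, "item_id": item_ids, "rating": ratings})

# Assumption: the item vocabulary is known in advance and fits in memory.
item_vocab = ["item1", "item2", "item3"]
item_table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=tf.constant(item_vocab),
        values=tf.range(len(item_vocab), dtype=tf.int64),
    ),
    default_value=-1,  # Item IDs outside the vocabulary would all share key -1.
)

window_size = 10  # As above: large enough to hold all rows of one item.

def summarize(batch):
    # batch is a dict of tensors holding every row of one item.
    is_positive = tf.cast(batch["rating"], tf.bool)
    positive_users = tf.boolean_mask(batch["user_id"], is_positive)
    return {
        "item_id": batch["item_id"][0],
        "positive_user_count": tf.size(tf.unique(positive_users).y),
    }

grouped_ds = ds.group_by_window(
    key_func=lambda e: item_table.lookup(e["item_id"]),  # int64 key, as group_by_window requires
    reduce_func=lambda key, window: window.batch(window_size).map(summarize),
    window_size=window_size,
)

# Collect into a plain dict so the check does not depend on emission order.
result = {e["item_id"].numpy().decode("utf-8"): int(e["positive_user_count"]) for e in grouped_ds}
print(result)
```

On this example the counts should agree with the positive_user_count values above, including the zero for item3.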