Skip to content

Commit

Permalink
Fix version control speed for first commit (#1192)
Browse files (browse the repository at this point in the history)
  • Loading branch information
AbhinavTuli authored Sep 18, 2021
1 parent 56e1789 commit 5910c27
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 72 deletions.
2 changes: 1 addition & 1 deletion hub/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
TENSOR_INFO_FILENAME = "tensor_info.json"

DATASET_LOCK_FILENAME = "dataset_lock.lock"
TENSOR_COMMIT_CHUNK_LIST_FILENAME = "chunk_list"
TENSOR_COMMIT_CHUNK_SET_FILENAME = "chunk_set"

DATASET_LOCK_UPDATE_INTERVAL = 120 # seconds
DATASET_LOCK_VALIDITY = 300 # seconds
Expand Down
86 changes: 46 additions & 40 deletions hub/core/chunk_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from hub.compression import get_compression_type, BYTE_COMPRESSION, IMAGE_COMPRESSION
from hub.core.version_control.commit_node import CommitNode # type: ignore
from hub.core.version_control.commit_chunk_list import CommitChunkList # type: ignore
from hub.core.version_control.commit_chunk_set import CommitChunkSet # type: ignore
from hub.core.fast_forwarding import ffw_chunk_id_encoder
from hub.core.compression import decompress_array
from hub.core.sample import Sample, SampleValue # type: ignore
Expand All @@ -24,14 +24,14 @@
get_chunk_key,
get_chunk_id_encoder_key,
get_tensor_meta_key,
get_tensor_commit_chunk_list_key,
get_tensor_commit_chunk_set_key,
)
from hub.util.exceptions import (
CorruptedMetaError,
DynamicTensorNumpyError,
)
from hub.util.casting import get_dtype, intelligent_cast
from hub.util.version_control import auto_checkout, commit, commit_chunk_list_exists
from hub.util.version_control import auto_checkout, commit, commit_chunk_set_exists


def is_uniform_sequence(samples):
Expand Down Expand Up @@ -174,28 +174,28 @@ def chunk_id_encoder(self) -> ChunkIdEncoder:
return enc

@property
def commit_chunk_list(self) -> CommitChunkList:
"""Gets the commit chunk list from cache, if one is not found it creates a blank one.
def commit_chunk_set(self) -> CommitChunkSet:
"""Gets the commit chunk set from cache, if one is not found it creates a blank one.
Returns:
CommitChunkList: The commit chunk list keeps track of all the chunks present in the current commit.
CommitChunkSet: The commit chunk set keeps track of all the chunks present in the current commit.
"""
commit_id = self.version_state["commit_id"]
if commit_id == FIRST_COMMIT_ID:
# the first commit doesn't need a commit chunk list
# the first commit doesn't need a commit chunk set
return None
key = get_tensor_commit_chunk_list_key(self.key, commit_id)
if not self.commit_chunk_list_exists:
clist = CommitChunkList()
self.meta_cache[key] = clist
return clist
key = get_tensor_commit_chunk_set_key(self.key, commit_id)
if not self.commit_chunk_set_exists:
cset = CommitChunkSet()
self.meta_cache[key] = cset
return cset

clist = self.meta_cache.get_cachable(key, CommitChunkList)
return clist
cset = self.meta_cache.get_cachable(key, CommitChunkSet)
return cset

@property
def commit_chunk_list_exists(self) -> bool:
return commit_chunk_list_exists(self.version_state, self.meta_cache, self.key)
def commit_chunk_set_exists(self) -> bool:
return commit_chunk_set_exists(self.version_state, self.meta_cache, self.key)

@property
def chunk_id_encoder_exists(self) -> bool:
Expand Down Expand Up @@ -223,13 +223,13 @@ def num_samples(self) -> int:
def last_chunk(self) -> Optional[Chunk]:
if self.num_chunks == 0:
return None
chunk_commit_id = self.get_chunk_commit(self.last_chunk_key)
last_chunk = self.get_chunk(self.last_chunk_key)
chunk_name = self.last_chunk_name
chunk_commit_id = self.get_chunk_commit(chunk_name)
chunk_key = get_chunk_key(self.key, chunk_name, chunk_commit_id)
chunk = self.get_chunk(chunk_key)
if chunk_commit_id != self.version_state["commit_id"]:
last_chunk_name = self.chunk_id_encoder.get_name_for_chunk(-1)
last_chunk = self.copy_chunk_to_new_commit(last_chunk, last_chunk_name)

return last_chunk
chunk = self.copy_chunk_to_new_commit(chunk, chunk_name)
return chunk

def get_chunk(self, chunk_key: str) -> Chunk:
return self.cache.get_cachable(chunk_key, Chunk)
Expand All @@ -239,25 +239,33 @@ def get_chunk_commit(self, chunk_name) -> str:
cur_node: CommitNode = self.version_state["commit_node"]
while cur_node is not None:
commit_id = cur_node.commit_id
chunk_list_key = get_tensor_commit_chunk_list_key(self.key, commit_id)
chunk_set_key = get_tensor_commit_chunk_set_key(self.key, commit_id)
try:
chunk_list = self.meta_cache.get_cachable(
chunk_list_key, CommitChunkList
).chunks
# the first commit doesn't contain a chunk set, don't repeatedly try to fetch from storage
if commit_id == FIRST_COMMIT_ID:
chunk_set = set()
else:
chunk_set = self.meta_cache.get_cachable(
chunk_set_key, CommitChunkSet
).chunks
except Exception:
chunk_list = []
if chunk_name in chunk_list:
chunk_set = set()
if chunk_name in chunk_set:
return commit_id
cur_node = cur_node.parent
# the first commit doesn't have a commit chunk list, so any chunk that wasn't found belongs to the first commit
# the first commit doesn't have a commit chunk set, so any chunk that wasn't found belongs to the first commit
return FIRST_COMMIT_ID

@property
def last_chunk_key(self) -> str:
last_chunk_name = self.chunk_id_encoder.get_name_for_chunk(-1)
last_chunk_name = self.last_chunk_name
commit_id = self.get_chunk_commit(last_chunk_name)
return get_chunk_key(self.key, last_chunk_name, commit_id)

@property
def last_chunk_name(self) -> str:
return self.chunk_id_encoder.get_name_for_chunk(-1)

@property
def tensor_meta(self):
tensor_meta_key = get_tensor_meta_key(self.key, self.version_state["commit_id"])
Expand Down Expand Up @@ -412,13 +420,11 @@ def _synchronize_cache(self, chunk_keys: List[str] = None):
chunk_id_key = get_chunk_id_encoder_key(self.key, commit_id)
self.meta_cache[chunk_id_key] = self.chunk_id_encoder

# first commit doesn't have commit chunk list
# first commit doesn't have commit chunk set
if commit_id != FIRST_COMMIT_ID:
# synchronize current chunk list, all older ones are immutable
commit_chunk_list_key = get_tensor_commit_chunk_list_key(
self.key, commit_id
)
self.meta_cache[commit_chunk_list_key] = self.commit_chunk_list
# synchronize current chunk set, all older ones are immutable
commit_chunk_set_key = get_tensor_commit_chunk_set_key(self.key, commit_id)
self.meta_cache[commit_chunk_set_key] = self.commit_chunk_set

def _try_appending_to_last_chunk(
self, buffer: memoryview, shape: Tuple[int]
Expand Down Expand Up @@ -480,8 +486,8 @@ def _create_new_chunk(self):
chunk = Chunk()
chunk_name = ChunkIdEncoder.name_from_id(chunk_id)
chunk_key = get_chunk_key(self.key, chunk_name, self.version_state["commit_id"])
if self.commit_chunk_list is not None:
self.commit_chunk_list.append(chunk_name)
if self.commit_chunk_set is not None:
self.commit_chunk_set.add(chunk_name)
self.cache[chunk_key] = chunk
return chunk

Expand Down Expand Up @@ -768,8 +774,8 @@ def copy_chunk_to_new_commit(self, chunk, chunk_name):
chunk = chunk.copy()
chunk.key = new_chunk_key
self.cache[new_chunk_key] = chunk
if self.commit_chunk_list is not None:
self.commit_chunk_list.append(chunk_name)
if self.commit_chunk_set is not None:
self.commit_chunk_set.add(chunk_name)
return chunk


Expand Down
3 changes: 1 addition & 2 deletions hub/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def __init__(
self.public = public
self.verbose = verbose
self.version_state: Dict[str, Any] = version_state or {}

self._set_derived_attributes()

def _lock_lost_handler(self):
Expand Down Expand Up @@ -642,7 +641,7 @@ def _all_tensors_filtered(self) -> List[str]:

@property
def tensors(self) -> Dict[str, Tensor]:
"""All tensors belonging to this group, including those within sub groups"""
"""All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors."""
return {t: self[t] for t in self._all_tensors_filtered}

@property
Expand Down
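hub/core/version_control/commit_chunk_list.py → hub/core/version_control/commit_chunk_set.py (renamed; file header missing from the extracted page)
Original file line number Diff line number Diff line change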
@@ -1,22 +1,22 @@
from typing import List
from typing import Set
from hub.core.storage.cachable import Cachable


class CommitChunkList(Cachable):
"""Stores list of chunks stored for a particular tensor in a commit."""
class CommitChunkSet(Cachable):
"""Stores set of chunks stored for a particular tensor in a commit."""

def __init__(self) -> None:
self.chunks: List[str] = []
self.chunks: Set[str] = set()

def tobytes(self) -> bytes:
"""Dumps self.chunks in csv format."""
return bytes(",".join(self.chunks), "utf-8")

@classmethod
def frombuffer(cls, buffer: bytes):
"""Loads a CommitChunkList from a buffer."""
"""Loads a CommitChunkSet from a buffer."""
instance = cls()
instance.chunks = buffer.decode("utf-8").split(",")
instance.chunks = set(buffer.decode("utf-8").split(","))
return instance

@property
Expand All @@ -25,6 +25,6 @@ def nbytes(self) -> int:
return 0
return 8 + ((len(self.chunks) - 1) * 9)

def append(self, chunk_name: str) -> None:
"""Adds a new chunk name to the CommitChunkList."""
self.chunks.append(chunk_name)
def add(self, chunk_name: str) -> None:
"""Adds a new chunk name to the CommitChunkSet."""
self.chunks.add(chunk_name)
8 changes: 4 additions & 4 deletions hub/util/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
DATASET_META_FILENAME,
TENSOR_INFO_FILENAME,
TENSOR_META_FILENAME,
TENSOR_COMMIT_CHUNK_LIST_FILENAME,
TENSOR_COMMIT_CHUNK_SET_FILENAME,
VERSION_CONTROL_INFO_FILENAME,
)

Expand Down Expand Up @@ -57,10 +57,10 @@ def get_tensor_info_key(key: str, commit_id: str) -> str:
return posixpath.join("versions", commit_id, key, TENSOR_INFO_FILENAME)


def get_tensor_commit_chunk_list_key(key: str, commit_id: str) -> str:
def get_tensor_commit_chunk_set_key(key: str, commit_id: str) -> str:
if commit_id == FIRST_COMMIT_ID:
return posixpath.join(key, TENSOR_COMMIT_CHUNK_LIST_FILENAME)
return posixpath.join("versions", commit_id, key, TENSOR_COMMIT_CHUNK_LIST_FILENAME)
return posixpath.join(key, TENSOR_COMMIT_CHUNK_SET_FILENAME)
return posixpath.join("versions", commit_id, key, TENSOR_COMMIT_CHUNK_SET_FILENAME)


def get_chunk_id_encoder_key(key: str, commit_id: str) -> str:
Expand Down
Loading

0 comments on commit 5910c27

Please sign in to comment.