[Text Generation][V2] NonKVCachePipeline #1483

Merged 72 commits on Jan 2, 2024
Commits
3e00175
Pipelines Refactor - Initial Impl (#1287)
bfineran Oct 26, 2023
224e116
[Pipeline Refactor] Additional functionality, engine operator, linear…
dsikka Oct 31, 2023
58b0758
[v2] EngineOperator updates to make continuous batching easier (#1371)
bfineran Nov 1, 2023
e1ff108
[Pipeline Refactor] Update routes, text generation initial functional…
dsikka Nov 3, 2023
59457b7
[Pipeline Refactor] Additional Operators, Route update and completed …
dsikka Nov 3, 2023
f18d5f3
add split/join functionality
dsikka Nov 3, 2023
2c4d231
update router to include split/join in parent class, refactor pipelin…
dsikka Nov 7, 2023
672ca20
process multiple generations
dsikka Nov 7, 2023
304eb35
initial commit
dbogunowicz Nov 8, 2023
71515ac
fix error
dbogunowicz Nov 8, 2023
6f1b175
Merge remote-tracking branch 'origin/features/v2/run_multiple' into f…
dbogunowicz Nov 9, 2023
041174b
[Pipeline Refactor] Split/Join Functionality for multiple prompts (#1…
dsikka Nov 9, 2023
a508342
unit testing for text generation operators
dsikka Nov 6, 2023
cbb0e86
additional changes
dsikka Nov 7, 2023
2541581
unit testing completion
dsikka Nov 8, 2023
8c8989d
remove debug
dsikka Nov 8, 2023
f8d75e3
fix
dsikka Nov 8, 2023
fd1e466
add todo
dsikka Nov 8, 2023
64c0552
more clean-up
dsikka Nov 8, 2023
913665a
fix test
dsikka Nov 8, 2023
e15521f
add docstrings/comments
dsikka Nov 8, 2023
379481e
break out tests to individual unit test files; add conftest and make …
dsikka Nov 9, 2023
a90a20a
Merge remote-tracking branch 'origin/features/v2/unit_testing' into f…
dbogunowicz Nov 10, 2023
0a50d1d
[Pipeline Refactor] Unit Testing for Text Generation Operators (#1392)
dsikka Nov 10, 2023
c0c4240
Merge branch 'v2' into feature/damian/v2/factor_out_transformation_utils
dbogunowicz Nov 10, 2023
4f248dd
Delete tests/deepsparse/v2/unit/text_generation/test_msic.py
dbogunowicz Nov 13, 2023
20980a7
[Continuous Batching] Queue Implementation to support batching groupi…
bfineran Nov 13, 2023
d81012d
[Continuous Batching] Executor thread for running continuous batching…
bfineran Nov 13, 2023
5c48505
[ContinuousBatching] ContinuousBatchingScheduler Implementation (#1375)
bfineran Nov 13, 2023
e1b7f37
[continuous batching] singleton pattern for scheduler (#1391)
bfineran Nov 13, 2023
98f7a6d
Merge branch 'v2' into feature/damian/v2/factor_out_transformation_utils
dbogunowicz Nov 14, 2023
bbd534d
[Pipeline Refactor][Text-Generation] Create a helper function for cre…
dbogunowicz Nov 14, 2023
d1683b4
Merge branch 'v2' into feature/damian/v2/factor_out_transformation_utils
dbogunowicz Nov 14, 2023
51c4ee6
pipeline runs, but incorrectly
dbogunowicz Nov 17, 2023
fa96efb
it works for a single sequence
dbogunowicz Nov 20, 2023
e41ddf8
cleanup. now lets figure out how to run multiple sequences
dbogunowicz Nov 20, 2023
b80a417
[Pipeline Refactor][Text-Generation] Refactor `transformers` helpers …
dbogunowicz Nov 20, 2023
1b9238a
[Text Generation][V2] End-to-end tests (#1402)
dbogunowicz Nov 20, 2023
89f11e5
Merge remote-tracking branch 'origin/v2' into feature/damian/no_kv_cache
dbogunowicz Nov 21, 2023
9b441f5
integration tests pass
dbogunowicz Nov 21, 2023
c858b1f
[Pipeline Refactor][Text Generation][Continuous Batching] Integration…
dsikka Nov 21, 2023
bb3ff41
[Pipeline Refactor] Operator Registry (#1420)
dsikka Nov 21, 2023
19434e7
Merge remote-tracking branch 'origin/v2' into feature/damian/no_kv_cache
dbogunowicz Nov 22, 2023
90de2b3
fix tricky rebase
dbogunowicz Nov 22, 2023
66ca295
one more cleanup
dbogunowicz Nov 22, 2023
dcded1d
got tests to work after rebase. implementing SPLIT and JOIN in linear…
dbogunowicz Nov 22, 2023
127aa00
pipeline working, with GraphRouter. Needs some more testing
dbogunowicz Nov 22, 2023
af57698
ready for review
dbogunowicz Nov 27, 2023
4397c80
cleanup
dbogunowicz Nov 28, 2023
105b1d5
simplify after PR review round
dbogunowicz Dec 5, 2023
e15a24b
[Pipeline Refactor] Fix Operator scheduling to fix issue with slow ex…
dsikka Dec 5, 2023
36f742b
[Pipeline Refactor] Add `Pipeline.create` method to initialize pipeli…
dsikka Dec 5, 2023
c0267d9
[Pipeline Refactor] async (#1380)
dsikka Dec 5, 2023
cfa61b7
Merge branch 'main' into v2
dsikka Dec 5, 2023
2d9b0a1
rebase fixes (#1458)
dsikka Dec 5, 2023
a2aaa51
more fixes (#1459)
dsikka Dec 5, 2023
39be9a0
Merge remote-tracking branch 'origin/v2' into feature/damian/no_kv_cache
dbogunowicz Dec 6, 2023
dcab3f9
bring back functionalities that were lost in v2 during rebasing
dbogunowicz Dec 6, 2023
e0a9dee
Merge remote-tracking branch 'origin/main' into feature/damian/no_kv_…
dbogunowicz Dec 11, 2023
bc1b11e
Merge remote-tracking branch 'origin/main' into feature/damian/no_kv_…
dbogunowicz Dec 18, 2023
e5d2f39
Update src/deepsparse/transformers/helpers.py
dbogunowicz Dec 18, 2023
9ed5b06
ready for review
dbogunowicz Dec 18, 2023
1ac1f5c
bring tests back
dbogunowicz Dec 18, 2023
a734459
quality
dbogunowicz Dec 18, 2023
60fa00f
original readme
dbogunowicz Dec 18, 2023
14b0dc0
Merge branch 'main' into feature/damian/no_kv_cache_retrieve
dbogunowicz Dec 20, 2023
9371990
addressing Dipika's comments
dbogunowicz Dec 20, 2023
4eed463
Update src/deepsparse/transformers/pipelines/text_generation/pipeline…
dbogunowicz Dec 20, 2023
0b17bd8
Merge branch 'main' into feature/damian/no_kv_cache_retrieve
dbogunowicz Dec 21, 2023
111d533
addressing PR review
dbogunowicz Dec 21, 2023
4370c52
Merge branch 'main' into feature/damian/no_kv_cache_retrieve
dbogunowicz Dec 21, 2023
8d352fc
Merge branch 'main' into feature/damian/no_kv_cache_retrieve
dbogunowicz Dec 29, 2023
40 changes: 27 additions & 13 deletions src/deepsparse/transformers/helpers.py
@@ -30,11 +30,7 @@
from onnx import ModelProto

from deepsparse.log import get_main_logger
from deepsparse.utils.onnx import (
_MODEL_DIR_ONNX_NAME,
model_to_path,
truncate_onnx_model,
)
from deepsparse.utils.onnx import MODEL_ONNX_NAME, model_to_path, truncate_onnx_model
from sparsezoo.utils import save_onnx


@@ -55,6 +51,7 @@ def setup_transformers_pipeline(
sequence_length: int,
tokenizer_padding_side: str = "left",
engine_kwargs: Optional[Dict] = None,
onnx_model_name: Optional[str] = None,
) -> Tuple[
str, transformers.PretrainedConfig, transformers.PreTrainedTokenizer, Dict[str, Any]
]:
@@ -65,10 +62,14 @@
:param sequence_length: The sequence length to use for the model
:param tokenizer_padding_side: The side to pad on for the tokenizer,
either "left" or "right"
:param onnx_model_name: The name of the onnx model to be loaded.
If not specified, defaults are used (see fetch_onnx_file_path)
:param engine_kwargs: The kwargs to pass to the engine
:return The model path, config, tokenizer, and engine kwargs
"""
model_path, config, tokenizer = fetch_onnx_file_path(model_path, sequence_length)
model_path, config, tokenizer = fetch_onnx_file_path(
model_path, sequence_length, onnx_model_name
)

tokenizer.padding_side = tokenizer_padding_side
if not tokenizer.pad_token:
@@ -89,6 +90,7 @@
def fetch_onnx_file_path(
model_path: str,
sequence_length: int,
onnx_model_name: Optional[str] = None,
task: Optional[str] = None,
) -> Tuple[str, transformers.PretrainedConfig, transformers.PreTrainedTokenizer]:
"""
@@ -97,9 +99,13 @@
derived from the `model_path` provided.
:param model_path: path to the model to be parsed
:param sequence_length: maximum sequence length of the model
:param onnx_model_name: optionally, the precise name of the ONNX model
of interest may be specified. If not specified, the default ONNX model
name will be used (refer to `get_deployment_path` for details)
:param task: task to use for the config. Defaults to None
:return: file path to the processed ONNX file for the engine to compile
"""
deployment_path, onnx_path = get_deployment_path(model_path)
deployment_path, onnx_path = get_deployment_path(model_path, onnx_model_name)

hf_logger = logging.getLogger("transformers")
hf_logger_level = hf_logger.level
@@ -126,7 +132,9 @@ def fetch_onnx_file_path(
return onnx_path, config, tokenizer


def get_deployment_path(model_path: str) -> Tuple[str, str]:
def get_deployment_path(
model_path: str, onnx_model_name: Optional[str] = None
) -> Tuple[str, str]:
"""
Returns the path to the deployment directory
for the given model path and the path to the mandatory
@@ -135,27 +143,33 @@ def get_deployment_path(model_path: str) -> Tuple[str, str]:
for running the transformers model in the deepsparse pipeline

:param model_path: path to model directory, sparsezoo stub, or ONNX file
:param onnx_model_name: name of the ONNX file to look for in the deployment
directory. Defaults to MODEL_ONNX_NAME
:return: path to the deployment directory and path to the ONNX file inside
the deployment directory
"""
onnx_model_name = onnx_model_name or MODEL_ONNX_NAME

if os.path.isfile(model_path):
# return the parent directory of the ONNX file
return os.path.dirname(model_path), model_path

if os.path.isdir(model_path):
model_files = os.listdir(model_path)

if _MODEL_DIR_ONNX_NAME not in model_files:
if onnx_model_name not in model_files:
raise ValueError(
f"{_MODEL_DIR_ONNX_NAME} not found in transformers model directory "
f"{onnx_model_name} not found in transformers model directory "
f"{model_path}. Be sure that an export of the model is written to "
f"{os.path.join(model_path, _MODEL_DIR_ONNX_NAME)}"
f"{os.path.join(model_path, onnx_model_name)}"
)
return model_path, os.path.join(model_path, _MODEL_DIR_ONNX_NAME)
return model_path, os.path.join(model_path, onnx_model_name)

elif model_path.startswith("zoo:") or model_path.startswith("hf:"):
onnx_model_path = model_to_path(model_path)
return os.path.dirname(onnx_model_path), onnx_model_path
return os.path.dirname(onnx_model_path), onnx_model_path.replace(
MODEL_ONNX_NAME, onnx_model_name
)
else:
raise ValueError(
f"model_path {model_path} is not a valid file, directory, or zoo stub"
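
The helpers above now thread an optional onnx_model_name from the pipeline down to the file lookup. A minimal sketch of how the updated helper might be called; the deployment directory and the alternate file name below are hypothetical, and omitting the argument falls back to MODEL_ONNX_NAME (model.onnx):

from deepsparse.transformers.helpers import get_deployment_path

# default lookup: expects model.onnx inside the deployment directory
deployment_dir, onnx_path = get_deployment_path("/path/to/deployment")

# new behavior: resolve a differently named ONNX export instead
deployment_dir, onnx_path = get_deployment_path(
    "/path/to/deployment", onnx_model_name="model-orig.onnx"
)
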
src/deepsparse/transformers/pipelines/text_generation/__init__.py
@@ -21,6 +21,7 @@
from .kv_cache_operator import *
from .multi_engine_prefill_operator import *
from .nl_engine_operator import *
from .nl_engine_operator_no_kv_cache import *
from .parse_inputs import *
from .prep_for_prefill import *
from .process_inputs import *
@@ -31,3 +32,4 @@
from .prep_for_generation import * # isort:skip

from .pipeline import * # isort:skip
from .pipeline_no_kv_cache import * # isort:skip
src/deepsparse/transformers/pipelines/text_generation/generate_new_token.py
@@ -49,8 +49,9 @@ def run(self, inp: NLEngineOutputs, inference_state: InferenceState, **kwargs):
callback = inference_state.current_state.get("callback")
stop = inference_state.current_state.get("stop")

if kv_cache.total_num_processed_tokens >= kv_cache.capacity:
finish_reason = FinishReason.CAPACITY
if kv_cache:
if kv_cache.total_num_processed_tokens >= kv_cache.capacity:
finish_reason = FinishReason.CAPACITY

if token == self.tokenizer.eos_token_id and not self.force_max_tokens:
finish_reason = FinishReason.STOP
src/deepsparse/transformers/pipelines/text_generation/join_output.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
from typing import Dict, List, Tuple

import numpy

@@ -34,7 +34,8 @@ class JoinOutput(Operator):
def __init__(self, tokenizer):
self.tokenizer = tokenizer

def run(self, inp: List[CompileGenerationsOutput], **kwargs):
def run(self, inp: Tuple[List[CompileGenerationsOutput], Dict], **kwargs):

batch_outputs = [x for x in inp[0]]
generated_tokens = [x.generated_tokens for x in batch_outputs]
generated_logits = [x.generated_logits for x in batch_outputs]
src/deepsparse/transformers/pipelines/text_generation/nl_engine_operator.py
@@ -32,7 +32,7 @@
)


__all__ = ["NLEngineOperator", "NLEngineInputs"]
__all__ = ["NLEngineOperator", "NLEngineInputs", "NLEngineOutputs"]


class NLEngineInputs(BaseModel):
src/deepsparse/transformers/pipelines/text_generation/nl_engine_operator_no_kv_cache.py (new file)
@@ -0,0 +1,67 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

import numpy
from pydantic import BaseModel

from deepsparse.operators.engine_operator import EngineOperator, EngineOperatorInputs
from deepsparse.transformers.helpers import overwrite_transformer_onnx_model_inputs


__all__ = [
"NLEngineOperatorNoCache",
"NLEngineInputsNoCache",
]


class NLEngineInputsNoCache(BaseModel):
input_ids: Any
attention_mask: Any


class NLEngineOperatorNoCache(EngineOperator):
"""
Operator for the Natural Language Engine that runs without a
KV Cache. It simply maps input_ids and attention_mask to logits.
"""

input_schema = NLEngineInputsNoCache
output_schema = None

def __init__(self, sequence_length: int, **kwargs):
overwrite_transformer_onnx_model_inputs(
path=kwargs.get("model_path"),
batch_size=kwargs.get("batch_size", 1),
max_length=sequence_length,
)
super().__init__(**kwargs)

def run(self, inp: NLEngineInputsNoCache, **kwargs) -> Any:
engine_inputs = [inp.input_ids, inp.attention_mask]
logits = (
super()
.run(EngineOperatorInputs(engine_inputs=engine_inputs), **kwargs)
.get("engine_outputs")
)

# By default, the engine outputs logits for all tokens in the sequence.
# Let's filter out the logits for the padding tokens.
logits = numpy.compress(inp.attention_mask.flatten(), logits[0], axis=1)

return {"logits": [logits], "kv_cache": None, "tokens": None}, {
"prompt_logits": [logits]
}
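
The numpy.compress call above is what strips padded positions out of the returned logits. A self-contained sketch of that filtering step with toy shapes (the array values here are illustrative only):

import numpy

# toy logits of shape (batch=1, sequence=6, vocab=5);
# the last two sequence positions are padding
full_logits = numpy.random.rand(1, 6, 5)
attention_mask = numpy.array([[1, 1, 1, 1, 0, 0]])

# keep only the sequence positions where the mask is nonzero
filtered = numpy.compress(attention_mask.flatten(), full_logits, axis=1)
print(filtered.shape)  # (1, 4, 5)
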
src/deepsparse/transformers/pipelines/text_generation/pipeline_no_kv_cache.py (new file)
@@ -0,0 +1,148 @@
# Copyright (c) 2021 - present / Neuralmagic, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Dict, Optional

from deepsparse.pipeline import Pipeline
from deepsparse.routers import GraphRouter
from deepsparse.schedulers import OperatorScheduler
from deepsparse.transformers.helpers import setup_transformers_pipeline
from deepsparse.transformers.pipelines.text_generation import (
CompileGenerations,
GenerateNewTokenOperator,
JoinOutput,
NLEngineOperatorNoCache,
PrepareGeneration,
ProcessInputsTextGeneration,
ProcessOutputs,
TokenGeneratorOperator,
)
from deepsparse.transformers.utils.helpers import process_generation_config
from deepsparse.utils import split_engine_inputs
from deepsparse.utils.onnx import default_cached_outputs


_LOGGER = logging.getLogger(__name__)


class TextGenerationPipelineNoCache(Pipeline):
def __init__(
self,
model_path: str,
sequence_length: int = 1024,
onnx_model_name: Optional[str] = None,
generation_config=None,
engine_kwargs: Optional[Dict] = None,
**kwargs,
):

(
self.model_path,
self.config,
self.tokenizer,
engine_kwargs,
) = setup_transformers_pipeline(
model_path,
sequence_length,
tokenizer_padding_side="right",
onnx_model_name=onnx_model_name,
engine_kwargs=engine_kwargs,
)
self.verify_no_kv_cache_present()

token_generator = TokenGeneratorOperator()

process_inputs = ProcessInputsTextGeneration(
generation_config=process_generation_config(generation_config),
sequence_length=sequence_length,
tokenizer=self.tokenizer,
)
engine_operator = NLEngineOperatorNoCache(
sequence_length=sequence_length,
**engine_kwargs,
)
prepare_generation = PrepareGeneration(
sequence_length=sequence_length,
prompt_sequence_length=1,
token_generator=token_generator,
)
generate_new_token = GenerateNewTokenOperator(
tokenizer=self.tokenizer, force_max_tokens=True
)
compile_generations = CompileGenerations()
join_output = JoinOutput(tokenizer=self.tokenizer)
process_outputs = ProcessOutputs(tokenizer=self.tokenizer)

ops = {
"process_input": process_inputs,
"engine_operator": engine_operator,
"prepare_generation": prepare_generation,
"generate_new_token": generate_new_token,
"compile_generations": compile_generations,
"join_output": join_output,
"process_outputs": process_outputs,
}
routes = {
"process_input": "SPLIT",
"SPLIT": "engine_operator",
"engine_operator": "prepare_generation",
"prepare_generation": "generate_new_token",
"generate_new_token": "compile_generations",
"compile_generations": "JOIN",
"JOIN": "join_output",
"join_output": "process_outputs",
"process_outputs": "STOP",
}

# TODO: Using the GraphRouter, but should use
# LinearRouter with appropriate split/join support
router = GraphRouter(
end_route="STOP", start_route="process_input", route=routes
)
scheduler = [OperatorScheduler()]
super().__init__(
ops=ops,
router=router,
schedulers=scheduler,
)

def run(self, *args, **kwargs):
# we need to set the fixed_sequences_length flag to True
# for the non-kv cache pipeline
kwargs.update(dict(fixed_sequences_length=True, max_new_tokens=1))
return super().run(*args, **kwargs)

def condense_inputs(self, *args, **kwargs):
return args[0], kwargs

def expand_inputs(self, items, batch_size):
items = [items.get(key) for key in items.keys()]
out, orig_batch_size = split_engine_inputs(items, batch_size)
combined_batches = [{"input_ids": b[0], "attention_mask": b[1]} for b in out]
return combined_batches, orig_batch_size

def verify_no_kv_cache_present(self) -> bool:
"""
Verifies that the ONNX model does not have
KV cache inputs/outputs present.
:return: True if compatible, False otherwise
"""
is_kv_cache_present = any(default_cached_outputs(self.model_path))
if is_kv_cache_present:
raise ValueError(
f"The model: {self.model_path} has KV cache inputs/outputs present. "
"Please use the TextGenerationPipeline instead."
)
return not is_kv_cache_present
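
A hedged usage sketch of the new pipeline, assuming the v2 Pipeline call interface and the standard text-generation output schema; the deployment path and ONNX file name below are hypothetical. Note that run() pins max_new_tokens=1, so the cache-free pipeline emits a single new token per prompt:

from deepsparse.transformers.pipelines.text_generation import (
    TextGenerationPipelineNoCache,
)

# hypothetical local directory holding a KV-cache-free ONNX export
pipeline = TextGenerationPipelineNoCache(
    model_path="/path/to/deployment",
    sequence_length=1024,
    onnx_model_name="model-orig.onnx",  # assumed name of the cache-free export
)

output = pipeline(prompt="def fibonacci(n):")
print(output.generations[0].text)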