
Commit

spk&whisper
MaxMax2016 committed Apr 9, 2023
1 parent e662b7c commit 6646c4b
Showing 36 changed files with 106,925 additions and 0 deletions.
18 changes: 18 additions & 0 deletions speaker/README.md
@@ -0,0 +1,18 @@
### Speaker Encoder

This is an implementation of https://arxiv.org/abs/1710.10467 (Generalized End-to-End Loss for Speaker Verification). The model can be used to compute speaker embeddings (d-vectors) from voice recordings.

With the code here you can generate d-vectors for both multi-speaker and single-speaker TTS datasets, then visualise and explore them along with the associated audio files in an interactive chart.

Below is an example showing embedding results of various speakers. You can generate the same plot with the provided notebook as demonstrated in [this video](https://youtu.be/KW3oO7JVa7Q).

![](umap.png)
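
A minimal sketch of how a plot like this could be produced, assuming the d-vectors have already been saved as `.npy` files grouped in one folder per speaker, and that the `umap-learn` and `matplotlib` packages are available; the paths and labeling scheme below are illustrative only.

```python
# Illustrative sketch: project saved d-vectors with UMAP and scatter-plot them by speaker.
import glob
import numpy as np
import umap                      # assumes the umap-learn package is installed
import matplotlib.pyplot as plt

embed_files = sorted(glob.glob("embeddings/**/*.npy", recursive=True))   # placeholder path
vectors = np.stack([np.load(f) for f in embed_files])                    # (N, 256) d-vectors
speakers = [f.split("/")[-2] for f in embed_files]                       # assumes one folder per speaker
speaker_ids = {s: i for i, s in enumerate(sorted(set(speakers)))}
colors = [speaker_ids[s] for s in speakers]

proj = umap.UMAP(n_neighbors=15, min_dist=0.1).fit_transform(vectors)    # (N, 2) projection
plt.scatter(proj[:, 0], proj[:, 1], c=colors, s=5, cmap="tab20")
plt.title("Speaker embeddings (UMAP projection)")
plt.show()
```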

Download a pretrained model from the [Released Models](https://github.com/mozilla/TTS/wiki/Released-Models) page.

To run the code, follow the same workflow as in TTS.

- Define 'config.json' for your needs. Note that the audio parameters should match your TTS model.
- Example training call: ```python speaker_encoder/train.py --config_path speaker_encoder/config.json --data_path ~/Data/Libri-TTS/train-clean-360```
- Generate embedding vectors with ```python speaker_encoder/compute_embeddings.py --use_cuda true /model/path/best_model.pth.tar model/config/path/config.json dataset/path/ output_path```. This parses all .wav files under the given dataset path and recreates the same folder structure under the output path, filled with the generated embedding files (a quick similarity check is sketched after this list).
- Watch training on Tensorboard as in TTS.
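
As a quick sanity check on the generated embedding files, the cosine similarity between two d-vectors from the same speaker should be clearly higher than between different speakers. A minimal sketch, with placeholder paths:

```python
# Sketch: compare two generated d-vectors with cosine similarity (paths are placeholders).
import numpy as np

a = np.load("output_path/speaker_a/utt_1.npy")
b = np.load("output_path/speaker_a/utt_2.npy")

cos = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
print(f"cosine similarity: {cos:.3f}")  # expected to be close to 1.0 for the same speaker
```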
Empty file added speaker/__init__.py
Empty file.
64 changes: 64 additions & 0 deletions speaker/config.py
@@ -0,0 +1,64 @@
from dataclasses import asdict, dataclass, field
from typing import Dict, List

from .utils.coqpit import MISSING
from .utils.shared_configs import BaseAudioConfig, BaseDatasetConfig, BaseTrainingConfig


@dataclass
class SpeakerEncoderConfig(BaseTrainingConfig):
"""Defines parameters for Speaker Encoder model."""

model: str = "speaker_encoder"
audio: BaseAudioConfig = field(default_factory=BaseAudioConfig)
datasets: List[BaseDatasetConfig] = field(default_factory=lambda: [BaseDatasetConfig()])
# model params
model_params: Dict = field(
default_factory=lambda: {
"model_name": "lstm",
"input_dim": 80,
"proj_dim": 256,
"lstm_dim": 768,
"num_lstm_layers": 3,
"use_lstm_with_projection": True,
}
)

audio_augmentation: Dict = field(default_factory=lambda: {})

storage: Dict = field(
default_factory=lambda: {
"sample_from_storage_p": 0.66, # the probability with which we'll sample from the DataSet in-memory storage
"storage_size": 15, # the size of the in-memory storage with respect to a single batch
}
)

# training params
max_train_step: int = 1000000 # end training when number of training steps reaches this value.
loss: str = "angleproto"
grad_clip: float = 3.0
lr: float = 0.0001
lr_decay: bool = False
warmup_steps: int = 4000
wd: float = 1e-6

# logging params
tb_model_param_stats: bool = False
steps_plot_stats: int = 10
checkpoint: bool = True
save_step: int = 1000
print_step: int = 20

# data loader
num_speakers_in_batch: int = MISSING
num_utters_per_speaker: int = MISSING
num_loader_workers: int = MISSING
skip_speakers: bool = False
voice_len: float = 1.6

def check_values(self):
super().check_values()
c = asdict(self)
assert (
c["model_params"]["input_dim"] == self.audio.num_mels
        ), " [!] model input dimension must be equal to melspectrogram dimension."
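
A minimal usage sketch for this config, assuming the package is importable as `speaker` from the repository root; the data-loader values below are illustrative stand-ins for the fields declared as MISSING above.

```python
# Sketch: instantiate the config and supply the required data-loader fields.
from speaker.config import SpeakerEncoderConfig

config = SpeakerEncoderConfig(
    num_speakers_in_batch=32,    # illustrative values; the MISSING fields must be set explicitly
    num_utters_per_speaker=10,
    num_loader_workers=4,
)
config.check_values()            # asserts model_params["input_dim"] == audio.num_mels
print(config.model_params)       # defaults: 80-dim mel input, 256-dim projection, 3 LSTM layers
```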
108 changes: 108 additions & 0 deletions speaker/infer.py
@@ -0,0 +1,108 @@
import re
import json
import fsspec
import torch
import numpy as np
import argparse

from argparse import RawTextHelpFormatter
from .models.lstm import LSTMSpeakerEncoder
from .config import SpeakerEncoderConfig
from .utils.audio import AudioProcessor


def read_json(json_path):
config_dict = {}
try:
with fsspec.open(json_path, "r", encoding="utf-8") as f:
data = json.load(f)
except json.decoder.JSONDecodeError:
# backwards compat.
data = read_json_with_comments(json_path)
config_dict.update(data)
return config_dict


def read_json_with_comments(json_path):
"""for backward compat."""
# fallback to json
with fsspec.open(json_path, "r", encoding="utf-8") as f:
input_str = f.read()
# handle comments
input_str = re.sub(r"\\\n", "", input_str)
input_str = re.sub(r"//.*\n", "\n", input_str)
data = json.loads(input_str)
return data


if __name__ == "__main__":

parser = argparse.ArgumentParser(
description="""Compute embedding vectors for each wav file in a dataset.""",
formatter_class=RawTextHelpFormatter,
)
parser.add_argument("model_path", type=str, help="Path to model checkpoint file.")
parser.add_argument(
"config_path",
type=str,
help="Path to model config file.",
)

parser.add_argument("-s", "--source", help="input wave", dest="source")
    parser.add_argument(
        "-t", "--target", help="output 256d speaker embedding", dest="target"
    )

    # note: with argparse's type=bool, any non-empty string (including "false") parses as True
    parser.add_argument("--use_cuda", type=bool, help="flag to set cuda.", default=True)
    parser.add_argument("--eval", type=bool, help="compute eval.", default=True)

args = parser.parse_args()
source_file = args.source
target_file = args.target

# config
config_dict = read_json(args.config_path)
# print(config_dict)

# model
config = SpeakerEncoderConfig(config_dict)
config.from_dict(config_dict)

speaker_encoder = LSTMSpeakerEncoder(
config.model_params["input_dim"],
config.model_params["proj_dim"],
config.model_params["lstm_dim"],
config.model_params["num_lstm_layers"],
)

speaker_encoder.load_checkpoint(args.model_path, eval=True, use_cuda=args.use_cuda)

# preprocess
speaker_encoder_ap = AudioProcessor(**config.audio)
# normalize the input audio level and trim silences
speaker_encoder_ap.do_sound_norm = True
speaker_encoder_ap.do_trim_silence = True

# compute speaker embeddings

# extract the embedding
waveform = speaker_encoder_ap.load_wav(
source_file, sr=speaker_encoder_ap.sample_rate
)
spec = speaker_encoder_ap.melspectrogram(waveform)
spec = torch.from_numpy(spec.T)
if args.use_cuda:
spec = spec.cuda()
spec = spec.unsqueeze(0)
embed = speaker_encoder.compute_embedding(spec).detach().cpu().numpy()
embed = embed.squeeze()
# print(embed)
# print(embed.size)
np.save(target_file, embed, allow_pickle=False)


if hasattr(speaker_encoder, 'module'):
state_dict = speaker_encoder.module.state_dict()
else:
state_dict = speaker_encoder.state_dict()
torch.save({'model': state_dict}, "model_small.pth")
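
Because the script stores the weights as `{'model': state_dict}`, the same layout that `load_checkpoint` reads, the exported `model_small.pth` can be loaded back into a fresh encoder. A brief sketch, with dimensions taken from the default model_params and the import path assumed to be the `speaker` package:

```python
# Sketch: reload the exported checkpoint into a new LSTMSpeakerEncoder instance.
import torch
from speaker.models.lstm import LSTMSpeakerEncoder

encoder = LSTMSpeakerEncoder(input_dim=80, proj_dim=256, lstm_dim=768, num_lstm_layers=3)
state = torch.load("model_small.pth", map_location="cpu")
encoder.load_state_dict(state["model"])
encoder.eval()
```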
Empty file added speaker/models/__init__.py
Empty file.
131 changes: 131 additions & 0 deletions speaker/models/lstm.py
@@ -0,0 +1,131 @@
import numpy as np
import torch
from torch import nn

from ..utils.io import load_fsspec


class LSTMWithProjection(nn.Module):
def __init__(self, input_size, hidden_size, proj_size):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.proj_size = proj_size
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.linear = nn.Linear(hidden_size, proj_size, bias=False)

def forward(self, x):
self.lstm.flatten_parameters()
o, (_, _) = self.lstm(x)
return self.linear(o)


class LSTMWithoutProjection(nn.Module):
def __init__(self, input_dim, lstm_dim, proj_dim, num_lstm_layers):
super().__init__()
self.lstm = nn.LSTM(input_size=input_dim, hidden_size=lstm_dim, num_layers=num_lstm_layers, batch_first=True)
self.linear = nn.Linear(lstm_dim, proj_dim, bias=True)
self.relu = nn.ReLU()

def forward(self, x):
_, (hidden, _) = self.lstm(x)
return self.relu(self.linear(hidden[-1]))


class LSTMSpeakerEncoder(nn.Module):
def __init__(self, input_dim, proj_dim=256, lstm_dim=768, num_lstm_layers=3, use_lstm_with_projection=True):
super().__init__()
self.use_lstm_with_projection = use_lstm_with_projection
layers = []
        # choose the LSTM layer type
if use_lstm_with_projection:
layers.append(LSTMWithProjection(input_dim, lstm_dim, proj_dim))
for _ in range(num_lstm_layers - 1):
layers.append(LSTMWithProjection(proj_dim, lstm_dim, proj_dim))
self.layers = nn.Sequential(*layers)
else:
self.layers = LSTMWithoutProjection(input_dim, lstm_dim, proj_dim, num_lstm_layers)

self._init_layers()

def _init_layers(self):
for name, param in self.layers.named_parameters():
if "bias" in name:
nn.init.constant_(param, 0.0)
elif "weight" in name:
nn.init.xavier_normal_(param)

def forward(self, x):
# TODO: implement state passing for lstms
d = self.layers(x)
if self.use_lstm_with_projection:
d = torch.nn.functional.normalize(d[:, -1], p=2, dim=1)
else:
d = torch.nn.functional.normalize(d, p=2, dim=1)
return d

@torch.no_grad()
def inference(self, x):
d = self.layers.forward(x)
if self.use_lstm_with_projection:
d = torch.nn.functional.normalize(d[:, -1], p=2, dim=1)
else:
d = torch.nn.functional.normalize(d, p=2, dim=1)
return d

def compute_embedding(self, x, num_frames=250, num_eval=10, return_mean=True):
"""
        Compute an embedding for a single utterance by averaging the embeddings
        of num_eval sliding windows of num_frames frames each.
        x: 1xTxD
"""
max_len = x.shape[1]

if max_len < num_frames:
num_frames = max_len

offsets = np.linspace(0, max_len - num_frames, num=num_eval)

frames_batch = []
for offset in offsets:
offset = int(offset)
end_offset = int(offset + num_frames)
frames = x[:, offset:end_offset]
frames_batch.append(frames)

frames_batch = torch.cat(frames_batch, dim=0)
embeddings = self.inference(frames_batch)

if return_mean:
embeddings = torch.mean(embeddings, dim=0, keepdim=True)

return embeddings

def batch_compute_embedding(self, x, seq_lens, num_frames=160, overlap=0.5):
"""
Generate embeddings for a batch of utterances
x: BxTxD
"""
        num_overlap = int(num_frames * overlap)  # cast to int so it can be used as a range() step below
max_len = x.shape[1]
embed = None
num_iters = seq_lens / (num_frames - num_overlap)
cur_iter = 0
for offset in range(0, max_len, num_frames - num_overlap):
cur_iter += 1
end_offset = min(x.shape[1], offset + num_frames)
frames = x[:, offset:end_offset]
if embed is None:
embed = self.inference(frames)
else:
embed[cur_iter <= num_iters, :] += self.inference(frames[cur_iter <= num_iters, :, :])
return embed / num_iters

# pylint: disable=unused-argument, redefined-builtin
def load_checkpoint(self, checkpoint_path: str, eval: bool = False, use_cuda: bool = False):
state = load_fsspec(checkpoint_path, map_location=torch.device("cpu"))
self.load_state_dict(state["model"])
if use_cuda:
self.cuda()
if eval:
self.eval()
assert not self.training
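
A small sketch of `compute_embedding` on random input, with shapes following the default model_params (80 mel bins in, a 256-dimensional embedding out); illustrative only, not part of the committed code:

```python
# Sketch: run compute_embedding on a random mel spectrogram of shape 1 x T x 80.
import torch
from speaker.models.lstm import LSTMSpeakerEncoder

encoder = LSTMSpeakerEncoder(input_dim=80, proj_dim=256, lstm_dim=768, num_lstm_layers=3)
encoder.eval()

mel = torch.randn(1, 400, 80)            # one utterance, 400 frames, 80 mel bins
embed = encoder.compute_embedding(mel)   # averages the embeddings of 10 sliding windows
print(embed.shape)                       # torch.Size([1, 256])
```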