import numpy as np
import matplotlib.pyplot as plt
from matplotlib.cm import ScalarMappable
from matplotlib.patches import Rectangle
from librosa.sequence import viterbi_discriminative, dtw
from librosa import note_to_hz, midi_to_hz
from numba import jit
from scipy.stats import norm
from scipy.ndimage import gaussian_filter1d
from scipy.signal import medfilt, upfirdn, argrelmax
from torchaudio.models.conformer import ConformerLayer
from torchaudio import load as torchaudio_load
from torchaudio.functional import resample as torchaudio_functional_resample
from torch import (cat as torch_cat, load as torch_load, Tensor as torch_Tensor, from_numpy as torch_from_numpy,
                   no_grad as torch_no_grad, mean as torch_mean, std as torch_std, sigmoid as torch_sigmoid,
                   nan_to_num as torch_nan_to_num, nn)
from sklearn.metrics.pairwise import euclidean_distances
from mir_eval.melody import hz2cents
from pretty_midi import PrettyMIDI, Instrument, Note, PitchBend, instrument_name_to_program, note_name_to_number
from time import perf_counter
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple
from pathlib import Path
from mido import MidiFile, MidiTrack
class PitchEstimator(nn.Module): | |
""" | |
This is the base class that everything else inherits from. The hierarchy is: | |
PitchEstimator -> Transcriber -> Synchronizer -> AutonomousAgent -> The n-Head Music Performance Analysis Models | |
PitchEstimator can handle reading the audio, predicting all the features, | |
estimating a single frame level f0 using viterbi, or | |
MIDI pitch bend creation for the predicted note events when used inside a Transcriber, or | |
score-informed f0 estimation when used inside a Synchronizer. | |
""" | |
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): | |
super().__init__() | |
self.labeling = labeling | |
self.sr = sr | |
self.window_size = window_size | |
self.hop_length = hop_length | |
self.instrument = instrument | |
self.f0_bins_per_semitone = int(np.round(100/self.labeling.f0_granularity_c)) | |
def read_audio(self, audio): | |
""" | |
Read and resample an audio file, convert to mono, and unfold into representation frames. | |
The time array represents the center of each small frame with 5.8ms hop length. This is different than the chunk | |
level frames. The chunk level frames represent the entire sequence the model sees. Whereas it predicts with the | |
small frames intervals (5.8ms). | |
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor | |
:return: frames: (n_big_frames, frame_length), times: (n_small_frames,) | |
""" | |
if isinstance(audio, str) or isinstance(audio, Path): | |
audio, sample_rate = torchaudio_load(audio, normalize=True) | |
audio = audio.mean(axis=0) # convert to mono | |
if sample_rate != self.sr: | |
audio = torchaudio_functional_resample(audio, sample_rate, self.sr) | |
elif isinstance(audio, np.ndarray): | |
audio = torch_from_numpy(audio) | |
else: | |
assert isinstance(audio, torch_Tensor) | |
len_audio = audio.shape[-1] | |
n_frames = int(np.ceil((len_audio + sum(self.frame_overlap)) / (self.hop_length * self.chunk_size))) | |
audio = nn.functional.pad(audio, (self.frame_overlap[0], | |
self.frame_overlap[1] + (n_frames * self.hop_length * self.chunk_size) - len_audio)) | |
frames = audio.unfold(0, self.max_window_size, self.hop_length*self.chunk_size) | |
times = np.arange(0, len_audio, self.hop_length) / self.sr # not tensor, we don't compute anything with it | |
return frames, times | |
def predict(self, audio, batch_size): | |
frames, times = self.read_audio(audio) | |
performance = {'f0': [], 'note': [], 'onset': [], 'offset': []} | |
self.eval() | |
device = self.main.conv0.conv2d.weight.device | |
with torch_no_grad(): | |
for i in range(0, len(frames), batch_size): | |
f = frames[i:min(i + batch_size, len(frames))].to(device) | |
f -= (torch_mean(f, axis=1).unsqueeze(-1)) | |
f /= (torch_std(f, axis=1).unsqueeze(-1)) | |
out = self.forward(f) | |
for key, value in out.items(): | |
value = torch_sigmoid(value) | |
value = torch_nan_to_num(value) # the model outputs nan when the frame is silent (this is an expected behavior due to normalization) | |
value = value.view(-1, value.shape[-1]) | |
value = value.detach().cpu().numpy() | |
performance[key].append(value) | |
performance = {key: np.concatenate(value, axis=0)[:len(times)] for key, value in performance.items()} | |
performance['time'] = times | |
return performance | |
def estimate_pitch(self, audio, batch_size, viterbi=False): | |
out = self.predict(audio, batch_size) | |
f0_hz = self.out2f0(out, viterbi) | |
return out['time'], f0_hz | |
def out2f0(self, out, viterbi=False): | |
""" | |
Monophonic f0 estimation from the model output. The viterbi postprocessing is specialized for the violin family. | |
""" | |
salience = out['f0'] | |
if viterbi == 'constrained': | |
assert hasattr(self, 'out2note') | |
notes = spotify_create_notes( out["note"], out["onset"], note_low=self.labeling.midi_centers[0], | |
note_high=self.labeling.midi_centers[-1], onset_thresh=0.5, frame_thresh=0.3, | |
infer_onsets=True, melodia_trick=True, | |
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length)))) | |
note_cents = self.get_pitch_bends(salience, notes, to_midi=False, timing_refinement_range=0) | |
cents = np.zeros_like(out['time']) | |
cents[note_cents[:,0].astype(int)] = note_cents[:,1] | |
elif viterbi: | |
            # transition probabilities that encourage a continuous pitch track:
            # a strongly weighted narrow Gaussian plus a wide one, so large jumps are
            # heavily penalized (see the standalone sketch after this class)
transition = gaussian_filter1d(np.eye(self.labeling.f0_n_bins), 30) + 99 * gaussian_filter1d( | |
np.eye(self.labeling.f0_n_bins), 2) | |
transition = transition / np.sum(transition, axis=1)[:, None] | |
p = salience / salience.sum(axis=1)[:, None] | |
p[np.isnan(p.sum(axis=1)), :] = np.ones(self.labeling.f0_n_bins) * 1 / self.labeling.f0_n_bins | |
path = viterbi_discriminative(p.T, transition) | |
cents = np.array([self.labeling.f0_label2c(salience[i, :], path[i]) for i in range(len(path))]) | |
else: | |
cents = self.labeling.f0_label2c(salience, center=None) # use argmax for center | |
f0_hz = self.labeling.f0_c2hz(cents) | |
f0_hz[np.isnan(f0_hz)] = 0 | |
return f0_hz | |
def get_pitch_bends( | |
self, | |
contours: np.ndarray, note_events: List[Tuple[int, int, int, float]], | |
timing_refinement_range: int = 0, to_midi: bool = True, | |
) -> List[Tuple[int, int, int, float, Optional[List[int]]]]: | |
"""Modified version of an excellent script from Spotify/basic_pitch!! Thank you!!!! | |
Given note events and contours, estimate pitch bends per note. | |
Pitch bends are represented as a sequence of evenly spaced midi pitch bend control units. | |
The time stamps of each pitch bend can be inferred by computing an evenly spaced grid between | |
the start and end times of each note event. | |
Args: | |
contours: Matrix of estimated pitch contours | |
note_events: note event tuple | |
timing_refinement_range: if > 0, refine onset/offset boundaries with f0 confidence | |
to_midi: whether to convert pitch bends to midi pitch bends. If False, return pitch estimates in the format | |
[time (index), pitch (Hz), confidence in range [0, 1]]. | |
Returns: | |
note events with pitch bends | |
""" | |
f0_matrix = [] # [time (index), pitch (Hz), confidence in range [0, 1]] | |
note_events_with_pitch_bends = [] | |
for start_idx, end_idx, pitch_midi, amplitude in note_events: | |
if timing_refinement_range: | |
start_idx = np.max([0, start_idx - timing_refinement_range]) | |
end_idx = np.min([contours.shape[0], end_idx + timing_refinement_range]) | |
freq_idx = int(np.round(self.midi_pitch_to_contour_bin(pitch_midi))) | |
freq_start_idx = np.max([freq_idx - self.labeling.f0_tolerance_bins, 0]) | |
freq_end_idx = np.min([self.labeling.f0_n_bins, freq_idx + self.labeling.f0_tolerance_bins + 1]) | |
trans_start_idx = np.max([0, self.labeling.f0_tolerance_bins - freq_idx]) | |
trans_end_idx = (2 * self.labeling.f0_tolerance_bins + 1) - \ | |
np.max([0, freq_idx - (self.labeling.f0_n_bins - self.labeling.f0_tolerance_bins - 1)]) | |
# apply regional viterbi to estimate the intonation | |
# observation probabilities come from the f0_roll matrix | |
observation = contours[start_idx:end_idx, freq_start_idx:freq_end_idx] | |
observation = observation / observation.sum(axis=1)[:, None] | |
observation[np.isnan(observation.sum(axis=1)), :] = np.ones(freq_end_idx - freq_start_idx) * 1 / ( | |
freq_end_idx - freq_start_idx) | |
# transition probabilities assure continuity | |
transition = self.labeling.f0_transition_matrix[trans_start_idx:trans_end_idx, | |
trans_start_idx:trans_end_idx] + 1e-6 | |
transition = transition / np.sum(transition, axis=1)[:, None] | |
path = viterbi_discriminative(observation.T / observation.sum(axis=1), transition) + freq_start_idx | |
cents = np.array([self.labeling.f0_label2c(contours[i + start_idx, :], path[i]) for i in range(len(path))]) | |
bends = cents - self.labeling.midi_centers_c[pitch_midi - self.labeling.midi_centers[0]] | |
if to_midi: | |
bends = (bends * 4096 / 100).astype(int) | |
bends[bends > 8191] = 8191 | |
bends[bends < -8192] = -8192 | |
if timing_refinement_range: | |
confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))]) | |
threshold = np.median(confidences) | |
                    threshold = (np.median(confidences > threshold) + threshold) / 2  # heuristic: blend the median confidence with the share of frames above it
                    median_kernel = 2 * (timing_refinement_range // 2) + 1  # medfilt requires an odd kernel size
confidences = medfilt(confidences, kernel_size=median_kernel) | |
conf_bool = confidences > threshold | |
onset_idx = np.argmax(conf_bool) | |
offset_idx = len(confidences) - np.argmax(conf_bool[::-1]) | |
bends = bends[onset_idx:offset_idx] | |
                    end_idx = start_idx + offset_idx
                    start_idx = start_idx + onset_idx
note_events_with_pitch_bends.append((start_idx, end_idx, pitch_midi, amplitude, bends)) | |
else: | |
confidences = np.array([contours[i + start_idx, path[i]] for i in range(len(path))]) | |
time_idx = np.arange(len(path)) + start_idx | |
# f0_hz = self.labeling.f0_c2hz(cents) | |
possible_f0s = np.array([time_idx, cents, confidences]).T | |
f0_matrix.append(possible_f0s[np.abs(bends)<100]) # filter out pitch bends that are too large | |
if not to_midi: | |
return np.vstack(f0_matrix) | |
else: | |
return note_events_with_pitch_bends | |
    def midi_pitch_to_contour_bin(self, pitch_midi: int) -> int:
"""Convert midi pitch to corresponding index in contour matrix | |
Args: | |
pitch_midi: pitch in midi | |
Returns: | |
index in contour matrix | |
""" | |
pitch_hz = midi_to_hz(pitch_midi) | |
return np.argmin(np.abs(self.labeling.f0_centers_hz - pitch_hz)) | |
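# --- Hedged usage sketch (not part of the original pipeline) ---
# A minimal, self-contained illustration of the Viterbi smoothing used in
# PitchEstimator.out2f0: a transition matrix that strongly favors small f0-bin
# movements is combined with per-frame state probabilities and decoded with
# librosa's viterbi_discriminative. The bin count, frame count and toy salience
# below are illustrative assumptions, not values used by the model.
def _demo_viterbi_f0_smoothing():
    n_bins, n_frames = 60, 40
    # wide Gaussian plus a strongly weighted narrow Gaussian around the diagonal,
    # mirroring the transition construction in out2f0
    transition = gaussian_filter1d(np.eye(n_bins), 30) + 99 * gaussian_filter1d(np.eye(n_bins), 2)
    transition = transition / transition.sum(axis=1)[:, None]
    # toy salience: a slowly rising pitch track plus one noisy outlier frame
    salience = np.full((n_frames, n_bins), 1e-3)
    true_path = np.clip(np.round(20 + 0.3 * np.arange(n_frames)).astype(int), 0, n_bins - 1)
    salience[np.arange(n_frames), true_path] = 1.0
    salience[15, :] = 1e-3
    salience[15, 55] = 1.0  # outlier that a plain per-frame argmax would follow
    p = salience / salience.sum(axis=1)[:, None]
    path = viterbi_discriminative(p.T, transition)
    return true_path, path  # the decoded path stays on the true track and ignores the frame-15 outlier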
# SPOTIFY | |
def get_inferred_onsets(onset_roll: np.array, note_roll: np.array, n_diff: int = 2) -> np.array: | |
""" | |
Infer onsets from large changes in note roll matrix amplitudes. | |
Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py | |
:param onset_roll: Onset activation matrix (n_times, n_freqs). | |
:param note_roll: Frame-level note activation matrix (n_times, n_freqs). | |
    :param n_diff: Number of time-shifted differences used to detect onsets.
    :return: Element-wise maximum of the predicted onsets and the rescaled frame differences.
""" | |
diffs = [] | |
for n in range(1, n_diff + 1): | |
frames_appended = np.concatenate([np.zeros((n, note_roll.shape[1])), note_roll]) | |
diffs.append(frames_appended[n:, :] - frames_appended[:-n, :]) | |
frame_diff = np.min(diffs, axis=0) | |
frame_diff[frame_diff < 0] = 0 | |
frame_diff[:n_diff, :] = 0 | |
frame_diff = np.max(onset_roll) * frame_diff / np.max(frame_diff) # rescale to have the same max as onsets | |
max_onsets_diff = np.max([onset_roll, frame_diff], | |
axis=0) # use the max of the predicted onsets and the differences | |
return max_onsets_diff | |
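# --- Hedged usage sketch (illustrative only) ---
# Shows how get_inferred_onsets augments weak onset activations with large positive
# jumps in the note roll. The toy rolls below are assumptions made up for the example.
def _demo_inferred_onsets():
    n_times, n_notes = 20, 3
    note_roll = np.zeros((n_times, n_notes))
    note_roll[5:15, 1] = 0.9            # a note that starts abruptly at frame 5
    onset_roll = np.zeros((n_times, n_notes))
    onset_roll[5, 1] = 0.1              # the onset head barely fires for it
    onset_roll[10, 0] = 0.8             # a clearly detected onset elsewhere
    combined = get_inferred_onsets(onset_roll, note_roll)
    return combined                     # combined[5, 1] rises from 0.1 to ~0.8 thanks to the note-roll jump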
def spotify_create_notes( | |
note_roll: np.array, | |
onset_roll: np.array, | |
onset_thresh: float, | |
frame_thresh: float, | |
min_note_len: int, | |
infer_onsets: bool, | |
        note_low: int,   # e.g. labeling.midi_centers[0]
        note_high: int,  # e.g. labeling.midi_centers[-1]
melodia_trick: bool = True, | |
energy_tol: int = 11, | |
) -> List[Tuple[int, int, int, float]]: | |
"""Decode raw model output to polyphonic note events | |
Modified from https://github.com/spotify/basic-pitch/blob/main/basic_pitch/note_creation.py | |
Args: | |
note_roll: Frame activation matrix (n_times, n_freqs). | |
onset_roll: Onset activation matrix (n_times, n_freqs). | |
onset_thresh: Minimum amplitude of an onset activation to be considered an onset. | |
frame_thresh: Minimum amplitude of a frame activation for a note to remain "on". | |
min_note_len: Minimum allowed note length in frames. | |
infer_onsets: If True, add additional onsets when there are large differences in frame amplitudes. | |
melodia_trick : Whether to use the melodia trick to better detect notes. | |
        energy_tol: Number of consecutive frames allowed to fall below frame_thresh before a note is ended.
Returns: | |
list of tuples [(start_time_frames, end_time_frames, pitch_midi, amplitude)] | |
representing the note events, where amplitude is a number between 0 and 1 | |
""" | |
n_frames = note_roll.shape[0] | |
# use onsets inferred from frames in addition to the predicted onsets | |
if infer_onsets: | |
onset_roll = get_inferred_onsets(onset_roll, note_roll) | |
peak_thresh_mat = np.zeros(onset_roll.shape) | |
peaks = argrelmax(onset_roll, axis=0) | |
peak_thresh_mat[peaks] = onset_roll[peaks] | |
onset_idx = np.where(peak_thresh_mat >= onset_thresh) | |
onset_time_idx = onset_idx[0][::-1] # sort to go backwards in time | |
onset_freq_idx = onset_idx[1][::-1] # sort to go backwards in time | |
remaining_energy = np.zeros(note_roll.shape) | |
remaining_energy[:, :] = note_roll[:, :] | |
# loop over onsets | |
note_events = [] | |
for note_start_idx, freq_idx in zip(onset_time_idx, onset_freq_idx): | |
# if we're too close to the end of the audio, continue | |
if note_start_idx >= n_frames - 1: | |
continue | |
# find time index at this frequency band where the frames drop below an energy threshold | |
i = note_start_idx + 1 | |
k = 0 # number of frames since energy dropped below threshold | |
while i < n_frames - 1 and k < energy_tol: | |
if remaining_energy[i, freq_idx] < frame_thresh: | |
k += 1 | |
else: | |
k = 0 | |
i += 1 | |
i -= k # go back to frame above threshold | |
# if the note is too short, skip it | |
if i - note_start_idx <= min_note_len: | |
continue | |
remaining_energy[note_start_idx:i, freq_idx] = 0 | |
        # zero out the neighbouring pitch bins as well (freq_idx is a column index, not a MIDI number)
        if freq_idx < note_high - note_low:
            remaining_energy[note_start_idx:i, freq_idx + 1] = 0
        if freq_idx > 0:
            remaining_energy[note_start_idx:i, freq_idx - 1] = 0
# add the note | |
amplitude = np.mean(note_roll[note_start_idx:i, freq_idx]) | |
note_events.append( | |
( | |
note_start_idx, | |
i, | |
freq_idx + note_low, | |
amplitude, | |
) | |
) | |
if melodia_trick: | |
energy_shape = remaining_energy.shape | |
while np.max(remaining_energy) > frame_thresh: | |
i_mid, freq_idx = np.unravel_index(np.argmax(remaining_energy), energy_shape) | |
remaining_energy[i_mid, freq_idx] = 0 | |
# forward pass | |
i = i_mid + 1 | |
k = 0 | |
while i < n_frames - 1 and k < energy_tol: | |
if remaining_energy[i, freq_idx] < frame_thresh: | |
k += 1 | |
else: | |
k = 0 | |
remaining_energy[i, freq_idx] = 0 | |
                if freq_idx < note_high - note_low:
                    remaining_energy[i, freq_idx + 1] = 0
                if freq_idx > 0:
                    remaining_energy[i, freq_idx - 1] = 0
i += 1 | |
i_end = i - 1 - k # go back to frame above threshold | |
# backward pass | |
i = i_mid - 1 | |
k = 0 | |
while i > 0 and k < energy_tol: | |
if remaining_energy[i, freq_idx] < frame_thresh: | |
k += 1 | |
else: | |
k = 0 | |
remaining_energy[i, freq_idx] = 0 | |
                if freq_idx < note_high - note_low:
                    remaining_energy[i, freq_idx + 1] = 0
                if freq_idx > 0:
                    remaining_energy[i, freq_idx - 1] = 0
i -= 1 | |
i_start = i + 1 + k # go back to frame above threshold | |
assert i_start >= 0, "{}".format(i_start) | |
assert i_end < n_frames | |
if i_end - i_start <= min_note_len: | |
# note is too short, skip it | |
continue | |
# add the note | |
amplitude = np.mean(note_roll[i_start:i_end, freq_idx]) | |
note_events.append( | |
( | |
i_start, | |
i_end, | |
freq_idx + note_low, | |
amplitude, | |
) | |
) | |
return note_events | |
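# --- Hedged usage sketch (illustrative only) ---
# Decodes a toy note/onset roll with spotify_create_notes. The shapes, thresholds and
# MIDI range below are assumptions chosen for the example, not the values used by the transcriber.
def _demo_spotify_create_notes():
    n_times, note_low, note_high = 100, 60, 64    # 5 MIDI pitches: 60..64
    n_notes = note_high - note_low + 1
    note_roll = np.zeros((n_times, n_notes))
    onset_roll = np.zeros((n_times, n_notes))
    note_roll[10:40, 2] = 0.9                     # MIDI 62 sounding from frame 10 to 39
    onset_roll[10, 2] = 0.8
    events = spotify_create_notes(note_roll, onset_roll,
                                  onset_thresh=0.5, frame_thresh=0.3,
                                  min_note_len=5, infer_onsets=False,
                                  note_low=note_low, note_high=note_high,
                                  melodia_trick=False)
    return events  # expected: [(10, 40, 62, 0.9)]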
# TIKTOK | |
def note_detection_with_onset_offset_regress(frame_output, onset_output, | |
onset_shift_output, offset_output, offset_shift_output, velocity_output, | |
frame_threshold): | |
"""Process prediction matrices to note events information. | |
First, detect onsets with onset outputs. Then, detect offsets | |
with frame and offset outputs. | |
Args: | |
frame_output: (frames_num,) | |
onset_output: (frames_num,) | |
onset_shift_output: (frames_num,) | |
offset_output: (frames_num,) | |
offset_shift_output: (frames_num,) | |
velocity_output: (frames_num,) | |
frame_threshold: float | |
Returns: | |
output_tuples: list of [bgn, fin, onset_shift, offset_shift, normalized_velocity], | |
e.g., [ | |
[1821, 1909, 0.47498, 0.3048533, 0.72119445], | |
[1909, 1947, 0.30730522, -0.45764327, 0.64200014], | |
...] | |
""" | |
output_tuples = [] | |
bgn = None | |
frame_disappear = None | |
offset_occur = None | |
for i in range(onset_output.shape[0]): | |
if onset_output[i] == 1: | |
"""Onset detected""" | |
if bgn: | |
"""Consecutive onsets. E.g., pedal is not released, but two | |
consecutive notes being played.""" | |
fin = max(i - 1, 0) | |
output_tuples.append([bgn, fin, onset_shift_output[bgn], | |
0, velocity_output[bgn]]) | |
frame_disappear, offset_occur = None, None | |
bgn = i | |
if bgn and i > bgn: | |
"""If onset found, then search offset""" | |
if frame_output[i] <= frame_threshold and not frame_disappear: | |
"""Frame disappear detected""" | |
frame_disappear = i | |
if offset_output[i] == 1 and not offset_occur: | |
"""Offset detected""" | |
offset_occur = i | |
if frame_disappear: | |
if offset_occur and offset_occur - bgn > frame_disappear - offset_occur: | |
"""bgn --------- offset_occur --- frame_disappear""" | |
fin = offset_occur | |
else: | |
"""bgn --- offset_occur --------- frame_disappear""" | |
fin = frame_disappear | |
output_tuples.append([bgn, fin, onset_shift_output[bgn], | |
offset_shift_output[fin], velocity_output[bgn]]) | |
bgn, frame_disappear, offset_occur = None, None, None | |
if bgn and (i - bgn >= 600 or i == onset_output.shape[0] - 1): | |
"""Offset not detected""" | |
fin = i | |
output_tuples.append([bgn, fin, onset_shift_output[bgn], | |
offset_shift_output[fin], velocity_output[bgn]]) | |
bgn, frame_disappear, offset_occur = None, None, None | |
# Sort pairs by onsets | |
output_tuples.sort(key=lambda pair: pair[0]) | |
return output_tuples | |
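# --- Hedged usage sketch (illustrative only) ---
# Runs the onset/offset regression note decoder on a single toy pitch track. All arrays
# are synthetic assumptions: one note with an onset at frame 10, sustained frames until
# frame 40, and an offset event at frame 40.
def _demo_note_detection_regress():
    n = 100
    frame_output = np.zeros(n)
    frame_output[10:41] = 0.9
    onset_output = np.zeros(n)
    onset_output[10] = 1
    offset_output = np.zeros(n)
    offset_output[40] = 1
    onset_shift = np.zeros(n)
    offset_shift = np.zeros(n)
    velocity = np.full(n, 0.7)
    tuples = note_detection_with_onset_offset_regress(
        frame_output, onset_output, onset_shift, offset_output, offset_shift,
        velocity, frame_threshold=0.3)
    return tuples  # expected: [[10, 40, 0.0, 0.0, 0.7]]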
class RegressionPostProcessor(object): | |
def __init__(self, frames_per_second, classes_num, onset_threshold, | |
offset_threshold, frame_threshold, pedal_offset_threshold, | |
begin_note): | |
"""Postprocess the output probabilities of a transription model to MIDI | |
events. | |
Args: | |
frames_per_second: float | |
classes_num: int | |
onset_threshold: float | |
offset_threshold: float | |
frame_threshold: float | |
pedal_offset_threshold: float | |
""" | |
self.frames_per_second = frames_per_second | |
self.classes_num = classes_num | |
self.onset_threshold = onset_threshold | |
self.offset_threshold = offset_threshold | |
self.frame_threshold = frame_threshold | |
self.pedal_offset_threshold = pedal_offset_threshold | |
self.begin_note = begin_note | |
self.velocity_scale = 128 | |
def output_dict_to_midi_events(self, output_dict): | |
"""Main function. Post process model outputs to MIDI events. | |
Args: | |
output_dict: { | |
'reg_onset_output': (segment_frames, classes_num), | |
'reg_offset_output': (segment_frames, classes_num), | |
'frame_output': (segment_frames, classes_num), | |
'velocity_output': (segment_frames, classes_num), | |
'reg_pedal_onset_output': (segment_frames, 1), | |
'reg_pedal_offset_output': (segment_frames, 1), | |
'pedal_frame_output': (segment_frames, 1)} | |
Outputs: | |
est_note_events: list of dict, e.g. [ | |
{'onset_time': 39.74, 'offset_time': 39.87, 'midi_note': 27, 'velocity': 83}, | |
{'onset_time': 11.98, 'offset_time': 12.11, 'midi_note': 33, 'velocity': 88}] | |
est_pedal_events: list of dict, e.g. [ | |
{'onset_time': 0.17, 'offset_time': 0.96}, | |
                {'onset_time': 1.17, 'offset_time': 2.65}]
""" | |
output_dict['frame_output'] = output_dict['note'] | |
output_dict['velocity_output'] = output_dict['note'] | |
output_dict['reg_onset_output'] = output_dict['onset'] | |
output_dict['reg_offset_output'] = output_dict['offset'] | |
# Post process piano note outputs to piano note and pedal events information | |
(est_on_off_note_vels, est_pedal_on_offs) = \ | |
self.output_dict_to_note_pedal_arrays(output_dict) | |
"""est_on_off_note_vels: (events_num, 4), the four columns are: [onset_time, offset_time, piano_note, velocity], | |
est_pedal_on_offs: (pedal_events_num, 2), the two columns are: [onset_time, offset_time]""" | |
# Reformat notes to MIDI events | |
est_note_events = self.detected_notes_to_events(est_on_off_note_vels) | |
if est_pedal_on_offs is None: | |
est_pedal_events = None | |
else: | |
est_pedal_events = self.detected_pedals_to_events(est_pedal_on_offs) | |
return est_note_events, est_pedal_events | |
def output_dict_to_note_pedal_arrays(self, output_dict): | |
"""Postprocess the output probabilities of a transription model to MIDI | |
events. | |
Args: | |
output_dict: dict, { | |
'reg_onset_output': (frames_num, classes_num), | |
'reg_offset_output': (frames_num, classes_num), | |
'frame_output': (frames_num, classes_num), | |
'velocity_output': (frames_num, classes_num), | |
...} | |
Returns: | |
est_on_off_note_vels: (events_num, 4), the 4 columns are onset_time, | |
offset_time, piano_note and velocity. E.g. [ | |
[39.74, 39.87, 27, 0.65], | |
[11.98, 12.11, 33, 0.69], | |
...] | |
est_pedal_on_offs: (pedal_events_num, 2), the 2 columns are onset_time | |
and offset_time. E.g. [ | |
[0.17, 0.96], | |
[1.17, 2.65], | |
...] | |
""" | |
# ------ 1. Process regression outputs to binarized outputs ------ | |
# For example, onset or offset of [0., 0., 0.15, 0.30, 0.40, 0.35, 0.20, 0.05, 0., 0.] | |
# will be processed to [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.] | |
# Calculate binarized onset output from regression output | |
(onset_output, onset_shift_output) = \ | |
self.get_binarized_output_from_regression( | |
reg_output=output_dict['reg_onset_output'], | |
threshold=self.onset_threshold, neighbour=2) | |
output_dict['onset_output'] = onset_output # Values are 0 or 1 | |
output_dict['onset_shift_output'] = onset_shift_output | |
# Calculate binarized offset output from regression output | |
(offset_output, offset_shift_output) = \ | |
self.get_binarized_output_from_regression( | |
reg_output=output_dict['reg_offset_output'], | |
threshold=self.offset_threshold, neighbour=4) | |
output_dict['offset_output'] = offset_output # Values are 0 or 1 | |
output_dict['offset_shift_output'] = offset_shift_output | |
if 'reg_pedal_onset_output' in output_dict.keys(): | |
"""Pedal onsets are not used in inference. Instead, frame-wise pedal | |
predictions are used to detect onsets. We empirically found this is | |
more accurate to detect pedal onsets.""" | |
pass | |
if 'reg_pedal_offset_output' in output_dict.keys(): | |
# Calculate binarized pedal offset output from regression output | |
(pedal_offset_output, pedal_offset_shift_output) = \ | |
self.get_binarized_output_from_regression( | |
reg_output=output_dict['reg_pedal_offset_output'], | |
threshold=self.pedal_offset_threshold, neighbour=4) | |
output_dict['pedal_offset_output'] = pedal_offset_output # Values are 0 or 1 | |
output_dict['pedal_offset_shift_output'] = pedal_offset_shift_output | |
# ------ 2. Process matrices results to event results ------ | |
# Detect piano notes from output_dict | |
est_on_off_note_vels = self.output_dict_to_detected_notes(output_dict) | |
est_pedal_on_offs = None | |
return est_on_off_note_vels, est_pedal_on_offs | |
def get_binarized_output_from_regression(self, reg_output, threshold, neighbour): | |
"""Calculate binarized output and shifts of onsets or offsets from the | |
regression results. | |
Args: | |
reg_output: (frames_num, classes_num) | |
threshold: float | |
neighbour: int | |
Returns: | |
binary_output: (frames_num, classes_num) | |
shift_output: (frames_num, classes_num) | |
""" | |
binary_output = np.zeros_like(reg_output) | |
shift_output = np.zeros_like(reg_output) | |
(frames_num, classes_num) = reg_output.shape | |
for k in range(classes_num): | |
x = reg_output[:, k] | |
for n in range(neighbour, frames_num - neighbour): | |
if x[n] > threshold and self.is_monotonic_neighbour(x, n, neighbour): | |
binary_output[n, k] = 1 | |
"""See Section III-D in [1] for deduction. | |
[1] Q. Kong, et al., High-resolution Piano Transcription | |
with Pedals by Regressing Onsets and Offsets Times, 2020.""" | |
if x[n - 1] > x[n + 1]: | |
shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n + 1]) / 2 | |
else: | |
shift = (x[n + 1] - x[n - 1]) / (x[n] - x[n - 1]) / 2 | |
shift_output[n, k] = shift | |
return binary_output, shift_output | |
def is_monotonic_neighbour(self, x, n, neighbour): | |
"""Detect if values are monotonic in both side of x[n]. | |
Args: | |
x: (frames_num,) | |
n: int | |
neighbour: int | |
Returns: | |
monotonic: bool | |
""" | |
monotonic = True | |
for i in range(neighbour): | |
if x[n - i] < x[n - i - 1]: | |
monotonic = False | |
if x[n + i] < x[n + i + 1]: | |
monotonic = False | |
return monotonic | |
def output_dict_to_detected_notes(self, output_dict): | |
"""Postprocess output_dict to piano notes. | |
Args: | |
            output_dict: dict, e.g. {
                'onset_output': (frames_num, classes_num),
                'onset_shift_output': (frames_num, classes_num),
                'offset_output': (frames_num, classes_num),
                'offset_shift_output': (frames_num, classes_num),
                'frame_output': (frames_num, classes_num),
                'velocity_output': (frames_num, classes_num),
                ...}
Returns: | |
est_on_off_note_vels: (notes, 4), the four columns are onsets, offsets, | |
MIDI notes and velocities. E.g., | |
[[39.7375, 39.7500, 27., 0.6638], | |
[11.9824, 12.5000, 33., 0.6892], | |
...] | |
""" | |
est_tuples = [] | |
est_midi_notes = [] | |
classes_num = output_dict['frame_output'].shape[-1] | |
for piano_note in range(classes_num): | |
"""Detect piano notes""" | |
est_tuples_per_note = note_detection_with_onset_offset_regress( | |
frame_output=output_dict['frame_output'][:, piano_note], | |
onset_output=output_dict['onset_output'][:, piano_note], | |
onset_shift_output=output_dict['onset_shift_output'][:, piano_note], | |
offset_output=output_dict['offset_output'][:, piano_note], | |
offset_shift_output=output_dict['offset_shift_output'][:, piano_note], | |
velocity_output=output_dict['velocity_output'][:, piano_note], | |
frame_threshold=self.frame_threshold) | |
est_tuples += est_tuples_per_note | |
est_midi_notes += [piano_note + self.begin_note] * len(est_tuples_per_note) | |
est_tuples = np.array(est_tuples) # (notes, 5) | |
"""(notes, 5), the five columns are onset, offset, onset_shift, | |
offset_shift and normalized_velocity""" | |
est_midi_notes = np.array(est_midi_notes) # (notes,) | |
onset_times = (est_tuples[:, 0] + est_tuples[:, 2]) / self.frames_per_second | |
offset_times = (est_tuples[:, 1] + est_tuples[:, 3]) / self.frames_per_second | |
velocities = est_tuples[:, 4] | |
est_on_off_note_vels = np.stack((onset_times, offset_times, est_midi_notes, velocities), axis=-1) | |
"""(notes, 3), the three columns are onset_times, offset_times and velocity.""" | |
est_on_off_note_vels = est_on_off_note_vels.astype(np.float32) | |
return est_on_off_note_vels | |
def detected_notes_to_events(self, est_on_off_note_vels): | |
"""Reformat detected notes to midi events. | |
Args: | |
            est_on_off_note_vels: (notes, 4), the four columns are onset_times,
              offset_times, midi_notes and velocities. E.g.
              [[32.8376, 35.7700, 27., 0.7932],
               [37.3712, 39.9300, 33., 0.8058],
               ...]
Returns: | |
midi_events, list, e.g., | |
[{'onset_time': 39.7376, 'offset_time': 39.75, 'midi_note': 27, 'velocity': 84}, | |
{'onset_time': 11.9824, 'offset_time': 12.50, 'midi_note': 33, 'velocity': 88}, | |
...] | |
""" | |
midi_events = [] | |
for i in range(est_on_off_note_vels.shape[0]): | |
midi_events.append({ | |
'onset_time': est_on_off_note_vels[i][0], | |
'offset_time': est_on_off_note_vels[i][1], | |
'midi_note': int(est_on_off_note_vels[i][2]), | |
'velocity': int(est_on_off_note_vels[i][3] * self.velocity_scale)}) | |
return midi_events | |
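# --- Hedged usage sketch (illustrative only) ---
# Wires a toy model output through RegressionPostProcessor. The dictionary keys
# ('note', 'onset', 'offset') match what output_dict_to_midi_events expects; the shapes,
# thresholds and synthetic activations are assumptions made up for the example.
def _demo_regression_postprocessor():
    frames_num, classes_num, begin_note = 100, 1, 60
    onset = np.zeros((frames_num, classes_num))
    onset[8:13, 0] = [0.1, 0.3, 0.6, 0.3, 0.1]    # clear onset peak at frame 10
    offset = np.zeros((frames_num, classes_num))
    offset[38:43, 0] = [0.1, 0.3, 0.6, 0.3, 0.1]  # clear offset peak at frame 40
    note = np.full((frames_num, classes_num), 0.05)
    note[10:41, 0] = 0.9                          # sustained frames in between
    output_dict = {'note': note, 'onset': onset, 'offset': offset}
    postprocessor = RegressionPostProcessor(frames_per_second=100, classes_num=classes_num,
                                            onset_threshold=0.5, offset_threshold=0.5,
                                            frame_threshold=0.3, pedal_offset_threshold=0.5,
                                            begin_note=begin_note)
    note_events, _ = postprocessor.output_dict_to_midi_events(output_dict)
    return note_events  # expected: one note, MIDI 60, onset ~0.10 s, offset ~0.40 s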
def sync_visualize_step1(cost_matrices: List, | |
num_rows: int, | |
num_cols: int, | |
anchors: np.ndarray, | |
wp: np.ndarray) -> Tuple[plt.Figure, plt.Axes]: | |
fig, ax = plt.subplots(1, 1, dpi=72) | |
ax = __visualize_cost_matrices(ax, cost_matrices) | |
__visualize_constraint_rectangles(anchors[[1, 0], :], | |
edgecolor='firebrick') | |
__visualize_path_in_matrix(ax=ax, | |
wp=wp, | |
axisX=np.arange(0, num_rows), | |
axisY=np.arange(0, num_cols), | |
path_color='firebrick') | |
return fig, ax | |
def sync_visualize_step2(ax: plt.Axes, | |
cost_matrices: list, | |
wp_step2: np.ndarray, | |
wp_step1: np.ndarray, | |
num_rows_step1: int, | |
num_cols_step1: int, | |
anchors_step1: np.ndarray, | |
neighboring_anchors: np.ndarray, | |
plot_title: str = ""): | |
offset_x = neighboring_anchors[0, 0] - 1 | |
offset_y = neighboring_anchors[1, 0] - 1 | |
ax = __visualize_cost_matrices(ax=ax, | |
cost_matrices=cost_matrices, | |
offset_x=offset_x, | |
offset_y=offset_y) | |
__visualize_constraint_rectangles(anchors_step1[[1, 0], :], | |
edgecolor='firebrick') | |
__visualize_path_in_matrix(ax=ax, | |
wp=wp_step1, | |
axisX=np.arange(0, num_rows_step1), | |
axisY=np.arange(0, num_cols_step1), | |
path_color='firebrick') | |
__visualize_constraint_rectangles(neighboring_anchors[[1, 0], :] - 1, | |
edgecolor='orangered', | |
linestyle='--') | |
__visualize_path_in_matrix(ax=ax, | |
wp=wp_step2, | |
axisX=np.arange(0, num_rows_step1), | |
axisY=np.arange(0, num_cols_step1), | |
path_color='orangered') | |
ax.set_title(plot_title) | |
ax.set_ylabel("Version 1 (frames)") | |
ax.set_xlabel("Version 2 (frames)") | |
ax = plt.gca() # get the current axes | |
pcm = None | |
for pcm in ax.get_children(): | |
if isinstance(pcm, ScalarMappable): | |
break | |
plt.colorbar(pcm, ax=ax) | |
plt.tight_layout() | |
plt.show() | |
def __size_dtw_matrices(dtw_matrices: List) -> Tuple[List[np.ndarray], List[np.ndarray]]: | |
"""Gives information about the dimensionality of a DTW matrix | |
given in form of a list matrix | |
Parameters | |
---------- | |
dtw_matrices: list | |
The DTW matrix (cost matrix or accumulated cost matrix) given in form a list. | |
Returns | |
------- | |
axisX_list: list | |
A list containing a horizontal axis for each of the sub matrices | |
which specifies the horizontal position of the respective submatrix | |
in the overall cost matrix. | |
axis_y_list: list | |
A list containing a vertical axis for each of the | |
sub matrices which specifies the vertical position of the | |
respective submatrix in the overall cost matrix. | |
""" | |
num_matrices = len(dtw_matrices) | |
size_list = [dtw_mat.shape for dtw_mat in dtw_matrices] | |
axis_x_list = list() | |
axis_y_list = list() | |
x_acc = 0 | |
y_acc = 0 | |
for i in range(num_matrices): | |
curr_size_list = size_list[i] | |
axis_x_list.append(np.arange(x_acc, x_acc + curr_size_list[0])) | |
axis_y_list.append(np.arange(y_acc, y_acc + curr_size_list[1])) | |
x_acc += curr_size_list[0] - 1 | |
y_acc += curr_size_list[1] - 1 | |
return axis_x_list, axis_y_list | |
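# --- Hedged usage sketch (illustrative only) ---
# Shows how __size_dtw_matrices positions each DTW sub-matrix inside the overall cost
# matrix: consecutive sub-matrices share one frame (their anchor), so each axis starts
# where the previous one ended. The sub-matrix shapes are made-up assumptions.
def _demo_size_dtw_matrices():
    sub_matrices = [np.zeros((4, 6)), np.zeros((3, 5))]
    axis_x_list, axis_y_list = __size_dtw_matrices(sub_matrices)
    # axis_x_list: [array([0, 1, 2, 3]), array([3, 4, 5])]
    # axis_y_list: [array([0, 1, 2, 3, 4, 5]), array([5, 6, 7, 8, 9])]
    return axis_x_list, axis_y_list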
def __visualize_cost_matrices(ax: plt.Axes, | |
cost_matrices: list = None, | |
offset_x: float = 0.0, | |
offset_y: float = 0.0) -> plt.Axes: | |
"""Visualizes cost matrices | |
Parameters | |
---------- | |
ax : axes | |
The Axes instance to plot on | |
cost_matrices : list | |
List of DTW cost matrices. | |
offset_x : float | |
Offset on the x axis. | |
offset_y : float | |
Offset on the y axis. | |
Returns | |
------- | |
ax: axes | |
The Axes instance to plot on | |
""" | |
x_ax, y_ax = __size_dtw_matrices(dtw_matrices=cost_matrices) | |
    for i, cur_cost in enumerate(cost_matrices):
        curr_x_ax = x_ax[i] + offset_x
        curr_y_ax = y_ax[i] + offset_y
ax.imshow(cur_cost, cmap='gray_r', aspect='auto', origin='lower', | |
extent=[curr_y_ax[0], curr_y_ax[-1], curr_x_ax[0], curr_x_ax[-1]]) | |
return ax | |
def __visualize_path_in_matrix(ax, | |
wp: np.ndarray = None, | |
axisX: np.ndarray = None, | |
axisY: np.ndarray = None, | |
path_color: str = 'r'): | |
"""Plots a warping path on top of a given matrix. The matrix is | |
usually an accumulated cost matrix. | |
Parameters | |
---------- | |
ax : axes | |
The Axes instance to plot on | |
wp : np.ndarray | |
Warping path | |
axisX : np.ndarray | |
Array of X axis | |
axisY : np.ndarray | |
Array of Y axis | |
path_color : str | |
Color of the warping path to be plotted. (default: r) | |
""" | |
assert axisX is not None and isinstance(axisX, np.ndarray), 'axisX must be a numpy array!' | |
assert axisY is not None and isinstance(axisY, np.ndarray), 'axisY must be a numpy array!' | |
wp = wp.astype(int) | |
ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], '-k', linewidth=5) | |
ax.plot(axisY[wp[1, :]], axisX[wp[0, :]], color=path_color, linewidth=3) | |
def __visualize_constraint_rectangles(anchors: np.ndarray, | |
linestyle: str = '-', | |
edgecolor: str = 'royalblue', | |
linewidth: float = 1.0): | |
for k in range(anchors.shape[1]-1): | |
a1 = anchors[:, k] | |
a2 = anchors[:, k + 1] | |
# a rectangle is defined by [x y width height] | |
x = a1[0] | |
y = a1[1] | |
w = a2[0] - a1[0] + np.finfo(float).eps | |
h = a2[1] - a1[1] + np.finfo(float).eps | |
rect = Rectangle((x, y), w, h, | |
linewidth=linewidth, | |
edgecolor=edgecolor, | |
linestyle=linestyle, | |
facecolor='none') | |
plt.gca().add_patch(rect) | |
def project_alignment_on_a_new_feature_rate(alignment: np.ndarray, | |
feature_rate_old: int, | |
feature_rate_new: int, | |
cost_matrix_size_old: tuple = (), | |
cost_matrix_size_new: tuple = ()) -> np.ndarray: | |
"""Projects an alignment computed for a cost matrix on a certain | |
feature resolution on a cost matrix having a different feature | |
resolution. | |
Parameters | |
---------- | |
alignment : np.ndarray [shape=(2, N)] | |
Alignment matrix | |
feature_rate_old : int | |
Feature rate of the old cost matrix | |
feature_rate_new : int | |
Feature rate of the new cost matrix | |
cost_matrix_size_old : tuple | |
Size of the old cost matrix. Possibly needed to deal with border cases | |
cost_matrix_size_new : tuple | |
Size of the new cost matrix. Possibly needed to deal with border cases | |
Returns | |
------- | |
np.ndarray [shape=(2, N)] | |
Anchor sequence for the new cost matrix | |
""" | |
# Project the alignment on the new feature rate | |
fac = feature_rate_new / feature_rate_old | |
anchors = np.round(alignment * fac) + 1 | |
# In case the sizes of the cost matrices are given explicitly and the | |
# alignment specifies to align the first and last elements, handle this case | |
# separately since this might cause problems in the general projection | |
# procedure. | |
if cost_matrix_size_old is not None and cost_matrix_size_new is not None: | |
if np.array_equal(alignment[:, 0], np.array([0, 0])): | |
anchors[:, 0] = np.array([1, 1]) | |
if np.array_equal(alignment[:, -1], np.array(cost_matrix_size_old) - 1): | |
anchors[:, -1] = np.array(cost_matrix_size_new) | |
return anchors - 1 | |
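# --- Hedged usage sketch (illustrative only) ---
# Projects a coarse alignment (2 x N index pairs) onto a feature rate twice as high,
# with the first/last elements handled explicitly. The alignment and matrix sizes
# below are made-up assumptions.
def _demo_project_alignment():
    alignment = np.array([[0, 2, 4],
                          [0, 3, 6]])              # indices at 5 Hz
    anchors = project_alignment_on_a_new_feature_rate(alignment,
                                                      feature_rate_old=5,
                                                      feature_rate_new=10,
                                                      cost_matrix_size_old=(5, 7),
                                                      cost_matrix_size_new=(10, 14))
    # the first pair snaps to (0, 0) and the last pair to the new matrix corner (9, 13)
    return anchors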
def derive_anchors_from_projected_alignment(projected_alignment: np.ndarray, | |
threshold: int) -> np.ndarray: | |
"""Derive anchors from a projected alignment such that the area of the rectangle | |
defined by two subsequent anchors a1 and a2 is below a given threshold. | |
Parameters | |
---------- | |
projected_alignment : np.ndarray [shape=(2, N)] | |
Projected alignment array | |
threshold : int | |
Maximum area of the constraint rectangle | |
Returns | |
------- | |
anchors_res : np.ndarray [shape=(2, M)] | |
Resulting anchor sequence | |
""" | |
L = projected_alignment.shape[1] | |
a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1) | |
a2 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1) | |
if __compute_area(a1, a2) <= threshold: | |
anchors_res = np.concatenate([a1, a2], axis=1) | |
elif L > 2: | |
center = int(np.floor(L/2 + 1)) | |
a1 = np.array(projected_alignment[:, 0], copy=True).reshape(-1, 1) | |
a2 = np.array(projected_alignment[:, center - 1], copy=True).reshape(-1, 1) | |
a3 = np.array(projected_alignment[:, -1], copy=True).reshape(-1, 1) | |
if __compute_area(a1, a2) > threshold: | |
anchors_1 = derive_anchors_from_projected_alignment(projected_alignment[:, 0:center], threshold) | |
else: | |
anchors_1 = np.concatenate([a1, a2], axis=1) | |
if __compute_area(a2, a3) > threshold: | |
anchors_2 = derive_anchors_from_projected_alignment(projected_alignment[:, center - 1:], threshold) | |
else: | |
anchors_2 = np.concatenate([a2, a3], axis=1) | |
anchors_res = np.concatenate([anchors_1, anchors_2[:, 1:]], axis=1) | |
else: | |
if __compute_area(a1, a2) > threshold: | |
print('Only two anchor points are given which do not fulfill the constraint.') | |
anchors_res = np.concatenate([a1, a2], axis=1) | |
return anchors_res | |
def derive_neighboring_anchors(warping_path: np.ndarray, | |
anchor_indices: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
"""Compute anchor points in the neighborhood of previous anchor points. | |
Parameters | |
---------- | |
warping_path : np.ndarray [shape=(2, N)] | |
Warping path | |
anchor_indices : np.ndarray | |
Indices corresponding to the anchor points in the ``warping_path`` | |
Returns | |
------- | |
neighboring_anchors : np.ndarray [shape=(2, N-1)] | |
Sequence of neighboring anchors | |
neighboring_anchor_indices : np.ndarray | |
Indices into ``warping path`` corresponding to ``neighboring_anchors`` | |
""" | |
L = anchor_indices.shape[0] | |
neighboring_anchor_indices = np.zeros(L-1, dtype=int) | |
neighboring_anchors = np.zeros((2, L-1), dtype=int) | |
for k in range(1, L): | |
i1 = anchor_indices[k-1] | |
i2 = anchor_indices[k] | |
neighboring_anchor_indices[k-1] = i1 + np.floor((i2 - i1) / 2) | |
neighboring_anchors[:, k-1] = warping_path[:, neighboring_anchor_indices[k - 1]] | |
return neighboring_anchors, neighboring_anchor_indices | |
def __compute_area(a: tuple, | |
b: tuple): | |
"""Computes the area between two points, given as tuples""" | |
return (b[0] - a[0] + 1) * (b[1] - a[1] + 1) | |
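# --- Hedged usage sketch (illustrative only) ---
# Splits a projected alignment into anchors whose pairwise constraint rectangles stay
# below a small area threshold, then derives mid-point anchors between them. The toy
# alignment, warping path and threshold are assumptions for the example.
def _demo_anchor_derivation():
    projected_alignment = np.array([[0, 2, 4, 6, 8],
                                    [0, 2, 4, 6, 8]])
    anchors = derive_anchors_from_projected_alignment(projected_alignment, threshold=30)
    # the full rectangle has area 9 * 9 = 81 > 30, so an intermediate anchor (4, 4) is inserted
    warping_path = np.array([[0, 1, 2, 3, 4, 5, 6, 7, 8],
                             [0, 1, 2, 3, 4, 5, 6, 7, 8]])
    anchor_indices = np.array([0, 4, 8])
    neighboring_anchors, neighboring_indices = derive_neighboring_anchors(warping_path, anchor_indices)
    return anchors, neighboring_anchors, neighboring_indices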
class Transcriber(PitchEstimator): | |
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): | |
super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length) | |
def transcribe(self, audio, batch_size=128, postprocessing='spotify', include_pitch_bends=True, to_midi=True, | |
debug=False): | |
""" | |
Transcribe an audio file or mono waveform in numpy or torch into MIDI with pitch bends. | |
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor | |
        :param batch_size: number of chunk-level frames to process at once
        :param postprocessing: note creation method: 'spotify' (default), 'rebab', or 'tiktok'
        :param include_pitch_bends: whether to include pitch bends in the MIDI file
        :param to_midi: whether to return a MIDI file or a list of note events (as tuples)
        :param debug: if True, plot the f0, note, onset, and offset posteriorgrams
        :return: pretty_midi.PrettyMIDI object, or a list of note event tuples if to_midi is False
""" | |
out = self.predict(audio, batch_size) | |
if debug: | |
plt.imshow(out['f0'].T, aspect='auto', origin='lower') | |
plt.show() | |
plt.imshow(out['note'].T, aspect='auto', origin='lower') | |
plt.show() | |
plt.imshow(out['onset'].T, aspect='auto', origin='lower') | |
plt.show() | |
plt.imshow(out['offset'].T, aspect='auto', origin='lower') | |
plt.show() | |
if to_midi: | |
return self.out2midi(out, postprocessing, include_pitch_bends) | |
else: | |
return self.out2note(out, postprocessing, include_pitch_bends) | |
def out2note(self, output: Dict[str, np.array], postprocessing='spotify', | |
include_pitch_bends: bool = True, | |
) -> List[Tuple[float, float, int, float, Optional[List[int]]]]: | |
"""Convert model output to notes | |
""" | |
if postprocessing == 'spotify': | |
estimated_notes = spotify_create_notes( | |
output["note"], | |
output["onset"], | |
note_low=self.labeling.midi_centers[0], | |
note_high=self.labeling.midi_centers[-1], | |
onset_thresh=0.5, | |
frame_thresh=0.3, | |
infer_onsets=True, | |
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70 | |
melodia_trick=True, | |
) | |
        elif postprocessing == 'rebab':
estimated_notes = spotify_create_notes( | |
output["note"], | |
output["onset"], | |
note_low=self.labeling.midi_centers[0], | |
note_high=self.labeling.midi_centers[-1], | |
onset_thresh=0.2, | |
frame_thresh=0.2, | |
infer_onsets=True, | |
min_note_len=int(np.round(127.70 / 1000 * (self.sr / self.hop_length))), #127.70 | |
melodia_trick=True, | |
) | |
elif postprocessing == 'tiktok': | |
postprocessor = RegressionPostProcessor( | |
frames_per_second=self.sr / self.hop_length, | |
classes_num=self.labeling.midi_centers.shape[0], | |
begin_note=self.labeling.midi_centers[0], | |
onset_threshold=0.2, | |
offset_threshold=0.2, | |
frame_threshold=0.3, | |
pedal_offset_threshold=0.5, | |
) | |
tiktok_note_dict, _ = postprocessor.output_dict_to_midi_events(output) | |
estimated_notes = [] | |
for list_item in tiktok_note_dict: | |
                if list_item['offset_time'] > 0.6 + list_item['onset_time']:  # keep only notes longer than 0.6 s
estimated_notes.append((int(np.floor(list_item['onset_time']/(output['time'][1]))), | |
int(np.ceil(list_item['offset_time']/(output['time'][1]))), | |
list_item['midi_note'], list_item['velocity']/128)) | |
if include_pitch_bends: | |
estimated_notes_with_pitch_bend = self.get_pitch_bends(output["f0"], estimated_notes) | |
else: | |
estimated_notes_with_pitch_bend = [(note[0], note[1], note[2], note[3], None) for note in estimated_notes] | |
times_s = output['time'] | |
estimated_notes_time_seconds = [ | |
(times_s[note[0]], times_s[note[1]], note[2], note[3], note[4]) for note in estimated_notes_with_pitch_bend | |
] | |
return estimated_notes_time_seconds | |
def out2midi(self, output: Dict[str, np.array], postprocessing: str = 'spotify', include_pitch_bends: bool = True, | |
) -> PrettyMIDI: | |
"""Convert model output to MIDI | |
Args: | |
            output: A dictionary with keys
                {
                    'f0': array of shape (n_times, n_f0_bins),
                    'note': array of shape (n_times, n_notes),
                    'onset': array of shape (n_times, n_notes),
                    'offset': array of shape (n_times, n_notes),
                    'time': array of shape (n_times,)
                }
                representing the output of the model.
            postprocessing: 'spotify', 'rebab', or 'tiktok' postprocessing.
            include_pitch_bends: If True, include pitch bends.
        Returns:
            the transcribed MIDI as a pretty_midi.PrettyMIDI object
""" | |
estimated_notes_time_seconds = self.out2note(output, postprocessing, include_pitch_bends) | |
midi_tempo = 120 # todo: infer tempo from the onsets | |
return self.note2midi(estimated_notes_time_seconds, midi_tempo) | |
def note2midi( | |
self, note_events_with_pitch_bends: List[Tuple[float, float, int, float, Optional[List[int]]]], | |
midi_tempo: float = 120, | |
): | |
"""Create a pretty_midi object from note events | |
        :param note_events_with_pitch_bends: list of tuples
            [(start_time_seconds, end_time_seconds, pitch_midi, amplitude, pitch_bends)]
        :param midi_tempo: MIDI tempo in BPM  # todo: infer tempo from the onsets
:return: transcribed MIDI file as a pretty_midi.PrettyMIDI object | |
""" | |
mid = PrettyMIDI(initial_tempo=midi_tempo) | |
program = instrument_name_to_program(self.instrument) | |
instruments: DefaultDict[int, Instrument] = defaultdict( | |
lambda: Instrument(program=program) | |
) | |
for start_time, end_time, note_number, amplitude, pitch_bend in note_events_with_pitch_bends: | |
instrument = instruments[note_number] | |
note = Note( | |
velocity=int(np.round(127 * amplitude)), | |
pitch=note_number, | |
start=start_time, | |
end=end_time, | |
) | |
instrument.notes.append(note) | |
if not isinstance(pitch_bend, np.ndarray): | |
continue | |
pitch_bend_times = np.linspace(start_time, end_time, len(pitch_bend)) | |
for pb_time, pb_midi in zip(pitch_bend_times, pitch_bend): | |
instrument.pitch_bends.append(PitchBend(pb_midi, pb_time)) | |
mid.instruments.extend(instruments.values()) | |
return mid | |
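# --- Hedged usage sketch (illustrative only) ---
# Mirrors the note2midi logic above with hard-coded note events instead of a Transcriber
# instance: one Instrument per MIDI pitch so that per-note pitch bends do not interfere
# with each other. The note tuples, instrument name and tempo are assumptions for the example.
def _demo_note_events_to_midi():
    note_events = [
        (0.50, 1.00, 62, 0.8, np.array([0, 100, 200, 100, 0])),   # with pitch bends (MIDI pitch-bend units)
        (1.20, 1.70, 64, 0.6, None),                              # without pitch bends
    ]
    mid = PrettyMIDI(initial_tempo=120)
    program = instrument_name_to_program('Violin')
    instruments = defaultdict(lambda: Instrument(program=program))
    for start, end, pitch, amplitude, pitch_bend in note_events:
        instrument = instruments[pitch]
        instrument.notes.append(Note(velocity=int(np.round(127 * amplitude)), pitch=pitch, start=start, end=end))
        if isinstance(pitch_bend, np.ndarray):
            for pb_time, pb_midi in zip(np.linspace(start, end, len(pitch_bend)), pitch_bend):
                instrument.pitch_bends.append(PitchBend(int(pb_midi), pb_time))
    mid.instruments.extend(instruments.values())
    return mid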
def sync_via_mrmsdtw_with_anchors(f_chroma1: np.ndarray, | |
f_chroma2: np.ndarray, | |
f_onset1: np.ndarray = None, | |
f_onset2: np.ndarray = None, | |
input_feature_rate: float = 50, | |
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), | |
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), | |
threshold_rec: int = 10000, | |
win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), | |
downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), | |
verbose: bool = False, | |
dtw_implementation: str = 'synctoolbox', | |
normalize_chroma: bool = True, | |
chroma_norm_ord: int = 2, | |
chroma_norm_threshold: float = 0.001, | |
visualization_title: str = "MrMsDTW result", | |
anchor_pairs: List[Tuple] = None, | |
linear_inp_idx: List[int] = [], | |
alpha=0.5) -> np.ndarray: | |
"""Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. | |
MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint | |
regions defined by the alignment found on the previous, coarser level. | |
If onset features are provided, these are used on the finest level in addition to chroma | |
to provide higher synchronization accuracy. | |
Parameters | |
---------- | |
f_chroma1 : np.ndarray [shape=(12, N)] | |
Chroma feature matrix of the first sequence | |
f_chroma2 : np.ndarray [shape=(12, M)] | |
Chroma feature matrix of the second sequence | |
f_onset1 : np.ndarray [shape=(L, N)] | |
Onset feature matrix of the first sequence (optional, default: None) | |
f_onset2 : np.ndarray [shape=(L, M)] | |
Onset feature matrix of the second sequence (optional, default: None) | |
input_feature_rate: int | |
Input feature rate of the chroma features (default: 50) | |
step_sizes: np.ndarray | |
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) | |
step_weights: np.ndarray | |
DTW step weights (np.array([1.0, 1.0, 1.0])) | |
threshold_rec: int | |
Defines the maximum area that is spanned by the rectangle of two | |
consecutive elements in the alignment (default: 10000) | |
win_len_smooth : np.ndarray | |
Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) | |
downsamp_smooth : np.ndarray | |
Downsampling factors (default: np.array([50, 25, 5, 1])) | |
verbose : bool | |
Set `True` for visualization (default: False) | |
dtw_implementation : str | |
DTW implementation, librosa or synctoolbox (default: synctoolbox) | |
normalize_chroma : bool | |
Set `True` to normalize input chroma features after each downsampling | |
and smoothing operation. | |
chroma_norm_ord: int | |
Order of chroma normalization, relevant if ``normalize_chroma`` is True. | |
(default: 2) | |
chroma_norm_threshold: float | |
If the norm falls below threshold for a feature vector, then the | |
normalized feature vector is set to be the unit vector. Relevant, if | |
``normalize_chroma`` is True (default: 0.001) | |
visualization_title : str | |
Title for the visualization plots. Only relevant if 'verbose' is True | |
(default: "MrMsDTW result") | |
anchor_pairs: List[Tuple] | |
Anchor pairs given in seconds. Note that | |
* (0, 0) and (<audio-len1>, <audio-len2>) are not allowed. | |
            * Anchors must be monotonically increasing.
linear_inp_idx: List[int] | |
List of the indices of intervals created by anchor pairs, for which | |
MrMsDTW shouldn't be run, e.g., if the interval only involves silence. | |
0 ap1 ap2 ap3 | |
| | | | | |
| idx0 | idx1 | idx2 | idx3 OR idx-1 | |
| | | | | |
Note that index -1 corresponds to the last interval, which begins with | |
the last anchor pair until the end of the audio files. | |
alpha: float | |
Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. | |
C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) | |
Returns | |
------- | |
wp : np.ndarray [shape=(2, T)] | |
Resulting warping path which indicates synchronized indices. | |
""" | |
if anchor_pairs is None: | |
wp = sync_via_mrmsdtw(f_chroma1=f_chroma1, | |
f_chroma2=f_chroma2, | |
f_onset1=f_onset1, | |
f_onset2=f_onset2, | |
input_feature_rate=input_feature_rate, | |
step_sizes=step_sizes, | |
step_weights=step_weights, | |
threshold_rec=threshold_rec, | |
win_len_smooth=win_len_smooth, | |
downsamp_smooth=downsamp_smooth, | |
verbose=verbose, | |
dtw_implementation=dtw_implementation, | |
normalize_chroma=normalize_chroma, | |
chroma_norm_ord=chroma_norm_ord, | |
chroma_norm_threshold=chroma_norm_threshold, | |
visualization_title=visualization_title, | |
alpha=alpha) | |
else: | |
# constant_intervals = [((0, x1), (0, y1), False), | |
# ((x1, x2), (y1, y2), True), | |
# ((x2, -1), (y2, -1), False)] | |
wp = None | |
if verbose: | |
print('Anchor points are given!') | |
__check_anchor_pairs(anchor_pairs, f_chroma1.shape[1], f_chroma2.shape[1], input_feature_rate) | |
# Add ending as the anchor point | |
anchor_pairs.append((-1, -1)) | |
prev_a1 = 0 | |
prev_a2 = 0 | |
for idx, anchor_pair in enumerate(anchor_pairs): | |
cur_a1, cur_a2 = anchor_pair | |
# Split the features | |
f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split = __split_features(f_chroma1, | |
f_onset1, | |
f_chroma2, | |
f_onset2, | |
cur_a1, | |
cur_a2, | |
prev_a1, | |
prev_a2, | |
input_feature_rate) | |
            if idx in linear_inp_idx or (idx == len(anchor_pairs) - 1 and -1 in linear_inp_idx):
                # Generate a diagonal warping path if MrMsDTW is not supposed to be executed for this interval.
                # A typical scenario is a silent break enclosed by two anchor points.
if verbose: | |
print('A diagonal warping path is generated for the interval \n\t Feature sequence 1: %.2f - %.2f' | |
'\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) | |
wp_cur = __diagonal_warping_path(f_chroma1_split, f_chroma2_split) | |
else: | |
if verbose: | |
if cur_a1 != -1 and cur_a2 != -1: | |
print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - %.2f' | |
'\n\t Feature sequence 2: %.2f - %.2f\n' % (prev_a1, cur_a1, prev_a2, cur_a2)) | |
else: | |
print('MrMsDTW is applied for the interval \n\t Feature sequence 1: %.2f - end' | |
'\n\t Feature sequence 2: %.2f - end\n' % (prev_a1, prev_a2)) | |
wp_cur = sync_via_mrmsdtw(f_chroma1=f_chroma1_split, | |
f_chroma2=f_chroma2_split, | |
f_onset1=f_onset1_split, | |
f_onset2=f_onset2_split, | |
input_feature_rate=input_feature_rate, | |
step_sizes=step_sizes, | |
step_weights=step_weights, | |
threshold_rec=threshold_rec, | |
win_len_smooth=win_len_smooth, | |
downsamp_smooth=downsamp_smooth, | |
verbose=verbose, | |
dtw_implementation=dtw_implementation, | |
normalize_chroma=normalize_chroma, | |
chroma_norm_ord=chroma_norm_ord, | |
chroma_norm_threshold=chroma_norm_threshold, | |
alpha=alpha) | |
if wp is None: | |
wp = np.array(wp_cur, copy=True) | |
# Concatenate warping paths | |
else: | |
wp = np.concatenate([wp, wp_cur + wp[:, -1].reshape(2, 1) + 1], axis=1) | |
prev_a1 = cur_a1 | |
prev_a2 = cur_a2 | |
anchor_pairs.pop() | |
return wp | |
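# --- Hedged usage sketch (illustrative only) ---
# Calls sync_via_mrmsdtw_with_anchors on random chroma features with one anchor pair given
# in seconds. This is a sketch under the assumption that the synctoolbox-style helpers used
# above (smooth_downsample_feature, normalize_feature, compute_cost_matrices_between_anchors,
# __check_anchor_pairs, __split_features, ...) are defined elsewhere in this module; the
# feature lengths, anchor and feature rate are made up for the example.
def _demo_mrmsdtw_with_anchors():
    rng = np.random.default_rng(0)
    f_chroma1 = rng.random((12, 300))   # ~6.0 s at 50 Hz
    f_chroma2 = rng.random((12, 360))   # ~7.2 s at 50 Hz
    wp = sync_via_mrmsdtw_with_anchors(f_chroma1=f_chroma1,
                                       f_chroma2=f_chroma2,
                                       input_feature_rate=50,
                                       anchor_pairs=[(3.0, 3.6)],  # 3.0 s in version 1 maps to 3.6 s in version 2
                                       linear_inp_idx=[],          # run MrMsDTW on every interval
                                       verbose=False)
    return wp                            # warping path of synchronized frame indices, shape (2, T)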
def sync_via_mrmsdtw(f_chroma1: np.ndarray, | |
f_chroma2: np.ndarray, | |
f_onset1: np.ndarray = None, | |
f_onset2: np.ndarray = None, | |
input_feature_rate: float = 50, | |
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int32), | |
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), | |
threshold_rec: int = 10000, | |
win_len_smooth: np.ndarray = np.array([201, 101, 21, 1]), | |
downsamp_smooth: np.ndarray = np.array([50, 25, 5, 1]), | |
verbose: bool = False, | |
dtw_implementation: str = 'synctoolbox', | |
normalize_chroma: bool = True, | |
chroma_norm_ord: int = 2, | |
chroma_norm_threshold: float = 0.001, | |
visualization_title: str = "MrMsDTW result", | |
alpha=0.5) -> np.ndarray: | |
"""Compute memory-restricted multi-scale DTW (MrMsDTW) using chroma and (optionally) onset features. | |
MrMsDTW is performed on multiple levels that get progressively finer, with rectangular constraint | |
regions defined by the alignment found on the previous, coarser level. | |
If onset features are provided, these are used on the finest level in addition to chroma | |
to provide higher synchronization accuracy. | |
Parameters | |
---------- | |
f_chroma1 : np.ndarray [shape=(12, N)] | |
Chroma feature matrix of the first sequence | |
f_chroma2 : np.ndarray [shape=(12, M)] | |
Chroma feature matrix of the second sequence | |
f_onset1 : np.ndarray [shape=(L, N)] | |
Onset feature matrix of the first sequence (optional, default: None) | |
f_onset2 : np.ndarray [shape=(L, M)] | |
Onset feature matrix of the second sequence (optional, default: None) | |
input_feature_rate: int | |
Input feature rate of the chroma features (default: 50) | |
step_sizes: np.ndarray | |
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) | |
step_weights: np.ndarray | |
DTW step weights (np.array([1.0, 1.0, 1.0])) | |
threshold_rec: int | |
Defines the maximum area that is spanned by the rectangle of two | |
consecutive elements in the alignment (default: 10000) | |
win_len_smooth : np.ndarray | |
Window lengths for chroma feature smoothing (default: np.array([201, 101, 21, 1])) | |
downsamp_smooth : np.ndarray | |
Downsampling factors (default: np.array([50, 25, 5, 1])) | |
verbose : bool | |
Set `True` for visualization (default: False) | |
dtw_implementation : str | |
DTW implementation, librosa or synctoolbox (default: synctoolbox) | |
normalize_chroma : bool | |
Set `True` to normalize input chroma features after each downsampling | |
and smoothing operation. | |
chroma_norm_ord: int | |
Order of chroma normalization, relevant if ``normalize_chroma`` is True. | |
(default: 2) | |
chroma_norm_threshold: float | |
If the norm falls below threshold for a feature vector, then the | |
normalized feature vector is set to be the unit vector. Relevant, if | |
``normalize_chroma`` is True (default: 0.001) | |
visualization_title : str | |
Title for the visualization plots. Only relevant if 'verbose' is True | |
(default: "MrMsDTW result") | |
alpha: float | |
Coefficient for the Chroma cost matrix in the finest scale of the MrMsDTW algorithm. | |
C = alpha * C_Chroma + (1 - alpha) * C_act (default: 0.5) | |
Returns | |
------- | |
alignment: np.ndarray [shape=(2, T)] | |
Resulting warping path which indicates synchronized indices. | |
""" | |
# If onset features are given as input, high resolution MrMsDTW is activated. | |
high_res = False | |
if f_onset1 is not None and f_onset2 is not None: | |
high_res = True | |
if high_res and (f_chroma1.shape[1] != f_onset1.shape[1] or f_chroma2.shape[1] != f_onset2.shape[1]): | |
raise ValueError('Chroma and onset features must be of the same length.') | |
if downsamp_smooth[-1] != 1 or win_len_smooth[-1] != 1: | |
        raise ValueError('The downsampling factor of the last iteration must be equal to 1, i.e., '
                         'at the last iteration the features must be computed at the input feature rate!')
num_iterations = win_len_smooth.shape[0] | |
cost_matrix_size_old = tuple() | |
feature_rate_old = input_feature_rate / downsamp_smooth[0] | |
alignment = None | |
total_computation_time = 0.0 | |
    # If the total cost-matrix area is already below threshold_rec, skip the coarse levels
    # and only run the finest iteration.
    it = (num_iterations - 1) if f_chroma1.shape[1] * f_chroma2.shape[1] < threshold_rec else 0
while it < num_iterations: | |
tic1 = perf_counter() | |
# Smooth and downsample given raw features | |
f_chroma1_cur, _ = smooth_downsample_feature(f_chroma1, | |
input_feature_rate=input_feature_rate, | |
win_len_smooth=win_len_smooth[it], | |
downsamp_smooth=downsamp_smooth[it]) | |
f_chroma2_cur, feature_rate_new = smooth_downsample_feature(f_chroma2, | |
input_feature_rate=input_feature_rate, | |
win_len_smooth=win_len_smooth[it], | |
downsamp_smooth=downsamp_smooth[it]) | |
if normalize_chroma: | |
f_chroma1_cur = normalize_feature(f_chroma1_cur, | |
norm_ord=chroma_norm_ord, | |
threshold=chroma_norm_threshold) | |
f_chroma2_cur = normalize_feature(f_chroma2_cur, | |
norm_ord=chroma_norm_ord, | |
threshold=chroma_norm_threshold) | |
# Project path onto new resolution | |
cost_matrix_size_new = (f_chroma1_cur.shape[1], f_chroma2_cur.shape[1]) | |
if alignment is None: | |
# Initialize the alignment with the start and end frames of the feature sequence | |
anchors = np.array([[0, f_chroma1_cur.shape[1] - 1], [0, f_chroma2_cur.shape[1] - 1]]) | |
else: | |
projected_alignment = project_alignment_on_a_new_feature_rate(alignment=alignment, | |
feature_rate_old=feature_rate_old, | |
feature_rate_new=feature_rate_new, | |
cost_matrix_size_old=cost_matrix_size_old, | |
cost_matrix_size_new=cost_matrix_size_new) | |
anchors = derive_anchors_from_projected_alignment(projected_alignment=projected_alignment, | |
threshold=threshold_rec) | |
# Cost matrix and warping path computation | |
if high_res and it == num_iterations - 1: | |
# Compute cost considering chroma and pitch onset features and alignment only in the last iteration, | |
# where the features are at the finest level. | |
cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, | |
f_chroma2=f_chroma2_cur, | |
f_onset1=f_onset1, | |
f_onset2=f_onset2, | |
anchors=anchors, | |
alpha=alpha) | |
else: | |
cost_matrices_step1 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, | |
f_chroma2=f_chroma2_cur, | |
anchors=anchors, | |
alpha=alpha) | |
wp_list = compute_warping_paths_from_cost_matrices(cost_matrices_step1, | |
step_sizes=step_sizes, | |
step_weights=step_weights, | |
implementation=dtw_implementation) | |
# Concatenate warping paths | |
wp = build_path_from_warping_paths(warping_paths=wp_list, | |
anchors=anchors) | |
anchors_step1 = None | |
wp_step1 = None | |
num_rows_step1 = 0 | |
num_cols_step1 = 0 | |
ax = None | |
toc1 = perf_counter() | |
if verbose and cost_matrices_step1 is not None: | |
anchors_step1 = np.array(anchors, copy=True) | |
wp_step1 = np.array(wp, copy=True) | |
num_rows_step1, num_cols_step1 = np.sum(np.array([dtw_mat.shape for dtw_mat in cost_matrices_step1], int), | |
axis=0) | |
fig, ax = sync_visualize_step1(cost_matrices_step1, | |
num_rows_step1, | |
num_cols_step1, | |
anchors, | |
wp) | |
tic2 = perf_counter() | |
# Compute neighboring anchors and refine alignment using local path between neighboring anchors | |
anchor_indices_in_warping_path = find_anchor_indices_in_warping_path(wp, anchors=anchors) | |
# Compute neighboring anchors for refinement | |
neighboring_anchors, neighboring_anchor_indices = \ | |
derive_neighboring_anchors(wp, anchor_indices=anchor_indices_in_warping_path) | |
if neighboring_anchor_indices.shape[0] > 1 \ | |
and it == num_iterations - 1 and high_res: | |
cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, | |
f_chroma2=f_chroma2_cur, | |
f_onset1=f_onset1, | |
f_onset2=f_onset2, | |
anchors=neighboring_anchors, | |
alpha=alpha) | |
else: | |
cost_matrices_step2 = compute_cost_matrices_between_anchors(f_chroma1=f_chroma1_cur, | |
f_chroma2=f_chroma2_cur, | |
anchors=neighboring_anchors, | |
alpha=alpha) | |
wp_list_refine = compute_warping_paths_from_cost_matrices(cost_matrices=cost_matrices_step2, | |
step_sizes=step_sizes, | |
step_weights=step_weights, | |
implementation=dtw_implementation) | |
wp = __refine_wp(wp, anchors, wp_list_refine, neighboring_anchors, neighboring_anchor_indices) | |
toc2 = perf_counter() | |
computation_time_it = toc2 - tic2 + toc1 - tic1 | |
total_computation_time += computation_time_it | |
alignment = wp | |
feature_rate_old = feature_rate_new | |
cost_matrix_size_old = cost_matrix_size_new | |
if verbose and cost_matrices_step2 is not None: | |
sync_visualize_step2(ax, | |
cost_matrices_step2, | |
wp, | |
wp_step1, | |
num_rows_step1, | |
num_cols_step1, | |
anchors_step1, | |
neighboring_anchors, | |
plot_title=f"{visualization_title} - Level {it + 1}") | |
print('Level {} computation time: {:.2f} seconds'.format(it, computation_time_it)) | |
it += 1 | |
if verbose: | |
print('Computation time of MrMsDTW: {:.2f} seconds'.format(total_computation_time)) | |
return alignment | |
def __diagonal_warping_path(f1: np.ndarray, | |
f2: np.ndarray) -> np.ndarray: | |
"""Generates a diagonal warping path given two feature sequences. | |
Parameters | |
---------- | |
f1: np.ndarray [shape=(_, N)] | |
First feature sequence | |
f2: np.ndarray [shape=(_, M)] | |
Second feature sequence | |
Returns | |
------- | |
np.ndarray: Diagonal warping path [shape=(2, T)] | |
""" | |
max_size = np.maximum(f1.shape[1], f2.shape[1]) | |
min_size = np.minimum(f1.shape[1], f2.shape[1]) | |
if min_size == 1: | |
return np.array([max_size - 1, 0]).reshape(-1, 1) | |
elif max_size == f1.shape[1]: | |
return np.array([np.round(np.linspace(0, max_size - 1, min_size)), np.linspace(0, min_size - 1, min_size)]) | |
else: | |
return np.array([np.linspace(0, min_size-1, min_size), np.round(np.linspace(0, max_size - 1, min_size))]) | |
def __compute_area(f1, f2): | |
"""Computes the area of the cost matrix given two feature sequences | |
Parameters | |
---------- | |
f1: np.ndarray | |
First feature sequence | |
f2: np.ndarray | |
Second feature sequence | |
Returns | |
------- | |
int: Area of the cost matrix | |
""" | |
return f1.shape[1] * f2.shape[1] | |
def __split_features(f_chroma1: np.ndarray, | |
f_onset1: np.ndarray, | |
f_chroma2: np.ndarray, | |
f_onset2: np.ndarray, | |
cur_a1: float, | |
cur_a2: float, | |
prev_a1: float, | |
prev_a2: float, | |
feature_rate: int) -> Tuple[np.ndarray, Optional[np.ndarray], np.ndarray, Optional[np.ndarray]]: | |
if cur_a1 == -1: | |
f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):] | |
if f_onset1 is not None: | |
f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):] | |
else: | |
f_onset1_split = None | |
else: | |
# Split the features | |
f_chroma1_split = f_chroma1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] | |
if f_onset1 is not None: | |
f_onset1_split = f_onset1[:, int(prev_a1 * feature_rate):int(cur_a1 * feature_rate)] | |
else: | |
f_onset1_split = None | |
if cur_a2 == -1: | |
f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):] | |
if f_onset2 is not None: | |
f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):] | |
else: | |
f_onset2_split = None | |
else: | |
f_chroma2_split = f_chroma2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] | |
if f_onset2 is not None: | |
f_onset2_split = f_onset2[:, int(prev_a2 * feature_rate):int(cur_a2 * feature_rate)] | |
else: | |
f_onset2_split = None | |
return f_chroma1_split, f_onset1_split, f_chroma2_split, f_onset2_split | |
def __refine_wp(wp: np.ndarray, | |
anchors: np.ndarray, | |
wp_list_refine: List, | |
neighboring_anchors: np.ndarray, | |
neighboring_anchor_indices: np.ndarray) -> np.ndarray: | |
wp_length = wp[:, neighboring_anchor_indices[-1]:].shape[1] | |
last_list = wp[:, neighboring_anchor_indices[-1]:] - np.tile( | |
wp[:, neighboring_anchor_indices[-1]].reshape(-1, 1), wp_length) | |
wp_list_tmp = [wp[:, :neighboring_anchor_indices[0] + 1]] + wp_list_refine + [last_list] | |
A_tmp = np.concatenate([anchors[:, 0].reshape(-1, 1), neighboring_anchors, anchors[:, -1].reshape(-1, 1)], | |
axis=1) | |
wp_res = build_path_from_warping_paths(warping_paths=wp_list_tmp, | |
anchors=A_tmp) | |
return wp_res | |
def __check_anchor_pairs(anchor_pairs: List, | |
f_len1: int, | |
f_len2: int, | |
feature_rate: int): | |
"""Ensures that the anchors satisfy the conditions | |
Parameters | |
---------- | |
anchor_pairs: List[Tuple] | |
List of anchor pairs | |
f_len1: int | |
Length of the first feature sequence | |
f_len2: int | |
Length of the second feature sequence | |
feature_rate: int | |
Input feature rate of the features | |
""" | |
prev_a1 = 0 | |
prev_a2 = 0 | |
for anchor_pair in anchor_pairs: | |
a1, a2 = anchor_pair | |
if a1 <= 0 or a2 <= 0: | |
raise ValueError('Starting point must be a positive number!') | |
if a1 > f_len1 / feature_rate or a2 > f_len2 / feature_rate: | |
raise ValueError('Anchor points cannot be greater than the length of the input audio files!') | |
        if a1 == f_len1 / feature_rate and a2 == f_len2 / feature_rate:
raise ValueError('Both anchor points cannot be equal to the length of the audio files.') | |
if a1 == prev_a1 and a2 == prev_a2: | |
raise ValueError('Duplicate anchor pairs are not allowed!') | |
if a1 < prev_a1 or a2 < prev_a2: | |
            raise ValueError('Anchor points must be monotonically increasing.')
prev_a1 = a1 | |
prev_a2 = a2 | |
class PerformanceLabel: | |
""" | |
    The dataset labeling class for performance representations. It currently includes onset, note, and fine-grained f0
    representations. The note_min, note_max, and f0_bins_per_semitone values should be set per instrument; the defaults
    are for violin performance analysis. Fretted instruments may not require such a fine f0 resolution per semitone.
""" | |
def __init__(self, note_min='F#3', note_max='C8', f0_bins_per_semitone=9, f0_smooth_std_c=None, | |
onset_smooth_std=0.7, f0_tolerance_c=200): | |
midi_min = note_name_to_number(note_min) | |
midi_max = note_name_to_number(note_max) | |
self.midi_centers = np.arange(midi_min, midi_max) | |
self.onset_smooth_std=onset_smooth_std # onset smoothing along time axis (compensate for alignment) | |
f0_hz_range = note_to_hz([note_min, note_max]) | |
f0_c_min, f0_c_max = hz2cents(f0_hz_range) | |
self.f0_granularity_c = 100/f0_bins_per_semitone | |
if not f0_smooth_std_c: | |
f0_smooth_std_c = self.f0_granularity_c * 5/4 # Keep the ratio from the CREPE paper (20 cents and 25 cents) | |
self.f0_smooth_std_c = f0_smooth_std_c | |
self.f0_centers_c = np.arange(f0_c_min, f0_c_max, self.f0_granularity_c) | |
self.f0_centers_hz = 10 * 2 ** (self.f0_centers_c / 1200) | |
self.f0_n_bins = len(self.f0_centers_c) | |
self.pdf_normalizer = norm.pdf(0) | |
self.f0_c2hz = lambda c: 10*2**(c/1200) | |
self.f0_hz2c = hz2cents | |
self.midi_centers_c = self.f0_hz2c(midi_to_hz(self.midi_centers)) | |
self.f0_tolerance_bins = int(f0_tolerance_c/self.f0_granularity_c) | |
self.f0_transition_matrix = gaussian_filter1d(np.eye(2*self.f0_tolerance_bins + 1), 25/self.f0_granularity_c) | |
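    # Illustrative sketch (comments only, not executed): with the default violin settings above,
    # f0_bins_per_semitone=9 yields a grid of roughly 11.1-cent bins (f0_granularity_c = 100/9)
    # and a label-smoothing std of ~13.9 cents (5/4 of the granularity, keeping the 20:25 ratio
    # from CREPE).
    #
    # >>> label = PerformanceLabel()              # defaults: F#3..C8, 9 bins per semitone
    # >>> label.f0_granularity_c                  # ~11.11 cents per f0 bin
    # >>> len(label.midi_centers)                 # 54 semitones in [F#3, C8)
    # >>> label.f0_transition_matrix.shape        # (2*f0_tolerance_bins + 1) square, here (37, 37)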
def f0_c2label(self, pitch_c): | |
""" | |
        Convert a single f0 value in cents to a one-hot label vector with smoothing (i.e., create a gaussian blur around
        the target f0 bin for regularization and training stability). The blur is controlled by self.f0_smooth_std_c.
:param pitch_c: a single pitch value in cents | |
:return: one-hot label vector with frequency blur | |
""" | |
result = norm.pdf((self.f0_centers_c - pitch_c) / self.f0_smooth_std_c).astype(np.float32) | |
result /= self.pdf_normalizer | |
return result | |
def f0_label2c(self, salience, center=None): | |
""" | |
Convert the salience predictions to monophonic f0 in cents. Only outputs a single f0 value per frame! | |
:param salience: f0 activations | |
:param center: f0 center bin to calculate the weighted average. Use argmax if empty | |
:return: f0 array per frame (in cents). | |
""" | |
if salience.ndim == 1: | |
if center is None: | |
center = int(np.argmax(salience)) | |
start = max(0, center - 4) | |
end = min(len(salience), center + 5) | |
salience = salience[start:end] | |
product_sum = np.sum(salience * self.f0_centers_c[start:end]) | |
weight_sum = np.sum(salience) | |
return product_sum / np.clip(weight_sum, 1e-8, None) | |
if salience.ndim == 2: | |
return np.array([self.f0_label2c(salience[i, :]) for i in range(salience.shape[0])]) | |
raise Exception("label should be either 1d or 2d ndarray") | |
def fill_onset_matrix(self, onsets, window, feature_rate): | |
""" | |
        Create a sparse onset matrix from window and onsets (per-semitone). Apply a gaussian smoothing (along time)
        so that small alignment errors are better tolerated. This is similar to the frequency smoothing for the f0.
The temporal smoothing is controlled by the parameter self.onset_smooth_std | |
:param onsets: A 2d np.array of individual note onsets with their respective time values | |
(Nx2: time in seconds - midi number) | |
:param window: Timestamps for the frame centers of the sparse matrix | |
        :param feature_rate: Window timestamps are integers; this converts them to seconds
:return: onset_roll: A sparse matrix filled with temporally blurred onsets. | |
""" | |
onsets = self.get_window_feats(onsets, window, feature_rate) | |
onset_roll = np.zeros((len(window), len(self.midi_centers))) | |
for onset in onsets: | |
onset, note = onset # it was a pair with time and midi note | |
if self.midi_centers[0] < note < self.midi_centers[-1]: # midi note should be in the range defined | |
note = int(note) - self.midi_centers[0] # find the note index in our range | |
onset = (onset*feature_rate)-window[0] # onset index (as float but in frames, not in seconds!) | |
start = max(0, int(onset) - 3) | |
end = min(len(window) - 1, int(onset) + 3) | |
try: | |
vals = norm.pdf(np.linspace(start - onset, end - onset, end - start + 1) / self.onset_smooth_std) | |
# if you increase 0.7 you smooth the peak | |
# if you decrease it, e.g., 0.1, it becomes too peaky! around 0.5-0.7 seems ok | |
vals /= self.pdf_normalizer | |
onset_roll[start:end + 1, note] += vals | |
except ValueError: | |
print('start',start, 'onset', onset, 'end', end) | |
return onset_roll, onsets | |
def fill_note_matrix(self, notes, window, feature_rate): | |
""" | |
Create the note matrix (piano roll) from window timestamps and note values per frame. | |
:param notes: A 2d np.array of individual notes with their active time values Nx2 | |
:param window: Timestamps for the frame centers of the output | |
        :param feature_rate: Window timestamps are integers; this converts them to seconds
:return note_roll: The piano roll in the defined range of [note_min, note_max). | |
""" | |
notes = self.get_window_feats(notes, window, feature_rate) | |
# take the notes in the midi range defined | |
notes = notes[np.logical_and(notes[:,1]>=self.midi_centers[0], notes[:,1]<=self.midi_centers[-1]),:] | |
times = (notes[:,0]*feature_rate - window[0]).astype(int) # in feature samples (fs:self.hop/self.sr) | |
notes = (notes[:,1] - self.midi_centers[0]).astype(int) | |
note_roll = np.zeros((len(window), len(self.midi_centers))) | |
note_roll[(times, notes)] = 1 | |
return note_roll, notes | |
def fill_f0_matrix(self, f0s, window, feature_rate): | |
""" | |
        Unlike the labels for onsets and notes, the f0 label is only relevant for strictly monophonic regions! Thus, this
        function also returns a boolean array which represents where to apply the given values.
        Never back-propagate without the boolean! Empty frames mean that the label is not reliable there.
        :param f0s: A 2d np.array of f0 values with the time they belong to (Nx2: time in seconds - f0 in Hz)
:param window: Timestamps for the frame centers of the output | |
        :param feature_rate: Window timestamps are integers; this converts them to seconds
:return f0_roll: f0 label matrix and | |
f0_hz: f0 values in Hz | |
annotation_bool: A boolean array representing which frames have reliable f0 annotations. | |
""" | |
f0s = self.get_window_feats(f0s, window, feature_rate) | |
f0_cents = np.zeros_like(window, dtype=float) | |
f0s[:,1] = self.f0_hz2c(f0s[:,1]) # convert f0 in hz to cents | |
annotation_bool = np.zeros_like(window, dtype=bool) | |
f0_roll = np.zeros((len(window), len(self.f0_centers_c))) | |
times_in_frame = f0s[:, 0]*feature_rate - window[0] | |
for t, f0 in enumerate(f0s): | |
t = times_in_frame[t] | |
            if t%1 < 0.25: # only consider it as an annotation if the f0 value is really close to the frame center
t = int(np.round(t)) | |
f0_roll[t] = self.f0_c2label(f0[1]) | |
annotation_bool[t] = True | |
f0_cents[t] = f0[1] | |
return f0_roll, f0_cents, annotation_bool | |
    @staticmethod
    def get_window_feats(time_feature_matrix, window, feature_rate):
        """
        Restrict the feature matrix to the features that are inside the window
        :param window: Timestamps for the frame centers of the output
        :param time_feature_matrix: A 2d Nx2 array covering the entire file.
        :param feature_rate: Window timestamps are integers; this converts them to seconds
        :return: window_features: the features inside the given window
""" | |
start = time_feature_matrix[:,0]>(window[0]-0.5)/feature_rate | |
end = time_feature_matrix[:,0]<(window[-1]+0.5)/feature_rate | |
window_features = np.logical_and(start, end) | |
window_features = np.array(time_feature_matrix[window_features,:]) | |
return window_features | |
def represent_midi(self, midi, feature_rate): | |
""" | |
Represent a midi file as sparse matrices of onsets, offsets, and notes. No f0 is included. | |
:param midi: A midi file (either a path or a pretty_midi.PrettyMIDI object) | |
:param feature_rate: The feature rate in Hz | |
        :return: dict {onset, offset, note, time}: Same format as the model's targets and outputs
""" | |
def _get_onsets_offsets_frames(midi_content): | |
if isinstance(midi_content, str): | |
midi_content = PrettyMIDI(midi_content) | |
onsets = [] | |
offsets = [] | |
frames = [] | |
for instrument in midi_content.instruments: | |
for note in instrument.notes: | |
start = int(np.round(note.start * feature_rate)) | |
end = int(np.round(note.end * feature_rate)) | |
note_times = (np.arange(start, end+0.5)/feature_rate)[:, np.newaxis] | |
note_pitch = np.full_like(note_times, fill_value=note.pitch) | |
onsets.append([note.start, note.pitch]) | |
offsets.append([note.end, note.pitch]) | |
frames.append(np.hstack([note_times, note_pitch])) | |
onsets = np.vstack(onsets) | |
offsets = np.vstack(offsets) | |
frames = np.vstack(frames) | |
return onsets, offsets, frames, midi_content | |
onset_array, offset_array, frame_array, midi_object = _get_onsets_offsets_frames(midi) | |
window = np.arange(frame_array[0, 0]*feature_rate, frame_array[-1, 0]*feature_rate, dtype=int) | |
onset_roll, _ = self.fill_onset_matrix(onset_array, window, feature_rate) | |
offset_roll, _ = self.fill_onset_matrix(offset_array, window, feature_rate) | |
note_roll, _ = self.fill_note_matrix(frame_array, window, feature_rate) | |
start_anchor = onset_array[onset_array[:, 0]==np.min(onset_array[:, 0])] | |
end_anchor = offset_array[offset_array[:, 0]==np.max(offset_array[:, 0])] | |
return { | |
'midi': midi_object, | |
'note': note_roll, | |
'onset': onset_roll, | |
'offset': offset_roll, | |
'time': window/feature_rate, | |
'start_anchor': start_anchor, | |
'end_anchor': end_anchor | |
} | |
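    # Illustrative sketch (comments only, not executed; 'score.mid' is a hypothetical path): the
    # returned dictionary mirrors the model's frame-level outputs, so MIDI and audio can be
    # compared frame by frame during synchronization.
    #
    # >>> label = PerformanceLabel()
    # >>> rep = label.represent_midi('score.mid', feature_rate=100)
    # >>> rep['note'].shape      # (n_frames, n_midi_pitches), binary piano roll
    # >>> rep['onset'].shape     # same shape, temporally blurred onsets
    # >>> rep['time'][:3]        # frame-center timestamps in seconds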
class Synchronizer(Transcriber): | |
def __init__(self, labeling, instrument='Violin', sr=16000, window_size=1024, hop_length=160): | |
super().__init__(labeling, instrument=instrument, sr=sr, window_size=window_size, hop_length=hop_length) | |
def synchronize(self, audio, midi, batch_size=128, include_pitch_bends=True, to_midi=True, debug=False, | |
include_velocity=False, alignment_padding=50, timing_refinement_range_with_f0s=0): | |
""" | |
Synchronize an audio file or mono waveform in numpy or torch with a MIDI file. | |
:param audio: str, pathlib.Path, np.ndarray, or torch.Tensor | |
:param midi: str, pathlib.Path, or pretty_midi.PrettyMIDI | |
:param batch_size: frames to process at once | |
:param include_pitch_bends: whether to include pitch bends in the MIDI file | |
:param to_midi: whether to return a MIDI file or a list of note events (as tuple) | |
:param debug: whether to plot the alignment path and compare the alignment with the predicted notes | |
:param include_velocity: whether to embed the note confidence in place of the velocity in the MIDI file | |
:param alignment_padding: how many frames to pad the audio and MIDI representations with | |
:param timing_refinement_range_with_f0s: how many frames to refine the alignment with the f0 confidence | |
:return: aligned MIDI file as a pretty_midi.PrettyMIDI object | |
""" | |
audio = self.predict(audio, batch_size) | |
notes_and_midi = self.out2sync(audio, midi, include_velocity=include_velocity, | |
alignment_padding=alignment_padding) | |
if notes_and_midi: # it might be none | |
notes, midi = notes_and_midi | |
if debug: | |
import pandas as pd | |
estimated_notes = self.out2note(audio, postprocessing='spotify', include_pitch_bends=True) | |
est_df = pd.DataFrame(estimated_notes).sort_values(by=0) | |
note_df = pd.DataFrame(notes).sort_values(by=0) | |
fig, ax = plt.subplots(figsize=(20, 10)) | |
for row in notes: | |
t_start = row[0] # sec | |
t_end = row[1] # sec | |
freq = row[2] # Hz | |
ax.hlines(freq, t_start, t_end, color='k', linewidth=3, zorder=2, alpha=0.5) | |
for row in estimated_notes: | |
t_start = row[0] # sec | |
t_end = row[1] # sec | |
freq = row[2] # Hz | |
ax.hlines(freq, t_start, t_end, color='r', linewidth=3, zorder=2, alpha=0.5) | |
fig.suptitle('alignment (black) vs. estimated (red)') | |
fig.show() | |
if not include_pitch_bends: | |
if to_midi: | |
return midi['midi'] | |
else: | |
return notes | |
else: | |
notes = [(np.argmin(np.abs(audio['time']-note[0])), | |
np.argmin(np.abs(audio['time']-note[1])), | |
note[2], note[3]) for note in notes] | |
notes = self.get_pitch_bends(audio["f0"], notes, timing_refinement_range_with_f0s) | |
notes = [ | |
(audio['time'][note[0]], audio['time'][note[1]], note[2], note[3], note[4]) for note in | |
notes | |
] | |
if to_midi: | |
return self.note2midi(notes, 120) #int(midi['midi'].estimate_tempo())) | |
else: | |
return notes | |
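    # Illustrative sketch (comments only, not executed; the file names and the model-loading step
    # are hypothetical and depend on how the checkpoint is distributed): a Synchronizer subclass
    # with trained weights can align a recording to its score in one call.
    #
    # >>> model = FourHeads()                                       # or any trained Synchronizer subclass
    # >>> aligned = model.synchronize('performance.wav', 'score.mid', include_pitch_bends=True)
    # >>> aligned.write('performance_aligned.mid')                  # pretty_midi.PrettyMIDI output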
def out2sync_old(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False): | |
""" | |
Synchronizes the output of the model with the MIDI file. | |
Args: | |
out: Model output dictionary | |
midi: Path to the MIDI file or PrettyMIDI object | |
include_velocity: Whether to encode the note confidence in place of velocity | |
alignment_padding: Number of frames to pad the MIDI features with zeros | |
debug: Visualize the alignment | |
Returns: | |
note events and the aligned PrettyMIDI object | |
""" | |
midi = self.labeling.represent_midi(midi, self.sr/self.hop_length) | |
audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length, | |
pad_length=alignment_padding) | |
if isinstance(audio_midi_anchors, str): | |
print(audio_midi_anchors) | |
return None # the file is corrupted! no possible alignment at all | |
else: | |
audio, midi, anchor_pairs = audio_midi_anchors | |
ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets | |
wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T, | |
f_onset1=np.hstack([ALPHA * audio['onset'], | |
(1 - ALPHA) * audio['offset']]).T, | |
f_chroma2=midi['note'].T, | |
f_onset2=np.hstack([ALPHA * midi['onset'], | |
(1 - ALPHA) * midi['offset']]).T, | |
input_feature_rate=self.sr/self.hop_length, | |
step_weights=np.array([1.5, 1.5, 2.0]), | |
threshold_rec=10 ** 6, | |
verbose=debug, normalize_chroma=False, | |
anchor_pairs=anchor_pairs) | |
wp = make_path_strictly_monotonic(wp).astype(int) | |
audio_time = np.take(audio['time'], wp[0]) | |
midi_time = np.take(midi['time'], wp[1]) | |
notes = [] | |
for instrument in midi['midi'].instruments: | |
for note in instrument.notes: | |
note.start = np.interp(note.start, midi_time, audio_time) | |
note.end = np.interp(note.end, midi_time, audio_time) | |
if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e. 2 frames) | |
note.start = note.start - 0.003 | |
note.end = note.start + 0.012 | |
if include_velocity: # encode the note confidence in place of velocity | |
velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)): | |
np.argmin(np.abs(audio['time']-note.end)), | |
note.pitch-self.labeling.midi_centers[0]]) | |
note.velocity = max(1, velocity*127) # velocity should be at least 1 otherwise midi removes the note | |
else: | |
velocity = note.velocity/127 | |
notes.append((note.start, note.end, note.pitch, velocity)) | |
return notes, midi | |
def out2sync(self, out: Dict[str, np.array], midi, include_velocity=False, alignment_padding=50, debug=False): | |
""" | |
Synchronizes the output of the model with the MIDI file. | |
Args: | |
out: Model output dictionary | |
midi: Path to the MIDI file or PrettyMIDI object | |
include_velocity: Whether to encode the note confidence in place of velocity | |
alignment_padding: Number of frames to pad the MIDI features with zeros | |
debug: Visualize the alignment | |
Returns: | |
note events and the aligned PrettyMIDI object | |
""" | |
midi = self.labeling.represent_midi(midi, self.sr/self.hop_length) | |
audio_midi_anchors = self.prepare_for_synchronization(out, midi, feature_rate=self.sr/self.hop_length, | |
pad_length=alignment_padding) | |
if isinstance(audio_midi_anchors, str): | |
print(audio_midi_anchors) | |
return None # the file is corrupted! no possible alignment at all | |
else: | |
audio, midi, anchor_pairs = audio_midi_anchors | |
ALPHA = 0.6 # This is the coefficient of onsets, 1 - ALPHA for offsets | |
starts = (np.array(anchor_pairs[0])*self.sr/self.hop_length).astype(int) | |
ends = (np.array(anchor_pairs[1])*self.sr/self.hop_length).astype(int) | |
wp = sync_via_mrmsdtw_with_anchors(f_chroma1=audio['note'].T[:, starts[0]:ends[0]], | |
f_onset1=np.hstack([ALPHA * audio['onset'], | |
(1 - ALPHA) * audio['offset']]).T[:, starts[0]:ends[0]], | |
f_chroma2=midi['note'].T[:, starts[1]:ends[1]], | |
f_onset2=np.hstack([ALPHA * midi['onset'], | |
(1 - ALPHA) * midi['offset']]).T[:, starts[1]:ends[1]], | |
input_feature_rate=self.sr/self.hop_length, | |
step_weights=np.array([1.5, 1.5, 2.0]), | |
threshold_rec=10 ** 6, | |
verbose=debug, normalize_chroma=False, | |
anchor_pairs=None) | |
wp = make_path_strictly_monotonic(wp).astype(int) | |
wp[0] += starts[0] | |
wp[1] += starts[1] | |
wp = np.hstack((wp, ends[:,np.newaxis])) | |
audio_time = np.take(audio['time'], wp[0]) | |
midi_time = np.take(midi['time'], wp[1]) | |
notes = [] | |
for instrument in midi['midi'].instruments: | |
for note in instrument.notes: | |
note.start = np.interp(note.start, midi_time, audio_time) | |
note.end = np.interp(note.end, midi_time, audio_time) | |
if note.end - note.start <= 0.012: # notes should be at least 12 ms (i.e. 2 frames) | |
note.start = note.start - 0.003 | |
note.end = note.start + 0.012 | |
if include_velocity: # encode the note confidence in place of velocity | |
velocity = np.median(audio['note'][np.argmin(np.abs(audio['time']-note.start)): | |
np.argmin(np.abs(audio['time']-note.end)), | |
note.pitch-self.labeling.midi_centers[0]]) | |
note.velocity = max(1, velocity*127) # velocity should be at least 1 otherwise midi removes the note | |
else: | |
velocity = note.velocity/127 | |
notes.append((note.start, note.end, note.pitch, velocity)) | |
return notes, midi | |
    @staticmethod
    def pad_representations(dict_of_representations, pad_length=10):
""" | |
Pad the representations so that the DTW does not enforce them to encompass the entire duration. | |
Args: | |
dict_of_representations: audio or midi representations | |
pad_length: how many frames to pad | |
Returns: | |
padded representations | |
""" | |
for key, value in dict_of_representations.items(): | |
if key == 'time': | |
padded_time = dict_of_representations[key] | |
padded_time = np.concatenate([padded_time[:2*pad_length], padded_time+padded_time[2*pad_length]]) | |
dict_of_representations[key] = padded_time - padded_time[pad_length] # this is to ensure that the | |
# first frame times are negative until the real zero time | |
elif key in ['onset', 'offset', 'note']: | |
dict_of_representations[key] = np.pad(value, ((pad_length, pad_length), (0, 0))) | |
elif key in ['start_anchor', 'end_anchor']: | |
anchor_time = dict_of_representations[key][0][0] | |
anchor_time = np.argmin(np.abs(dict_of_representations['time'] - anchor_time)) | |
dict_of_representations[key][:,0] = anchor_time | |
                dict_of_representations[key] = dict_of_representations[key].astype(int)
return dict_of_representations | |
def prepare_for_synchronization(self, audio, midi, feature_rate=44100/256, pad_length=100): | |
""" | |
MrMsDTW works better with start and end anchors. This function finds the start and end anchors for audio | |
        based on the midi notes. It also pads the MIDI representations: MIDI files most often start and end with an
        active note, so without padding the DTW would try to stretch those active notes over the entire duration of
        the audio, which is undesirable. Padding the MIDI representations with a few frames of silence at the
        beginning and end avoids this.
Args: | |
            audio: the model output dictionary (as returned by predict)
            midi: the MIDI representation dictionary (as returned by labeling.represent_midi)
            feature_rate: feature frames per second (sr / hop_length)
            pad_length: number of frames used to pad the MIDI representations
        Returns:
            (audio, midi, anchor_pairs) on success, or the string 'corrupted' when no reliable anchors can be found
""" | |
# first pad the MIDI | |
midi = self.pad_representations(midi, pad_length) | |
# sometimes f0s are more reliable than the notes. So, we use both the f0s and the notes together to find the | |
# start and end anchors. f0 lookup bins is the number of bins to look around the f0 to assign a note to it. | |
f0_lookup_bins = int(100//(2*self.labeling.f0_granularity_c)) | |
# find the start anchor for the audio | |
# first decide on which notes to use for the start anchor (take the entire chord where the MIDI file starts) | |
anchor_notes = midi['start_anchor'][:, 1] - self.labeling.midi_centers[0] | |
# now find which f0 bins to look at for the start anchor | |
anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes] | |
anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1) | |
# first start anchor proposals come from the notes | |
anchor_vals = np.any(audio['note'][:, anchor_notes]>0.5, axis=1) | |
# now the f0s | |
anchor_vals_f0 = np.any(audio['f0'][:, anchor_f0s]>0.5, axis=1) | |
# combine the two | |
anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0) | |
if not any(anchor_vals): | |
return 'corrupted' # do not consider the file if we cannot find the start anchor | |
audio_start = np.argmax(anchor_vals) | |
# now the end anchor (most string instruments use chords in cadences: in general the end anchor is polyphonic) | |
anchor_notes = midi['end_anchor'][:, 1] - self.labeling.midi_centers[0] | |
anchor_f0s = [self.midi_pitch_to_contour_bin(an+self.labeling.midi_centers[0]) for an in anchor_notes] | |
anchor_f0s = np.array([list(range(f0-f0_lookup_bins, f0+f0_lookup_bins+1)) for f0 in anchor_f0s]).reshape(-1) | |
# the same procedure as above | |
anchor_vals = np.any(audio['note'][::-1, anchor_notes]>0.5, axis=1) | |
anchor_vals_f0 = np.any(audio['f0'][::-1, anchor_f0s]>0.5, axis=1) | |
anchor_vals = np.logical_or(anchor_vals, anchor_vals_f0) | |
if not any(anchor_vals): | |
return 'corrupted' # do not consider the file if we cannot find the end anchor | |
audio_end = audio['note'].shape[0] - np.argmax(anchor_vals) | |
if audio_end - audio_start < (midi['end_anchor'][0][0] - midi['start_anchor'][0][0])/10: # no one plays x10 faster | |
            return 'corrupted' # do not consider the file if the interval between anchors is too short
anchor_pairs = [(audio_start - 5, midi['start_anchor'][0][0] - 5), | |
(audio_end + 5, midi['end_anchor'][0][0] + 5)] | |
if anchor_pairs[0][0] < 1: | |
anchor_pairs[0] = (1, midi['start_anchor'][0][0]) | |
if anchor_pairs[1][0] > audio['note'].shape[0] - 1: | |
anchor_pairs[1] = (audio['note'].shape[0] - 1, midi['end_anchor'][0][0]) | |
return audio, midi, [(anchor_pairs[0][0]/feature_rate, anchor_pairs[0][1]/feature_rate), | |
(anchor_pairs[1][0]/feature_rate, anchor_pairs[1][1]/feature_rate)] | |
class ConvBlock(nn.Module): | |
def __init__(self, f, w, s, d, in_channels): | |
super().__init__() | |
p1 = d*(w - 1) // 2 | |
p2 = d*(w - 1) - p1 | |
self.pad = nn.ZeroPad2d((0, 0, p1, p2)) | |
self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, kernel_size=(w, 1), stride=(s, 1), dilation=(d, 1)) | |
self.relu = nn.ReLU() | |
self.bn = nn.BatchNorm2d(f) | |
self.pool = nn.MaxPool2d(kernel_size=(2, 1)) | |
self.dropout = nn.Dropout(0.25) | |
def forward(self, x): | |
x = self.pad(x) | |
x = self.conv2d(x) | |
x = self.relu(x) | |
x = self.bn(x) | |
x = self.pool(x) | |
x = self.dropout(x) | |
return x | |
class NoPadConvBlock(nn.Module): | |
def __init__(self, f, w, s, d, in_channels): | |
super().__init__() | |
self.conv2d = nn.Conv2d(in_channels=in_channels, out_channels=f, kernel_size=(w, 1), stride=(s, 1), | |
dilation=(d, 1)) | |
self.relu = nn.ReLU() | |
self.bn = nn.BatchNorm2d(f) | |
self.pool = nn.MaxPool2d(kernel_size=(2, 1)) | |
self.dropout = nn.Dropout(0.25) | |
def forward(self, x): | |
x = self.conv2d(x) | |
x = self.relu(x) | |
x = self.bn(x) | |
x = self.pool(x) | |
x = self.dropout(x) | |
return x | |
class TinyPathway(nn.Module): | |
def __init__(self, dilation=1, hop=256, localize=False, | |
model_capacity="full", n_layers=6, chunk_size=256): | |
super().__init__() | |
capacity_multiplier = { | |
'tiny': 4, 'small': 8, 'medium': 16, 'large': 24, 'full': 32 | |
}[model_capacity] | |
self.layers = [1, 2, 3, 4, 5, 6] | |
self.layers = self.layers[:n_layers] | |
filters = [n * capacity_multiplier for n in [32, 8, 8, 8, 8, 8]] | |
filters = [1] + filters | |
widths = [512, 64, 64, 64, 32, 32] | |
strides = self.deter_dilations(hop//(4*(2**n_layers)), localize=localize) | |
strides[0] = strides[0]*4 # apply 4 times more stride at the first layer | |
dilations = self.deter_dilations(dilation) | |
for i in range(len(self.layers)): | |
f, w, s, d, in_channel = filters[i + 1], widths[i], strides[i], dilations[i], filters[i] | |
self.add_module("conv%d" % i, NoPadConvBlock(f, w, s, d, in_channel)) | |
self.chunk_size = chunk_size | |
self.input_window, self.hop = self.find_input_size_for_pathway() | |
self.out_dim = filters[n_layers] | |
def find_input_size_for_pathway(self): | |
def find_input_size(output_size, kernel_size, stride, dilation, padding): | |
num = (stride*(output_size-1)) + 1 | |
input_size = num - 2*padding + dilation*(kernel_size-1) | |
return input_size | |
conv_calc, n = {}, 0 | |
for i in self.layers: | |
layer = self.__getattr__("conv%d" % (i-1)) | |
for mm in layer.modules(): | |
if hasattr(mm, 'kernel_size'): | |
try: | |
d = mm.dilation[0] | |
except TypeError: | |
d = mm.dilation | |
conv_calc[n] = [mm.kernel_size[0], mm.stride[0], 0, d] | |
n += 1 | |
out = self.chunk_size | |
hop = 1 | |
for n in sorted(conv_calc.keys())[::-1]: | |
kernel_size_n, stride_n, padding_n, dilation_n = conv_calc[n] | |
out = find_input_size(out, kernel_size_n, stride_n, dilation_n, padding_n) | |
hop = hop*stride_n | |
return out, hop | |
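    # Illustrative sketch of the receptive-field arithmetic used above (the numbers describe a
    # hypothetical single layer, not a stored configuration): with no padding,
    #   input_size = stride * (output_size - 1) + 1 + dilation * (kernel_size - 1)
    # so a layer with kernel_size=512, stride=4, dilation=1 that must emit 256 frames needs
    # 4 * 255 + 1 + 511 = 1532 input samples. Chaining this backwards through all layers gives the
    # pathway's input window, and the product of the strides gives its hop.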
def deter_dilations(self, total_dilation, localize=False): | |
n_layers = len(self.layers) | |
if localize: # e.g., 32*1023 window and 3 layers -> [1, 1, 32] | |
a = [total_dilation] + [1 for _ in range(n_layers-1)] | |
else: # e.g., 32*1023 window and 3 layers -> [4, 4, 2] | |
total_dilation = int(np.log2(total_dilation)) | |
a = [] | |
for layer in range(n_layers): | |
this_dilation = int(np.ceil(total_dilation/(n_layers-layer))) | |
a.append(2**this_dilation) | |
total_dilation = total_dilation - this_dilation | |
return a[::-1] | |
def forward(self, x): | |
x = x.view(x.shape[0], 1, -1, 1) | |
for i in range(len(self.layers)): | |
x = self.__getattr__("conv%d" % i)(x) | |
x = x.permute(0, 3, 2, 1) | |
return x | |
#@jit(nopython=True) | |
def cosine_distance(f1, f2, cos_meas_max=2.0, cos_meas_min=1.0): | |
"""For all pairs of vectors f1' and f2' in f1 and f2, computes 1 - (f1.f2), | |
where '.' is the dot product, and rescales the results to lie in the | |
range [cos_meas_min, cos_meas_max]. | |
Corresponds to regular cosine distance if f1' and f2' are normalized and | |
cos_meas_min==0.0 and cos_meas_max==1.0.""" | |
return (1 - f1.T @ f2) * (cos_meas_max - cos_meas_min) + cos_meas_min | |
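# Illustrative sketch (comments only, not executed): for unit-norm columns and the identity
# scaling (cos_meas_min=0, cos_meas_max=1) this reduces to the ordinary cosine distance.
#
# >>> f = np.eye(2)                                    # two orthonormal column vectors
# >>> cosine_distance(f, f, cos_meas_max=1.0, cos_meas_min=0.0)
# array([[0., 1.],
#        [1., 0.]])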
#@jit(nopython=True) | |
def euclidean_distance(f1, f2, l2_meas_max=1.0, l2_meas_min=0.0): | |
"""Computes euclidean distances between the vectors in f1 and f2, and | |
rescales the results to lie in the range [cos_meas_min, cos_meas_max].""" | |
#S1 = np.zeros((f1.shape[1], f2.shape[1])) | |
#for n in range(f2.shape[1]): | |
# S1[:, n] = np.sqrt(np.sum((f1.T - f2[:, n]) ** 2, axis=1)) | |
S1 = euclidean_distances(f1.T, f2.T) | |
return S1 * (l2_meas_max - l2_meas_min) + l2_meas_min | |
def compute_high_res_cost_matrix(f_chroma1: np.ndarray, | |
f_chroma2: np.ndarray, | |
f_onset1: np.ndarray, | |
f_onset2: np.ndarray, | |
weights: np.ndarray = np.array([1.0, 1.0]), | |
cos_meas_min: float = 1.0, | |
cos_meas_max: float = 2.0, | |
l2_meas_min: float = 0.0, | |
l2_meas_max: float = 1.0): | |
"""Computes cost matrix of two sequences using two feature matrices | |
for each sequence. Cosine distance is used for the chroma sequences and | |
euclidean distance is used for the DLNCO sequences. | |
Parameters | |
---------- | |
f_chroma1 : np.ndarray [shape=(12, N)] | |
Chroma feature matrix of the first sequence (assumed to be normalized). | |
f_chroma2 : np.ndarray [shape=(12, M)] | |
Chroma feature matrix of the second sequence (assumed to be normalized). | |
f_onset1 : np.ndarray [shape=(12, N)] | |
DLNCO feature matrix of the first sequence | |
f_onset2 : np.ndarray [shape=(12, M)] | |
DLNCO feature matrix of the second sequence | |
weights : np.ndarray [shape=[2,]] | |
Weights array for the high-resolution cost computation. | |
weights[0] * cosine_distance + weights[1] * euclidean_distance | |
cos_meas_min : float | |
Cosine distances are shifted to be at least ``cos_meas_min`` | |
cos_meas_max : float | |
Cosine distances are scaled to be at most ``cos_meas_max`` | |
l2_meas_min : float | |
Euclidean distances are shifted to be at least ``l2_meas_min`` | |
l2_meas_max : float | |
Euclidean distances are scaled to be at most ``l2_meas_max`` | |
Returns | |
------- | |
C: np.ndarray [shape=(N, M)] | |
Cost matrix | |
""" | |
cos_dis = cosine_distance(f_chroma1, f_chroma2, cos_meas_min=cos_meas_min, cos_meas_max=cos_meas_max) | |
euc_dis = euclidean_distance(f_onset1, f_onset2, l2_meas_min=l2_meas_min, l2_meas_max=l2_meas_max) | |
return weights[0] * cos_dis + weights[1] * euc_dis | |
def __C_to_DE(C: np.ndarray = None, | |
dn: np.ndarray = np.array([1, 1, 0], np.int64), | |
dm: np.ndarray = np.array([1, 0, 1], np.int64), | |
dw: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), | |
              sub_sequence: bool = False) -> Tuple[np.ndarray, np.ndarray]:
"""This function computes the accumulated cost matrix D and the step index | |
matrix E. | |
Parameters | |
---------- | |
C : np.ndarray (np.float32 / np.float64) [shape=(N, M)] | |
Cost matrix | |
dn : np.ndarray (np.int64) [shape=(1, S)] | |
Integer array defining valid steps (N direction of C), default: [1, 1, 0] | |
dm : np.ndarray (np.int64) [shape=(1, S)] | |
Integer array defining valid steps (M direction of C), default: [1, 0, 1] | |
dw : np.ndarray (np.float64) [shape=(1, S)] | |
        Double array defining the weight of each step, default: [1.0, 1.0, 1.0]
sub_sequence : bool | |
Set `True` for SubSequence DTW, default: False | |
Returns | |
------- | |
D : np.ndarray (np.float64) [shape=(N, M)] | |
Accumulated cost matrix of type double | |
E : np.ndarray (np.int64) [shape=(N, M)] | |
Step index matrix. | |
        E[n, m] holds the index of the step taken to determine the value of D[n, m].
        If E[n, m] is -1, no valid step was possible.
        NaNs in the cost matrix are preserved, invalid fields in the cost matrix are NaNs.
""" | |
if C is None: | |
raise ValueError('C must be a 2D numpy array.') | |
N, M = C.shape | |
S = dn.size | |
if S != dm.size or S != dw.size: | |
raise ValueError('The parameters dn,dm, and dw must be of equal length.') | |
# calc bounding box size of steps | |
sbbn = np.max(dn) | |
sbbm = np.max(dm) | |
# initialize E | |
E = np.zeros((N, M), np.int64) - 1 | |
# initialize extended D matrix | |
D = np.ones((sbbn + N, sbbm + M), np.float64) * np.inf | |
if sub_sequence: | |
for m in range(M): | |
D[sbbn, sbbm + m] = C[0, m] | |
else: | |
D[sbbn, sbbm] = C[0, 0] | |
# accumulate | |
for m in range(sbbm, M + sbbm): | |
for n in range(sbbn, N + sbbn): | |
for s in range(S): | |
cost = D[n - dn[s], m - dm[s]] + C[n - sbbn, m - sbbm] * dw[s] | |
if cost < D[n, m]: | |
D[n, m] = cost | |
E[n - sbbn, m - sbbm] = s | |
D = D[sbbn: N + sbbn, sbbm: M + sbbm] | |
return D, E | |
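# Illustrative sketch (comments only, not executed): for a 2x2 cost matrix C = [[1, 2], [3, 1]]
# with the default steps and unit weights, the accumulated cost matrix is D = [[1, 3], [4, 2]]
# and E = [[-1, 2], [1, 0]], i.e. the optimal path is the diagonal (0, 0) -> (1, 1) with total
# cost 2 (the value of D[-1, -1]).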
def __E_to_warping_path(E: np.ndarray, | |
dn: np.ndarray = np.array([1, 1, 0], np.int64), | |
dm: np.ndarray = np.array([1, 0, 1], np.int64), | |
sub_sequence: bool = False, | |
end_index: int = -1) -> np.ndarray: | |
"""This function computes a warping path based on the provided matrix E | |
and the allowed steps. | |
Parameters | |
---------- | |
E : np.ndarray (np.int64) [shape=(N, M)] | |
Step index matrix | |
dn : np.ndarray (np.int64) [shape=(1, S)] | |
Integer array defining valid steps (N direction of C), default: [1, 1, 0] | |
dm : np.ndarray (np.int64) [shape=(1, S)] | |
Integer array defining valid steps (M direction of C), default: [1, 0, 1] | |
sub_sequence : bool | |
Set `True` for SubSequence DTW, default: False | |
    end_index : int
        End index on the M axis, only used for SubSequence DTW (default: -1)
Returns | |
------- | |
warping_path : np.ndarray (np.int64) [shape=(2, M)] | |
Resulting optimal warping path | |
""" | |
N, M = E.shape | |
if not sub_sequence and end_index == -1: | |
end_index = M - 1 | |
m = end_index | |
n = N - 1 | |
warping_path = np.zeros((2, n + m + 1)) | |
index = 0 | |
def _loop(m, n, index): | |
warping_path[:, index] = np.array([n, m]) | |
step_index = E[n, m] | |
m -= dm[step_index] | |
n -= dn[step_index] | |
index += 1 | |
return m, n, index | |
if sub_sequence: | |
while n > 0: | |
m, n, index = _loop(m, n, index) | |
else: | |
while m > 0 or n > 0: | |
m, n, index = _loop(m, n, index) | |
warping_path[:, index] = np.array([n, m]) | |
warping_path = warping_path[:, index::-1] | |
return warping_path | |
def compute_warping_path(C: np.ndarray, | |
step_sizes: np.ndarray = np.array([[1, 0], [0, 1], [1, 1]], np.int64), | |
step_weights: np.ndarray = np.array([1.0, 1.0, 1.0], np.float64), | |
implementation: str = 'synctoolbox'): | |
"""Applies DTW on cost matrix C. | |
Parameters | |
---------- | |
C : np.ndarray (np.float32 / np.float64) [shape=(N, M)] | |
Cost matrix | |
    step_sizes : np.ndarray (np.int64) [shape=(S, 2)]
        Array of step sizes
    step_weights : np.ndarray (np.float64) [shape=(S,)]
        Array of step weights
implementation: str | |
Choose among ``synctoolbox`` and ``librosa``. (default: ``synctoolbox``) | |
Returns | |
------- | |
D : np.ndarray (np.float64) [shape=(N, M)] | |
Accumulated cost matrix | |
E : np.ndarray (np.int64) [shape=(N, M)] | |
Step index matrix | |
wp : np.ndarray (np.int64) [shape=(2, M)] | |
Warping path | |
""" | |
if implementation == 'librosa': | |
D, wp, E = dtw(C=C, | |
step_sizes_sigma=step_sizes, | |
weights_add=np.array([0, 0, 0]), | |
weights_mul=step_weights, | |
return_steps=True, | |
subseq=False) | |
wp = wp[::-1].T | |
elif implementation == 'synctoolbox': | |
dn = step_sizes[:, 0] | |
dm = step_sizes[:, 1] | |
D, E = __C_to_DE(C, | |
dn=dn, | |
dm=dm, | |
dw=step_weights, | |
sub_sequence=False) | |
wp = __E_to_warping_path(E=E, | |
dn=dn, | |
dm=dm, | |
sub_sequence=False) | |
else: | |
raise NotImplementedError(f'No implementation found called {implementation}') | |
return D, E, wp | |
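# Illustrative sketch (comments only, not executed): both backends return the same optimal path
# for a well-behaved cost matrix; the synctoolbox implementation is the default used elsewhere in
# this file.
#
# >>> C = np.array([[1.0, 2.0], [3.0, 1.0]])
# >>> _, _, wp = compute_warping_path(C, implementation='synctoolbox')
# >>> wp                     # frame indices of sequence 1 (row 0) and sequence 2 (row 1)
# array([[0., 1.],
#        [0., 1.]])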
def compute_warping_paths_from_cost_matrices(cost_matrices: List, | |
step_sizes: np.array = np.array([[1, 0], [0, 1], [1, 1]], int), | |
step_weights: np.array = np.array([1.0, 1.0, 1.0], np.float64), | |
implementation: str = 'synctoolbox') -> List: | |
"""Computes a path via DTW on each matrix in cost_matrices | |
Parameters | |
---------- | |
cost_matrices : list | |
List of cost matrices | |
step_sizes : np.ndarray | |
DTW step sizes (default: np.array([[1, 0], [0, 1], [1, 1]])) | |
step_weights : np.ndarray | |
DTW step weights (default: np.array([1.0, 1.0, 1.0])) | |
implementation : str | |
Choose among 'synctoolbox' and 'librosa' (default: 'synctoolbox') | |
Returns | |
------- | |
wp_list : list | |
List of warping paths | |
""" | |
return [compute_warping_path(C=C, | |
step_sizes=step_sizes, | |
step_weights=step_weights, | |
implementation=implementation)[2] for C in cost_matrices] | |
def compute_cost_matrices_between_anchors(f_chroma1: np.ndarray, | |
f_chroma2: np.ndarray, | |
anchors: np.ndarray, | |
f_onset1: np.ndarray = None, | |
f_onset2: np.ndarray = None, | |
alpha: float = 0.5) -> List: | |
"""Computes cost matrices for the given features between subsequent | |
pairs of anchors points. | |
Parameters | |
---------- | |
f_chroma1 : np.ndarray [shape=(12, N)] | |
Chroma feature matrix of the first sequence | |
f_chroma2 : np.ndarray [shape=(12, M)] | |
Chroma feature matrix of the second sequence | |
anchors : np.ndarray [shape=(2, R)] | |
Anchor sequence | |
f_onset1 : np.ndarray [shape=(L, N)] | |
Onset feature matrix of the first sequence | |
f_onset2 : np.ndarray [shape=(L, M)] | |
Onset feature matrix of the second sequence | |
alpha: float | |
Alpha parameter to weight the cost functions. | |
Returns | |
------- | |
cost_matrices: list | |
List containing cost matrices | |
""" | |
high_res = False | |
if f_onset1 is not None and f_onset2 is not None: | |
high_res = True | |
cost_matrices = list() | |
for k in range(anchors.shape[1] - 1): | |
a1 = np.array(anchors[:, k].astype(int), copy=True) | |
a2 = np.array(anchors[:, k + 1].astype(int), copy=True) | |
if high_res: | |
cost_matrices.append(compute_high_res_cost_matrix(f_chroma1[:, a1[0]: a2[0] + 1], | |
f_chroma2[:, a1[1]: a2[1] + 1], | |
f_onset1[:, a1[0]: a2[0] + 1], | |
f_onset2[:, a1[1]: a2[1] + 1], | |
weights=np.array([alpha, 1-alpha]))) | |
else: | |
cost_matrices.append(cosine_distance(f_chroma1[:, a1[0]: a2[0] + 1], | |
f_chroma2[:, a1[1]: a2[1] + 1])) | |
return cost_matrices | |
def build_path_from_warping_paths(warping_paths: List, | |
anchors: np.ndarray = None) -> np.ndarray: | |
"""The function builds a path from a given list of warping paths | |
and the anchors used to obtain these paths. The indices of the original | |
warping paths are adapted such that they cross the anchors. | |
Parameters | |
---------- | |
warping_paths : list | |
List of warping paths | |
anchors : np.ndarray [shape=(2, N)] | |
Anchor sequence | |
Returns | |
------- | |
path : np.ndarray [shape=(2, M)] | |
Merged path | |
""" | |
if anchors is None: | |
# When no anchor points are given, we can construct them from the | |
# subpaths in the wp_list | |
# To do this, we assume that the first path's element is the starting | |
# anchor | |
        anchors = warping_paths[0][:, 0].reshape(-1, 1)
        # Retrieve the last element of each path
        anchors_tmp = np.zeros((2, len(warping_paths)), np.float32)
        for idx, x in enumerate(warping_paths):
            anchors_tmp[:, idx] = x[:, -1]
# Correct indices, such that the indices of the anchors are given on a | |
# common path. Each anchor a_l = [Nnew_[l+1];Mnew_[l+1]] | |
# Nnew_[l+1] = N_l + N_[l+1] -1 | |
# Mnew_[l+1] = M_l + M_[l+1] -1 | |
anchors_tmp = np.cumsum(anchors_tmp, axis=1) | |
anchors_tmp[:, 1:] = anchors_tmp[:, 1:] - [np.arange(1, anchors_tmp.shape[1]), | |
np.arange(1, anchors_tmp.shape[1])] | |
anchors = np.concatenate([anchors, anchors_tmp], axis=1) | |
L = len(warping_paths) + 1 | |
path = None | |
wp = None | |
for anchor_idx in range(1, L): | |
anchor1 = anchors[:, anchor_idx - 1] | |
anchor2 = anchors[:, anchor_idx] | |
wp = np.array(warping_paths[anchor_idx - 1], copy=True) | |
# correct indices in warpingPath | |
wp += np.repeat(anchor1.reshape(-1, 1), wp.shape[1], axis=1).astype(wp.dtype) | |
# consistency checks | |
assert np.array_equal(wp[:, 0], anchor1), 'First entry of warping path does not coincide with anchor point' | |
assert np.array_equal(wp[:, -1], anchor2), 'Last entry of warping path does not coincide with anchor point' | |
if path is None: | |
path = np.array(wp[:, :-1], copy=True) | |
else: | |
path = np.concatenate([path, wp[:, :-1]], axis=1) | |
# append last index of warping path | |
path = np.concatenate([path, wp[:, -1].reshape(-1, 1)], axis=1) | |
return path | |
def find_anchor_indices_in_warping_path(warping_path: np.ndarray, | |
anchors: np.ndarray) -> np.ndarray: | |
"""Compute the indices in the warping path that corresponds | |
to the elements in 'anchors' | |
Parameters | |
---------- | |
warping_path : np.ndarray [shape=(2, N)] | |
Warping path | |
anchors : np.ndarray [shape=(2, M)] | |
Anchor sequence | |
Returns | |
------- | |
indices : np.ndarray [shape=(2, M)] | |
Anchor indices in the ``warping_path`` | |
""" | |
indices = np.zeros(anchors.shape[1]) | |
for k in range(anchors.shape[1]): | |
a = anchors[:, k] | |
indices[k] = np.where((a[0] == warping_path[0, :]) & (a[1] == warping_path[1, :]))[0] | |
return indices | |
def make_path_strictly_monotonic(P: np.ndarray) -> np.ndarray: | |
"""Compute strict alignment path from a warping path | |
Wrapper around "compute_strict_alignment_path_mask" from libfmp. | |
Parameters | |
---------- | |
P: np.ndarray [shape=(2, N)] | |
Warping path | |
Returns | |
------- | |
P_mod: np.ndarray [shape=(2, M)] | |
Strict alignment path, M <= N | |
""" | |
P_mod = compute_strict_alignment_path_mask(P.T) | |
return P_mod.T | |
def compute_strict_alignment_path_mask(P): | |
"""Compute strict alignment path from a warping path | |
Notebook: C3/C3S3_MusicAppTempoCurve.ipynb | |
Args: | |
        P (list or np.ndarray): Warping path
Returns: | |
P_mod (list or np.ndarray): Strict alignment path | |
""" | |
P = np.array(P, copy=True) | |
N, M = P[-1] | |
# Get indices for strict monotonicity | |
keep_mask = (P[1:, 0] > P[:-1, 0]) & (P[1:, 1] > P[:-1, 1]) | |
# Add first index to enforce start boundary condition | |
keep_mask = np.concatenate(([True], keep_mask)) | |
    # Remove all indices that lie in the last row or column
keep_mask[(P[:, 0] == N) | (P[:, 1] == M)] = False | |
# Add last index to enforce end boundary condition | |
keep_mask[-1] = True | |
P_mod = P[keep_mask, :] | |
return P_mod | |
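# Illustrative sketch (comments only, not executed): repeated indices in a warping path are
# dropped so the result can be used with np.interp, which needs strictly increasing positions.
#
# >>> P = np.array([[0, 0], [1, 1], [1, 2], [2, 2]])        # path in (N, 2) orientation
# >>> compute_strict_alignment_path_mask(P)
# array([[0, 0],
#        [1, 1],
#        [2, 2]])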
def evaluate_synchronized_positions(ground_truth_positions: np.ndarray, | |
synchronized_positions: np.ndarray, | |
tolerances: List = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 250]): | |
"""Compute standard evaluation measures for evaluating the quality of synchronized (musical) positions. | |
When synchronizing two versions of a piece of music, one can evaluate the quality of the resulting alignment | |
by comparing errors at musical positions (e.g. beats or measures) that appear in both versions. | |
This function implements two measures: mean absolute error at positions and the percentage of correctly transferred | |
measures given a threshold. | |
Parameters | |
---------- | |
ground_truth_positions: np.ndarray [shape=N] | |
Positions (e.g. beat or measure positions) annotated in the target version of a piece of music, in milliseconds. | |
synchronized_positions: np.ndarray [shape=N] | |
The same musical positions as in 'ground_truth_positions' obtained by transfer using music synchronization, | |
in milliseconds. | |
tolerances: list of integers | |
        Tolerances (in milliseconds) used for comparing annotated and synchronized positions.
Returns | |
------- | |
mean_absolute_error: float | |
        Mean absolute error for synchronized positions, in milliseconds.
accuracy_at_tolerances: list of floats | |
Percentages of correctly transferred measures, for each entry in 'tolerances'. | |
""" | |
absolute_errors_at_positions = np.abs(synchronized_positions - ground_truth_positions) | |
print('Measure transfer from recording 1 to 2 yielded:') | |
mean_absolute_error = np.mean(absolute_errors_at_positions) | |
print('\nMean absolute error (MAE): %.2fms (standard deviation: %.2fms)' % (mean_absolute_error, | |
np.std(absolute_errors_at_positions))) | |
print('\nAccuracy of transferred positions at different tolerances:') | |
print('\t\t\tAccuracy') | |
print('################################') | |
accuracy_at_tolerances = [] | |
for tolerance in tolerances: | |
accuracy = np.mean((absolute_errors_at_positions < tolerance)) * 100.0 | |
accuracy_at_tolerances.append(accuracy) | |
print('Tolerance: {} ms \t{:.2f} %'.format(tolerance, accuracy)) | |
return mean_absolute_error, accuracy_at_tolerances | |
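# Illustrative sketch (comments only, not executed; the positions are made up for the example):
# with ground-truth beats at [1000, 2000, 3000] ms and synchronized estimates at
# [1010, 1985, 3040] ms, the absolute errors are [10, 15, 40] ms, so the MAE is ~21.7 ms and the
# accuracy is 66.7 % at a 20 ms tolerance and 100 % at 50 ms.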
def smooth_downsample_feature(f_feature: np.ndarray, | |
input_feature_rate: float, | |
win_len_smooth: int = 0, | |
downsamp_smooth: int = 1) -> Tuple[np.ndarray, float]: | |
"""Temporal smoothing and downsampling of a feature sequence | |
Parameters | |
---------- | |
f_feature : np.ndarray | |
Input feature sequence, size dxN | |
input_feature_rate : float | |
Input feature rate in Hz | |
win_len_smooth : int | |
Smoothing window length. For 0, no smoothing is applied. | |
downsamp_smooth : int | |
Downsampling factor. For 1, no downsampling is applied. | |
Returns | |
------- | |
f_feature_stat : np.ndarray | |
Downsampled & smoothed feature. | |
new_feature_rate : float | |
New feature rate after downsampling | |
""" | |
if win_len_smooth != 0 or downsamp_smooth != 1: | |
# hack to get the same results as on MATLAB | |
stat_window = np.hanning(win_len_smooth+2)[1:-1] | |
stat_window /= np.sum(stat_window) | |
# upfirdn filters and downsamples each column of f_stat_help | |
f_feature_stat = upfirdn(h=stat_window, x=f_feature, up=1, down=downsamp_smooth) | |
seg_num = f_feature.shape[1] | |
stat_num = int(np.ceil(seg_num / downsamp_smooth)) | |
cut = int(np.floor((win_len_smooth - 1) / (2 * downsamp_smooth))) | |
f_feature_stat = f_feature_stat[:, cut: stat_num + cut] | |
else: | |
f_feature_stat = f_feature | |
new_feature_rate = input_feature_rate / downsamp_smooth | |
return f_feature_stat, new_feature_rate | |
def normalize_feature(feature: np.ndarray, | |
norm_ord: int, | |
threshold: float) -> np.ndarray: | |
"""Normalizes a feature sequence according to the l^norm_ord norm. | |
Parameters | |
---------- | |
feature : np.ndarray | |
Input feature sequence of size d x N | |
d: dimensionality of feature vectors | |
N: number of feature vectors (time in frames) | |
norm_ord : int | |
Norm degree | |
threshold : float | |
If the norm falls below threshold for a feature vector, then the | |
normalized feature vector is set to be the normalized unit vector. | |
Returns | |
------- | |
f_normalized : np.ndarray | |
Normalized feature sequence | |
""" | |
# TODO rewrite in vectorized fashion | |
d, N = feature.shape | |
f_normalized = np.zeros((d, N)) | |
# normalize the vectors according to the l^norm_ord norm | |
unit_vec = np.ones(d) | |
unit_vec = unit_vec / np.linalg.norm(unit_vec, norm_ord) | |
for k in range(N): | |
cur_norm = np.linalg.norm(feature[:, k], norm_ord) | |
if cur_norm < threshold: | |
f_normalized[:, k] = unit_vec | |
else: | |
f_normalized[:, k] = feature[:, k] / cur_norm | |
return f_normalized | |
class FourHeads(Synchronizer): | |
def __init__( | |
self, | |
pathway_multiscale: int = 32, | |
num_pathway_layers: int = 2, | |
chunk_size: int = 256, | |
hop_length: int = 256, | |
encoder_dim: int = 256, | |
sr: int = 44100, | |
num_heads: int = 4, | |
ffn_dim: int = 128, | |
num_separator_layers: int = 16, | |
num_representation_layers: int = 4, | |
depthwise_conv_kernel_size: int = 31, | |
dropout: float = 0.25, | |
use_group_norm: bool = False, | |
convolution_first: bool = False, | |
labeling=PerformanceLabel(), | |
wiring='tiktok' | |
): | |
super().__init__(labeling, sr=sr, hop_length=hop_length) | |
self.main = TinyPathway(dilation=1, hop=hop_length, localize=True, | |
n_layers=num_pathway_layers, chunk_size=chunk_size) | |
self.attendant = TinyPathway(dilation=pathway_multiscale, hop=hop_length, localize=False, | |
n_layers=num_pathway_layers, chunk_size=chunk_size) | |
assert self.main.hop == self.attendant.hop # they should output with the same sample rate | |
print('hop in samples:', self.main.hop) | |
self.input_window = self.attendant.input_window | |
self.encoder_dim = encoder_dim | |
self.dropout = nn.Dropout(dropout) | |
# merge two streams into a conformer input | |
self.stream_merger = nn.Sequential(self.dropout, | |
nn.Linear(self.main.out_dim + self.attendant.out_dim, self.encoder_dim)) | |
print('main stream window:', self.main.input_window, | |
', attendant stream window:', self.attendant.input_window, | |
', conformer input dim:', self.encoder_dim) | |
center = ((chunk_size - 1) * self.main.hop) # region labeled with pitch track | |
main_overlap = self.main.input_window - center | |
main_overlap = [int(np.floor(main_overlap / 2)), int(np.ceil(main_overlap / 2))] | |
attendant_overlap = self.attendant.input_window - center | |
attendant_overlap = [int(np.floor(attendant_overlap / 2)), int(np.ceil(attendant_overlap / 2))] | |
print('main frame overlap:', main_overlap, ', attendant frame overlap:', attendant_overlap) | |
main_crop_relative = [attendant_overlap[0] - main_overlap[0], main_overlap[1] - attendant_overlap[1]] | |
print('crop for main pathway', main_crop_relative) | |
print("Total sequence duration is", self.attendant.input_window, 'samples') | |
print('Main stream receptive field for one frame is', (self.main.input_window - center), 'samples') | |
print('Attendant stream receptive field for one frame is', (self.attendant.input_window - center), 'samples') | |
self.frame_overlap = attendant_overlap | |
self.main_stream_crop = main_crop_relative | |
self.max_window_size = self.attendant.input_window | |
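        # Worked example of the overlap bookkeeping above (numbers are illustrative only): with | |
        # chunk_size=256 and hop=256, the labeled region spans center = 255 * 256 = 65280 samples. | |
        # Each pathway needs extra context beyond this region (its receptive field), split evenly | |
        # into left/right overlaps. The attendant pathway sees the widest window, so its overlap | |
        # defines the padding of the full input, and main_stream_crop gives the offsets used in | |
        # forward() to cut the attendant-sized input down to the main pathway's smaller window. | |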
self.chunk_size = chunk_size | |
self.separator_stream = nn.ModuleList( # source-separation, reinvented | |
[ | |
ConformerLayer( | |
input_dim=self.encoder_dim, | |
ffn_dim=ffn_dim, | |
num_attention_heads=num_heads, | |
depthwise_conv_kernel_size=depthwise_conv_kernel_size, | |
dropout=dropout, | |
use_group_norm=use_group_norm, | |
convolution_first=convolution_first, | |
) | |
for _ in range(num_separator_layers) | |
] | |
) | |
self.f0_stream = nn.ModuleList( | |
[ | |
ConformerLayer( | |
input_dim=self.encoder_dim, | |
ffn_dim=ffn_dim, | |
num_attention_heads=num_heads, | |
depthwise_conv_kernel_size=depthwise_conv_kernel_size, | |
dropout=dropout, | |
use_group_norm=use_group_norm, | |
convolution_first=convolution_first, | |
) | |
for _ in range(num_representation_layers) | |
] | |
) | |
self.f0_head = nn.Linear(self.encoder_dim, len(self.labeling.f0_centers_c)) | |
self.note_stream = nn.ModuleList( | |
[ | |
ConformerLayer( | |
input_dim=self.encoder_dim, | |
ffn_dim=ffn_dim, | |
num_attention_heads=num_heads, | |
depthwise_conv_kernel_size=depthwise_conv_kernel_size, | |
dropout=dropout, | |
use_group_norm=use_group_norm, | |
convolution_first=convolution_first, | |
) | |
for _ in range(num_representation_layers) | |
] | |
) | |
self.note_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) | |
self.onset_stream = nn.ModuleList( | |
[ | |
ConformerLayer( | |
input_dim=self.encoder_dim, | |
ffn_dim=ffn_dim, | |
num_attention_heads=num_heads, | |
depthwise_conv_kernel_size=depthwise_conv_kernel_size, | |
dropout=dropout, | |
use_group_norm=use_group_norm, | |
convolution_first=convolution_first, | |
) | |
for _ in range(num_representation_layers) | |
] | |
) | |
self.onset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) | |
self.offset_stream = nn.ModuleList( | |
[ | |
ConformerLayer( | |
input_dim=self.encoder_dim, | |
ffn_dim=ffn_dim, | |
num_attention_heads=num_heads, | |
depthwise_conv_kernel_size=depthwise_conv_kernel_size, | |
dropout=dropout, | |
use_group_norm=use_group_norm, | |
convolution_first=convolution_first, | |
) | |
for _ in range(num_representation_layers) | |
] | |
) | |
self.offset_head = nn.Linear(self.encoder_dim, len(self.labeling.midi_centers)) | |
self.labeling = labeling | |
self.double_merger = nn.Sequential(self.dropout, nn.Linear(2 * self.encoder_dim, self.encoder_dim)) | |
self.triple_merger = nn.Sequential(self.dropout, nn.Linear(3 * self.encoder_dim, self.encoder_dim)) | |
self.wiring = wiring | |
print('Total parameter count: ', self.count_parameters()) | |
def count_parameters(self) -> int: | |
""" Count parameters of encoder """ | |
return sum([p.numel() for p in self.parameters()]) | |
def stream(self, x, representation, key_padding_mask=None): | |
        for layer in self.__getattr__('{}_stream'.format(representation)): | |
            x = layer(x, key_padding_mask) | |
return x | |
def head(self, x, representation): | |
return self.__getattr__('{}_head'.format(representation))(x) | |
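    # The 'wiring' option controls how the representation streams feed each other in forward(): | |
    # 'parallel' runs note/onset/offset independently off the separator output, 'tiktok' feeds | |
    # note from separator+onset+offset, 'tiktok2' feeds note from f0+onset+offset, 'spotify' | |
    # derives note from f0 and then onset/offset from separator+note, and any other value uses | |
    # the default f0-informed wiring in the else branch below. | |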
def forward(self, x, key_padding_mask=None): | |
# two auditory streams followed by the separator stream to ensure timbre-awareness | |
x_attendant = self.attendant(x) | |
x_main = self.main(x[:, self.main_stream_crop[0]:self.main_stream_crop[1]]) | |
x = self.stream_merger(torch_cat((x_attendant, x_main), -1).squeeze(1)) | |
x = self.stream(x, 'separator', key_padding_mask) | |
f0 = self.stream(x, 'f0', key_padding_mask) # they say this is a low level feature :) | |
if self.wiring == 'parallel': | |
note = self.stream(x, 'note', key_padding_mask) | |
onset = self.stream(x, 'onset', key_padding_mask) | |
offset = self.stream(x, 'offset', key_padding_mask) | |
elif self.wiring == 'tiktok': | |
onset = self.stream(x, 'onset', key_padding_mask) | |
offset = self.stream(x, 'offset', key_padding_mask) | |
# f0 is disconnected, note relies on separator, onset, and offset | |
note = self.stream(self.triple_merger(torch_cat((x, onset, offset), -1)), 'note', key_padding_mask) | |
elif self.wiring == 'tiktok2': | |
onset = self.stream(x, 'onset', key_padding_mask) | |
offset = self.stream(x, 'offset', key_padding_mask) | |
# note is connected to f0, onset, and offset | |
note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask) | |
elif self.wiring == 'spotify': | |
# note is connected to f0 only | |
note = self.stream(f0, 'note', key_padding_mask) | |
            # here onsets and offsets are higher-level features informed by the separator and note streams | |
onset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'onset', key_padding_mask) | |
offset = self.stream(self.double_merger(torch_cat((x, note), -1)), 'offset', key_padding_mask) | |
else: | |
# onset and offset are connected to f0 and separator streams | |
onset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'onset', key_padding_mask) | |
offset = self.stream(self.double_merger(torch_cat((x, f0), -1)), 'offset', key_padding_mask) | |
# note is connected to f0, onset, and offset streams | |
note = self.stream(self.triple_merger(torch_cat((f0, onset, offset), -1)), 'note', key_padding_mask) | |
return {'f0': self.head(f0, 'f0'), | |
'note': self.head(note, 'note'), | |
'onset': self.head(onset, 'onset'), | |
'offset': self.head(offset, 'offset')} | |
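# Example usage sketch (illustrative; argument values are assumptions, not shipped defaults): | |
#   model = FourHeads(pathway_multiscale=32, num_pathway_layers=2, chunk_size=256, | |
#                     hop_length=256, sr=44100, wiring='tiktok') | |
#   frames, times = model.read_audio('performance.wav')  # inherited from PitchEstimator | |
#   with torch_no_grad(): | |
#       out = model(frames[:8])  # dict with 'f0', 'note', 'onset', 'offset' logits | |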
class PretrainedModel(FourHeads): | |
    def __init__(self, model_json: dict, model: str, device): | |
        super().__init__(pathway_multiscale=model_json['pathway_multiscale'], | |
                         num_pathway_layers=model_json['num_pathway_layers'], | |
                         wiring=model_json['wiring'], | |
                         hop_length=model_json['hop_length'], | |
                         chunk_size=model_json['chunk_size'], | |
                         labeling=PerformanceLabel(note_min=model_json['note_low'], | |
                                                   note_max=model_json['note_high'], | |
                                                   f0_bins_per_semitone=model_json['f0_bins_per_semitone'], | |
                                                   f0_tolerance_c=200, | |
                                                   f0_smooth_std_c=model_json['f0_smooth_std_c'], | |
                                                   onset_smooth_std=model_json['onset_smooth_std']), | |
                         sr=model_json['sampling_rate']) | |
        self.load_state_dict(torch_load(model, map_location=device, weights_only=True)) | |
self.eval() | |
    def merge_violin_tracks(self, mid: MidiFile): | |
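        """ | |
        Flatten all tracks of a MidiFile into a single track, preserving absolute event order, | |
        and additionally copy any set_tempo messages to the start of the merged track so the | |
        tempo is defined before the first note. | |
        """ | |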
new_mid = MidiFile(ticks_per_beat=mid.ticks_per_beat) | |
new_track = MidiTrack() | |
new_mid.tracks.append(new_track) | |
events = [] | |
for track in mid.tracks: | |
current_time = 0 | |
for msg in track: | |
current_time += msg.time | |
events.append((current_time, msg)) | |
events.sort(key=lambda x: x[0]) | |
last_time = 0 | |
for event_time, msg in events: | |
delta_time = event_time - last_time | |
new_track.append(msg.copy(time=delta_time)) | |
last_time = event_time | |
for track in mid.tracks: | |
for msg in track: | |
if msg.type == 'set_tempo': | |
new_track.insert(0, msg) | |
return new_mid | |
def transcribe_music(self, audio, batch_size, postprocessing): | |
self.transcribe(audio, batch_size, postprocessing).write("output.mid") | |
self.merge_violin_tracks(MidiFile("output.mid")).save("output.mid") | |
return "output.mid" | |