Source code for analysis.F0Analysis

from __future__ import print_function, division
import numpy as np
import logging
import pdb
import numpy as np
from numpy.lib import stride_tricks
from Analysis import Analysis
from scipy import signal
from numpy.fft import fft, ifft, fftshift
from sppysound import multirate
import warnings

from numpy import polyfit, arange

[docs]class F0Analysis(Analysis):
""" F0 analysis descriptor class for generation of fundamental frequency estimation. This descriptor calculates the fundamental frequency for overlapping grains of an AnalysedAudioFile object. A full definition of F0 analysis can be found in the documentation. Arguments: - analysis_group: the HDF5 file group to use for the storage of the analysis. - config: The configuration module used to configure the analysis """ def __init__(self, AnalysedAudioFile, frames, analysis_group, config=None): super(F0Analysis, self).__init__(AnalysedAudioFile,frames, analysis_group, 'F0') self.logger = logging.getLogger(__name__+'.{0}Analysis'.format(self.name)) # Store reference to the file to be analysed self.AnalysedAudioFile = AnalysedAudioFile self.nyquist_rate = self.AnalysedAudioFile.samplerate / 2. if config: self.window_size = config.f0["window_size"] self.overlap = 1. / config.f0["overlap"] self.threshold = config.f0["ratio_threshold"] else: self.window_size=512 self.overlap = 0.5 self.threshold = 0. self.analysis_group = analysis_group self.logger.info("Creating F0 analysis for {0}".format(self.AnalysedAudioFile.name)) self.create_analysis( frames, self.AnalysedAudioFile.samplerate, window_size=self.window_size, overlapFac=self.overlap, threshold=config.f0["ratio_threshold"] ) def get_analysis_grains(self, start, end): """ Retrieve analysis frames for period specified in start and end times. arrays of start and end time pairs will produce an array of equivelant size containing frames for these times. """ times = self.analysis_group["F0"]["times"][:] frames = self.analysis_group["F0"]["frames"][:] hr = self.analysis_group["F0"]["harmonic_ratio"][:] start = start / 1000 end = end / 1000 vtimes = times.reshape(-1, 1) nan_inds = hr < self.threshold hr[nan_inds] = np.nan frames[nan_inds] = np.nan selection = np.transpose((vtimes >= start) & (vtimes <= end)) if not selection.any(): frame_center = start + (end-start)/2. closest_frames = np.abs(vtimes-frame_center).argsort()[:2] selection[closest_frames] = True return ((frames, times, hr), selection) @staticmethod def create_f0_analysis( frames, samplerate, window_size=512, overlapFac=0.5, threshold=0.0, m0=None, M=None, ): """ Generate F0 contour analysis. Calculate the frequency and harmonic ratio values of windowed segments of the audio file and save to disk. """ if hasattr(frames, '__call__'): frames = frames() if not M: M=int(round(0.016*samplerate)) hopSize = int(window_size - np.floor(overlapFac * window_size)) # zeros at beginning (thus center of 1st window should be for sample nr. 0) samples = frames #samples = np.concatenate((np.zeros(np.floor(window_size/2.0)), frames)) # cols for windowing cols = np.ceil((len(samples) - window_size) / float(hopSize)) + 1 # zeros at end (thus samples can be fully covered by frames) samples = np.concatenate((samples, np.zeros(window_size))) frames = stride_tricks.as_strided( samples, shape=(cols, window_size), strides=(samples.strides[0]*hopSize, samples.strides[0]) ).copy() # TODO: Replace this with zero crossing object. def feature_zcr(window): window2 = np.zeros(window.size) window2[1:-1] = window[0:-2] Z = (1/(2*window.size)) * np.sum(np.abs(np.sign(window)-np.sign(window2))) return Z def parabolic(f, x): """ Quadratic interpolation for estimating the true position of an inter-sample maximum when nearby samples are known. f is a vector and x is an index for that vector. Returns (vx, vy), the coordinates of the vertex of a parabola that goes through point x and its two neighbors. Example: Defining a vector f with a local maximum at index 3 (= 6), find local maximum if points 2, 3, and 4 actually defined a parabola. In [3]: f = [2, 3, 1, 6, 4, 2, 3, 1] In [4]: parabolic(f, argmax(f)) Out[4]: (3.2142857142857144, 6.1607142857142856) Ref: https://gist.github.com/endolith/255291 """ if x >= f.size-1 or x <= 2: return x, f[x] xv = 1/2. * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x yv = f[x] - 1/4. * (f[x-1] - f[x+1]) * (xv - x) return (xv, yv) def per_frame_f0(frames, m0, M): if not frames.any(): HR = np.nan f0 = np.nan return f0, HR #R=autocorr([frames])[0] R = np.correlate(frames, frames, mode='full') g=R[frames.size] R=R[frames.size-1:] if not m0: # estimate m0 (as the first zero crossing of R) m0 = np.argmin(np.diff(np.sign(R[1:])))+1 if m0 == 1: m0 = R.size if M > R.size: M = R.size Gamma = np.zeros(M) CSum = np.cumsum(frames*frames) with warnings.catch_warnings(): warnings.filterwarnings('error') try: Gamma[m0:M] = R[m0:M] / (np.sqrt([g*CSum[-m0:-M:-1]])+np.finfo(float).eps) except Warning: pass # compute T0 and harmonic ratio: if np.isnan(Gamma).any(): HR = np.nan f0 = np.nan else: blag = np.argmax(Gamma) HR = Gamma[blag] interp, HR = parabolic(Gamma, blag) if not interp: f0 = np.nan HR = np.nan else: # get fundamental frequency: f0 = samplerate / interp if f0 > samplerate/2: raise ValueError("F0 value ({0}) is above the nyquist rate " "({1}). This shouldn't happen...".format(f0, samplerate/2)) if HR >= 1: HR = 1 return (f0, HR) output = np.apply_along_axis(per_frame_f0, 1, frames, m0, M) # output = np.empty((frames.shape[0], 2)) # for ind, i in enumerate(frames): # output[ind] = per_frame_f0(i, m0, M) return output def hdf5_dataset_formatter(self, *args, **kwargs): ''' Formats the output from the analysis method to save to the HDF5 file. ''' samplerate = self.AnalysedAudioFile.samplerate frames = args[0] # frames = multirate.interp(frames, 4) samplerate *= 1 data = self.create_f0_analysis(frames, samplerate, **kwargs) f0 = data[:, 0] harmonic_ratio = data[:, 1] f0_times = self.calc_f0_frame_times(f0, frames, samplerate) return ({'frames': f0, 'harmonic_ratio': harmonic_ratio, 'times': f0_times}, {}) @staticmethod def calc_f0_frame_times(f0frames, sample_frames, samplerate): """Calculate times for frames using sample size and samplerate.""" if hasattr(sample_frames, '__call__'): sample_frames = sample_frames() # Get number of frames for time and frequency timebins = f0frames.shape[0] # Create array ranging from 0 to number of time frames scale = np.arange(timebins+1) # divide the number of samples by the total number of frames, then # multiply by the frame numbers. f0_times = (sample_frames.shape[0]/timebins) * scale[:-1] # Divide by the samplerate to give times in seconds f0_times = f0_times / samplerate return f0_times def analysis_formatter(self, data, selection, format): """Calculate the average analysis value of the grain using the match format specified.""" frames, times, harm_ratio = data # Get indexes of all valid frames (that aren't nan) valid_inds = np.isfinite(frames) & np.isfinite(harm_ratio) format_style_dict = { 'mean': np.mean, 'median': np.median, 'log2_mean': self.log2_mean, 'log2_median': self.log2_median, } if not selection.size: # TODO: Add warning here return np.nan #for ind, i in enumerate(selection): # output[ind] = self.formatter_func(i, frames, valid_inds, harm_ratio, formatter=format_style_dict[format]) try: output = np.apply_along_axis( self.formatter_func, 1, selection, frames, valid_inds, formatter=format_style_dict[format] )/self.nyquist_rate except IndexError: pdb.set_trace() return output