使用双门限法进行语音端点检测(SJTU智能语音识别作业1-1)

使用双门限法进行语音端点检测(SJTU智能语音识别作业1-1),第1张

原问题:

基于线性分类器和语音短时信号特征的简单语音端点检测算法:

▶ 利用语音的短时信号特征(短时能量,过零率,短时频谱, 以及基频等)

▶ 简单状态机的使用

▶ 简单线性分类器的使用(如:阈值分类器)

原理:

超全的相关基础知识:知乎上的语音端点检测手把手入门知识

代码:

对这位老兄 的代码进行了优化(添加高通滤波器、针对数据集调参、避免C.append[i]时语音段重合)。

还想到两点:可以删除过短的语音段,可以改善分帧加窗,不过我比较懒就没做。

伪代码和原代码参见我的报告“SJTU智能语音识别作业-语音端点检测latex原码/原代码”,发了CSDN不知道有没有过审。不建议直接食用下面的原代码,可能比较难懂。

import wave
import os
import numpy as np
from scipy import signal

#---------------------这是检测语音端点部分-------------------#
def sgn(data):
    '''一个简单的符号判别函数
    Args:
        data: a number
    Returns:
        a bool
    '''
    if data >= 0 :
        return 1
    else :
        return 0

# 计算每帧能量,256个采样点为一帧
def calEnergy(wave_data) :
    energy = []
    sum = 0
    for i in range(len(wave_data)) :
        sum = sum + (int(wave_data[i]) * int(wave_data[i]))
        if (i + 1) % 256 == 0 :
            energy.append(sum)
            sum = 0
        elif i == len(wave_data) - 1 :
            energy.append(sum)
    return energy

#计算过零率
def calZeroCrossingRate(wave_data) :
    zeroCrossingRate = []
    sum = 0
    for i in range(len(wave_data)) :
        if i % 256 == 0:
            continue
        sum = sum + np.abs(sgn(wave_data[i]) - sgn(wave_data[i - 1]))
        if (i + 1) % 256 == 0 :
            zeroCrossingRate.append(float(sum) / 255)
            sum = 0
        elif i == len(wave_data) - 1 :
            zeroCrossingRate.append(float(sum) / 255)
    return zeroCrossingRate

# 利用短时能量,短时过零率进行端点检测
def endPointDetect(energy, zeroCrossingRate) :
    sum = 0
    energyAverage = 0
    for en in energy :
        sum = sum + en
    energyAverage = sum / len(energy)

    sum = 0
    for en in energy[:5] :
        sum = sum + en                     
    MH = energyAverage / 1.5  #较高的能量阈值
    ML = (0.5 * sum + energyAverage) / 10      #较低的能量阈值
    sum = 0
    for zcr in zeroCrossingRate[:5] :
        sum = float(sum) + zcr             
    Zs = sum / 5 * 2            #过零率阈值

    A = []
    B = []
    C = []

    # 首先利用较大能量阈值 MH 进行初步检测
    flag = 0
    for i in range(len(energy)):
        #大于较高阈值,就认为开始
        if len(A) == 0 and flag == 0 and energy[i] > MH :
            A.append(i)
            flag = 1
        elif flag == 0 and energy[i] > MH and i - 21 > A[len(A) - 1]:
            A.append(i)
            flag = 1
        elif flag == 0 and energy[i] > MH and i - 21 <= A[len(A) - 1]:
            A = A[:len(A) - 1]
            flag = 1
        #小于较高阈值,就认为结束
        if flag == 1 and energy[i] < MH :
            A.append(i)
            flag = 0
    if(flag == 1):
         A.append(i-1)
    #print("较高能量阈值,计算后的浊音A:" + str(A))

    # 利用较小能量阈值 ML 进行第二步能量检测
    for j in range(len(A)) :
        i = A[j]
        #结束点是否要后移
        if j % 2 == 1 :
            while i < len(energy) and energy[i] > ML :
                #避免后移过度
                if( (not j + 1 == len(A)) and (i >= A[j + 1]) ):
                    break
                i = i + 1
            B.append(i)
        #开始点是否要前移
        else :
            while i > 0 and energy[i] > ML :
                #避免前移过度
                if( (not j == 0) and (i <= A[j - 1]) ):
                    break
                i = i - 1
            B.append(i)
    #print("较低能量阈值,增加一段语言B:" + str(B))

    # 利用过零率进行最后一步检测
    for j in range(len(B)) :
        i = B[j]
        #结束点是否要后移
        if j % 2 == 1 :
            while i < len(zeroCrossingRate) and zeroCrossingRate[i] >= Zs :
                #避免后移过度
                if( (not j + 1 == len(B)) and (i >= B[j + 1]) ):
                    break
                i = i + 1
            C.append(i)
        #开始点是否要前移
        else :
            while i > 0 and zeroCrossingRate[i] >= Zs :
                #避免前移过度
                if( (not j == 0) and (i <= B[j - 1]) ):
                    break
                i = i - 1
            C.append(i)
    #print("过零率阈值,最终语音分段C:" + str(C))

    #糊弄一下格式
    if(len(C) == 0):
        C.append(5)
        C.append(6)

    return C

def evaluate_result():
    '''处理read_label_from_file()的输出并调用evaluate文件

    Args:NAN

    Returns: 
        auc,err
    '''
    prediction = read_label_from_file(path="./dev_label_task1.txt")
    label = read_label_from_file(path="./vad/data/dev_label.txt")
    #得到的结果不能直接用,还需要处理一下
    pre = []    #拼接后的prediction
    labl = []   #拼接后的label
    for key in  prediction.keys():
        a = prediction[key]
        b = label[key]
        #长度对齐
        if (len(a) > len(b)):
            b = b + [0] * (len(a) - len(b))
        else:
            a = a + [0] * (len(b) - len(a))
        #放入pre/labl
        pre.extend(a)
        labl.extend(b)
    return( get_metrics(pre, labl) ) 

def initial():
    '''用来初始化一些文件、文件夹信息

    Args/Returns: NAN
    '''
    #创建文件夹
    if(not os.path.exists(r'./energy')):
        os.makedirs(r'./energy')
    if(not os.path.exists(r'./zeroCrossingRate')):
        os.makedirs(r'./zeroCrossingRate')
    #清空内容    
    with open("dev_label_task1.txt", "w") as f :
        f.truncate(0)

if __name__ == "__main__":
    path_wave = r'./vad/wavs/dev/' #其他数据集:/test /train
    filenames = os.listdir(path_wave)
    initial()

    #对每个文件判断语音端点并记录到一个txt里
    t = 1
    for filename in filenames:
        #甚至可以每10轮显示计算进行情况
        if(t % 10 == 0):
            print(t)
        t = t + 1

        f = wave.open(path_wave + filename ,"rb")
        # getparams() 一次性返回所有的WAV文件的格式信息
        # 返回的是一个组元 (tuple):声道数, 量化位数(byte单位), 采样频率, 采样点数, 压缩类型, 压缩类型的描述。
        params = f.getparams()
        # nframes 采样点数目
        nchannels, sampwidth, framerate, nframes = params[:4]
        # readframes() 按照采样点读取数据
        str_data = f.readframes(nframes)            # str_data 是二进制字符串
        # 转成二字节数组形式(每个采样点占两个字节)
        wave_data = np.frombuffer(str_data, dtype = np.short)
        f.close()

        #要滤除50hz以下频率成分,即截至频率为50hz,则wn = 50 * 2 / framerate
        wn = 100 / framerate
        b, a = signal.butter(8, wn, 'highpass')
        filtedData = signal.filtfilt(b, a, wave_data)#wave_data为要过滤的信号

        energy = calEnergy(filtedData)
        
        with open("./energy/" + filename + "_en.txt", "w") as f :
            for en in energy :
                f.write(str(en) + "\n")

        zeroCrossingRate = calZeroCrossingRate(filtedData)
        with open("./zeroCrossingRate/" + filename  + "_zero.txt","w") as f :
            for zcr in zeroCrossingRate :
                f.write(str(zcr) + "\n")
        N = endPointDetect(energy, zeroCrossingRate)
        #输出 store the results
        with open("dev_label_task1.txt", "a") as f :
            f.write(filename[:-4]+" ")
            l = len(N)
            i = 0
            while(i < l-1):
                f.write(str( round(N[i]/framerate*256,2) ) + "," + str( round(N[i+1]/framerate*256,2) ) + " ")
                i = i + 2
            f.write("\n")
       
#---------------------这是evaluate部分-------------------#
    import sys
    sys.path.append(r'F:/vs/python_projects/VoiceEndpointDetection/vad')
    from vad_utils import read_label_from_file as read_label_from_file
    from evaluate import get_metrics as get_metrics
    print( evaluate_result() )

    

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/883914.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-14
下一篇 2022-05-14

发表评论

登录后才能评论

评论列表(0条)

保存