python实现音频节拍的实时识别

概述

实验室最近有一个项目需要完成一个实施录音的节拍器。具体需求是通过话筒,实时接收音频,并且在录制音频的同时,预测下一个时刻的节拍,类似于可以自行调整速度的节拍器。

代码地址:https://github.com/mhy12345/rcaudio,值得一提的是,这个工程是我第一个打包上传到pypi.org的工程,也就是说,读者可以使用pip install rcaudio 来安装。

在整个rcaudio中,节奏识别是其中最重量级的一个模块,最终实现效果为:对于所有背景音中有节拍点的音乐,该节拍器都能够很快的通过自适应算法向正确的节拍点靠近。

在静态节奏识别模块,我使用了librosa库,这个库的教程在我另一篇文章中。

思路

录音,音频处理,以及节拍识别都需要较大的计算量,倘若使用单线程,势必会产生极高的延时。对于要求绝对精度的节拍识别是不可行的。因此整个工程使用python的threading库,并行运行多个线程。他们分别是

  • 一个线程a单独监听麦克风,并将音频数据源源不断写入一个数组Q。
  • 一个线程b监听数组Q,当数组Q产生变化之后,立即将新来的数据发送给其他线程。(这一部分的逻辑是,倘若python多线程存在竞争与冒险的话,我们应该尽量最小化能够直接操作数组Q的线程数量)
  • 一个线程c在得到数据后,截取当前最近10s的音频数据,使用音频分析库以及节奏分析库获取节奏信息。

录音

录音部分使用了pyaudio 库实现的,当指定了采样率sample rate以及声道数channels,pyaudio 可以将音频的采样点对应的强度向量储存在一个缓存numpy数组中,缓存数组的大小在录音前的pa.open 函数中已经指定好了。录音开始后,pyaudio 会将音频数据源源不断写入缓存,如果缓存溢出了,抛出异常。同时,每次调用read 函数都会将一定数量的缓存拉出来,这样的步骤使用一个while True 循环包裹,就能够源源不断将录音数据记录下来了。

注意,这里缓存可能出现不足的问题 OSError : [Errno -9981] Input overflowed . 出现了这种问题基本可以通过如下方式解决——

  • 修改buffer大小
  • 修改每次取出数据的块大小
  • 修改采样率
  • 尽量减少while循环中的无关代码以提高代码运行效率

这一部分对应的代码如下

import threading
import logging
from pyaudio import PyAudio,paInt16
import numpy as np
import queue
import time

class CoreRecorder(threading.Thread):
    def __init__(self,
            time = None, #How much time to the end
            sr = 20000, #Sample rate
            batch_num  = 600, #Batch size (how much data for a single fetch)
            frames_per_buffer = 600
            ):
        threading.Thread.__init__(self)
        self.time = time
        self.sr = sr
        self.batch_num = batch_num
        self.data_alter = threading.Lock()
        self.frames_per_buffer = frames_per_buffer
        self.logger = logging.getLogger(__name__ + '.CoreRecorder')
        self.buffer = queue.Queue()
        self.start_time = None
        self.__running = threading.Event()
        self.__running.set()

    def run(self):
        self.logger.debug("Start to recording...")
        self.logger.debug("  Time = %s"%self.time)
        self.logger.debug("  Sample Rate = %s"%self.sr)
        self.start_time = time.time()
        pa=PyAudio()
        stream=pa.open(format = paInt16,channels=1, rate=self.sr,input=True, frames_per_buffer=self.frames_per_buffer)
        my_buf=[]
        count=0
        if self.time is None:
            total_count = 1e10
        else:
            total_count = self.time * self.sr / self.batch_num
        while count< total_count and self.__running.isSet():
            datawav = stream.read(self.batch_num, exception_on_overflow = True)
            datause = np.fromstring(datawav,dtype = np.short)
            for w in datause:
                self.buffer.put(w)
            count+=1
        stream.close()

    def save_wave_file(self,filename,data):
        wf=wave.open(filename,'wb')
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(self.sr)
        wf.writeframes(b"".join(data))
        wf.close()

    def stop(self):
        self.__running.clear()

数据被取出到了buffer,我们通过一个守护线程SimpleRecorder ,将数据储存在本地供后续调用。这一步初衷是尽可能减少CoreRecorder 的工作量,以保证它能够即使把数据给读出来。

from .core_recorder import CoreRecorder
import threading
import logging
import time


class SimpleRecorder(threading.Thread):
    def __init__(self,
            sr = 2000, #Sample Rate
            ):
        threading.Thread.__init__(self)
        self.audio_data = []
        self.audio_lock = threading.Lock()
        self.logger = logging.getLogger(__name__+".SimpleWatcher")
        self.recorder = CoreRecorder(sr = sr)
        self.analyzers = []
        self.sr = sr
        self.start_time = None
        self.__running = threading.Event()
        self.__running.set()

    def register(self,analyzer):
        self.analyzers.append(analyzer)
        analyzer.register_recorder(self)

    def run(self):
        self.recorder.start()
        for analyzer in self.analyzers:
            analyzer.start()
        while self.__running.isSet():
            self.start_time = self.recorder.start_time
            if self.start_time is not None:
                break
            time.sleep(.05)

        while self.__running.isSet():
            while not self.recorder.buffer.empty():
                v = self.recorder.buffer.get()
                self.audio_data.append(v)

        for analyzer in self.analyzers:
            analyzer.stop()
            analyzer.join()
        self.recorder.stop()
        self.recorder.join()

    def stop(self):
        self.logger.warn("Stop Signal Received!")
        self.__running.clear()

节拍识别

由于音频分析本身非常消耗计算量,也非常慢,因此我们新开一个线程BeatAnalyzer 专门处理音频分析,这个线程将依次处理出节拍信息(用于校准节拍器),每一拍对应MFCC信息(用于音频分析,也可以是一些其他东西),音量信息(用于判定音乐结束)。

对于节拍识别模块,效率非常低,基本上一个10s的音频数据需要1s左右来识别。而且识别还可能存在一些误差,因此我们需要设计一个算法,能够不受程序噪音的影响——

我的算法基本思路为,当前第x拍的节拍点的时刻可以看做是一个等差数列P:B_x = kx+b,且由于我们是实时的节拍识别,因此对于所有x \leq T的正整数x,我们都已经给予用户反馈而不能够修改了。倘若没有任何新的观测,这个节拍将永远以k的频率进行下去。而新来的修改,只能够通过微调kb,来修改x > T对应的等差数列的点,特别的x=T+1对应的时刻,即下一拍的时刻。

现在,我们的节拍分析库新处理处了一个最近10s的节拍序列预测,同样是一个等差数列Q:k'x + b'(注意,这里甚至可能出现k' = 0.5 k的情况)。我们已知预测B_{T+1},我们找到Q上面,离这个B_{T+1}最近的那个点,然后将B_{T+1}向该点靠近(即间距变为原来的0.9倍),得到了下一拍的更新值,我们可以进一步更新kb

这一个过程中有很多细节,有需要的话,可以看看代码——

import threading
import logging
import time
import numpy as np
import librosa
import bisect
import math
from .base_analyzer import BaseAnalyzer

class BeatAnalyzer(BaseAnalyzer):
    def __init__(self,
            rec_time = 5,
            initial_bpm = 120,
            smooth_ratio = .2
            ):
        BaseAnalyzer.__init__(self)
        self.initial_k = 60 / initial_bpm
        self.current_b = time.time()
        self.current_k = self.initial_k
        self.expected_b = None
        self.expected_k = None
        self.beat_count = 0
        self.rec_time = rec_time
        self.smooth_ratio = smooth_ratio

    def register_recorder(self,recorder):
        BaseAnalyzer.register_recorder(self,recorder)
        self.rec_size = self.sr * self.rec_time

    def run(self):
        while self.recorder.start_time is None:
            time.sleep(1)
        self.current_b = time.time()
        self.start_time = self.recorder.start_time
        while self.running.isSet():
            if len(self.audio_data) < 4 * self.sr:
                time.sleep(.5)
                self.logger.debug("The data is not enough...")
                continue
            start_samples = len(self.audio_data) - self.rec_size if len(self.audio_data) > self.rec_size else 0
            data = np.array(self.audio_data[start_samples:]).astype(np.float32)
            start_time = start_samples / self.sr
            tmpo, _beat_frames = librosa.beat.beat_track(y=data,sr = self.sr)
            beat_times = librosa.frames_to_time(_beat_frames) + start_time + self.start_time

            if len(beat_times) < 5:
                self.logger.debug("The beats count <%d> is not enough..."%len(beat_times))
                continue

            self.expected_k,self.expected_b = np.polyfit(range(len(beat_times)),beat_times,1)

    def block_until_next_beat(self):
        if self.expected_b is None:
            now = time.time()
            pred_i = math.ceil((now - self.current_b)/self.current_k)
            self.beat_count += pred_i
            pred_v2 = pred_i * self.current_k + self.current_b
            pred_v1 = pred_v2 - self.current_k
            pred = pred_v2
        else:
            now = time.time()
            pred_i = math.ceil((now - self.current_b)/self.current_k)
            pred_v2 = pred_i * self.current_k + self.current_b
            pred_v1 = pred_v2 - self.current_k

            exp_i = math.ceil((pred_v2 - self.expected_b)/self.expected_k)
            exp_v2 = exp_i * self.expected_k + self.expected_b
            exp_v1 = exp_v2 - self.expected_k
            if (pred_v2-exp_v1) < (exp_v2-pred_v2):
                pred = pred_v2 + (exp_v1-pred_v2)*(1-self.smooth_ratio)
                self.beat_count += pred_i
            else:
                pred = pred_v2 + (exp_v2-pred_v2)*(1-self.smooth_ratio)
                self.beat_count += pred_i
        self.current_b = pred
        self.current_k = pred - pred_v1
        if not self.expected_k is None:
            self.current_k = self.current_k + (self.expected_k - self.current_k)*(1-self.smooth_ratio)
        if self.current_k < self.initial_k*.45:
            self.current_k *= 2
        if self.current_k > self.initial_k * 2.1:
            self.current_k /= 2
        while time.time() < pred:
            time.sleep(.01)
        return self.beat_count

所有的音频分析工作我们都借用了librosa 库进行处理。很幸运地发现,librosa 官方文档中要求输入的变量y 恰好就是我们之前录音出来的audio_data 。

 

《python实现音频节拍的实时识别》有一个想法

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据