概述
实验室最近有一个项目需要完成一个实施录音的节拍器。具体需求是通过话筒,实时接收音频,并且在录制音频的同时,预测下一个时刻的节拍,类似于可以自行调整速度的节拍器。
代码地址:https://github.com/mhy12345/rcaudio,值得一提的是,这个工程是我第一个打包上传到pypi.org的工程,也就是说,读者可以使用pip install rcaudio 来安装。
在整个rcaudio中,节奏识别是其中最重量级的一个模块,最终实现效果为:对于所有背景音中有节拍点的音乐,该节拍器都能够很快的通过自适应算法向正确的节拍点靠近。
在静态节奏识别模块,我使用了librosa库,这个库的教程在我另一篇文章中。
思路
录音,音频处理,以及节拍识别都需要较大的计算量,倘若使用单线程,势必会产生极高的延时。对于要求绝对精度的节拍识别是不可行的。因此整个工程使用python的threading库,并行运行多个线程。他们分别是
- 一个线程a单独监听麦克风,并将音频数据源源不断写入一个数组Q。
- 一个线程b监听数组Q,当数组Q产生变化之后,立即将新来的数据发送给其他线程。(这一部分的逻辑是,倘若python多线程存在竞争与冒险的话,我们应该尽量最小化能够直接操作数组Q的线程数量)
- 一个线程c在得到数据后,截取当前最近10s的音频数据,使用音频分析库以及节奏分析库获取节奏信息。
录音
录音部分使用了pyaudio 库实现的,当指定了采样率sample rate以及声道数channels,pyaudio 可以将音频的采样点对应的强度向量储存在一个缓存numpy数组中,缓存数组的大小在录音前的pa.open 函数中已经指定好了。录音开始后,pyaudio 会将音频数据源源不断写入缓存,如果缓存溢出了,抛出异常。同时,每次调用read 函数都会将一定数量的缓存拉出来,这样的步骤使用一个while True 循环包裹,就能够源源不断将录音数据记录下来了。
注意,这里缓存可能出现不足的问题 OSError : [Errno -9981] Input overflowed . 出现了这种问题基本可以通过如下方式解决——
- 修改buffer大小
- 修改每次取出数据的块大小
- 修改采样率
- 尽量减少while循环中的无关代码以提高代码运行效率
这一部分对应的代码如下
import threading import logging from pyaudio import PyAudio,paInt16 import numpy as np import queue import time class CoreRecorder(threading.Thread): def __init__(self, time = None, #How much time to the end sr = 20000, #Sample rate batch_num = 600, #Batch size (how much data for a single fetch) frames_per_buffer = 600 ): threading.Thread.__init__(self) self.time = time self.sr = sr self.batch_num = batch_num self.data_alter = threading.Lock() self.frames_per_buffer = frames_per_buffer self.logger = logging.getLogger(__name__ + '.CoreRecorder') self.buffer = queue.Queue() self.start_time = None self.__running = threading.Event() self.__running.set() def run(self): self.logger.debug("Start to recording...") self.logger.debug(" Time = %s"%self.time) self.logger.debug(" Sample Rate = %s"%self.sr) self.start_time = time.time() pa=PyAudio() stream=pa.open(format = paInt16,channels=1, rate=self.sr,input=True, frames_per_buffer=self.frames_per_buffer) my_buf=[] count=0 if self.time is None: total_count = 1e10 else: total_count = self.time * self.sr / self.batch_num while count< total_count and self.__running.isSet(): datawav = stream.read(self.batch_num, exception_on_overflow = True) datause = np.fromstring(datawav,dtype = np.short) for w in datause: self.buffer.put(w) count+=1 stream.close() def save_wave_file(self,filename,data): wf=wave.open(filename,'wb') wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(self.sr) wf.writeframes(b"".join(data)) wf.close() def stop(self): self.__running.clear()
数据被取出到了buffer,我们通过一个守护线程SimpleRecorder ,将数据储存在本地供后续调用。这一步初衷是尽可能减少CoreRecorder 的工作量,以保证它能够即使把数据给读出来。
from .core_recorder import CoreRecorder import threading import logging import time class SimpleRecorder(threading.Thread): def __init__(self, sr = 2000, #Sample Rate ): threading.Thread.__init__(self) self.audio_data = [] self.audio_lock = threading.Lock() self.logger = logging.getLogger(__name__+".SimpleWatcher") self.recorder = CoreRecorder(sr = sr) self.analyzers = [] self.sr = sr self.start_time = None self.__running = threading.Event() self.__running.set() def register(self,analyzer): self.analyzers.append(analyzer) analyzer.register_recorder(self) def run(self): self.recorder.start() for analyzer in self.analyzers: analyzer.start() while self.__running.isSet(): self.start_time = self.recorder.start_time if self.start_time is not None: break time.sleep(.05) while self.__running.isSet(): while not self.recorder.buffer.empty(): v = self.recorder.buffer.get() self.audio_data.append(v) for analyzer in self.analyzers: analyzer.stop() analyzer.join() self.recorder.stop() self.recorder.join() def stop(self): self.logger.warn("Stop Signal Received!") self.__running.clear()
节拍识别
由于音频分析本身非常消耗计算量,也非常慢,因此我们新开一个线程BeatAnalyzer 专门处理音频分析,这个线程将依次处理出节拍信息(用于校准节拍器),每一拍对应MFCC信息(用于音频分析,也可以是一些其他东西),音量信息(用于判定音乐结束)。
对于节拍识别模块,效率非常低,基本上一个10s的音频数据需要1s左右来识别。而且识别还可能存在一些误差,因此我们需要设计一个算法,能够不受程序噪音的影响——
我的算法基本思路为,当前第拍的节拍点的时刻可以看做是一个等差数列P:
,且由于我们是实时的节拍识别,因此对于所有
的正整数x,我们都已经给予用户反馈而不能够修改了。倘若没有任何新的观测,这个节拍将永远以
的频率进行下去。而新来的修改,只能够通过微调
和
,来修改
对应的等差数列的点,特别的
对应的时刻,即下一拍的时刻。
现在,我们的节拍分析库新处理处了一个最近10s的节拍序列预测,同样是一个等差数列Q:(注意,这里甚至可能出现
的情况)。我们已知预测
,我们找到Q上面,离这个
最近的那个点,然后将
向该点靠近(即间距变为原来的0.9倍),得到了下一拍的更新值,我们可以进一步更新
和
。
这一个过程中有很多细节,有需要的话,可以看看代码——
import threading import logging import time import numpy as np import librosa import bisect import math from .base_analyzer import BaseAnalyzer class BeatAnalyzer(BaseAnalyzer): def __init__(self, rec_time = 5, initial_bpm = 120, smooth_ratio = .2 ): BaseAnalyzer.__init__(self) self.initial_k = 60 / initial_bpm self.current_b = time.time() self.current_k = self.initial_k self.expected_b = None self.expected_k = None self.beat_count = 0 self.rec_time = rec_time self.smooth_ratio = smooth_ratio def register_recorder(self,recorder): BaseAnalyzer.register_recorder(self,recorder) self.rec_size = self.sr * self.rec_time def run(self): while self.recorder.start_time is None: time.sleep(1) self.current_b = time.time() self.start_time = self.recorder.start_time while self.running.isSet(): if len(self.audio_data) < 4 * self.sr: time.sleep(.5) self.logger.debug("The data is not enough...") continue start_samples = len(self.audio_data) - self.rec_size if len(self.audio_data) > self.rec_size else 0 data = np.array(self.audio_data[start_samples:]).astype(np.float32) start_time = start_samples / self.sr tmpo, _beat_frames = librosa.beat.beat_track(y=data,sr = self.sr) beat_times = librosa.frames_to_time(_beat_frames) + start_time + self.start_time if len(beat_times) < 5: self.logger.debug("The beats count <%d> is not enough..."%len(beat_times)) continue self.expected_k,self.expected_b = np.polyfit(range(len(beat_times)),beat_times,1) def block_until_next_beat(self): if self.expected_b is None: now = time.time() pred_i = math.ceil((now - self.current_b)/self.current_k) self.beat_count += pred_i pred_v2 = pred_i * self.current_k + self.current_b pred_v1 = pred_v2 - self.current_k pred = pred_v2 else: now = time.time() pred_i = math.ceil((now - self.current_b)/self.current_k) pred_v2 = pred_i * self.current_k + self.current_b pred_v1 = pred_v2 - self.current_k exp_i = math.ceil((pred_v2 - self.expected_b)/self.expected_k) exp_v2 = exp_i * self.expected_k + self.expected_b exp_v1 = exp_v2 - self.expected_k if (pred_v2-exp_v1) < (exp_v2-pred_v2): pred = pred_v2 + (exp_v1-pred_v2)*(1-self.smooth_ratio) self.beat_count += pred_i else: pred = pred_v2 + (exp_v2-pred_v2)*(1-self.smooth_ratio) self.beat_count += pred_i self.current_b = pred self.current_k = pred - pred_v1 if not self.expected_k is None: self.current_k = self.current_k + (self.expected_k - self.current_k)*(1-self.smooth_ratio) if self.current_k < self.initial_k*.45: self.current_k *= 2 if self.current_k > self.initial_k * 2.1: self.current_k /= 2 while time.time() < pred: time.sleep(.01) return self.beat_count
所有的音频分析工作我们都借用了librosa 库进行处理。很幸运地发现,librosa 官方文档中要求输入的变量y 恰好就是我们之前录音出来的audio_data 。
非常感谢,你的文章是我在无意中在github上找到,本人大三,四川大学,做项目的时候涉及语音分析,copy了部分代码,你的讲解浅显易懂,比较适合我这个水平,对我的学习很有帮助。