Python使用PyAudio制作录音工具的实现代码
来源:脚本之家    时间:2022-04-24 09:52:44
目录
应用平台音频录制部分音频播放部分GUI窗口所需属性值代码部分pynput监听键盘总结

最近有在使用屏幕录制软件录制桌面,在用的过程中突发奇想,使用python能不能做屏幕录制工具,也锻炼下自己的动手能力。
接下准备写使用python如何做屏幕录制工具的系列文章:

录制屏幕制作视频

录制音频

合成视频,音频

基于pyqt5制作可视化窗口

大概上述四个部分,希望自己能够尽快完善,上一篇文章利用opencv制作了屏幕录制部分,接下继续更新系列,使用python录制音频。

应用平台

windows 10python 3.7

音频录制部分

音频录制与视频录制相似,也是以数据帧的方式录制保存,这次使用强大的第三方包PyAudio和内置的wave模块编写主要部分代码:

pip install PyAudio

如果出现安装失败,可点击去此处下载对应.whl文件,cp37代表python3.7环境,64代表64位操作系统。
假如不是下载对应的whl包会导致安装失败,下载完成后,cmd窗口下进入whl的所在目录,使用pip install PyAudio-xx.whl即可完成安装。

音频录制主要代码:

from pyaudio import PyAudio, paInt16, paContinue, paComplete

# 设置固定参数
chunk = 1024  # 每个缓冲区的帧数
format_sample = paInt16  # 采样位数
channels = 2  # 声道: 1,单声道;2,双声道
fps = 44100  # 采样频率
# 这里采用回调的方式录制音频
def callback(in_data, frame_count, time_info, status):
    """录制回调函数"""
    wf.writeframes(in_data)
    if xx:  # 当某某条件满足时
        return in_data, paContinue
    else:
        return in_data, paComplete
# 实例化PyAudio
p = PyAudio()
stream = p.open(format=format_sample,
				channels=channels,
				rate=fps,
                frames_per_buffer=chunk,
                input=True,
                input_device_index=None,  # 输入设备索引, None为默认设备
                stream_callback=callback   # 回调函数
                )
# 开始流录制
stream.start_stream()
# 判断流是否活跃
while stream.is_active():
	time.sleep(0.1)    # 0.1为灵敏度
# 录制完成,关闭流及实例
stream.stop_stream()
stream.close()
p.terminate()

采取流式并用回调函数录制,需要先定义保存音频文件,用wave新建音频二进制文件:

import wave
wf = wave.open("test.wav", "wb")
wf.setnchannels(channels)
wf.setsampwidth(p.get_sample_size(format_sample))
wf.setframerate(fps)

为了后续代码可以很好的与之结合复用,将上面的代码包装成类

from pyaudio import PyAudio
class AudioRecord(PyAudio):
    def __init__(self,):

源码于文末补充。

音频播放部分

播放部分代码与录制部分代码相差不大,核心部分:

wf = wave.open("test.wav", "rb")
def callback(in_data, frame_count, time_info, status):
	data = wf.readframes(frame_count)
	return data, paContinue
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
				channels=wf.getnchannels(),
                rate=wf.getframerate(),
				output=True,
				output_device_index=output_device_index,  # 输入设备索引
				stream_callback=callback  # 输出用回调函数
                )
stream.start_stream()
while stream.is_active():
	time.sleep(0.1)

目前暂时测试了.wav.mp3格式可以正常录制及播放,其它类型格式音频可以自行调用代码进行测试。

GUI窗口所需属性值代码部分

考虑到GUI窗口能较为人性化的输出及输入值,编写该部分代码,内容含音频时长及获取输入设备及输出设备。

# 音频时长
duration = wf.getnframes() / wf.getframerate()
# 获取系统目前已安装的输入输出设备
dev_info = self.get_device_info_by_index(i)
default_rate = int(dev_info["defaultSampleRate"])
if not dev_info["hostApi"] and default_rate == fps and "映射器" not in dev_info["name"]:
	if dev_info["maxInputChannels"]:
		print("输入设备:", dev_info["name"])
	elif dev_info["maxOutputChannels"]:
		print("输出设备:", dev_info["name"])

pynput监听键盘

在这部分代码也暂时使用pynput监听键盘来对录音做中断处理。可以调用上一篇文章中的键盘监听代码。

def hotkey(self):
    """热键监听"""
    with keyboard.Listener(on_press=self.on_press) as listener:
        listener.join()

def on_press(self, key):
    try:
        if key.char == "t":  # t键,录制结束,保存音频
            self.flag = True
        elif key.char == "k":  # k键,录制中止,删除文件
            self.flag = True
            self.kill = True
    except Exception as e:
        print(e)

功能与上一篇类似,不再赘述。

总结

以上就是使用PyAudio调用windows的音频设备进行录制及播放,整体学习了使用类及其继承相关知识,用法在这只是展示了冰山一角,还有更多的知识等待着我们一起去探索!

于二零二一年十二月二十日作

源码:

import wave
import time
from pathlib import Path
from threading import Thread
from pyaudio import PyAudio, paInt16, paContinue, paComplete
from pynput import keyboard  # pip install pynput

class AudioRecord(PyAudio):
    def __init__(self, channels=2):
        super().__init__()
        self.chunk = 1024  # 每个缓冲区的帧数
        self.format_sample = paInt16  # 采样位数
        self.channels = channels  # 声道: 1,单声道;2,双声道
        self.fps = 44100  # 采样频率
        self.input_dict = None
        self.output_dict = None
        self.stream = None
        self.filename = "~test.wav"
        self.duration = 0   # 音频时长
        self.flag = False
        self.kill = False
    def __call__(self, filename):
        """重载文件名"""
        self.filename = filename
    def callback_input(self, in_data, frame_count, time_info, status):
        """录制回调函数"""
        self.wf.writeframes(in_data)
        if not self.flag:
            return in_data, paContinue
        else:
            return in_data, paComplete
    def callback_output(self, in_data, frame_count, time_info, status):
        """播放回调函数"""
        data = self.wf.readframes(frame_count)
        return data, paContinue
    def open_stream(self, name):
        """打开录制流"""
        input_device_index = self.get_device_index(name, True) if name else None
        return self.open(format=self.format_sample,
                         channels=self.channels,
                         rate=self.fps,
                         frames_per_buffer=self.chunk,
                         input=True,
                         input_device_index=input_device_index,  # 输入设备索引
                         stream_callback=self.callback_input
                         )
    def audio_record_run(self, name=None):
        """音频录制"""
        self.wf = self.save_audio_file(self.filename)
        self.stream = self.open_stream(name)
        self.stream.start_stream()
        while self.stream.is_active():
            time.sleep(0.1)
        self.wf.close()
        if self.kill:
            Path(self.filename).unlink()
        self.duration = self.get_duration(self.wf)
        print(self.duration)
        self.terminate_run()
    def run(self, filename=None, name=None, record=True):
        """音频录制线程"""
        thread_1 = Thread(target=self.hotkey, daemon=True)
        if record:
            # 录制
            if filename:
                self.filename = filename
            thread_2 = Thread(target=self.audio_record_run, args=(name,))
            # 播放
            if not filename:
                raise Exception("未输入音频文件名,不能播放,请输入后再试!")
            thread_2 = Thread(target=self.read_audio, args=(filename, name,))
        thread_1.start()
        thread_2.start()
    def read_audio(self, filename, name=None):
        """音频播放"""
        output_device_index = self.get_device_index(name, False) if name else None
        with wave.open(filename, "rb") as self.wf:
            self.duration = self.get_duration(self.wf)
            self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()),
                                    channels=self.wf.getnchannels(),
                                    rate=self.wf.getframerate(),
                                    output=True,
                                    output_device_index=output_device_index,  # 输出设备索引
                                    stream_callback=self.callback_output
                                    )
            self.stream.start_stream()
            while self.stream.is_active():
                time.sleep(0.1)
    @staticmethod
    def get_duration(wf):
        """获取音频时长"""
        return round(wf.getnframes() / wf.getframerate(), 2)
    def get_in_out_devices(self):
        """获取系统输入输出设备"""
        self.input_dict = {}
        self.output_dict = {}
        for i in range(self.get_device_count()):
            dev_info = self.get_device_info_by_index(i)
            default_rate = int(dev_info["defaultSampleRate"])
            if not dev_info["hostApi"] and default_rate == self.fps and "映射器" not in dev_info["name"]:
                if dev_info["maxInputChannels"]:
                    self.input_dict[dev_info["name"]] = i
                elif dev_info["maxOutputChannels"]:
                    self.output_dict[dev_info["name"]] = i
    def get_device_index(self, name, input_in=True):
        """获取选定设备索引"""
        if input_in and self.input_dict:
            return self.input_dict.get(name, -1)
        elif not input_in and self.output_dict:
            return self.output_dict.get(name, -1)
    def save_audio_file(self, filename):
        """音频文件保存"""
        wf = wave.open(filename, "wb")
        wf.setnchannels(self.channels)
        wf.setsampwidth(self.get_sample_size(self.format_sample))
        wf.setframerate(self.fps)
        return wf
    def terminate_run(self):
        """结束流录制或流播放"""
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        self.terminate()
    def hotkey(self):
        """热键监听"""
        with keyboard.Listener(on_press=self.on_press) as listener:
            listener.join()
    def on_press(self, key):
        try:
            if key.char == "t":  # t键,录制结束,保存音频
                self.flag = True
            elif key.char == "k":  # k键,录制中止,删除文件
                self.kill = True
        except Exception as e:
            print(e)
if __name__ == "__main__":
    audio_record = AudioRecord()
    audio_record.get_in_out_devices()
    # 录制
    print(audio_record.input_dict)
    audio_record.run("test.mp3")
    # 播放
    print(audio_record.output_dict)
    audio_record.run("test.mp3", record=False)

到此这篇关于Python使用PyAudio制作录音工具的文章就介绍到这了,更多相关Python PyAudio录音工具内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

关键词: 回调函数 输入设备 输出设备 录音工具 删除文件

上一篇:

下一篇:

X 关闭

X 关闭