FFTの応用であるSTFTを使ったスペクトログラムは周波数波形の時間変化がわかるため、音声解析でよく使われます。これまでWATLABブログではwavファイルや生成した波形からスペクトログラムをつくっていましたが、この記事ではリアルタイムにスペクトログラムを計算する方法を紹介します。
こんにちは。wat(@watlablog)です。ここでは録音した音声をリアルタイムにスペクトログラムにする方法を紹介します!
スペクトログラムについて
計算方法
スペクトログラムには高速フーリエ変換(FFT:Fast Fourie Transform)を短時間で行うSTFT(Short Time Fourie Transform)という計算が使われています。STFTの計算方法やスペクトログラムの作り方はWATLABブログですでに説明しているので、以下の記事を参考にしてください。
・Pythonで音のSTFT計算を自作!スペクトログラム表示する方法
リアルタイムでスペクトログラムを計算する目的
これまではあらかじめ音声が録音されたwavファイルや生成した波形からスペクトログラムを作成していました。wavファイルを使えば、計算に多くのリソースを使えるので詳細な分析が可能です。しかし、その場で周波数分析をしたい時にいちいち録音してから波形分析をかけるのは若干面倒な時があります。このような場面でリアルタイム処理の需要がでてきます。
例えばリアルタイムにスペクトログラムを目視することができれば、分析対象の音が鳴った瞬間に何[Hz]の音がどのくらいの時間で鳴っていたのかが一目瞭然です。即時分析ができると音の測定現場における生産性が上がるため、リアルタイムに処理ができるプログラムは現場の音分析担当者にこそ役に立ちます。
STEP1:リアルタイムにチャート波形を表示するPythonコード
並列処理のプログラミング方法
以下の記事でPyAudioとMatplotlibをthreadingで並列処理するコードを紹介しています。基本的にはこの記事のリアルタイムFFTの仕組みをそのまま使います。
Check Point! : 並列処理を行うPythonコード
チャート波形を追加する
上記記事の内容にチャート波形を追加してみましょう。チャート波形とは、任意サイズのバッファを持ったデータを表示する波形の種類です。チャート波形の動画を以下に示します。
右側の波形がチャート波形です。10秒間のデータをバッファに保持し、流れていく波形となっています。
このプログラムは以下のコードで作成できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
import numpy as np import queue import threading import pyaudio import matplotlib.pyplot as plt from scipy import fftpack # キュー data_queue = queue.Queue() def record_thread(index, samplerate, frames_per_buffer): """リアルタイムに音声を録音するスレッド""" # PyAudioインスタンスの生成とストリーム開始 pa = pyaudio.PyAudio() stream = pa.open(format=pyaudio.paInt16, channels=1, rate=samplerate, input=True, input_device_index=index, frames_per_buffer=frames_per_buffer) # リアルタイム録音 try: while True: data = stream.read(frames_per_buffer, exception_on_overflow=False) data = np.frombuffer(data, dtype="int16") / float((np.power(2, 16) / 2) - 1) data_queue.put(data) #print(len(data_queue.queue)) finally: stream.stop_stream() stream.close() pa.terminate() def get_mic_index(): ''' マイクチャンネルのindexをリストで取得する ''' # 最大入力チャンネル数が0でない項目をマイクチャンネルとしてリストに追加 pa = pyaudio.PyAudio() mic_list = [] for i in range(pa.get_device_count()): num_of_input_ch = pa.get_device_info_by_index(i)['maxInputChannels'] if num_of_input_ch != 0: mic_list.append(pa.get_device_info_by_index(i)['index']) return mic_list[0] def calc_fft(data, samplerate): """FFTを計算する関数""" spectrum = fftpack.fft(data) amp = np.sqrt((spectrum.real ** 2) + (spectrum.imag ** 2)) amp = amp / (len(data) / 2) phase = np.arctan2(spectrum.imag, spectrum.real) phase = np.degrees(phase) freq = np.linspace(0, samplerate, len(data)) return spectrum, amp, phase, freq def plot_waveform(samplerate, frame_per_buffer): """波形をプロットする関数""" # プロットの設定 plt.rcParams['font.size'] = 14 plt.rcParams['font.family'] = 'Times New Roman' plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' fig = plt.figure(figsize=(12, 5)) ax1 = fig.add_subplot(221) ax2 = fig.add_subplot(223) ax3 = fig.add_subplot(122) ax1.yaxis.set_ticks_position('both') ax1.xaxis.set_ticks_position('both') ax2.yaxis.set_ticks_position('both') ax2.xaxis.set_ticks_position('both') ax3.yaxis.set_ticks_position('both') ax3.xaxis.set_ticks_position('both') ax1.set_xlabel('Time [s]') ax1.set_ylabel('Amplitude') ax2.set_xlabel('Frequency [Hz]') ax2.set_ylabel('Amplitude') ax3.set_xlabel('Time [s]') ax3.set_ylabel('Amplitude') ax1.set_ylim(-0.25, 0.25) ax2.set_yscale('log') ax2.set_xlim(0, 10000) ax2.set_ylim(0.00001, 1) ax3.set_xlim(0, 10) line1, = ax1.plot([], [], label='Time waveform', lw=1, color='red') line2, = ax2.plot([], [], label='Amplitude', lw=1, color='blue') line3, = ax3.plot([], [], label='Time chart', lw=1, color='red') # チャートデータ(初期値は空) history_data = np.array([]) i = 0 while plt.fignum_exists(fig.number): if not data_queue.empty(): i += 1 # デキュー data = data_queue.get() # 時間軸作成 time_axis = np.linspace(0, len(data) / samplerate, num=len(data)) # チャート波形作成 history_data = np.concatenate((history_data, data)) max_chart_time = 10 dt = 1 / samplerate # max_chart_timeを超えたら古いデータを削除 if len(history_data) / samplerate > max_chart_time: history_data = history_data[frame_per_buffer:] #print(len(history_data) * dt) time_chart_axis = np.linspace(0, len(history_data) / samplerate, num=len(history_data)) # FFT spectrum, amp, phase, freq = calc_fft(data, samplerate) line1.set_data(time_axis, data) line2.set_data(freq[:len(freq) // 2], amp[:len(amp) // 2]) line3.set_data(time_chart_axis, history_data) ax1.relim() ax1.autoscale_view() ax2.relim() ax2.autoscale_view() ax3.relim() ax3.autoscale_view() fig.tight_layout() try: plt.pause(0.01) except Exception as e: print('Error') if __name__ == '__main__': """メイン文""" # 録音設定:サンプリングレート/フレームサイズ/マイクチャンネル samplerate = 12800 frames_per_buffer = 2048 index = get_mic_index() print(index) # 測定スレッド threading.Thread(target=record_thread, args=(index, samplerate, frames_per_buffer), daemon=True).start() # プロットスレッド plot_waveform(samplerate, frames_per_buffer) |
96, 111〜117行目:チャート波形の作り方
history_dataを用意し、 np.concatenateでデキューしたデータを繋げていきます。その後、データ長が max_chart_dataを超えたらNumPyのスライスで古いデータを破棄する部分がポイントです。
STEP2:リアルタイムにスペクトログラムを作成するPythonコード
Pythonコード
普通の時間波形でチャート波形を描けたら、後は次のコードでこの仕組みを利用してスペクトログラムのチャート波形を作成しましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import numpy as np import queue import threading import pyaudio import matplotlib.pyplot as plt from scipy import fftpack # キュー data_queue = queue.Queue() def record_thread(index, samplerate, frames_per_buffer): """リアルタイムに音声を録音するスレッド""" # PyAudioインスタンスの生成とストリーム開始 pa = pyaudio.PyAudio() stream = pa.open(format=pyaudio.paInt16, channels=1, rate=samplerate, input=True, input_device_index=index, frames_per_buffer=frames_per_buffer) # リアルタイム録音 try: while True: data = stream.read(frames_per_buffer, exception_on_overflow=False) data = np.frombuffer(data, dtype="int16") / float((np.power(2, 16) / 2) - 1) data_queue.put(data) finally: stream.stop_stream() stream.close() pa.terminate() def get_mic_index(): ''' マイクチャンネルのindexをリストで取得する ''' # 最大入力チャンネル数が0でない項目をマイクチャンネルとしてリストに追加 pa = pyaudio.PyAudio() mic_list = [] for i in range(pa.get_device_count()): num_of_input_ch = pa.get_device_info_by_index(i)['maxInputChannels'] if num_of_input_ch != 0: mic_list.append(pa.get_device_info_by_index(i)['index']) return mic_list[0] def calc_fft(data, samplerate): """FFTを計算する関数""" spectrum = fftpack.fft(data) amp = np.sqrt((spectrum.real ** 2) + (spectrum.imag ** 2)) amp = amp / (len(data) / 2) phase = np.arctan2(spectrum.imag, spectrum.real) phase = np.degrees(phase) freq = np.linspace(0, samplerate, len(data)) return spectrum, amp, phase, freq def plot_waveform(samplerate, frames_per_buffer): """波形をプロットする関数""" # 最大保持時間 max_chart_time = 10 # プロットの設定 plt.rcParams['font.size'] = 14 plt.rcParams['font.family'] = 'Times New Roman' plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' fig, ax = plt.subplots(figsize=(8, 5)) ax.set_xlabel('Time [s]') ax.set_ylabel('Frequency [Hz]') # スペクトログラムの初期データ initial_data = np.zeros((frames_per_buffer // 2, int(samplerate * max_chart_time / frames_per_buffer))) im = ax.imshow(20 * np.log10(initial_data), aspect='auto', extent=[0, max_chart_time, 0, samplerate // 2], cmap='jet', vmin=0, vmax=60) cbar = fig.colorbar(im) cbar.set_label('Noise level[dB]') # スペクトログラムチャートデータ(初期値は2次元の0配列) history_amp = initial_data # プロットループ while plt.fignum_exists(fig.number): if not data_queue.empty(): # デキュー data = data_queue.get() # FFTを実行して振幅を計算 spectrum, amp, phase, freq = calc_fft(data, samplerate) new_amp = amp[:frames_per_buffer // 2][::-1] # データの時間長が最大保持時間を超えた場合、古いデータから削除 if history_amp.shape[1] > int(samplerate * max_chart_time / frames_per_buffer): history_amp = history_amp[:, -int(samplerate * max_chart_time / frames_per_buffer):] history_amp = np.hstack((history_amp, new_amp.reshape(-1, 1))) # 振幅データをdBに変換してプロット im.set_array(20 * np.log10(history_amp / 2e-5)) ax.set_xlim(0, max_chart_time) ax.set_ylim(0, 3000) fig.tight_layout() plt.pause(0.01) if __name__ == '__main__': """メイン文""" # サンプリングレートとフレームサイズ、マイクチャンネルを設定 samplerate = 12800 frames_per_buffer = 2048 index = get_mic_index() # 録音関数を並列化実行 threading.Thread(target=record_thread, args=(index, samplerate, frames_per_buffer), daemon=True).start() # プロット関数を実行 plot_waveform(samplerate, frames_per_buffer) |
72〜79行目:スペクトログラムの初期データ
スペクトログラムを流れるように表示させるために、まずは initial_dataとして空(0)の2次元配列を最大時間長分作成します。これを history_ampにセットしておきます。
89, 91〜95行目:スペクトログラムデータの作成
new_ampでFFT後の振幅(Amplitude)を用意しますが、振幅として意味のあるデータはナイキスト周波数までなので // 2とデータを半分切り捨てます。スペクトログラムにした時に周波数成分が上下逆になってしまうことを防ぐために、 [::-1]で逆順に並び替えています。
97〜100行目:dB変換とプロット範囲の指定
スペクトログラムは
imにセットしますが、その際に対数の計算を行なってdBに変換します。dB変換については以下の記事を参考にしてください。
・Pythonで音圧のデシベル(dB)変換式と逆変換式!
こちらの動画が実行結果の例です。スペクトログラムが流れています!
注意点:オーバーラップ処理は入れていない
コードを見るとわかりますが、このコードはオーバーラップ処理は入れていません。オーバーラップ処理を入れると計算量が増えるのでリアルタイム性が損なわれます。そのためより詳細な分析をするためにはwavファイルにデータを保存しておき、それをポスト処理で再度オーバーラップ付きスペクトログラム計算にかけるのが良いでしょう。
詳細な計算の方は以下の記事を確認してください。
・Pythonで音のSTFT計算を自作!スペクトログラム表示する方法
おまけ:wav保存機能付きリアルタイムスペクトログラムコード
以下はおまけです。スペクトログラムで表示されている範囲をwavに保存できれば便利かなと思い、wav保存機能を追加してみました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
import numpy as np import queue import threading import pyaudio import matplotlib.pyplot as plt from scipy import fftpack import soundfile as sf # キュー data_queue = queue.Queue() def record_thread(index, samplerate, frames_per_buffer): """リアルタイムに音声を録音するスレッド""" # PyAudioインスタンスの生成とストリーム開始 pa = pyaudio.PyAudio() stream = pa.open(format=pyaudio.paInt16, channels=1, rate=samplerate, input=True, input_device_index=index, frames_per_buffer=frames_per_buffer) # リアルタイム録音 try: while True: data = stream.read(frames_per_buffer, exception_on_overflow=False) data = np.frombuffer(data, dtype="int16") / float((np.power(2, 16) / 2) - 1) data_queue.put(data) finally: stream.stop_stream() stream.close() pa.terminate() def get_mic_index(): ''' マイクチャンネルのindexをリストで取得する ''' # 最大入力チャンネル数が0でない項目をマイクチャンネルとしてリストに追加 pa = pyaudio.PyAudio() mic_list = [] for i in range(pa.get_device_count()): num_of_input_ch = pa.get_device_info_by_index(i)['maxInputChannels'] if num_of_input_ch != 0: mic_list.append(pa.get_device_info_by_index(i)['index']) return mic_list[0] def calc_fft(data, samplerate): """FFTを計算する関数""" spectrum = fftpack.fft(data) amp = np.sqrt((spectrum.real ** 2) + (spectrum.imag ** 2)) amp = amp / (len(data) / 2) phase = np.arctan2(spectrum.imag, spectrum.real) phase = np.degrees(phase) freq = np.linspace(0, samplerate, len(data)) return spectrum, amp, phase, freq def plot_waveform(samplerate, frames_per_buffer): """波形をプロットする関数""" # 最大保持時間 max_chart_time = 10 # プロットの設定 plt.rcParams['font.size'] = 14 plt.rcParams['font.family'] = 'Times New Roman' plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' fig, ax = plt.subplots(figsize=(8, 5)) ax.set_xlabel('Time [s]') ax.set_ylabel('Frequency [Hz]') # スペクトログラムの初期データ initial_data = np.zeros((frames_per_buffer // 2, int(samplerate * max_chart_time / frames_per_buffer))) im = ax.imshow(20 * np.log10(initial_data), aspect='auto', extent=[0, max_chart_time, 0, samplerate // 2], cmap='jet', vmin=0, vmax=60) cbar = fig.colorbar(im) cbar.set_label('Noise level[dB]') # スペクトログラムチャートデータ(初期値は2次元の0配列) history_amp = initial_data # wavファイル保存用のチャートデータ(初期値は空) history_data = np.array([]) # プロットループ while plt.fignum_exists(fig.number): if not data_queue.empty(): # デキュー data = data_queue.get() # チャート波形作成(wav保存用) history_data = np.concatenate((history_data, data)) # max_chart_timeを超えたら古いデータを削除 if len(history_data) / samplerate > max_chart_time: history_data = history_data[frames_per_buffer:] # FFTを実行して振幅を計算 spectrum, amp, phase, freq = calc_fft(data, samplerate) new_amp = amp[:frames_per_buffer // 2][::-1] # データの時間長が最大保持時間を超えた場合、古いデータから削除 if history_amp.shape[1] > int(samplerate * max_chart_time / frames_per_buffer): history_amp = history_amp[:, -int(samplerate * max_chart_time / frames_per_buffer):] history_amp = np.hstack((history_amp, new_amp.reshape(-1, 1))) # 振幅データをdBに変換してプロット im.set_array(20 * np.log10(history_amp / 2e-5)) ax.set_xlim(0, max_chart_time) ax.set_ylim(0, 3000) fig.tight_layout() plt.pause(0.01) return history_data if __name__ == '__main__': """メイン文""" # サンプリングレートとフレームサイズ、マイクチャンネルを設定 samplerate = 12800 frames_per_buffer = 2048 index = get_mic_index() # 録音関数を並列化実行 threading.Thread(target=record_thread, args=(index, samplerate, frames_per_buffer), daemon=True).start() # プロット関数を実行 waveform = plot_waveform(samplerate, frames_per_buffer) # wavファイルの保存 sf.write('recorded.wav', waveform, samplerate) print(f'waveform length = {len(waveform) / samplerate}[s]') |
このコードを実行すると、スペクトログラムで表示されている範囲のデータが recorded.wavというファイルに保存されます。録音の全範囲を保存するとデータ量が多くなってしまうので、実用的にはこのような仕様で問題ないと思いました。
目的とする音が測定できたらプロットを閉じるだけ!これでwavファイルが保存されます!
ちなみにwavファイルの音量をもっとあげたい時は波形の絶対値の最大値を1にする正規化する手もあります。
1 2 3 4 5 |
# wavファイルの保存(波形の絶対値の最大値を1にして最大限の音量に正規化) max_value = np.max(np.abs(waveform)) normalized_waveform = waveform / max_value sf.write('recorded.wav', waveform, samplerate) print(f'waveform length = {len(waveform) / samplerate}[s]') |
まとめ
本記事では録音した音声のチャート波形を表示する練習を行い、スペクトログラムをチャートで流してみました。この処理を行うことでリアルタイムに周波数波形の変化を目で確認できるので、現場における音分析が捗ります。
また、最後にスペクトログラムで表示されている範囲の音声をwavファイルに保存するコードも紹介しました。
ブログ立ち上げ当初からやりたかったリアルタイムスペクトログラム計算ができました!
Xでも関連情報をつぶやいているので、wat(@watlablog)のフォローお待ちしています!