kivyはPythonのみでGUIアプリを開発可能なライブラリです。今回はPythonが得意とする信号処理分野のアプリとして簡易的なFFTアナライザを作ってみました。録音したデータの時間波形/周波数波形切り替えやピーク検出機能を実装しましたので、その詳細コードを紹介します。
こんにちは。wat(@watlablog)です。今回は録音したデータの周波数分析に使用可能なアプリを作ってみました!
kivyによるFFTアナライザアプリのコード例
前提知識
今回はいきなりPythonコードとkvファイルを示しますが、やっていることや基礎的な内容は以下の記事に記載しました。kvファイルの書き方、イベント処理のやり方、信号処理クラスをまとめた自作ファイルについてはこの記事をご覧ください。
動作環境
このページのコードは以下のPC環境とPython環境で確認しました。
Mac | OS | macOS Catalina 10.15.7 |
---|---|---|
CPU | 1.4[GHz] | |
メモリ | 8[GB] |
Python | Python 3.9.6 |
---|---|
PyCharm (IDE) | PyCharm CE 2020.1 |
Numpy | 1.21.1 |
Scipy | 1.4.1 |
Pandas | 1.0.5 |
matplotlib | 3.4.3 |
PySoundFile | 0.9.0.post1 |
PyAudio | 0.2.11 |
kivy | 2.1.0 |
Kivy-Garden | 0.1.5 |
コード
今回はボリュームが少し大きいので、コードの説明はアプリの動作確認動画を使いながら後半にまとめました。まずは全コードと簡単な説明を先に紹介します。
main.py
main.pyがメインのPythonファイルです。このファイルをPythonで実行してアプリを起動します。
class Root()がkivyのウィジェット動作に関連するクラスで、matplotlibとの連携、ボタンイベントやウィジェット状態の監視を行っています。
Root()をインスタンス化しているclass MyApp()でウィンドウサイズも設定しています。今回はデスクトップ環境による確認をしていますが、最終的にはモバイルアプリ化を目指しているため画面アスペクト比を16:9の縦長にしています。デスクトップ環境で使用する場合は使いやすいサイズに変更しても良いと思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
from kivy.app import App from kivy.uix.boxlayout import BoxLayout from kivy.garden.matplotlib.backend_kivyagg import FigureCanvasKivyAgg from kivy.core.window import Window from matplotlib import pyplot as plt from dspToolkit import DspToolkit import numpy as np class Root(BoxLayout): ''' Pythonで計算をした結果をmatplotlibでプロットするクラス ''' def __init__(self, **kwargs): super().__init__(**kwargs) ''' コンストラクタ:プロットの初期化とDspToolkitのインスタンス化を行う''' # DspToolkitをインスタンス化する self.dsp = DspToolkit() # プロットの初期化 x = [] y = [] self.fig = self.plot(x, y) # ウィジェットとしてfigをラップする self.ids.viewer.add_widget(FigureCanvasKivyAgg(self.fig)) # 計測時間を時間波形の最大表示時間初期値とする self.ids.max_time.text = self.ids.record_time.text # 検証用(widgetの位置や状態の確認) print(self.children[0].children[2]) print(self.children[0].children[2].children[1]) print(self.ids.display_mode.state) def on_button(self): ''' ボタンがクリックされた時のイベント:波形を更新する ''' # TextInputからの入力値を引用する(23'01/09変更) try: self.dsp.record_time = float(self.ids.record_time.text) self.dsp.frame_size = int(self.ids.frame_size.text) except ValueError: print('record time or frame size could not convert to type of number.') return # 測定時間が指定範囲外であれば測定をしない min_time = 0 max_time = 60 if self.dsp.record_time > min_time and self.dsp.record_time <= max_time: pass else: print('Error:Record time is out of range!') return # フレームサイズ範囲外であれば測定をしない min_frame_size = 0 max_frame_size = 10000 if self.dsp.frame_size > min_frame_size and self.dsp.frame_size <= max_frame_size: pass else: print('Error:Frame size is out of range!') return # マイクチャンネルを自動で取得(最初のマイクを使用する) mic_ch = self.dsp.get_mic_index()[0] # 録音する print('Record time[s]=', self.dsp.record_time) print('Sampling rate[Hz]=', self.dsp.sampling) print('Frame size=', self.dsp.frame_size) self.dsp.time_y, self.dsp.time_x = self.dsp.record(mic_ch, self.dsp.sampling, self.dsp.frame_size, self.dsp.record_time) # フーリエ変換する self.dsp.dt = self.dsp.time_x[1] self.dsp.length = len(self.dsp.time_x) * self.dsp.dt self.fft() # 再描画 self.reflesh_plot() return def reflesh_plot(self): ''' 再描画を行う ''' # RuntimeWarningを防ぐ:メモリの効率化用。更新時にこれを入れないとメモリWarningがでる。 plt.close() # figの再作成 if self.ids.display_mode.state == 'down': # FFT waveform self.fig = self.plot(self.dsp.fft_axis, self.dsp.fft_mean) else: # Time waveform self.fig = self.plot(self.dsp.time_x, self.dsp.time_y) # 値の更新(ウィジェットを削除して新しく追加している。ボタンも一緒に消して再度addしている。なんかもっと良い方法が??) # PageLayoutの構成を変更した場合はchildrenの設定を変更する temp_button = self.children[0].children[2].children[1] self.children[0].children[2].clear_widgets() self.ids.viewer.add_widget(temp_button) self.ids.viewer.add_widget(FigureCanvasKivyAgg(self.fig)) return def change_record_time(self): ''' Record timeが変更された時のイベント:時間軸の範囲をセットしなおす ''' self.ids.min_time.text = '0' self.ids.max_time.text = self.ids.record_time.text def change_axis(self): ''' 軸設定が変更された時のイベント:無効値のチェックとプロット再描画 ''' if self.ids.display_mode.state == 'down': # Display mode = FFT waveform try: min_freq = float(self.ids.min_freq.text) max_freq = float(self.ids.max_freq.text) except ValueError: print('There are not float number in axis settings.') return if max_freq == 0: print('max_freq is zero.') return if min_freq >= max_freq: print('min_freq is larger than max_freq.') return else: # Display mode = Time waveform try: min_time = float(self.ids.min_time.text) max_time = float(self.ids.max_time.text) except ValueError: print('There are not float number in axis settings.') return if max_time == 0: print('max_time is zero.') return if min_time >= max_time: print('min_time is larger than max_time.') return # 再描画 self.reflesh_plot() return def change_display_mode(self): ''' Display modeを変更した時のイベント:テキストの切替と再描画 ''' if self.ids.display_mode.state == 'down': self.ids.display_mode.text = 'FFT waveform' else: self.ids.display_mode.text = 'Time waveform' print(self.ids.display_mode.state) # 再描画 self.reflesh_plot() return def change_peak_info(self): ''' Peak infoボタンを変更した時のイベント:テキストの切替と再描画 ''' if self.ids.peak_info.state == 'down': self.ids.peak_info.text = 'ON' else: self.ids.peak_info.text = 'OFF' print(self.ids.peak_info.state) # 再描画 self.reflesh_plot() return def fft(self): ''' フーリエ変換 ''' # TextInputからの入力値を引用する try: self.dsp.overlap = float(self.ids.overlap.text) except ValueError: print('Overlap value could not convert to type of number.') return # 時間波形をオーバーラップ処理する。 time_array, final_time = self.dsp.calc_overlap() # オーバーラップができない場合は処理を終わらせる。 if len(time_array[0]) == 0: print('Overlap calculation was ignored. Frame size was larger than data length.') return False # オーバーラップ処理した波形リストに窓間数をかける。 time_array = self.dsp.hanning(time_array) # 平均化フーリエ変化をする。 self.dsp.fft(time_array) # dB変換する(dBrefは2e-5固定)。 self.dsp.fft_mean = self.dsp.db(self.dsp.fft_mean, self.dsp.dbref) # A特性をかける。 self.dsp.fft_mean = self.dsp.fft_mean + self.dsp.aweightings(self.dsp.fft_axis) # ピーク検出をする。 self.dsp.findpeaks(self.dsp.fft_axis, self.dsp.fft_mean, self.dsp.num_peaks, self.dsp.find_peak_order) return True def plot(self, x, y): ''' プロットする共通のメソッド ''' # フォントの種類とサイズを設定する。 plt.rcParams['font.size'] = 20 plt.rcParams['font.family'] = 'Times New Roman' # 目盛を内側にする。 plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' # Subplot設定とグラフの上下左右に目盛線を付ける。 self.fig = plt.figure() self.ax1 = self.fig.add_subplot(111) self.ax1.yaxis.set_ticks_position('both') self.ax1.xaxis.set_ticks_position('both') # スケールを設定する。TextInputから値を参照。 try: min_freq = float(self.ids.min_freq.text) max_freq = float(self.ids.max_freq.text) min_time = float(self.ids.min_time.text) max_time = float(self.ids.max_time.text) except ValueError: print('Axis region value could not converted to type of number.') print('Process was ignored.') if self.ids.display_mode.state == 'down': # FFT waveform self.ax1.set_xlim(min_freq, max_freq) # 軸のラベルを設定する。 self.ax1.set_xlabel('Frequency[s]') self.ax1.set_ylabel('Amplitude[dBA]') else: # Time waveform self.ax1.set_xlim(min_time, max_time) # 軸のラベルを設定する。 self.ax1.set_xlabel('Time[s]') self.ax1.set_ylabel('Amplitude') # 縦軸設定 if len(self.dsp.fft_mean) == 0: min_amp = 0 max_amp = 100 else: min_amp = np.mean(self.dsp.fft_mean) - 10 max_amp = np.max(self.dsp.fft_mean) + 20 if self.ids.display_mode.state == 'down': # FFT waveform if len(self.dsp.fft_mean) != 0: self.ax1.set_ylim(min_amp, max_amp) #self.ax1.set_yscale('log') else: # Time waveform pass # 波形のプロット self.ax1.plot(x, y, lw=1) # Peak infoボタンがONの時だけピークをプロット if self.ids.peak_info.state == 'down': # 周波数波形の場合はピークのプロット(重ね書き) if self.ids.display_mode.state == 'down': # FFT waveform n = len(self.dsp.peak_index_sort) for i in range(n): self.ax1.scatter(self.dsp.peak_index_sort[n - i - 1], self.dsp.peaks_sort[n - i - 1], color='yellow', edgecolor='black', s=100) self.ax1.text(min_freq + 100, min_amp + (i + 2) * 4, '(' + str(np.round(self.dsp.peak_index_sort[n - i - 1], 1)) + ', ' + str(np.round(self.dsp.peaks_sort[n - i - 1], 1)) + ')', fontsize=14) # レイアウト設定 self.fig.tight_layout() return self.fig class MyApp(App): title = 'WATLAB FFT' # デスクトップ環境上でモバイル画面の比率を検証するためにサイズ設定をしている(正式には外す) ratio = 16 / 9 w = 400 Window.size = (w, w * ratio) def build(self): return Root() if __name__ == '__main__': MyApp().run() |
my.kv
kvファイルを以下に示します。このファイルは各種ウィジェットをウィジェットツリーとして構成しています。今回はPageLayoutをルートウィジェットとし、BoxLayoutで各ButtonやLabel、TextInput、ToggleButtonを格納する構成です。
イベントに関連するウィジェットにはidの設定を行い、Pythonファイルで呼び出して制御します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
#:kivy 2.1.0 <Root>: PageLayout: BoxLayout: id:viewer orientation:'vertical' size:root.width, root.height Button: id:record_button text:'REC' size_hint_y:0.2 size_hint_x:1.0 on_release: self.state = 'down' root.on_button() self.state = 'normal' BoxLayout: orientation:'vertical' size:root.width, root.height canvas.before: Color: rgba:0, 0, 0, 0.9 Rectangle: pos:self.pos size:self.size BoxLayout: orientation:'horizontal' size_hint_y:0.25 Label: text:'Min freq.[Hz]' font_size:36 TextInput: id:min_freq text:'0' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] on_text:root.change_axis() BoxLayout: orientation:'horizontal' size_hint_y:0.25 Label: text:'Max freq.[Hz]' font_size:36 TextInput: id:max_freq text:'5000' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] on_text:root.change_axis() BoxLayout: orientation:'horizontal' size_hint_y:0.25 Label: text:'Min Time[s]' font_size:36 TextInput: id:min_time text:'0' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] on_text:root.change_axis() BoxLayout: orientation:'horizontal' size_hint_y:0.25 Label: text:'Max Time[s]' font_size:36 TextInput: id:max_time text:'1' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] on_text:root.change_axis() BoxLayout: orientation:'vertical' size:root.width, root.height canvas.before: Color: rgba:0, 0, 0.3, 0.9 Rectangle: pos:self.pos size:self.size BoxLayout: orientation:'horizontal' size_hint_y:0.2 Label: text:'Record time[s]' font_size:36 TextInput: id:record_time text:'1' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] on_text: root.change_record_time() BoxLayout: orientation:'horizontal' size_hint_y:0.2 Label: text:'Frame size' font_size:36 TextInput: id:frame_size text:'1024' font_size:50 halign: 'center' input_filter:'int' padding_y:[50,0] BoxLayout: orientation:'horizontal' size_hint_y:0.2 Label: text:'Overlap[%]' font_size:36 TextInput: id:overlap text:'75' font_size:50 halign: 'center' input_filter:'float' padding_y:[50,0] BoxLayout: orientation:'horizontal' size_hint_y:0.2 Label: text:'Display Mode' font_size:36 ToggleButton: id:display_mode size_hint_y:0.5 pos_hint: {'x': 0.5, 'y': 0.25} text:'FFT waveform' state:'down' on_release: root.change_display_mode() BoxLayout: orientation:'horizontal' size_hint_y:0.2 Label: text:'Peak info.' font_size:36 ToggleButton: id:peak_info size_hint_y:0.5 pos_hint: {'x': 0.5, 'y': 0.25} text:'ON' state:'down' on_release: root.change_peak_info() |
dspToolkit.py
dspToolkit.pyはWATLABブログで作成した各種信号処理(Digital Signal Processing)関連のメソッドをまとめたファイルです。このファイルで定義しているDspToolkitクラスをmain.pyの中でインスタンス化して使うことでmain.pyの構成をシンプルにしています。
「アプリ完成編:wxPythonで信号処理のGUIアプリをつくろう⑥」の記事で使ったものがありますが、今回全て使うわけではありません。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
import soundfile as sf import numpy as np import pandas as pd from scipy import fftpack from scipy import signal import pyaudio class DspToolkit(): def __init__(self): '''コンストラクタ ''' # 時間波形関係の変数 self.time_y = [] self.time_y_original = [] self.time_x = [] self.sampling = 44100 self.dt = 0 self.length = 0 self.record_time = 0 # フーリエ変換で使う変数 self.frame_size = 1024 self.overlap = 0 self.dbref = 2e-5 self.averaging = 0 self.fft_axis = [] self.fft_mean = [] self.fft_array = [] self.acf = 0 # フィルタ処理で使う変数 self.lp_freq = 1000 self.hp_freq = 1000 self.bp_freq_low = 500 self.bp_freq_high = 1000 self.bs_freq_low = 500 self.bs_freq_high = 1000 self.attenuation_pass = 3 self.attenuation_stop = 40 # ピーク検出で使う変数 self.num_peaks = 10 self.find_peak_order = 3 self.peak_index = [] self.peaks = [] self.peak_index_sort = [] self.peaks_sort = [] def open_wav(self, path): ''' パスを受け取ってwavファイルを読み込む ''' self.time_y, self.sampling = sf.read(path) self.time_y_original = self.time_y.copy() self.get_time_information() def open_csv(self, path): ''' パスを受け取ってcsvファイルを読み込む ''' df = pd.read_csv(path, encoding='SHIFT-JIS') self.time_y = df.T.iloc[1] self.time_y_original = self.time_y.copy() self.sampling = 1 / df.T.iloc[0, 1] self.get_time_information() def get_time_information(self): ''' Time plotの表示器に表示させる情報の計算と時間軸作成を行う ''' self.dt = 1 / self.sampling self.time_x = np.arange(0, len(self.time_y), 1) * self.dt self.length = len(self.time_x) * self.dt print('Time waveform information was obtained.') def calc_overlap(self): ''' 時間波形をオーバーラップ率で切り出してリスト化する ''' frame_cycle = self.frame_size / self.sampling x_ol = self.frame_size * (1 - (self.overlap / 100)) self.averaging = int((self.length - (frame_cycle * (self.overlap / 100))) / (frame_cycle * (1 - (self.overlap / 100)))) time_array = [] final_time = 0 if self.averaging != 0: for i in range(self.averaging): ps = int(x_ol * i) time_array.append(self.time_y[ps:ps+self.frame_size:1]) final_time = (ps + self.frame_size) / self.sampling print('Frame size=', self.frame_size) print('Frame cycle=', frame_cycle) print('averaging=', self.averaging) return time_array, final_time return time_array, final_time def hanning(self, time_array): ''' ハニング窓をかけ振幅補正係数ACFを計算する ''' han = signal.hann(self.frame_size) self.acf = 1 / (sum(han) / self.frame_size) # オーバーラップされた複数時間波形全てに窓関数をかける for i in range(self.averaging): time_array[i] = time_array[i] * han return time_array def fft(self, time_array): ''' 平均化フーリエ変換をする ''' fft_array = [] for i in range(self.averaging): # FFTをして配列に追加、窓関数補正値をかけ、(Fs/2)の正規化を実施。 fft_array.append(self.acf * np.abs(fftpack.fft(np.array(time_array[i])) / (self.frame_size / 2))) # 全てのFFT波形のパワー平均を計算してから振幅値とする。 self.fft_axis = np.linspace(0, self.sampling, self.frame_size) self.fft_array = np.array(fft_array) self.fft_mean = np.sqrt(np.mean(self.fft_array ** 2, axis=0)) def db(self, x, dBref): ''' dB変換をする ''' y = 20 * np.log10(x / dBref) return y def idb(self, x, dBref): ''' dB逆変換をする ''' y = dBref * np.power(10, x / 20) return y def aweightings(self, f): ''' A補正(聴感補正)曲線の計算 ''' if f[0] == 0: f[0] = 1e-6 ra = (np.power(12194, 2) * np.power(f, 4)) / \ ((np.power(f, 2) + np.power(20.6, 2)) * np.sqrt((np.power(f, 2) + np.power(107.7, 2)) * (np.power(f, 2) + np.power(737.9, 2))) * (np.power(f, 2) + np.power(12194, 2))) a = 20 * np.log10(ra) + 2.00 return a def filter(self, filter_type): ''' フィルタをかける ''' # ナイキスト周波数fnを設定して通過域端周波数wpと阻止域端周波数wsを正規化 # 阻止域周波数は通過域周波数の2倍にしている仕様(ここは目的に応じて変えても良い) fn = self.sampling / 2 # Lowpassフィルタ if filter_type == 'low': wp = self.lp_freq / fn ws = (self.lp_freq * 2) / fn try: # フィルタ次数とバタワース正規化周波数を計算 N, Wn = signal.buttord(wp, ws, self.attenuation_pass, self.attenuation_stop) # フィルタ伝達関数の分子と分母を計算 b, a = signal.butter(N, Wn, filter_type) self.time_y = signal.filtfilt(b, a, self.time_y) except ValueError: return -1 # Highpassフィルタ if filter_type == 'high': wp = self.hp_freq / fn ws = (self.hp_freq * 2) / fn try: # フィルタ次数とバタワース正規化周波数を計算 N, Wn = signal.buttord(wp, ws, self.attenuation_pass, self.attenuation_stop) # フィルタ伝達関数の分子と分母を計算 b, a = signal.butter(N, Wn, filter_type) self.time_y = signal.filtfilt(b, a, self.time_y) except ValueError: return -1 # bandpassフィルタ if filter_type == 'band': fp = np.array([float(self.bp_freq_low), float(self.bp_freq_high)]) fs = np.array([float(self.bp_freq_low)/2, float(self.bp_freq_high)*2]) wp = fp / fn ws = fs / fn try: # フィルタ次数とバタワース正規化周波数を計算 N, Wn = signal.buttord(wp, ws, self.attenuation_pass, self.attenuation_stop) # フィルタ伝達関数の分子と分母を計算 b, a = signal.butter(N, Wn, filter_type) self.time_y = signal.filtfilt(b, a, self.time_y) except ValueError: return -1 # bandstopフィルタ if filter_type == 'bandstop': fp = np.array([float(self.bs_freq_low), float(self.bs_freq_high)]) fs = np.array([float(self.bs_freq_low) / 2, float(self.bs_freq_high) * 2]) wp = fp / fn ws = fs / fn try: # フィルタ次数とバタワース正規化周波数を計算 N, Wn = signal.buttord(wp, ws, self.attenuation_pass, self.attenuation_stop) # フィルタ伝達関数の分子と分母を計算 b, a = signal.butter(N, Wn, filter_type) self.time_y = signal.filtfilt(b, a, self.time_y) except ValueError: return -1 return 0 def get_mic_index(self): ''' マイクチャンネルのindexをリストで取得する ''' # 最大入力チャンネル数が0でない項目をマイクチャンネルとしてリストに追加 pa = pyaudio.PyAudio() mic_list = [] for i in range(pa.get_device_count()): num_of_input_ch = pa.get_device_info_by_index(i)['maxInputChannels'] if num_of_input_ch != 0: mic_list.append(pa.get_device_info_by_index(i)['index']) return mic_list def record(self, index, samplerate, fs, time): ''' 録音する関数 ''' pa = pyaudio.PyAudio() # ストリームの開始 data = [] dt = 1 / samplerate stream = pa.open(format=pyaudio.paInt16, channels=1, rate=samplerate, input=True, input_device_index=index, frames_per_buffer=fs) # フレームサイズ毎に音声を録音していくループ for i in range(int(((time / dt) / fs))): frame = stream.read(fs) data.append(frame) # ストリームの終了 stream.stop_stream() stream.close() pa.terminate() # データをまとめる処理 data = b"".join(data) # データをNumpy配列に変換/時間軸を作成 data = np.frombuffer(data, dtype="int16") / float((np.power(2, 16) / 2) - 1) t = np.arange(0, fs * (i + 1) * (1 / samplerate), 1 / samplerate) return data, t def findpeaks(self, x, y, n, w): ''' 周波数波形の振幅成分からPeakを検出する ''' # ピーク検出 index_all = list(signal.argrelmax(y, order=w)) self.peak_index = [] self.peaks = [] # n個分のピーク情報(指標、値)を格納 for i in range(n): # n個のピークに満たない場合は途中でループを抜ける(エラー処理) if i >= len(index_all[0]): break self.peak_index.append(index_all[0][i]) self.peaks.append(y[index_all[0][i]]) # 個数の足りない分を0で埋める(エラー処理) if len(self.peak_index) != n: self.peak_index = self.peak_index + ([0] * (n - len(self.peak_index))) self.peaks = self.peaks + ([0] * (n - len(self.peaks))) # xの分解能x[1]をかけて指標を物理軸に変換 self.peak_index = np.array(self.peak_index) * x[1] # ピークを大きい順(降順)にソートする self.peaks_sort = np.sort(self.peaks)[::-1] self.peak_index_sort = self.peak_index[np.argsort(self.peaks)[::-1]] return |
アプリの使い方とコードとの対応(動画多数)
画面構成とウィジェット構成
画面構成図を下に示します。本アプリ(WATLAB FFT)は3画面構成です。メイン画面は録音ボタンとmatplotlibのFigureが表示され、この画面が初期画面となります。
右にスワイプすると画面が切り替わり、2ページ目は軸設定画面です。このテキスト値を変更することで、プロットの軸の範囲が変更されます。テキストはfloat型に変換可能な文字しか入力を受け付けません。縦軸はオートスケールの仕様です。
さらに右にスワイプした3ページ目は分析条件設定とモード切り替え画面です。
スワイプ動作例の動画がこちら。右にスワイプして切り替えた画面は左にスワイプして戻ることができます。
各ページのウィジェット図と、設定があるものはid値を以下に示します。上記kvファイル(my.kv)と見比べることでkvファイルの学習にも使えると思いますのでご確認ください。
FigureCanvasKivyAgg(self.fig)はPythonコード上でadd.widgetするので、kvファイルにはありません。位置指定はBoxLayoutのid値を使い、self.ids.viewer.add_widget(widget)と書きます。
各LabelとTextInputの縦サイズはsize_hint_yを使って全体が1になるよう比率で指定します。こうすることで画面サイズを変更したとしても比率を維持できます。
PageLayoutでスワイプ機能を実装していますが、画面遷移先の背景が設定されていないと前の画面が見えてしまいます。これはcanvas.beforeを使ってウィジェットの後に背景を描画することで見やすくしました。背景色はrgbaで与え、アルファチャンネルを持つため若干の透明度を持つようにしてみました(これは好み)。
ToggleButtonはモード切り替えのために用意しましたが、これは通常のButtonと比べると押された時の状態を維持できるという違いがあります。アナログのボタンでたとえるとButtonがポチっと押して指を離すと戻るやつで、ToggleButtonがカチっと押して戻ってこないやつです(わかりにくい?)。
続いて機能の説明をします。
録音データの周波数波形とピーク値を確認する
RECボタンをタップするとmy.kvのon_releaseが検知され、設定した測定時間、フレームサイズ、オーバーラップ率に従ってPyAudioによる測定とSciPyによるフーリエ変換がmain.pyのon_button()により実行されます。各設定値の意味は以下のとおりです。
- 測定時間:Record time[s]
指定した時間長分データが取得されます。 - フレームサイズ:Frame size
録音時は1ループで取得するデータ数(チャンク)として、平均化フーリエ変換時は時間波形を切り出すデータ点数(フレームサイズ)として作用します。チャンクや平均化フーリエ変換については以下の2つの記事に詳細を記載しましたので、意味についてはそちらを参照してください。
・PythonのPyAudioで音声録音する簡単な方法
・PythonでFFT実装!SciPyのフーリエ変換まとめ - オーバーラップ率:Overlap[%]
平均化フーリエ変換をする際にフレームサイズでデータを切り出しますが、切り出す時の始点をずらして(オーバーラップさせて)フレーム数を多くとったり、ハニング窓の悪影響を小さくしたりします。デフォルトは75[%]です。
オーバーラップについては以下の記事に図解をまとめました。
・PythonでFFTをする前にオーバーラップ処理をしよう!
録音ボタンをタップして周波数波形を得た例を動画で示します。matplotlibのFigureにデータが表示されますが、デフォルトではピーク検出結果としてピーク点が重ね書きされます。さらにピークの情報(周波数, 振幅のデシベル値)がプロットの左下に大きい順で記載されるようにしました。
ピーク情報は最大10個検出されます。
ピーク検出アルゴリズムは以下の記事のとおり、SciPyのsignal.argrelmax()を使っています(DspToolkitクラスのfindpeaks())。orderは3固定という仕様ですが、もし読者の方でカスタマイズするのであればTextInputから入力できるようにすると面白いかも知れません。
・PythonでFFT波形から任意個数のピークを自動検出する方法
ピーク情報のON/OFF切り替え
密集したピーク情報はプロットを見にくくさせてしまいます。また、画面キャプチャをするのに点やテキストが邪魔な時もあるでしょう。その場合はPeak info.をOFFにすることで非表示にできます。また、再びONにすれば再描画がされます。
この動作はchange_peak_info()で実装しています。
時間波形の表示
本アプリのメインは周波数分析ですが、Display Modeをタップしてボタンの表示をTime waveformにすることで時間波形の表示も可能です。時間波形にすることで時間に対する振幅の変動成分を確認できます。
再度Display ModeをタップするとFFT waveformになり、いつでも周波数波形に戻すことができます。
これらの動作はchange_display_mode()で実装しています。
軸範囲の変更
周波数軸の範囲変更
周波数波形の軸範囲はMin freq[Hz]とMax freq[Hz]で変更可能です。Min freqがMax freqより大きくなったり、Max freqに0が指定されたりする場合は変更が反映されません。
軸範囲変更は周波数波形と時間波形で共通のchange_axis()で実装しました。
時間軸の範囲変更と測定時間変更時の動作
時間波形も軸範囲をMin Time[s]とMax Time[s]で変更可能です。先ほどと同様に無効な指定をすると変更が反映されません。また、測定時間を変更するとkvファイル内のon_textがトリガーとなりPythonファイルのchange_record_time()が実行され、Max Timeの値が測定時間に自動変更、Min Timeも0にセットされます。
周波数分解能の変更(フレームサイズの変更)
録音時にフレームサイズの変更ができることを上記で説明しましたが、フレームサイズの変更により周波数分解能が変化します。
より細かい周波数分解能を得たい場合はフレームサイズを大きくしてください。ただし、フレームサイズは測定時間とサンプリングレート(このアプリはサンプリングレートが44100[Hz]で固定)で決まる総データ点数より小さくしなければいけません。
画面サイズの変更について
本アプリは縦長画面を想定していますが、スマホを横にして横長画面にした時でもなんとかピーク情報の確認や設定ができるようなサイズ感を目指しました。スマホを想定する場合、あまりたくさんの設定を追加すると画面レイアウトが崩れそうなので要注意です。
そしてFigureCanvasKivyAgg(self.fig)やkivyの各種ウィジェットは動的なサイズ変更にも対応しています。是非自分の使いやすいサイズを見つけて自分だけのツールにカスタマイズしてみてください。
まとめ
今回はこれまでの記事で学んだ知識を使って(まぁほぼ偉大な先人達が蓄積したライブラリの力ですが…)なんちゃってFFTアナライザを作ってみました。
自分が欲しいと思う機能(ピーク検出、時間波形と周波数波形の確認、分析条件の変更)をできるだけ絞ってシンプルな構成にしたつもりです。
おそらくフィルタまでかけようとすると操作がちょっと難解になりそうなのでこの辺でやめました。
また、matplotlibはsubplotの設定で上下に時間波形と周波数波形を並べるといったこともできますが、モバイルを想定した小さい画面だと非常に小さくなってしまうので「切り替え」という手段を選択しました。
wavファイル保存やスペクトログラム表示くらいは実装しても良かったかも知れません(すぐできそう)。ただ、ファイルを仮にiPhoneに保存したとするとどこに保存されるんだろうという謎がまだ未調査です。
環境構築さえ整ってしまえば記事内3つのコードをコピペするだけで、誰でもお手元のPCで本アプリが使えるはずです。そのためこの記事では使い方や何の動作がどこのコード(メソッド)と対応しているかを重視した取扱説明書のような構成にしてみました。
このあとはパッケージングにもトライしたいですが、おそらく実行ファイル化やアプリ化の段階でビルドできないというエラーに悩まされると思います(多くの方がそこで挫折している模様)。果たして一介の機械系エンジニアはその壁を乗り越えることができるのか…。続く。
実行ファイル化
デスクトップアプリ
まずはデスクトップアプリとして実行ファイルにすることができました!以下の記事を参考にすることでおそらく読者の皆様も同じようにつくることができると思います。
kivyで作ったアプリをNuitkaでexe化する時のエラー対処例
コンパクトな構成ながらも何とか形になりました!やっぱりアプリ制作は楽しいですね!
Twitterでも関連情報をつぶやいているので、wat(@watlablog)のフォローお待ちしています!