Pythonによる信号処理のGUIアプリ作成挑戦記第3弾は「周波数波形編」です。読み込んだ時間波形に対し平均化フーリエ変換を施す処理をGUIで行います。今回は設定の読み込み、変更処理や例外処理も含めて実装する方法を紹介します。
こんにちは。wat(@watlablog)です。引き続き信号処理のGUIアプリを作っていきます!今回はフーリエ変換部分を実装します!
バックナンバー
この記事は以下の記事の内容を前提とした続きものです。
是非こちらの記事も読んでみてください。
①アプリ構想とフレーム構築編
「フレーム構築編:wxPythonで信号処理のGUIアプリをつくろう①」の記事で信号処理のGUIアプリを作り始めました。
この記事はどんなコンセプトでアプリを作るのか、各GUIをどうやって作っていくのか、開発環境や実際に作ったGUIコードを紹介しています。
②波形読み込み編
「波形読み込み編:wxPythonで信号処理のGUIアプリをつくろう②」の記事ではファイル読み込み部分を実装しました。ボタンによるファイル選択、拡張子制限、matplotlibへのデータ渡しといったコードを紹介しています。
この記事でやること
平均化フーリエ変換をGUIコードへ実装する
本記事はフーリエ変換部分を実装します。
フーリエ変換は「PythonでFFT実装!SciPyのフーリエ変換まとめ」で紹介した平均化フーリエ変換を使います。
平均化フーリエ変換は時間波形全範囲を周波数変換するのではなく、時間波形をあるフレーム区間でオーバーラップしながら切り出して周波数領域で平均化する処理です。
オーバーラップ処理は「PythonでFFTをする前にオーバーラップ処理をしよう!」で図解した下図を参考にしてください。
また、平均化フーリエ変換の処理フローを以下に示します。今回はこの流れを念頭に置きつつGUI化しましょう。
GUI上のイメージはこちら。
GUI特有の例外処理を実装する
CUIプログラムとして自分だけで使っている時は、その都度出力ウィンドウに表示されるエラーメッセージを参考にデバッグが可能です。
しかしGUIアプリはユーザが自由に設定を変更し計算を行い、かつ出力ウィンドウは基本見ません。
そのため、プログラマ側で想定される例外処理を行う必要があります。
今回は計算部分を実装しますが、計算アルゴリズム自体は当ブログで既に扱った既知の内容です。今回はこの例外処理を実装する部分が新しい内容です。
編集するファイル
前回と同様に、今回も編集するファイルはイベントを記述するwlFrontPanel.pyと計算部分を記述するdspToolkit.pyです。
ファイル構成のイメージ図を下図に再掲します。
それでは実際にコードを書いてから詳細を見ていきましょう!
フーリエ変換をGUIで行うPythonコード
dspToolkit.py
dspToolkit.pyには「PythonでFFT実装!SciPyのフーリエ変換まとめ」で紹介した以下のメソッドを追記します。
def calc_overlap(self):
オーバーラップ処理をするメソッドです。詳細は「PythonでFFTをする前にオーバーラップ処理をしよう!」をご覧ください。
def hanning(self, time_array):
ハニング窓(もしくはハン窓とも呼ぶ)という窓関数をかけるメソッドです。窓関数には色々な種類がありますが、今回はハニング窓一択のシンプルなアプリとします。
窓関数については「PythonでFFT!SciPyで窓関数をかける」の記事をご覧ください。
また、窓関数を使う場合は補正係数も重要です。振幅補正係数については「窓関数使用時の補正!FFTの時に忘れがちな計算とは?」の記事を参照ください。
def fft(self, time_array):
フーリエ変換は自由なフレームサイズをユーザが指定して行います。\(2^{n}\)のデータ点数であればFFT(高速フーリエ変換)、それ以外であればDFT(離散フーリエ変換)を行います。
def db(self, x, dBref):
デシベル(dB)変換をするメソッドです。振動データはリニアスケールだと振幅視認性の悪い場合が多く、対数スケールであるdB値に変換することが一般的です。dB変換については「Pythonで音圧のデシベル(dB)変換式と逆変換式!」に詳細を記載していますので、是非参考にしてください。
全コード
コピペできるように、dspToolkit.pyの全文を以下に示します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
import soundfile as sf import numpy as np import pandas as pd from scipy import fftpack from scipy import signal class DspToolkit(): def __init__(self): '''コンストラクタ''' # 時間波形読み込みで使う変数 self.time_y = np.array(0) self.time_x = np.array(0) self.sampling = 0 self.dt = 0 self.length = 0 # フーリエ変換で使う変数 self.frame_size = 1024 self.overlap = 0 self.dbref = 2e-5 self.averaging = 0 self.fft_axis = np.array(0) self.fft_mean = np.array(0) self.fft_array = np.array(0) self.acf = 0 def open_wav(self, path): '''パスを受け取ってwavファイルを読み込む''' self.time_y, self.sampling = sf.read(path) self.get_time_information() def open_csv(self, path): '''パスを受け取ってcsvファイルを読み込む''' df = pd.read_csv(path, encoding='SHIFT-JIS') self.time_y = df.T.iloc[1] self.sampling = 1 / df.T.iloc[0, 1] self.get_time_information() def get_time_information(self): '''Time plotの表示器に表示させる情報の計算と時間軸作成を行う''' self.dt = 1 / self.sampling self.time_x = np.arange(0, len(self.time_y), 1) * self.dt self.length = len(self.time_x) * self.dt print('Time waveform information was obtained.') def calc_overlap(self): '''時間波形をオーバーラップ率で切り出してリスト化する''' frame_cycle = self.frame_size / self.sampling x_ol = self.frame_size * (1 - (self.overlap / 100)) self.averaging = int((self.length - (frame_cycle * (self.overlap / 100))) / (frame_cycle * (1 - (self.overlap / 100)))) time_array = [] if self.averaging != 0: for i in range(self.averaging): ps = int(x_ol * i) time_array.append(self.time_y[ps:ps+self.frame_size:1]) print('Frame size=', self.frame_size) print('Frame cycle=', frame_cycle) print('averaging=', self.averaging) return time_array def hanning(self, time_array): '''ハニング窓をかけ振幅補正係数ACFを計算する''' han = signal.hann(self.frame_size) self.acf = 1 / (sum(han) / self.frame_size) # オーバーラップされた複数時間波形全てに窓関数をかける for i in range(self.averaging): time_array[i] = time_array[i] * han return time_array def fft(self, time_array): '''平均化フーリエ変換をする''' fft_array = [] for i in range(self.averaging): # FFTをして配列に追加、窓関数補正値をかけ、(Fs/2)の正規化を実施。 fft_array.append(self.acf * np.abs(fftpack.fft(np.array(time_array[i])) / (self.frame_size / 2))) # 全てのFFT波形のパワー平均を計算してから振幅値とする。 self.fft_axis = np.linspace(0, self.sampling, self.frame_size) self.fft_array = np.array(fft_array) self.fft_mean = np.sqrt(np.mean(self.fft_array ** 2, axis=0)) def db(self, x, dBref): '''dB変換をする''' y = 20 * np.log10(x / dBref) return y |
wlFrontPanel.py
wlFrontPanel.pyのイベント挙動で先ほど書いたメソッドを使います。
今回コンストラクタの編集はせず、以下のメソッドを追記しただけです。
def clear_freq_plot(self):
オーバーラップ処理ができない時や数値制御器(textCtrl)が数値でないといった場合は時間波形は読み込むことができますがフーリエ変換はできません。
そんな時、ただエラー回避しただけだと前の周波数波形が残っている状態になりユーザが誤解するため、周波数波形をクリアにするメソッドを用意しました。
def freq_data_plot(self):
フーリエ変換のメインメソッドです。
最初の「try:, except:」はフレームサイズ、オーバーラップ率、dBrefが数値でない場合に発生するValueErrorを捉える例外処理です。
ValueErrorが発生した場合は即returnでこのメソッドを終わらせています。
念のためprintでメッセージを残していますが、これは開発者の動作チェック用です。もしエラーを使って分岐処理をする場合は、return時にタプルで作成したcodeを返すことで拡張可能と思います。
フレームサイズが全時間長よりも長い場合はオーバーラップができず、エラーになる場合が考えられます。
オーバーラップのメソッドoverlapの戻り値にtime_arrayを設定しており、周波数平均化回数が1未満(つまりintで0)の場合は空リスト「[ ]」が来るようにしています。
このtime_arrayが空であれば先ほどの例外処理と同じようにreturnでメソッドを終了するようにしてみました。
正常に計算ができる場合は最後まで行い、周波数波形をプロットして終了します。
もし開発中にテストデータを使ってその他のエラーが発生した場合は、上記のような例外処理を随時入れていくことでロバスト性の高いアプリになることでしょう。
def Button_open_wavOnButtonClick(self, event):
wavファイルを読み込むこのメソッドは時間波形読み込み時に中身を書いたものですが、最後に「self.freq_data_plot()」を実行することでフーリエ変換を同時に行うようにしました。
def Button_open_csvOnButtonClick(self, event):
csvファイルを読み込むこちらのメソッドもwavの時と同様に最後にフーリエ変換メソッドを追記しました。
def textCtrl_****OnText(self, event):
textCtrl(FrameSize, Overlap, dBref)のOnTextには数値が変更された場合、変数に値をセットする処理を書いています。
例外処理にはValueErrorを検出するようにしていますが、ここでは出力ウィンドウにエラー文を発生させないことだけのために書いており、エラー処理自体は上記freq_data_plotに任せています。
def Button_re_calcOnButtonClick(self, event):
アプリのフロントパネルに配置している「Re:Calculation」のイベントメソッドです。ユーザがフーリエ変換に関する設定変更をした時に再度計算をする場合に使います。
例外処理は上に記載したメソッドで既に十分であるため、ここでは計算のメソッドを一回呼びだすだけにしました。
全コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
"""Subclass of FrontPanel, which is generated by wxFormBuilder.""" import wx import numpy as np import MainFrame from dspToolkit import DspToolkit # Implementing FrontPanel class WlFrontPanel(MainFrame.FrontPanel): def __init__(self, parent): MainFrame.FrontPanel.__init__(self, parent) # 初期化 # ファイル self.input_file = '' # 信号処理クラスのインスタンス化 self.dsp = DspToolkit() # ファイルダイアログの戻り値定数(条件分岐用) self.FLAG_OPEN = 5100 self.FLAG_CANCEL = 5101 self.FLAG_YES = 5103 self.FLAG_NO = 5104 # GUI表示器に初期値を格納 self.textCtrl_FrameSize.SetValue(str(self.dsp.frame_size)) self.textCtrl_Overlap.SetValue(str(self.dsp.overlap)) self.textCtrl_dBref.SetValue(str(self.dsp.dbref)) def file_selector(self, message, extension): '''指定した拡張子のファイルパスをダイアログで取得する''' # ダイアログでファイルオープン open_dialog = wx.FileDialog(None, message, wildcard=extension) # FLAG_OPEN=5100, FLAG_CANCEL=5101 file_ans = open_dialog.ShowModal() if file_ans == self.FLAG_OPEN: print('User selected file open operation.') self.input_file = open_dialog.GetPath() print('Selected path is;', self.input_file) return self.FLAG_OPEN else: print('User canceled file open operation.') return self.FLAG_CANCEL def time_data_plot(self, time_x, time_y, length): '''時間波形をプロットしてタブをTime plot(0)に切り替える''' time_data = self.ax1_time.plot(time_x, time_y, color='red') # オートスケールで軸設定する場合 self.ax1_time.set_xlim(0, length) self.ax1_time.set_ylim(np.min(time_y), np.max(time_y)) self.canvas_timeplot.draw() line_time = time_data.pop(0) line_time.remove() # タブ(wx.Notebook)の切り替え self.Tab.SetSelection(0) def clear_freq_plot(self): '''周波数波形プロットと平均化回数情報をクリアにする''' fft_data = self.ax1_fft.plot(0, 0) self.canvas_fftplot.draw() line_fft = fft_data.pop(0) line_fft.remove() self.textCtrl_Average.SetValue('0') def freq_data_plot(self): '''読み込んだ時間波形に対しフーリエ変換をかけて周波数プロットする''' try: int(self.textCtrl_FrameSize.Value) int(self.textCtrl_Overlap.Value) float(self.textCtrl_dBref.Value) except ValueError: # 設定値が計算に使えない値の時はエラーメッセージを表示し、周波数プロットクリアと表示器の初期化をする。 message = 'Frequency calculation was ignored. Unusable value(s) were entered into frequency calculation setting.' code = (1, message) print(code) self.clear_freq_plot() return # 時間波形をオーバーラップ処理する。 time_array = self.dsp.calc_overlap() # オーバーラップ処理した波形リストに窓間数をかける。 time_array = self.dsp.hanning(time_array) # オーバーラップができない場合は処理を終わらせる。 if time_array == []: message = 'Overlap calculation was ignored. Frame size was larger than data length.' code = 1, message print(code) self.clear_freq_plot() return # 平均化フーリエ変化をする。 self.dsp.fft(time_array) # 周波数波形にプロットする。 if self.checkBox_dBswitch.Value == True: # dBrefチェックボックスがONの時は振幅をdB変換する。 self.dsp.fft_mean = self.dsp.db(self.dsp.fft_mean, float(self.textCtrl_dBref.Value)) fft_data = self.ax1_fft.plot(self.dsp.fft_axis, self.dsp.fft_mean, color='red') # オートスケールで軸設定する場合 self.ax1_fft.set_xlim(0, self.dsp.fft_axis[int((len(self.dsp.fft_axis)-1)/2)]) # ナイキスト周波数までの表示 self.ax1_fft.set_ylim(np.min(self.dsp.fft_mean), np.max(self.dsp.fft_mean)*1.1) self.canvas_fftplot.draw() line_fft = fft_data.pop(0) line_fft.remove() # 平均化回数を表示器に入力する。 self.textCtrl_Average.SetValue(str(self.dsp.averaging)) message = 'Frequency calculation was done successfully.' code = 0, message print(code) return # Handlers for FrontPanel events. def FrontPanelOnClose(self, event): '''ダイアログボックス選択式のウィンドウクローズ''' # Yes/Noダイアログを使ってユーザに選択させる。 message_dialog = wx.MessageDialog(None, 'Do you really want to close window?', 'Window close dialog', wx.YES_NO) ans = message_dialog.ShowModal() # FLAG_YES=5103, FLAG_NO=5104 if ans == self.FLAG_YES: print('User selected YES button to close window.') self.Destroy() def Button_open_wavOnButtonClick(self, event): '''wavファイルを開き時間波形を表示させる''' print('Open.wav was clicked.') # ファイルダイアログを開きwavファイルのパスを取得する。 ans = self.file_selector(message='Select a wav file.', extension='*.wav') # Open操作の時のみ操作を実行する。 if ans == self.FLAG_OPEN: self.dsp.open_wav(self.input_file) # 時間波形情報をTextCtrlに表示する。 self.textCtrl_dt.SetValue(str('{:.4e}'.format(self.dsp.dt))) self.textCtrl_Samplingrate.SetValue(str(self.dsp.sampling)) self.textCtrl_length.SetValue(str(np.round(self.dsp.length, 2))) # 時間波形をプロットする。 self.time_data_plot(self.dsp.time_x, self.dsp.time_y, self.dsp.length) # フーリエ変換をして周波数波形をプロットする。 self.freq_data_plot() def Button_open_csvOnButtonClick(self, event): '''csvファイルを開き時間波形を表示させる''' print('Open.csv was clicked.') # ファイルダイアログを開きwavファイルのパスを取得する。 ans = self.file_selector(message='Select a csv file.', extension='*.csv') # Open操作の時のみ操作を実行する。 if ans == self.FLAG_OPEN: self.dsp.open_csv(self.input_file) # 時間波形情報をTextCtrlに表示する。 self.textCtrl_dt.SetValue(str('{:.4e}'.format(self.dsp.dt))) self.textCtrl_Samplingrate.SetValue(str(self.dsp.sampling)) self.textCtrl_length.SetValue(str(np.round(self.dsp.length, 2))) # 時間波形をプロットする。 self.time_data_plot(self.dsp.time_x, self.dsp.time_y, self.dsp.length) # フーリエ変換をして周波数波形をプロットする。 self.freq_data_plot() def Button_save_wavOnButtonClick(self, event): # TODO: Implement Button_save_wavOnButtonClick pass def Button_save_csvOnButtonClick(self, event): # TODO: Implement Button_save_csvOnButtonClick pass def Button_Save_fft_csvOnButtonClick(self, event): # TODO: Implement Button_Save_fft_csvOnButtonClick pass def textCtrl_FrameSizeOnText(self, event): print('Set frame size.') try: self.dsp.frame_size = int(self.textCtrl_FrameSize.Value) except ValueError: pass def textCtrl_OverlapOnText(self, event): print('Set overlap ratio.') try: self.dsp.overlap = int(self.textCtrl_Overlap.Value) except ValueError: pass def textCtrl_dBrefOnText(self, event): print('Set dB-reference.') try: self.dsp.dbref = float(self.textCtrl_dBref.Value) except ValueError: pass def Button_re_calcOnButtonClick(self, event): self.freq_data_plot() def Button_lpOnButtonClick(self, event): # TODO: Implement Button_lpOnButtonClick pass def textCtrl_lp_freqOnText(self, event): # TODO: Implement textCtrl_lp_freqOnText pass def Button_hpOnButtonClick(self, event): # TODO: Implement Button_hpOnButtonClick pass def textCtrl_hp_freqOnText(self, event): # TODO: Implement textCtrl_hp_freqOnText pass def textCtrl_attenuation_passOnText(self, event): # TODO: Implement textCtrl_attenuation_passOnText pass def textCtrl_attenuation_stopOnText(self, event): # TODO: Implement textCtrl_attenuation_stopOnText pass def Button_bpOnButtonClick(self, event): # TODO: Implement Button_bpOnButtonClick pass def textCtrl_bp_freq_lowOnText(self, event): # TODO: Implement textCtrl_bp_freq_lowOnText pass def textCtrl_bp_freq_highOnText(self, event): # TODO: Implement textCtrl_bp_freq_highOnText pass def Button_bsOnButtonClick(self, event): # TODO: Implement Button_bsOnButtonClick pass def textCtrl_bs_freq_lowOnText(self, event): # TODO: Implement textCtrl_bs_freq_lowOnText pass def textCtrl_bs_freq_highOnText(self, event): # TODO: Implement textCtrl_bs_freq_highOnText pass def Button_original_dataOnButtonClick(self, event): # TODO: Implement Button_original_dataOnButtonClick pass def checkbox_time_fixOnCheckBox(self, event): # TODO: Implement checkbox_time_fixOnCheckBox pass def textCtrl_time_xminOnText(self, event): # TODO: Implement textCtrl_time_xminOnText pass def textCtrl_time_xmaxOnText(self, event): # TODO: Implement textCtrl_time_xmaxOnText pass def textCtrl_time_yminOnText(self, event): # TODO: Implement textCtrl_time_yminOnText pass def textCtrl_ymaxOnText(self, event): # TODO: Implement textCtrl_ymaxOnText pass def checkbox_freq_fixOnCheckBox(self, event): # TODO: Implement checkbox_freq_fixOnCheckBox pass def textCtrl_freq_xminOnText(self, event): # TODO: Implement textCtrl_freq_xminOnText pass def textCtrl_freq_xmaxOnText(self, event): # TODO: Implement textCtrl_freq_xmaxOnText pass def textCtrl_freq_yminOnText(self, event): # TODO: Implement textCtrl_freq_yminOnText pass def textCtrl_freq_ymaxOnText(self, event): # TODO: Implement textCtrl_freq_ymaxOnText pass |
周波数波形プロット動作のデモ
YouTube動画による動作確認結果
例によってYouTube動画による動的な動作を確認します。
以下の動画は今回のコードを実行し、ファイルの読み込みやフレームサイズ、オーバーラップの変更、dB変換に切り替え…を行っています。
イメージ通り!…と思います。
まとめ
今回は時間波形を読み込んだ後に自動実行されるフーリエ変換部分を実装しました。
計算コード自体は数年前から当ブログで扱っていた内容でしたが、いざGUIアプリに実装する場合は色々な例外を考えなければいけないところに難しさがあると思いました。
print()を使って確認していた文はそのまま残してしまいましたが、正式にはLogging系のライブラリを使うのが良いのかも知れません。
ただ個人的には好きな文章を残せるprint()に便利さを感じてしまいます。
テキストボックス(textCtrl)のイベントは今回、エンターキーをクリックしなくてもイベント発生と認識するOnTextを使いました。ただこれだと1文字消したり追記したりする度にイベントが回るので、もしかしたら別のイベントトリガの方が適しているかも知れません。
後はスペクトログラムとフィルタ、軸設定のカスタマイズ機能が残りました。
非ITエンジニアがGUIアプリを作っていく過程をもう少しお楽しみください。
(筆者が途中で挫折しなければ…)
前回記事で書き方のスタイルを覚えたので、今回は楽勝でした!
Twitterでも関連情報をつぶやいているので、wat(@watlablog)のフォローお待ちしています!