Module orphe_core
Functions
def to_timestamp(hours, minutes, seconds, ms_high, ms_low)
-
v3のデータについてくるタイムスタンプをフォーマットする関数
Args
hours
- 時
minutes
- 分
seconds
- 秒
ms_high
- ミリ秒の上位バイト
ms_low
- ミリ秒の下位バイト
Classes
class AccData
-
加速度センサの値を格納するクラス
Attributes
x
- x軸の加速度
y
- y軸の加速度
z
- z軸の加速度
timestamp
- タイムスタンプ
serial_number
- シリアルナンバー
packet_number
- パケットナンバー
Expand source code
class AccData: """ 加速度センサの値を格納するクラス Attributes: x: x軸の加速度 y: y軸の加速度 z: z軸の加速度 timestamp: タイムスタンプ serial_number: シリアルナンバー packet_number: パケットナンバー """ def __init__(self): self.x = 0 self.y = 0 self.z = 0 self.timestamp = 0 self.serial_number = 0 self.packet_number = 0 def print(self): print( f"Acc[{self.serial_number}][{self.packet_number}][{self.timestamp}]: {self.x}, {self.y}, {self.z}")
Methods
def print(self)
class DeviceInformation (data)
-
デバイス情報を格納するクラス data: 生データ battery: バッテリー残量 lr: LR rec: REC auto_run: Auto Run led(int): LED発光の強さ 0-255 log_high: Log High log_low: Log Low range: レンジ device_information: 取得したデバイス情報の保管用メンバ変数
Expand source code
class DeviceInformation: """ デバイス情報を格納するクラス data: 生データ battery: バッテリー残量 lr: LR rec: REC auto_run: Auto Run led(int): LED発光の強さ 0-255 log_high: Log High log_low: Log Low range: レンジ device_information: 取得したデバイス情報の保管用メンバ変数 """ def __init__(self, data): self.data = data self.battery = int.from_bytes(data[0:1], byteorder='big', signed=False) self.lr = int.from_bytes(data[1:2], byteorder='big', signed=False) self.rec = int.from_bytes(data[2:3], byteorder='big', signed=False) self.auto_run = int.from_bytes( data[3:4], byteorder='big', signed=False) self.led = int.from_bytes(data[4:5], byteorder='big', signed=False) self.log_high = int.from_bytes( data[6:7], byteorder='big', signed=False) self.log_low = int.from_bytes(data[7:8], byteorder='big', signed=False) self.range = Range() self.range.acc = int.from_bytes( data[8:9], byteorder='big', signed=False) self.range.gyro = int.from_bytes( data[9:10], byteorder='big', signed=False) self.device_information = None
class GaitData (data)
-
歩行解析の値を格納するクラス
Attributes
step_count
- 歩数
gait_type
- 歩容タイプ(0:無し、1:歩行、2:走行,3:直立静止)
direction
- ストライド方向(0:なし, 1:前方, 2:後方,3:内側,4:外側)
calorie
- 総消費カロリー
distance
- 総移動距離
standing_phase_duration
- 立脚期継続時間
swing_phase_duration
- 遊脚期継続時間
Expand source code
class GaitData: """ 歩行解析の値を格納するクラス Attributes: step_count: 歩数 gait_type: 歩容タイプ(0:無し、1:歩行、2:走行,3:直立静止) direction: ストライド方向(0:なし, 1:前方, 2:後方,3:内側,4:外側) calorie: 総消費カロリー distance: 総移動距離 standing_phase_duration: 立脚期継続時間 swing_phase_duration: 遊脚期継続時間 """ def __init__(self, data): # 2,3は Uint16 で歩数が入っている self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) # 4番目は最初の2ビット分が enumで歩容タイプ(0:無し、1:歩行、2:走行,3:直立静止) self.gait_type = data[4] & 0b00000011 # 4番目は2,3,4ビット分がenumでストライド方向(0:なし, 1:前方, 2:後方,3:内側,4:外側) self.direction = (data[4] & 0b00011100) >> 2 # data[6],data[7]はfloat16で総消費カロリー self.calorie = struct.unpack('>e', data[6:8]) # 8,9,10,11はfloat32で総移動距離 self.distance = struct.unpack('>f', data[8:12]) # 12,13,14,15はfloat32で立脚期継続時間(standing phase duration) self.standing_phase_duration = struct.unpack('>f', data[12:16]) # 16,17,18,19はflaot32で遊脚期継続時間(swing_phase_duration) self.swing_phase_duration = struct.unpack('>f', data[16:20]) def print(self): print(f"Step count: {self.step_count}") print(f"Gait type: {self.gait_type}") print(f"Direction: {self.direction}") print(f"Calorie: {self.calorie}") print(f"Distance: {self.distance}") print(f"Standing phase duration: {self.standing_phase_duration}") print(f"Swing phase duration: {self.swing_phase_duration}")
Methods
def print(self)
class GyroData
-
ジャイロセンサの値を格納するクラス
Attributes
x
- x軸の角速度
y
- y軸の角速度
z
- z軸の角速度
timestamp
- タイムスタンプ
serial_number
- シリアルナンバー
packet_number
- パケットナンバー
Expand source code
class GyroData: """ ジャイロセンサの値を格納するクラス Attributes: x: x軸の角速度 y: y軸の角速度 z: z軸の角速度 timestamp: タイムスタンプ serial_number: シリアルナンバー packet_number: パケットナンバー """ def __init__(self): self.x = 0 self.y = 0 self.z = 0 self.timestamp = 0 self.serial_number = 0 self.packet_number = 0 def print(self): print( f"Gyro[{self.serial_number}][{self.packet_number}][{self.timestamp}]: {self.x}, {self.y}, {self.z}")
Methods
def print(self)
class Orphe
-
ORPHE COREのBLE通信を行うクラス
コンストラクタ
Expand source code
class Orphe: """ ORPHE COREのBLE通信を行うクラス """ def __init__(self): """ コンストラクタ """ self.serial_number_prev = 0 self.client = None self.step_count = StepCount() # 歩数 def set_lost_data_callback(self, callback): """ データが欠損したときに呼び出されるコールバック関数を設定する Args: callback: コールバック関数 """ self.lost_data_callback = callback def set_got_acc_callback(self, callback): """加速度センサの値を取得したときに呼び出されるコールバック関数を設定する。 例えば静止状態ではz方向の1Gの値はレンジ設定によって変わります。加速度レンジが2であれば 0.5 、16であれば 0.0625 (値は理論値なので誤差が生じます)です。 """ self.got_acc_callback = callback def set_got_gyro_callback(self, callback): """ ジャイロセンサの値を取得したときに呼び出されるコールバック関数を設定する """ self.got_gyro_callback = callback def set_got_converted_acc_callback(self, callback): """ 加速度センサの値を取得したときに呼び出されるコールバック関数を設定する それぞれの値は加速度レンジの値によって変換されます。 例えば静止状態ではz方向の1Gの値は常に1.0です。 """ self.got_converted_acc_callback = callback def set_got_converted_gyro_callback(self, callback): """ ジャイロセンサの値を取得したときに呼び出されるコールバック関数を設定する """ self.got_converted_gyro_callback = callback def set_got_quat_callback(self, callback): """ クォータニオンの値を取得したときに呼び出されるコールバック関数を設定する """ self.got_quat_callback = callback def set_got_gait_callback(self, callback): """ 歩行解析の値を取得したときに呼び出されるコールバック関数を設定する """ self.got_gait_callback = callback def set_got_stride_callback(self, callback): """ ストライドの値を取得したときに呼び出されるコールバック関数を設定する """ self.got_stride_callback = callback def set_got_pronation_callback(self, callback): """ プロネーションの値を取得したときに呼び出されるコールバック関数を設定する """ self.got_pronation_callback = callback def set_got_quat_distance_callback(self, callback): """ クォータニオンと差分値を取得したときに呼び出されるコールバック関数を設定する """ self.got_quat_distance_callback = callback def set_on_disconnect_callback(self, callback): """ ORPHE COREとの接続が切断されたときに呼び出されるコールバック関数を設定する """ self.on_disconnect_callback = callback async def scan_all_devices(self): """ すべてのBLEデバイスをスキャンして、その結果を返す Returns: BLEデバイスのリスト """ print("Scanning all BLE devices...") devices = await BleakScanner.discover() return devices async def connect(self, address=None): """ ORPHE COREと接続する Args: address: 接続するデバイスのアドレス。指定しない場合はスキャンしてSERVICE UUIDで合致するものに接続する Returns: 接続に成功した場合はTrue、失敗した場合はFalse """ print( f"Scanning for ORPHE CORE BLE device...[address specified: {address}]") devices = await BleakScanner.discover() target_device = None if address is not None: for device in devices: if device.address == address: target_device = device print( f"Found target device: {device.name}(name), {device.address}(address)") break else: for device in devices: # device.name に CR-* が含まれている場合に接続する if device.name is not None and "CR-" in device.name: target_device = device print( f"Found target device: {device.name}(name), {device.address}(address)") break if target_device is None: print("Target device not found.") return False self.client = BleakClient(target_device.address) await self.client.connect() if self.client.is_connected: print("Connected to the device") # 別のタスクで接続状態を監視 asyncio.create_task(self.monitor_connection(self.client)) return True else: print("Failed to connect to the device") return False async def monitor_connection(self, client): while True: await asyncio.sleep(1) # 1秒ごとに接続状態をチェック if not client.is_connected: await self.disconnect_callback(self) break async def disconnect_callback(self, owner): # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'on_disconnect_callback') and owner.on_disconnect_callback: owner.on_disconnect_callback() def is_connected(self): """ ORPHE COREに接続されているかどうかを返す Returns: 接続されている場合はTrue、されていない場合はFalse """ if self.client is None: return False return self.client.is_connected async def read_device_information(self): """ ORPHE COREのデバイス情報を取得する。一度取得したデバイス情報はself.device_informationメンバ変数に保存される。 Returns: DeviceInformationクラスのインスタンス """ di = await self.client.read_gatt_char(CHARACTERISTIC_DEVICE_INFORMATION_UUID) di = DeviceInformation(di) self.device_information = di # デバイス情報をメンバ変数として保存(更新)しておく return di async def print_device_information(self): """ ORPHE COREのデバイス情報を取得し、標準出力に表示する。ほぼデバッグ用途 """ di = await self.read_device_information() print(f"Battery: {di.battery}") print(f"LR: {di.lr}") print(f"REC: {di.rec}") print(f"Auto Run: {di.auto_run}") print(f"LED: {di.led}") print(f"Log High: {di.log_high}") print(f"Log Low: {di.log_low}") print(f"ACC Range: {di.range.acc}") print(f"GYRO Range: {di.range.gyro}") async def set_led(self, is_on, pattern): """ is_on(int): 0 or 1 pattern(int): 0-4 Returns: None """ # print(f"Setting LED: {is_on}, {pattern}") # is_on, patternの値の範囲をチェック if is_on < 0 or is_on > 1: print("is_on must be 0 or 1.") return if pattern < 0 or pattern > 4: print("pattern must be 0-4.") return ba = bytearray([0x02, is_on, pattern] + [0x00] * 17) await self.write_device_information(ba) async def set_led_brightness(self, brightness): """ 引数に明るさ(0-255)をもらい、その値にLEDを設定する。LED輝度の設定以外は変更しないので、一旦 device informationを取得しなおし、LED輝度以外はすべて従来値を使う Args: brightness: 0-255 Returns: None """ if brightness < 0 or brightness > 255: print("brightness must be 0-255.") return di = await self.read_device_information() ba = bytearray([0x01, di.lr, brightness, 0x00, di.auto_run, di.log_high, di.log_low, di.range.acc, di.range.gyro] + [0x00] * 11) await self.write_device_information(ba) async def set_lr(self, lr): """ lr(int): 0 or 1 Returns: None """ # lrの値の範囲をチェック if lr < 0 or lr > 1: print("lr must be 0 or 1.") return di = await self.read_device_information() ba = bytearray([0x01, lr, di.led, 0x00, di.auto_run, di.log_high, di.log_low, di.range.acc, di.range.gyro] + [0x00] * 11) await self.write_device_information(ba) async def set_acc_range(self, acc_range): """ acc_range(int): 2,4,8,16G を順番に 0,1,2,3 で指定 Returns: None """ # acc_rangeの値は2,4,8,16のいずれかなので、チェックする if acc_range != 2 and acc_range != 4 and acc_range != 8 and acc_range != 16: print("acc_range must be 2, 4, 8, or 16[g].") return # acc_range を 0,1,2,3 に変換 acc_range = [2, 4, 8, 16].index(acc_range) # デバイス情報を読み込む di = await self.read_device_information() # デバイス情報のレンジ設定を変更 di.range.acc = acc_range # デバイス情報を書き込む ba = bytearray([0x01, di.lr, di.led, 0x00, di.auto_run, di.log_high, di.log_low, di.range.acc, di.range.gyro]+[0x00]*11) await self.write_device_information(ba) async def set_gyro_range(self, gyro_range): """ gyro_range(int): 250,500,1000,2000[deg/s] を順番に 0,1,2,3 で指定 Returns: None """ # acc_rangeの値は2,4,8,16のいずれかなので、チェックする if gyro_range != 250 and gyro_range != 500 and gyro_range != 1000 and gyro_range != 2000: print("gyro_range must be 250, 500, 1000, or 2000[deg/s].") return # gyro_range を 0,1,2,3 に変換 gyro_range = [250, 500, 1000, 2000].index(gyro_range) # デバイス情報を読み込む di = await self.read_device_information() # デバイス情報のレンジ設定を変更 di.range.gyro = gyro_range # デバイス情報を書き込む ba = bytearray([0x01, di.lr, di.led, 0x00, di.auto_run, di.log_high, di.log_low, di.range.acc, di.range.gyro]+[0x00]*11) await self.write_device_information(ba) async def write_device_information(self, ba): """ デバイス情報を書き込む。書き込み後にすぐデバイスインフォメーションを読み込むと正しいデータが取得できないため、WRITE_WAIT_INTERVAL_SEC秒待つ """ # print(f"Writing device information: {ba}") await self.client.write_gatt_char(CHARACTERISTIC_DEVICE_INFORMATION_UUID, ba) # 100ms待つ(これがないと即座にdevice informationを読み込まれると正しいデータ取得ができないため) await asyncio.sleep(WRITE_WAIT_INTERVAL_SEC) async def sensor_values_notification_handler(self, sender, data): """ センサの値を取得したときに呼び出されるハンドラ """ if (self.is_connected() == False): return # データの長さを確認 if data[0] == 50: sensor_values = SensorValuesData( self, data, self.device_information.range) if (sensor_values.serial_number - self.serial_number_prev) > 1: # データ欠損の場合 # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(self, 'lost_data_callback') and self.lost_data_callback: self.lost_data_callback( self.serial_number_prev, sensor_values.serial_number) self.serial_number_prev = sensor_values.serial_number elif data[0] == 40: sensor_values = SensorValuesData( self, data, self.device_information.range) async def start_sensor_values_notification(self): """ センサの値の通知を開始する。ただしセンサ値のレンジを取得しておかないといけないので、最初にデバイス情報を取得する """ await self.read_device_information() await self.client.start_notify(CHARACTERISTIC_SENSOR_VALUES_UUID, self.sensor_values_notification_handler) async def step_analysis_notification_handler(self, sender, data): """ ステップ解析の値を取得したときに呼び出されるハンドラ """ # print(f"Step analysis: {data[1]}") StepAnalysisData(self, data) async def start_step_analysis_notification(self): """ ステップ解析の通知を開始する """ await self.client.start_notify(CHARACTERISTIC_STEP_ANALYSIS_UUID, self.step_analysis_notification_handler) async def stop_sensor_values_notification(self): """ センサの値の通知を停止する """ await self.client.stop_notify(CHARACTERISTIC_SENSOR_VALUES_UUID) async def stop_step_analysis_notification(self): """ ステップ解析の通知を停止する """ await self.client.stop_notify(CHARACTERISTIC_STEP_ANALYSIS_UUID) async def disconnect(self): """ ORPHE COREとの接続を切断する """ await self.client.disconnect()
Methods
async def connect(self, address=None)
-
ORPHE COREと接続する
Args
address
- 接続するデバイスのアドレス。指定しない場合はスキャンしてSERVICE UUIDで合致するものに接続する
Returns: 接続に成功した場合はTrue、失敗した場合はFalse
async def disconnect(self)
-
ORPHE COREとの接続を切断する
async def disconnect_callback(self, owner)
def is_connected(self)
-
ORPHE COREに接続されているかどうかを返す
Returns
接続されている場合はTrue、されていない場合はFalse
async def monitor_connection(self, client)
async def print_device_information(self)
-
ORPHE COREのデバイス情報を取得し、標準出力に表示する。ほぼデバッグ用途
async def read_device_information(self)
-
ORPHE COREのデバイス情報を取得する。一度取得したデバイス情報はself.device_informationメンバ変数に保存される。 Returns: DeviceInformationクラスのインスタンス
async def scan_all_devices(self)
-
すべてのBLEデバイスをスキャンして、その結果を返す
Returns
BLEデバイスのリスト
async def sensor_values_notification_handler(self, sender, data)
-
センサの値を取得したときに呼び出されるハンドラ
async def set_acc_range(self, acc_range)
-
acc_range(int): 2,4,8,16G を順番に 0,1,2,3 で指定 Returns: None
def set_got_acc_callback(self, callback)
-
加速度センサの値を取得したときに呼び出されるコールバック関数を設定する。
例えば静止状態ではz方向の1Gの値はレンジ設定によって変わります。加速度レンジが2であれば 0.5 、16であれば 0.0625 (値は理論値なので誤差が生じます)です。
def set_got_converted_acc_callback(self, callback)
-
加速度センサの値を取得したときに呼び出されるコールバック関数を設定する
それぞれの値は加速度レンジの値によって変換されます。 例えば静止状態ではz方向の1Gの値は常に1.0です。
def set_got_converted_gyro_callback(self, callback)
-
ジャイロセンサの値を取得したときに呼び出されるコールバック関数を設定する
def set_got_gait_callback(self, callback)
-
歩行解析の値を取得したときに呼び出されるコールバック関数を設定する
def set_got_gyro_callback(self, callback)
-
ジャイロセンサの値を取得したときに呼び出されるコールバック関数を設定する
def set_got_pronation_callback(self, callback)
-
プロネーションの値を取得したときに呼び出されるコールバック関数を設定する
def set_got_quat_callback(self, callback)
-
クォータニオンの値を取得したときに呼び出されるコールバック関数を設定する
def set_got_quat_distance_callback(self, callback)
-
クォータニオンと差分値を取得したときに呼び出されるコールバック関数を設定する
def set_got_stride_callback(self, callback)
-
ストライドの値を取得したときに呼び出されるコールバック関数を設定する
async def set_gyro_range(self, gyro_range)
-
gyro_range(int): 250,500,1000,2000[deg/s] を順番に 0,1,2,3 で指定 Returns: None
async def set_led(self, is_on, pattern)
-
is_on(int): 0 or 1 pattern(int): 0-4 Returns: None
async def set_led_brightness(self, brightness)
-
引数に明るさ(0-255)をもらい、その値にLEDを設定する。LED輝度の設定以外は変更しないので、一旦 device informationを取得しなおし、LED輝度以外はすべて従来値を使う
Args
brightness
- 0-255
Returns: None
def set_lost_data_callback(self, callback)
-
データが欠損したときに呼び出されるコールバック関数を設定する
Args
callback
- コールバック関数
async def set_lr(self, lr)
-
lr(int): 0 or 1 Returns: None
def set_on_disconnect_callback(self, callback)
-
ORPHE COREとの接続が切断されたときに呼び出されるコールバック関数を設定する
async def start_sensor_values_notification(self)
-
センサの値の通知を開始する。ただしセンサ値のレンジを取得しておかないといけないので、最初にデバイス情報を取得する
async def start_step_analysis_notification(self)
-
ステップ解析の通知を開始する
async def step_analysis_notification_handler(self, sender, data)
-
ステップ解析の値を取得したときに呼び出されるハンドラ
async def stop_sensor_values_notification(self)
-
センサの値の通知を停止する
async def stop_step_analysis_notification(self)
-
ステップ解析の通知を停止する
async def write_device_information(self, ba)
-
デバイス情報を書き込む。書き込み後にすぐデバイスインフォメーションを読み込むと正しいデータが取得できないため、WRITE_WAIT_INTERVAL_SEC秒待つ
class PronationData (data)
-
プロネーションの値を格納するクラス
Attributes
step_count
- 歩数
landing_impact
- 着地衝撃力
x
- プロネーションX
y
- プロネーションY
z
- プロネーションZ
Expand source code
class PronationData: """ プロネーションの値を格納するクラス Attributes: step_count: 歩数 landing_impact: 着地衝撃力 x: プロネーションX y: プロネーションY z: プロネーションZ """ def __init__(self, data): # 2,3は Uint16 で歩数が入っている self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) # 4,5,6,7はfloat32で着地衝撃力[kgf](landing_impact) self.landing_impact = struct.unpack('>f', data[4:8]) # 8,9,10,11はプロネーションX[deg](x) self.x = struct.unpack('>f', data[8:12]) # 12,13,14,15はプロネーションY[deg](y) self.y = struct.unpack('>f', data[12:16]) # 16,17,18,19はプロネーションZ[deg](z) self.z = struct.unpack('>f', data[16:20]) def print(self): print(f"Step count: {self.step_count}") print(f"Landing impact: {self.landing_impact}") print(f"Pronation X: {self.x}") print(f"Pronation Y: {self.y}") print(f"Pronation Z: {self.z}")
Methods
def print(self)
class QuatData
-
クォータニオンの値を格納するクラス
Attributes
w
- w
x
- x
y
- y
z
- z
timestamp
- タイムスタンプ
serial_number
- シリアルナンバー
packet_number
- パケットナンバー
Expand source code
class QuatData: """ クォータニオンの値を格納するクラス Attributes: w: w x: x y: y z: z timestamp: タイムスタンプ serial_number: シリアルナンバー packet_number: パケットナンバー """ def __init__(self): self.w = 0 self.x = 0 self.y = 0 self.z = 0 self.timestamp = 0 self.serial_number = 0 self.packet_number = 0 def print(self): print( f"Quat[{self.serial_number}][{self.packet_number}][{self.timestamp}]: {self.w}, {self.x}, {self.y}, {self.z}")
Methods
def print(self)
class QuatDistanceData (data)
-
クォータニオンと差分値を格納するクラス
Attributes
step_count
- 歩数
phase
- 歩容フェイズ
period
- 歩容ピリオド
event
- 歩容イベント
w
- クォータニオンのw
x
- クォータニオンのx
y
- クォータニオンのy
z
- クォータニオンのz
x_distance
- 加速度力算出された単位時間のx移動距離
y_distance
- 加速度力算出された単位時間のy移動距離
z_distance
- 加速度力算出された単位時間のz移動距離
Expand source code
class QuatDistanceData: """ クォータニオンと差分値を格納するクラス Attributes: step_count: 歩数 phase: 歩容フェイズ period: 歩容ピリオド event: 歩容イベント w: クォータニオンのw x: クォータニオンのx y: クォータニオンのy z: クォータニオンのz x_distance: 加速度力算出された単位時間のx移動距離 y_distance: 加速度力算出された単位時間のy移動距離 z_distance: 加速度力算出された単位時間のz移動距離 """ def __init__(self, data): # 2,3は Uint16 で歩数が入っている self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) # 4は01ビットがenumの歩容フェイズ(0:なし, 1:立脚期, 2:遊脚期) self.phase = data[4] & 0b00000001 # 4は2,3,4ビットがenumの歩容ピリオド(0:なし,1:LoadingResponse, 2:MidStance, 3:TerminalStance, 4:InitialSwing, 5:MidSwing, 6:TerminalSwing) self.period = (data[4] & 0b00011110) >> 1 # 4は5,6,7ビットがenumの歩容イベント(0:なし, 1:InitialContact, 2:FootFlat, 3:HeelRise, 4:ToeOff, 5:FeetAdjacent, 6:TibiaVertical) self.event = (data[4] & 0b11100000) >> 5 # 6,7はfloat16でクォータニオンのw self.w = struct.unpack('>e', data[6:8]) # 8,9はfloat16でクォータニオンのx self.x = struct.unpack('>e', data[8:10]) # 10,11はfloat16でクォータニオンのy self.y = struct.unpack('>e', data[10:12]) # 12,13はfloat16でクォータニオンのz self.z = struct.unpack('>e', data[12:14]) # 14,15はfloat16で加速度力算出された単位時間のx移動距離 self.x_distance = struct.unpack('>e', data[14:16]) # 16,17はfloat16で加速度力算出された単位時間のy移動距離 self.y_distance = struct.unpack('>e', data[16:18]) # 18,19はfloat16で加速度力算出された単位時間のz移動距離 self.z_distance = struct.unpack('>e', data[18:20]) def print(self): print(f"Step count: {self.step_count}") print(f"Phase: {self.phase}") print(f"Period: {self.period}") print(f"Event: {self.event}") print(f"Quat: {self.w}, {self.x}, {self.y}, {self.z}") print( f"Distance: {self.x_distance}, {self.y_distance}, {self.z_distance}")
Methods
def print(self)
class Range
-
加速度センサとジャイロセンサのレンジを格納するクラス
Attributes
acc
- 加速度センサのレンジ
gyro
- ジャイロセンサのレンジ
Expand source code
class Range: """ 加速度センサとジャイロセンサのレンジを格納するクラス Attributes: acc: 加速度センサのレンジ gyro: ジャイロセンサのレンジ """ def __init__(self): self.acc = 0 self.gyro = 0
class SensorValuesData (owner, data, sensor_range)
-
センサの値を格納するクラス
Attributes
owner
- オーナー(SensorValuesDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため)
data
- 生データ
type
- タイプ(40の場合は50Hzのv2, 50の場合は200Hzのv3)
serial_number
- シリアルナンバー
timestamp
- タイムスタンプ
acc
- 加速度センサの値
converted_acc
- 変換後の加速度センサの値
gyro
- ジャイロセンサの値
converted_gyro
- 変換後のジャイロセンサの値
quat
- クォータニオンの値
コンストラクタ
Args
owner
- オーナー(SensorValuesDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため)
data
- 生データ
sensor_range
- 加速度センサとジャイロセンサのレンジ。Rangeクラスのインスタンス
Expand source code
class SensorValuesData: """ センサの値を格納するクラス Attributes: owner: オーナー(SensorValuesDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため) data: 生データ type: タイプ(40の場合は50Hzのv2, 50の場合は200Hzのv3) serial_number: シリアルナンバー timestamp: タイムスタンプ acc: 加速度センサの値 converted_acc: 変換後の加速度センサの値 gyro: ジャイロセンサの値 converted_gyro: 変換後のジャイロセンサの値 quat: クォータニオンの値 """ def __init__(self, owner, data, sensor_range): """ コンストラクタ Args: owner: オーナー(SensorValuesDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため) data: 生データ sensor_range: 加速度センサとジャイロセンサのレンジ。Rangeクラスのインスタンス """ if data[0] == 50: self.data = data self.type = int.from_bytes( data[0:1], byteorder='big', signed=False) self.serial_number = int.from_bytes( data[1:3], byteorder='big', signed=False) self.timestamp = to_timestamp( data[3], data[4], data[5], data[6], data[7]) each_timestamp = self.timestamp for i in range(3, -1, -1): step = 21*i self.acc = AccData() self.acc.x = int.from_bytes( data[22+step:24+step], byteorder='big', signed=True) / 32768 self.acc.y = int.from_bytes( data[24+step:26+step], byteorder='big', signed=True) / 32768 self.acc.z = int.from_bytes( data[26+step:28+step], byteorder='big', signed=True) / 32768 self.converted_acc = AccData() amp_acc = [2, 4, 8, 16][sensor_range.acc] self.converted_acc.x = self.acc.x * amp_acc self.converted_acc.y = self.acc.y * amp_acc self.converted_acc.z = self.acc.z * amp_acc self.gyro = GyroData() self.gyro.x = int.from_bytes( data[16+step:18+step], byteorder='big', signed=True) / 32768 self.gyro.y = int.from_bytes( data[18+step:20+step], byteorder='big', signed=True) / 32768 self.gyro.z = int.from_bytes( data[20+step:22+step], byteorder='big', signed=True) / 32768 self.converted_gyro = GyroData() amp_gyro = [250, 500, 1000, 2000][sensor_range.gyro] self.converted_gyro.x = self.gyro.x * amp_gyro self.converted_gyro.y = self.gyro.y * amp_gyro self.converted_gyro.z = self.gyro.z * amp_gyro self.quat = QuatData() self.quat.w = int.from_bytes( data[8+step:10+step], byteorder='big', signed=True) / 32768 self.quat.x = int.from_bytes( data[10+step:12+step], byteorder='big', signed=True) / 32768 self.quat.y = int.from_bytes( data[12+step:14+step], byteorder='big', signed=True) / 32768 self.quat.z = int.from_bytes( data[14+step:16+step], byteorder='big', signed=True) / 32768 self.acc.serial_number = self.serial_number self.converted_acc.serial_number = self.serial_number self.gyro.serial_number = self.serial_number self.converted_gyro.serial_number = self.serial_number self.quat.serial_number = self.serial_number self.acc.packet_number = 3-i self.converted_acc.packet_number = 3-i self.gyro.packet_number = 3-i self.converted_gyro.packet_number = 3-i self.quat.packet_number = 3-i if i == 3: self.acc.timestamp = each_timestamp self.converted_acc.timestamp = each_timestamp self.gyro.timestamp = each_timestamp self.converted_gyro.timestamp = each_timestamp self.quat.timestamp = each_timestamp else: each_timestamp = each_timestamp + data[28+step] self.acc.timestamp = each_timestamp self.converted_acc.timestamp = each_timestamp self.gyro.timestamp = each_timestamp self.converted_gyro.timestamp = each_timestamp self.quat.timestamp = each_timestamp # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_acc_callback') and owner.got_acc_callback: owner.got_acc_callback(self.acc) if hasattr(owner, 'got_converted_acc_callback') and owner.got_converted_acc_callback: owner.got_converted_acc_callback(self.converted_acc) if hasattr(owner, 'got_gyro_callback') and owner.got_gyro_callback: owner.got_gyro_callback(self.gyro) if hasattr(owner, 'got_converted_gyro_callback') and owner.got_converted_gyro_callback: owner.got_converted_gyro_callback(self.converted_gyro) if hasattr(owner, 'got_quat_callback') and owner.got_quat_callback: owner.got_quat_callback(self.quat) elif data[0] == 40: self.data = data self.type = int.from_bytes( data[0:1], byteorder='big', signed=False) self.timestamp = int.from_bytes( data[18:20], byteorder='big', signed=False) self.acc = AccData() self.acc.x = int.from_bytes( data[14:15], byteorder='big', signed=True)/127 self.acc.y = int.from_bytes( data[15:16], byteorder='big', signed=True)/127 self.acc.z = int.from_bytes( data[16:17], byteorder='big', signed=True)/127 self.converted_acc = AccData() amp_acc = [2, 4, 8, 16][sensor_range.acc] self.converted_acc.x = self.acc.x * amp_acc self.converted_acc.y = self.acc.y * amp_acc self.converted_acc.z = self.acc.z * amp_acc self.gyro = GyroData() self.gyro.x = int.from_bytes( data[9:10], byteorder='big', signed=True)/127 self.gyro.y = int.from_bytes( data[10:11], byteorder='big', signed=True)/127 self.gyro.z = int.from_bytes( data[11:12], byteorder='big', signed=True)/127 self.converted_gyro = GyroData() amp_gyro = [250, 500, 1000, 2000][sensor_range.gyro] self.converted_gyro.x = self.gyro.x * amp_gyro self.converted_gyro.y = self.gyro.y * amp_gyro self.converted_gyro.z = self.gyro.z * amp_gyro self.quat = QuatData() self.quat.w = int.from_bytes( data[1:3], byteorder='big', signed=True) / 32768 self.quat.x = int.from_bytes( data[3:5], byteorder='big', signed=True) / 32768 self.quat.y = int.from_bytes( data[5:7], byteorder='big', signed=True) / 32768 self.quat.z = int.from_bytes( data[7:9], byteorder='big', signed=True) / 32768 self.acc.serial_number = 0 self.converted_acc.serial_number = 0 self.gyro.serial_number = 0 self.converted_gyro.serial_number = 0 self.quat.serial_number = 0 self.acc.packet_number = 0 self.converted_acc.packet_number = 0 self.gyro.packet_number = 0 self.converted_gyro.packet_number = 0 self.quat.packet_number = 0 self.acc.timestamp = self.timestamp self.gyro.timestamp = self.timestamp self.quat.timestamp = self.timestamp self.converted_acc.timestamp = self.timestamp self.converted_gyro.timestamp = self.timestamp # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_acc_callback') and owner.got_acc_callback: owner.got_acc_callback(self.acc) if hasattr(owner, 'got_converted_acc_callback') and owner.got_converted_acc_callback: owner.got_converted_acc_callback(self.converted_acc) if hasattr(owner, 'got_gyro_callback') and owner.got_gyro_callback: owner.got_gyro_callback(self.gyro) if hasattr(owner, 'got_converted_gyro_callback') and owner.got_converted_gyro_callback: owner.got_converted_gyro_callback(self.converted_gyro) if hasattr(owner, 'got_quat_callback') and owner.got_quat_callback: owner.got_quat_callback(self.quat)
class StepAnalysisData (owner, data)
-
ステップ解析の値を格納するクラス
コンストラクタ
data[1]のサブヘッダによって解析データの種類がわかる。対応するのは、0,1,2,3,4 である。また、0,1,2,3 に関してはデータ到着担保のために同じデータが二回連続で送信されてくるため、歩数カウントで更新すべきデータかどうかを判断する必要がある。
0: Gait Overview 1: Stride 2: Pronation 3: 未実装 4: クオータニオン 5: 未実装 6: 未実装
Args
owner
- オーナー(StepAnalysisDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため)
data
- 生データ
Expand source code
class StepAnalysisData: """ ステップ解析の値を格納するクラス """ def __init__(self, owner, data): """ コンストラクタ data[1]のサブヘッダによって解析データの種類がわかる。対応するのは、0,1,2,3,4 である。また、0,1,2,3 に関してはデータ到着担保のために同じデータが二回連続で送信されてくるため、歩数カウントで更新すべきデータかどうかを判断する必要がある。 0: Gait Overview 1: Stride 2: Pronation 3: 未実装 4: クオータニオン 5: 未実装 6: 未実装 Args: owner: オーナー(StepAnalysisDataを生成したオブジェクトで、Orpheクラスのインスタンスが入っている。これはコールバック関数を登録するため) data: 生データ """ self.data = data # gait overview if data[1] == 0: self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) if self.step_count > owner.step_count.gait: self.gait = GaitData(data) # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_gait_callback') and owner.got_gait_callback: owner.got_gait_callback(self.gait) owner.step_count.gait = self.step_count # stride elif data[1] == 1: self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) if self.step_count > owner.step_count.stride: self.stride = StrideData(data) # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_stride_callback') and owner.got_stride_callback: owner.got_stride_callback(self.stride) owner.step_count.stride = self.step_count # pronation elif data[1] == 2: self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) if self.step_count > owner.step_count.pronation: self.pronation = PronationData(data) # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_pronation_callback') and owner.got_pronation_callback: owner.got_pronation_callback(self.pronation) owner.step_count.pronation = self.step_count # quaternion elif data[1] == 4: self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) # if self.step_count > owner.step_count.quat: self.quat_distance = QuatDistanceData(data) # コールバック関数が設定されている場合、コールバック関数を呼び出す if hasattr(owner, 'got_quat_distance_callback') and owner.got_quat_distance_callback: owner.got_quat_distance_callback(self.quat_distance) owner.step_count.quat = self.step_count
class StepCount
-
歩数を格納するクラス
Expand source code
class StepCount: """ 歩数を格納するクラス""" def __init__(self): self.gait = 0 self.stride = 0 self.pronation = 0 self.quat = 0
class StrideData (data)
-
ストライドの値を格納するクラス
Attributes
step_count
- 歩数
foot_angle
- フットアングル
x
- ストライドX
y
- ストライドY
z
- ストライドZ
Expand source code
class StrideData: """ ストライドの値を格納するクラス Attributes: step_count: 歩数 foot_angle: フットアングル x: ストライドX y: ストライドY z: ストライドZ """ def __init__(self, data): # 2,3は Uint16 で歩数が入っている self.step_count = int.from_bytes( data[2:4], byteorder='big', signed=False) # 4,5,6,7はfloat32でフットアングル self.foot_angle = struct.unpack('>f', data[4:8]) # 8,9,10,11はfloat32でストライドX self.x = struct.unpack('>f', data[8:12]) # 12,13,14,15はfloat32でストライドY self.y = struct.unpack('>f', data[12:16]) # 16,17,18,19はfloat32でストライドZ self.z = struct.unpack('>f', data[16:20]) def print(self): print(f"Step count: {self.step_count}") print(f"Foot angle: {self.foot_angle}") print(f"Stride X: {self.x}") print(f"Stride Y: {self.y}") print(f"Stride Z: {self.z}")
Methods
def print(self)