Source code for pfxbrick.pfxstate

#! /usr/bin/env python3
#
# Copyright (C) 2018  Fx Bricks Inc.
# This file is part of the pfxbrick python module.
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# PFx Brick internal state helpers

from pfxbrick import *
from pfxbrick.pfxhelpers import *


class PFxMotorState:
    """Runtime state container for a single motor channel.

    Attributes:
        dir (:obj:`str`): either ``"Forward"`` or ``"Reverse"``
        target_speed (:obj:`int`): value taken from byte 1 of the state slice
        current_speed (:obj:`int`): value taken from byte 2 of the state slice
        pwm_speed (:obj:`int`): value taken from byte 3 of the state slice
    """

    def __init__(self):
        self.dir = "Forward"
        self.target_speed = self.current_speed = self.pwm_speed = 0

    def __str__(self):
        """Return a one-line summary with the speeds shown as 2-digit hex."""
        template = "Dir: %7s Target speed: 0x%02X  Current: 0x%02X  PWM: 0x%02X"
        values = (self.dir, self.target_speed, self.current_speed, self.pwm_speed)
        return template % values

    def from_bytes(self, msg):
        """Populate this object from a 4-element slice of a state message.

        Args:
            msg: indexable sequence of integers; elements 0..3 are read.
        """
        # Only the least significant bit of the first byte selects direction.
        if msg[0] & 0x01:
            self.dir = "Reverse"
        else:
            self.dir = "Forward"
        self.target_speed = msg[1]
        self.current_speed = msg[2]
        self.pwm_speed = msg[3]


class PFxLightState:
    """Runtime state container for a single light channel.

    Attributes:
        active (:obj:`bool`): whether the channel is flagged as active
        target_level (:obj:`int`): target level value
        current_level (:obj:`int`): current level value
    """

    def __init__(self):
        self.active = False
        self.target_level = self.current_level = 0

    def __str__(self):
        """Return a one-line summary with the levels shown as 2-digit hex."""
        template = "Active: %5s Target level: 0x%02X  Current level: 0x%02X"
        values = (self.active, self.target_level, self.current_level)
        return template % values


class PFxAudioChannel:
    """Runtime state container for a single audio channel.

    Attributes:
        mode (:obj:`int`): channel mode value
        file_id (:obj:`int`): file ID value; initialised to
            ``PFX_FILE_INVALID_ID``
    """

    def __init__(self):
        self.mode = 0
        self.file_id = PFX_FILE_INVALID_ID

    def __str__(self):
        """Return a one-line summary of the mode and file ID."""
        values = (self.mode, self.file_id)
        return "Mode: %d Current file: %3d" % values


class PFxBTState:
    """Container for the Bluetooth (BLE) status fields of the PFx Brick.

    Every field starts at zero (``msg_prefix`` as an empty list) and is
    filled in by :meth:`from_bytes`.

    Attributes:
        sleep (:obj:`int`): sleep value
        state (:obj:`int`): state value (printed as 16-bit hex)
        flags (:obj:`int`): flags value (printed as 16-bit hex)
        error (:obj:`int`): error value (printed as 16-bit hex)
        features (:obj:`int`): BLE features value
        services (:obj:`int`): services value
        auth (:obj:`int`): auth value
        tx_count (:obj:`int`): transmit counter
        rx_count (:obj:`int`): receive counter
        msg_len (:obj:`int`): length of the last message
        msg_prefix: leading bytes of the last message (up to 8 elements)
    """

    def __init__(self):
        self.sleep = 0
        self.state = 0
        self.flags = 0
        self.error = 0
        self.features = 0
        self.services = 0
        self.auth = 0
        self.tx_count = 0
        self.rx_count = 0
        self.msg_len = 0
        self.msg_prefix = []

    def __str__(self):
        """Return a four-line, human readable summary of all fields."""
        prefix_hex = " ".join("0x%02X" % x for x in self.msg_prefix)
        lines = [
            "Sleep: %d State: 0x%04X  Flags: 0x%04X  Err: 0x%04X"
            % (self.sleep, self.state, self.flags, self.error),
            # Label spelling corrected (was "Feautures").
            "BLE Features: 0x%04X  Services: 0x%04X  Auth: 0x%04X"
            % (self.features, self.services, self.auth),
            "Tx Count: %d  Rx Count: %d" % (self.tx_count, self.rx_count),
            "Last message len: %d <%s>" % (self.msg_len, prefix_hex),
        ]
        return "\n".join(lines)

    def from_bytes(self, msg):
        """Populate the fields from a message read from the PFx Brick.

        Args:
            msg: sliceable sequence of integers. Its first element is
                discarded, and every offset below is relative to the
                remainder.
        """
        # Drop the leading element; all following indices refer to the
        # trimmed message.
        msg = msg[1:]
        self.sleep = msg[1]
        # uint16_toint comes from pfxbrick.pfxhelpers; presumably it combines
        # a 2-byte slice into an integer -- confirm byte order there.
        self.state = uint16_toint(msg[2:4])
        self.flags = uint16_toint(msg[4:6])
        self.error = uint16_toint(msg[6:8])
        self.features = uint16_toint(msg[8:10])
        self.services = uint16_toint(msg[10:12])
        self.auth = uint16_toint(msg[12:14])
        self.tx_count = uint16_toint(msg[14:16])
        self.rx_count = uint16_toint(msg[16:18])
        # NOTE(review): index 18 of the trimmed message is not read here;
        # confirm against the ICD that this gap is intentional.
        self.msg_len = msg[19]
        self.msg_prefix = msg[20:28]


class PFxFSState:
    """Container for the file system status fields of the PFx Brick.

    Every field starts at zero and is filled in by :meth:`from_bytes`.
    The attribute ``init_timemout`` keeps its historical spelling because
    it is part of the public interface of this class.
    """

    def __init__(self):
        for field in (
            "file_count",
            "open_files",
            "task_state",
            "flags",
            "erase_sector",
            "init_timemout",
            "auto_sync_dir",
            "auto_sync_map",
            "sector_capacity",
            "free_sectors",
            "empty_sectors",
        ):
            setattr(self, field, 0)

    def __str__(self):
        """Return a three-line, human readable summary of all fields."""
        summary = "State: 0x%02X  Flags: 0x%02X  Files: %d  Open: %d" % (
            self.task_state,
            self.flags,
            self.file_count,
            self.open_files,
        )
        sync = "Erase: 0x%04X  InitTime: 0x%04X  SyncDir: 0x%04X  SyncMap: 0x%04X" % (
            self.erase_sector,
            self.init_timemout,
            self.auto_sync_dir,
            self.auto_sync_map,
        )
        sectors = "Sector Capacity: 0x%04X  Free: 0x%04X  Empty: 0x%04X" % (
            self.sector_capacity,
            self.free_sectors,
            self.empty_sectors,
        )
        return "\n".join((summary, sync, sectors))

    def from_bytes(self, msg):
        """Populate the fields from a message read from the PFx Brick.

        Args:
            msg: sliceable sequence of integers; elements 1..19 are read.
        """
        # Single-element fields are read directly; two-element fields are
        # passed through uint16_toint (imported from pfxbrick.pfxhelpers).
        self.file_count = msg[1]
        # The open-file count is read from the last two-element field (18..19).
        self.open_files = uint16_toint(msg[18:20])
        self.task_state = msg[2]
        self.flags = msg[3]
        self.erase_sector = uint16_toint(msg[4:6])
        self.init_timemout = uint16_toint(msg[6:8])
        self.auto_sync_dir = uint16_toint(msg[8:10])
        self.auto_sync_map = uint16_toint(msg[10:12])
        self.sector_capacity = uint16_toint(msg[12:14])
        self.free_sectors = uint16_toint(msg[14:16])
        self.empty_sectors = uint16_toint(msg[16:18])


class PFxState:
    """
    PFx Brick internal data state container class.

    This class is used to store internal state data obtained with
    the `PFX_CMD_GET_CURRENT_STATE` ICD command.  This internal state
    data can be useful for monitoring, debugging, and for building
    apps which can use this data for enhanced feedback of the PFx Brick
    runtime state and behaviour.

    Attributes:
        brightness (:obj:`int`): current global brightness value

        volume (:obj:`int`): current audio volume level

        millisec_count (:obj:`int`): PFx Brick millisecond counter value

        slow_count (:obj:`int`): PFx Brick 1 second counter value

        status_latch1 (:obj:`int`): PFx Brick status latch 1 bit flag state

        status_latch2 (:obj:`int`): PFx Brick status latch 2 bit flag state

        audio_peak (:obj:`int`): current audio peak level based on current playback

        audio_notch (:obj:`int`): current motor notch value for indexed audio playback

        fs_state (:obj:`int`): PFx Brick file system state flags

        script_state (:obj:`int`): PFx Brick script engine state

        script_line (:obj:`int`): current running script line pointer

        motors ([:obj:`PFxMotorState`]): motor channel runtime state container class

        lights ([:obj:`PFxLightState`]): light channel runtime state container class

        audio_ch ([:obj:`PFxAudioChannel`]): audio channel runtime state container class
    """

    def __init__(self):
        self.brightness = 0
        self.volume = 0
        # One state object per channel; the channel counts are package constants.
        self.motors = [PFxMotorState() for _ in range(PFX_MOTOR_CHANNELS)]
        self.lights = [PFxLightState() for _ in range(PFX_LIGHT_CHANNELS)]
        self.audio_ch = [PFxAudioChannel() for _ in range(PFX_AUDIO_CHANNELS)]
        self.lightmask = 0
        self.millisec_count = 0
        self.slow_count = 0
        self.status_latch1 = 0
        self.status_latch2 = 0
        self.audio_peak = 0
        self.audio_notch = 0
        self.fs_state = 0
        self.script_state = 0
        self.script_line = 0
        self.motor_ptr = 0
        self.motor_pwm_ptr = 0
        self.motor_rate_ptr = 0
        self.trig_change_dir_state = 0
        self.trig_set_off_state = 0
        self.trig_rapid_accel_state = 0
        self.trig_rapid_decel_state = 0
        self.trig_brake_state = 0
        self.filesys = PFxFSState()
        self.bt = PFxBTState()

    def from_bytes(self, msg):
        """
        Converts the message string bytes read from the PFx Brick into
        the corresponding data members of this class.

        Args:
            msg: sliceable sequence of integers; elements 1..63 are read.
        """
        self.brightness = msg[1]
        self.volume = msg[2]
        # Each motor channel consumes a 4-element slice, starting at 3 and 7.
        for ch, idx in zip(self.motors, [3, 7]):
            ch.from_bytes(msg[idx : idx + 4])
        self.motor_ptr = msg[11]
        self.motor_pwm_ptr = msg[12]
        self.motor_rate_ptr = msg[13]
        self.trig_change_dir_state = msg[14]
        self.trig_set_off_state = msg[15]
        self.trig_rapid_accel_state = msg[16]
        self.trig_rapid_decel_state = msg[17]
        self.trig_brake_state = msg[18]
        self.lightmask = msg[19]
        for i, ch in enumerate(self.lights):
            ch.target_level = msg[21 + i]
            ch.current_level = msg[33 + i]
            # Active flags are bit-packed: channels 0..7 in element 19,
            # channels 8 and above in element 20.
            if i > 7:
                mask = 1 << (i - 8)
                ch.active = bool(msg[20] & mask)
            else:
                mask = 1 << i
                ch.active = bool(msg[19] & mask)
        # Each audio channel consumes a (mode, file_id) pair.
        for ch, idx in zip(self.audio_ch, [45, 47, 49, 51]):
            ch.mode = msg[idx]
            ch.file_id = msg[idx + 1]
        self.millisec_count = uint16_toint(msg[53:55])
        self.slow_count = uint16_toint(msg[55:57])
        self.status_latch1 = msg[57]
        self.status_latch2 = msg[58]
        self.fs_state = msg[59]
        self.audio_peak = msg[60]
        self.audio_notch = msg[61]
        self.script_state = msg[62]
        self.script_line = msg[63]

    def __str__(self):
        """Return a multi-line, human readable summary of the stored state."""
        s = []
        s.append("Brightness: %3d Volume: %3d" % (self.brightness, self.volume))
        s.append(
            "Slow count: %4X Millisec count: %4X"
            % (self.slow_count, self.millisec_count)
        )
        s.append(
            "File system: %2X Script exec: %2X Script line: %3d"
            % (self.fs_state, self.script_state, self.script_line)
        )
        s.append(
            "Status 1: %2X Status 2: %2X" % (self.status_latch1, self.status_latch2)
        )
        # Channels are listed with 1-based numbering.
        for i, ch in enumerate(self.motors):
            s.append("Motor Ch %d : %s" % (i + 1, str(ch)))
        for i, ch in enumerate(self.lights):
            s.append("Light Ch %2d : %s" % (i + 1, str(ch)))
        for i, ch in enumerate(self.audio_ch):
            s.append("Audio Ch %d : %s" % (i + 1, str(ch)))
        s.append(
            "Audio Peak: %3d Audio notch: %d" % (self.audio_peak, self.audio_notch)
        )
        return "\n".join(s)