Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@

logger = logging.getLogger(__name__)

# Map dotted preference paths to the protobuf enum that defines their flags.
# These fields are stored as uint32 bitmasks in the protobuf but have an
# associated enum that names the individual flags.
BITFIELD_ENUMS = {
"network.enabled_protocols": config_pb2.Config.NetworkConfig.ProtocolFlags,
"position.position_flags": config_pb2.Config.PositionConfig.PositionFlags,
}

def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
args = mt_config.args
Expand Down Expand Up @@ -238,13 +246,28 @@ def setPref(config, comp_name, raw_val) -> bool:
print("Warning: network.wifi_psk must be 8 or more characters.")
return False

# Handle uint32 bitfields that have an associated enum of flag names.
bitfield_enum = None
if config_type.message_type is not None:
bitfield_path = f"{config_type.name}.{pref.name}"
bitfield_enum = BITFIELD_ENUMS.get(bitfield_path)
if bitfield_enum and isinstance(val, str):
# At this point fromStr() could not parse val as int/float/bool/bytes,
# so treat it as a comma-separated list of bitfield flag names.
flag_names = [name.strip() for name in val.split(",") if name.strip()]
try:
val = meshtastic.util.flags_from_list(bitfield_enum, flag_names)
except ValueError as e:
print(f"ERROR: {e}")
return False

enumType = pref.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
# We've failed so far to convert this string into an enum, try to find it by reflection
e = enumType.values_by_name.get(val)
if e:
val = e.number
ev = enumType.values_by_name.get(val)
if ev:
val = ev.number
else:
print(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
Expand Down Expand Up @@ -1956,7 +1979,8 @@ def addPositionConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentP

group.add_argument(
"--pos-fields",
help="Specify fields to send when sending a position. Use no argument for a list of valid values. "
help="Deprecated: use '--set position.position_flags FLAG1,FLAG2' instead. "
"Specify fields to send when sending a position. Use no argument for a list of valid values. "
"Can pass multiple values as a space separated list like "
"this: '--pos-fields ALTITUDE HEADING SPEED'",
nargs="*",
Expand Down
47 changes: 47 additions & 0 deletions meshtastic/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
onConnection,
onNode,
onReceive,
setPref,
tunnelMain,
set_missing_flags_false,
)
from meshtastic import mt_config

from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
from ..protobuf.config_pb2 import Config # pylint: disable=E0611

# from ..ble_interface import BLEInterface
from ..mesh_interface import MeshInterface
Expand Down Expand Up @@ -3204,3 +3206,48 @@ def test_main_ota_update_retries(mock_our_exit, mock_ota_class, capsys):

finally:
os.unlink(firmware_file)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_setPref_network_enabled_protocols_by_name(capsys):
"""Test setPref() accepts bitfield flag names for network.enabled_protocols."""
config = Config()
assert setPref(config, "network.enabled_protocols", "UDP_BROADCAST") is True
assert config.network.enabled_protocols == 1
out, _ = capsys.readouterr()
assert "Set network.enabled_protocols to UDP_BROADCAST" in out


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_setPref_position_flags_multiple(capsys):
"""Test setPref() accepts comma-separated bitfield flag names."""
config = Config()
assert setPref(config, "position.position_flags", "ALTITUDE,SPEED") is True
assert config.position.position_flags == 513
out, _ = capsys.readouterr()
assert "Set position.position_flags to ALTITUDE,SPEED" in out


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_setPref_bitfield_raw_integer(capsys):
"""Test setPref() still accepts raw integers for bitfields."""
config = Config()
assert setPref(config, "network.enabled_protocols", "0") is True
assert config.network.enabled_protocols == 0
out, _ = capsys.readouterr()
assert "Set network.enabled_protocols to 0" in out


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_setPref_bitfield_invalid_name(capsys):
"""Test setPref() rejects unknown bitfield flag names."""
config = Config()
assert setPref(config, "network.enabled_protocols", "TCP") is False
out, _ = capsys.readouterr()
assert "Unknown flag 'TCP'" in out
assert "NO_BROADCAST" in out
assert "UDP_BROADCAST" in out
35 changes: 35 additions & 0 deletions meshtastic/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
convert_mac_addr,
eliminate_duplicate_port,
findPorts,
flags_from_list,
flags_to_list,
fixme,
fromPSK,
Expand Down Expand Up @@ -880,6 +881,7 @@ def test_to_node_num_hypothesis_roundtrip(n):

_EXCLUDED_MODULES = mesh_pb2.ExcludedModules
_POSITION_FLAGS = config_pb2.Config.PositionConfig.PositionFlags
_NETWORK_PROTOCOLS = config_pb2.Config.NetworkConfig.ProtocolFlags


@pytest.mark.unit
Expand Down Expand Up @@ -935,3 +937,36 @@ def test_flags_to_list_conservation(flags):

assert accounted == (flags & known_union)
assert (accounted | leftover) == flags


@pytest.mark.unit
@pytest.mark.parametrize("flag_type, flags, expected", [
(_NETWORK_PROTOCOLS, ["UDP_BROADCAST"], 1),
(_NETWORK_PROTOCOLS, ["NO_BROADCAST"], 0),
(_NETWORK_PROTOCOLS, [], 0),
(_POSITION_FLAGS, ["ALTITUDE"], 1),
(_POSITION_FLAGS, ["ALTITUDE", "SPEED"], 513),
(_POSITION_FLAGS, ["ALTITUDE", " SPEED "], 513),
])
def test_flags_from_list(flag_type, flags, expected):
"""Test flags_from_list combines named flags into the expected bitmask."""
assert flags_from_list(flag_type, flags) == expected


@pytest.mark.unit
def test_flags_from_list_unknown_flag():
"""Test flags_from_list raises ValueError for an unknown flag name."""
with pytest.raises(ValueError, match="Unknown flag 'TCP'"):
flags_from_list(_NETWORK_PROTOCOLS, ["UDP_BROADCAST", "TCP"])


@pytest.mark.unit
@given(st.lists(st.sampled_from(list(_POSITION_FLAGS.keys())), unique=True))
def test_flags_from_list_roundtrip(flags):
"""Property: flags_from_list and flags_to_list are inverses for known position flags."""
combined = flags_from_list(_POSITION_FLAGS, flags)
decoded = flags_to_list(_POSITION_FLAGS, combined)
# flags_to_list drops zero-value flags and may report unknown remainders,
# but for combinations of known non-zero flags it should return the same set of names.
nonzero_flags = {f for f in flags if _POSITION_FLAGS.Value(f)}
assert set(decoded) == nonzero_flags
33 changes: 30 additions & 3 deletions meshtastic/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,14 +737,41 @@ def to_node_num(node_id: Union[int, str]) -> int:
return int(s, 16)

def flags_to_list(flag_type, flags: int) -> List[str]:
"""Given a flag_type that's a protobuf EnumTypeWrapper, and a flag int, give a list of flags enabled."""
"""Given a flag_type that's a protobuf EnumTypeWrapper, and a flag int, give a list of flags enabled.

Zero-valued members (e.g. EXCLUDED_NONE, UNSET, NO_BROADCAST) never appear in the
result: they hold no bit, so `flags & value` is always False for them, and a flags
value of 0 therefore decodes to an empty list rather than a named "no flags" entry.
Any leftover bits not corresponding to a known member are reported via
`UNKNOWN_ADDITIONAL_FLAGS(<leftover>)`.
"""
ret = []
for key in flag_type.keys():
if key == "EXCLUDED_NONE":
continue
if flags & flag_type.Value(key):
ret.append(key)
flags = flags - flag_type.Value(key)
if flags > 0:
ret.append(f"UNKNOWN_ADDITIONAL_FLAGS({flags})")
return ret


def flags_from_list(flag_type, flags: List[str]) -> int:
"""Given a flag_type that's a protobuf EnumTypeWrapper, and a list of flag names, return the combined bitmask.

Zero-valued members (e.g. EXCLUDED_NONE, UNSET, NO_BROADCAST) are accepted but are
no-ops: they OR in 0 and thus set nothing. A list consisting solely of such a member
(or an empty list) yields 0, which round-trips back through flags_to_list as an
empty list rather than the original member name -- see flags_to_list's docstring.
"""
result = 0
valid_names = list(flag_type.keys())
for flag_name in flags:
flag_name = flag_name.strip()
if not flag_name:
continue
if flag_name not in valid_names:
raise ValueError(
f"Unknown flag '{flag_name}'. Valid choices: {', '.join(sorted(valid_names))}"
)
result |= flag_type.Value(flag_name)
return result
Loading