Skip to content

Configuration

DVModbusConfiguration

All Modbus drivers accept modbus_config as either a dict or a DVModbusConfiguration instance. Pydantic validates it at construction time, and unknown keys raise a ValidationError immediately.

iface = get_interface('solarlog', '192.168.1.100', modbus_config={
    'slave_id': 3,
    'timeout': 10.0,
    'max_retries': 2,
    'retry_delay': 0.5,
})
from dv_interfaces import DVModbusConfiguration, get_interface

config = DVModbusConfiguration(slave_id=3, timeout=10.0)
iface = get_interface('solarlog', '192.168.1.100', modbus_config=config)
Key Type Default Constraint Description
slave_id int driver default 1–247 Modbus unit identifier
timeout float 5.0 > 0 TCP connection timeout in seconds
max_retries int 0 ≥ 0 Extra register-read attempts on transient failure
retry_delay float 0.2 ≥ 0 Seconds between retry attempts

If slave_id is omitted, each driver applies its own default: SolarLog 1, SMA 2, SmartDog 2, Meteocontrol 10.

max_retries retries individual register reads before raising — separate from task-level retries in your scheduler or async framework. Bus noise (transient CRC error) and device unreachability (TCP timeout) are different failure modes with different backoff strategies.


From environment variables

DVModbusConfiguration.from_env()

Build a configuration from environment variables without passing a dict:

from dv_interfaces import DVModbusConfiguration

cfg = DVModbusConfiguration.from_env()
# reads DV_SLAVE_ID, DV_TIMEOUT, DV_MAX_RETRIES, DV_RETRY_DELAY

Use a custom prefix for multiple devices in the same process:

cfg_a = DVModbusConfiguration.from_env(prefix='PLANT_A')
cfg_b = DVModbusConfiguration.from_env(prefix='PLANT_B')

Variables with the prefix:

Variable Maps to
{prefix}_SLAVE_ID slave_id
{prefix}_TIMEOUT timeout
{prefix}_MAX_RETRIES max_retries
{prefix}_RETRY_DELAY retry_delay

Polling task integration

read_dataset_result is designed as the entry point for polling tasks. Errors propagate as typed exceptions so the caller can apply appropriate retry logic per failure mode:

import asyncio
import logging
from dv_interfaces import get_interface
from dv_interfaces.exceptions import ErrorReadDVInterface, ErrorConnectionDVInterface

logger = logging.getLogger(__name__)

async def poll_plant(driver: str, host: str, interval_s: float = 60.0) -> None:
    read_retries = 0
    while True:
        try:
            with get_interface(driver, host, modbus_config={'max_retries': 2}) as iface:
                result = iface.read_dataset_result()
                logger.info('read: %s', result.to_dict())
                read_retries = 0
        except ErrorReadDVInterface as exc:
            read_retries += 1
            backoff = min(2 ** read_retries, 30)
            logger.warning('read failed (%s), retry in %ss', exc, backoff)
            await asyncio.sleep(backoff)
            continue
        except ErrorConnectionDVInterface as exc:
            logger.error('device unreachable (%s:%s), retry in 60s', exc.host, exc.port)
            await asyncio.sleep(60)
            continue
        await asyncio.sleep(interval_s)

Exceptions

All exceptions inherit from ErrorDVInterface:

ErrorDVInterface
├── ErrorConnectionDVInterface   device unreachable or socket lost
├── ErrorReadDVInterface         register read failed
├── ErrorTurnOnDVInterface       turn-on command failed
├── ErrorTurnOffDVInterface      turn-off command failed
├── ErrorLimitingDVInterface     power limit command failed
└── ErrorUnsupportedOperationDVInterface unsupported driver operation

ErrorConnectionDVInterface carries host and port attributes:

from dv_interfaces.exceptions import ErrorConnectionDVInterface

try:
    iface.connect()
except ErrorConnectionDVInterface as exc:
    logger.error('cannot reach %s:%s', exc.host, exc.port)

Logging

The library logs under the dv_interfaces logger hierarchy. Enable debug output to see every reconnect and register read attempt:

import logging
logging.getLogger('dv_interfaces').setLevel(logging.DEBUG)