Tuya Rollo-Relais zeigt Zustände falsch an

bastiries

New member
Guten Tag alle zusammen.
Ich hoffe, mir kann hier als Neuling weitergeholfen werden.

Ich habe seit 2 Wochen HA auf einem Pi 4 am Laufen.
Soweit funktioniert dank YouTube auch alles.
Allerdings werden meine Rollosteuerungen zwar angezeigt und funktionieren auch einwandfrei, nur ist alles verdreht.
Sind die Rollos geschlossen, werden mir 100 % angezeigt, und geöffnet 0 %.
Die Steuertasten in HA sind auch verdreht: Die Taste, die die Rollos öffnen soll, löst die Funktion „Schließen“ aus und umgekehrt.
Kann man das in YAML umschreiben?

Hier die YAML aus der Tuya Config:
"""Platform to locally control Tuya-based cover devices."""
import asyncio
import logging
import time
from functools import partial

import voluptuous as vol
from homeassistant.components.cover import (
ATTR_POSITION,
DOMAIN,
SUPPORT_CLOSE,
SUPPORT_OPEN,
SUPPORT_SET_POSITION,
SUPPORT_STOP,
CoverEntity,
)

from .common import LocalTuyaEntity, async_setup_entry
from .const import (
CONF_COMMANDS_SET,
CONF_CURRENT_POSITION_DP,
CONF_POSITION_INVERTED,
CONF_POSITIONING_MODE,
CONF_SET_POSITION_DP,
CONF_SPAN_TIME,
)

# Module-level logger; it is handed to the entity base class by the cover class.
_LOGGER = logging.getLogger(__name__)

# Supported command sets. Each value is "<open>_<close>_<stop>": the cover
# class splits the string on "_" to obtain the three command values.
COVER_ONOFF_CMDS = "on_off_stop"
COVER_OPENCLOSE_CMDS = "open_close_stop"
COVER_FZZZ_CMDS = "fz_zz_stop"
COVER_12_CMDS = "1_2_3"
# Positioning modes selectable in the config flow.
COVER_MODE_NONE = "none"
COVER_MODE_POSITION = "position"
COVER_MODE_TIMED = "timed"
# Extra seconds added to the span time before the automatic stop that follows
# an open/close command in timed mode.
COVER_TIMEOUT_TOLERANCE = 3.0

# Defaults used when the corresponding option is not configured.
DEFAULT_COMMANDS_SET = COVER_ONOFF_CMDS
DEFAULT_POSITIONING_MODE = COVER_MODE_NONE
# Default span time in seconds (the schema accepts 1.0 to 300.0).
DEFAULT_SPAN_TIME = 25.0


def flow_schema(dps):
    """Return schema used in config flow.

    ``dps`` is the collection of datapoint choices that the two position
    datapoint options are validated against.
    """
    return {
        vol.Optional(CONF_COMMANDS_SET): vol.In(
            [COVER_ONOFF_CMDS, COVER_OPENCLOSE_CMDS, COVER_FZZZ_CMDS, COVER_12_CMDS]
        ),
        vol.Optional(CONF_POSITIONING_MODE, default=DEFAULT_POSITIONING_MODE): vol.In(
            [COVER_MODE_NONE, COVER_MODE_POSITION, COVER_MODE_TIMED]
        ),
        vol.Optional(CONF_CURRENT_POSITION_DP): vol.In(dps),
        vol.Optional(CONF_SET_POSITION_DP): vol.In(dps),
        vol.Optional(CONF_POSITION_INVERTED, default=False): bool,
        # The span time is coerced to float and limited to 1.0-300.0.
        vol.Optional(CONF_SPAN_TIME, default=DEFAULT_SPAN_TIME): vol.All(
            vol.Coerce(float), vol.Range(min=1.0, max=300.0)
        ),
    }


class LocaltuyaCover(LocalTuyaEntity, CoverEntity):
    """Tuya cover device.

    Movement is driven by writing an open/close/stop command to the entity's
    datapoint. Depending on CONF_POSITIONING_MODE the position is either not
    tracked ("none"), exchanged with the device through dedicated datapoints
    ("position"), or estimated from the elapsed travel time ("timed").
    """

    def __init__(self, device, config_entry, switchid, **kwargs):
        """Initialize a new LocaltuyaCover.

        The arguments are forwarded unchanged to the parent constructor,
        together with this module's logger.
        """
        super().__init__(device, config_entry, switchid, _LOGGER, **kwargs)
        commands_set = DEFAULT_COMMANDS_SET
        if self.has_config(CONF_COMMANDS_SET):
            commands_set = self._config[CONF_COMMANDS_SET]
        # A command set is encoded as "<open>_<close>_<stop>"; split it once.
        open_cmd, close_cmd, stop_cmd = commands_set.split("_")[:3]
        self._open_cmd = open_cmd
        self._close_cmd = close_cmd
        self._stop_cmd = stop_cmd
        self._timer_start = time.time()
        self._state = self._stop_cmd
        self._previous_state = self._state
        self._current_cover_position = 0
        _LOGGER.debug("Initialized cover [%s]", self.name)

    @property
    def supported_features(self):
        """Flag supported features.

        Open, close and stop are always supported; setting a position is
        added whenever a positioning mode other than "none" is configured.
        """
        supported_features = SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_STOP
        if self._config[CONF_POSITIONING_MODE] != COVER_MODE_NONE:
            supported_features = supported_features | SUPPORT_SET_POSITION
        return supported_features

    @property
    def current_cover_position(self):
        """Return current cover position in percent, or None if not tracked."""
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_NONE:
            return None
        return self._current_cover_position

    @property
    def is_opening(self):
        """Return if cover is opening (last reported state is the open command)."""
        return self._state == self._open_cmd

    @property
    def is_closing(self):
        """Return if cover is closing (last reported state is the close command)."""
        return self._state == self._close_cmd

    @property
    def is_closed(self):
        """Return if the cover is closed or not.

        Returns None when no positioning mode is configured, because the
        position is not tracked in that case. Otherwise the cover counts as
        closed only at position 0; every other position, including partially
        open ones, is reported as not closed rather than as unknown.
        """
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_NONE:
            return None
        return self._current_cover_position == 0

    async def async_set_cover_position(self, **kwargs):
        """Move the cover to a specific position.

        Expects the target position under ATTR_POSITION in ``kwargs``.
        In timed mode the cover is started in the required direction and a
        stop is scheduled after the travel time proportional to the distance.
        In position mode the (optionally inverted) target is written to the
        configured set-position datapoint.
        """
        self.debug("Setting cover position: %r", kwargs[ATTR_POSITION])
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            newpos = float(kwargs[ATTR_POSITION])

            currpos = self.current_cover_position
            posdiff = abs(newpos - currpos)
            # Travel time is the fraction of the full span that must be covered.
            mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME]
            if newpos > currpos:
                self.debug("Opening to %f: delay %f", newpos, mydelay)
                await self.async_open_cover()
            else:
                self.debug("Closing to %f: delay %f", newpos, mydelay)
                await self.async_close_cover()
            # NOTE(review): the open/close call above has already scheduled its
            # own full-span stop; task handles are not kept, so earlier stop
            # timers are never cancelled when a new movement starts.
            self.hass.async_create_task(self.async_stop_after_timeout(mydelay))
            self.debug("Done")

        elif self._config[CONF_POSITIONING_MODE] == COVER_MODE_POSITION:
            converted_position = int(kwargs[ATTR_POSITION])
            if self._config[CONF_POSITION_INVERTED]:
                converted_position = 100 - converted_position

            if 0 <= converted_position <= 100 and self.has_config(CONF_SET_POSITION_DP):
                await self._device.set_dp(
                    converted_position, self._config[CONF_SET_POSITION_DP]
                )

    async def async_stop_after_timeout(self, delay_sec):
        """Stop the cover if timeout (max movement span) occurred."""
        await asyncio.sleep(delay_sec)
        await self.async_stop_cover()

    async def async_open_cover(self, **kwargs):
        """Open the cover by writing the open command to the datapoint."""
        self.debug("Launching command %s to cover ", self._open_cmd)
        await self._device.set_dp(self._open_cmd, self._dp_id)
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            # for timed positioning, stop the cover after a full opening timespan
            # instead of waiting the internal timeout
            self.hass.async_create_task(
                self.async_stop_after_timeout(
                    self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
                )
            )

    async def async_close_cover(self, **kwargs):
        """Close the cover by writing the close command to the datapoint."""
        self.debug("Launching command %s to cover ", self._close_cmd)
        await self._device.set_dp(self._close_cmd, self._dp_id)
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            # for timed positioning, stop the cover after a full closing timespan
            # instead of waiting the internal timeout
            self.hass.async_create_task(
                self.async_stop_after_timeout(
                    self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
                )
            )

    async def async_stop_cover(self, **kwargs):
        """Stop the cover by writing the stop command to the datapoint."""
        self.debug("Launching command %s to cover ", self._stop_cmd)
        await self._device.set_dp(self._stop_cmd, self._dp_id)

    def status_restored(self, stored_state):
        """Restore the last stored cover status.

        Only timed mode restores anything: the position is taken from the
        "current_position" attribute of ``stored_state`` when it is present.
        """
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            stored_pos = stored_state.attributes.get("current_position")
            if stored_pos is not None:
                self._current_cover_position = stored_pos
                self.debug("Restored cover position %s", self._current_cover_position)

    def status_updated(self):
        """Device status was updated.

        Refreshes the movement state from the command datapoint, updates the
        position (from the position datapoint, or from elapsed time in timed
        mode) and records the state for later restoration.
        """
        self._previous_state = self._state
        self._state = self.dps(self._dp_id)
        # The state may be missing (None is handled further down) or not a
        # string, so only inspect its case when it really is a string.
        if isinstance(self._state, str) and self._state.isupper():
            # The device reports upper-case values: compare and send upper-case.
            self._open_cmd = self._open_cmd.upper()
            self._close_cmd = self._close_cmd.upper()
            self._stop_cmd = self._stop_cmd.upper()

        if self.has_config(CONF_CURRENT_POSITION_DP):
            curr_pos = self.dps_conf(CONF_CURRENT_POSITION_DP)
            # Presumably None when the datapoint has not been reported yet
            # (TODO confirm); keep the previous position in that case.
            if curr_pos is not None:
                if self._config[CONF_POSITION_INVERTED]:
                    self._current_cover_position = 100 - curr_pos
                else:
                    self._current_cover_position = curr_pos
        if (
            self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED
            and self._state != self._previous_state
        ):
            if self._previous_state != self._stop_cmd:
                # the state has changed, and the cover was moving
                time_diff = time.time() - self._timer_start
                pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0)
                if self._previous_state == self._close_cmd:
                    pos_diff = -pos_diff
                # Clamp the estimated position to the 0..100 range.
                self._current_cover_position = min(
                    100, max(0, self._current_cover_position + pos_diff)
                )

                change = "stopped" if self._state == self._stop_cmd else "inverted"
                self.debug(
                    "Movement %s after %s sec., position difference %s",
                    change,
                    time_diff,
                    pos_diff,
                )

            # store the time of the last movement change
            self._timer_start = time.time()

        # Keep record in last_state as long as not during connection/re-connection,
        # as last state will be used to restore the previous state
        if (self._state is not None) and (not self._device.is_connecting):
            self._last_state = self._state


# Rebind the shared setup helper imported from .common with the cover domain,
# the entity class and the schema factory pre-filled as leading arguments.
async_setup_entry = partial(async_setup_entry, DOMAIN, LocaltuyaCover, flow_schema)


Ich hoffe, es kann mir jemand weiterhelfen.

Danke Basti
 
Code:
"""Platform to locally control Tuya-based cover devices."""
import asyncio
import logging
import time
from functools import partial

import voluptuous as vol
from homeassistant.components.cover import (
    ATTR_POSITION,
    DOMAIN,
    SUPPORT_CLOSE,
    SUPPORT_OPEN,
    SUPPORT_SET_POSITION,
    SUPPORT_STOP,
    CoverEntity,
)

from .common import LocalTuyaEntity, async_setup_entry
from .const import (
    CONF_COMMANDS_SET,
    CONF_CURRENT_POSITION_DP,
    CONF_POSITION_INVERTED,
    CONF_POSITIONING_MODE,
    CONF_SET_POSITION_DP,
    CONF_SPAN_TIME,
)

# Module-level logger; it is handed to the entity base class by the cover class.
_LOGGER = logging.getLogger(__name__)

# Supported command sets. Each value is "<open>_<close>_<stop>": the cover
# class splits the string on "_" to obtain the three command values.
COVER_ONOFF_CMDS = "on_off_stop"
COVER_OPENCLOSE_CMDS = "open_close_stop"
COVER_FZZZ_CMDS = "fz_zz_stop"
COVER_12_CMDS = "1_2_3"
# Positioning modes selectable in the config flow.
COVER_MODE_NONE = "none"
COVER_MODE_POSITION = "position"
COVER_MODE_TIMED = "timed"
# Extra seconds added to the span time before the automatic stop that follows
# an open/close command in timed mode.
COVER_TIMEOUT_TOLERANCE = 3.0

# Defaults used when the corresponding option is not configured.
DEFAULT_COMMANDS_SET = COVER_ONOFF_CMDS
DEFAULT_POSITIONING_MODE = COVER_MODE_NONE
# Default span time in seconds (the schema accepts 1.0 to 300.0).
DEFAULT_SPAN_TIME = 25.0


def flow_schema(dps):
    """Build the option schema presented by the cover config flow.

    ``dps`` is the collection of datapoint choices that the two position
    datapoint options are validated against. The returned mapping keeps the
    options in the order: command set, positioning mode, current-position
    datapoint, set-position datapoint, inversion flag, span time.
    """
    command_sets = [
        COVER_ONOFF_CMDS,
        COVER_OPENCLOSE_CMDS,
        COVER_FZZZ_CMDS,
        COVER_12_CMDS,
    ]
    positioning_modes = [COVER_MODE_NONE, COVER_MODE_POSITION, COVER_MODE_TIMED]

    schema = {}
    schema[vol.Optional(CONF_COMMANDS_SET)] = vol.In(command_sets)

    mode_key = vol.Optional(CONF_POSITIONING_MODE, default=DEFAULT_POSITIONING_MODE)
    schema[mode_key] = vol.In(positioning_modes)

    schema[vol.Optional(CONF_CURRENT_POSITION_DP)] = vol.In(dps)
    schema[vol.Optional(CONF_SET_POSITION_DP)] = vol.In(dps)
    schema[vol.Optional(CONF_POSITION_INVERTED, default=False)] = bool

    # The span time is coerced to float and must lie within 1.0-300.0.
    span_key = vol.Optional(CONF_SPAN_TIME, default=DEFAULT_SPAN_TIME)
    schema[span_key] = vol.All(vol.Coerce(float), vol.Range(min=1.0, max=300.0))
    return schema


class LocaltuyaCover(LocalTuyaEntity, CoverEntity):
    """Tuya cover device.

    Movement is driven by writing an open/close/stop command to the entity's
    datapoint. Depending on CONF_POSITIONING_MODE the position is either not
    tracked ("none"), exchanged with the device through dedicated datapoints
    ("position"), or estimated from the elapsed travel time ("timed").
    """

    def __init__(self, device, config_entry, switchid, **kwargs):
        """Initialize a new LocaltuyaCover.

        The arguments are forwarded unchanged to the parent constructor,
        together with this module's logger.
        """
        super().__init__(device, config_entry, switchid, _LOGGER, **kwargs)
        commands_set = DEFAULT_COMMANDS_SET
        if self.has_config(CONF_COMMANDS_SET):
            commands_set = self._config[CONF_COMMANDS_SET]
        # A command set is encoded as "<open>_<close>_<stop>"; split it once.
        open_cmd, close_cmd, stop_cmd = commands_set.split("_")[:3]
        self._open_cmd = open_cmd
        self._close_cmd = close_cmd
        self._stop_cmd = stop_cmd
        self._timer_start = time.time()
        self._state = self._stop_cmd
        self._previous_state = self._state
        self._current_cover_position = 0
        _LOGGER.debug("Initialized cover [%s]", self.name)

    @property
    def supported_features(self):
        """Flag supported features.

        Open, close and stop are always supported; setting a position is
        added whenever a positioning mode other than "none" is configured.
        """
        supported_features = SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_STOP
        if self._config[CONF_POSITIONING_MODE] != COVER_MODE_NONE:
            supported_features = supported_features | SUPPORT_SET_POSITION
        return supported_features

    @property
    def current_cover_position(self):
        """Return current cover position in percent, or None if not tracked."""
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_NONE:
            return None
        return self._current_cover_position

    @property
    def is_opening(self):
        """Return if cover is opening (last reported state is the open command)."""
        return self._state == self._open_cmd

    @property
    def is_closing(self):
        """Return if cover is closing (last reported state is the close command)."""
        return self._state == self._close_cmd

    @property
    def is_closed(self):
        """Return if the cover is closed or not.

        Returns None when no positioning mode is configured, because the
        position is not tracked in that case. Otherwise the cover counts as
        closed only at position 0; every other position, including partially
        open ones, is reported as not closed rather than as unknown.
        """
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_NONE:
            return None
        return self._current_cover_position == 0

    async def async_set_cover_position(self, **kwargs):
        """Move the cover to a specific position.

        Expects the target position under ATTR_POSITION in ``kwargs``.
        In timed mode the cover is started in the required direction and a
        stop is scheduled after the travel time proportional to the distance.
        In position mode the (optionally inverted) target is written to the
        configured set-position datapoint.
        """
        self.debug("Setting cover position: %r", kwargs[ATTR_POSITION])
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            newpos = float(kwargs[ATTR_POSITION])

            currpos = self.current_cover_position
            posdiff = abs(newpos - currpos)
            # Travel time is the fraction of the full span that must be covered.
            mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME]
            if newpos > currpos:
                self.debug("Opening to %f: delay %f", newpos, mydelay)
                await self.async_open_cover()
            else:
                self.debug("Closing to %f: delay %f", newpos, mydelay)
                await self.async_close_cover()
            # NOTE(review): the open/close call above has already scheduled its
            # own full-span stop; task handles are not kept, so earlier stop
            # timers are never cancelled when a new movement starts.
            self.hass.async_create_task(self.async_stop_after_timeout(mydelay))
            self.debug("Done")

        elif self._config[CONF_POSITIONING_MODE] == COVER_MODE_POSITION:
            converted_position = int(kwargs[ATTR_POSITION])
            if self._config[CONF_POSITION_INVERTED]:
                converted_position = 100 - converted_position

            if 0 <= converted_position <= 100 and self.has_config(CONF_SET_POSITION_DP):
                await self._device.set_dp(
                    converted_position, self._config[CONF_SET_POSITION_DP]
                )

    async def async_stop_after_timeout(self, delay_sec):
        """Stop the cover if timeout (max movement span) occurred."""
        await asyncio.sleep(delay_sec)
        await self.async_stop_cover()

    async def async_open_cover(self, **kwargs):
        """Open the cover by writing the open command to the datapoint."""
        self.debug("Launching command %s to cover ", self._open_cmd)
        await self._device.set_dp(self._open_cmd, self._dp_id)
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            # for timed positioning, stop the cover after a full opening timespan
            # instead of waiting the internal timeout
            self.hass.async_create_task(
                self.async_stop_after_timeout(
                    self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
                )
            )

    async def async_close_cover(self, **kwargs):
        """Close the cover by writing the close command to the datapoint."""
        self.debug("Launching command %s to cover ", self._close_cmd)
        await self._device.set_dp(self._close_cmd, self._dp_id)
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            # for timed positioning, stop the cover after a full closing timespan
            # instead of waiting the internal timeout
            self.hass.async_create_task(
                self.async_stop_after_timeout(
                    self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
                )
            )

    async def async_stop_cover(self, **kwargs):
        """Stop the cover by writing the stop command to the datapoint."""
        self.debug("Launching command %s to cover ", self._stop_cmd)
        await self._device.set_dp(self._stop_cmd, self._dp_id)

    def status_restored(self, stored_state):
        """Restore the last stored cover status.

        Only timed mode restores anything: the position is taken from the
        "current_position" attribute of ``stored_state`` when it is present.
        """
        if self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED:
            stored_pos = stored_state.attributes.get("current_position")
            if stored_pos is not None:
                self._current_cover_position = stored_pos
                self.debug("Restored cover position %s", self._current_cover_position)

    def status_updated(self):
        """Device status was updated.

        Refreshes the movement state from the command datapoint, updates the
        position (from the position datapoint, or from elapsed time in timed
        mode) and records the state for later restoration.
        """
        self._previous_state = self._state
        self._state = self.dps(self._dp_id)
        # The state may be missing (None is handled further down) or not a
        # string, so only inspect its case when it really is a string.
        if isinstance(self._state, str) and self._state.isupper():
            # The device reports upper-case values: compare and send upper-case.
            self._open_cmd = self._open_cmd.upper()
            self._close_cmd = self._close_cmd.upper()
            self._stop_cmd = self._stop_cmd.upper()

        if self.has_config(CONF_CURRENT_POSITION_DP):
            curr_pos = self.dps_conf(CONF_CURRENT_POSITION_DP)
            # Presumably None when the datapoint has not been reported yet
            # (TODO confirm); keep the previous position in that case.
            if curr_pos is not None:
                if self._config[CONF_POSITION_INVERTED]:
                    self._current_cover_position = 100 - curr_pos
                else:
                    self._current_cover_position = curr_pos
        if (
            self._config[CONF_POSITIONING_MODE] == COVER_MODE_TIMED
            and self._state != self._previous_state
        ):
            if self._previous_state != self._stop_cmd:
                # the state has changed, and the cover was moving
                time_diff = time.time() - self._timer_start
                pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0)
                if self._previous_state == self._close_cmd:
                    pos_diff = -pos_diff
                # Clamp the estimated position to the 0..100 range.
                self._current_cover_position = min(
                    100, max(0, self._current_cover_position + pos_diff)
                )

                change = "stopped" if self._state == self._stop_cmd else "inverted"
                self.debug(
                    "Movement %s after %s sec., position difference %s",
                    change,
                    time_diff,
                    pos_diff,
                )

            # store the time of the last movement change
            self._timer_start = time.time()

        # Keep record in last_state as long as not during connection/re-connection,
        # as last state will be used to restore the previous state
        if (self._state is not None) and (not self._device.is_connecting):
            self._last_state = self._state


# Rebind the shared setup helper imported from .common with the cover domain,
# the entity class and the schema factory pre-filled as leading arguments.
async_setup_entry = partial(async_setup_entry, DOMAIN, LocaltuyaCover, flow_schema)
 
Ich habe gelesen, dass es bei vielen Integrationen dieses Problem (0/100 vertauscht) gibt.
Deshalb haben die meisten einen Parameter zum Invertieren.
Schau mal in der Doku nach.
 

Zurzeit aktive Besucher

Keine Mitglieder online.

Letzte Anleitungen

Statistik des Forums

Themen
5.021
Beiträge
50.325
Mitglieder
4.712
Neuestes Mitglied
ChristianH
Zurück
Oben