LiquidGUI [1.3.0.0]
- Continued refactoring.
This commit is contained in:
@@ -0,0 +1,33 @@
|
||||
from interfaces.liquidctl_helper_interface import LiquidCTL_Helper_Interface
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
class LiquidCTL_Helper(LiquidCTL_Helper_Interface):
|
||||
device_name = str()
|
||||
device_temp = 0
|
||||
device_fanSpeed = 0
|
||||
device_pumpSpeed = 0
|
||||
device_fwVers = str()
|
||||
|
||||
devices = None
|
||||
|
||||
def ForceInit(self):
|
||||
NotImplemented
|
||||
|
||||
def TestConnectionState(self):
|
||||
output = subprocess.run(["liquidctl", "status"], stdout=subprocess.PIPE, universal_newlines=True)
|
||||
if len(output.stdout) > 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def Update(self):
|
||||
output = subprocess.run(["liquidctl", "status"], stdout=subprocess.PIPE, universal_newlines=True)
|
||||
self.device_name = str(re.search(r'^[^\n]*', output.stdout).group(0))
|
||||
self.device_temp = float(re.search(r'Liquid temperature\s+(\d+\.?\d*)', output.stdout).group(1))
|
||||
self.device_fanSpeed = int(re.search(r'Fan speed\s+(\d+)', output.stdout).group(1))
|
||||
self.device_pumpSpeed = int(re.search(r'Pump speed\s+(\d+)', output.stdout).group(1))
|
||||
|
||||
|
||||
def SetFanSpeed(self, speed):
|
||||
NotImplemented
|
||||
@@ -0,0 +1,50 @@
|
||||
from interfaces.liquidctl_helper_interface import LiquidCTL_Helper_Interface
|
||||
from liquidctl import find_liquidctl_devices, cli # type: ignore
|
||||
|
||||
class LiquidCTL_Helper(LiquidCTL_Helper_Interface):
|
||||
device_name = str()
|
||||
device_temp = 0
|
||||
device_fanSpeed = 0
|
||||
device_pumpSpeed = 0
|
||||
device_fwVers = str()
|
||||
|
||||
devices = find_liquidctl_devices()
|
||||
try:
|
||||
for dev in devices:
|
||||
with dev.connect():
|
||||
#print(f'{dev.description}')
|
||||
device_name = dev.description
|
||||
device_fwVers = ''.join(map(str, dev.firmware_version))
|
||||
except:
|
||||
pass
|
||||
|
||||
def ForceInit(self):
|
||||
init_status = self.dev.initialize()
|
||||
if init_status:
|
||||
for key, value, unit in init_status:
|
||||
#print(f'- {key}: {value} {unit}')
|
||||
device_fwVers = value
|
||||
|
||||
def TestConnectionState(self):
|
||||
if self.device_name != None:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def Update(self):
|
||||
with self.dev.connect():
|
||||
status = self.dev.get_status()
|
||||
for key, value, unit in status:
|
||||
#print(f'- {key}: {value} {unit}')
|
||||
if key == "Liquid temperature":
|
||||
self.device_temp = value
|
||||
elif key == "Fan speed":
|
||||
self.device_fanSpeed = value
|
||||
elif key == "Pump speed":
|
||||
self.device_pumpSpeed = value
|
||||
|
||||
def SetFanSpeed(self, speed):
|
||||
with self.dev.connect():
|
||||
cli._device_set_speed(self.dev, args={"<temperature>": [],
|
||||
"<channel>": "fan",
|
||||
"<percentage>": [str(speed)]})
|
||||
Reference in New Issue
Block a user