gabriel becker
2 years ago
12 changed files with 312 additions and 62 deletions
@ -0,0 +1,37 @@
|
||||
import time |
||||
from machine import Pin |
||||
import buzzer |
||||
from display import Display |
||||
import constants |
||||
|
||||
|
||||
SATURATED_MESSAGE = """Filtro saturado.""" |
||||
HOLE_MESSAGE = """Filtro ou linha \nfurado.""" |
||||
|
||||
class Alarm: |
||||
reset_btnn = Pin(constants.RESET_BUTTON_PIN, mode=Pin.IN) |
||||
@staticmethod |
||||
def trigger_saturated_alarm(): |
||||
Display.print(SATURATED_MESSAGE) |
||||
buzzer.sing_alarm() |
||||
while Alarm.reset_button_not_pressed(): |
||||
time.sleep(0.05) |
||||
Alarm.stop_alarm() |
||||
|
||||
@staticmethod |
||||
def trigger_hole_alarm(): |
||||
Display.print(HOLE_MESSAGE) |
||||
buzzer.sing_alarm() |
||||
while Alarm.reset_button_not_pressed(): |
||||
time.sleep(0.05) |
||||
Alarm.stop_alarm() |
||||
|
||||
@staticmethod |
||||
def stop_alarm(): |
||||
Display.clear() |
||||
buzzer.stop() |
||||
|
||||
@staticmethod |
||||
def reset_button_not_pressed(): |
||||
was_not_pressed = Alarm.reset_btnn.value() < 1 |
||||
return was_not_pressed |
@ -0,0 +1,22 @@
|
||||
from machine import Pin, PWM |
||||
import constants |
||||
import parameters |
||||
|
||||
|
||||
buzzer = PWM(Pin(constants.BUZZER_PIN)) |
||||
buzzer.duty_u16(0) |
||||
|
||||
def sing_alarm(): |
||||
if parameters.ANNOYING_BUZZER_ENABLED: |
||||
tone = constants.ANNOYTING_BUZZER_TONE |
||||
else: |
||||
tone = constants.NORMAL_BUZZER_TONE |
||||
buzzer.duty_u16(tone) |
||||
|
||||
|
||||
def stop(): |
||||
buzzer.duty_u16(0) |
||||
|
||||
|
||||
def sing_tone(tone): |
||||
buzzer.duty_u16(tone) |
@ -0,0 +1,15 @@
|
||||
"""RASPBERRY PICO PINS""" |
||||
"""PINOUTS""" |
||||
PRESSURE_SENSOR_PIN = 26 |
||||
HALL_SENSOR_PIN = 27 |
||||
RESET_BUTTON_PIN = 22 |
||||
BUZZER_PIN = 15 |
||||
LCD_SDA = 0 |
||||
LCD_SCL = 1 |
||||
|
||||
|
||||
"""FIXED NUMBERS""" |
||||
ANNOYTING_BUZZER_TONE = 51250 |
||||
NORMAL_BUZZER_TONE = 200 |
||||
ADC_RESOLUTION = 65535.0 |
||||
|
@ -0,0 +1,32 @@
|
||||
from machine import Pin, I2C |
||||
from pico_i2c_lcd import I2cLcd |
||||
import constants |
||||
|
||||
|
||||
class Display: |
||||
__i2c = I2C( |
||||
0, |
||||
sda=Pin(constants.LCD_SDA), |
||||
scl=Pin(constants.LCD_SCL), |
||||
freq=100000 |
||||
) |
||||
__I2C_ADDR = __i2c.scan()[0] |
||||
lcd = I2cLcd(__i2c, __I2C_ADDR, 2, 16) |
||||
|
||||
@staticmethod |
||||
def print_pressure(pressure: float): |
||||
str_pressure = "{:.3f}".format(pressure) |
||||
Display.lcd.putstr('Preassure: ' + str_pressure) |
||||
|
||||
@staticmethod |
||||
def print_frequency(frequency: float): |
||||
str_frequency = "{:.3f}".format(frequency) |
||||
Display.lcd.puts('Frequency: ' + str_frequency) |
||||
|
||||
@staticmethod |
||||
def print(content: str): |
||||
Display.lcd.putstr(content) |
||||
|
||||
@staticmethod |
||||
def clear(): |
||||
Display.lcd.clear() |
@ -0,0 +1,31 @@
|
||||
from machine import ADC, Pin |
||||
import time |
||||
import constants |
||||
|
||||
|
||||
ii = 0 |
||||
hall_sensor = Pin(constants.HALL_SENSOR_PIN, Pin.IN, Pin.PULL_UP) |
||||
previous_time = time.ticks_ms() |
||||
|
||||
|
||||
def hall_sensor_interruption(*args, **kargs): |
||||
global ii |
||||
ii += 1 |
||||
|
||||
|
||||
def start_hall_sensor(): |
||||
global previous_time |
||||
hall_sensor.irq( |
||||
handler=hall_sensor_interruption, |
||||
trigger=Pin.IRQ_RISING, |
||||
) |
||||
previous_time = time.ticks_ms() |
||||
time.sleep(0.05) |
||||
|
||||
|
||||
def get_rotation_frequency(): |
||||
global ii, previous_time |
||||
frequency = float(ii) / (time.ticks_ms() - previous_time) * 1e3 |
||||
previous_time = time.ticks_ms() |
||||
ii = 0 |
||||
return frequency |
@ -1,80 +1,30 @@
|
||||
|
||||
from machine import ADC, I2C, Pin, PWM |
||||
import time |
||||
from pico_i2c_lcd import I2cLcd |
||||
|
||||
pressure_sensor = ADC(Pin(26, mode=Pin.IN)) |
||||
hall_sensor = Pin(27, Pin.IN, Pin.PULL_UP) |
||||
btnn_reset = Pin(22, Pin.IN) |
||||
lcd = False |
||||
buzzer_satus = False |
||||
ii = 0 |
||||
|
||||
|
||||
def interruption_handler(*args, **kargs): |
||||
global ii |
||||
ii += 1 |
||||
|
||||
hall_sensor.irq( |
||||
handler=interruption_handler, |
||||
trigger=Pin.IRQ_RISING, |
||||
) |
||||
previous_time = time.ticks_ms() |
||||
import hall_sensor |
||||
import pressure_sensor |
||||
from display import Display |
||||
from states import StateMachine |
||||
|
||||
|
||||
def setup(): |
||||
global pressure_sensor, lcd, buzzer |
||||
i2c = I2C(0, sda=Pin(0), scl=Pin(1), freq=100000) |
||||
I2C_ADDR = i2c.scan()[0] |
||||
lcd = I2cLcd(i2c, I2C_ADDR, 2, 16) |
||||
buzzer = PWM(Pin(15)) |
||||
buzzer.duty_u16(0) |
||||
Display() |
||||
hall_sensor.start_hall_sensor() |
||||
|
||||
|
||||
def get_rotation_frequency(): |
||||
global ii, previous_time |
||||
frequency = float(ii) / (time.ticks_ms() - previous_time) * 1e3 |
||||
previous_time = time.ticks_ms() |
||||
ii = 0 |
||||
return frequency |
||||
|
||||
|
||||
def get_pressure(): |
||||
global pressure_sensor |
||||
pressure_measure = float(pressure_sensor.read_u16()) / 65535 * 3.3 |
||||
return pressure_measure |
||||
|
||||
|
||||
def loop(): |
||||
global lcd, buzzer, buzzer_satus |
||||
pressure_measure = get_pressure() |
||||
lcd.clear() |
||||
str_pressure = "{:.3f}".format(pressure_measure) |
||||
lcd.putstr('Preassure: ' + str_pressure + '\n') |
||||
frequency = get_rotation_frequency() |
||||
str_frequency = "{:.3f}".format(frequency) |
||||
lcd.putstr('Frequency: ' + str_frequency) |
||||
time.sleep(.07) |
||||
buzzer_satus = not buzzer_satus |
||||
if buzzer_satus: |
||||
buzzer.duty_u16(220) |
||||
else: |
||||
buzzer.duty_u16(0) |
||||
|
||||
StateMachine().start_task_loop() |
||||
|
||||
|
||||
def handle_error(): |
||||
global lcd |
||||
lcd.putstr('Exception') |
||||
Display.print('Exception') |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
setup() |
||||
try: |
||||
while(True): |
||||
while(True): |
||||
loop() |
||||
except: |
||||
handle_error() |
||||
while True: |
||||
pass |
||||
|
||||
raise |
||||
|
@ -0,0 +1,20 @@
|
||||
"""Percentage that the previous pressure can be lower than current one without triggering the hole alarm.""" |
||||
PRESSURE_TOLERANCE_PERCENTAGE = 10 |
||||
|
||||
"""Pressure saturation threshold in mV to trigger the saturation alarm.""" |
||||
PRESSURE_SATURATION_THRESHOLD = 2.8 |
||||
|
||||
"""Lower limit of frequency in Hz to be considered inside interest rotation interval. (type: float)""" |
||||
REFERENCE_ROTATION_DOWN_LIMIT = 250.0 |
||||
|
||||
"""Higher limit of frequency in Hz to be considered inside interest rotation interval. (type: float)""" |
||||
REFERENCE_ROTATION_UPPER_LIMIT = 270.0 |
||||
|
||||
"""Weather or not the buzzer should be set to an annoying tone (in oder to keep the mental healthness of the developer).""" |
||||
ANNOYING_BUZZER_ENABLED = False |
||||
|
||||
"""Period in milisseconds between each storage event.""" |
||||
PRESSURE_STORE_PREIOD = int(60*1e3) |
||||
|
||||
"""How many samples of pressure should be used in smoother.""" |
||||
PRESSURE_SMOOTH_LENGTH = 10 |
@ -0,0 +1,15 @@
|
||||
from machine import ADC, Pin |
||||
import constants |
||||
import parameters |
||||
|
||||
|
||||
pressure_sensor = ADC(Pin(constants.PRESSURE_SENSOR_PIN, mode=Pin.IN)) |
||||
|
||||
def get_pressure(): |
||||
global pressure_sensor |
||||
pressure_value = 0 |
||||
for i in range(parameters.PRESSURE_SMOOTH_LENGTH): |
||||
pressure_value += pressure_sensor.read_u16() |
||||
print(pressure_value / parameters.PRESSURE_SMOOTH_LENGTH) |
||||
pressure_measure = float(pressure_value) / constants.ADC_RESOLUTION * 3.3 |
||||
return pressure_measure |
@ -0,0 +1,70 @@
|
||||
import time |
||||
from display import Display |
||||
import hall_sensor |
||||
import pressure_sensor |
||||
import parameters |
||||
from alarm import Alarm |
||||
from storage import Storage |
||||
|
||||
|
||||
class StateMachine: |
||||
pressure= None |
||||
|
||||
def __init__(self) -> None: |
||||
Storage.enable_store_in_next_call() |
||||
self.__read_previous_pressure() |
||||
self.__measure_pressure() |
||||
|
||||
def start_task_loop(self): |
||||
time.sleep(0.02) |
||||
self.wait_for_correct_rotation_frequency() |
||||
self.handle_pressure() |
||||
|
||||
def wait_for_correct_rotation_frequency(self): |
||||
while self.__rotation_is_within_limits(): |
||||
print('waiting correct rotation') |
||||
time.sleep(0.02) |
||||
continue |
||||
|
||||
def __measure_pressure(self): |
||||
self.current_pressure = pressure_sensor.get_pressure() |
||||
|
||||
def handle_pressure(self): |
||||
self.__measure_pressure() |
||||
is_saturated = self.__pressure_is_higher_than_maximum_saturation() |
||||
if is_saturated: |
||||
Alarm.trigger_saturated_alarm() |
||||
else: |
||||
self.scan_holes_in_filter() |
||||
|
||||
def scan_holes_in_filter(self): |
||||
there_is_a_hole = self.__pressure_is_lower_than_before() |
||||
if there_is_a_hole: |
||||
Alarm.trigger_hole_alarm() |
||||
else: |
||||
self.__save_current_pressure() |
||||
|
||||
def __save_current_pressure(self): |
||||
Storage.save_pressure(self.current_pressure) |
||||
self.previous_pressure = self.current_pressure |
||||
|
||||
def __read_previous_pressure(self): |
||||
self.previous_pressure = Storage.retrieve_pressure() |
||||
|
||||
def __pressure_is_lower_than_before(self) -> bool: |
||||
pressure_is_lower = ( |
||||
self.current_pressure < |
||||
self.previous_pressure * (1 - parameters.PRESSURE_TOLERANCE_PERCENTAGE / 100) |
||||
) |
||||
return pressure_is_lower |
||||
|
||||
def __pressure_is_higher_than_maximum_saturation(self) -> bool: |
||||
pressure_is_not_lower = ( |
||||
self.current_pressure > parameters.PRESSURE_SATURATION_THRESHOLD |
||||
) |
||||
return pressure_is_not_lower |
||||
|
||||
def __rotation_is_within_limits(self) -> bool: |
||||
rotation = hall_sensor.get_rotation_frequency() |
||||
is_within_limits = parameters.REFERENCE_ROTATION_DOWN_LIMIT < rotation < parameters.REFERENCE_ROTATION_UPPER_LIMIT |
||||
return is_within_limits |
@ -0,0 +1,43 @@
|
||||
import json |
||||
from machine import Timer |
||||
import parameters |
||||
|
||||
|
||||
class __CONSTANTS: |
||||
PREVIOUS_PRESSURE = 'previous_pressure' |
||||
STORAGE_FILE_PATH = 'storage.json' |
||||
|
||||
|
||||
class Storage: |
||||
|
||||
__should_store = False |
||||
|
||||
@staticmethod |
||||
def enable_store_in_next_call(): |
||||
Storage.__should_store = True |
||||
|
||||
@staticmethod |
||||
def start_storage(): |
||||
def timer_call_back(): |
||||
Storage.enable_store_in_next_call() |
||||
tim = Timer() |
||||
tim.init(mode=Timer.PERIODIC, period=parameters.PRESSURE_STORE_PREIOD, callback=timer_call_back) |
||||
|
||||
|
||||
@staticmethod |
||||
def save_pressure(pressure: float): |
||||
if not Storage.__should_store: |
||||
return |
||||
content = { |
||||
__CONSTANTS.PREVIOUS_PRESSURE: pressure, |
||||
} |
||||
with open(__CONSTANTS.STORAGE_FILE_PATH, 'w') as f: |
||||
json.dump(content, f) |
||||
|
||||
@staticmethod |
||||
def retrieve_pressure() -> float: |
||||
pressure = None |
||||
with open(__CONSTANTS.STORAGE_FILE_PATH, 'r') as f: |
||||
content = json.load(f) |
||||
pressure = content[__CONSTANTS.PREVIOUS_PRESSURE] |
||||
return pressure |
Loading…
Reference in new issue