In this project, we will use the Raspberry Pi Pico W and a DHT11 sensor to monitor the temperature and humidity in a room and send the readings to the Telegram app on a smartphone. Through the same bot, we will also be able to switch a device in the room on and off. The temperature and humidity readings will also be shown on a 16×2 LCD.
Telegram Messenger can be used as an IoT platform for monitoring sensors, controlling actuators, and triggering cloud functions just like other popular IoT platforms such as Blynk, Arduino IoT Cloud, Ubidots, Thingspeak, and Firebase.
Telegram Bot API allows developers to create bots that can interact with users and automate different tasks, including controlling devices and receiving sensor data.
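As a rough illustration of what a bot request looks like, the sketch below builds the URL and JSON body for a `sendMessage` call. The base URL and the `chat_id`/`text` fields match what utelegram.py uses later in this post; the helper name `build_send_message` and the sample token are made up for the example.

```python
import json

API_ROOT = "https://api.telegram.org/bot"


def build_send_message(token, chat_id, text):
    """Return (url, body) for a Bot API sendMessage request.

    Hypothetical helper for illustration; it only builds the request,
    it does not send it.
    """
    url = API_ROOT + token + "/sendMessage"
    # Telegram expects the message text as a string, so convert numbers.
    body = json.dumps({"chat_id": chat_id, "text": str(text)})
    return url, body


# Placeholder token and chat id, not real credentials.
url, body = build_send_message("123456:EXAMPLE", 42, 25.0)
print(url)
print(body)
```

On the Pico W, the same URL and body would be posted with `urequests.post`, which is what the `send` method in utelegram.py does.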
Required Material
- Raspberry Pi Pico W board
- DHT11 temperature and humidity sensor
- 16×2 LCD with I2C adapter
- LED
- Breadboard
- Jumper wires
- Micro-USB cable
Schematic Diagram
Connect the DHT11 sensor to the Pico W board using jumper wires.
Connect the VCC pin of the sensor to the 3.3V pin of the Raspberry Pi Pico W board, GND to GND, and the data pin to the GP4 of the Pico W.
Connect the I2C LCD display to the Pico W board using jumper wires.
Connect SDA to GP2, SCL to GP3, VCC to 5V (the VBUS pin), and GND to GND of the Pico W board.
Sensor/LCD pin | Raspberry Pi Pico W pin |
---|---|
DHT11 VCC | 3.3V |
DHT11 GND | GND |
DHT11 Data | GP4 |
LCD SDA | GP2 |
LCD SCL | GP3 |
LCD VCC | 5V (VBUS) |
LCD GND | GND |
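Once the LCD is wired, it helps to confirm its I2C address before running the full program. The sketch below assumes the adapter is a PCF8574-style backpack, which commonly answers at 0x27 or 0x3F; the helper `pick_lcd_address` is hypothetical and simply picks the first of those addresses found in a scan result.

```python
# Typical addresses for PCF8574 / PCF8574A LCD backpacks (an assumption;
# check your module if neither is found).
COMMON_LCD_ADDRESSES = (0x27, 0x3F)


def pick_lcd_address(found):
    """Return the first common LCD address present in a scan result, else None."""
    for addr in COMMON_LCD_ADDRESSES:
        if addr in found:
            return addr
    return None


# On the Pico W the list would come from:
#   from machine import Pin, I2C
#   found = I2C(1, scl=Pin(3), sda=Pin(2), freq=100000).scan()
found = [0x27]  # example scan result
print(pick_lcd_address(found))
```

If the function returns `None`, the display is probably not wired correctly or uses a different address, and `I2C_ADDR` in main.py would need to change accordingly.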
Setting up Raspberry Pi Pico W
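Download the Thonny IDE from the official website, https://thonny.org/, and install it on your computer. Then connect the Raspberry Pi Pico W to your computer; see our previous post, Getting Started With Raspberry Pi Pico With Thonny IDE. Make sure the board is running the MicroPython firmware built for the Pico W, because the firmware for the plain Pico does not include the network module.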
Setting up the Telegram bot
Create a Telegram account (if you haven’t already) and log in. Search for the BotFather bot:
In the search bar at the top of the Telegram app, type “BotFather”
Create a new bot
Send the message “/newbot” to the BotFather bot.
Follow the prompts to choose a name and a username for your bot. BotFather will then give you an authorization token. We will paste this token into main.py as the value of telegram_api_key, so keep it secret and secure.
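A common mistake is to leave the placeholder string in main.py. The sketch below is a quick sanity check, assuming BotFather tokens have the shape `<numeric bot id>:<secret>`; the function name `looks_like_bot_token` is made up, and passing the check does not prove that the token is valid.

```python
def looks_like_bot_token(token):
    """Rough shape check: digits, a colon, then a non-empty secret part."""
    parts = token.split(":", 1)
    return len(parts) == 2 and parts[0].isdigit() and len(parts[1]) > 0


print(looks_like_bot_token("Your API Key"))    # the untouched placeholder
print(looks_like_bot_token("123456:EXAMPLE"))  # made-up token with the right shape
```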
MicroPython Source code – Temperature & Humidity Reading | Pico W + Telegram Bot
This section contains the MicroPython code for reading temperature and humidity from the DHT11 sensor and sending the data to a Telegram bot.
The code is divided into five files: dht.py, i2c_lcd.py, lcd_api.py, utelegram.py, and main.py. The 16×2 LCD, the DHT11 sensor, and the Telegram bot cannot be driven directly from main.py; each one needs its own library file.
Importing libraries
dht.py
The “dht” library for reading data from the DHT11 sensor.
```python
import array

import micropython
import utime
from machine import Pin
from micropython import const


class InvalidChecksum(Exception):
    pass


class InvalidPulseCount(Exception):
    pass


MAX_UNCHANGED = const(100)
MIN_INTERVAL_US = const(200000)
HIGH_LEVEL = const(50)
EXPECTED_PULSES = const(84)


class DHT11:
    _temperature: int
    _humidity: int

    def __init__(self, pin):
        self._pin = pin
        self._last_measure = utime.ticks_us()
        self._temperature = -1
        self._humidity = -1

    def measure(self):
        current_ticks = utime.ticks_us()
        if utime.ticks_diff(current_ticks, self._last_measure) < MIN_INTERVAL_US and (
            self._temperature > -1 or self._humidity > -1
        ):
            # Less than a second since last read, which is too soon according
            # to the datasheet
            return

        self._send_init_signal()
        pulses = self._capture_pulses()
        buffer = self._convert_pulses_to_buffer(pulses)
        self._verify_checksum(buffer)

        self._humidity = buffer[0] + buffer[1] / 10
        self._temperature = buffer[2] + buffer[3] / 10
        self._last_measure = utime.ticks_us()

    @property
    def humidity(self):
        self.measure()
        return self._humidity

    @property
    def temperature(self):
        self.measure()
        return self._temperature

    def _send_init_signal(self):
        self._pin.init(Pin.OUT, Pin.PULL_DOWN)
        self._pin.value(1)
        utime.sleep_ms(50)
        self._pin.value(0)
        utime.sleep_ms(18)

    @micropython.native
    def _capture_pulses(self):
        pin = self._pin
        pin.init(Pin.IN, Pin.PULL_UP)

        val = 1
        idx = 0
        transitions = bytearray(EXPECTED_PULSES)
        unchanged = 0
        timestamp = utime.ticks_us()

        while unchanged < MAX_UNCHANGED:
            if val != pin.value():
                if idx >= EXPECTED_PULSES:
                    raise InvalidPulseCount(
                        "Got more than {} pulses".format(EXPECTED_PULSES)
                    )
                now = utime.ticks_us()
                transitions[idx] = now - timestamp
                timestamp = now
                idx += 1

                val = 1 - val
                unchanged = 0
            else:
                unchanged += 1
        pin.init(Pin.OUT, Pin.PULL_DOWN)
        if idx != EXPECTED_PULSES:
            raise InvalidPulseCount(
                "Expected {} but got {} pulses".format(EXPECTED_PULSES, idx)
            )
        return transitions[4:]

    def _convert_pulses_to_buffer(self, pulses):
        """Convert a list of 80 pulses into a 5 byte buffer

        The resulting 5 bytes in the buffer will be:
            0: Integral relative humidity data
            1: Decimal relative humidity data
            2: Integral temperature data
            3: Decimal temperature data
            4: Checksum
        """
        # Convert the pulses to 40 bits
        binary = 0
        for idx in range(0, len(pulses), 2):
            binary = binary << 1 | int(pulses[idx] > HIGH_LEVEL)

        # Split into 5 bytes
        buffer = array.array("B")
        for shift in range(4, -1, -1):
            buffer.append(binary >> shift * 8 & 0xFF)
        return buffer

    def _verify_checksum(self, buffer):
        # Calculate checksum
        checksum = 0
        for buf in buffer[0:4]:
            checksum += buf
        if checksum & 0xFF != buffer[4]:
            raise InvalidChecksum()
```
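To make the 5-byte frame layout and the checksum rule concrete, here is a small stand-alone sketch that decodes one frame the same way `measure` and `_verify_checksum` do. The function name `decode_frame` and the sample bytes are made up for illustration; it runs without any hardware.

```python
def decode_frame(frame):
    """Decode a 5-byte DHT11 frame into (humidity, temperature).

    Bytes 0-1 are humidity (integral, decimal), bytes 2-3 are temperature,
    and byte 4 must equal the low 8 bits of the sum of bytes 0-3.
    """
    if len(frame) != 5:
        raise ValueError("expected 5 bytes, got {}".format(len(frame)))
    if sum(frame[0:4]) & 0xFF != frame[4]:
        raise ValueError("checksum mismatch")
    humidity = frame[0] + frame[1] / 10
    temperature = frame[2] + frame[3] / 10
    return humidity, temperature


# Sample frame: 45 % humidity, 24.3 degrees C, checksum 45 + 0 + 24 + 3 = 72.
humidity, temperature = decode_frame(bytes([45, 0, 24, 3, 72]))
print(humidity, temperature)
```

A frame whose last byte does not match the sum is rejected, which is the case where dht.py raises `InvalidChecksum`.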
i2c_lcd.py
This file contains the driver code for the 16×2 LCD connected through an I2C adapter.
```python
import utime
import gc

from lcd_api import LcdApi
from machine import I2C

# PCF8574 pin definitions
MASK_RS = 0x01       # P0
MASK_RW = 0x02       # P1
MASK_E  = 0x04       # P2

SHIFT_BACKLIGHT = 3  # P3
SHIFT_DATA      = 4  # P4-P7


class I2cLcd(LcdApi):

    # Implements a HD44780 character LCD connected via PCF8574 on I2C

    def __init__(self, i2c, i2c_addr, num_lines, num_columns):
        self.i2c = i2c
        self.i2c_addr = i2c_addr
        self.i2c.writeto(self.i2c_addr, bytes([0]))
        utime.sleep_ms(20)   # Allow LCD time to powerup
        # Send reset 3 times
        self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
        utime.sleep_ms(5)    # Need to delay at least 4.1 msec
        self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
        utime.sleep_ms(1)
        self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
        utime.sleep_ms(1)
        # Put LCD into 4-bit mode
        self.hal_write_init_nibble(self.LCD_FUNCTION)
        utime.sleep_ms(1)
        LcdApi.__init__(self, num_lines, num_columns)
        cmd = self.LCD_FUNCTION
        if num_lines > 1:
            cmd |= self.LCD_FUNCTION_2LINES
        self.hal_write_command(cmd)
        gc.collect()

    def hal_write_init_nibble(self, nibble):
        # Writes an initialization nibble to the LCD.
        # This particular function is only used during initialization.
        byte = ((nibble >> 4) & 0x0f) << SHIFT_DATA
        self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([byte]))
        gc.collect()

    def hal_backlight_on(self):
        # Allows the hal layer to turn the backlight on
        self.i2c.writeto(self.i2c_addr, bytes([1 << SHIFT_BACKLIGHT]))
        gc.collect()

    def hal_backlight_off(self):
        # Allows the hal layer to turn the backlight off
        self.i2c.writeto(self.i2c_addr, bytes([0]))
        gc.collect()

    def hal_write_command(self, cmd):
        # Write a command to the LCD. Data is latched on the falling edge of E.
        byte = ((self.backlight << SHIFT_BACKLIGHT) |
                (((cmd >> 4) & 0x0f) << SHIFT_DATA))
        self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([byte]))
        byte = ((self.backlight << SHIFT_BACKLIGHT) |
                ((cmd & 0x0f) << SHIFT_DATA))
        self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([byte]))
        if cmd <= 3:
            # The home and clear commands require a worst case delay of 4.1 msec
            utime.sleep_ms(5)
        gc.collect()

    def hal_write_data(self, data):
        # Write data to the LCD. Data is latched on the falling edge of E.
        byte = (MASK_RS |
                (self.backlight << SHIFT_BACKLIGHT) |
                (((data >> 4) & 0x0f) << SHIFT_DATA))
        self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([byte]))
        byte = (MASK_RS |
                (self.backlight << SHIFT_BACKLIGHT) |
                ((data & 0x0f) << SHIFT_DATA))
        self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
        self.i2c.writeto(self.i2c_addr, bytes([byte]))
        gc.collect()
```
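The driver sends every character as two 4-bit nibbles, and each nibble is written twice: once with the enable bit E set and once with it cleared, so the LCD latches on the falling edge. The sketch below reproduces the bit packing of `hal_write_data` as a pure function so the four bytes can be inspected without hardware; the function name `data_write_sequence` is made up for the example.

```python
# Same PCF8574 bit assignments as in i2c_lcd.py.
MASK_RS = 0x01
MASK_E = 0x04
SHIFT_BACKLIGHT = 3
SHIFT_DATA = 4


def data_write_sequence(data, backlight=True):
    """Return the four bytes the driver would write for one data byte."""
    base = MASK_RS | (int(backlight) << SHIFT_BACKLIGHT)
    sequence = []
    # High nibble first, then low nibble.
    for nibble in ((data >> 4) & 0x0F, data & 0x0F):
        byte = base | (nibble << SHIFT_DATA)
        sequence.append(byte | MASK_E)  # E high
        sequence.append(byte)           # E low: the LCD latches the nibble
    return sequence


sequence = data_write_sequence(ord("A"))
print([hex(b) for b in sequence])  # ['0x4d', '0x49', '0x1d', '0x19']
```

For the letter "A" (0x41), the high nibble 4 and the low nibble 1 each appear in the upper four bits, while the lower bits carry RS, E, and the backlight flag.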
lcd_api.py
The “lcd_api” file defines the HD44780 command set and the high-level functions that initialize the LCD and write text to it.
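One detail of this file that is worth seeing in isolation is how `move_to` turns a column and row into a "set DDRAM address" command. The sketch below copies that arithmetic into a stand-alone function; the name `ddram_command` is made up, and the default of 16 columns matches the display used in this project.

```python
LCD_DDRAM = 0x80  # "set DD RAM address" command bit, as in lcd_api.py


def ddram_command(cursor_x, cursor_y, num_columns=16):
    """Return the command byte that moves the cursor to (cursor_x, cursor_y)."""
    addr = cursor_x & 0x3F
    if cursor_y & 1:
        addr += 0x40          # lines 1 and 3 start at DDRAM offset 0x40
    if cursor_y & 2:
        addr += num_columns   # lines 2 and 3 continue after the first lines
    return LCD_DDRAM | addr


print(hex(ddram_command(0, 0)))  # 0x80
print(hex(ddram_command(5, 1)))  # 0xc5
```

On a 16×2 display only rows 0 and 1 are used, so the second row always starts at address 0x40.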
```python
"""Provides an API for talking to HD44780 compatible character LCDs."""

import time


class LcdApi:
    """Implements the API for talking with HD44780 compatible character LCDs.
    This class only knows what commands to send to the LCD, and not how to get
    them to the LCD.

    It is expected that a derived class will implement the hal_xxx functions.
    """

    # The following constant names were lifted from the avrlib lcd.h
    # header file, however, I changed the definitions from bit numbers
    # to bit masks.
    #
    # HD44780 LCD controller command set

    LCD_CLR = 0x01              # DB0: clear display
    LCD_HOME = 0x02             # DB1: return to home position

    LCD_ENTRY_MODE = 0x04       # DB2: set entry mode
    LCD_ENTRY_INC = 0x02        # --DB1: increment
    LCD_ENTRY_SHIFT = 0x01      # --DB0: shift

    LCD_ON_CTRL = 0x08          # DB3: turn lcd/cursor on
    LCD_ON_DISPLAY = 0x04       # --DB2: turn display on
    LCD_ON_CURSOR = 0x02        # --DB1: turn cursor on
    LCD_ON_BLINK = 0x01         # --DB0: blinking cursor

    LCD_MOVE = 0x10             # DB4: move cursor/display
    LCD_MOVE_DISP = 0x08        # --DB3: move display (0-> move cursor)
    LCD_MOVE_RIGHT = 0x04       # --DB2: move right (0-> left)

    LCD_FUNCTION = 0x20         # DB5: function set
    LCD_FUNCTION_8BIT = 0x10    # --DB4: set 8BIT mode (0->4BIT mode)
    LCD_FUNCTION_2LINES = 0x08  # --DB3: two lines (0->one line)
    LCD_FUNCTION_10DOTS = 0x04  # --DB2: 5x10 font (0->5x7 font)
    LCD_FUNCTION_RESET = 0x30   # See "Initializing by Instruction" section

    LCD_CGRAM = 0x40            # DB6: set CG RAM address
    LCD_DDRAM = 0x80            # DB7: set DD RAM address

    LCD_RS_CMD = 0
    LCD_RS_DATA = 1

    LCD_RW_WRITE = 0
    LCD_RW_READ = 1

    def __init__(self, num_lines, num_columns):
        self.num_lines = num_lines
        if self.num_lines > 4:
            self.num_lines = 4
        self.num_columns = num_columns
        if self.num_columns > 40:
            self.num_columns = 40
        self.cursor_x = 0
        self.cursor_y = 0
        self.implied_newline = False
        self.backlight = True
        self.display_off()
        self.backlight_on()
        self.clear()
        self.hal_write_command(self.LCD_ENTRY_MODE | self.LCD_ENTRY_INC)
        self.hide_cursor()
        self.display_on()

    def clear(self):
        """Clears the LCD display and moves the cursor to the top left
        corner.
        """
        self.hal_write_command(self.LCD_CLR)
        self.hal_write_command(self.LCD_HOME)
        self.cursor_x = 0
        self.cursor_y = 0

    def show_cursor(self):
        """Causes the cursor to be made visible."""
        self.hal_write_command(self.LCD_ON_CTRL | self.LCD_ON_DISPLAY |
                               self.LCD_ON_CURSOR)

    def hide_cursor(self):
        """Causes the cursor to be hidden."""
        self.hal_write_command(self.LCD_ON_CTRL | self.LCD_ON_DISPLAY)

    def blink_cursor_on(self):
        """Turns on the cursor, and makes it blink."""
        self.hal_write_command(self.LCD_ON_CTRL | self.LCD_ON_DISPLAY |
                               self.LCD_ON_CURSOR | self.LCD_ON_BLINK)

    def blink_cursor_off(self):
        """Turns on the cursor, and makes it no blink (i.e. be solid)."""
        self.hal_write_command(self.LCD_ON_CTRL | self.LCD_ON_DISPLAY |
                               self.LCD_ON_CURSOR)

    def display_on(self):
        """Turns on (i.e. unblanks) the LCD."""
        self.hal_write_command(self.LCD_ON_CTRL | self.LCD_ON_DISPLAY)

    def display_off(self):
        """Turns off (i.e. blanks) the LCD."""
        self.hal_write_command(self.LCD_ON_CTRL)

    def backlight_on(self):
        """Turns the backlight on.

        This isn't really an LCD command, but some modules have backlight
        controls, so this allows the hal to pass through the command.
        """
        self.backlight = True
        self.hal_backlight_on()

    def backlight_off(self):
        """Turns the backlight off.

        This isn't really an LCD command, but some modules have backlight
        controls, so this allows the hal to pass through the command.
        """
        self.backlight = False
        self.hal_backlight_off()

    def move_to(self, cursor_x, cursor_y):
        """Moves the cursor position to the indicated position. The cursor
        position is zero based (i.e. cursor_x == 0 indicates first column).
        """
        self.cursor_x = cursor_x
        self.cursor_y = cursor_y
        addr = cursor_x & 0x3f
        if cursor_y & 1:
            addr += 0x40    # Lines 1 & 3 add 0x40
        if cursor_y & 2:    # Lines 2 & 3 add number of columns
            addr += self.num_columns
        self.hal_write_command(self.LCD_DDRAM | addr)

    def putchar(self, char):
        """Writes the indicated character to the LCD at the current cursor
        position, and advances the cursor by one position.
        """
        if char == '\n':
            if self.implied_newline:
                # self.implied_newline means we advanced due to a wraparound,
                # so if we get a newline right after that we ignore it.
                pass
            else:
                self.cursor_x = self.num_columns
        else:
            self.hal_write_data(ord(char))
            self.cursor_x += 1
        if self.cursor_x >= self.num_columns:
            self.cursor_x = 0
            self.cursor_y += 1
            self.implied_newline = (char != '\n')
        if self.cursor_y >= self.num_lines:
            self.cursor_y = 0
        self.move_to(self.cursor_x, self.cursor_y)

    def putstr(self, string):
        """Write the indicated string to the LCD at the current cursor
        position and advances the cursor position appropriately.
        """
        for char in string:
            self.putchar(char)

    def custom_char(self, location, charmap):
        """Write a character to one of the 8 CGRAM locations, available
        as chr(0) through chr(7).
        """
        location &= 0x7
        self.hal_write_command(self.LCD_CGRAM | (location << 3))
        self.hal_sleep_us(40)
        for i in range(8):
            self.hal_write_data(charmap[i])
            self.hal_sleep_us(40)
        self.move_to(self.cursor_x, self.cursor_y)

    def hal_backlight_on(self):
        """Allows the hal layer to turn the backlight on.

        If desired, a derived HAL class will implement this function.
        """
        pass

    def hal_backlight_off(self):
        """Allows the hal layer to turn the backlight off.

        If desired, a derived HAL class will implement this function.
        """
        pass

    def hal_write_command(self, cmd):
        """Write a command to the LCD.

        It is expected that a derived HAL class will implement this function.
        """
        raise NotImplementedError

    def hal_write_data(self, data):
        """Write data to the LCD.

        It is expected that a derived HAL class will implement this function.
        """
        raise NotImplementedError

    def hal_sleep_us(self, usecs):
        """Sleep for some time (given in microseconds)."""
        time.sleep_us(usecs)
```
utelegram.py
The “utelegram” library for sending data to the Telegram bot.
```python
import time
import gc
import ujson
import urequests


class ubot:

    def __init__(self, token, offset=0):
        self.url = 'https://api.telegram.org/bot' + token
        self.commands = {}
        self.default_handler = None
        self.message_offset = offset
        self.sleep_btw_updates = 3

        messages = self.read_messages()
        if messages:
            if self.message_offset==0:
                self.message_offset = messages[-1]['update_id']
            else:
                for message in messages:
                    if message['update_id'] >= self.message_offset:
                        self.message_offset = message['update_id']
                        break

    def send(self, chat_id, text):
        data = {'chat_id': chat_id, 'text': text}
        try:
            headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
            response = urequests.post(self.url + '/sendMessage', json=data, headers=headers)
            response.close()
            return True
        except:
            return False

    def read_messages(self):
        result = []
        self.query_updates = {
            'offset': self.message_offset + 1,
            'limit': 1,
            'timeout': 30,
            'allowed_updates': ['message']}

        try:
            update_messages = urequests.post(self.url + '/getUpdates', json=self.query_updates).json()
            if 'result' in update_messages:
                for item in update_messages['result']:
                    result.append(item)
            return result
        except (ValueError):
            return None
        except (OSError):
            print("OSError: request timed out")
            return None

    def listen(self):
        while True:
            self.read_once()
            time.sleep(self.sleep_btw_updates)
            gc.collect()

    def read_once(self):
        messages = self.read_messages()
        if messages:
            if self.message_offset==0:
                self.message_offset = messages[-1]['update_id']
                self.message_handler(messages[-1])
            else:
                for message in messages:
                    if message['update_id'] >= self.message_offset:
                        self.message_offset = message['update_id']
                        self.message_handler(message)
                        break

    def register(self, command, handler):
        self.commands[command] = handler

    def set_default_handler(self, handler):
        self.default_handler = handler

    def set_sleep_btw_updates(self, sleep_time):
        self.sleep_btw_updates = sleep_time

    def message_handler(self, message):
        if 'text' in message['message']:
            parts = message['message']['text'].split(' ')
            if parts[0] in self.commands:
                self.commands[parts[0]](message)
            else:
                if self.default_handler:
                    self.default_handler(message)
```
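The heart of the library is the command dispatch in `message_handler`: the first word of the message text selects a registered handler, and anything else goes to the default handler. The sketch below shows the same idea as a stand-alone function that runs without a network connection. The function `dispatch` and the sample update are made up, and unlike the library it returns the handler's result so the behavior is easy to check.

```python
def dispatch(commands, default_handler, message):
    """Call the handler registered for the first word of the message text."""
    if "text" not in message["message"]:
        return None  # e.g. a photo or sticker: nothing to dispatch
    first_word = message["message"]["text"].split(" ")[0]
    handler = commands.get(first_word, default_handler)
    if handler is None:
        return None
    return handler(message)


# A made-up update shaped like the dictionaries utelegram.py passes around.
update = {"update_id": 1, "message": {"chat": {"id": 7}, "text": "/ping now"}}
commands = {"/ping": lambda message: "pong"}
print(dispatch(commands, None, update))
```

Because only the first word is matched, extra words such as the `1 1` in `/led 1 1` are left for the handler itself to parse.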
main.py
```python
from machine import Pin, I2C
from lcd_api import LcdApi
from i2c_lcd import I2cLcd
import time, network, utelegram, dht

I2C_ADDR = 0x27
totalRows = 2
totalColumns = 16

# Telegram API key: paste the token you received from BotFather here
telegram_api_key = "Your API Key"

# Onboard LED and I2C pin definitions
led = Pin("LED", Pin.OUT)
i2c = I2C(1, scl=Pin(3), sda=Pin(2), freq=100000)
lcd = I2cLcd(i2c, I2C_ADDR, totalRows, totalColumns)
lcd.clear()

# DHT11 pin definition (data pin on GP4, as in the wiring table)
d = dht.DHT11(Pin(4))

# WiFi credentials and WiFi connection (replace with your own network)
ssid = 'ESP Repeater'
pswd = 'Antivirus'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, pswd)

print("Connecting Wifi.", end='')
lcd.putstr("Connecting WiFi")
while not wlan.isconnected() and wlan.status() >= 0:
    print('.', end='')
    lcd.putstr(".")
    time.sleep(0.5)

print('')
print(wlan.ifconfig())
lcd.clear()
lcd.putstr("WiFi Connected")


# Telegram default callback: show the text on the LCD and echo it in uppercase
def get_message(message):
    lcd.clear()
    lcd.putstr(message['message']['text'])
    bot.send(message['message']['chat']['id'], message['message']['text'].upper())


# send "pong" as the reply to /ping
def reply_ping(message):
    print(message)
    bot.send(message['message']['chat']['id'], 'pong')


# send humidity value as answer
def hum_cb(message):
    d.measure()
    # humidity is a property of the DHT11 class, so it is read without ()
    lcd.clear()
    lcd.putstr("Send Hum: " + str(d.humidity))
    # print(message)
    bot.send(message['message']['chat']['id'], str(d.humidity))


# send temperature value as answer
def temp_cb(message):
    d.measure()
    # temperature is a property of the DHT11 class, so it is read without ()
    lcd.clear()
    lcd.putstr("Send Temp: " + str(d.temperature))
    # print(message)
    bot.send(message['message']['chat']['id'], str(d.temperature))


# change LED status with the parameters given in the message text
def led_cb(message):
    msg = message['message']['text']
    msg_sp = msg.split(' ')
    print(msg_sp)
    if len(msg_sp) != 3:
        bot.send(message['message']['chat']['id'], "Wrong format, use: /led 1 1")
        return

    if msg_sp[1] == '1':
        if msg_sp[2] == '1':
            led.on()
            bot.send(message['message']['chat']['id'], "LED ON")
        else:
            led.off()
            bot.send(message['message']['chat']['id'], "LED OFF")


# clear the LCD screen on request
def clear_cb(message):
    lcd.clear()


# start telegram bot
bot = utelegram.ubot(telegram_api_key)
bot.register('/ping', reply_ping)     # ping message callback
bot.register('/led', led_cb)          # led message callback
bot.register('/temp', temp_cb)        # temp message callback
bot.register('/hum', hum_cb)          # humidity message callback
bot.register('/clear', clear_cb)      # clear screen callback
bot.set_default_handler(get_message)  # default message callback

print('BOT LISTENING')
lcd.clear()
lcd.putstr("BOT LISTENING")

bot.listen()
```
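The handlers above show one reading at a time. If you would rather show both values together, one option is to build two 16-character lines and write them in a single call. The sketch below is only a formatting helper with a made-up name, `format_readings`; the labels and the one-decimal format are assumptions, not part of the original program.

```python
def pad(text, width):
    """Pad with spaces or cut so the text is exactly `width` characters."""
    return (text + " " * width)[:width]


def format_readings(temperature, humidity, width=16):
    """Return (line1, line2) for a two-line character LCD."""
    line1 = "Temp: {:.1f} C".format(temperature)
    line2 = "Hum:  {:.1f} %".format(humidity)
    return pad(line1, width), pad(line2, width)


line1, line2 = format_readings(24.3, 45.0)
print(repr(line1))
print(repr(line2))
```

On the Pico W this could be used as `lcd.move_to(0, 0)` followed by `lcd.putstr(line1 + line2)`: because each line is exactly 16 characters, `putstr` wraps onto the second row and overwrites any old characters without calling `lcd.clear()`.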
Save dht.py, i2c_lcd.py, lcd_api.py, and utelegram.py to the Pico W from Thonny so that main.py can import them. Then open main.py and click the Run button.
Working
After uploading the code to the Raspberry Pi Pico W board, open the Telegram application on your phone and start a chat with the bot.
If we send the command /temp to the Telegram bot, we will receive the temperature. If we send /hum, we will receive the humidity percentage. Each reading is also shown on the LCD. Additionally, we can control the onboard LED: /led 1 1 turns it on and /led 1 0 turns it off. The bot also answers /ping with "pong", clears the LCD on /clear, and echoes any other text back in uppercase while showing it on the LCD.
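The LED command takes two parameters, so it is worth spelling out how the text is interpreted. The sketch below mirrors the checks in `led_cb` as a pure function: three space-separated parts, the second must be `1`, and the third decides on or off. The name `parse_led_command` and the `None` return for unrecognized input are choices made for this example only.

```python
def parse_led_command(text):
    """Return True (LED on), False (LED off), or None if not understood."""
    parts = text.split(" ")
    if len(parts) != 3 or parts[0] != "/led":
        return None  # wrong format, e.g. "/led on"
    if parts[1] != "1":
        return None  # only LED number 1 is handled
    return parts[2] == "1"


print(parse_led_command("/led 1 1"))  # True
print(parse_led_command("/led 1 0"))  # False
print(parse_led_command("/led on"))   # None
```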
Very well explained and detailed article.
Hey, I need some help with this project.
I am getting an error while running the main file:
MPY: soft reboot
Traceback (most recent call last):
File "<stdin>", line 4, in <module>
ImportError: no module named 'network'
I also can't figure out where to put the token.
Please help.