Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active February 11, 2024 22:56
Show Gist options
  • Save anecdata/f46a1d07add5fc60cfbcf42dc7be6528 to your computer and use it in GitHub Desktop.
Save anecdata/f46a1d07add5fc60cfbcf42dc7be6528 to your computer and use it in GitHub Desktop.
CircuitPython example for Espressif ESP-NOW protocol
# SPDX-FileCopyrightText: 2023 anecdata
#
# SPDX-License-Identifier: MIT
import time
import traceback
import supervisor
import os
import rtc
import espnow
import espidf
import wifi
import socketpool
import adafruit_ntp
from sekrets import *
#### ESPNOW Receiver
# UTC offset in hours: passed to adafruit_ntp.NTP as tz_offset and appended
# to every timestamp printed by struct_time_to_iso_time().
TZ_DEFAULT = -5
# Channel value handed to the sender's espnow.Peer entry below.
SNDR_CH = 0 # channel 1 (unless connected to an AP or acting as an AP)
def struct_time_to_iso_time():
    """Return the current local time as an ISO 8601 style timestamp.

    The result looks like ``YYYY-MM-DDTHH:MM:SS+HH:00``. The offset suffix is
    TZ_DEFAULT rendered as a signed, zero-padded hour followed by ``:00``.
    """
    now = time.localtime()
    year, month, day = now[0], now[1], now[2]
    hour, minute, second = now[3], now[4], now[5]
    date_part = f"{year:04d}-{month:02d}-{day:02d}"
    clock_part = f"{hour:02d}:{minute:02d}:{second:02d}"
    offset_part = f"{TZ_DEFAULT:+03}:00"
    return f"{date_part}T{clock_part}{offset_part}"
def connect():
    """Join the configured Wi-Fi access point and print the link details.

    The SSID and password are read with ``os.getenv`` and the association is
    pinned to ``AP_BSSID`` (not defined in this file; presumably supplied by
    ``sekrets``). A ``ConnectionError`` is printed rather than raised, so the
    caller continues either way.
    """
    try:
        wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"), bssid=AP_BSSID)  # use AP on channel <> 1
        time.sleep(1)  # wait for ap_info
        # ap_info is None while the radio is not associated. Report None for
        # channel/rssi instead of raising AttributeError, which would escape
        # this function and skip the caller's radio shutdown.
        ap_info = wifi.radio.ap_info
        channel = ap_info.channel if ap_info is not None else None
        rssi = ap_info.rssi if ap_info is not None else None
        print(f"{struct_time_to_iso_time()} ipv4={wifi.radio.ipv4_address} channel={channel} rssi={rssi}")
    except ConnectionError as ex:
        traceback.print_exception(ex, ex, ex.__traceback__)
def ntp_to_rtc():
    """Set the on-board RTC from NTP, then switch the Wi-Fi radio back off.

    The radio is enabled and connect() is called before the NTP lookup. Any
    exception raised while fetching or storing the time is printed and
    swallowed, so the radio is disabled afterwards in every case.
    """
    radio = wifi.radio
    radio.enabled = True
    connect()
    try:
        ntp_time = adafruit_ntp.NTP(pool, tz_offset=TZ_DEFAULT).datetime
        rtc.RTC().datetime = ntp_time
        print(f"{struct_time_to_iso_time()} RTC time set with NTP time")
    except Exception as e:
        traceback.print_exception(e, e, e.__traceback__)
    radio.enabled = False  # lose the wifi channel
# Startup: pause so the serial console can attach, then sync the RTC once.
time.sleep(3)  # wait for serial
print(f"{'='*25}")
pool = socketpool.SocketPool(wifi.radio)
ntp_to_rtc()

# The only peer is the sender; MAC and local master key come from sekrets.
sender = espnow.Peer(mac=SNDR_MAC, lmk=SNDR_LMK, encrypted=True, channel=SNDR_CH)
peers = [sender]

# Each pass opens a fresh ESP-NOW session, waits for one packet, prints the
# session counters, and lets the context manager close the session.
while True:
    with espnow.ESPNow() as e:
        e.set_pmk(RCVR_PMK)

        report_lines = []
        for peer in peers:
            e.peers.append(peer)
            report_lines.append(f"mac={peer.mac} lmk={peer.lmk} ch={peer.channel} if={peer.interface} enc={peer.encrypted}\n")
        peers_report = "".join(report_lines)

        print(f"{'-'*25}\n{struct_time_to_iso_time()} Receiving...", end=" ")
        while True:
            if not e:
                # Presumably falsy while no packet is buffered; keep polling.
                continue
            try:
                packet = e.read()
                print(f"{packet}")
            except ValueError as ex:  # Invalid buffer
                traceback.print_exception(ex, ex, ex.__traceback__)
                supervisor.reload()
            break
        print(f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}] buf={e.buffer_size} phy={e.phy_rate} peers={peers_report}", end="")
@anecdata
Copy link
Author

anecdata commented Mar 29, 2023

espnow_sender.py

Alternate between:

  • monitor (random channels)
  • wifi with requests (not channel 1 here, but can be any channel)
  • ESP-NOW (default to channel 1):
# SPDX-FileCopyrightText: 2023 anecdata
#
# SPDX-License-Identifier: MIT

import time
import traceback
import random
import supervisor
import os
import rtc
import espnow
import espidf
import wifi
import socketpool
import ssl
import adafruit_ntp
import adafruit_requests

from sekrets import *


### ESPNOW Sender
# UTC offset in hours: passed to adafruit_ntp.NTP as tz_offset and appended
# to every timestamp printed by struct_time_to_iso_time().
TZ_DEFAULT = -5
# Page fetched by do_wifi() on each cycle.
URL = "http://wifitest.adafruit.com/testwifi/index.html"
# Channel value handed to the receiver's espnow.Peer entry.
RCVR_CH = 0  # channel 1 (unless connected to an AP or acting as an AP)


def struct_time_to_iso_time():
    """Return the current local time as an ISO 8601 style timestamp.

    The result looks like ``YYYY-MM-DDTHH:MM:SS+HH:00``. The offset suffix is
    TZ_DEFAULT rendered as a signed, zero-padded hour followed by ``:00``.
    """
    now = time.localtime()
    year, month, day = now[0], now[1], now[2]
    hour, minute, second = now[3], now[4], now[5]
    date_part = f"{year:04d}-{month:02d}-{day:02d}"
    clock_part = f"{hour:02d}:{minute:02d}:{second:02d}"
    offset_part = f"{TZ_DEFAULT:+03}:00"
    return f"{date_part}T{clock_part}{offset_part}"

def connect():
    """Join the configured Wi-Fi access point and print the link details.

    The SSID and password are read with ``os.getenv`` and the association is
    pinned to ``AP_BSSID`` (not defined in this file; presumably supplied by
    ``sekrets``). A ``ConnectionError`` is printed rather than raised, so the
    caller continues either way.
    """
    try:
        wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"), bssid=AP_BSSID)  # use AP on channel <> 1
        time.sleep(1)  # wait for ap_info
        # ap_info is None while the radio is not associated. Report None for
        # channel/rssi instead of raising AttributeError, which would escape
        # this function and skip the caller's radio shutdown.
        ap_info = wifi.radio.ap_info
        channel = ap_info.channel if ap_info is not None else None
        rssi = ap_info.rssi if ap_info is not None else None
        print(f"{struct_time_to_iso_time()} ipv4={wifi.radio.ipv4_address} channel={channel} rssi={rssi}")
    except ConnectionError as ex:
        traceback.print_exception(ex, ex, ex.__traceback__)

def ntp_to_rtc():
    """Set the on-board RTC from NTP, then switch the Wi-Fi radio back off.

    The radio is enabled and connect() is called before the NTP lookup. Any
    exception raised while fetching or storing the time is printed and
    swallowed, so the radio is disabled afterwards in every case.
    """
    radio = wifi.radio
    radio.enabled = True
    connect()
    try:
        ntp_time = adafruit_ntp.NTP(pool, tz_offset=TZ_DEFAULT).datetime
        rtc.RTC().datetime = ntp_time
        print(f"{struct_time_to_iso_time()} RTC time set with NTP time")
    except Exception as e:
        traceback.print_exception(e, e, e.__traceback__)
    radio.enabled = False  # lose the wifi channel

def do_monitor():
    """Sniff one randomly chosen Wi-Fi channel briefly and print one packet.

    A ``wifi.Monitor`` is opened on a random channel in 1..13 and given one
    second to collect traffic. If a packet was captured, its length, channel,
    RSSI and raw bytes are printed together with the monitor's queue and loss
    counters. The monitor is released even if reading or printing fails, so
    the next cycle can create a new one.
    """
    channel = random.randrange(1, 14)  # upper bound is exclusive: 1..13
    print(f"{'-'*25}\n{struct_time_to_iso_time()} Monitor starting on channel {channel}...")
    monitor = wifi.Monitor(channel=channel)
    try:
        time.sleep(1)  # let packets accumulate
        received = monitor.packet()
        if received:  # an empty dict means nothing was captured
            print(f"qlen={monitor.queued()} lost={monitor.lost()} len={received[wifi.Packet.LEN]} ch={received[wifi.Packet.CH]} rssi={received[wifi.Packet.RSSI]}")
            print(f"raw={received[wifi.Packet.RAW]}")
    finally:
        monitor.deinit()

def do_wifi():
    """Bring Wi-Fi up, fetch URL once, print the response summary, drop Wi-Fi.

    Any exception raised by the HTTP request or while printing the response
    is printed and swallowed; the radio is disabled afterwards.
    """
    print(f"{'-'*25}\n{struct_time_to_iso_time()} Wi-Fi starting...")
    radio = wifi.radio
    radio.enabled = True
    connect()
    try:
        with requests.get(URL) as response:
            stamp = struct_time_to_iso_time()
            print(f"{stamp} {response.status_code} {response.reason.decode()} {response.headers}")
    except Exception as ex:
        traceback.print_exception(ex, ex, ex.__traceback__)
    radio.enabled = False  # lose the wifi channel

def do_espnow():
    """Open an ESP-NOW session, send the current timestamp, print counters.

    Every entry in the module-level ``peers`` list is registered on the
    session. The payload is the encoded ISO timestamp. An ``espidf.IDFError``
    from the send is printed and followed by ``supervisor.reload()``.
    """
    print(f"{'-'*25}\n{struct_time_to_iso_time()} ESPNow sender starting...")
    with espnow.ESPNow() as link:
        link.set_pmk(SNDR_PMK)

        report_lines = []
        for peer in peers:
            link.peers.append(peer)
            report_lines.append(f"mac={peer.mac} lmk={peer.lmk} ch={peer.channel} if={peer.interface} enc={peer.encrypted}\n")
        peers_report = "".join(report_lines)

        print(f"{struct_time_to_iso_time()} Sending...", end=" ")
        try:
            payload = struct_time_to_iso_time().encode()
            link.send(payload)
        except espidf.IDFError as ex:  # ESP-NOW error 0x306a
            traceback.print_exception(ex, ex, ex.__traceback__)
            supervisor.reload()
        print(f"send=[{link.send_success} {link.send_failure}] read=[{link.read_success} {link.read_failure}] buf={link.buffer_size} phy={link.phy_rate} peers={peers_report}", end="")


# Startup: pause so the serial console can attach, build the shared network
# objects used by the functions above, then sync the RTC once.
time.sleep(3)  # wait for serial
print(f"{'='*25}")

pool = socketpool.SocketPool(wifi.radio)
context = ssl.create_default_context()
requests = adafruit_requests.Session(pool, context)
ntp_to_rtc()

# The only peer is the receiver; MAC and local master key come from sekrets.
receiver = espnow.Peer(mac=RCVR_MAC, lmk=RCVR_LMK, encrypted=True, channel=RCVR_CH)
peers = [receiver]

# Cycle forever through the three radio modes, pausing after each one.
while True:
    for activity, pause in ((do_monitor, 1), (do_wifi, 1), (do_espnow, 3)):
        activity()
        time.sleep(pause)

@anecdata
Copy link
Author

anecdata commented Mar 29, 2023

Above code tested on:

Adafruit CircuitPython 8.1.0-beta.0-80-g22636e056 on 2023-03-29; Adafruit Feather ESP32-S2 TFT with ESP32S2

General comments on use of espnow and wifi:

Open questions:

  • e = espnow.ESPNow() results in RuntimeError: Already running if it was already done, but it's a Singleton?
  • is this the proper way to use the PMK and LMK (PMK is the same for all nodes; LMK can be unique per Sender-Receiver pair)?
  • trying to send on a channel other than the default 0 (becomes 1) results in ESP-NOW error 0x306a
  • when encryption is used, RSSI is always returned as 0
  • ValueError: phy_rate must be 0-42 but should allow up to 54Mbps? 36 is allowed, but 24 is the highest that seems to get received.
  • using broadcast peer address results in ESP-NOW error 0x3066 on send
  • ESP-NOW error 0x306b if peer is added more than once?

@anecdata
Copy link
Author

anecdata commented Apr 2, 2023

It can typically take up to several milliseconds for e.send_success and e.send_failure to become valid. One way to handle that:

    try:
        e.send(f"{struct_time_to_iso_time().encode()}")

        delivery_time = 0
        start = time.monotonic_ns()
        while not e.send_success and not e.send_failure:
            if (delivery_time := time.monotonic_ns() - start) > SEND_TIMEOUT:
                break

@anecdata
Copy link
Author

anecdata commented Apr 5, 2023

Needs some parameter validation?

Adafruit CircuitPython 8.1.0-beta.0-80-g22636e056 on 2023-03-29; Adafruit Feather ESP32-S2 TFT with ESP32S2
>>> import time
>>> import espnow
>>> from sekrets import *
>>> 
>>> RCVR_CH = 0 
>>> 
>>> e = espnow.ESPNow()
>>> e.set_pmk(SYS_PMK)
>>> peer = espnow.Peer(mac=RCVR_MAC, lmk=LMK_181, encrypted=True, channel=RCVR_CH)
>>> e.peers.append(peer)
>>> 
>>> e.send(f"{'#'*250}")
>>> time.sleep(0.1)  # delivery confirmation takes non-zero time
>>> print(f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}]")
send=[1 0] read=[1 0]
>>> 
>>> e.send(f"{'#'*251}")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
espidf.IDFError: ESP-NOW error 0x306a
>>> time.sleep(0.1)  # delivery confirmation takes non-zero time
>>> print(f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}]")
send=[1 0] read=[1 0]
>>> 
>>> e.send(f"{'#'*250}")
>>> time.sleep(0.1)  # delivery confirmation takes non-zero time
>>> print(f"send=[{e.send_success} {e.send_failure}] read=[{e.read_success} {e.read_failure}]")
send=[2 0] read=[1 0]
>>> 

@anecdata
Copy link
Author

anecdata commented Apr 24, 2023

For ease of navigation...
Original Issue: adafruit/circuitpython#3999
Original PR: adafruit/circuitpython#7470
Issue filed for above comments: adafruit/circuitpython#7903

@tyeth
Copy link

tyeth commented Feb 11, 2024

Thanks for compiling/sharing this @anecdata 🥇 I hadn't seen the micropython notes on esp-now.

Updated link from Archive.org for micropython-espnow power related operation notes:
https://web.archive.org/web/20230520021624/https://micropython-glenn20.readthedocs.io/en/latest/library/espnow.html#espnow-and-wifi-operation

@anecdata
Copy link
Author

Thanks for the updated link @tyeth. CircuitPython does power management slightly differently (see adafruit/circuitpython#6976), which I think reduces complexity for ESP-NOW. Channel management, I think, is still a challenge for now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment