File size: 4,392 Bytes
4bec42e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import json
import sys
from time import gmtime, localtime, time
import machine
import ntptime
import uos
import urequests
from machine import SPI, Pin
from sdcard import sdcard
from uio import StringIO
# Alternative: a more robust ntptime implementation can be imported instead:
# from lib.ntptime import ntptime
def get_traceback(err):
    """Return the text that ``sys.print_exception`` produces for *err*.

    The exception is printed into an in-memory ``StringIO`` buffer and the
    buffer contents are returned. If anything goes wrong while doing so, the
    secondary error is printed and a fallback message naming both the
    secondary and the original error is returned instead.
    """
    try:
        with StringIO() as buffer:  # type: ignore
            sys.print_exception(err, buffer)
            # Read the text while the buffer is still open.
            rendered = buffer.getvalue()
        return rendered
    except Exception as err2:
        print(err2)
        fallback = f"Failed to extract file and line number due to {err2}.\nOriginal error: {err}"  # noqa: E501
        return fallback
def initialize_sdcard(
    spi_id=1,
    cs_pin=15,
    sck_pin=10,
    mosi_pin=11,
    miso_pin=12,
    baudrate=1000000,
    polarity=0,
    phase=0,
    bits=8,
    firstbit=SPI.MSB,
    verbose=True,
):
    """Set up an SPI-attached SD card and mount it at ``/sd``.

    Args:
        spi_id: SPI bus identifier passed to ``SPI``.
        cs_pin: Pin number used as chip select (configured as an output).
        sck_pin: Pin number for the SPI clock line.
        mosi_pin: Pin number for the SPI MOSI line.
        miso_pin: Pin number for the SPI MISO line.
        baudrate: SPI ``baudrate`` argument.
        polarity: SPI ``polarity`` argument.
        phase: SPI ``phase`` argument.
        bits: SPI ``bits`` argument.
        firstbit: SPI ``firstbit`` argument.
        verbose: When true, print a success message, or the traceback and a
            failure message.

    Returns:
        ``True`` if the card was mounted, ``False`` if any step raised.
    """
    try:
        chip_select = Pin(cs_pin, Pin.OUT)
        clock = Pin(sck_pin)
        data_out = Pin(mosi_pin)
        data_in = Pin(miso_pin)
        bus = SPI(
            spi_id,
            baudrate=baudrate,
            polarity=polarity,
            phase=phase,
            bits=bits,
            firstbit=firstbit,
            sck=clock,
            mosi=data_out,
            miso=data_in,
        )

        # Wrap the card in a FAT filesystem and expose it under /sd.
        card = sdcard.SDCard(bus, chip_select)
        uos.mount(uos.VfsFat(card), "/sd")  # type: ignore

        if verbose:
            print("SD Card initialized successfully")
        return True
    except Exception as exc:
        # Any failure is reported as a boolean rather than propagated.
        if verbose:
            print(get_traceback(exc))
            print("SD Card failed to initialize")
        return False
def write_payload_backup(payload_data: str, fpath: str = "/sd/experiments.txt"):
    """Append *payload_data* to *fpath* as a single JSON-encoded line.

    The value is serialized with ``json.dumps`` before the file is opened, then
    written in append mode followed by ``\\r\\n``.

    NOTE(review): the parameter is annotated ``str`` but is passed straight to
    ``json.dumps``; presumably callers pass a dict -- confirm.

    Args:
        payload_data: Value to serialize.
        fpath: Path of the backup file to append to.
    """
    record = json.dumps(payload_data)
    line = f"{record}\r\n"
    with open(fpath, "a") as backup:
        backup.write(line)
def log_to_mongodb(
    document: dict,
    api_key: str,
    url: str,
    cluster_name: str,
    database_name: str,
    collection_name: str,
    verbose: bool = True,
    retries: int = 2,
):
    """POST *document* to *url* as an insert request, retrying on failure.

    The request body carries the target ``dataSource``/``database``/
    ``collection`` plus the document; the API key is sent in the ``api-key``
    header. A response with status code 201 is treated as success and stops
    the loop. Any other status prints ``"Error"`` and moves on to the next
    attempt without raising.

    Every response that was obtained is closed, on success, on a non-201
    status and on error alike, so response objects are not leaked.

    Args:
        document: The document to insert.
        api_key: Value for the ``api-key`` request header.
        url: Endpoint that receives the POST.
        cluster_name: Sent as ``dataSource``.
        database_name: Sent as ``database``.
        collection_name: Sent as ``collection``.
        verbose: When true, print the destination and each response.
        retries: Total number of attempts to make.

    Raises:
        Exception: Whatever the final attempt raised, if it raised.
    """
    # based on https://medium.com/@johnlpage/introduction-to-microcontrollers-and-the-pi-pico-w-f7a2d9ad1394
    headers = {"api-key": api_key}
    insert_payload = {
        "dataSource": cluster_name,
        "database": database_name,
        "collection": collection_name,
        "document": document,
    }

    if verbose:
        print(f"sending document to {cluster_name}:{database_name}:{collection_name}")

    for attempt in range(retries):
        if attempt > 0:
            print(f"retrying... ({attempt} of {retries})")

        response = None
        try:
            response = urequests.post(url, headers=headers, json=insert_payload)
            txt = str(response.text)
            status_code = response.status_code

            if verbose:
                print(f"Response: ({status_code}), msg = {txt}")

            if status_code == 201:
                print("Added Successfully")
                break
            print("Error")
        except Exception as e:
            # Only the last attempt propagates; earlier failures are printed
            # and retried.
            if attempt == retries - 1:
                raise
            print(e)
        finally:
            # Always close response objects so we don't leak memory. Doing
            # this in ``finally`` also covers the ``break`` on success.
            if response is not None:
                response.close()
def get_timestamp(timeout=2, return_str=False):
    """Return the time reported by ``ntptime.time()``, optionally with a string.

    ``ntptime.timeout`` is set to *timeout* before the query; this is a
    module-level setting and stays in effect afterwards.

    Args:
        timeout: Value assigned to ``ntptime.timeout``.
        return_str: When true, also return a formatted string.

    Returns:
        The value from ``ntptime.time()``, or a ``(value, text)`` tuple when
        *return_str* is true. The text is ``gmtime`` of that value rendered as
        ``Y-M-D HH:MM:SS``; month and day are not zero-padded.
    """
    ntptime.timeout = timeout  # type: ignore
    ntp_seconds = ntptime.time()
    year, month, mday, hour, minute, second, _weekday, _yearday = gmtime(ntp_seconds)
    formatted = f"{year}-{month}-{mday} {hour:02}:{minute:02}:{second:02}"
    return (ntp_seconds, formatted) if return_str else ntp_seconds
def get_local_timestamp(return_str=False):
    """Return the device clock's ``time()``, optionally with a string.

    Args:
        return_str: When true, also return a formatted string.

    Returns:
        The value from ``time()``, or a ``(value, text)`` tuple when
        *return_str* is true. The text is ``localtime`` of that value rendered
        as ``Y-M-D HH:MM:SS``; month and day are not zero-padded.
    """
    now = time()
    year, month, mday, hour, minute, second, _weekday, _yearday = localtime(now)
    stamp = f"{year}-{month}-{mday} {hour:02}:{minute:02}:{second:02}"
    if not return_str:
        return now
    return now, stamp
def get_onboard_temperature(unit="K"):
    """Read the temperature from ADC channel 4 and return it in *unit*.

    The raw 16-bit reading is scaled to volts against a 3.3 V reference and
    converted with ``27 - (V - 0.706) / 0.001721``.
    NOTE(review): these constants match the RP2040 on-chip sensor formula --
    confirm before using on other boards.

    The unit is validated before the ADC is touched, so an invalid unit fails
    fast without performing a hardware read.

    Args:
        unit: ``"C"`` (Celsius), ``"K"`` (Kelvin, the default) or ``"F"``
            (Fahrenheit).

    Returns:
        The temperature as a float in the requested unit.

    Raises:
        ValueError: If *unit* is not one of ``"C"``, ``"K"`` or ``"F"``.
    """
    if unit not in ("C", "K", "F"):
        raise ValueError("Invalid unit. Must be one of 'C', 'K', or 'F'")

    sensor_temp = machine.ADC(4)
    conversion_factor = 3.3 / (65535)  # volts per ADC count
    reading = sensor_temp.read_u16() * conversion_factor
    celsius_degrees = 27 - (reading - 0.706) / 0.001721

    if unit == "C":
        return celsius_degrees
    if unit == "K":
        return celsius_degrees + 273.15
    return celsius_degrees * 9 / 5 + 32
|