Source code for gps_serial

# The MIT License (MIT)
#
# Copyright (c) 2017 Damien P. George
# Copyright (c) 2019 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
========================================
gps_serial
========================================

Yet another NMEA sentence parser for serial UART based GPS modules. This implements the threading module for [psuedo] asynchronous applications. CAUTION: The individual satelite info is being ignored until we decide to support capturing it from the GPS module's output.
"""
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/DVC-Viking-Robotics/GPS_Serial.git"
import time
import threading
from serial import Serial

DEFAULT_LOC = {'lat': 37.96713657090229, 'lng': -122.0712176165581}
"""The default/fallback location to use when waiting for a fix upon power-up
of GPS device. This has been hard-coded to DVC Engineering buildings'
courtyard."""

def _convert2deg(nmea):
    """VERY IMPORTANT needed to go from format 'ddmm.mmmm' into decimal degrees"""
    if nmea is None or len(nmea) < 3:
        return None
    nmea = float(nmea)
    return (nmea // 100) + (nmea - ((nmea // 100) * 100)) / 60

[docs]class GPSserial: """ :param int address: The serial port address that the GPS module is connected to. For example, on the raspberry pi's GPIO pins, this is ``/dev/ttyS0``; on windows, this is something like ``com#`` where # is designated by windows. :param int timeout: Specific number of seconds till the threading :class:`~serial.Serial`'s `~serial.Serial.read_until()` operation expires. Defaults to 1 second. :param int baud: The specific baudrate to be used for the serial connection. If left """ def __init__(self, address, timeout=1.0, baud=9600): self._ser = Serial(address=address, baud=baud, timeout=timeout) # print('Successfully opened port {} @ {} to Arduino device'.format(address, baud)) self._line = self._ser.read_until() # discard any garbage artifacts self._ser.close() self._gps_thread = None # print('Successfully opened port', address, 'to GPS module') self._lat = DEFAULT_LOC['lat'] self._lng = DEFAULT_LOC['lng'] self._utc = None self._line = "" self._speed = {"knots": 0.0, "kmph": 0.0} self._course = {"true": 0.0, "mag": 0.0} self._sat = {"connected": 0, "view": 0, "quality": "Fix Unavailable"} self._altitude = 0.0 # self.azimuth = 0.0 # self.elevation = 0.0 self._data_status = 'Data not valid' self._fix = "no Fix" self._rx_status = "unknown" self._pdop = 0.0 self._hdop = 0.0 self._vdop = 0.0 @property def lat(self): """This attribute holds the latitude coordinate that was most recently parsed from the GPS module's data output.""" return self._lat @property def lng(self): """This attribute holds the longitude coordinate that was most recently parsed from the GPS module's data output.""" return self._lng @property def utc(self): """This attribute holds a tuple of time & date data that was most recently parsed from the GPS module's data output. This tuple conforms with python's time module functions.""" return self._utc @property def speed_knots(self): """This attribute holds the speed (in nautical knots) that was most recently parsed from the GPS module's data output.""" return self._speed['knots'] @property def speed_kmph(self): """This attribute holds the speed (in kilometers per hour) that was most recently parsed from the GPS module's data output.""" return self._speed['kmph'] @property def sat_connected(self): """This attribute holds the number of connected GPS satelites that was most recently parsed from the GPS module's data output.""" return self._sat['connected'] @property def sat_view(self): """This attribute holds the number of GPS satelites in the module's view that was most recently parsed from the GPS module's data output.""" return self._sat['view'] @property def sat_quality(self): """This attribute holds the description of the GPS satelites' quality that was most recently parsed from the GPS module's data output.""" return self._sat['quality'] @property def course_true(self): """This attribute holds the course direction (in terms of "true north") that was most recently parsed from the GPS module's data output.""" return self._course['true'] @property def course_mag(self): """This attribute holds the course direction (in terms of "magnetic north") that was most recently parsed from the GPS module's data output.""" return self._course['mag'] @property def altitude(self): """This attribute holds the GPS antenna's altitude that was most recently parsed from the GPS module's data output.""" return self._altitude @property def fix(self): """This attribute holds the description of GPS module's fix quality that was most recently parsed from the GPS module's data output.""" return self._fix @property def data_status(self): """This attribute holds the GPS module's data authenticity that was most recently parsed from the GPS module's data output.""" return self._data_status @property def rx_status(self): """This attribute holds the GPS module's receiving status that was most recently parsed from the GPS module's data output.""" return self._rx_status @property def pdop(self): """This attribute holds the GPS module's positional dilution of percision that was most recently parsed from the GPS module's data output.""" return self._pdop @property def vdop(self): """This attribute holds the GPS module's vertical dilution of percision that was most recently parsed from the GPS module's data output.""" return self._vdop @property def hdop(self): """This attribute holds the GPS module's horizontal dilution of percision that was most recently parsed from the GPS module's data output.""" return self._hdop def _parse_line(self, string): found = False if string.find('GLL') != -1: found = True arr = string.rsplit(',')[1:] # it would probably be helpful to other location-based APIs to have the # corrdinates also saved in the original 'DDMM.SS [cardinal direction]' self._lat = _convert2deg(arr[0]) if arr[1] != 'N' and arr[1] is not None: self._lat *= -1 self._lng = _convert2deg(arr[2]) if arr[3] != 'E' and arr[3] is not None: self._lng *= -1.0 type_state = {'A': 'data valid', 'V': 'Data not valid'} self._data_status = type_state[arr[5]] elif string.find('VTG') != -1: arr = string.rsplit(',')[1:] if len(arr[0]) > 1: self._course["true"] = float(arr[0]) if len(arr[1]) > 1: self._course["mag"] = float(arr[1]) if len(arr[2]) > 1: self._speed["knots"] = float(arr[2]) if len(arr[3]) > 1: self._speed["kmph"] = float(arr[3]) elif string.find('GGA') != -1: type_state = [ "Fix Unavailable", "Valid Fix (SPS)", "Valid Fix (GPS)", "unknown1", "unknown2", "unknown3"] arr = string.rsplit(',')[1:] self._sat["quality"] = type_state[int(arr[5])] self._sat["view"] = int(arr[6]) if len(arr[8]) > 1: self._altitude = float(arr[8]) elif string.find('GSA') != -1: arr = string.rsplit(',')[1:] type_fix = ["No Fix", "2D", "3D"] self._fix = type_fix[int(arr[1]) - 1] self._pdop = float(arr[14]) self._hdop = float(arr[15]) self._vdop = float(arr[16][:-3]) elif string.find('RMC') != -1: status = {"V": "Warning", "A": "Valid"} arr = string.rsplit(',')[1:] self._rx_status = status[arr[1]] if len(arr[0]) > 1 and len(arr[8]) > 1: self._utc = time.struct_time((2000+int(arr[8][4:6]), int(arr[8][2:4]), int(arr[8][0:2]), int(arr[0][0:2]), int(arr[0][2:4]), int(arr[0][4:6]), 0, 0, -1)) elif string.find('GSV') != -1: arr = string.rsplit(',')[1:] self._sat['connected'] = arr[0] # ignoring data specific to individual satelites # self.elevation = int(arr[4]) # self.azimuth = int(arr[5]) # print('sat["view"]:', self.sat["connected"], 'elevation:', self.elevation, 'Azimuth:', self.azimuth) return found def _threaded_read(self, raw): with self._ser as ser: found = False while ser.in_waiting or not found: self._line = ser.read_until() try: self._line = str(self._line, 'ascii').strip() except UnicodeError: continue # there was undecernable garbage data that couldn't get encoded to ASCII if raw: print(self._line) # found = true if gps coordinates are captured found = self._parse_line(self._line)
[docs] def get_data(self, raw=False): """ This function only starts the process of parsing the data from a GPS module (if any). :param bool raw: `True` prints the raw data being parsed from the GPS module. `False` doesn't print the raw data. Defaults to `False`. :returns: the last latitude and longitude coordinates obtained from either object instantiation (`DEFAULT_LOC` values) or previously completed parsing of GPS data. """ if self._gps_thread is not None and not self._gps_thread.is_alive(): self._gps_thread.join() self._gps_thread = threading.Thread( target=self._threaded_read, args=[raw]) self._gps_thread.start() elif self._gps_thread is None: self._gps_thread = threading.Thread( target=self._threaded_read, args=[raw]) self._gps_thread.start() return {"lat": self.lat, "lng": self.lng}