# -*- coding: utf-8 -*-
# Copyright 2013, Peter A. Bigot
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain a
# copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:copyright: Copyright 2013, Peter A. Bigot
:license: Apache-2.0
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import logging
_log = logging.getLogger(__name__)
import socket
import urlparse
import urllib
import random
import itertools
import coapy
import coapy.message
# Gross Hack: Update urlparse so it knows about the coap and coaps
# schemes, specifically that it should support joining relative URIs
# and process netloc and query.
urlparse.uses_relative.extend(['coap', 'coaps'])
urlparse.uses_netloc.extend(['coap', 'coaps'])
urlparse.uses_query.extend(['coap', 'coaps'])
class URIError (coapy.CoAPyException):
pass
[docs]class ReplyMessageError (coapy.CoAPyException):
"""Exception raised when :meth:`RcvdMessageCacheEntry.reply` is
invoked improperly.
The *args* are ``(diagnostic, cache_entry, message)`` where
*diagnostic* is one of the string values in this class,
*cache_entry* is the :class:`RcvdMessageCacheEntry`, and *message* is
the proposed reply message that was rejected.
"""
ID_MISMATCH = 'Message IDs do not match'
"""A reply message must match using the
:attr:`messageID<coapy.message.Message.messageID>` attributes.
"""
TOKEN_MISMATCH = 'Tokens do not match'
"""A piggy-backed response must match the
:attr:`Token<coapy.message.Message.token>` of the request.
"""
NOT_RESPONSE = 'Non-empty reply is not a response'
"""A piggy-backed response must be a :class:`response
message<coapy.message.Response>`.
"""
RESPONSE_NOT_ACK = 'Piggy-backed response is not ACK'
"""A piggy-backed response must have type
:attr:`ACK<coapy.message.Message.Type_ACK>`.
"""
ALREADY_GIVEN = 'Message already has reply'
"""A :attr:`reply_message<RcvdMessageCacheEntry.reply_message>`
has already been assigned.
"""
[docs]class MessageCacheEntry (coapy.util.TimeDueOrdinal):
"""A class holding data stored in a :class:`MessageCache`.
Instances sort based on
:attr:`coapy.util.TimeDueOrdinal.time_due`, and may be looked up
based on :attr:`message_id`.
Keyword parameters recognized:
* *cache* identifies the
:class:`MessageCache` instance to which this entry will
belong.
* *message* is a :class:`Message` instance from which :attr:`Message.messageID`
is used to initialize :attr:`message_id`
* *message_id* is used to initialize :attr:`message_id` if
*message* is absent
* *time_due* is used to initialize :attr:`coapy.util.TimeDueOrdinal.time_due`
*cache* is a required keyword parameter. One of *message* and
*message_id* must also be provided. The created cache entry is
automatically inserted into the cache upon creation.
"""
__cache = None
[docs] def _dissociate(self):
"""Remove the connection between the instance and the cache.
This is invoked by :class:`MessageCache` operations that
remove the entry from its cache.
"""
self.__cache = None
@property
[docs] def cache(self):
"""The :class:`MessageCache` to which this entry belongs.
This is a read-only property, set when the entry is created
and cleared when it has been removed from its cache.
"""
return self.__cache
[docs] def _get_time_due(self):
"""See :attr:`coapy.util.TimeDueOrdinal.time_due`. In this
class, modification of the value has the side-effect of moving
the entry within the :attr:`cache` queue.
"""
return self.__time_due
def _set_time_due(self, value):
self.__time_due = value
# If we're in the superclass constructor the cache has not
# yet been assigned (nor has the message_id that must be
# assigned prior to insertion). So don't do anything yet.
if self.__cache is not None:
self.__cache._reposition(self)
time_due = property(_get_time_due, _set_time_due)
message_id = None
"""The :attr:`Message.messageID` value associated with the cache
entry.
"""
def __init__(self, **kw):
cache = kw.pop('cache', None)
message = kw.pop('message', None)
message_id = kw.pop('message_id', None)
super(MessageCacheEntry, self).__init__(**kw)
if isinstance(message, coapy.message.Message):
self.message_id = message.messageID
elif isinstance(message_id, int):
self.message_id = message_id
else:
raise TypeError(message_id)
if not isinstance(cache, MessageCache):
raise TypeError(cache)
self.__cache = cache
cache._add(self)
[docs]class MessageCache (object):
"""Dual-view collection used for caches based on :attr:`Message.messageID`.
This class implements a cache. It simulates a dictionary allowing
lookup of items using :attr:`Message.messageID` values as keys. It also
simulates a priority queue, allowing items to be removed from the
cache based on age.
Elements in the cache are expected to be instances of
:class:`MessageCacheEntry`. Most lookup :class:`python:dict`
operations are supported.
Cache entries are placed in the cache when they are created. It
is an error to create a new cache entry when one with the same
:attr:`MessageCacheEntry.message_id` is already present.
Entries are removed from the cache by using :meth:`pop_oldest`.
Entry content may be updated while in the cache, but once removed
from the cache an entry cannot be re-inserted.
"""
__queue = None
__dict = None
@property
[docs] def endpoint(self):
"""The :class:`Endpoint` to which the cache belongs."""
return self.__endpoint
@property
[docs] def is_sent_cache(self):
"""``True`` if this cache is the sent-message cache for its
:attr:`endpoint`. ``False`` if this is the received-message
cache."""
return self.__is_sent_cache
def __init__(self, endpoint, is_sent_cache):
if not isinstance(endpoint, Endpoint):
raise ValueError(endpoint)
self.__endpoint = endpoint
self.__is_sent_cache = is_sent_cache
self.__queue = []
self.__dict = {}
self.keys = self.__dict.keys
self.values = self.__dict.values
self.items = self.__dict.items
self.get = self.__dict.get
# clear
# setdefault
import sys
if sys.version_info < (3, 0):
self.has_key = self.__dict.has_key
self.iterkeys = self.__dict.iterkeys
self.itervalues = self.__dict.itervalues
self.iteritems = self.__dict.iteritems
# pop
# popitem
# copy
# update
[docs] def queue(self):
"""The queue of cache entries sorted by :attr:`MessageCacheEntry.time_due`.
.. warning::
This method returns a reference to the underlying sorted
list. Callers are expected to refrain from changing the
list in any way other than through methods exposed on the
cache itself.
"""
return self.__queue
[docs] def peek_oldest(self):
"""Return the oldest item in the cache without removing it."""
return self.__queue[0]
[docs] def pop_oldest(self):
"""Return the oldest item in the cache after removing it."""
return self._remove(self.__queue[0])
[docs] def clear(self):
"""Remove all entries in the cache."""
while self.__queue:
self.pop_oldest()
[docs] def _add(self, value):
"""Add *value* to the cache.
"""
if not isinstance(value, MessageCacheEntry):
raise ValueError(value)
if value.message_id in self.__dict:
raise ValueError(value)
if value.cache != self:
raise ValueError(value)
value.queue_insert(self.__queue)
self.__dict[value.message_id] = value
[docs] def _remove(self, value):
"""Remove *value* from the cache.
"""
if not isinstance(value, MessageCacheEntry):
raise ValueError(value)
value.queue_remove(self.__queue)
del self.__dict[value.message_id]
value._dissociate()
return value
[docs] def _reposition(self, value):
"""Re-place *value* at its correct location in the queue.
This must be invoked whenever the underlying
:attr:`coapy.util.TimeDueOrdinal.time_due` attribute value is
changed."""
value.queue_reposition(self.__queue)
def __len__(self):
return len(self.__queue)
def __getitem__(self, key):
if isinstance(key, coapy.message.Message):
key = key.messageID
return self.__dict[key]
def __contains__(self, key):
return key in self.__dict
[docs]class SentMessageCacheEntry (MessageCacheEntry):
"""Data related to a message sent from a specific endpoint.
This cache holds a message that originated from a local
:attr:`coapy.message.Message.source_endpoint`, along with the
necessary state to retransmit it if it is
:meth:`confirmable<coapy.message.Message.is_confirmable>`.
"""
ST_untransmitted = 0
ST_unacknowledged = 1
ST_final_ack_wait = 2
ST_completed = 3
ST_removed = 4
@property
def state(self):
return self.__state
__state = None
@property
[docs] def transmissions(self):
"""Number of times this message has been transmitted."""
return self.__transmissions
__transmissions = None
@property
[docs] def created_clk(self):
"""The :func:`coapy.clock` value at the time the cache entry
was created.
"""
return self.__created_clk
__created_clk = None
__bebo = None
@property
[docs] def message(self):
"""The :class:`coapy.message.Message` being cached."""
return self.__message
__message = None
@property
[docs] def reply_message(self):
"""The :class:`coapy.message.Message` that was received in response to :attr:`message`.
The value is ``None`` unless either an
:attr:`acknowledgement<coapy.message.Message.Type_ACK>` (empty
or with a piggy-backed response) or a
:attr:`reset<coapy.message.Message.Type_RST>` has been
received.
"""
return self.__reply
__reply = None
@property
[docs] def destination_endpoint(self):
"""The endpoint to which the message is being sent."""
return self.__destination_endpoint
__destination_endpoint = None
@property
[docs] def stale_at(self):
"""Return the time at which the content of a response message is outdated.
This is calculated from the :class:`coapy.option.MaxAge`
option in conjunction with :attr:`created_clk`. Callers may
wish to update the :attr:`message` options to reflect the
change in age on subsequent retransmissions.
The value is ``None`` if the message is not a response.
"""
return self.__stale_at
__stale_at = None
def __init__(self, cache, message, destination_endpoint, transmission_parameters=None):
if not isinstance(message, coapy.message.Message):
raise ValueError(message)
if transmission_parameters is None:
transmission_parameters = coapy.transmissionParameters
self.__created_clk = coapy.clock()
self.__destination_endpoint = destination_endpoint
self.__expiry_due = self.__created_clk
if message.is_confirmable():
self.__expiry_due += transmission_parameters.EXCHANGE_LIFETIME
elif message.is_non_confirmable():
self.__expiry_due += transmission_parameters.NON_LIFETIME
else:
# ACK and RST messages should not be cached
raise ValueError(message)
if isinstance(message, coapy.message.Response):
self.__stale_at = self.__created_clk + message.maxAge()
self.__state = self.ST_untransmitted
self.__transmissions = 0
self.__message = message
self.__timeout = 0
time_due = self.__created_clk
super(SentMessageCacheEntry, self).__init__(cache=cache,
message_id=message.messageID,
time_due=time_due)
if message.is_confirmable():
self.__bebo = transmission_parameters.make_bebo()
def __complete(self):
self.__state = self.ST_completed
self.time_due = self.__expiry_due
def process_timeout(self):
now = coapy.clock()
if now < self.time_due:
raise Exception
if self.__state == self.ST_removed:
raise Exception
if self.__state == self.ST_completed:
self.cache._remove(self)
self.__state = self.ST_removed
return None
ep = self.cache.endpoint
data = self.message.to_packed()
ep.rawsendto(data, self.destination_endpoint)
self.__transmissions += 1
if self.ST_untransmitted == self.__state:
if self.__bebo is None:
self.__complete()
else:
self.__state = self.ST_unacknowledged
if self.ST_unacknowledged == self.__state:
try:
self.__timeout = next(self.__bebo)
except StopIteration:
self.__bebo = None
# Double last timeout (4.2) to wait for
# acknowledgement to final transmission
self.__timeout += self.__timeout
self.__state = self.ST_final_ack_wait
self.time_due += self.__timeout
elif self.ST_final_ack_wait == self.__state:
self.__complete()
return self
def process_reply(self, msg):
if self.__reply is not None:
_log.warning('Multiple replies')
return
if msg.is_reset() or ((self.ST_unacknowledged == self.__state) and msg.is_acknowledgement):
self.__reply = msg
self.__complete()
return self
raise ValueError(msg)
[docs]class RcvdMessageCacheEntry (MessageCacheEntry):
"""Data related to a message received by a specific endpoint.
This cache holds a message that originated from a remote
:attr:`coapy.message.Message.source_endpoint`, along with the
necessary state to cache the reply to that message.
*cache* must be the source endpoint cache for received messages.
*message* must be a message that originated on that host, and is
either a :meth:`confirmable<coapy.message.Message.is_confirmable>`
or
:meth:`non-confirmable<coapy.message.Message.is_non_confirmable>`
message. Acknowledgements and Resets are not recorded in the
cache.
"""
@property
[docs] def created_clk(self):
"""The :func:`coapy.clock` value at the time the cache entry
was created. This should correspond to the time at which
:attr:`message` was first received.
"""
return self.__created_clk
__created_clk = None
@property
[docs] def message(self):
"""The :class:`coapy.message.Message` being cached."""
return self.__message
__message = None
@property
[docs] def reception_count(self):
"""The number of times :attr:`message` was received. More
strictly, this is the number of times a message with the same
:attr:`messageID<coapy.message.Message.messageID>` was received
while this cache entry is live. Diagnostics may be emitted in a
situation where it appears a message ID has been re-used
prematurely.
"""
return self.__reception_count
__reception_count = None
@property
[docs] def reply_message(self):
"""The :class:`coapy.message.Message` that was sent in
response to this message. ``None`` until a response is sent,
then either an
:meth:`acknowledgement<coapy.message.Message.is_acknowledgement>`
(which may or may not have a :coapsect:`piggy-backed
response<>`) or a
:meth:`reset<coapy.message.Message.is_reset>` message. A
non-confirmable message may have no response at all.
"""
return self.__reply_message
__reply_message = None
[docs] def reply(self, reset=False, message=None):
"""Create the :attr:`reply_message` for the reception in this entry.
If *message* is provided, it should be an
:class:`coapy.message.Response` message with type
:attr:`ACK<coapy.message.Message.Type_ACK>` that is to be sent
as a :coapsect:`piggy-backed response<5.2.1>`.
If *message* is ``None``, this method will create an empty
:attr:`ACK<coapy.message.Message.Type_ACK>` (*reset* is
``False``) or
:attr:`RST<coapy.message.Message.Type_RST>` (*reset* is
``True``) message.
The reply message will be transmitted to the source endpoint
of the received message.
Erroneous use will raise :exc:`ReplyMessageError`.
"""
if self.__reply_message is not None:
raise ReplyMessageError(ReplyMessageError.ALREADY_GIVEN, self, message)
if message is None:
message = self.message.create_reply(reset=reset)
if message.messageID is None:
message.messageID = self.message.messageID
if message.messageID != self.message.messageID:
raise ReplyMessageError(ReplyMessageError.ID_MISMATCH, self, message)
if coapy.message.Message.Empty != message.code:
if not isinstance(message, coapy.message.Response):
raise ReplyMessageError(ReplyMessageError.NOT_RESPONSE, self, message)
if not message.is_acknowledgement():
raise ReplyMessageError(ReplyMessageError.RESPONSE_NOT_ACK, self, message)
if message.token != self.message.token:
raise ReplyMessageError(ReplyMessageError.TOKEN_MISMATCH, self, message)
if message.source_endpoint is None:
message.source_endpoint = self.message.destination_endpoint
if message.destination_endpoint is None:
message.destination_endpoint = self.message.source_endpoint
self.__reply_message = message
self.resend_reply()
def resend_reply(self):
rm = self.__reply_message
rm.source_endpoint.rawsendto(rm.to_packed(), rm.destination_endpoint)
def __init__(self, cache, message, transmission_parameters=None):
if not isinstance(message, coapy.message.Message):
raise ValueError(message)
if transmission_parameters is None:
transmission_parameters = coapy.transmissionParameters
self.__created_clk = coapy.clock()
self.__message = message
self.__reception_count = 1
time_due = self.created_clk
if message.is_confirmable():
time_due += transmission_parameters.EXCHANGE_LIFETIME
elif message.is_non_confirmable():
time_due += transmission_parameters.NON_LIFETIME
else:
# ACK and RST messages should not be cached
raise ValueError(message)
super(RcvdMessageCacheEntry, self).__init__(cache=cache,
message_id=message.messageID,
time_due=time_due)
[docs]class Endpoint (object):
"""A CoAP endpoint.
Per :coapsect:`1.2` this is an entity participating in the CoAP
protocol. In CoAPy it is used to aggregate all material related
to such an endpoint, which is uniquely identified by an IP
address, port, and security information. Various constraints in
CoAP such as :coapsect:`congestion control<4.7>`,
:coapsect:`default values for options<5.10.1>`, and re-usability
of message IDs, are associated with specific endpoints.
*sockaddr*, if not ``None``, must be a tuple the first two
elements of which are ``host, port`` which override the
user-provided *host* and *port*.
*host* specifies the host of the endpoint as a Unicode string,
representing a host name, an IP address literal, or other unique
key.
*family* is by default :data:`python:socket.AF_UNSPEC` supporting
resolution of *host* to any address family. A non-``None`` value
is passed to :func:`python:socket.getaddrinfo` when attempting to
resolve *host* as an address; the actual *family* value will be
the one selected by the resolution process (normally
:data:`python:socket.AF_INET` or :data:`python:socket.AF_INET6`)
Failure to successfully resolve *host* will raise
:exc:`python:socket.gaierror`. If you want a dummy endpoint
associated with *host* but that does not have an IP host
associated with it, make sure that *family* is ``None`` so that
*host* is left unresolved and instead serves as an *name* as
described in :attr:`sockaddr`.
*port* is the integer transport-layer port of the endpoint. This
would normally be either :const:`coapy.COAP_PORT` or
:const:`coapy.COAPS_PORT`.
*security_mode* is used to determine the DTLS protocol used for
secure CoAP, and at this time had probably better be left as
``None``.
To ensure consistency, :class:`Endpoint` instances are unique for
a given key comprising :attr:`family`, :attr:`ip_addr`,
:attr:`port`, and :attr:`security_mode`. Attempts to instantiate
a new endpoint with parameters that match a previously-created one
will return a reference to the original instance.
"""
@property
[docs] def sockaddr(self):
"""The Python :mod:`python:socket` address of the endpoint.
When :attr:`family` is :data:`python:socket.AF_INET` this is
the tuple ``(host, port)``.
When :attr:`family` is :data:`python:socket.AF_INET6` this is
the tuple ``(host, port, flowinfo, scopeid)``.
When :attr:`family` is ``None`` this is the tuple ``(name,
port)``. *name* functions like *host* but is not a resolvable
host name.
*host* will be the text representation of :attr:`in_addr`; it
will not be a host name. *port* will be a numeric port
*number*.
"""
return self.__sockaddr
__bound_socket = None
[docs] def set_bound_socket(self, socket):
"""Set the :attr:`bound_socket`.
*socket* may be ``None``, in which case the endpoint is
disassociated from any socket.
If *socket* is not ``None`` it must be an object suitable for
assignment to :attr:`bound_socket`. The endpoint adopts the
socket, in that if/when the endpoint is destroyed the socket
will be closed if it remains bound to the endpoint. (Since
:class:`Endpoint` instances are almost impossible to destroy,
this has little relevance at this time.)
In either case if the assignment succeed the previous value of
:attr:`bound_socket` is returned, with the caller taking
responsibility to close it when finished.
.. note::
For simulation and testing purposes *socket* might not be
a :func:`socket object<python:socket.socket>`, but it must
act like one with respect to the methods CoAPy expects it
to provide, including but not limited to:
* :meth:`getsockname()<python:socket.socket.getsockname>`
* :meth:`sendto()<python:socket.socket.sendto>`
"""
obs = self.__bound_socket
if (socket is not None) and (self.sockaddr != socket.getsockname()):
raise ValueError(socket)
self.__bound_socket = socket
return obs
@property
[docs] def bound_socket(self):
"""Return a :func:`socket object<python:socket.socket>`
instance that is bound to :attr:`sockaddr`.
This may only be set for endpoints that are local to the host.
It may set to ``None`` to disassociate the endpoint from a
socket, and may be changed from ``None`` to an object that
returns :attr:`sockaddr` when
:meth:`socket.getsockname<python:socket.socket.getsockname>`
is invoked on it.
See :meth:`create_bound_endpoint` and
:meth:`set_bound_socket`.
"""
return self.__bound_socket
[docs] def _rawsendto(self, data, destination_endpoint):
"""Send *data* from this endpoint to *destination_endpoint*.
This invokes :meth:`sendto<python:socket.socket.sendto>` on
:attr:`bound_socket` to transmit *data* to the
*destination_endpoint* via its :attr:`sockaddr`.
"""
return self.bound_socket.sendto(data, destination_endpoint.sockaddr)
[docs] def rawsendto(self, data, destination_endpoint):
"""Send *data* from this endpoint to *destination_endpoint*.
Normally this is shorthand for invoking
:meth:`sendto<python:socket.socket.sendto>` on
:attr:`bound_socket` to transmit *data* to the
*destination_endpoint* via its :attr:`sockaddr`. Subclasses
may override the implementation for the purposes of testing or
simulation.
"""
return self._rawsendto(data, destination_endpoint)
[docs] def _rawrecvfrom(self, bufsize):
"""Receive *data* from a *source_endpoint*.
Invokes :meth:`recvfrom<python:socket.socket.recvfrom>` on
:attr:`bound_socket` and replacing the returned source socket
address with the corresponding :class:`Endpoint` instance as
*source_endpoint*. Returns ``(data, source_endpoint)``.
"""
(data, addr) = self.bound_socket.recvfrom(bufsize)
return (data, Endpoint(sockaddr=addr, family=self.family))
[docs] def rawrecvfrom(self, bufsize):
"""Receive *data* from a *source_endpoint*.
Returns ``(data, source_endpoint)``. Normally this is simply
shorthand for invoking
:meth:`recvfrom<python:socket.socket.recvfrom>` on
:attr:`bound_socket` and replacing the returned source socket
address with the corresponding :class:`Endpoint` instance as
*source_endpoint*. Subclasses may override the implementation
for the purposes of testing or simulation.
"""
return self._rawrecvfrom(bufsize)
@classmethod
[docs] def create_bound_endpoint(cls, sockaddr=None, family=socket.AF_UNSPEC,
security_mode=None,
host=None, port=coapy.COAP_PORT):
"""Create an endpoint with a local socket bound to it.
*sockaddr*, *family*, *security_mode*, *host*, and *port* are
all as used with :class:`Endpoint`.
For use with a CoAP service on the local host (*host* as
``127.0.0.1`` or ``::1``), *port* may be 0 in this call. This
allows the bind operation to select an unused local port.
Returns the created endpoint with :attr:`bound_socket`
initialized and ready to send and receive messages.
"""
# First, figure out the resolved family and host/port part of
# the socket address.
(family, sockaddr) = cls._canonical_sockinfo(sockaddr=sockaddr,
family=family,
security_mode=security_mode,
host=host,
port=port)
if (family is None) or (family is socket.AF_UNSPEC):
raise ValueError
# Create a socket, bind it to the proposed socket address, then use
# the result as the endpoint socket address: this is necessary because
# if port was passed as 0 the act of binding assigned a local port.
s = socket.socket(family, socket.SOCK_DGRAM)
s.bind(sockaddr)
sockaddr = s.getsockname()
# Now we can create the instance and associate the socket with it.
ep = cls(sockaddr=sockaddr, family=family, security_mode=security_mode)
ep.set_bound_socket(s)
return ep
@property
[docs] def family(self):
"""The address family used for :attr:`sockaddr`.
This is normally :data:`python:socket.AF_INET` or
:data:`python:socket.AF_INET6`. It may be ``None`` for
testing situations where the actual endpoint does not
correspond to an network node.
"""
return self.__family
@property
[docs] def in_addr(self):
"""The address of the endpoint.
The representation is binary data encoding part of
:attr:`sockaddr`. Decoding it depends on :attr:`family`:
When :attr:`family` is :data:`python:socket.AF_INET` this is
the IPv4 address in network byte order.
When :attr:`family` is :data:`python:socket.AF_INET6` this is
the IPv6 address in network byte order.
When :attr:`family` is ``None`` this is the *name* in
Net-Unicode format.
"""
return self.__in_addr
@property
[docs] def port(self):
"""The transport-level port of the endpoint."""
return self.__port
@property
[docs] def security_mode(self):
"""The security mode of the endpoint. Generally ``None``
though if :coapsect:`DTLS<9>` CoAP is used it would be some
other value.
"""
return self.__security_mode
@property
[docs] def uri_host(self):
"""The text value for the ``host`` subcomponent of the
authority part of a URI involving this endpoint.
This is almost always either an ``IPv4address`` or
``IP-literal`` as defined by `section 3.2.2 of RFC3986`_. If
an :class:`Endpoint` is created by an application using a host
name, the resolved address of the host is used for
:attr:`sockaddr` and for this property.
.. _section 3.2.2 of RFC3986: http://tools.ietf.org/html/rfc3986#section-3.2.2
"""
return self.__uri_host
@property
[docs] def base_uri(self):
"""The base CoAP URI for resources on this endpoint.
This is used by :meth:`uri_to_options` to avoid the need to
specify the protocol and netloc when creating option lists.
"""
return self.__base_uri
__base_uri = None
__EndpointRegistry = {}
__nonInetIndex = 0
@staticmethod
[docs] def _key_for_sockaddr(sockaddr, family, security_mode=None):
"""Create the key used to look up endpoints.
*sockaddr* must be a tuple the first two elements of which are
``(host, port)``. Unless *family* is ``None``, *host* must be
a text representation of an IP address that can be converted
with :func:`python:socket.inet_pton` using *family*, not a
hostname. *port* must be a numeric port number.
If *family* is :data:`python:socket.AF_UNSPEC` a
:exc:`python:exception.ValueError` exception will be raised as
the system does not know how to decode the *host*.
"""
if not isinstance(sockaddr, tuple):
raise TypeError(sockaddr)
ip_literal = sockaddr[0]
if family is None:
in_addr = coapy.util.to_net_unicode(ip_literal)
elif family is socket.AF_UNSPEC:
raise ValueError
else:
# Use the network-byte-order binary representation of the
# IP address, to having to deal with non-canonical IPv6
# text representations. See RFC5952 for why this is
# necessary.
in_addr = socket.inet_pton(family, ip_literal)
return (family, in_addr, sockaddr[1], security_mode)
@staticmethod
[docs] def _canonical_sockinfo(sockaddr=None, family=socket.AF_UNSPEC,
security_mode=None,
host=None, port=coapy.COAP_PORT):
"""Get canonical socket information for an endpoint.
Returns a tuple ``(family, sockaddr)`` where *sockaddr* is a
an :attr:`address<sockaddr>` as constrained by *family*.
Failure to resolve the socket *host* to a numeric IP literal
within *family* (if required) will raise
:exc:`python:socket.gaierror`.
"""
if sockaddr is not None:
try:
key = Endpoint._key_for_sockaddr(sockaddr, family, security_mode)
ep = Endpoint.__EndpointRegistry.get(key)
if ep is not None:
return (ep.family, ep.sockaddr)
except:
pass
if not isinstance(sockaddr, tuple):
raise TypeError(sockaddr)
if 2 > len(sockaddr):
raise ValueError(sockaddr)
(host, port) = sockaddr[:2]
if host is None:
raise ValueError(host)
if not isinstance(port, int):
raise TypeError(port)
if family is None:
gai = (family, None, None, host, (host, port))
else:
gais = socket.getaddrinfo(host, port, family, socket.SOCK_DGRAM, 0,
(socket.AI_ADDRCONFIG | socket.AI_V4MAPPED
| socket.AI_NUMERICSERV))
gai = gais.pop(0)
return (gai[0], gai[4])
@classmethod
[docs] def lookup_endpoint(cls, sockaddr=None, family=socket.AF_UNSPEC,
security_mode=None,
host=None, port=coapy.COAP_PORT):
"""Look up an endpoint using the same algorithm as endpoint
creation.
Returns ``None`` if passing these parameters to
class:`Endpoint` would result in creation of a new endpoint.
"""
instance = None
if sockaddr is not None:
try:
key = Endpoint._key_for_sockaddr(sockaddr, family, security_mode)
instance = Endpoint.__EndpointRegistry.get(key)
except:
pass
if instance is None:
(family, sockaddr) = Endpoint._canonical_sockinfo(sockaddr, family,
security_mode, host, port)
key = Endpoint._key_for_sockaddr(sockaddr, family, security_mode)
instance = Endpoint.__EndpointRegistry.get(key)
return instance
def __new__(cls, sockaddr=None, family=socket.AF_UNSPEC,
security_mode=None,
host=None, port=coapy.COAP_PORT):
instance = None
if sockaddr is not None:
try:
key = Endpoint._key_for_sockaddr(sockaddr, family, security_mode)
instance = Endpoint.__EndpointRegistry.get(key)
except:
pass
if instance is None:
(family, sockaddr) = Endpoint._canonical_sockinfo(sockaddr, family,
security_mode, host, port)
key = Endpoint._key_for_sockaddr(sockaddr, family, security_mode)
instance = Endpoint.__EndpointRegistry.get(key)
if instance is None:
instance = super(Endpoint, cls).__new__(cls)
host = sockaddr[0]
port = sockaddr[1]
cls.__EndpointRegistry[key] = instance
instance.__family = family
instance.__in_addr = key[1]
instance.__port = port
instance.__security_mode = security_mode
instance.__sockaddr = sockaddr
if socket.AF_INET == family:
instance.__uri_host = '{0}'.format(socket.inet_ntop(instance.family,
instance.in_addr))
elif socket.AF_INET6 == family:
instance.__uri_host = '[{0}]'.format(socket.inet_ntop(instance.family,
instance.in_addr))
else:
instance.__uri_host = host
return instance
def __del__(self):
self._reset()
super(Endpoint, self).__del__(self)
[docs] def _reset(self):
"""Return all data to its initial state.
This is a back-door for unit-testing from a known state. It's
also used when a new Endpoint is constructed for the first
time. Only mutable state is reset; immutable values like
:attr:`family` and :attr:`sockaddr` are not affected.
"""
self._reset_next_messageID(random.randint(0, 65535))
if self.__bound_socket is not None:
try:
self.__bound_socket.close()
except:
pass
self.__bound_socket = None
self._sent_cache = MessageCache(self, True)
self._rcvd_cache = MessageCache(self, False)
def __init__(self, sockaddr=None, family=socket.AF_UNSPEC,
security_mode=None,
host=None, port=coapy.COAP_PORT):
# None of these arguments are used here; they apply in
# __new__.
super(Endpoint, self).__init__()
# Note: Only re-initialize if the instance was newly created.
if self.__base_uri is None:
self.__base_uri = self.uri_from_options([])
self._reset()
[docs] def next_messageID(self):
"""Return a new messageID suitable for a message to this endpoint.
This is sequentially generated starting from an initial value
that was randomly generated when the endpoint was created. It
is filtered so message IDs still present in the sent message
cache are not re-used.
"""
while True:
mid = next(self.__messageID_iter)
if not (mid in self._sent_cache):
return mid
def _reset_next_messageID(self, start):
# Back-door for unit testing from known starting point
self.__messageID_iter = itertools.imap(lambda _v: _v % 65536, itertools.count(start))
[docs] def get_peer_endpoint(self, sockaddr=None, host=None, port=coapy.COAP_PORT):
"""Find the endpoint at *sockaddr* that this endpoint can talk to.
This invokes :class:`Endpoint` with *family* and
*security_mode* set to the parameters used by this endpoint.
It is used to identify the source endpoint of a message
received by :meth:`python:socket.socket.recvfrom`. *sockaddr*
will be constructed from *host* and *port* if not provided
explicitly; at least one of *sockaddr* and *host* must be
given.
"""
if sockaddr is None:
if host is None:
raise ValueError
if not isinstance(port, int):
raise TypeError
sockaddr = (host, port)
return type(self)(sockaddr=sockaddr, family=self.family, security_mode=self.security_mode)
[docs] def is_same_host(self, host):
"""Determine whether *host* resolves to the same address as
this endpoint.
This is used for the algorithm in :coapsect:`6.4` to determine
that a :class:`UriHost<coapy.option.UriHost>` option may be
elided in favor of the default derived from an endpoint. This
can only be done if *host* is an ``IP-literal`` or
``IPv4address`` equivalent to :attr:`in_addr` in
:attr:`family`. DNS resolution is not used.
"""
if self.family is None:
return self.uri_host == host
try:
in_addr = socket.inet_pton(self.family, host)
return self.in_addr == in_addr
except socket.error:
pass
return False
@staticmethod
def _port_for_scheme(scheme):
return {'coap': coapy.COAP_PORT,
'coaps': coapy.COAPS_PORT}[scheme]
[docs] def uri_to_options(self, uri, base_uri=None):
"""Convert a URI to a list of CoAP options relative to this endpoint.
*uri* should be an :rfc:`3986` conformant absolute URI. For
convenience, if *base_uri* is not None the value of *uri* will
be recalculated assuming it is relative to *base_uri*.
*base_uri* itself will default to :attr:`base_uri` if no
non-``None`` value is provided.
The scheme part of *uri* must be either "coap" or "coaps".
Options will be returned in a list in the following order.
* :class:`coapy.option.UriHost`, absent if the URI host
matches the endpoint :attr:`family` and :attr:`in_addr`;
* :class:`coapy.option.UriPort`, absent if the URI port
matches the endpoint :attr:`port`;
* :class:`coapy.option.UriPath`, absent if the path is empty,
otherwise occurs once per path segment;
* :class:`coapy.option.UriQuery`, absent if there is no query
part, otherwise occurs once per ``&``-separated query
element.
"""
if base_uri is None:
base_uri = self.base_uri
if base_uri is not None:
uri = urlparse.urljoin(base_uri, uri)
res = urlparse.urlsplit(uri)
opts = []
# 6.4.1. absolute-URI = scheme ":" hier-part [ "?" query ]
if (not res.scheme) \
or ((res.netloc is None) and (res.path is None)) \
or res.fragment:
raise URIError('not absolute', uri)
# 6.4.2. Make this user's job or done by urljoin
# 6.4.3. Check scheme
scheme = res.scheme.lower()
if not (scheme in ('coap', 'coaps')):
raise URIError('invalid scheme', res.scheme)
# 6.4.4. Unnecessary: fragments aren't allowed in absolute-URIs,
# or in the restrictions for coap-URI and coaps-URI.
# 6.4.5. authority = [ userinfo "@" ] host [ ":" port] CoAP
# doesn't provide a way to pass userinfo, so defer to the
# ParseResult hostname and port values rather than try to re-parse
# netloc locally.
if res.hostname:
host = coapy.util.url_unquote(res.hostname)
if not self.is_same_host(host):
opts.append(coapy.option.UriHost(host))
# 6.4.6. Set port from URI or default from scheme
port = res.port
if port is None:
port = self._port_for_scheme(scheme)
# 6.4.7.
if port != self.port:
opts.append(coapy.option.UriPort(port))
# 6.4.8
path = res.path
if path and not ('/' == path):
if path.startswith('/'):
path = path[1:]
for segment in path.split('/'):
segment = coapy.util.url_unquote(segment)
opts.append(coapy.option.UriPath(segment))
# 6.4.9
query = res.query
if query:
for qseg in query.split('&'):
qseg = coapy.util.url_unquote(qseg)
opts.append(coapy.option.UriQuery(qseg))
return opts
[docs] def uri_from_options(self, opts):
"""Create a URI from endpoint data and the options.
The resulting URI scheme is "coap" unless
:attr:`security_mode` is set (in which case it is "coaps").
The authority is derived from
:class:`UriHost<coapy.option.UriHost>` and
:class:`UriPort<coapy.option.UriPort>` options in *opts*,
defaulting to :attr:`uri_host` and :attr:`port` if the
respective options are not provided.
The remainder of the URI is built up from
:class:`UriPath<coapy.option.UriPath>` and
:class:`UriQuery<coapy.option.UriQuery>` options in *opts*.
"""
scheme = 'coap'
if self.security_mode is not None:
scheme = 'coaps'
host = None
opt = coapy.option.UriHost.first_match(opts)
if opt is not None:
host = opt.value
if host is None:
raise URIError('empty Uri-Host')
if host and ('[' != host[0]):
host = coapy.util.url_quote(host)
if host is None:
host = self.uri_host
port = self.port
opt = coapy.option.UriPort.first_match(opts)
if opt is not None:
port = opt.value
if port is None:
raise URIError('empty Uri-Port')
if port == self._port_for_scheme(scheme):
netloc = host
else:
netloc = '{0}:{1}'.format(host, port)
# Paths are always absolute, so start with an empty segment so the
# encoded version begins with a slash.
elts = ['']
for segment_opt in coapy.option.UriPath.all_match(opts):
segment = segment_opt.value
segment = coapy.util.url_quote(segment, '')
elts.append(segment)
path = '/'.join(elts)
if not path:
# Make sure we still have the leading slash
path = '/'
elts = []
for qseg_opt in coapy.option.UriQuery.all_match(opts):
qseg = qseg_opt.value
qseg = coapy.util.url_quote(qseg, '?')
elts.append(qseg)
query = '&'.join(elts)
return urlparse.urlunsplit((scheme, netloc, path, query, None))
[docs] def finalize_message(self, message):
"""Final checks and refinements for *message* relative to this
endpoint.
The *message* is
:meth:`validated<coapy.message.Message.validate>`, then the
following final cleanup in its
:attr:`options<coapy.message.Message.options>` is done:
* A :class:`coapy.option.UriHost` that is the :meth:`same
host<is_same_host>` as the endpoint will be removed.
* A :class:`coapy.option.UriPort` that is the same port as the
endpoint is removed.
The finalized message is returned.
"""
message.validate()
nopt = []
for oi in xrange(len(message.options)):
opt = message.options[oi]
if isinstance(opt, coapy.option.UriHost) and self.is_same_host(opt.value):
continue
elif isinstance(opt, coapy.option.UriPort) and (opt.value == self.port):
continue
nopt.append(opt)
if len(nopt) != len(message.options):
message.options = nopt
return message
def _flush_rcvd_cache(self):
now = coapy.clock()
cache = self._rcvd_cache
while 0 < len(cache):
e = cache.peek_oldest()
if e.time_due > now:
break
cache.pop_oldest()
[docs] def receive(self):
"""Receive and decode a message from another endpoint.
Returns ``None`` if the message is so corrupt it should be
ignored, or if the received message is a duplicate. Raises
:exc:`coapy.message.MessageFormatError` if the message cannot
be fully decoded. Otherwise returns the message, in which
:attr:`destination_endpoint<coapy.message.Message.destination_endpoint>`
will be set to *self* and
:attr:`source_endpoint<coapy.message.Message.source_endpoint>`
will be set to *source_endpoint*.
Any message-layer processing (e.g. re-sending duplicate ACK or
RST, or sending a RST due to a message format error) will have
been done before this call returns.
"""
m = None
dkw = None
(data, source_endpoint) = self.rawrecvfrom(8192)
try:
m = coapy.message.Message.from_packed(data)
m.destination_endpoint = self
m.source_endpoint = source_endpoint
except coapy.message.MessageFormatError as e:
_log.exception('receive')
dkw = e.args[1]
if m is None:
mid = dkw['messageID']
mtype = dkw['type']
else:
mid = m.messageID
mtype = m.messageType
local_origin = not coapy.message.Message.source_originates_type(mtype)
if local_origin:
# local_origin means ACK or RST; look in send cache.
ce = self._sent_cache.get(mid)
if ce is None:
_log.error('Reply to unrecognized message')
return None
if m is None:
_log.error('Invalid reply to message')
return None
ce.process_reply(m)
return None
# not local origin means CON or NON; look in receive cache
ce = source_endpoint._rcvd_cache.get(mid)
if ce is not None:
_log.error('Received duplicate')
return None
if m is None:
_log.error('Need send RST')
return RcvdMessageCacheEntry(source_endpoint._rcvd_cache, m)
[docs] def send(self, msg, destination_endpoint=None):
"""Send *msg* to *destination_endpoint*.
*msg* must be an instance of :class:`coapy.message.Message`.
*destination_endpoint* specifies where the packed message will
be sent and defaults to *msg*'s
:attr:`destination_endpoint<coapy.message.Message.destination_endpoint>`.
The return value is the :class:`SentMessageCacheEntry` that
has message-level transmission state.
"""
if not isinstance(msg, coapy.message.Message):
raise TypeError(msg)
if destination_endpoint is None:
destination_endpoint = msg.destination_endpoint
return SentMessageCacheEntry(self._sent_cache, msg, destination_endpoint)
[docs] def create_request(self, uri,
confirmable=False,
code=coapy.message.Request.GET,
messageID=None,
token=None,
options=None,
payload=None):
"""Create and return a :class:`Request<coapy.message.Request>`
instance to retrieve *uri* from this endpoint.
*uri* should generally be a relative URI hosted on the
endpoint.
By default this creates a non-confirmable
:attr:`GET<coapy.message.Request.GET>` message. These
features can be overridden with *confirmable* and *code*.
*messageID* will default to :meth:`next_messageID`. The
caller may specify a token; if none is provided, an empty
token will be used. Any *options* are appended to the options
derived from *uri*, and *payload* is as in the
:class:`coapy.message.Message` constructor. The message
:attr:`destination_endpoint<coapy.message.Message.destination_endpoint>`
is set to *self*, and finally the message is returned to the
caller.
"""
uri_options = []
if uri is not None:
uri_options = self.uri_to_options(uri)
if messageID is None:
messageID = self.next_messageID()
if token is None:
token = b''
if options is not None:
uri_options.extend(options)
m = coapy.message.Request(confirmable=confirmable,
code=code, messageID=messageID,
token=token, options=uri_options,
payload=payload)
m.destination_endpoint = self
return m
def __unicode__(self):
return '{s.uri_host}:{s.port:d}'.format(s=self)
__str__ = __unicode__