Module python.infrastructureServices.modules.HubIoTModule
This module represents the component device that connects to Azure IoT Hub using an IoTHubSession and the thread definition that keeps listening incoming messages from cloud.
Expand source code
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""This module represents the component device that connects to Azure IoT Hub using an IoTHubSession
and the thread definition that keeps listening incoming messages from cloud."""
import calendar
import time
import asyncio
from azure.iot.device import IoTHubSession
from threading import Thread
import queue
CONNECTION_STRING = "HostName=IntelligentBackpackHub.azure-devices.net;"
TOTAL_MESSAGES_RECEIVED = 0
class HubIotThread (Thread):
"""
Thread that performs incoming messages listening.
"""
def __init__(self, messages_queue, device_id, primary_key):
"""
Constructor method that create the thread object of this module
Parameters:
messages_queue (queue): The synchronized queue to send all the received messages from the cloud
Returns:
void
"""
Thread.__init__(self)
self.connection_string = CONNECTION_STRING + "DeviceId=" + device_id + ";SharedAccessKey=" + primary_key
self.messages_queue = messages_queue
def run(self):
"""
Method that executes the thread
"""
try:
asyncio.run(self.main())
except KeyboardInterrupt or Exception:
print("User initiated exit. Exiting")
finally:
print("Received {} messages in total".format(TOTAL_MESSAGES_RECEIVED))
async def main(self):
"""
Main function that use IoTHubSession to connect to the relative IoT Hub cloud device and listens
all the incoming messages from the cloud, that could be different:
- REGISTER: sent with the email of the user that wants to registrate this device
- EXIT: message that force the exit of the application
- UNREGISTER: sent with the email of the user that wants to unregistrate this device
- NEW_DATA: sent with the data that will be added to the device. Used only for debug purpose
"""
global TOTAL_MESSAGES_RECEIVED
print("Starting C2D sample")
print("Press Ctrl-C to exit")
print("Connecting to IoT Hub...")
async with IoTHubSession.from_connection_string(self.connection_string) as session:
print("Connected to IoT Hub")
async with session.messages() as messages:
# print("Waiting to receive messages...")
async for message in messages:
TOTAL_MESSAGES_RECEIVED += 1
current_GMT = time.gmtime()
time_stamp = calendar.timegm(current_GMT)
print("Message received with payload: {}".format(message.payload))
print("TIMESTAMP: {}".format(time_stamp))
if message.payload == "EXIT":
self.messages_queue.put("EXIT")
raise Exception('Stop this thing')
if "UNREGISTER" in message.payload:
message_to_send = {
"type": "UNREGISTER",
"payload": ""
}
self.messages_queue.put(message_to_send)
else:
if "REGISTER" in message.payload:
message_to_send = {
"type": "REGISTER",
"payload": {
"email": message.payload.split(";")[1],
"hash": message.payload.split(";")[2]
}
}
self.messages_queue.put(message_to_send)
# test
if "NEW_DATA" in message.payload:
message_to_send = {
"type": "TAG_READ",
"payload": "fedfwefwe"
}
self.messages_queue.put(message_to_send)
if __name__ == "__main__":
try:
hub = HubIotThread(queue.Queue())
asyncio.run(hub.main())
except KeyboardInterrupt:
print("User initiated exit. Exiting")
finally:
print("Received {} messages in total".format(TOTAL_MESSAGES_RECEIVED))
Classes
class HubIotThread (messages_queue, device_id, primary_key)
-
Thread that performs incoming messages listening.
Constructor method that create the thread object of this module Parameters: messages_queue (queue): The synchronized queue to send all the received messages from the cloud
Returns: void
Expand source code
class HubIotThread (Thread): """ Thread that performs incoming messages listening. """ def __init__(self, messages_queue, device_id, primary_key): """ Constructor method that create the thread object of this module Parameters: messages_queue (queue): The synchronized queue to send all the received messages from the cloud Returns: void """ Thread.__init__(self) self.connection_string = CONNECTION_STRING + "DeviceId=" + device_id + ";SharedAccessKey=" + primary_key self.messages_queue = messages_queue def run(self): """ Method that executes the thread """ try: asyncio.run(self.main()) except KeyboardInterrupt or Exception: print("User initiated exit. Exiting") finally: print("Received {} messages in total".format(TOTAL_MESSAGES_RECEIVED)) async def main(self): """ Main function that use IoTHubSession to connect to the relative IoT Hub cloud device and listens all the incoming messages from the cloud, that could be different: - REGISTER: sent with the email of the user that wants to registrate this device - EXIT: message that force the exit of the application - UNREGISTER: sent with the email of the user that wants to unregistrate this device - NEW_DATA: sent with the data that will be added to the device. Used only for debug purpose """ global TOTAL_MESSAGES_RECEIVED print("Starting C2D sample") print("Press Ctrl-C to exit") print("Connecting to IoT Hub...") async with IoTHubSession.from_connection_string(self.connection_string) as session: print("Connected to IoT Hub") async with session.messages() as messages: # print("Waiting to receive messages...") async for message in messages: TOTAL_MESSAGES_RECEIVED += 1 current_GMT = time.gmtime() time_stamp = calendar.timegm(current_GMT) print("Message received with payload: {}".format(message.payload)) print("TIMESTAMP: {}".format(time_stamp)) if message.payload == "EXIT": self.messages_queue.put("EXIT") raise Exception('Stop this thing') if "UNREGISTER" in message.payload: message_to_send = { "type": "UNREGISTER", "payload": "" } self.messages_queue.put(message_to_send) else: if "REGISTER" in message.payload: message_to_send = { "type": "REGISTER", "payload": { "email": message.payload.split(";")[1], "hash": message.payload.split(";")[2] } } self.messages_queue.put(message_to_send) # test if "NEW_DATA" in message.payload: message_to_send = { "type": "TAG_READ", "payload": "fedfwefwe" } self.messages_queue.put(message_to_send)
Ancestors
- threading.Thread
Methods
async def main(self)
-
Main function that use IoTHubSession to connect to the relative IoT Hub cloud device and listens all the incoming messages from the cloud, that could be different: - REGISTER: sent with the email of the user that wants to registrate this device - EXIT: message that force the exit of the application - UNREGISTER: sent with the email of the user that wants to unregistrate this device - NEW_DATA: sent with the data that will be added to the device. Used only for debug purpose
Expand source code
async def main(self): """ Main function that use IoTHubSession to connect to the relative IoT Hub cloud device and listens all the incoming messages from the cloud, that could be different: - REGISTER: sent with the email of the user that wants to registrate this device - EXIT: message that force the exit of the application - UNREGISTER: sent with the email of the user that wants to unregistrate this device - NEW_DATA: sent with the data that will be added to the device. Used only for debug purpose """ global TOTAL_MESSAGES_RECEIVED print("Starting C2D sample") print("Press Ctrl-C to exit") print("Connecting to IoT Hub...") async with IoTHubSession.from_connection_string(self.connection_string) as session: print("Connected to IoT Hub") async with session.messages() as messages: # print("Waiting to receive messages...") async for message in messages: TOTAL_MESSAGES_RECEIVED += 1 current_GMT = time.gmtime() time_stamp = calendar.timegm(current_GMT) print("Message received with payload: {}".format(message.payload)) print("TIMESTAMP: {}".format(time_stamp)) if message.payload == "EXIT": self.messages_queue.put("EXIT") raise Exception('Stop this thing') if "UNREGISTER" in message.payload: message_to_send = { "type": "UNREGISTER", "payload": "" } self.messages_queue.put(message_to_send) else: if "REGISTER" in message.payload: message_to_send = { "type": "REGISTER", "payload": { "email": message.payload.split(";")[1], "hash": message.payload.split(";")[2] } } self.messages_queue.put(message_to_send) # test if "NEW_DATA" in message.payload: message_to_send = { "type": "TAG_READ", "payload": "fedfwefwe" } self.messages_queue.put(message_to_send)
def run(self)
-
Method that executes the thread
Expand source code
def run(self): """ Method that executes the thread """ try: asyncio.run(self.main()) except KeyboardInterrupt or Exception: print("User initiated exit. Exiting") finally: print("Received {} messages in total".format(TOTAL_MESSAGES_RECEIVED))