본문 바로가기
Amazon AWS/IoT Core

AWS IoT Core로 데이터 퍼블리싱

by 홍띠 2022. 5. 8.

IoTCore로 데이터를 퍼블리싱 하기 위한 설정을 마쳤다면, 이제 코드를 작성하여 퍼블리싱하면 된다. 

아직 기본설정이 되지 않았다면, 이전 글(라즈베리파이에서 AWS IoT Core로 MQTT 퍼블리싱을 위한 설정)을 참고하여 설정 후 코드를 실행해야 한다.

 

데이터를 퍼블리싱하는 방법은 여러가지이고, 각 언어별로 라이브러리를 설치해서 진행하면 된다.

나는 파이썬을 사용해서 퍼블리싱을 하기 위해, 이전에 파이썬 라이브러리를 모두 설치한 상태이다.

 

아래의 코드는 간단하게 현재 시각과 디바이스 명을 3초간격으로 퍼블리싱 해보는 코드이다.

다른 데이터를 퍼블리싱하기 위해서는, 아래 코드를 가지고 응용해서 퍼블리싱하는 데이터를 변경해 주면 된다.

import datetime as dt
from email import message
from operator import index
import time
import datetime
import json
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import pandas as pd
from random import *
import sys
import threading
from uuid import uuid4

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))

# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)

def on_resubscribe_complete(resubscribe_future):
        resubscribe_results = resubscribe_future.result()
        print("Resubscribe results: {}".format(resubscribe_results))

        for topic, qos in resubscribe_results['topics']:
            if qos is None:
                sys.exit("Server rejected resubscribe to topic: {}".format(topic))

#aws iotCore에 연결하기 위한 정보 입력
mqtt_connection = mqtt_connection_builder.mtls_from_path(
       endpoint = <IoT Endpoint>,
       #3가지 인증서 파일 경로 입력
       cert_filepath="/home/raspi2/certs/device.pem.crt",
       pri_key_filepath="/home/raspi2/certs/private.pem.key",
       ca_filepath="/home/raspi2/certs/AmazonRootCA1.pem",
       on_connection_interrupted=on_connection_interrupted,
       on_connection_resumed=on_connection_resumed,
       clean_session=False,
       keep_alive_secs=30,
       #port를 지정해둔것이 아니면 주석처리해서 연결하면 됨
       # port=args.port,
       #client id 입력
       client_id="raspi-2",
       http_proxy_options=None
       )

print("Connecting to awsIotcore ...")

#iotCore에 연결
connect_future = mqtt_connection.connect()

# Future.result() waits until a result is available
connect_future.result()
print("Connected!")

while True:
	#발행 현재 시간 확인
    pub_time = time.localtime()
    now = "%04d/%02d/%02d %02d:%02d:%02d"%(pub_time.tm_year, pub_time.tm_mon, pub_time.tm_mday, pub_time.tm_hour, pub_time.tm_min, pub_time.tm_sec)
    pub_message = {
        "time": now,
        "device": "raspi----2"
    }
    #json형식으로 퍼블리싱
    mqtt_message = json.dumps(pub_message)
    #토픽지정 후 퍼블리싱
    mqtt_connection.publish(
          topic=raspi/2/data,
          payload=mqtt_message,
          #quality of service 지정, aws에서는 0(At most Once)과 1(At least Once)만 제공함
          qos = mqtt.QoS.AT_LEAST_ONCE)
    print("message published!"+str(mqtt_message))
    #3초간격 퍼블리싱
    time.sleep(3)

 

코드 작성 후, 코드가 실행되면 IotCore 콘솔에서 잘 퍼블리싱 되고 있는 지 테스트 subscribe를 통해 확인 가능하다.

내 경우에는 라즈베리파이 2대를 동시에 퍼블리싱 시켰는데, 두개의 디바이스에서 모두 정상적으로 퍼블리싱 되고 있는 것을 확인 할 수 있다.