From 1834d935e1c67034d3e196201802ddc99105a3f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Lucas?= Date: Mon, 10 Dec 2018 17:23:43 +0100 Subject: [PATCH] Use Rest Api instead of Admin SDK --- mqtt2firebase.py | 58 +++++++++++++++++++++++++++++++++++++++--------- requirements.txt | 2 +- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/mqtt2firebase.py b/mqtt2firebase.py index bca5382..4135589 100644 --- a/mqtt2firebase.py +++ b/mqtt2firebase.py @@ -23,12 +23,14 @@ import os, re, time, json, argparse, signal, threading +import urllib.parse from queue import Queue, Empty +import requests import paho.mqtt.client as mqtt # pip install paho-mqtt -import firebase_admin -from firebase_admin import credentials -from firebase_admin import db +from google.oauth2 import service_account +from google.auth.transport.requests import AuthorizedSession + verbose = False FIREBASE_BASE_URL = 'https://{0}.firebaseio.com' @@ -53,7 +55,7 @@ def sendToFirebase(sensorName, payload): debug ("Start sendToFirebase") try: if not args.dryRun: - if (args.topicAsChild): + if args.topicAsChild: r = ref.child(sensorName).push(payload) else: r = ref.push(payload) @@ -67,18 +69,40 @@ def sendToFirebase(sensorName, payload): debug ("End sendToFirebase") def process_firebase_messages(lqueue, stop_event): + firebaseSession = AuthorizedSession(credentials) + baseUrl = FIREBASE_BASE_URL.format(args.firebaseAppName) + while not stop_event.is_set(): try: packet = lqueue.get(False) except Empty: - time.sleep(10) + time.sleep(5) pass else: if packet is None: continue debug("data from queue: " + format(packet)) - sendToFirebase(packet['topic'], packet['payload']) + firebasePath = args.firebasePath + if args.topicAsChild: + firebasePath = urllib.parse.urljoin(args.firebasePath + '/', packet['topic']) + firebasePath = baseUrl + '/' + firebasePath + '.json' + debug ("Sending {0} to this URL {1}".format(packet['payload'], firebasePath)) + try: + if not args.dryRun: + r = firebaseSession.post(firebasePath, json=packet['payload'], timeout=5) + debug ("payload inserted : " + r.text) + except requests.exceptions.Timeout: + print ("Firebase Timeout") + pass + except requests.exceptions.RequestException as e: + print ("Firebase Exception" + str(e)) + pass + except: + print ("Firebase Unknown Exception") + pass + #sendToFirebase(packet['topic'], packet['payload']) queue.task_done() + firebaseSession.close() debug("Stopping Firebase Thread ...") def on_connect(client, userdata, flags, rc): @@ -136,11 +160,23 @@ def on_message(client, userdata, msg): if (pathOrCredentials.startswith ('{')): pathOrCredentials = json.loads(pathOrCredentials) -cred = credentials.Certificate(pathOrCredentials) -default_app = firebase_admin.initialize_app(cred, { - 'databaseURL': FIREBASE_BASE_URL.format(args.firebaseAppName) -}) -ref = db.reference(args.firebasePath) +# Define the required scopes +scopes = [ + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/firebase.database" +] + +# Authenticate a credential with the service account +credentials = service_account.Credentials.from_service_account_info( + pathOrCredentials, scopes=scopes) +# credentials = service_account.Credentials.from_service_account_file( +# pathOrCredentials, scopes=scopes) + +# cred = credentials.Certificate(pathOrCredentials) +# default_app = firebase_admin.initialize_app(cred, { +# 'databaseURL': FIREBASE_BASE_URL.format(args.firebaseAppName) +# }) +# ref = db.reference(args.firebasePath) queue = Queue() stop_event = threading.Event() diff --git a/requirements.txt b/requirements.txt index 73cdeb0..9a4ddea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ paho-mqtt -firebase-admin==2.13.0 +google-auth