[go: nahoru, domu]

Skip to content

Commit

Permalink
Use Rest Api instead of Admin SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
seblucas committed Dec 10, 2018
1 parent 6446db9 commit 1834d93
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
58 changes: 47 additions & 11 deletions mqtt2firebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@


import os, re, time, json, argparse, signal, threading
import urllib.parse
from queue import Queue, Empty

import requests
import paho.mqtt.client as mqtt # pip install paho-mqtt
import firebase_admin
from firebase_admin import credentials
from firebase_admin import db
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession


verbose = False
FIREBASE_BASE_URL = 'https://{0}.firebaseio.com'
Expand All @@ -53,7 +55,7 @@ def sendToFirebase(sensorName, payload):
debug ("Start sendToFirebase")
try:
if not args.dryRun:
if (args.topicAsChild):
if args.topicAsChild:
r = ref.child(sensorName).push(payload)
else:
r = ref.push(payload)
Expand All @@ -67,18 +69,40 @@ def sendToFirebase(sensorName, payload):
debug ("End sendToFirebase")

def process_firebase_messages(lqueue, stop_event):
    """Drain queued MQTT packets and push them to Firebase over the REST API.

    Runs until ``stop_event`` is set. Each packet taken from ``lqueue`` is
    POSTed as JSON to ``<base url>/<firebase path>[/<topic>].json`` through an
    authorized ``requests`` session built from the module-level service
    account ``credentials``. Network failures are reported on stdout and the
    packet is dropped, so a single failure never stops the worker.

    Args:
        lqueue: Queue of packets; each packet is a mapping with the keys
            ``'topic'`` and ``'payload'``. ``None`` entries are ignored.
        stop_event: ``threading.Event`` that asks the worker to terminate.
    """
    firebaseSession = AuthorizedSession(credentials)
    baseUrl = FIREBASE_BASE_URL.format(args.firebaseAppName)

    try:
        while not stop_event.is_set():
            try:
                # Block for at most 5 seconds: a new packet is handled as soon
                # as it arrives, while stop_event is still re-checked
                # regularly when the queue stays empty.
                packet = lqueue.get(timeout=5)
            except Empty:
                continue

            try:
                if packet is None:
                    continue
                debug("data from queue: " + format(packet))

                firebasePath = args.firebasePath
                if args.topicAsChild:
                    # Store each topic under its own child node.
                    firebasePath = urllib.parse.urljoin(
                        args.firebasePath + '/', packet['topic'])
                firebasePath = baseUrl + '/' + firebasePath + '.json'
                debug("Sending {0} to this URL {1}".format(
                    packet['payload'], firebasePath))

                try:
                    if not args.dryRun:
                        r = firebaseSession.post(
                            firebasePath, json=packet['payload'], timeout=5)
                        debug("payload inserted : " + r.text)
                except requests.exceptions.Timeout:
                    print("Firebase Timeout")
                except requests.exceptions.RequestException as e:
                    print("Firebase Exception" + str(e))
                except Exception:
                    # Best effort: report and keep the worker alive, but let
                    # KeyboardInterrupt / SystemExit propagate.
                    print("Firebase Unknown Exception")
            finally:
                # Acknowledge every item obtained from the queue (including
                # ignored None packets) so that Queue.join() cannot hang.
                lqueue.task_done()
    finally:
        # Release the HTTP connections even if the loop exits on an error.
        firebaseSession.close()
    debug("Stopping Firebase Thread ...")

def on_connect(client, userdata, flags, rc):
Expand Down Expand Up @@ -136,11 +160,23 @@ def on_message(client, userdata, msg):
if (pathOrCredentials.startswith ('{')):
pathOrCredentials = json.loads(pathOrCredentials)

cred = credentials.Certificate(pathOrCredentials)
default_app = firebase_admin.initialize_app(cred, {
'databaseURL': FIREBASE_BASE_URL.format(args.firebaseAppName)
})
ref = db.reference(args.firebasePath)
# Define the required scopes
scopes = [
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/firebase.database"
]

# Authenticate a credential with the service account
credentials = service_account.Credentials.from_service_account_info(
pathOrCredentials, scopes=scopes)
# credentials = service_account.Credentials.from_service_account_file(
# pathOrCredentials, scopes=scopes)

# cred = credentials.Certificate(pathOrCredentials)
# default_app = firebase_admin.initialize_app(cred, {
# 'databaseURL': FIREBASE_BASE_URL.format(args.firebaseAppName)
# })
# ref = db.reference(args.firebasePath)

queue = Queue()
stop_event = threading.Event()
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
paho-mqtt
firebase-admin==2.13.0
google-auth

0 comments on commit 1834d93

Please sign in to comment.