From f24d730294183c715ac538af4298ea750fc3c211 Mon Sep 17 00:00:00 2001 From: Tristan Gosselin-Hane Date: Fri, 9 Nov 2018 01:32:36 -0500 Subject: [PATCH] Add optional analytics logging to elasticsearch --- config.example.json | 7 ++++ config.py | 5 +++ elasticsearch_logger.py | 71 +++++++++++++++++++++++++++++++++++++++++ webhook-bridge.py | 41 +++++++++++++++++++----- 4 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 elasticsearch_logger.py diff --git a/config.example.json b/config.example.json index a3e55dd..df172f1 100644 --- a/config.example.json +++ b/config.example.json @@ -19,5 +19,12 @@ }, "DATABASE": { "CONNECTION_STRING": "sqlite:////data/db.sqlite" + }, + "ELASTICSEARCH": { + "ENABLED": false, + "URL": "", + "AUTH": false, + "USERNAME": "", + "PASSWORD": "" } } diff --git a/config.py b/config.py index 7011000..85f4608 100644 --- a/config.py +++ b/config.py @@ -20,6 +20,11 @@ class Configuration(object): self.auth_port = self._config["AUTH_SERVER"]["PORT"] self.auth_dns = self._config["AUTH_SERVER"]["DNS_WILDCARD"] self.database_connection_string = self._config["DATABASE"]["CONNECTION_STRING"] + self.es_enabled = self._config["ELASTICSEARCH"]["ENABLED"] + self.es_url = self._config["ELASTICSEARCH"]["URL"] + self.es_auth = self._config["ELASTICSEARCH"]["AUTH"] + self.es_username = self._config["ELASTICSEARCH"]["USERNAME"] + self.es_password = self._config["ELASTICSEARCH"]["PASSWORD"] else: print("error reading config") exit(1) diff --git a/elasticsearch_logger.py b/elasticsearch_logger.py new file mode 100644 index 0000000..2848409 --- /dev/null +++ b/elasticsearch_logger.py @@ -0,0 +1,71 @@ +import time +from enum import Enum +import logging + +import requests + +_username = None +_password = None +_auth = None +_url = None + + +def initialize(config): + global _username, _password, _url, _auth + if config.es_auth: + _auth = True + _username = config.es_username + _password = config.es_password + _url = config.es_url + + +def es_connection(uuid, reason, count=0): + if ConnectionReason(reason).name != "SEEN": + es_payload = { + "uuid": uuid, + "time": (lambda: int(round(time.time() * 1000)))(), + "reason": ConnectionReason(reason).name, + "count": count, + } + else: + es_payload = { + "uuid": uuid, + "time": (lambda: int(round(time.time() * 1000)))(), + "reason": ConnectionReason(reason).name, + } + es_post_request("connections/_doc/", es_payload) + + +def es_chat_message(uuid, display_name, message, message_unformatted): + es_payload = { + "uuid": uuid, + "display_name": display_name, + "message": message, + "message_unformatted": message_unformatted, + "time": (lambda: int(round(time.time() * 1000)))(), + } + es_post_request("chat_messages/_doc/", es_payload) + + +def es_raw_message(type, message): + es_payload = { + "time": (lambda: int(round(time.time() * 1000)))(), + "type": type, + "message": message, + } + es_post_request("raw_messages/_doc/", es_payload) + + +def es_post_request(endpoint, payload): + the_url = "{}{}".format(_url, endpoint) + if _auth: + post = requests.post(the_url, auth=(_username, _password), json=payload) + else: + post = requests.post(the_url, json=payload) + logging.debug("[Elasticsearch POST] {}".format(post.text)) + + +class ConnectionReason(Enum): + CONNECTED = "CONNECTED" + DISCONNECTED = "DISCONNECTED" + SEEN = "SEEN" diff --git a/webhook-bridge.py b/webhook-bridge.py index ce28d1e..803a525 100755 --- a/webhook-bridge.py +++ b/webhook-bridge.py @@ -4,6 +4,8 @@ from __future__ import print_function import sys import re +from enum import Enum + import requests import json import time @@ -16,6 +18,8 @@ from database import DiscordChannel, AccountLinkToken, DiscordAccount import database_session from datetime import datetime, timedelta, timezone +import elasticsearch_logger +from elasticsearch_logger import es_chat_message, es_connection, es_raw_message, ConnectionReason from minecraft import authentication from minecraft.exceptions import YggdrasilError @@ -133,12 +137,21 @@ def generate_random_auth_token(length): return ''.join(random.choice(letters) for i in range(length)) +# TODO: Get rid of this when pycraft's enum becomes usable +class ChatType(Enum): + CHAT = 0 # A player-initiated chat message. + SYSTEM = 1 # The result of running a command. + GAME_INFO = 2 # Displayed above the hotbar in vanilla clients. + + def main(): global BOT_USERNAME config = Configuration("config.json") setup_logging(config.logging_level) database_session.initialize(config) + if config.es_enabled: + elasticsearch_logger.initialize(config) reactor_thread = Thread(target=run_auth_server, args=(config.auth_port,)) reactor_thread.start() @@ -229,6 +242,10 @@ def main(): "Processing AddPlayerAction tab list packet, name: {}, uuid: {}".format(action.name, action.uuid)) username = action.name player_uuid = action.uuid + if action.name not in PLAYER_LIST.inv: + PLAYER_LIST.inv[action.name] = action.uuid + if action.name not in UUID_CACHE.inv: + UUID_CACHE.inv[action.name] = action.uuid # Initial tablist backfill if LAST_CONNECTION_TIME + timedelta(seconds=2.5) < datetime.now(timezone.utc): webhook_payload = { @@ -239,10 +256,11 @@ def main(): } for webhook in WEBHOOKS: post = requests.post(webhook,json=webhook_payload) - if action.name not in UUID_CACHE.inv: - UUID_CACHE.inv[action.name] = action.uuid - if action.name not in PLAYER_LIST.inv: - PLAYER_LIST.inv[action.name] = action.uuid + if config.es_enabled: + es_connection(uuid=action.uuid, reason=ConnectionReason.CONNECTED, count=len(PLAYER_LIST)) + return + if config.es_enabled: + es_connection(uuid=action.uuid, reason=ConnectionReason.SEEN) if isinstance(action, clientbound.play.PlayerListItemPacket.RemovePlayerAction): logging.debug("Processing RemovePlayerAction tab list packet, uuid: {}".format(action.uuid)) username = UUID_CACHE[action.uuid] @@ -255,6 +273,8 @@ def main(): } for webhook in WEBHOOKS: post = requests.post(webhook,json=webhook_payload) + if config.es_enabled: + es_connection(uuid=action.uuid, reason=ConnectionReason.DISCONNECTED, count=len(PLAYER_LIST)) del UUID_CACHE[action.uuid] del PLAYER_LIST[action.uuid] @@ -279,11 +299,11 @@ def main(): if username.lower() == BOT_USERNAME.lower(): # Don't relay our own messages return - message = regexp_match.group(2) + original_message = regexp_match.group(2) player_uuid = mc_username_to_uuid(username) - logging.info("Username: {} Message: {}".format(username, message)) - logging.debug("msg: {}".format(repr(message))) - message = remove_emoji(message.strip().replace("@", "@\N{zero width space}")) + logging.info("Username: {} Message: {}".format(username, original_message)) + logging.debug("msg: {}".format(repr(original_message))) + message = remove_emoji(original_message.strip().replace("@", "@\N{zero width space}")) webhook_payload = { 'username': username, 'avatar_url': "https://visage.surgeplay.com/face/160/{}".format(player_uuid), @@ -291,6 +311,11 @@ def main(): } for webhook in WEBHOOKS: post = requests.post(webhook, json=webhook_payload) + if config.es_enabled: + es_chat_message( + uuid=player_uuid, display_name=username, message=original_message, message_unformatted=chat_string) + if config.es_enabled: + es_raw_message(type=ChatType(chat_packet.position).name, message=chat_packet.json_data) def handle_health_update(health_update_packet): if health_update_packet.health <= 0: