Commit 45d73b04 authored by Jean-Didier Totow

Single-container packaging of the Input API and InfluxDB

parent 8e455ba8
# InfluxDB setup details.
INFLUXDB_DATA_ENGINE=tsm1
INFLUXDB_REPORTING_DISABLED=false
INFLUXDB_ADMIN_USER=admin
INFLUXDB_ADMIN_PASSWORD=password
INFLUXDB_DB=morphemic
INFLUXDB_USER=morphemic
INFLUXDB_USER_PASSWORD=password
INFLUXDB_HTTP_FLUX_ENABLED=true
# Python API message queue connection.
ACTIVEMQ_HOSTNAME=activemq
# --- Stage 1: API Dependency Loader
# :: Initial dependency loading image.
FROM python:3.7-slim as api-loader
# Get package dependencies.
COPY inputapi/requirements.txt .
RUN pip install --user -r requirements.txt
# --- Stage 2: Combined InfluxDB + Python API Image
FROM python:3.7-alpine3.12
# :: InfluxDB standard setup
RUN echo 'hosts: files dns' >> /etc/nsswitch.conf
RUN apk add --no-cache tzdata bash ca-certificates && \
update-ca-certificates
ENV INFLUXDB_VERSION 1.8.4
RUN set -ex && \
mkdir ~/.gnupg; \
echo "disable-ipv6" >> ~/.gnupg/dirmngr.conf; \
apk add --no-cache --virtual .build-deps wget gnupg tar && \
for key in \
05CE15085FC09D18E99EFB22684A14CF2582E0C5 ; \
do \
gpg --keyserver ha.pool.sks-keyservers.net --recv-keys "$key" || \
gpg --keyserver pgp.mit.edu --recv-keys "$key" || \
gpg --keyserver keyserver.pgp.com --recv-keys "$key" ; \
done && \
wget --no-verbose https://dl.influxdata.com/influxdb/releases/influxdb-${INFLUXDB_VERSION}-static_linux_amd64.tar.gz.asc && \
wget --no-verbose https://dl.influxdata.com/influxdb/releases/influxdb-${INFLUXDB_VERSION}-static_linux_amd64.tar.gz && \
gpg --batch --verify influxdb-${INFLUXDB_VERSION}-static_linux_amd64.tar.gz.asc influxdb-${INFLUXDB_VERSION}-static_linux_amd64.tar.gz && \
mkdir -p /usr/src && \
tar -C /usr/src -xzf influxdb-${INFLUXDB_VERSION}-static_linux_amd64.tar.gz && \
rm -f /usr/src/influxdb-*/influxdb.conf && \
chmod +x /usr/src/influxdb-*/* && \
cp -a /usr/src/influxdb-*/* /usr/bin/ && \
gpgconf --kill all && \
rm -rf *.tar.gz* /usr/src /root/.gnupg && \
apk del .build-deps
COPY influxdb.conf /etc/influxdb/influxdb.conf
EXPOSE 8086
VOLUME /var/lib/influxdb
COPY entrypoint.sh /entrypoint.sh
COPY init-influxdb.sh /init-influxdb.sh
# :: Python API setup
# Copy compiled dependencies from
# the standard user pip directory
# and update PATH.
COPY --from=api-loader /root/.local /root/.local
ENV PATH=/root/.local/bin:$PATH
# Copy Python API.
RUN mkdir inputapi
COPY ./inputapi/src ./inputapi/
# Execute both in entrypoint.sh.
ENTRYPOINT ["/entrypoint.sh"]
CMD ["influxd"]
# Time Series Database
This component extends the standard InfluxDB Alpine Dockerfile with Python support and the local Input API, so both services run in a single container.
## Maintenance
See the comments in the main Dockerfile.
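## Quick check
A minimal sketch (not part of the repository) for verifying the combined container from the host. It assumes the container is running with the defaults from `.env` and port 8086 published locally, and uses the `influxdb` client already listed in the Input API requirements:

```python
from influxdb import InfluxDBClient

# Assumed connection details: the INFLUXDB_* defaults from .env.
client = InfluxDBClient(
    host="localhost",
    port=8086,
    username="morphemic",
    password="password",
    database="morphemic",
)

print(client.ping())               # InfluxDB version string if the server is reachable
print(client.get_list_database())  # should include "morphemic" once init-influxdb.sh has run
```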
#!/bin/bash
set -e
if [ "${1:0:1}" = '-' ]; then
set -- influxd "$@"
fi
if [ "$1" = 'influxd' ]; then
/init-influxdb.sh "${@:2}"
fi
# Run API
python inputapi/app.py &
# Run InfluxDB
exec "$@"
[meta]
dir = "/var/lib/influxdb/meta"
[data]
dir = "/var/lib/influxdb/data"
engine = "tsm1"
wal-dir = "/var/lib/influxdb/wal"
#!/bin/bash
set -e
AUTH_ENABLED="$INFLUXDB_HTTP_AUTH_ENABLED"
if [ -z "$AUTH_ENABLED" ]; then
AUTH_ENABLED="$(grep -iE '^\s*auth-enabled\s*=\s*true' /etc/influxdb/influxdb.conf | grep -io 'true' | cat)"
else
AUTH_ENABLED="$(echo "$INFLUXDB_HTTP_AUTH_ENABLED" | grep -io 'true' | cat)"
fi
INIT_USERS=$([ ! -z "$AUTH_ENABLED" ] && [ ! -z "$INFLUXDB_ADMIN_USER" ] && echo 1 || echo)
# Check if an environment variable for where to put meta is set.
# If so, then use that directory, otherwise use the default.
if [ -z "$INFLUXDB_META_DIR" ]; then
META_DIR="/var/lib/influxdb/meta"
else
META_DIR="$INFLUXDB_META_DIR"
fi
if ( [ ! -z "$INIT_USERS" ] || [ ! -z "$INFLUXDB_DB" ] || [ "$(ls -A /docker-entrypoint-initdb.d 2> /dev/null)" ] ) && [ ! "$(ls -d "$META_DIR" 2>/dev/null)" ]; then
INIT_QUERY=""
CREATE_DB_QUERY="CREATE DATABASE $INFLUXDB_DB"
if [ ! -z "$INIT_USERS" ]; then
if [ -z "$INFLUXDB_ADMIN_PASSWORD" ]; then
INFLUXDB_ADMIN_PASSWORD="$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32;echo;)"
echo "INFLUXDB_ADMIN_PASSWORD:$INFLUXDB_ADMIN_PASSWORD"
fi
INIT_QUERY="CREATE USER \"$INFLUXDB_ADMIN_USER\" WITH PASSWORD '$INFLUXDB_ADMIN_PASSWORD' WITH ALL PRIVILEGES"
elif [ ! -z "$INFLUXDB_DB" ]; then
INIT_QUERY="$CREATE_DB_QUERY"
else
INIT_QUERY="SHOW DATABASES"
fi
INFLUXDB_INIT_PORT="8086"
INFLUXDB_HTTP_BIND_ADDRESS=127.0.0.1:$INFLUXDB_INIT_PORT INFLUXDB_HTTP_HTTPS_ENABLED=false influxd "$@" &
pid="$!"
INFLUX_CMD="influx -host 127.0.0.1 -port $INFLUXDB_INIT_PORT -execute "
for i in {30..0}; do
if $INFLUX_CMD "$INIT_QUERY" &> /dev/null; then
break
fi
echo 'influxdb init process in progress...'
sleep 1
done
if [ "$i" = 0 ]; then
echo >&2 'influxdb init process failed.'
exit 1
fi
if [ ! -z "$INIT_USERS" ]; then
INFLUX_CMD="influx -host 127.0.0.1 -port $INFLUXDB_INIT_PORT -username ${INFLUXDB_ADMIN_USER} -password ${INFLUXDB_ADMIN_PASSWORD} -execute "
if [ ! -z "$INFLUXDB_DB" ]; then
$INFLUX_CMD "$CREATE_DB_QUERY"
fi
if [ ! -z "$INFLUXDB_USER" ] && [ -z "$INFLUXDB_USER_PASSWORD" ]; then
INFLUXDB_USER_PASSWORD="$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32;echo;)"
echo "INFLUXDB_USER_PASSWORD:$INFLUXDB_USER_PASSWORD"
fi
if [ ! -z "$INFLUXDB_USER" ]; then
$INFLUX_CMD "CREATE USER \"$INFLUXDB_USER\" WITH PASSWORD '$INFLUXDB_USER_PASSWORD'"
$INFLUX_CMD "REVOKE ALL PRIVILEGES FROM \"$INFLUXDB_USER\""
if [ ! -z "$INFLUXDB_DB" ]; then
$INFLUX_CMD "GRANT ALL ON \"$INFLUXDB_DB\" TO \"$INFLUXDB_USER\""
fi
fi
if [ ! -z "$INFLUXDB_WRITE_USER" ] && [ -z "$INFLUXDB_WRITE_USER_PASSWORD" ]; then
INFLUXDB_WRITE_USER_PASSWORD="$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32;echo;)"
echo "INFLUXDB_WRITE_USER_PASSWORD:$INFLUXDB_WRITE_USER_PASSWORD"
fi
if [ ! -z "$INFLUXDB_WRITE_USER" ]; then
$INFLUX_CMD "CREATE USER \"$INFLUXDB_WRITE_USER\" WITH PASSWORD '$INFLUXDB_WRITE_USER_PASSWORD'"
$INFLUX_CMD "REVOKE ALL PRIVILEGES FROM \"$INFLUXDB_WRITE_USER\""
if [ ! -z "$INFLUXDB_DB" ]; then
$INFLUX_CMD "GRANT WRITE ON \"$INFLUXDB_DB\" TO \"$INFLUXDB_WRITE_USER\""
fi
fi
if [ ! -z "$INFLUXDB_READ_USER" ] && [ -z "$INFLUXDB_READ_USER_PASSWORD" ]; then
INFLUXDB_READ_USER_PASSWORD="$(< /dev/urandom tr -dc _A-Z-a-z-0-9 | head -c32;echo;)"
echo "INFLUXDB_READ_USER_PASSWORD:$INFLUXDB_READ_USER_PASSWORD"
fi
if [ ! -z "$INFLUXDB_READ_USER" ]; then
$INFLUX_CMD "CREATE USER \"$INFLUXDB_READ_USER\" WITH PASSWORD '$INFLUXDB_READ_USER_PASSWORD'"
$INFLUX_CMD "REVOKE ALL PRIVILEGES FROM \"$INFLUXDB_READ_USER\""
if [ ! -z "$INFLUXDB_DB" ]; then
$INFLUX_CMD "GRANT READ ON \"$INFLUXDB_DB\" TO \"$INFLUXDB_READ_USER\""
fi
fi
fi
for f in /docker-entrypoint-initdb.d/*; do
case "$f" in
*.sh) echo "$0: running $f"; . "$f" ;;
*.iql) echo "$0: running $f"; $INFLUX_CMD "$(cat ""$f"")"; echo ;;
*) echo "$0: ignoring $f" ;;
esac
echo
done
if ! kill -s TERM "$pid" || ! wait "$pid"; then
echo >&2 'influxdb init process failed. (Could not stop influxdb)'
exit 1
fi
fi
# Input API
This component handles message queue inputs to InfluxDB within the Persistent Storage module of the [MORPHEMIC](https://www.morphemic.cloud) platform.
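
## Example
A minimal sketch (not part of the repository) that publishes one test metric to the topic the Input API subscribes to, using `stomp.py` from requirements.txt. The broker address, credentials, topic and metric values below are assumptions taken from the defaults in `app.py`:

```python
import json
import time

import stomp

# Assumed broker settings: the ACTIVEMQ_* defaults from app.py.
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.connect(login="aaa", passcode="111", wait=True)

# Field names follow the METRIC_NAME_FIELD_* defaults in app.py; the values are made up.
message = {
    "metricName": "cpu_usage",
    "metricValue": 42.0,
    "timestamp": int(time.time()),
    "application": "demo-app",
    "labels": {"application": "demo-app"},
}
conn.send(destination="static-topic-1", body=json.dumps(message))
conn.disconnect()
```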
flask
influxdb
stomp.py
requests
[metadata]
name = morphemic-inputapi
version = 1.0.0
author = Jean-Didier Totow
author_email = totow@unipi.gr
description = The InputAPI component for the Persistent Storage module of the MORPHEMIC platform.
long_description = file: README.md
long_description_content_type = text/markdown
url = https://git.dac.ds.unipi.gr/morphemic/persistent-storage
project_urls =
Bug Tracker = https://git.dac.ds.unipi.gr/morphemic/persistent-storage/-/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
package_dir =
= src
packages = find:
python_requires = >=3.7
[options.packages.find]
where = src
import stomp, os, json, time
from threading import Thread

data_format = os.environ.get("DATA_FORMAT", "json")


# STOMP listener that forwards every received message to the supplied handler.
class Listener(object):
    def __init__(self, conn, handler):
        self.conn = conn
        self.count = 0
        self.handler = handler
        self.start = time.time()

    def on_error(self, headers, message):
        print("received an error %s" % message)

    def on_message(self, headers, message):
        self.handler(message)


# Background thread that keeps a subscription to one ActiveMQ topic alive
# and hands incoming messages to the handler.
class Worker(Thread):
    def __init__(
        self, hostname, port, username, password, topic, handler, sleeping, index
    ):
        self.hostname = hostname
        self.port = port
        self.topic = topic
        self.handler = handler
        self.sleeping = sleeping
        self.index = index
        self.username = username
        self.password = password
        self.status = None
        self.normal_stop = False
        super(Worker, self).__init__()

    def getStatus(self):
        return self.status

    def getIndex(self):
        return self.index

    def stop(self):
        self.normal_stop = True

    def run(self):
        print("Worker {0} started".format(self.index))
        print(
            "Hostname : {0}\nPort: {1}\nTopic: {2}".format(
                self.hostname, self.port, self.topic
            )
        )
        while True:
            if self.normal_stop:
                break
            print("Trying to connect ...")
            try:
                conn = stomp.Connection(host_and_ports=[(self.hostname, self.port)])
                conn.set_listener("", Listener(conn, self.handler))
                conn.connect(login=self.username, passcode=self.password)
                conn.subscribe(destination=self.topic, id=1, ack="auto")
                self.status = "started"
                print("Waiting for messages...")
                while 1:
                    if self.normal_stop:
                        break
                    time.sleep(self.sleeping)
            except Exception as e:
                print("Could not connect to ActiveMQ broker")
                self.status = "error"
@@ -66,21 +74,22 @@ class Worker(Thread):
        self.status = "stopped"


# Manages the pool of Worker threads and decodes incoming payloads before
# passing them to the registered handler.
class ActiveMQManager:
    def __init__(self, handler):
        self.all_threads = []
        self.handler = handler
        thread_controller = Thread(target=self.workerController)
        thread_controller.start()

    def getData(self, data):
        if data_format == "json":
            _data = None
            try:
                _data = json.loads(data)
            except Exception as e:
                print("Could not decode json content")
                print("data content", data)
                return None
            self.handler(_data)

    def workerController(self):
@@ -94,14 +103,14 @@ class ActiveMQManager():
w.start()
time.sleep(20)
    def startWorker(self, hostname, port, username, password, topic, key):
        for w in self.all_threads:
            if w.getIndex() == key:
                print("Connection already registered")
                return None
        sleeping = 5  # 5 seconds
        worker = Worker(
            hostname, port, username, password, topic, self.handler, sleeping, key
        )
        self.all_threads.append(worker)
        worker.start()
import json, time, os, requests, stomp
from flask import Flask, request, Response
from activemqlistermanager import ActiveMQManager
from influxdb import InfluxDBClient
from threading import Thread

url_api_ems = os.environ.get("URL_API_EMS", "localhost:8080")
influxdb_url = os.environ.get("INFLUXDB_URL", "http://localhost:8086")
metric_name_field_name = os.environ.get("METRIC_NAME_FIELD_NAME", "metricName")
metric_name_field_value = os.environ.get("METRIC_NAME_FIELD_VALUE", "metricValue")
metric_name_field_application = os.environ.get(
    "METRIC_NAME_FIELD_APPLICATION", "application"
)
metric_name_field_timestamp = os.environ.get("METRIC_NAME_FIELD_TIMESTAMP", "timestamp")
metric_name_field_label = os.environ.get("METRIC_NAME_FIELD_LABEL", "labels")
max_waiting_time = int(os.environ.get("MAX_WAITING_TIME", "20"))  # minutes
max_point_list = int(os.environ.get("MAX_POINT_LIST", "1000"))
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME", "localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT", "8086"))
influxdb_username = os.environ.get("INFLUXDB_USERNAME", "morphemic")
influxdb_password = os.environ.get("INFLUXDB_PASSWORD", "password")
influxdb_dbname = os.environ.get("INFLUXDB_DBNAME", "morphemic")
ps_management_queue = os.environ.get("PS_MANAGEMENT_QUEUE", "persistent_storage")
# "hostname": "localhost", "port": 61610, "topic": "static-topic-1", "metric": "somekey","username":"aaa","password": "111"
activemq_hostname = os.environ.get("ACTIVEMQ_HOSTNAME", "localhost")
activemq_port = int(os.environ.get("ACTIVEMQ_PORT", "61613"))
activemq_topic = os.environ.get("ACTIVEMQ_TOPIC", "static-topic-1")
activemq_subs_key = os.environ.get("ACTIVEMQ_SUBS_KEY", "subs-1")
activemq_username = os.environ.get("ACTIVEMQ_USERNAME", "aaa")
activemq_password = os.environ.get("ACTIVEMQ_PASSWORD", "111")
class Publisher(Thread):
    def __init__(self):
@@ -38,196 +41,190 @@ class Publisher(Thread):
        self.username = activemq_username
        self.password = activemq_password
        self.queue = []
        self.conn = None
        super(Publisher, self).__init__()

    def connect(self):
        try:
            self.conn = stomp.Connection(host_and_ports=[(self.host, self.port)])
            self.conn.connect(login=self.username, passcode=self.password)
            print("Publisher is connected to ActiveMQ")
        except Exception as e:
            print("Publisher couldn't connect to ActiveMQ")
            print(e)

    def addInQueue(self, data, queue):
        self.queue.append((data, queue))

    def run(self):
        while True:
            if len(self.queue) > 0:
                try:
                    data, destination = self.queue.pop(0)
                    self.conn.send(
                        body=json.dumps(data),
                        destination=destination,
                        persistent="false",
                    )
                except Exception as e:
                    print("An exception occurred while publishing")
                    print(e)
                    self.queue.append((data, destination))
                    self.connect()
            else:
                time.sleep(1)
# Represents one registered consumer: the queue it feeds, its application
# and the metrics it follows.
class Consumer:
    def __init__(self, queue, application, metrics, name):
        self.queue = queue
        self.application = application
        self.metrics = metrics  # list of metrics
        self.name = name

    def getName(self):
        return self.name

    def getQueue(self):
        return self.queue

    def getApplication(self):
        return self.application

    def getMetrics(self):
        return self.metrics

    def match(self, data):
        if self.application == "":
            return False
        else:
            return self.application == data["labels"]["application"]
"""
def match(self,data):
def match(self