Commit 7bae958e authored by Jean-Didier Totow's avatar Jean-Didier Totow
Browse files

persistent storage ems time

parent f54ef6d7
Pipeline #16654 passed with stage
in 2 minutes and 6 seconds
# --- Stage 1: API Dependency Loader
# :: Initial dependency loading image.
FROM python:3.7-slim as api-loader
# Get package dependencies.
COPY inputapi/requirements.txt .
RUN pip install --user -r requirements.txt
# :: Python API setup
# Copy compiled dependencies from
# the standard user pip directory
# and update PATH.
COPY --from=api-loader /root/.local /root/.local
ENV PATH=/root/.local:$PATH
# Copy Python API.
RUN mkdir /app
RUN mkdir -p /app/log
COPY ./inputapi/src /app/
WORKDIR /app
# Execute both in entrypoint.sh.
CMD ["python","-u","app.py"]
......@@ -53,6 +53,7 @@ class Worker(Thread):
self.hostname, self.port, self.topic
)
)
_sleeping_time = 5
while True:
if self.normal_stop:
break
......@@ -74,7 +75,9 @@ class Worker(Thread):
print("Could not connect to ActiveMQ broker")
self.status = "error"
print(e)
time.sleep(5)
print("Sleep before reconnection {0}sec.".format(_sleeping_time))
time.sleep(_sleeping_time)
_sleeping_time = _sleeping_time * 2
print("End process")
self.status = "stopped"
......
......@@ -48,10 +48,8 @@ class Connection:
def connect(self, wait=True):
if not self.conn:
return
self.conn.connect(self.username, self.password, wait=wait)
def disconnect(self):
......
......@@ -51,7 +51,7 @@ class Publisher(Thread):
def connect(self):
try:
self.conn = Connection(username=self.username, password=self.password, host=self.hostname,port=self.port, debug=False)
self.conn = Connection(username=self.username, password=self.password, host=self.host,port=self.port, debug=False)
self.conn.connect()
print("Publisher is connected to ActiveMQ")
logging.info("Publisher is connected to ActiveMQ")
......@@ -408,7 +408,7 @@ class Ingestor(Thread):
database=self.database,
)
databases = self.influxdb.get_list_database()
print(databases)
#print(databases)
db_exist = False
for db in databases:
if db["name"] == self.database:
......@@ -449,6 +449,8 @@ class Ingestor(Thread):
else:
application = fields[metric_name_field_application]
timestamp = int(fields[metric_name_field_timestamp]/1000) #time in sec
#for an unknown error, i'll reduce the timestamp to 5 minutes for debugging purpose
#timestamp -= 60*5
metric = topic[topic.rindex('/')+1:]
value = fields[metric_name_field_value]
backet = self.backer_manager.getBacketBasedOnTime(application, timestamp)
......@@ -460,7 +462,7 @@ class Ingestor(Thread):
# if tolerance != None:
# backet.setTolerance(tolerance)
else:
backet = Backet(application, 2, timestamp)
backet = Backet(application, 1, timestamp)
backet.addLabels({"application": application, "level": fields["level"]})
backet.insert(metric, value, timestamp)
self.backer_manager.addBacket(application, backet)
......@@ -469,6 +471,7 @@ class Ingestor(Thread):
if backet != None:
metrics = backet.getBacketSeries()
timestamp = backet.getTimestamp()
metrics["ems_time"] = timestamp
labels = backet.getLabels()
self.insert2({"metrics": metrics, "labels": labels, "timestamp": timestamp})
......@@ -625,6 +628,24 @@ class InputApi:
def getSubscriberSize(self):
return len(self.subscriptions.keys())
def converterEMSToJSON(self, data):
if type(data) != type(""):
return None
try:
data = data.replace("{","").replace("}","")
data_splitted = data.split(",")
result = {}
for _d in data_splitted:
v, k = _d[_d.index("=")+1:], _d[:_d.index("=")]
k, v = k.strip(), v.strip()
if k == "level" or k == "timestamp":
result[k] = int(float(v))
else:
result[k] = float(v)
return json.dumps(result)
except:
return None
def getData(self, data, topic):
try:
_json = json.loads(data)
......@@ -633,9 +654,11 @@ class InputApi:
self.handleRequest(_json)
return True
except Exception as e:
print("Non JSON content received")
logging.warning("Non JSON content received")
return None
data = self.converterEMSToJSON(data)
if data == None:
print("Non JSON content received")
logging.warning("Non JSON content received")
return None
self.ingestor.addToList(data, topic)
self.data_points += 1
if time.time() - self.last_evaluation > self.evaluation_interval:
......
version: '2'
services:
database:
image: jdtotow/persistent_storage
container_name: database
persistent_storage:
image: gitlab.ow2.org:4567/melodic/morphemic-preprocessor/persistent_storage:latest
container_name: persistent_storage
restart: always
env_file:
- "./database/.env"
volumes:
- "./database/data:/var/lib/influxdb"
- "./log:/app/log/"
ports:
- 8086:8086
environment:
- "ACTIVEMQ_PORT=61610"
- "ACTIVEMQ_TOPIC=AAAA"
- "ACTIVEMQ_HOST=147.102.17.76"
- "ACTIVEMQ_USERNAME=aaa"
- "ACTIVEMQ_PASSWORD=111"
- "INFLUXDB_HOSTNAME=ui-influxdb"
- "INFLUXDB_PORT=8086"
- "INFLUXDB_USERNAME=morphemic"
- "INFLUXDB_PASSWORD=password"
publisher:
image: jdtotow/publisher
container_name: publisher
......@@ -23,45 +29,5 @@ services:
- "ACTIVEMQ_HOST=activemq"
#- "APPLICATION_NAME=custom_name"
activemq:
image: jdtotow/activemq
container_name: activemq
ports:
# mqtt
- "1883:1883"
# amqp
- "5672:5672"
# ui
- "8161:8161"
# stomp
- "61613:61613"
# ws
- "61614:61614"
# jms
- "61616:61616"
# jms prometheus agent
- "8080:8080"
#volumes: ["activemq-data:/opt/activemq/conf", "activemq-data:/data/activemq", "activemq-data:/var/log/activemq"]
environment:
ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT: "true"
ACTIVEMQ_ADMIN_LOGIN: aaa
ACTIVEMQ_ADMIN_PASSWORD: "111"
ACTIVEMQ_WRITE_LOGIN: aaa
ACTIVEMQ_WRITE_PASSWORD: "111"
ACTIVEMQ_READ_LOGIN: aaa
ACTIVEMQ_READ_PASSWORD: "111"
ACTIVEMQ_JMX_LOGIN: aaa
ACTIVEMQ_JMX_PASSWORD: "111"
ACTIVEMQ_STATIC_TOPICS: static-topic-1;static-topic-2
ACTIVEMQ_STATIC_QUEUES: static-queue-1;static-queue-2
ACTIVEMQ_ENABLED_SCHEDULER: "true"
ACTIVEMQ_MIN_MEMORY: 512
ACTIVEMQ_MAX_MEMORY: 2048
#prometheus:
# image: prom/prometheus
# ports:
# - 9090:9090
# container_name: prometheus
# volumes:
# - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
import os, json, time
_in = "{metricValue=0.0, level=3.0, timestamp=1.627629862279E12}"
def converterEMSToJSON(data):
if type(data) != type(""):
return None
data = data.replace("{","").replace("}","")
data_splitted = data.split(",")
result = {}
for _d in data_splitted:
v, k = _d[_d.index("=")+1:], _d[:_d.index("=")]
k, v = k.strip(), v.strip()
if k == "level" or k == "timestamp":
result[k] = int(float(v))
else:
result[k] = float(v)
return result
r = converterEMSToJSON(_in)
print(r)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment