# Source code for archelond.data.memory

"""
In memory data store implementation for development and testing
"""
from __future__ import absolute_import, unicode_literals
from collections import OrderedDict
from datetime import datetime
import hashlib
import logging

import pytz

from archelond.data.abstract import HistoryData


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


class MemoryData(HistoryData):
    """
    A quick in memory deduplicated structure for standalone testing
    and development.

    Records live in ``self.data``, an ``OrderedDict`` keyed by the
    SHA-256 hex digest of the command text, so adding the same command
    twice keeps a single entry.
    """

    # Commands loaded into every new instance by ``__init__``.
    INITIAL_DATA = [
        'cd',
        'pwd',
        'echo hi',
        'cat /proc/cpuinfo'
    ]

    def __init__(self, config):
        """
        Initialize internal data structure with init data

        Args:
            config: Passed unchanged to the ``HistoryData`` initializer.
        """
        super(MemoryData, self).__init__(config)
        self.data = OrderedDict()
        for item in self.INITIAL_DATA:
            self.add(item, None, None)

    @staticmethod
    def _doc_id(command):
        """
        Hash the command to make the id.

        Args:
            command: Command text; it is UTF-8 encoded before hashing.

        Returns:
            The SHA-256 hex digest of the encoded command.
        """
        return hashlib.sha256(command.encode('utf-8')).hexdigest()

    @staticmethod
    def _with_id(command_id, record):
        """
        Return a shallow copy of ``record`` with its ``'id'`` key set.

        Copying keeps read operations from writing into the stored
        record and keeps callers from editing the store's top-level
        fields through the returned dict.
        """
        result = dict(record)
        result['id'] = command_id
        return result

    def add(self, command, username, host, **kwargs):
        """
        Append item to data list

        Args:
            command: Command text to store.
            username: Stored with the record as ``'username'``.
            host: Stored with the record as ``'host'``.
            **kwargs: Stored with the record as ``'meta'``.

        Returns:
            The id (see ``_doc_id``) the record is stored under.
        """
        cmd_id = self._doc_id(command)
        # Assigning to an existing OrderedDict key keeps its old
        # position; drop any previous entry first so a repeated command
        # really is appended as the newest item.
        self.data.pop(cmd_id, None)
        self.data[cmd_id] = {
            'command': command,
            'username': username,
            'host': host,
            # Timezone-aware "now" in UTC.
            'timestamp': datetime.now(pytz.utc),
            'meta': kwargs
        }
        return cmd_id

    def delete(self, command_id, username, host, **kwargs):
        """
        Remove key from internal dictionary

        ``username``, ``host`` and ``**kwargs`` are accepted but not
        used by this implementation.

        Raises:
            KeyError: If ``command_id`` is not in the store.
        """
        del self.data[command_id]

    def get(self, command_id, username, host, **kwargs):
        """
        Pull the specified command out of the data store.

        ``username``, ``host`` and ``**kwargs`` are accepted but not
        used by this implementation.

        Returns:
            A copy of the stored record with an added ``'id'`` key.

        Raises:
            KeyError: If ``command_id`` is not in the store.
        """
        return self._with_id(command_id, self.data[command_id])

    def all(self, order, username, host, page=0, **kwargs):
        """
        Simply rewrap the data structure, order, and return

        Everything is served as page 0; any other page is empty.
        ``**kwargs`` is accepted but not forwarded.

        Returns:
            The unfiltered result of ``filter`` for ``order``.
        """
        if page != 0:
            return []
        return self.filter(None, order, username, host)

    def filter(self, term, order, username, host, page=0, **kwargs):
        """
        Return the stored records whose command contains ``term``.

        Args:
            term: Substring to look for in each command; ``None``
                matches every record.
            order: ``'r'`` returns records in reverse insertion order;
                any other value returns insertion order.
            username: Accepted but not used by this implementation.
            host: Accepted but not used by this implementation.
            page: Everything is served as page 0; any other page is
                empty.
            **kwargs: Accepted but not used by this implementation.

        Returns:
            A list of record copies, each with an added ``'id'`` key.
        """
        if page != 0:
            return []

        entries = list(self.data.items())
        if order == 'r':
            entries.reverse()

        return [
            self._with_id(command_id, record)
            for command_id, record in entries
            if term is None or term in record['command']
        ]