Source code for archelond.data.elastic

"""
ElasticSearch implementation of the data store.  Currently the
recommended default data store.
"""
from __future__ import absolute_import, unicode_literals
from datetime import datetime
import hashlib
import logging

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import (
    RequestError,
    NotFoundError,
    ConnectionError as ESConnectionError
)
import pytz

from archelond.data.abstract import HistoryData

log = logging.getLogger(__name__)  # pylint: disable=invalid-name


class ElasticData(HistoryData):
    """
    An ElasticSearch implementation of HistoryData.  This is what
    should be used in production.

    Commands are stored in the configured index under a per-user
    document type (``<username>_history``), and each document id is the
    SHA-256 hex digest of the command text, so the same command always
    maps to the same document id.
    """
    DOC_TYPE = 'history'
    # Only return a max of 50 results
    NUM_RESULTS = 50

    def __init__(self, config):
        """
        Configure and setup the ES client.

        Reads ``ELASTICSEARCH_URL`` and ``ELASTICSEARCH_INDEX`` from the
        configuration, connects the client and issues an index-creation
        request for the configured index.
        """
        super(ElasticData, self).__init__(config)

        # Connect
        self.elasticsearch = Elasticsearch(
            self.config['ELASTICSEARCH_URL']
        )

        # Create configured index.
        # Analyzer is setup such that every single character can
        # be part of the search query
        self.index = self.config['ELASTICSEARCH_INDEX']
        # ``ignore=400`` keeps the client from raising on an HTTP 400
        # response (presumably "index already exists" -- TODO confirm).
        # NOTE(review): the mapping below is keyed by the index name,
        # while documents are written under per-user doc types (see
        # ``_doc_type``); confirm the analyzer applies to those types.
        # pylint: disable=unexpected-keyword-arg
        self.elasticsearch.indices.create(
            index=self.index,
            ignore=400,
            body={
                'settings': {
                    'analysis': {
                        'analyzer': {
                            'command_analyzer': {
                                'tokenizer': 'keyword',
                                'filter': 'lowercase'
                            }
                        }
                    }
                },
                'mappings': {
                    self.index: {
                        'properties': {
                            'command': {
                                'search_analyzer': 'command_analyzer',
                                'index_analyzer': 'command_analyzer',
                                'type': 'string'
                            }
                        }
                    }
                }
            }
        )

    def _doc_type(self, username):
        """
        Return the doc type for the given user, i.e. the username
        joined to ``DOC_TYPE`` with an underscore.
        """
        return '{0}_{1}'.format(username, self.DOC_TYPE)

    @staticmethod
    def _doc_id(command):
        """
        Hash the command to make the document id.

        Returns the SHA-256 hex digest of the UTF-8 encoded command.
        """
        return hashlib.sha256(command.encode('utf-8')).hexdigest()

    def add(self, command, username, host, **kwargs):
        """
        Add the command to the index with a time stamp and id by hash
        of the command and append username to doc type for user
        separation of data.

        Any extra keyword arguments are stored under the ``meta`` key of
        the document.  Returns the document id.
        """
        doc_id = self._doc_id(command)
        document = {
            'command': command,
            'username': username,
            'host': host,
            # Timezone-aware "now" in UTC.
            'timestamp': datetime.now(pytz.utc),
            # Add kwargs to meta key in document
            'meta': kwargs,
        }
        result = self.elasticsearch.index(
            index=self.index,
            doc_type=self._doc_type(username),
            id=doc_id,
            body=document
        )
        log.debug(result)
        return doc_id

    def delete(self, command_id, username, host, **kwargs):
        """
        Remove item from elasticsearch.

        ``host`` and extra keyword arguments are not used here.

        Raises:
            KeyError: if no document with ``command_id`` exists for the
                user.
        """
        try:
            # Keyword arguments so the call does not depend on the
            # positional parameter order of the client method.
            self.elasticsearch.delete(
                index=self.index,
                doc_type=self._doc_type(username),
                id=command_id
            )
        except NotFoundError:
            raise KeyError(command_id)

    def get(self, command_id, username, host, **kwargs):
        """
        Pull one command out of elasticsearch.

        ``host`` and extra keyword arguments are not used here.

        Returns the stored document source with the document id added
        under the ``id`` key.

        Raises:
            KeyError: if no document with ``command_id`` exists for the
                user.
        """
        try:
            # Keyword arguments so the call does not depend on the
            # positional parameter order of the client method.
            hit = self.elasticsearch.get(
                index=self.index,
                doc_type=self._doc_type(username),
                id=command_id
            )
        except NotFoundError:
            raise KeyError(command_id)

        result = hit['_source']
        result['id'] = hit['_id']
        return result

    def all(self, order, username, host, page=0, **kwargs):
        """
        Just build a body with match all and return filter.

        Delegates to :meth:`filter` with no search term; extra keyword
        arguments are not forwarded.
        """
        body = {
            "query": {
                "match_all": {}
            }
        }
        return self.filter(None, order, username, host, body=body, page=page)

    def filter(self, term, order, username, host, body=None, page=0,
               **kwargs):
        """
        Return filtered search that is ordered.

        Args:
            term: text to match as a phrase prefix against ``command``;
                only used when ``body`` is not supplied.
            order: ``'r'`` sorts by ``timestamp`` descending; any other
                value leaves the sort parameter empty.
            username: user whose doc type is searched.
            host: not used by this implementation.
            body: optional query body that replaces the default phrase
                prefix query.
            page: zero-based page number; each page holds up to
                ``NUM_RESULTS`` hits.

        Returns:
            A list of document sources, each with the hit's ``id`` and
            ``score`` added.  An empty list is returned (and the error
            logged) on a connection or request error.
        """
        sort = 'timestamp:desc' if order == 'r' else ''
        if not body:
            body = {
                'query': {
                    'match_phrase_prefix': {
                        'command': {
                            'query': term,
                            'max_expansions': self.NUM_RESULTS
                        }
                    }
                }
            }

        # Implicitly we are sorting by score without order set, which
        # is nice
        try:
            # pylint: disable=unexpected-keyword-arg
            results = self.elasticsearch.search(
                index=self.index,
                doc_type=self._doc_type(username),
                size=self.NUM_RESULTS,
                body=body,
                sort=sort,
                from_=self.NUM_RESULTS * page
            )
        except (ESConnectionError, RequestError) as ex:
            log.exception(ex)
            return []

        log.debug(results)
        log.debug('Got %s hits for %s', results['hits']['total'], term)

        results_list = []
        for hit in results['hits']['hits']:
            result = hit['_source']
            result['id'] = hit['_id']
            result['score'] = hit['_score']
            results_list.append(result)
        return results_list