#!/usr/bin/env python
#
# The high-throughput toolkit (httk)
# Copyright (C) 2012-2020 Rickard Armiento
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
This file provides functions to translate an OPTIMaDe filter string into an SQL query.
'''
from __future__ import print_function
import re, operator
from pprint import pprint
from httk.optimade.error import TranslatorError
from httk.optimade.httk_entries import httk_entry_info, httk_recognized_prefixes
from httk.atomistic import Structure
from httk.atomistic.results import Result_TotalEnergyResult
constant_types = ['String','Number']
table_mapper = {
'structures': Structure,
'calculations': Result_TotalEnergyResult,
}
invert_op = {'!=': '!=', '>':'<', '<':'>', '=':'=', '<=': '>=', '>=': '<='}
_python_opmap = {'!=': '__ne__', '>': '__gt__', '<': '__lt__', '=': '__eq__', '<=': '__le__', '>=': '__ge__', 'STARTS':'startswith','ENDS':'endswith'}
[docs]def optimade_filter_to_httk(filter_ast, entries, searcher):
search_variables = []
for entry in entries:
search_variable = searcher.variable(table_mapper[entry])
searcher.output(search_variable,entry)
search_variables += [search_variable]
if filter_ast is not None:
search_expr, needs_post = optimade_filter_to_httk_recurse(filter_ast, search_variable, entry, False)
searcher.add(search_expr)
if needs_post:
searcher.add_all(search_expr)
return searcher
[docs]def optimade_filter_to_httk_recurse(node, search_variable, entry, inv_toggle, recursion=0):
search_expr = None
needs_post = False
handlers = optimade_field_handlers[entry]
entry_info = httk_entry_info[entry]['properties']
if node[0] in ['AND']:
search_expr, needs_post = optimade_filter_to_httk_recurse(node[1], search_variable, entry, inv_toggle, recursion=recursion+1)
rhs_search_expr, rhs_needs_post = optimade_filter_to_httk_recurse(node[2], search_variable, entry, inv_toggle, recursion=recursion+1)
needs_post = needs_post or rhs_needs_post
search_expr = search_expr & rhs_search_expr
elif node[0] in ['OR']:
search_expr, needs_post = optimade_filter_to_httk_recurse(node[1], search_variable, entry, inv_toggle, recursion=recursion+1)
rhs_search_expr, rhs_needs_post = optimade_filter_to_httk_recurse(node[2], search_variable, entry, inv_toggle, recursion=recursion+1)
needs_post = needs_post or rhs_needs_post
search_expr = search_expr | rhs_search_expr
elif node[0] in ['NOT']:
search_expr, needs_post = optimade_filter_to_httk_recurse(node[1], search_variable, entry, not inv_toggle, recursion=recursion+1)
search_expr = ~ search_expr
elif node[0] in ['HAS_ALL', 'HAS_ANY', 'HAS_ONLY']:
ops = node[1]
left = node[2]
right = node[3]
assert(left[0] == 'Identifier')
if left[1] not in entry_info:
if left[1].startswith(httk_recognized_prefixes):
raise TranslatorError("Filter invokes unrecognized property name: "+left[1], 400, "Bad request")
else:
#TODO: this should warn
handler = unknown_has_handler
values = format_value('list of unknown',right)
else:
values = format_value(entry_info[left[1]]['fulltype'],right)
handler = handlers[left[1]]['HAS']
if ops != tuple(['=']*len(values)):
raise TranslatorError("HAS queries with non-equal operators not implemented yet.", 501, "Not implemented")
search_expr, needs_post = handler(left[1], ops, values, search_variable, node[0], inv_toggle)
elif node[0] in ['LENGTH']:
left = node[1]
op = node[2]
right = node[3]
assert(left[0] == 'Identifier')
if right[0] == 'Identifier':
raise TranslatorError("LENGTH comparisons with non-constant right hand side not implemented.", 501, "Not implemented")
if right[0] != 'Number':
raise TranslatorError("LENGTH comparison can only be done with Numbers. Unexpected right hand side type:"+right[0], 501, "Not implemented")
if left[1] not in entry_info:
if left[1].startswith(httk_recognized_prefixes):
raise TranslatorError("Filter invokes unrecognized property name: "+left[1], 400, "Bad request")
else:
#TODO: this should warn
handler = unknown_length_handler
values = format_value('list of unknown',right)
else:
handler = handlers[left[1]]['length']
assert(entry_info[left[1]]['fulltype'].startswith("list of "))
value = format_value("integer",right)
search_expr = handler(left[1], op, value, search_variable)
elif node[0] in ['>', '>=', '<', '<=', '=', '!=']:
op = node[0]
left = node[1]
right = node[2]
if left[0] in constant_types and right[0] in constant_types:
raise TranslatorError("Constant vs. Constant comparisons not implemented.", 501, "Not implemented")
elif left[0] == 'Identifier' and right[0] == 'Identifier':
raise TranslatorError("Identifier vs. Identifier comparisons not implemented.", 501, "Not implemented")
else:
if right[0] == 'Identifier' and left[0] in constant_types:
left, right = right, left
op = invert_op[op]
assert(left[0] == 'Identifier')
if left[1] not in entry_info:
if left[1].startswith(httk_recognized_prefixes):
raise TranslatorError("Filter invokes unrecognized property name: "+left[1], 400, "Bad request")
else:
#TODO: this should warn
handler = unknown_comparison_handler
value = format_value('unknown',right)
else:
handler = handlers[left[1]]['comparison']
value = format_value(entry_info[left[1]]['fulltype'],right)
search_expr = handler(left[1], op, value, search_variable)
elif node[0] in ['ENDS', 'STARTS', 'CONTAINS']:
op = node[0]
left = node[1]
right = node[2]
assert(left[0] == 'Identifier')
if right[0] == 'Identifier':
raise TranslatorError("Identifier vs. Identifier string comparisons not implemented.", 501, "Not implemented")
if left[1] not in entry_info:
if left[1].startswith(httk_recognized_prefixes):
raise TranslatorError("Filter invokes unrecognized property name: "+left[1], 400, "Bad request")
else:
#TODO: this should warn
handler = unknown_stringmatching_handler
values = format_value('unknown',right)
else:
handler = handlers[left[1]]['stringmatching']
value = format_value(entry_info[left[1]]['fulltype'],right)
search_expr = handler(left[1], value, node[0], search_variable)
elif node[0] in ['IS_UNKNOWN', 'IS_KNOWN']:
left = node[1]
assert(left[0] == 'Identifier')
if left[1] not in entry_info:
if left[1].startswith(httk_recognized_prefixes):
raise TranslatorError("Filter invokes unrecognized property name: "+left[1], 400, "Bad request")
else:
#TODO: this should warn
handler = unknown_unknown_handler
else:
handler = handlers[left[1]]['unknown']
search_expr = handler(left[1], search_variable, node[0])
else:
pprint(node)
raise TranslatorError("Unexpected translation error at: "+str(node[0]), 500, "Internal server error.")
assert(search_expr is not None)
return search_expr, needs_post
[docs]def true_handler(search_variable):
return getattr(getattr(search_variable,'hexhash'),'__eq__')(getattr(search_variable,'hexhash'))
[docs]def false_handler(search_variable):
return getattr(getattr(search_variable,'hexhash'),'__ne__')(getattr(search_variable,'hexhash'))
[docs]def unknown_unknown_handler(entry, search_variable, unknown_type):
if unknown_type=='IS_UNKNOWN':
return true_handler(search_variable)
elif unknown_type=='IS_KNOWN':
return false_handler(search_variable)
raise TranslatorError("Unexpected unknown operator type", 500, "Internal server error.")
[docs]def known_unknown_handler(entry, search_variable, unknown_type):
if unknown_type=='IS_UNKNOWN':
return false_handler(search_variable)
elif unknown_type=='IS_KNOWN':
return true_handler(search_variable)
raise TranslatorError("Unexpected unknown operator type", 500, "Internal server error.")
[docs]def unknown_comparison_handler(entry, ops, values, search_variable):
return false_handler(search_variable)
[docs]def unknown_stringmatching_handler(entry, values, stringmatching_type, search_variable):
return false_handler(search_variable)
[docs]def unknown_has_handler(entry, op, value, search_variable, has_type, inv_toggle):
return false_handler(search_variable)
[docs]def unknown_length_handler(entry, op, value, search_variable):
return false_handler(search_variable)
[docs]def string_handler(entry, op, value, search_variable):
httk_op = _python_opmap[op]
return getattr(getattr(search_variable,entry),httk_op)(value)
[docs]def stringmatching_handler(entry, value, stringmatching_type, search_variable):
escaped_value = value.replace("\\","\\\\").replace("%","\\%").replace("_","\\_")
if stringmatching_type == 'ENDS':
return getattr(getattr(search_variable,entry),'like')('%'+escaped_value)
elif stringmatching_type == 'STARTS':
return getattr(getattr(search_variable,entry),'like')(escaped_value+'%')
elif stringmatching_type == 'CONTAINS':
return getattr(getattr(search_variable,entry),'like')('%'+escaped_value+'%')
else:
raise TranslatorError("Unexpected stringmatching operator type", 500, "Internal server error.")
[docs]def constant_comparison_handler(val1, op, val2, search_variable):
op = _python_opmap[op]
if getattr(operator,op)(val1,val2):
return true_handler(search_variable)
else:
return false_handler(search_variable)
[docs]def constant_stringmatching_handler(val1, op, val2, stringmatching_type, search_variable):
op = _python_opmap[op]
if getattr(val1,op)(val2):
return true_handler(search_variable)
else:
return false_handler(search_variable)
[docs]def number_handler(entry, op, value, search_variable):
httk_op = _python_opmap[op]
return getattr(getattr(search_variable,entry),httk_op)(value)
[docs]def timestamp_handler(entry, op, value, search_variable):
raise TranslatorError("Timestamp comparison not yet implemented.", 501, "Not implemented.")
[docs]def set_handler(entry, ops, values, inv, has_type, search_variable):
if has_type == 'HAS_ALL':
if not inv:
search = getattr(getattr(search_variable,entry),'has_any')(values[0])
for value in values[1:]:
search = search & (getattr(getattr(search_variable,entry),'has_any')(value))
return search, False
else:
search = getattr(getattr(search_variable,entry),'has_inv_any')(values[0])
for value in values[1:]:
search = search & (getattr(getattr(search_variable,entry),'has_inv_any')(value))
return search, True
elif has_type == 'HAS_ANY':
if not inv:
return getattr(getattr(search_variable,entry),'has_any')(*values), False
else:
return getattr(getattr(search_variable,entry),'has_inv_any')(*values), True
elif has_type == 'HAS_ONLY':
if not inv:
return getattr(getattr(search_variable,entry),'has_only')(*values), True
else:
return getattr(getattr(search_variable,entry),'has_inv_only')(*values), True
[docs]def constant_set_handler(val1, ops, val2, has_type, inv, search_variable):
if has_type == 'HAS_ALL':
if (set(val2) <= set(val1)):
return true_handler(search_variable), False
else:
return false_handler(search_variable), False
elif has_type == 'HAS_ANY':
if set(val2).isdisjoint(val1):
return false_handler(search_variable), False
else:
return true_handler(search_variable), False
elif has_type == 'HAS_ONLY':
if (set(val1) <= set(val2)):
return true_handler(search_variable), False
else:
return false_handler(search_variable), False
[docs]def structure_features_set_handler(values, ops, inv, has_type, search_variable):
# Any HAS ANY, HAS ALL, HAS ONLY operation will check for precense of an identifier in structure_features.
# For now we don't support any structure features, hence, all such comparisons return False
return getattr(getattr(search_variable,'hexhash'),'__ne__')(getattr(search_variable,'hexhash')), False
[docs]def structure_features_length_handler(op, value, search_variable):
# structure_features is assumed to always be empty
if value == 0:
return getattr(getattr(search_variable,'hexhash'),'__eq__')(getattr(search_variable,'hexhash'))
else:
return getattr(getattr(search_variable,'hexhash'),'__ne__')(getattr(search_variable,'hexhash'))
optimade_field_handlers = {
'structures': {
'id': {
'comparison': lambda entry, op, value, search_variable: string_handler('__id', op, value, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
'stringmatching': lambda entry, value, stringmatching_type, search_variable: stringmatching_handler('__id', value, stringmatching_type, search_variable)
},
'type': {
'comparison': lambda entry, op, value, search_variable: constant_comparison_handler(value, op, 'structures', search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
'stringmatching': lambda entry, value, stringmatching_type, search_variable: constant_stringmatching_handler(value, 'structures', stringmatching_type, search_variable)
},
'elements': {
'HAS': lambda entry, ops, values, search_variable, has_type, inv: set_handler('formula_symbols', ops, values, inv, has_type, search_variable),
'length': lambda entry, op, value, search_variable: number_handler('number_of_elements', op, value, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
},
'nelements': {
'comparison': lambda entry, op, value, search_variable: number_handler('number_of_elements', op, value, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
},
'nperiodic_dimensions': {
'comparison': lambda entry, op, value, search_variable: constant_comparison_handler(value, op, 3, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
},
'dimension_types': {
'HAS': lambda entry, ops, values, search_variable, has_type, inv: constant_set_handler(values, ops, [1, 1, 1], has_type, inv, search_variable),
'length': lambda entry, op, value, search_variable: constant_comparison_handler(value, op, 3, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
},
'chemical_formula_descriptive': {
'comparison': lambda entry, op, value, search_variable: string_handler('formula', op, value, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
'stringmatching': lambda entry, value, stringmatching_type, search_variable: stringmatching_handler('formula', value, stringmatching_type, search_variable)
},
'structure_features': {
'HAS': lambda entry, ops, values, search_variable, has_type, inv: structure_features_set_handler(values, ops, inv, has_type, search_variable),
'length': lambda entry, op, value, search_variable: structure_features_length_handler(op, value, search_variable),
'unknown': lambda entry, search_variable, unknown_type: known_unknown_handler(entry, search_variable, unknown_type),
}
},
}