Source code for httk.optimade.parse_optimade_filter

#!/usr/bin/env python
#
#    The high-throughput toolkit (httk)
#    Copyright (C) 2012-2020 Rickard Armiento
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os, re
from pprint import pprint

from httk import miniparser
from httk.core.miniparser import ParserError, ParserSyntaxError
#import parser, build_ls, ParserError

ls = None


[docs]def parse_optimade_filter(filter_string, verbosity=0): # To get diagnostic output, pass argument, e.g.,: verbosity=LogVerbosity(0,parser_verbosity=5)) parse_tree = parse_optimade_filter_raw(filter_string, verbosity) ojf = optimade_parse_tree_to_ojf(parse_tree) return ojf
[docs]def parse_optimade_filter_raw(filter_string, verbosity=0): global ls if ls is None: initialize_optimade_parser() return miniparser.parser(ls, filter_string, verbosity=verbosity)
[docs]def initialize_optimade_parser(): global ls here = os.path.abspath(os.path.dirname(__file__)) with open(os.path.join(here, "optimade_filter_grammar.ebnf")) as f: grammar = f.read() # Keywords literals = ["AND", "NOT", "OR", "KNOWN", "UNKNOWN", "IS", "CONTAINS", "STARTS", "ENDS", "WITH", "LENGTH", "HAS", "ALL", "ONLY", "EXACTLY", "ANY"," ","\t","\n","\r"] # Token definitions from Appendix 3 tokens = { "Operator": r'<|<=|>|>=|=|!=', "Identifier": "[a-z_][a-z_0-9]*", "String": r'"[^"\\]*(?:\\.[^"\\]*)*"', "Number": r"[-+]?([0-9]+(\.[0-9]*)?|\.[0-9]+)([eE][-+]?[0-9]+)?", "OpeningBrace": r"\(", "ClosingBrace": r"\)", "Dot": r'\.', "Colon": r":", "Comma": r"," } partial_tokens = { "Number": r"[-+]?[0-9]+\.?[0-9]*[eE]?[-+]?[0-9]*" } # We don't need these, because they are handled on higher level # by the token definitions. skip = [ "EscapedChar", "UnescapedChar", "Punctuator", "Exponent", "Sign", "Digits", "Digit", "Letter", "Operator", "UppercaseLetter", "LowercaseLetter", "OpeningBrace", "Dot", "ClosingBrace", "Comma", "Colon" ] ls = miniparser.build_ls(ebnf_grammar=grammar, start='Filter', ignore=' \t\n', tokens=tokens, partial_tokens=partial_tokens, literals=literals, verbosity=0, skip=skip, remove=[')', '('], simplify=[]) # , simplify=['Term', 'Atom', 'Expression']) return ls
[docs]def optimade_parse_tree_to_ojf(ast): assert(ast[0] == 'Filter') return optimade_parse_tree_to_ojf_recurse(ast[1])
def _fix_const(node): if node[0] == 'Property': assert(node[1][0]=='Identifier') return node[1] elif node[0] == 'String': assert(node[1][-1]=='"') assert(node[1][0]=='"') return ('String', node[1][1:-1]) else: return node
[docs]def optimade_parse_tree_to_ojf_recurse(node, recursion=0): tree = [None] pos = tree arg = 0 if node[0] in ['Expression', 'ExpressionClause', 'ExpressionPhrase', 'Comparison']: n = node[1:] if n[0][0] == "NOT": assert(arg is not None) pos[arg] = ['NOT', None] pos = pos[arg] arg = 1 n = list(n)[1:] for nn in n: if nn[0] in ['Expression', 'ExpressionClause', 'ExpressionPhrase', 'PropertyFirstComparison', 'ConstantFirstComparison', 'PredicateComparison', 'Comparison']: assert(arg is not None and pos[arg] is None) pos[arg] = optimade_parse_tree_to_ojf_recurse(nn, recursion=recursion+1) elif nn[0] in ["AND", "OR"]: assert(arg is not None and pos[arg] is not None) pos[arg] = [nn[0], tuple(pos[arg]), None] pos = pos[arg] arg = 2 elif nn[0] in ["OpeningBrace", "ClosingBrace"]: pass else: pprint(nn) raise Exception("Internal error: filter simplify on invalid ast: "+str(nn[0])) elif node[0] in ['PropertyFirstComparison', 'ConstantFirstComparison'] : assert(arg is not None and pos[arg] is None) if node[0] == 'PropertyFirstComparison': assert(node[1][0] == 'Property') left = ('Identifier',) + tuple([x[1] for x in node[1][1:] if x[0] != 'Dot']) #assert(node[1][1][0] == 'Identifier') #left = node[1] elif node[0] == 'ConstantFirstComparison': assert(node[1][0] == 'Constant') left = _fix_const(node[1][1]) else: raise Exception("Internal error: filter simplify on invalid ast, unrecognized comparison: "+str(node[0])) if node[2][0] == "ValueOpRhs": assert(node[2][1][0] == 'Operator') op = node[2][1][1] assert(node[2][2][0] == 'Value') right = _fix_const(node[2][2][1]) pos[arg] = (op, left, right) arg = None elif node[2][0] == "FuzzyStringOpRhs": assert(node[2][1][0] in ['CONTAINS', 'STARTS', 'ENDS']) op = node[2][1][1] if node[2][1][0] in ['STARTS', 'ENDS'] and node[2][2][0] == "WITH": right = node[2][3] else: right = node[2][2] assert(right[0] == 'Value' or right[0] == 'Property') if right[0] == 'Value': right = _fix_const(right[1]) pos[arg] = (op, left, right) arg = None elif node[2][0] == "KnownOpRhs": assert(node[2][1][0] == 'IS') op = node[2][1][1] assert(node[2][2][0] in ['KNOWN','UNKNOWN']) op += "_" + node[2][2][0] assert(node[1][0] == 'Property') operand = ('Identifier',) + tuple([x[1] for x in node[1][1:] if x[0] != 'Dot']) pos[arg] = (op, operand) arg = None elif node[2][0] == "SetOpRhs": assert(node[2][1][0] == 'HAS') if node[2][2][0] == 'Operator': assert(len(node[2]) == 4) op = "HAS" inop = node[2][2][1] right = node[2][3][1] pos[arg] = (op, (inop,), left, (right,)) elif len(node[2]) == 3: op = "HAS_ALL" assert(node[2][2][0] == 'Value') right = _fix_const(node[2][2][1]) pos[arg] = (op, ('=',), left, (right,)) elif len(node[2]) == 4: assert(node[2][2][0] in ['ONLY', 'ALL', 'EXACTLY', 'ANY']) assert(node[2][3][0] == 'ValueList') #if 'Operator' in [x[0] for x in node[2][3][1:]]: op = "HAS_"+node[2][2][0] inop = None right = [] inops = [] for x in node[2][3][1:]: if x[0] == 'Operator': assert(inop is None) inop = x[1] elif x[0] == 'Value': inops += [('=' if inop is None else inop)] right += [_fix_const(x[1])] inop = None pos[arg] = (op, tuple(inops), left, tuple(right)) #else: # op = "HAS_"+node[2][2][0] # right = tuple(_fix_const(x[1]) for x in node[2][3][1::2]) # pos[arg] = (op, left, right) else: raise Exception("Internal error: filter simplify on invalid ast, unexpected number of components in set op: "+str(node[2])) arg = None elif node[2][0] == "SetZipOpRhs": assert(node[2][1][0] == 'IdentifierZipAddon') left = (left,) + node[2][1][2::2] nzip = len(left) assert(node[2][2][0] == 'HAS') if len(node[2]) == 4: assert(node[2][3][0] == 'ValueZip') #if 'Operator' in [x[0] for x in node[2][3][1::2]]: op = "HAS_ZIP" inop = None inops = [] right = [] for x in node[2][3][1:]: if x[0] == 'Operator': assert(inop is None) inop = x[1] elif x[0] == 'Value': right += [_fix_const(x[1])] inops += [('=' if inop is None else inop)] inop = None pos[arg] = (op, tuple(inops), left, tuple(right)) #else: # op = "HAS_ZIP" # assert(node[2][3][1][0] == 'Value') # right = tuple(_fix_const(x[1]) for x in node[2][3][1::2]) # if not nzip==len(right): # raise ParserError("Parser context error: set zip operation with mismatching number of components for:"+str(right)+" lhs:"+str(nzip)+" rhs:"+str(right)) # pos[arg] = (op, left, right) elif len(node[2]) == 5: assert(node[2][3][0] in ['ONLY', 'ALL', 'EXACTLY', 'ANY']) assert(node[2][4][0] == 'ValueZipList') #if 'Operator' in [x[0] for y in node[2][4][1::2] for x in y[1:]]: op = "HAS_ZIP_"+node[2][3][0] inops = [] right = [] for y in node[2][4][1::2]: inop = None inops += [[]] right += [[]] for x in y[1:]: if x[0] == 'Operator': assert(inop is None) inop = x[1] elif x[0] == 'Value': right[-1] += [_fix_const(x[1])] inops[-1] += [('=' if inop is None else inop)] inop = None inops[-1] = tuple(inops[-1]) right[-1] = tuple(right[-1]) pos[arg] = (op, tuple(inops), left, tuple(right)) #else: # assert(node[2][3][0] in ['ONLY', 'ALL', 'EXACTLY', 'ANY']) # op = "HAS_ZIP_"+node[2][3][0] # assert(node[2][4][0] == 'ValueZipList') # right = tuple(tuple(_fix_const(y[1]) for y in x[1::2]) for x in node[2][4][1::2]) # if not all(nzip==len(x) for x in right): # raise ParserError("Parser context error: set zip operation with mismatching number of components for:"+str(right)+" lhs:"+str(nzip)+" rhs:"+str([len(x) for x in right])) # pos[arg] = (op, left, right) else: raise Exception("Internal error: filter simplify on invalid ast, unexpected number of components in set op: "+str(node[2])) arg = None elif node[2][0] == "LengthOpRhs": assert(node[2][1][0] == 'LENGTH') assert(node[2][2][0] == 'Value' or (node[2][2][0] == 'Operator' and node[2][3][0] == 'Value')) if node[2][2][0] == 'Value': right = _fix_const(node[2][2][1]) op = '=' else: op = node[2][2][1] right = _fix_const(node[2][3][1]) pos[arg] = ("LENGTH", left, op, right) #left = ('Identifier',) + tuple([x[1] for x in node[1][1:] if x[0] != 'Dot']) #assert(node[2][2][0] == 'Value' or (node[2][2][0] == 'Operator' and node[3][2][0] == 'Value')) #right = _fix_const(node[2][2][1]) #pos[arg] = ("LENGTH", op, left, right) #arg = None else: raise Exception("Internal error: filter simplify on invalid ast, unrecognized comparison: "+str(node[2][0])) #elif node[0] == 'PredicateComparison': # if node[1][0] == "LengthComparison": # assert(node[1][1][0]=="LENGTH") # assert(node[1][2][0]=="Identifier") # assert(node[1][3][0]=="Operator") # assert(node[1][4][0]=="Value") # left = node[1][2] # right = _fix_const(node[1][4][1]) # op = node[1][3][1] # pos[arg] = ("LENGTH", op, left, right) # arg = None # else: # raise Exception("Internal error: filter simplify on invalid ast, unrecognized predicate comparison: "+str(node[1][0])) else: raise Exception("Internal error: filter simplify on invalid ast, unrecognized node: "+str(node[0])) assert(arg is None or pos[arg] is not None) return tuple(tree[0])
if __name__ == "__main__": import os, sys from pprint import pprint if len(sys.argv) >= 2: input_string = sys.argv[1] else: input_string ='elements HAS ALL "Ga","Ti" AND (nelements=3 OR nelements=2)' filter_ast = parse_optimade_filter(input_string, verbosity=0) sys.stdout.write("==== FILTER STRING PARSE RESULT:\n") pprint(filter_ast) sys.stdout.write("====\n")