Skip to content

Commit

Permalink
Merge pull request singnet#104 from singnet/senna-103-1
Browse files Browse the repository at this point in the history
Added support for Atomese files
  • Loading branch information
andre-senna authored Oct 13, 2022
2 parents ce69d36 + 4ecac6f commit b7483e5
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 17 deletions.
74 changes: 74 additions & 0 deletions das/atomese_lex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import ply.lex as lex
from das.exceptions import AtomeseLexerError

class AtomeseLex:
    """Lexer for Atomese files, built on top of PLY (``ply.lex``).

    The instance itself is passed to ``lex.lex(module=self)``, so the token
    rules are taken from the ``tokens`` list and from the ``t_*`` attributes
    and methods defined on this object.  The resulting PLY lexer is exposed
    as ``self.lexer``.
    """

    # NOTE: for a `t_<NAME>` token method PLY uses the method's docstring as
    # the token's regular expression.  Those docstrings are therefore part of
    # the program; the rules below are documented with comments instead.

    # Maximum number of characters of the offending input quoted in the
    # message produced for an illegal character.
    _ERROR_SNIPPET_LENGTH = 80

    def __init__(self, **kwargs):
        """Declare the token set and build the underlying PLY lexer.

        Args:
            **kwargs: forwarded unchanged to ``ply.lex.lex``.
        """
        # No reserved words so far; kept so that reserved token names are
        # automatically appended to `tokens` once some are added.
        self.reserved = {}

        self.tokens = [
            'ATOM_OPENNING',
            'ATOM_CLOSING',
            'ATOM_TYPE',
            'NODE_NAME',
            'STV',
            'FLOAT',
            'COMMENT',
            'EOF',
        ] + list(self.reserved.values())

        # These rules must exist before lex.lex() inspects this object.
        self.t_ATOM_OPENNING = r'\('
        self.t_ATOM_CLOSING = r'\)'

        self.lexer = lex.lex(module=self, **kwargs)
        # Set once the EOF token has been handed out (see default_eof_handler).
        self.lexer.eof_reported_flag = False
        self.action_broker = None
        # Callable invoked by t_eof; may be replaced by the owner of the lexer.
        self.eof_handler = self.default_eof_handler
        # Name of the file being scanned; empty when scanning a plain string.
        self.lexer.filename = ""

    # Quoted node name; the surrounding double quotes are removed.
    def t_NODE_NAME(self, t):
        r'\"[^\"]+\"'
        t.value = t.value[1:-1]
        return t

    # Atom type identifier.  `STV`/`stv` is re-tagged as an STV token; for any
    # other name a trailing "Node"/"Link" suffix is removed ("ConceptNode"
    # becomes "Concept").  The bare names "Node" and "Link" are kept intact,
    # since stripping them would leave an empty type name.
    def t_ATOM_TYPE(self, t):
        r'[^\W0-9]\w*'
        if t.value in ('STV', 'stv'):
            t.type = 'STV'
        elif len(t.value) > 4 and t.value.endswith(("Node", "Link")):
            t.value = t.value[:-4]
        return t

    t_FLOAT = r'\d+\.\d+'

    t_ignore =' \t'

    # Comment running from ';' to the end of the line.  Nothing is returned,
    # so no token is produced for it.
    def t_COMMENT(self, t):
        r'\;.*'

    # Newlines only advance the line counter used in error messages.
    def t_newline(self, t):
        r'\n+'
        t.lexer.lineno += len(t.value)

    # End-of-input hook: delegates to the currently installed eof_handler.
    def t_eof(self, t):
        return self.eof_handler(t)

    def default_eof_handler(self, t):
        """Hand out a single ``EOF`` token, then signal the end of the input.

        Args:
            t: the token object received by ``t_eof``.

        Returns:
            ``t`` re-tagged as ``EOF`` on the first call; ``None`` on every
            later call.
        """
        if self.lexer.eof_reported_flag:
            return None
        self.lexer.input("")
        t.type = 'EOF'
        self.lexer.eof_reported_flag = True
        return t

    def _error_message(self, t):
        """Build the message describing the illegal character found at ``t``.

        The message quotes the offending character followed by at most
        ``_ERROR_SNIPPET_LENGTH`` characters of ``t.value`` (which starts at
        the offending character), so the excerpt is never empty.
        """
        source = f"File: {self.lexer.filename or '<input string>'}"
        snippet = t.value[:self._ERROR_SNIPPET_LENGTH]
        return (
            f"{source} - Illegal character at line {t.lexer.lineno}: '{t.value[0]}' "
            f"Near: '{snippet}...'"
        )

    # Illegal-character hook.
    # Raises: AtomeseLexerError, always, with the message from _error_message.
    def t_error(self, t):
        raise AtomeseLexerError(self._error_message(t))
142 changes: 142 additions & 0 deletions das/atomese_lex_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import pytest
from das.atomese_lex import AtomeseLex

# Atomese sample scanned by test_lexer.  It holds four top-level expressions:
# an EvaluationLink carrying an (stv ...) annotation, a ContextLink, a plain
# EvaluationLink and a second ContextLink.  Node names deliberately include
# ':', '-' and blanks.
lex_test_data = """
(EvaluationLink (stv 1.0 0.964)
(PredicateNode "interacts_with")
(GeneNode "E"))
(ContextLink
(MemberLink
(ChebiNode "ChEBI:10033")
(ReactomeNode "R-HSA-6806664"))
(EvaluationLink
(PredicateNode "has_location")
(ListLink
(ChebiNode "ChEBI:10033")
(ConceptNode "cytosol"))))
(EvaluationLink
(PredicateNode "has_name")
(ListLink
(ChebiNode "ChEBI:10033")
(ConceptNode "warfarin")))
(ContextLink
(MemberLink
(ChebiNode "ChEBI:10036")
(ReactomeNode "R HSA 2142753"))
(EvaluationLink
(PredicateNode "has_location")
(ListLink
(ChebiNode "ChEBI:10036")
(ConceptNode "endoplasmic reticulum lumen"))))"""


def test_lexer():
    """Scan the sample Atomese text and compare the token types one by one."""
    lexer_wrapper = AtomeseLex()

    # Token-type fragments the sample is made of.
    opening, closing = "ATOM_OPENNING", "ATOM_CLOSING"
    link_head = [opening, "ATOM_TYPE"]
    node = [*link_head, "NODE_NAME", closing]
    stv = [opening, "STV", "FLOAT", "FLOAT", closing]
    # (EvaluationLink (PredicateNode ..) (ListLink (..Node ..) (..Node ..)))
    evaluation = [*link_head, *node, *link_head, *node, *node, closing, closing]
    # (ContextLink (MemberLink (..Node ..) (..Node ..)) <evaluation>)
    context = [*link_head, *link_head, *node, *node, closing, *evaluation, closing]

    expected_types = [
        # (EvaluationLink (stv ..) (PredicateNode ..) (GeneNode ..))
        *link_head, *stv, *node, *node, closing,
        *context,
        *evaluation,
        *context,
        "EOF",
    ]

    lexer_wrapper.lexer.input(lex_test_data)
    for expected_type in expected_types:
        current = lexer_wrapper.lexer.token()
        assert current.type == expected_type
    # After EOF has been handed out the lexer must report nothing more.
    assert not lexer_wrapper.lexer.token()
162 changes: 162 additions & 0 deletions das/atomese_yacc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
START -> LIST_OF_TOP_LEVEL_ATOMS EOF
| EOF
| <empty>
LIST_OF_TOP_LEVEL_ATOMS -> TOP_LEVEL_ATOM
| LIST_OF_TOP_LEVEL_ATOMS TOP_LEVEL_ATOM
TOP_LEVEL_ATOM -> ATOM
ATOM -> NODE
| LINK
NODE -> ATOM_OPENNING ATOM_TYPE NODE_NAME ATOM_CLOSING
LINK -> ATOM_OPENNING ATOM_TYPE ATOM_LIST ATOM_CLOSING
| ATOM_OPENNING ATOM_TYPE STV_DEFINITION ATOM_LIST ATOM_CLOSING
STV_DEFINITION -> ATOM_OPENNING STV FLOAT FLOAT ATOM_CLOSING
ATOM_LIST -> ATOM
| ATOM_LIST ATOM
"""

from typing import List, Any, Optional
import ply.yacc as yacc
from das.atomese_lex import AtomeseLex
from das.metta_lex import BASIC_TYPE
from das.exceptions import AtomeseSyntaxError, UndefinedSymbolError
from das.expression_hasher import ExpressionHasher
from das.expression import Expression
from das.base_yacc import BaseYacc

class AtomeseYacc(BaseYacc):
    """PLY (``ply.yacc``) parser for Atomese files.

    In check mode, or when no action broker is set, the rules only build
    printable placeholders.  Otherwise they build expression objects and
    report typedefs, terminals and (nested or top-level) expressions to
    ``self.action_broker``.
    """

    # NOTE: the docstring of every `p_*` method is the grammar production PLY
    # associates with it, so the rules are documented with comments only and
    # are kept in their original order.

    ### Parser rules ###

    # Start symbol; the last alternative is the empty production.
    def p_START(self, p):
        """START : LIST_OF_TOP_LEVEL_ATOMS EOF
                 | EOF
                 |"""
        p[0] = 'SUCCESS'

    def p_LIST_OF_TOP_LEVEL_ATOMS_base(self, p):
        """LIST_OF_TOP_LEVEL_ATOMS : TOP_LEVEL_ATOM"""
        p[0] = [p[1]]

    def p_LIST_OF_TOP_LEVEL_ATOMS_recursion(self, p):
        """LIST_OF_TOP_LEVEL_ATOMS : LIST_OF_TOP_LEVEL_ATOMS TOP_LEVEL_ATOM"""
        p[0] = p[1] + [p[2]]

    # An atom with elements is flagged as top level and reported to the
    # action broker; atoms whose `elements` is None are not reported here.
    def p_TOP_LEVEL_ATOM(self, p):
        """TOP_LEVEL_ATOM : ATOM"""
        top_atom = p[1]
        p[0] = top_atom
        if not self.check_mode and self.action_broker:
            if top_atom.elements is not None:
                top_atom.toplevel = True
                self.action_broker.new_top_level_expression(top_atom)

    def p_ATOM_node(self, p):
        """ATOM : NODE"""
        p[0] = p[1]

    def p_ATOM_link(self, p):
        """ATOM : LINK"""
        p[0] = p[1]

    # The first occurrence of a node type, and of a "<type>:<name>" terminal,
    # is announced to the action broker as a top-level typedef; the terminal
    # itself is reported only the first time it is seen.
    def p_NODE(self, p):
        """NODE : ATOM_OPENNING ATOM_TYPE NODE_NAME ATOM_CLOSING"""
        node_type, node_name = p[2], p[3]
        if self.check_mode or not self.action_broker:
            p[0] = f"<{node_type}: {node_name}>"
            return

        if node_type not in self.types:
            self.types.add(node_type)
            type_definition = self._typedef(node_type, BASIC_TYPE)
            type_definition.toplevel = True
            self.action_broker.new_top_level_typedef_expression(type_definition)

        terminal_name = f"{node_type}:{node_name}"
        first_occurrence = terminal_name not in self.nodes
        if first_occurrence:
            self.nodes.add(terminal_name)
            terminal_definition = self._typedef(terminal_name, node_type)
            terminal_definition.toplevel = True
            self.action_broker.new_top_level_typedef_expression(terminal_definition)

        terminal = self._new_terminal(terminal_name)
        if first_occurrence:
            self.action_broker.new_terminal(terminal)
        p[0] = terminal

    # The truth value is accepted syntactically but not used: p[0] stays None.
    def p_STV_DEFINITION(self, p):
        """STV_DEFINITION : ATOM_OPENNING STV FLOAT FLOAT ATOM_CLOSING"""
        pass

    def p_LINK_no_stv(self, p):
        """LINK : ATOM_OPENNING ATOM_TYPE ATOM_LIST ATOM_CLOSING"""
        p[0] = self._link_value(p[2], p[3])

    # Same as the rule above; the STV definition at p[3] is skipped.
    def p_LINK_stv(self, p):
        """LINK : ATOM_OPENNING ATOM_TYPE STV_DEFINITION ATOM_LIST ATOM_CLOSING"""
        p[0] = self._link_value(p[2], p[4])

    def p_ATOM_LIST_base(self, p):
        """ATOM_LIST : ATOM"""
        p[0] = [p[1]]
        self._report_nested_atom(p[1])

    def p_ATOM_LIST_recursion(self, p):
        """ATOM_LIST : ATOM_LIST ATOM"""
        p[0] = p[1] + [p[2]]
        self._report_nested_atom(p[2])

    # Raises AtomeseSyntaxError with the current line and offending token.
    def p_error(self, p):
        raise AtomeseSyntaxError(
            f"Syntax error in line {self.lexer.lineno} Current token: {p}"
        )

    ### End of parser rules ###

    def __init__(self, **kwargs):
        """Create the lexer wrapper, build the PLY parser and reset the state.

        Args:
            **kwargs: forwarded to the ``BaseYacc`` constructor.
        """
        super().__init__(**kwargs)
        self.lex_wrap = AtomeseLex()
        super().setup()
        self.parser = yacc.yacc(module=self)
        # Type names and "<type>:<name>" terminals already announced.
        self.types = set()
        self.nodes = set()
        # The basic type is registered as its own parent.
        basic_type_hash = self._get_named_type_hash(BASIC_TYPE)
        self.parent_type[basic_type_hash] = basic_type_hash

    def _new_link(self, link_type, targets):
        """Return the nested expression for a link of ``link_type``.

        A link type seen for the first time is announced to the action broker
        as a top-level typedef before the expression is built.
        """
        if link_type not in self.types:
            self.types.add(link_type)
            type_definition = self._typedef(link_type, BASIC_TYPE)
            type_definition.toplevel = True
            self.action_broker.new_top_level_typedef_expression(type_definition)
        link_symbol = self._new_symbol(link_type)
        return self._nested_expression([link_symbol, *targets])

    def _link_value(self, link_type, targets):
        """Value of a LINK reduction: a placeholder string in check mode or
        without an action broker, the link expression otherwise."""
        if self.check_mode or not self.action_broker:
            return f"<{link_type}: {targets}>"
        return self._new_link(link_type, targets)

    def _report_nested_atom(self, atom):
        """Report ``atom`` to the action broker when it has elements; does
        nothing in check mode or without an action broker."""
        if self.check_mode or not self.action_broker:
            return
        if atom.elements is not None:
            self.action_broker.new_expression(atom)
Loading

0 comments on commit b7483e5

Please sign in to comment.