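"""Sphinx extension providing language-scoped text entities.

The ``entities`` directive registers named snippets for one of the documented
languages, and the ``entity`` role expands them wherever they are referenced.
An illustrative reST sketch, inferred from the parsing logic below (the
``Tokenizer`` entry is hypothetical):

    .. entities:: python

        :global:

        Tokenizer
            A :class:`~tokenizers.Tokenizer` works on raw text.

    ... then, in any document: :entity:`Tokenizer`

The ``languages`` list comes from the documentation's ``conf.py``, and each
build is assumed to run with a matching tag (e.g. ``sphinx-build -t python``).
"""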
from collections import defaultdict, abc
from typing import cast

from docutils import nodes
from docutils.parsers.rst import Directive

import sphinx
from sphinx.locale import _
from sphinx.util.docutils import SphinxDirective
# NoUri is raised and caught in resolve_pendings() below when a
# cross-reference cannot be resolved.
from sphinx.errors import ExtensionError, NoUri

from conf import languages as LANGUAGES

logger = sphinx.util.logging.getLogger(__name__)

# Scope key under which globally visible entities are stored.
GLOBALNAME = "$GLOBAL$"


def update(d, u):
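    """Recursively merge mapping ``u`` into mapping ``d`` and return ``d``.

    A deep ``dict.update``; illustration with hypothetical values:

        >>> update({"a": {"x": 1}}, {"a": {"y": 2}})
        {'a': {'x': 1, 'y': 2}}
    """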
    for k, v in u.items():
        if isinstance(v, abc.Mapping):
            d[k] = update(d.get(k, {}), v)
        else:
            d[k] = v
    return d


class EntityNode(nodes.General, nodes.Element):
    pass


class EntitiesNode(nodes.General, nodes.Element):
    pass


class AllEntities:
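    """Registry of every defined entity, keyed by scope.

    Entities declared with ``:global:`` live under the ``GLOBALNAME`` scope
    and are visible from every document; all others are keyed by the document
    that defined them. One instance is installed per build environment via
    :meth:`install`.
    """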
    def __init__(self):
        self.entities = defaultdict(dict)

    @classmethod
    def install(cls, env):
        # Create the shared registry on the build environment the first time
        # it is requested, then always hand back that single instance.
        if not hasattr(env, "entity_all_entities"):
            entities = cls()
            env.entity_all_entities = entities
        return env.entity_all_entities

    def merge(self, other):
        self.entities.update(other.entities)

    def purge(self, docname):
        # Drop every entity defined by `docname`, from both its own scope and
        # the global scope.
        for env_docname in [GLOBALNAME, docname]:
            self.entities[env_docname] = {
                name: entity
                for name, entity in self.entities[env_docname].items()
                if entity["docname"] != docname
            }

    def _add_entities(self, entities, language, is_global, docname):
        scope = GLOBALNAME if is_global else docname
        for entity in entities:
            name = f'{language}-{entity["name"]}'
            content = entity["content"]

            if name in self.entities[scope]:
                logger.warning(
                    f'Entity "{name}" has already been defined{" globally" if is_global else ""}',
                    location=docname,
                )

            self.entities[scope][name] = {"docname": docname, "content": content}

    def _extract_global(self, nodes):
        # Return True if the directive's field list contains a `global` field.
        for node in nodes:
            if node.tagname != "field":
                raise Exception(f"Expected a field, found {node.tagname}")

            name, _ = node.children
            if name.tagname != "field_name":
                raise Exception(f"Expected a field name here, found {name.tagname}")

            if str(name.children[0]) == "global":
                return True
        return False

    def _extract_entities(self, nodes):
        entities = []
        for node in nodes:
            if node.tagname != "definition_list_item":
                raise Exception(f"Expected a list item here, found {node.tagname}")

            name_node, content_node = node.children
            if name_node.tagname != "term":
                raise Exception(f"Expected a term here, found {name_node.tagname}")
            if content_node.tagname != "definition":
                raise Exception(f"Expected a definition here, found {content_node.tagname}")
            if content_node.children[0].tagname != "paragraph":
                raise Exception(
                    f"Expected a paragraph here, found {content_node.children[0].tagname}"
                )

            name = str(name_node.children[0])
            content = content_node.children[0].children[0]
            entities.append({"name": name, "content": content})
        return entities

    def extract(self, node, docname):
        is_global = False
        entities = []

        language = None
        for child in node.children:
            if language is None and child.tagname != "paragraph":
                raise Exception("Expected language name:\n.. entities:: <LANGUAGE>")
            elif language is None and child.tagname == "paragraph":
                language = str(child.children[0])
                if language not in LANGUAGES:
                    raise Exception(
                        f'Unknown language "{language}". Might be missing a newline after the language'
                    )
            elif child.tagname == "field_list":
                is_global = self._extract_global(child.children)
            elif child.tagname == "definition_list":
                entities.extend(self._extract_entities(child.children))
            else:
                raise Exception(f"Expected a list of terms/options, found {child.tagname}")

        self._add_entities(entities, language, is_global, docname)

    def resolve_pendings(self, app):
        env = app.builder.env

        updates = defaultdict(dict)
        for env_docname in self.entities.keys():
            for name, entity in self.entities[env_docname].items():
                docname = entity["docname"]
                node = entity["content"]

                if node.tagname == "pending_xref":
                    contnode = cast(nodes.TextElement, node[0].deepcopy())
                    newnode = None

                    typ = node["reftype"]
                    target = node["reftarget"]
                    refdoc = node.get("refdoc", docname)
                    domain = None

                    try:
                        if "refdomain" in node and node["refdomain"]:
                            # let the domain try to resolve the reference
                            try:
                                domain = env.domains[node["refdomain"]]
                            except KeyError as exc:
                                raise NoUri(target, typ) from exc
                            newnode = domain.resolve_xref(
                                env, refdoc, app.builder, typ, target, node, contnode
                            )
                    except NoUri:
                        newnode = contnode

                    updates[env_docname][name] = {
                        "docname": docname,
                        "content": newnode or contnode,
                    }

        update(self.entities, updates)

    def get(self, language, name, docname):
        name = f"{language}-{name}"
        if name in self.entities[docname]:
            return self.entities[docname][name]
        elif name in self.entities[GLOBALNAME]:
            return self.entities[GLOBALNAME][name]
        else:
            return None


class EntitiesDirective(SphinxDirective):
    has_content = True

    def run(self):
        content = nodes.definition_list()
        self.state.nested_parse(self.content, self.content_offset, content)

        try:
            entities = AllEntities.install(self.env)
            entities.extract(content, self.env.docname)
        except Exception as err:
            raise self.error(f'Malformed directive "entities": {err}')

        return []


def entity_role(name, rawtext, text, lineno, inliner, options={}, content=[]):
    node = EntityNode()
    node.entity = text

    return [node], []


def process_entity_nodes(app, doctree, docname):
    """Replace all the entities with their content."""
    env = app.builder.env

    entities = AllEntities.install(env)
    entities.resolve_pendings(app)

    language = next(l for l in LANGUAGES if l in app.tags)
    for node in doctree.traverse(EntityNode):
        entity = entities.get(language, node.entity, docname)
        if entity is None:
            node.replace_self(nodes.Text(_(node.entity), _(node.entity)))
            logger.warning(f'Entity "{node.entity}" has not been defined', location=node)
        else:
            node.replace_self(entity["content"])


def purge_entities(app, env, docname):
    """Purge any entity that comes from the given docname."""
    entities = AllEntities.install(env)
    entities.purge(docname)


def merge_entities(app, env, docnames, other):
    """Merge the entities collected by multiple environments."""
    entities = AllEntities.install(env)
    other_entities = AllEntities.install(other)
    entities.merge(other_entities)


def setup(app):
    app.add_node(EntityNode)
    app.add_node(EntitiesNode)
    app.add_directive("entities", EntitiesDirective)
    app.add_role("entity", entity_role)

    app.connect("doctree-resolved", process_entity_nodes)
    app.connect("env-merge-info", merge_entities)
    app.connect("env-purge-doc", purge_entities)

    return {
        "version": "0.1",
        "parallel_read_safe": True,
        "parallel_write_safe": True,
    }