"""Repository."""
from __future__ import annotations
import sys
import typing
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, Self, TypeAlias, cast
import click
import curies
import sssom_pydantic
from pydantic import BaseModel, Field
from sssom_pydantic.process import Call
from .constants import (
DEFAULT_RESOLVER_BASE,
NEGATIVES_NAME,
POSITIVES_NAME,
PREDICTIONS_NAME,
UNSURE_NAME,
PredictionMethod,
ensure_converter,
insert,
)
if TYPE_CHECKING:
from curies import Converter
from sssom_pydantic import MappingTool, SemanticMapping, SemanticMappingPredicate
from .testing import IntegrityTestCase
__all__ = [
"OrcidNameGetter",
"Repository",
"UserGetter",
"add_commands",
]
#: A function that returns the current user
UserGetter: TypeAlias = Callable[[], curies.Reference]
#: A function that returns a dictionary from ORCID to name
OrcidNameGetter: TypeAlias = Callable[[], dict[str, str]]
#: How to decide what converter to use
ConverterStrategy: TypeAlias = Literal["bioregistry", "bioregistry-preferred", "passthrough"]
#: Configuration file
CONFIGURATION_FILENAME = "sssom-curator.json"
#: URL to Biomappings predictions SSSOM TSV
BIOMAPPINGS_PREDICTIONS_URL = (
"https://github.com/biopragmatics/biomappings/raw/refs/heads/"
"main/src/biomappings/resources/predictions.sssom.tsv"
)
strategy_option = click.option(
"--strategy",
type=click.Choice(list(typing.get_args(ConverterStrategy))),
default="passthrough",
show_default=True,
)
[docs]
class Repository(BaseModel):
"""A data structure containing information about a SSSOM repository.
There are two ways to configure a repository:
1. Parse from a JSON file representing a configuration
2. Configure using Python
####################################
Configuring a Repository with JSON
####################################
Since the :class:`Repository` class inherits from :class:`pydantic.BaseModel`, you
can define the data externally in a JSON file and parse it. Given the following
example configuration (corresponding to the Biomappings project), the following
Python code can be used to load the repository and run the CLI.
.. code-block:: json
{
"predictions_path": "predictions.sssom.tsv",
"positives_path": "positive.sssom.tsv",
"negatives_path": "negative.sssom.tsv",
"unsure_path": "unsure.sssom.tsv",
"purl_base": "https://w3id.org/biopragmatics/biomappings/sssom",
"mapping_set": {
"mapping_set_id": "https://w3id.org/biopragmatics/biomappings/sssom/biomappings.sssom.tsv",
"mapping_set_description": "Biomappings is a repository of community curated and predicted equivalences and related mappings between named biological entities that are not available from primary sources. It's also a place where anyone can contribute curations of predicted mappings or their own novel mappings.",
"mapping_set_title": "Biomappings",
"license": "https://creativecommons.org/publicdomain/zero/1.0/",
"creator_id": ["orcid:0000-0003-4423-4370"]
}
}
.. code-block:: python
from pathlib import Path
from sssom_curator import Repository
path = Path("sssom-curator.json")
repository = Repository.model_validate_json(path.read_text())
if __name__ == "__main__":
repository.run_cli()
######################################
Configuring a Repository with Python
######################################
You can configure your repository using the `sssom_curator.Repository` object
directly from within Python, which offers the full flexibility of a general purpose
programming language. Again using Biomappings as an example, here's how the Python
file would look:
.. code-block:: python
from sssom_pydantic import MappingSet
from sssom_curator import Repository
from pathlib import Path
# Assume files are all in the same folder
HERE = Path(__file__).parent.resolve()
repository = Repository(
positives_path=HERE.joinpath("positive.sssom.tsv"),
negatives_path=HERE.joinpath("negative.sssom.tsv"),
unsure_path=HERE.joinpath("unsure.sssom.tsv"),
predictions_path=HERE.joinpath("predictions.sssom.tsv"),
mapping_set=MappingSet(
title="Biomappings",
id="https://w3id.org/biopragmatics/biomappings/sssom/biomappings.sssom.tsv",
),
# Add the beginning part of the PURL used to
# construct exports.
purl_base="https://w3id.org/biopragmatics/biomappings/sssom/",
)
if __name__ == "__main__":
repository.run_cli()
""" # noqa:E501
predictions_path: Path
positives_path: Path
negatives_path: Path
unsure_path: Path
mapping_set: sssom_pydantic.MappingSet | None = None
purl_base: Annotated[
str | None,
Field(
description="The beginning part of URLs for files in this repository. For example, if "
"https://example.com/purl-base/ is given, then the SSSOM positive mappings file will "
"have the ID https://example.com/purl-base/positive.sssom.tsv"
),
] = None
basename: str | None = None
ndex_uuid: str | None = None
web_title: Annotated[
str | None,
Field(description="Custom HTML to put in the title for the SSSOM Curator web interface"),
] = None
web_disabled_message: str | None = None
web_footer: Annotated[
str | None,
Field(description="Custom HTML to put in the footer for the SSSOM Curator web interface"),
] = None
merge_standardize_bioregistry: Annotated[
bool | None,
Field(
description="""\
If set to true, uses the preferred prefixes in the Bioregistry
to standardize the merged SSSOM output. This maintains backwards
compatibility in the Biomappings repository. You shouldn't use this
field.
"""
),
] = None
[docs]
def update_relative_paths(self, directory: Path) -> None:
"""Update paths relative to the directory."""
if not self.predictions_path.is_file():
self.predictions_path = directory.joinpath(self.predictions_path).resolve()
if not self.positives_path.is_file():
self.positives_path = directory.joinpath(self.positives_path).resolve()
if not self.negatives_path.is_file():
self.negatives_path = directory.joinpath(self.negatives_path).resolve()
if not self.unsure_path.is_file():
self.unsure_path = directory.joinpath(self.unsure_path).resolve()
[docs]
@classmethod
def from_path(cls, path: str | Path) -> Self:
"""Load a configuration at a path."""
path = Path(path).expanduser().resolve()
repository = cls.model_validate_json(path.read_text())
repository.update_relative_paths(directory=path.parent)
return repository
[docs]
@classmethod
def from_directory(cls, directory: str | Path) -> Self:
"""Load an implicit configuration from a directory."""
directory = Path(directory).expanduser().resolve()
path = directory.joinpath(CONFIGURATION_FILENAME)
if path.is_file():
return cls.from_path(path)
positives_path = directory.joinpath(POSITIVES_NAME)
negatives_path = directory.joinpath(NEGATIVES_NAME)
predictions_path = directory.joinpath(PREDICTIONS_NAME)
unsure_path = directory.joinpath(UNSURE_NAME)
if (
positives_path.is_file()
and negatives_path.is_file()
and predictions_path.is_file()
and unsure_path.is_file()
):
return cls(
positives_path=positives_path,
negatives_path=negatives_path,
predictions_path=predictions_path,
unsure_path=unsure_path,
)
raise FileNotFoundError(
f"could not automatically construct a sssom-curator "
f"repository from directory {directory}"
)
@property
def curated_paths(self) -> list[Path]:
"""Get curated paths."""
return [self.positives_path, self.negatives_path, self.unsure_path]
@property
def export_paths(self) -> list[Path]:
"""Get export paths."""
return [self.positives_path, self.negatives_path, self.predictions_path]
@property
def paths(self) -> list[Path]:
"""Get all paths."""
return [self.positives_path, self.negatives_path, self.unsure_path, self.predictions_path]
@property
def call_to_path(self) -> dict[Call, Path]:
"""Get a dictionary from calls to paths."""
return {
"unsure": self.unsure_path,
"incorrect": self.negatives_path,
"correct": self.positives_path,
}
[docs]
def read_positive_mappings(self) -> list[SemanticMapping]:
"""Load the positive mappings."""
return sssom_pydantic.read(self.positives_path)[0]
[docs]
def read_negative_mappings(self) -> list[SemanticMapping]:
"""Load the negative mappings."""
return sssom_pydantic.read(self.negatives_path)[0]
[docs]
def read_unsure_mappings(self) -> list[SemanticMapping]:
"""Load the unsure mappings."""
return sssom_pydantic.read(self.unsure_path)[0]
[docs]
def read_predicted_mappings(self) -> list[SemanticMapping]:
"""Load the predicted mappings."""
return sssom_pydantic.read(self.predictions_path)[0]
[docs]
def get_converter(self) -> curies.Converter:
"""Get a converter chained from all files."""
return curies.chain([sssom_pydantic.read(path)[1] for path in self.paths])
[docs]
def append_positive_mappings(
self,
mappings: Iterable[SemanticMapping],
*,
converter: curies.Converter | None = None,
sort: bool = True,
**kwargs: Any,
) -> None:
"""Append new lines to the positive mappings document."""
converter = ensure_converter(converter)
insert(
self.positives_path,
converter=converter,
include_mappings=mappings,
sort=sort,
**kwargs,
)
[docs]
def append_negative_mappings(
self, mappings: Iterable[SemanticMapping], *, converter: curies.Converter | None = None
) -> None:
"""Append new lines to the negative mappings document."""
converter = ensure_converter(converter)
insert(
self.negatives_path,
converter=converter,
include_mappings=mappings,
)
[docs]
def append_predicted_mappings(
self, mappings: Iterable[SemanticMapping], *, converter: curies.Converter | None = None
) -> None:
"""Append new lines to the predicted mappings document."""
converter = ensure_converter(converter)
# FIXME exclude what's already in others? or is it better just
# to do a cleanup lint/prune step?
insert(
self.predictions_path,
converter=converter,
include_mappings=mappings,
)
[docs]
def run_cli(self, *args: Any, **kwargs: Any) -> None:
"""Run the CLI."""
_cli = self.get_cli()
_cli(*args, *kwargs)
[docs]
def get_cli(
self,
*,
enable_web: bool = True,
get_user: UserGetter | None = None,
output_directory: Path | None = None,
sssom_directory: Path | None = None,
image_directory: Path | None = None,
get_orcid_to_name: OrcidNameGetter | None = None,
) -> click.Group:
"""Get a CLI."""
@click.group()
@click.pass_context
def main(ctx: click.Context) -> None:
"""Run the CLI."""
ctx.obj = self
add_commands(
main,
enable_web=enable_web,
get_user=get_user,
output_directory=output_directory,
sssom_directory=sssom_directory,
image_directory=image_directory,
get_orcid_to_name=get_orcid_to_name,
)
@main.command()
@click.pass_context
def update(ctx: click.Context) -> None:
"""Run all summary, merge, and chart exports."""
click.secho("Generating summaries", fg="green")
ctx.invoke(main.commands["summarize"])
click.secho("Exporting SSSOM", fg="green")
ctx.invoke(main.commands["merge"])
return main
[docs]
def lexical_prediction_cli(
self,
prefix: str,
target: str | list[str],
/,
*,
mapping_tool: str | MappingTool | None = None,
**kwargs: Any,
) -> None:
"""Run the lexical predictions CLI."""
from .predict import lexical
return lexical.lexical_prediction_cli(
prefix,
target,
mapping_tool=mapping_tool,
path=self.predictions_path,
curated_paths=self.curated_paths,
**kwargs,
)
[docs]
def append_lexical_predictions(
self,
prefix: str,
target_prefixes: str | Iterable[str],
*,
mapping_tool: str | MappingTool | None = None,
force: bool = False,
force_process: bool = False,
cache: bool = True,
converter: curies.Converter | None = None,
**kwargs: Any,
) -> None:
"""Append lexical predictions."""
from .predict import lexical
# TODO this should reuse repository function for appending
return lexical.append_lexical_predictions(
prefix,
target_prefixes,
mapping_tool=mapping_tool,
path=self.predictions_path,
curated_paths=self.curated_paths,
force=force,
force_process=force_process,
cache=cache,
converter=converter,
**kwargs,
)
[docs]
def get_test_class(
self, converter_strategy: ConverterStrategy | None = None
) -> type[IntegrityTestCase]:
"""Get a test case class."""
from .testing import RepositoryTestCase
if converter_strategy is None or converter_strategy == "passthrough":
class PassthroughTestCurator(RepositoryTestCase):
"""A test case for this repository."""
repository: ClassVar[Repository] = self
return PassthroughTestCurator
elif converter_strategy == "bioregistry":
class BioregistryTestCurator(RepositoryTestCase):
"""A test case for this repository."""
repository: ClassVar[Repository] = self
converter: ClassVar[Converter] = ensure_converter(preferred=False)
return BioregistryTestCurator
elif converter_strategy == "bioregistry-preferred":
class BioregistryPreferredTestCurator(RepositoryTestCase):
"""A test case for this repository."""
repository: ClassVar[Repository] = self
converter: ClassVar[Converter] = ensure_converter(preferred=True)
return BioregistryPreferredTestCurator
else:
raise ValueError(f"invalid converter strategy: {converter_strategy}")
[docs]
def add_commands(
main: click.Group,
*,
enable_web: bool = True,
get_user: UserGetter | None = None,
output_directory: Path | None = None,
sssom_directory: Path | None = None,
image_directory: Path | None = None,
get_orcid_to_name: OrcidNameGetter | None = None,
) -> None:
"""Add parametrized commands."""
main.add_command(get_lint_command())
main.add_command(get_web_command(enable=enable_web, get_user=get_user))
main.add_command(get_merge_command(sssom_directory=sssom_directory))
main.add_command(get_ndex_command())
main.add_command(
get_summarize_command(
output_directory=output_directory,
image_directory=image_directory,
get_orcid_to_name=get_orcid_to_name,
)
)
main.add_command(get_predict_command())
main.add_command(get_test_command())
main.add_command(get_import_command())
def get_merge_command(sssom_directory: Path | None = None) -> click.Command:
"""Get the merge command."""
@click.command(name="merge")
@click.option(
"--sssom-directory",
type=click.Path(dir_okay=True, file_okay=False, exists=True),
default=sssom_directory,
required=True,
)
@click.pass_obj
def main(obj: Repository, sssom_directory: Path) -> None:
"""Merge files together to a single SSSOM."""
if sssom_directory is None:
click.secho("--sssom-directory is required, or add ", fg="red")
raise sys.exit(1)
if obj.mapping_set is None:
click.secho("repository doesn't configure ``mapping_set``", fg="red")
raise sys.exit(1)
if obj.purl_base is None:
click.secho("repository doesn't configure ``purl_base``", fg="red")
raise sys.exit(1)
from .export.merge import merge
merge(obj, directory=sssom_directory)
return main
def get_summarize_command(
output_directory: Path | None = None,
image_directory: Path | None = None,
get_orcid_to_name: OrcidNameGetter | None = None,
) -> click.Command:
"""Get the summary command."""
@click.command()
@click.option(
"--output-directory",
type=click.Path(file_okay=False, dir_okay=True, exists=True),
default=output_directory,
required=True,
)
@click.option(
"--image-directory",
type=click.Path(dir_okay=True, file_okay=False),
default=image_directory,
)
@click.pass_obj
def summarize(
obj: Repository, output_directory: Path | None, image_directory: Path | None
) -> None:
"""Generate summary charts and tables."""
if output_directory is None:
click.secho("--output-directory is required", fg="red")
raise sys.exit(1)
from .export.charts import make_charts
from .export.summary import summarize
output_directory = Path(output_directory).expanduser().resolve()
summarize(
obj, output_directory.joinpath("summary.yml"), get_orcid_to_name=get_orcid_to_name
)
make_charts(obj, output_directory, image_directory=image_directory)
return summarize
def get_lint_command(converter: curies.Converter | None = None) -> click.Command:
"""Get the lint command."""
@click.command()
@strategy_option
@click.pass_obj
def lint(obj: Repository, strategy: ConverterStrategy) -> None:
"""Sort files and remove duplicates."""
import sssom_pydantic
# nonlocal lets us mess with the variable even though
# it comes from an outside scope
nonlocal converter
if strategy == "passthrough":
pass
else:
converter = ensure_converter(preferred=strategy == "bioregistry-preferred")
exclude_mappings = []
for path in obj.curated_paths:
sssom_pydantic.lint(path, converter=converter)
exclude_mappings.extend(sssom_pydantic.read(path)[0])
sssom_pydantic.lint(
obj.predictions_path,
exclude_mappings=exclude_mappings,
drop_duplicates=True,
)
return lint
def get_web_command(*, enable: bool = True, get_user: UserGetter | None = None) -> click.Command:
"""Get the web command."""
if enable:
@click.command()
@click.option(
"--resolver-base",
help="A custom resolver base URL. Defaults to the Bioregistry.",
default=DEFAULT_RESOLVER_BASE,
show_default=True,
)
@click.option(
"--orcid",
help="Your ORCID, if not automatically loadable. Don't use this with --live-login.",
)
@click.option("--host", type=str, default="127.0.0.1", show_default=True)
@click.option("--port", type=int, default=8775, show_default=True)
@click.option(
"--eager-persist",
is_flag=True,
help="If set, will persist after each curation instead of waiting for the commit "
"button to be pushed",
)
@click.option(
"--implementation",
type=click.Choice(["dict", "sqlite"]),
show_default=True,
default="dict",
help="The type of backend for running the curation app. Dict means that data is stored "
"in an in-memory dictionary data structure and SQLite means it uses a database w/ ORM",
)
@click.option(
"--ssl-keyfile",
type=Path,
help="Path to SSL key file (with the -key.pem extension), which is used to enable the "
"web application to serve HTTPS requests",
)
@click.option(
"--ssl-certfile",
type=Path,
help="Path to a SSL certificate file (with the .pem extension) to "
"go along with the key file.",
)
@click.option("--live-login", is_flag=True, help="Use ORCiD for OAuth-based login")
@click.option(
"--orcid-client-id",
help="If using --live-login, explicitly set the ORCiD Client ID. Otherwise, "
"loaded via PyStow",
)
@click.option(
"--orcid-client-secret",
help="If using --live-login, explicitly set the ORCiD Client secret. Otherwise, "
"loaded via PyStow",
)
@click.option("--proxy-fix", is_flag=True, help="If set, sets passthroughs for proxies")
@click.option(
"--no-open", is_flag=True, help="Turn off automatic webpage opening on app start"
)
@click.pass_obj
def web(
obj: Repository,
resolver_base: str | None,
orcid: str,
host: str,
port: int,
eager_persist: bool,
implementation: Literal["dict", "sqlite"],
ssl_keyfile: Path | None,
ssl_certfile: Path | None,
live_login: bool,
orcid_client_id: str | None,
orcid_client_secret: str | None,
proxy_fix: bool,
no_open: bool,
) -> None:
"""Run the semantic mappings curation app."""
import fastapi
import uvicorn
from a2wsgi import WSGIMiddleware
from curies import NamableReference
from starlette.types import ASGIApp
from .web import get_app
if live_login:
import pystow
orcid_client_id = pystow.get_config(
"sssom_curator",
"orcid_client_id",
raise_on_missing=True,
passthrough=orcid_client_id,
)
orcid_client_secret = pystow.get_config(
"sssom_curator",
"orcid_client_secret",
raise_on_missing=True,
passthrough=orcid_client_secret,
)
user = None
elif orcid is not None:
user = NamableReference(prefix="orcid", identifier=orcid)
elif get_user is not None:
user = get_user()
click.echo(f"using repository-configured ORCiD: {user.identifier}")
else:
import pystow
orcid = pystow.get_config("sssom_curator", "orcid")
if orcid is None:
orcid = (
click.prompt("What's your ORCID?")
.removeprefix("https://orcid.org")
.rstrip("/")
)
pystow.write_config("sssom_curator", "orcid", orcid)
else:
click.echo(f"using PyStow-configured ORCiD: {orcid}")
user = NamableReference(prefix="orcid", identifier=orcid)
app = get_app(
repository=obj,
resolver_base=resolver_base,
user=user,
title=obj.web_title,
footer=obj.web_footer,
eager_persist=eager_persist,
implementation=implementation,
live_login=live_login,
orcid_client_secret=orcid_client_secret,
orcid_client_id=orcid_client_id,
)
fastapi_app = fastapi.FastAPI()
if proxy_fix:
from werkzeug.middleware.proxy_fix import ProxyFix
# only worry about applying the ProxyFix on Fly.io,
# or any probably any load balancer
proxy_fix_inst = ProxyFix(
app,
x_for=1, # get the real IP address of who makes the request
x_proto=1, # gets whether its http or https from the X-Forwarded header
# the other ones are left as default
)
middleware = WSGIMiddleware(proxy_fix_inst)
else:
middleware = WSGIMiddleware(app)
fastapi_app.mount("/", cast(ASGIApp, middleware))
protocol = "https" if ssl_keyfile and ssl_certfile else "http"
url = f"{protocol}://{host}:{port}"
if not no_open:
import webbrowser
webbrowser.open_new_tab(url)
uvicorn.run(
fastapi_app,
host=host,
port=port,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
)
else:
@click.command()
@click.pass_obj
def web(obj: Repository) -> None:
"""Show an error for the web interface."""
click.secho(
obj.web_disabled_message
or "web-based curator is not enabled, maybe because you're not in an editable "
"installation of a package that build on SSSOM-Curator?",
fg="red",
)
sys.exit(1)
return web
def get_ndex_command() -> click.Command:
"""Get a CLI for uploading to NDEx."""
@click.command()
@click.option("--username", help="NDEx username, also looks in pystow configuration")
@click.option("--password", help="NDEx password, also looks in pystow configuration")
@click.pass_obj
def ndex(obj: Repository, username: str | None, password: str | None) -> None:
"""Upload to NDEx."""
if not obj.ndex_uuid:
click.secho("can not upload to NDEx, no NDEx UUID is set in the curator configuration.")
raise sys.exit(1)
from sssom_pydantic.contrib.ndex import update_ndex
mappings = obj.read_positive_mappings()
update_ndex(
uuid=obj.ndex_uuid,
mappings=mappings,
metadata=obj.mapping_set,
username=username,
password=password,
)
click.echo(f"Uploaded to {DEFAULT_RESOLVER_BASE}/ndex:{obj.ndex_uuid}")
return ndex
def _pin_version_callback(
ctx: click.Context, option: click.Option | click.Parameter, value: Any
) -> Any:
if value:
import pyobo.api.utils
for prefix, version in cast(list[tuple[str, str]], value):
click.echo(f"pinning {prefix} to {version}")
pyobo.api.utils.pin_version(prefix, version)
PIN_VERSION_OPTION = click.option(
"-pv",
"--pin-version",
nargs=2,
multiple=True,
expose_value=False, # i.e., don't pass through to function
callback=_pin_version_callback,
help="Pin resource versions in PyOBO by giving a pair of prefix + version, such as "
"`--pin-version chmo 2025-10-21`",
)
def get_predict_command(
*,
source_prefix: str | None = None,
target_prefix: str | None | list[str] = None,
) -> click.Group:
"""Create a prediction command."""
from more_click import verbose_option
@click.group()
def predict() -> None:
"""Predict semantic mappings."""
if source_prefix is None:
source_prefix_argument = click.argument("source_prefix")
else:
source_prefix_argument = click.option("--source-prefix", default=source_prefix)
if target_prefix is None:
target_prefix_argument = click.argument("target_prefix", nargs=-1)
else:
target_prefix_argument = click.option(
"--target-prefix", multiple=True, default=[target_prefix]
)
@predict.command()
@verbose_option
@source_prefix_argument
@target_prefix_argument
@click.option("--relation", help="the predicate to assign to semantic mappings")
@click.option(
"--method",
type=click.Choice(list(typing.get_args(PredictionMethod))),
help="The prediction method to use",
)
@click.option(
"--cutoff",
type=float,
help="The cosine similarity cutoff to use for calling mappings when "
"using embedding predictions",
)
@click.option(
"--filter-mutual-mappings",
is_flag=True,
help="Remove predictions that correspond to already existing mappings "
"in either the subject or object resource",
)
@click.option(
"--force", is_flag=True, help="Force re-downloading and re-processing of resources"
)
@click.option(
"--force-process",
is_flag=True,
help="Force re-processing, but not re-downloading of resources",
)
@click.option(
"--cache/--no-cache",
is_flag=True,
help="Should a cache be made",
)
@click.option(
"--all-by-all",
is_flag=True,
help="Don't just predict from source to targets, but also between all targets",
)
@click.option(
"--identifiers-are-names",
is_flag=True,
help="Consider identifiers as names. This is typical for data models/schemas",
)
@PIN_VERSION_OPTION
@click.pass_obj
def lexical(
obj: Repository,
source_prefix: str,
target_prefix: str,
relation: str | None,
method: PredictionMethod | None,
cutoff: float | None,
filter_mutual_mappings: bool,
cache: bool,
force: bool,
force_process: bool,
all_by_all: bool,
identifiers_are_names: bool,
) -> None:
"""Predict semantic mappings with lexical methods."""
from .predict.lexical import append_lexical_predictions
append_lexical_predictions(
source_prefix,
target_prefix,
path=obj.predictions_path,
curated_paths=obj.curated_paths,
filter_mutual_mappings=filter_mutual_mappings,
relation=relation,
method=method,
cutoff=cutoff,
cache=cache,
force=force,
force_process=force_process,
all_by_all=all_by_all,
identifiers_are_names=identifiers_are_names,
)
return predict
def get_test_command() -> click.Command:
"""Get a command to run tests."""
@click.command()
@strategy_option
@click.pass_obj
def test(obj: Repository, strategy: ConverterStrategy) -> None:
"""Test the repository."""
import unittest
test_case_class = obj.get_test_class(converter_strategy=strategy)
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(test_case_class)
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
# Exit with code 1 if tests failed, 0 otherwise
sys.exit(not result.wasSuccessful())
return test
def _get_latest_semra() -> tuple[str, None, int]:
url = "https://zenodo.org/records/15504009/files/mappings.sssom.tsv.gz"
version = None # TODO
count = 43_400_000
return url, version, count
def get_import_command() -> click.Group:
"""Get a command for importing."""
url, version, count = _get_latest_semra()
@click.group(name="import")
def import_group() -> None:
"""Import external SSSOM files."""
preview_warning = (
"This command is in preview mode, and functionality may change without warning"
)
@import_group.command(
name="semra",
help="Import raw mappings from SeMRA.\n\n"
"Currently, this workflow is configured to only import semantic mappings from the "
"SeMRA Raw Mapping database (https://doi.org/10.5281/zenodo.11082038) that are not "
"already marked as manual mapping curations, have a CC0 license, and are either "
f"an exact match or dbxref.\n\nNote: {preview_warning}",
)
@click.option(
"-p",
"--prefixes",
multiple=True,
help="Filter to mappings whose subject and objects are both in the prefix list."
"Must pass at least two.",
)
@click.pass_obj
def import_semra(obj: Repository, prefixes: list[str]) -> None:
"""Import mappings from SeMRA."""
if len(prefixes) < 2:
click.secho("requires two or more prefixes", fg="red")
raise sys.exit(1)
click.secho(preview_warning, fg="yellow")
import bioregistry
import pystow
path = pystow.ensure("semra", "cache", url=url, version=version)
converter = bioregistry.get_converter()
mappings, _, _ = sssom_pydantic.read(
path,
metadata={"mapping_set_id": url},
progress=True,
progress_kwargs={"total": count},
semantic_mapping_predicate=_get_predicate(prefixes),
converter=converter,
)
obj.append_predicted_mappings(mappings, converter=converter)
@import_group.command(
name="ontoportal",
help="Import uncurated mappings from an OntoPortal instance",
)
@click.argument("ontology_1")
@click.argument("ontology_2")
@click.option(
"--instance",
type=click.Choice(["bioportal", "agroportal", "ecoportal"]),
default="bioportal",
show_default=True,
)
@click.pass_obj
def import_ontoportal(obj: Repository, ontology_1: str, ontology_2: str, instance: str) -> None:
"""Import mappings from an OntoPortal instance."""
import bioregistry
from ontoportal_client import ontoportal_resolver
from sssom_pydantic.contrib.ontoportal import from_ontoportal
client = ontoportal_resolver.make(instance)
registry = bioregistry.get_registry(instance)
if registry is None:
click.secho(f"{instance} is not a valid Bioregistry registry", fg="red")
sys.exit(1)
left_resource = bioregistry.get_resource(ontology_1, strict=True)
left_bioportal = left_resource.get_mapped_prefix(instance)
if left_bioportal is None:
click.secho(
f"{ontology_1} does not have a {registry.get_short_name()} mapping", fg="red"
)
sys.exit(1)
right_resource = bioregistry.get_resource(ontology_2, strict=True)
right_bioportal = right_resource.get_mapped_prefix(instance)
if right_bioportal is None:
click.secho(
f"{ontology_2} does not have a {registry.get_short_name()} mapping", fg="red"
)
sys.exit(1)
converter = bioregistry.get_converter()
mappings = from_ontoportal(
left_bioportal, right_bioportal, converter=converter, client=client, progress=True
)
# Filter to only be mappings incident to the given prefixes
mappings_filtered = _keep_only_prefixes(
mappings, {left_resource.prefix, right_resource.prefix}
)
obj.append_predicted_mappings(mappings_filtered, converter=converter)
@import_group.command(name="url")
@click.argument("url")
@click.pass_obj
def import_url(obj: Repository, url: str) -> None:
"""Import mappings from a URL."""
mappings, converter, _metadata = sssom_pydantic.read(url)
obj.append_predicted_mappings(mappings, converter=converter)
@import_group.command(name="biomappings")
@click.pass_obj
def import_biomappings(obj: Repository) -> None:
"""Import predicted mappings from Biomappings."""
mappings, converter, _metadata = sssom_pydantic.read(BIOMAPPINGS_PREDICTIONS_URL)
obj.append_predicted_mappings(mappings, converter=converter)
return import_group
def _keep_only_prefixes(
mappings: Iterable[SemanticMapping], kk: set[str]
) -> Iterable[SemanticMapping]:
for m in mappings:
if m.subject.prefix in kk and m.object.prefix in kk:
yield m
def _get_predicate(prefixes: list[str]) -> SemanticMappingPredicate:
from curies.vocabulary import exact_match, has_dbxref, manual_mapping_curation
prefix_checker = set(prefixes).__contains__
predicate_checker = {exact_match, has_dbxref}.__contains__
license_checker = "CC0".__eq__
def _justification_checker(x: curies.Reference) -> bool:
return x != manual_mapping_curation
if prefixes:
def _predicate(m: SemanticMapping) -> bool:
return (
_justification_checker(m.justification)
and predicate_checker(m.predicate)
and prefix_checker(m.subject.prefix)
and prefix_checker(m.object.prefix)
and license_checker(m.license)
)
else:
def _predicate(m: SemanticMapping) -> bool:
return (
_justification_checker(m.justification)
and predicate_checker(m.predicate)
and license_checker(m.license)
)
return _predicate