Support for output in nquads

This commit is contained in:
Babibubebon 2024-06-19 00:39:00 +09:00
parent bb2998f781
commit fefb76e117
Signed by: Babibubebon
GPG key ID: 78C8FB2A2FEA1EE3
10 changed files with 93 additions and 29 deletions

View file

@ -9,6 +9,7 @@ import click
from . import __version__
from .client import GbizinfoClient
from .mappers import *
from .mappers import RDFFormatType
@click.group()
@ -64,7 +65,7 @@ def download(work_dir: str, sleep: int):
click.echo("Retrieving Kihonjoho (IMI version)")
with open(kihonjoho_imi_file, "w", encoding="utf-8") as f:
writer = None
for row in get_kihonjoho_imi(kihonjoho_csv_file, client):
for row in get_kihonjoho_imi(kihonjoho_csv_file, client, sleep):
if writer is None:
writer = csv.DictWriter(f, fieldnames=row.keys())
writer.writeheader()
@ -123,7 +124,15 @@ MAPPER_TYPES = [
@click.option(
"--output-dir", "-o", type=click.Path(exists=True, file_okay=False, writable=True)
)
def convert(work_dir: str, mappers: list[str], processes: int, output_dir: str):
@click.option(
"--format",
"-f",
type=click.Choice([v.name for v in RDFFormatType]),
default=RDFFormatType.nq.name,
)
def convert(
work_dir: str, mappers: list[str], processes: int, output_dir: str, format: str
):
if not mappers:
mappers = MAPPER_TYPES
if not output_dir:
@ -166,11 +175,11 @@ def convert(work_dir: str, mappers: list[str], processes: int, output_dir: str):
case _:
raise NotImplementedError
output_file = os.path.join(output_dir, f"{m}.nt")
output_file = os.path.join(output_dir, f"{m}.{format}")
click.echo(f"output: {output_file}")
click.echo(f"Running {m} mapper ...")
with open(output_file, "w") as f:
mapper.run(n_jobs=processes, output=f)
mapper.run(n_jobs=processes, output=f, format=RDFFormatType[format])
@cli.command(help="Fetch CSV data from OutputCSV endpoint")

View file

@ -2,13 +2,15 @@ import csv
import sys
from abc import ABC, abstractmethod
from typing import IO, Iterator, Tuple, Union
from enum import Enum
from joblib import Parallel, delayed
from rdflib import BNode
from rdflib import BNode, URIRef
from rdflib import Literal as LiteralRdflib
from rdflib.graph import _ObjectType, _PredicateType, _SubjectType, _TripleType
from rdflib.namespace import RDF
from rdflib.plugins.serializers.nt import _nt_row
from rdflib.plugins.serializers.nquads import _nq_row
_TripleMapType = Tuple[
_SubjectType, _PredicateType, Union[str, _ObjectType, "BlankPredicateObjectMap"]
@ -16,6 +18,7 @@ _TripleMapType = Tuple[
_PredicateObjectType = Tuple[
_PredicateType, Union[str, _ObjectType, "BlankPredicateObjectMap"]
]
RDFFormatType = Enum("RDFFormatType", ["nt", "nq"])
class Literal:
@ -66,8 +69,8 @@ def flatten_triple_map(triple_map: _TripleMapType) -> list[_TripleType]:
return triples
def serialize_triple(triple: _TripleMapType) -> str:
return _nt_row(triple)
def serialize_triple(triple: _TripleMapType, graph: URIRef | None = None) -> str:
return _nq_row(triple, graph) if graph else _nt_row(triple)
class CSV2RDFMapper(ABC):
@ -80,33 +83,39 @@ class CSV2RDFMapper(ABC):
for row in reader:
yield row
def run(self, n_jobs: int = -1, output: IO[str] = sys.stdout):
if n_jobs == 1:
for row in self.iterator():
for triple_map in self.map_to_triples(self.preprocess(row)):
for triple in flatten_triple_map(triple_map):
output.write(serialize_triple(triple))
else:
def job(row: dict[str, str]) -> str:
return "".join(
[
serialize_triple(triple)
for triple_map in self.map_to_triples(self.preprocess(row))
for triple in flatten_triple_map(triple_map)
]
)
res = Parallel(n_jobs=n_jobs, return_as="generator_unordered", verbose=1)(
delayed(job)(row) for row in self.iterator()
def run(
self,
n_jobs: int = -1,
output: IO[str] = sys.stdout,
format: RDFFormatType = RDFFormatType.nq,
):
def job(row: dict[str, str]) -> str:
return "".join(
[
serialize_triple(
triple,
graph=self.graph if format == RDFFormatType.nq else None,
)
for triple_map in self.map_to_triples(self.preprocess(row))
for triple in flatten_triple_map(triple_map)
]
)
for lines in res:
output.write(lines)
res = Parallel(n_jobs=n_jobs, return_as="generator_unordered", verbose=1)(
delayed(job)(row) for row in self.iterator()
)
for lines in res:
output.write(lines)
@staticmethod
def preprocess(row: dict[str, str]) -> dict[str, str | None]:
return {key: val if val != "" else None for key, val in row.items()}
@property
@abstractmethod
def graph(self) -> URIRef:
raise NotImplementedError
@staticmethod
@abstractmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
@ -123,6 +132,7 @@ from .tokkyo import GbizInfoTokkyoMapper
from .zaimu import GbizInfoZaimuMapper
__all__ = [
"RDFFormatType",
"GbizInfoHojinMapper",
"GbizInfoHojyokinMapper",
"GbizInfoChotatsuMapper",

View file

@ -1,11 +1,16 @@
from rdflib import URIRef
from ..namespace import *
from . import _TripleMapType, bpo
from . import _TripleMapType
from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoChotatsuMapper(GbizInfoKatsudoMapper):
"""調達情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/chotatsu")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
ss = HJ_EXT[f"{row['ID-識別値']}_{row['キー情報']}"]

View file

@ -7,6 +7,10 @@ from . import CSV2RDFMapper, Literal, _TripleMapType, bpo
class GbizInfoHojinMapper(CSV2RDFMapper):
"""法人情報・法人基本情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/hojin")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
s = HJ_DATA[row["ID-識別値"]]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import Literal, _TripleMapType, bpo
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoHojyokinMapper(GbizInfoKatsudoMapper):
"""補助金情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/hojyokin")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
ss = HJ_EXT[f"{row['ID-識別値']}_{row['キー情報']}"]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import _TripleMapType
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoHyoshoMapper(GbizInfoKatsudoMapper):
"""表彰情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/hyosho")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
ss = HJ_EXT[f"{row['ID-識別値']}_{row['キー情報']}"]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import Literal, _TripleMapType, bpo
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoShokubaMapper(GbizInfoKatsudoMapper):
"""職場情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/shokuba")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
s = HJ_DATA[row["ID-識別値"]]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import _TripleMapType
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoTodokedeMapper(GbizInfoKatsudoMapper):
"""届出認定情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/todokede")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
ss = HJ_EXT[f"{row['ID-識別値']}_{row['キー情報']}"]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import _TripleMapType, bpo
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoTokkyoMapper(GbizInfoKatsudoMapper):
"""特許情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/tokkyo")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
ss = HJ_EXT[f"{row['ID-識別値']}_{row['キー情報']}"]

View file

@ -1,3 +1,5 @@
from rdflib import URIRef
from ..namespace import *
from . import Literal, _TripleMapType, bpo
from ._katsudo import GbizInfoKatsudoMapper
@ -6,6 +8,10 @@ from ._katsudo import GbizInfoKatsudoMapper
class GbizInfoZaimuMapper(GbizInfoKatsudoMapper):
"""財務情報"""
@property
def graph(self) -> URIRef:
return URIRef("http://hojin-info.go.jp/graph/zaimu")
@staticmethod
def map_to_triples(row: dict[str, str]) -> list[_TripleMapType]:
s = HJ_DATA[row["ID-識別値"]]