gbizinfo-lod/gbizinfo_lod/command.py
2024-06-18 03:17:57 +09:00

182 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os
import shutil
import time
from typing import Iterator
import click
from . import __version__
from .client import GbizinfoClient
from .mappers import *
@click.group()
def cli():
pass
@cli.command()
def version():
click.echo(f"gbizinfo_lod/{__version__}")
@cli.command(help="Download CSV files")
@click.argument(
"work_dir", type=click.Path(exists=True, file_okay=False, writable=True)
)
@click.option("--sleep", type=int, default=3, help="Sleep time in seconds")
def download(work_dir: str, sleep: int):
client = GbizinfoClient()
csv_files = {
# CSV形式法人活動情報語彙対応版
0: "TodokedeNinteijoho",
1: "Hyoshojoho",
2: "Hojokinjoho",
3: "Chotatsujoho",
4: "Tokkyojoho",
5: "Zaimujoho",
6: "Shokubajoho",
# CSV形式
7: "Kihonjoho",
}
for file_id, name in csv_files.items():
zip_file_name = f"{name}_UTF-8.zip"
csv_file_name = f"{name}_UTF-8.csv"
zip_file_path = os.path.join(work_dir, zip_file_name)
csv_file_path = os.path.join(work_dir, csv_file_name)
click.echo(f"Downloading {zip_file_name}")
client.download_csv_to_file(zip_file_path, file_id)
click.echo(f"Unpacking {zip_file_name}")
shutil.unpack_archive(zip_file_path, work_dir)
os.remove(zip_file_path)
if not os.path.exists(csv_file_path):
raise click.ClickException(f"{csv_file_path} not found")
kihonjoho_csv_file = os.path.join(work_dir, "Kihonjoho_UTF-8.csv")
kihonjoho_imi_file = os.path.join(work_dir, "Kihonjoho_IMI_UTF-8.csv")
if not os.path.exists(kihonjoho_imi_file):
click.echo("Retrieving Kihonjoho (IMI version)")
with open(kihonjoho_imi_file, "w", encoding="utf-8") as f:
writer = None
for row in get_kihonjoho_imi(kihonjoho_csv_file, client):
if writer is None:
writer = csv.DictWriter(f, fieldnames=row.keys())
writer.writeheader()
writer.writerow(row)
def get_kihonjoho_imi(
csv_file: str, client: GbizinfoClient, sleep: int = 3
) -> Iterator[dict]:
with open(csv_file, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
count = 0
chunk = []
for row in reader:
if not row["法人名"]:
click.echo(f"skip: {row['法人番号']}", err=True)
continue
chunk.append(row["法人番号"])
if len(chunk) == client.CSV_MAX_ROWS:
count += len(chunk)
click.echo(f"{count} ...")
for r in client.output_csv(chunk):
yield r
chunk.clear()
time.sleep(sleep)
if len(chunk) > 0:
for r in client.output_csv(chunk):
yield r
count += len(chunk)
click.echo(f"{count}")
MAPPER_TYPES = [
"hojin",
"hojyokin",
"chotatsu",
"hyosho",
"todokede",
"tokkyo",
"shokuba",
"zaimu",
]
@cli.command(help="Convert CSV files to RDF")
@click.argument("work_dir")
@click.option(
"--mapper", "-m", "mappers", multiple=True, type=click.Choice(MAPPER_TYPES)
)
@click.option("--processes", "-p", type=int, default=-1)
@click.option(
"--output-dir", "-o", type=click.Path(exists=True, file_okay=False, writable=True)
)
def convert(work_dir: str, mappers: list[str], processes: int, output_dir: str):
if not mappers:
mappers = MAPPER_TYPES
if not output_dir:
output_dir = work_dir
for m in mappers:
match m:
case "hojin":
mapper = GbizInfoHojinMapper(
os.path.join(work_dir, "Kihonjoho_IMI_UTF-8.csv")
)
case "hojyokin":
mapper = GbizInfoHojyokinMapper(
os.path.join(work_dir, "Hojokinjoho_UTF-8.csv")
)
case "chotatsu":
mapper = GbizInfoChotatsuMapper(
os.path.join(work_dir, "Chotatsujoho_UTF-8.csv")
)
case "hyosho":
mapper = GbizInfoHyoshoMapper(
os.path.join(work_dir, "Hyoshojoho_UTF-8.csv")
)
case "todokede":
mapper = GbizInfoTodokedeMapper(
os.path.join(work_dir, "TodokedeNinteijoho_UTF-8.csv")
)
case "tokkyo":
mapper = GbizInfoTokkyoMapper(
os.path.join(work_dir, "Tokkyojoho_UTF-8.csv")
)
case "shokuba":
mapper = GbizInfoShokubaMapper(
os.path.join(work_dir, "Shokubajoho_UTF-8.csv")
)
case "zaimu":
mapper = GbizInfoZaimuMapper(
os.path.join(work_dir, "Zaimujoho_UTF-8.csv")
)
case _:
raise NotImplementedError
output_file = os.path.join(output_dir, f"{m}.nt")
click.echo(f"output: {output_file}")
click.echo(f"Running {m} mapper ...")
with open(output_file, "w") as f:
mapper.run(n_jobs=processes, output=f)
@cli.command(help="Fetch CSV data from OutputCSV endpoint")
@click.argument("hojin_bango")
def output_csv(hojin_bango: str):
client = GbizinfoClient()
res = client.output_csv([hojin_bango])
for row in res:
click.echo(row)