# "Spaces: Sleeping" — Hugging Face Spaces status banner captured during
# scraping; not part of the module source.
import json
import os
from concurrent.futures import ThreadPoolExecutor

import fire
from gtts import gTTS
from tqdm import tqdm
def update_vocab(vocab_path="data/vocab.json"):
    """Run the full vocab pipeline on *vocab_path*.

    Compacts the JSON in place, then regenerates the TTS audio files and
    the plain-text export from the compacted file.
    """
    for step in (compact_json, generate_tts, conv_to_text):
        step(vocab_path)
def compact_json(
    src_path="data/vocab.json",
    dst_path=None,
    group_size=10,
    ensure_ascii=False,
    indent=2,
):
    """Re-chunk a vocab JSON file (a list of lists) into groups of *group_size*.

    All items are flattened, re-split into fixed-size groups, and written back
    with CompactEncoder so short groups stay on a single line.

    Args:
        src_path: Path of the JSON file to read (expected: list of lists).
        dst_path: Output path; defaults to overwriting ``src_path``.
        group_size: Number of items per group in the rewritten file.
        ensure_ascii: Forwarded to ``json.dump``; False keeps non-ASCII
            (kana/kanji) text readable in the output file.
        indent: Indent width forwarded to CompactEncoder.
    """
    dst_path = dst_path or src_path
    # Use the module's shared loader for consistency with generate_tts /
    # conv_to_text.
    data = load_json(src_path)
    flat = [item for group in data for item in group]
    data = [flat[i : i + group_size] for i in range(0, len(flat), group_size)]
    with open(dst_path, "wt", encoding="UTF-8") as fp:
        json.dump(data, fp, cls=CompactEncoder, ensure_ascii=ensure_ascii, indent=indent)
    print(f"output: {dst_path}")
class CompactEncoder(json.JSONEncoder): | |
CONTAINER_TYPES = (list, tuple, dict) | |
MAX_WIDTH = 100 | |
MAX_ITEMS = 10 | |
def __init__(self, *args, **kwargs): | |
if kwargs.get("indent") is None: | |
kwargs["indent"] = 4 | |
super().__init__(*args, **kwargs) | |
self.indentation_level = 0 | |
def encode(self, o): | |
if isinstance(o, (list, tuple)): | |
return self._encode_list(o) | |
if isinstance(o, dict): | |
return self._encode_object(o) | |
if isinstance(o, float): | |
return format(o, "g") | |
return json.dumps( | |
o, | |
skipkeys=self.skipkeys, | |
ensure_ascii=self.ensure_ascii, | |
check_circular=self.check_circular, | |
allow_nan=self.allow_nan, | |
sort_keys=self.sort_keys, | |
indent=self.indent, | |
separators=(self.item_separator, self.key_separator), | |
default=self.default if hasattr(self, "default") else None, | |
) | |
def _encode_list(self, o): | |
if self._single_line(o): | |
return "[" + ", ".join(self.encode(el) for el in o) + "]" | |
self.indentation_level += 1 | |
output = [self.indent_str + self.encode(el) for el in o] | |
self.indentation_level -= 1 | |
return "[\n" + ",\n".join(output) + "\n" + self.indent_str + "]" | |
def _encode_object(self, o): | |
if not o: | |
return "{}" | |
o = {str(k) if k is not None else "null": v for k, v in o.items()} | |
if self.sort_keys: | |
o = dict(sorted(o.items(), key=lambda x: x[0])) | |
if self._single_line(o): | |
return "{" + ", ".join(f"{self._create_kv(k,v)}" for k, v in o.items()) + "}" | |
self.indentation_level += 1 | |
output = [f"{self.indent_str}{self._create_kv(k,v)}" for k, v in o.items()] | |
self.indentation_level -= 1 | |
return "{\n" + ",\n".join(output) + "\n" + self.indent_str + "}" | |
def _create_kv(self, k, v): | |
return f"{json.dumps(k)}: {self.encode(v)}" | |
def iterencode(self, o, **_): | |
return self.encode(o) | |
def _single_line(self, o): | |
return ( | |
self._primitives_only(o) | |
and len(o) <= self.MAX_ITEMS | |
and len(str(o)) - 2 <= self.MAX_WIDTH | |
) | |
def _primitives_only(self, o: list | tuple | dict): | |
if isinstance(o, (list, tuple)): | |
return not any(isinstance(el, self.CONTAINER_TYPES) for el in o) | |
elif isinstance(o, dict): | |
return not any(isinstance(el, self.CONTAINER_TYPES) for el in o.values()) | |
def indent_str(self) -> str: | |
if isinstance(self.indent, int): | |
return " " * (self.indentation_level * self.indent) | |
elif isinstance(self.indent, str): | |
return self.indentation_level * self.indent | |
else: | |
raise ValueError(f"indent must either be of type int or str (is: {type(self.indent)})") | |
def generate_tts(src_path="data/vocab.json", output_dir="data/tts"):
    """Create one Japanese TTS mp3 per vocab entry under *output_dir*.

    Reads the grouped vocab JSON, takes each entry's "kana" text, and saves
    ``<output_dir>/<kana>.mp3`` via gTTS. Existing files are skipped, so the
    function can be re-run incrementally.
    """
    os.makedirs(output_dir, exist_ok=True)
    groups = load_json(src_path)
    kana_texts = [entry["kana"] for group in groups for entry in group]

    def _synthesize(kana):
        # NOTE(review): kana is used verbatim as a filename — assumes it never
        # contains path separators; confirm against the vocab data.
        mp3_path = os.path.join(output_dir, f"{kana}.mp3")
        if not os.path.exists(mp3_path):
            gTTS(text=kana, lang="ja").save(mp3_path)

    with tqdm(total=len(kana_texts), desc="generating tts") as progress:
        with ThreadPoolExecutor() as pool:
            for _ in pool.map(_synthesize, kana_texts):
                progress.update(1)
def conv_to_text(vocab_path, dst_path="data/vocab.txt"):
    """Export the grouped vocab JSON as plain text, one entry per line.

    Each line joins the entry's kana, kanji, and meaning (skipping empty
    fields) with single spaces; a blank line follows every group.
    """
    lines = []
    for group in load_json(vocab_path):
        for entry in group:
            fields = (entry["kana"], entry["kanji"], entry["meaning"])
            lines.append(" ".join(f for f in fields if f))
        lines.append("")  # blank separator line after each group
    with open(dst_path, "wt", encoding="UTF-8") as fp:
        fp.write("\n".join(lines))
def load_json(path):
    """Read the UTF-8 JSON file at *path* and return the parsed object."""
    with open(path, "rt", encoding="UTF-8") as handle:
        return json.load(handle)
if __name__ == "__main__":
    # CLI entry point: `python <file> update|compact|tts [args...]` via fire.
    fire.Fire({"update": update_vocab, "compact": compact_json, "tts": generate_tts})