Breath detector TextGrid to Label Studio
Because Label Studio's own import sucks
def slurpfile(filename) -> str:
    """Return the full text of *filename* with surrounding whitespace removed."""
    with open(filename) as handle:
        contents = handle.read()
    return contents.strip()
# Base URL of the Label Studio REST API; endpoint names are appended directly
# to this string, so the trailing slash is required.
host = "http://130.237.3.107:8080/api/"
# API token, read from the file "label_studio_mine" (the path is relative to
# the current working directory).
api_token: str = slurpfile("label_studio_mine")
# Directory holding the TextGrid files; later code concatenates file names
# onto this string, so the trailing slash is required.
input_dir = "/Users/joregan/Desktop/breath_corrected/"
import requests
import json
from pathlib import Path
# Authorization header sent with every API request.
headers = {"Authorization": f"Token {api_token}"}
def get_projects():
    """Fetch the project listing from the API.

    Returns:
        The decoded JSON body of ``GET {host}projects``.

    Raises:
        requests.HTTPError: if the server answers with any status other
            than 200.
    """
    req = requests.get(f"{host}projects", headers=headers)
    # Explicit check rather than ``assert``: assertions are stripped under
    # ``python -O``, which would let an error body be parsed as real data.
    if req.status_code != 200:
        raise requests.HTTPError(
            f"GET {req.url} returned HTTP {req.status_code}", response=req
        )
    return json.loads(req.text)
def get_project_id_from_name(name):
    """Return the id of the first project whose title equals *name*.

    Both the project title and *name* are compared after stripping
    surrounding whitespace. Returns ``None`` when no project matches.
    """
    candidates = get_projects()["results"]
    return next(
        (
            project["id"]
            for project in candidates
            if project["title"].strip() == name.strip()
        ),
        None,
    )
# Look up the id of the project titled "Main 6"; the return value is not stored
# (NOTE(review): presumably inspected interactively as notebook cell output).
get_project_id_from_name("Main 6")
def get_tasks(projectid):
    """Fetch the tasks of one project from the API.

    Args:
        projectid: project id, sent as the ``project`` query parameter.

    Returns:
        The decoded JSON body of ``GET {host}tasks``.
        NOTE(review): only a single request is made; if the server paginates
        this listing, later pages are not fetched — confirm.

    Raises:
        requests.HTTPError: if the server answers with any status other
            than 200.
    """
    req = requests.get(f"{host}tasks", headers=headers, params={"project": projectid})
    # Explicit check rather than ``assert``: assertions are stripped under
    # ``python -O``, which would let an error body be parsed as real data.
    if req.status_code != 200:
        raise requests.HTTPError(
            f"GET {req.url} returned HTTP {req.status_code}", response=req
        )
    return json.loads(req.text)
def index_task_filestem_to_id(tasks_data):
    """Build a ``{file name: task id}`` lookup from a task listing.

    For every entry of ``tasks_data["tasks"]`` the path is taken from
    ``storage_filename`` when that key is present, otherwise from
    ``data.audio``. Entries with an empty path are skipped. The key is the
    last ``/``-separated component of the path (extension included); a later
    task with the same file name overwrites an earlier one.
    """
    index = {}
    for entry in tasks_data["tasks"]:
        identifier = entry["id"]
        raw_path = (
            entry["storage_filename"]
            if "storage_filename" in entry
            else entry["data"]["audio"]
        )
        if raw_path:
            index[raw_path.rsplit("/", 1)[-1]] = identifier
    return index
# Look up the id of the project titled "Speaker 3"; the result is not stored.
get_project_id_from_name("Speaker 3")
# NOTE(review): project id 8 is hard-coded here — presumably the id returned
# by the lookup above; confirm before reusing against another server.
tasks = get_tasks(8)
# File name -> task id, used by the loops further down.
mapping = index_task_filestem_to_id(tasks)
# Bare expression: has no effect when run as a plain script.
mapping
import json
import uuid
from praatio import textgrid
# TextGrid interval code -> label name used in the "labels" result.
labels = {
    "n": "Noise",
    "spn": "Noise",
    "b": "Breath",
    "ct": "Cross-talk",
    "sp": "Speech",
}
# TextGrid interval code -> text used in the "transcription" result.
texts = {
    "n": "noise",
    "spn": "spn",
    "b": "breath",
    "ct": "crosstalk",
    "sp": "",
}
def tg_to_result(tgfile):
    """Convert one TextGrid file into a list of annotation result dicts.

    The "annot" tier is used when the TextGrid has one, otherwise the
    "words" tier. Each entry whose stripped label is a key of ``labels``
    yields two dicts sharing one generated id: a "labels" region carrying
    the mapped label and a "textarea" region carrying the mapped text.
    Entries with an empty or unknown label are skipped.

    Args:
        tgfile: path of the TextGrid file, passed to praatio's
            ``openTextgrid``.

    Returns:
        A flat list of result dicts, two per retained entry.
    """
    # NOTE(review): the second positional argument is presumably praatio's
    # includeEmptyIntervals flag — confirm against the installed version.
    grid = textgrid.openTextgrid(tgfile, False)
    tier_name = "annot" if "annot" in grid.tierNames else "words"

    results = []
    for interval in grid.getTier(tier_name).entries:
        code = interval.label.strip()
        if code == "" or code not in labels:
            continue
        label_name = labels[code]
        transcript = texts[code]
        # Short random id shared by the region and its transcription.
        region_id = str(uuid.uuid4())[:6]
        span = {"start": interval.start, "end": interval.end, "channel": 0}
        results.append(
            {
                "value": {**span, "labels": [label_name]},
                "from_name": "labels",
                "to_name": "audio",
                "type": "labels",
                "id": region_id,
            }
        )
        results.append(
            {
                "value": {**span, "text": [transcript]},
                "from_name": "transcription",
                "to_name": "audio",
                "type": "textarea",
                "id": region_id,
            }
        )
    return results
def post_results(id, task, project, results):
    """PATCH an existing annotation with a new result list.

    Args:
        id: annotation id, placed in the URL path.
        task: task id, sent as the ``taskID`` query parameter.
        project: project id, sent both in the query string and in the body.
        results: list of result dicts stored under ``result`` in the body.

    Returns:
        The ``requests`` response object; the status code is not checked
        here.
    """
    payload = {
        "was_cancelled": False,
        "ground_truth": False,
        "project": project,
        "draft_id": 0,
        "parent_prediction": None,
        "parent_annotation": None,
        "result": results,
    }
    # Copy the shared headers so the module-level dict is left untouched.
    request_headers = {**headers, "Content-type": "application/json"}
    url = f"{host}annotations/{id}/?taskID={task}&project={project}"
    return requests.patch(url, data=json.dumps(payload), headers=request_headers)
# One-off upload of a single TextGrid. The positional arguments of
# post_results are (annotation id, task id, project id, results).
file = f"{input_dir}hsi_4_0717_211_002_main.TextGrid"
data = tg_to_result(file)
r = post_results(263, 77, 5, data)
# Show the server's response body.
print(r.text)
from pathlib import Path
# List every task (id, file name) that has a matching TextGrid in input_dir.
for fn in mapping:
    tgfile = fn.replace(".wav", ".TextGrid")
    if not Path(input_dir, tgfile).exists():
        continue
    print(mapping[fn], fn)
# Upload every TextGrid that matches a task in mapping.
# NOTE(review): annotation ids are assumed to run consecutively from 230 in
# mapping order, and project id 8 is hard-coded — confirm before re-running.
count = 230
for task in mapping:
    # Despite its name, jsonfile holds the TextGrid file name.
    jsonfile = task.replace(".wav", ".TextGrid")
    file = f"{input_dir}{jsonfile}"
    if Path(input_dir, jsonfile).exists():
        data = tg_to_result(file)
        r = post_results(count, mapping[task], 8, data)
        count += 1
        print(r.text)
# Number the file names in mapping consecutively from 99, in iteration order.
tmap = {}
for offset, task in enumerate(mapping):
    tmap[task] = 99 + offset
# Leave count one past the last number handed out.
count = 99 + len(tmap)
from pathlib import Path
# Print (wav name, task id) for each TextGrid in the ctmedit directory whose
# corresponding .wav name is a key of mapping.
for file in Path("/Users/joregan/Playing/hsi_ctmedit/textgrid").glob("*.TextGrid"):
    wavfile = f"{file.stem}.wav"
    if wavfile not in mapping:
        continue
    print(wavfile, mapping[wavfile])