LabelStudio annotations to TSV
Raw output: a version of this that does not trim the outputs.
def slurpfile(filename) -> str:
    with open(filename) as inf:
        return inf.read().strip()
The API key is read from the file label_studio_mine in the current directory (relative to the notebook). The key is available under "Accounts & Settings" in the user menu, at the top right of the screen.
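If the token is wrong, every request below fails with a 401, so it can be worth checking it up front. A minimal sketch (the host and file name are the ones used below; the projects listing is the standard Label Studio endpoint):

import requests

# Optional sanity check: a valid token should get a 200 from the projects listing
token = slurpfile("label_studio_mine")
check = requests.get("http://130.237.3.107:8080/api/projects/",
                     headers={"Authorization": f"Token {token}"})
print(check.status_code)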
timecode_dir = "/Users/joregan/timecode_cut"
output_dir = "/tmp/textgrid_cut"
host = "http://130.237.3.107:8080/api/"
api_token: str = slurpfile("label_studio_mine")
from pathlib import Path
timecode_path = Path(timecode_dir)
output_path = Path(output_dir)
def get_timecode_offsets(filename):
    # Read the first and last "Time (Seconds)" values from a timecode CSV
    if isinstance(filename, Path):
        filename = str(filename)
    with open(filename) as inf:
        lines = [l.strip() for l in inf.readlines()]
    assert lines[0] == ",Frame,Time (Seconds),TimeCode", f"CSV file ({filename}) seems to be incorrect"
    p_start = lines[1].split(",")
    start = float(p_start[2])
    p_end = lines[-1].split(",")
    end = float(p_end[2])
    return start, end
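As a quick check, the function returns the start and end offsets as plain floats (the file name here is a placeholder; substitute a real CSV from timecode_dir):

# Hypothetical usage; "example_clip.csv" is not a real file
start, end = get_timecode_offsets(timecode_path / "example_clip.csv")
print(start, end)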
import requests
import json
from pathlib import Path
headers = {
    "Authorization": f"Token {api_token}"
}
FIXME: there should be a better way to get these IDs than hardcoding a list, but that would take a bunch of reading of the API docs; a possible approach is sketched after the list.
IDS = [
    89, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175,
    176, 177, 178, 223, 224, 225, 226, 227, 228, 230, 231, 232, 233, 234,
    235, 236, 237, 238, 239, 240, 241, 264, 286, 297, 295, 298, 290, 287,
    285, 282, 281, 280, 279, 278, 277, 276, 275, 273, 271, 272, 289, 291,
    292, 265, 288, 293, 299, 303, 304, 302,
]
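One possible replacement for the hardcoded list, sketched here only as an idea: fetch the tasks for a known project and collect their annotation IDs. The project ID and the exact shape of the response are assumptions; the listing endpoint and field names vary between Label Studio versions, so this would need checking against the API docs.

# Hypothetical sketch, not used below: gather annotation IDs from one project
PROJECT_ID = 1  # assumption: the project the recordings live in
resp = requests.get(f"{host}projects/{PROJECT_ID}/tasks/", headers=headers)
if resp.status_code == 200:
    fetched_ids = []
    for task in resp.json():
        for annotation in task.get("annotations", []):
            # depending on the version this may be a dict or a bare ID
            fetched_ids.append(annotation["id"] if isinstance(annotation, dict) else annotation)
    print(fetched_ids)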
def get_task(task_id):
    ep = f"{host}tasks/{task_id}"
    req = requests.get(ep, headers=headers)
    if req.status_code != 200:
        return {}
    data = json.loads(req.text)
    return data
def get_annotation(annot_id):
    ep = f"{host}annotations/{annot_id}"
    req = requests.get(ep, headers=headers)
    assert req.status_code == 200
    data = json.loads(req.text)
    return data
data = get_annotation(264)
def combine_labels(data):
    # Merge result items that share a region id, so the "labels" and "text"
    # pieces of the same region end up in a single record
    combined = {}
    if "result" in data:
        for res in data["result"]:
            if res["id"] not in combined:
                combined[res["id"]] = res
            else:
                if "text" in res["value"]:
                    combined[res["id"]]["value"]["text"] = res["value"]["text"]
                elif "labels" in res["value"]:
                    combined[res["id"]]["value"]["labels"] = res["value"]["labels"]
    return combined
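To make the merge concrete, here is an invented annotation fragment where a labels item and a textarea item share the same region ID; combine_labels folds them into a single record:

# Hypothetical data, only to illustrate the merge behaviour
sample = {
    "result": [
        {"id": "r1", "value": {"start": 1.0, "end": 2.5, "labels": ["Speech"]}},
        {"id": "r1", "value": {"text": ["hello there"]}},
    ]
}
print(combine_labels(sample)["r1"]["value"])
# {'start': 1.0, 'end': 2.5, 'labels': ['Speech'], 'text': ['hello there']}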
def adjust_times_write_tsv(data):
    task = data["task"]
    task_data = get_task(task)
    orig_file = ""
    if "data" in task_data and "audio" in task_data["data"]:
        orig_file = task_data["data"]["audio"]
        parts = orig_file.split("/")
        orig_file = parts[-1]
    if orig_file:
        out_part = orig_file.replace(".wav", ".tsv")
    else:
        return []
    out_file = output_path / out_part
    if not output_path.is_dir():
        output_path.mkdir(parents=True)
    results = []
    combined = combine_labels(data)
    for item in combined:
        val = combined[item]["value"]
        if "labels" not in val:
            continue
        if "Speech" not in val["labels"]:
            continue
        start = val["start"]
        end = val["end"]
        text = val["text"]
        if len(text) > 1:
            # Prefer a transcription that is not a /phonetic/ alternative
            for t in text:
                if not (t.startswith("/") and t.endswith("/")):
                    text = t
        else:
            text = text[0]
        results.append((start, end, text))
    with open(out_file, "w") as outf:
        for res in results:
            outf.write("\t".join([str(x) for x in res]) + "\n")
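Each output file has one Speech region per line, with tab-separated start time, end time, and text; for example (values invented):

0.93	2.41	hello there
3.10	4.87	how are you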
for transcription in IDS:
    data = get_annotation(transcription)
    if "task" not in data:
        print("Error with task", transcription)
        continue
    adjust_times_write_tsv(data)