TODO:

  • normalise text for inline marks ([breath], etc.; see the sketch below)
  • get annotation IDs in a better way (currently there's a hardcoded list)
  • do something better with truncated entries (skip?)
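For the first TODO item, something along these lines might do; normalise_inline_marks is a hypothetical helper, not yet used anywhere in the notebook:

import re

def normalise_inline_marks(text: str) -> str:
    """Drop inline marks like [breath] or [smack] and collapse leftover whitespace."""
    text = re.sub(r"\[[^\]]+\]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

# normalise_inline_marks("that much light [smack] I just turn that on")
# -> 'that much light I just turn that on'
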
def slurpfile(filename) -> str:
    """Read a file and return its contents with surrounding whitespace stripped."""
    with open(filename) as inf:
        return inf.read().strip()

The API token is read from the label_studio_mine file (here from ../_drafts/, relative to the notebook). It's available under "Accounts & Settings" in the user menu, top right of the screen.

timecode_dir = "/Users/joregan/timecode_cut"
output_dir = "/tmp/textgrid_cut"
host = "http://130.237.3.107:8080/api/"
api_token: str = slurpfile("../_drafts/label_studio_mine")
from pathlib import Path

timecode_path = Path(timecode_dir)
output_path = Path(output_dir)
def get_timecode_offsets(filename):
    """Return the start and end times, in seconds, covered by a timecode CSV."""
    with open(filename) as inf:
        lines = [l.strip() for l in inf.readlines()]
        assert lines[0] == ",Frame,Time (Seconds),TimeCode", f"CSV file ({filename}) seems to be incorrect"
        # Column 2 holds the time in seconds; take it from the first and
        # last data rows
        start = float(lines[1].split(",")[2])
        end = float(lines[-1].split(",")[2])
        return start, end
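As a sanity check, the CSV is expected to look like the sample below; the frame and timecode values here are made up:

import tempfile

sample_csv = """,Frame,Time (Seconds),TimeCode
0,0,12.48,00:00:12:12
1,1,12.52,00:00:12:13
2,2,315.96,00:05:15:24
"""
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
    tmp.write(sample_csv)
# get_timecode_offsets(tmp.name) -> (12.48, 315.96)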
import requests
import json
from pathlib import Path

headers = {
    "Authorization": f"Token {api_token}"
}

FIXME: need a better way to get these than hardcoding a list, but that will take a bunch of reading API docs
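
One possible replacement, sketched but untested: walk a project's tasks and collect the IDs of their annotations. PROJECT_ID is a placeholder, and the endpoint paths follow the Label Studio API docs, so they may need adjusting for this server's version.

PROJECT_ID = 1  # placeholder, not the real project ID

def get_annotation_ids(project_id):
    req = requests.get(f"{host}projects/{project_id}/tasks/", headers=headers)
    req.raise_for_status()
    ids = []
    for task in req.json():
        areq = requests.get(f"{host}tasks/{task['id']}/annotations/", headers=headers)
        areq.raise_for_status()
        ids += [annot["id"] for annot in areq.json()]
    return ids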

IDS = [
    89,
    163,
    164,
    165,
    166,
    167,
    168,
    169,
    170,
    171,
    172,
    173,
    174,
    175,
    176,
    177,
    178,
    223,
    224,
    225,
    226,
    227,
    228,
    230,
    231,
    232,
    233,
    234,
    235,
    236,
    237,
    238,
    239,
    240,
    241,
    264,

    286,
    297,
    295,
    298,
    290,
    287,
    285,
    282,
    281,
    280,
    279,
    278,
    277,
    276,
    275,
    273,
    271,
    272,
    289,
    291,
    292,
    265,
    288,
    293,
    299,
    303,
    304,
    302,
    267,
    270,
    266,
    284,
    162,
    161,
    229
]
def get_task(task_id):
    """Fetch a task from the Label Studio API; return {} on failure."""
    ep = f"{host}tasks/{task_id}"
    req = requests.get(ep, headers=headers)
    if req.status_code != 200:
        return {}
    return req.json()
def get_annotation(annot_id):
    """Fetch a single annotation from the Label Studio API."""
    ep = f"{host}annotations/{annot_id}"
    req = requests.get(ep, headers=headers)
    assert req.status_code == 200
    return req.json()
# Quick smoke test against a known annotation
data = get_annotation(264)
def combine_labels(data):
    """Merge result items that share an ID.

    Label Studio returns the text and the labels for the same region as
    separate result items with the same ID; fold them into a single dict.
    """
    combined = {}

    if "result" in data:
        for res in data["result"]:
            if res["id"] not in combined:
                combined[res["id"]] = res
            elif "text" in res["value"]:
                combined[res["id"]]["value"]["text"] = res["value"]["text"]
            elif "labels" in res["value"]:
                combined[res["id"]]["value"]["labels"] = res["value"]["labels"]
    return combined
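
A quick illustration on a made-up payload (the ID and values are invented; the shape mirrors what the annotation endpoint returns for audio regions):

example = {
    "result": [
        {"id": "reg1", "value": {"start": 1.0, "end": 2.5, "labels": ["Speech"]}},
        {"id": "reg1", "value": {"start": 1.0, "end": 2.5, "text": ["hello there"]}},
    ]
}
# combine_labels(example)["reg1"]["value"]
# -> {'start': 1.0, 'end': 2.5, 'labels': ['Speech'], 'text': ['hello there']}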
OK_STARTS = [
    "hsi_7_0719_210_003_inter",
    "hsi_4_0717_211_001_main",
    "hsi_7_0719_211_004_main",
    "hsi_7_0719_210_002_main",
    "hsi_7_0719_211_002_main"
]
OK_ENDS = [
    "hsi_3_0715_209_006_main",
    "hsi_3_0715_227_003_main",
    "hsi_7_0719_222_002_inter",
    "hsi_7_0719_227_002_main",
    "hsi_7_0719_211_002_main",
    "hsi_6_0718_209_002_main",
    "hsi_6_0718_209_001_main",
    "hsi_6_0718_210_001_main"
]

DELETE_ENDS = [
    "hsi_7_0719_222_004_inter",
    "hsi_6_0718_209_001_inter"
]

DELETE_STARTS = [
    "hsi_5_0718_209_001_main",
]

FIXES_START = {
    "hsi_3_0715_210_010_main": "o welcome",
    "hsi_6_0718_209_001_inter": "Hi. It's like it looks like a very nice place you have here.",
    "hsi_7_0719_227_002_inter": "Yeah that's a very nice apartment you have here.",
    "hsi_7_0719_222_002_main": "you know, it was really expensive property."
}

FIXES_END = {
    "hsi_7_0719_209_001_inter": "Okay, but yeah, yeah",
    "hsi_7_0719_210_001_main": "sometimes when I am reading and I feel that I don't have that much light [smack] I just turn that on. But uh otherwise I just keep this setup here with the small lamp and the chair and I just read something have a glass of wine or something that's real",
    "hsi_7_0719_222_004_main": "But maybe maybe if you if you put a small table, like two small tables next, I mean, i- on both sides of the fireplace",
    "hsi_5_0718_222_003_main": "so so and i was thinking of the bins there, uh, do you need one of the bins because you could take one if you want to yes of course take it it's you can have it because i was anyway trying to so so yeah okay"
}
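
The lists and dicts above handle entries that cross a clip boundary: OK_STARTS and OK_ENDS mark files where the truncation is known and acceptable, DELETE_STARTS and DELETE_ENDS mark boundary entries that should be dropped outright, and FIXES_START and FIXES_END supply corrected text for truncated first and last entries.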
def adjust_times_write_tsv(data):
    task = data["task"]
    task_data = get_task(task)
    orig_file = None
    if "data" in task_data and "audio" in task_data["data"]:
        # Keep only the basename of the audio URL
        orig_file = task_data["data"]["audio"].split("/")[-1]
    if orig_file:
        out_part = orig_file.replace(".wav", ".csv")
        # The timecode CSV is shared between the _main and _inter recordings
        orig_file = out_part.replace("_main", "").replace("_inter", "")
    else:
        return []
    tsv_file = timecode_path / orig_file
    if not output_path.is_dir():
        output_path.mkdir()
    out_file = output_path / out_part

    if not tsv_file.exists():
        return []
    start, end = get_timecode_offsets(str(tsv_file))

    results = []

    combined = combine_labels(data)

    for item in combined:
        val = combined[item]["value"]
        if "labels" not in val:
            continue
        if "Speech" not in val["labels"]:
            continue
        e_start = val["start"]
        e_end = val["end"]
        text = val["text"]
        # Take the transcription variant that is not wrapped in slashes;
        # if every variant is slash-wrapped, drop the entry
        if len(text) > 1:
            picked = None
            for t in text:
                t = t.strip()
                if not (t.startswith("/") and t.endswith("/")):
                    picked = t
            text = picked
        elif text[0].startswith("/"):
            text = None
        else:
            text = text[0]

        new_start = e_start - start
        new_end = e_end - start

        if text is None:
            continue

        out_stem = out_part.replace(".csv", "")

        if new_end < 0.0:
            # Entirely before the clip
            continue
        elif e_start >= end and e_end > end:
            # Entirely after the clip
            continue
        elif new_start < 0.0 and new_end > 0.0:
            # Straddles the start of the clip
            if text != "":
                if out_stem in FIXES_START:
                    text = FIXES_START[out_stem]
                if out_stem in DELETE_STARTS:
                    continue
                if not (out_stem in OK_STARTS or out_stem in FIXES_START):
                    print("Warning", out_part, "truncating entry", e_start, e_end, text)
            if results:
                print("Shouldn't have existing entries!!", out_part, e_start, e_end, text)
            results.append((0.0, new_end, text))
        elif e_start >= start and e_end <= end:
            # Fully inside the clip
            results.append((new_start, new_end, text))
        elif e_start <= end and e_end > end:
            # Straddles the end of the clip
            if text != "":
                if out_stem in DELETE_ENDS:
                    continue
                if out_stem in FIXES_END:
                    text = FIXES_END[out_stem]
                if not (out_stem in OK_ENDS or out_stem in FIXES_END):
                    print("Warning", out_part, "truncating entry", e_start, e_end, text)
            results.append((new_start, new_end, text))
        else:
            print("There should be no default case", out_part, e_start, e_end, text)

    sorted_results = sorted(results, key=lambda x: x[0])

    with open(out_file, "w") as outf:
        for res in sorted_results:
            outf.write("\t".join(str(x) for x in res) + "\n")

for transcription in IDS:
    data = get_annotation(transcription)
    if "task" not in data:
        print("Error with task", transcription)
        continue
    adjust_times_write_tsv(data)
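
The end result is one tab-separated file per annotation in /tmp/textgrid_cut, each row holding start, end, and text, with times shifted so that 0.0 is the first frame of the cut video. Note the output files keep the .csv extension of the timecode files even though they are written as TSV.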