First attempts at alignment (take 3)
Continued
# Path of the input file whose lines are later fed to accept_all / ctm_to_timed.
TEST_A = "/Users/joregan/Playing/rd_ctm_edit/H5C120171011va"
# Read the whole file, stripping surrounding whitespace (including the newline)
# from every line.
with open(TEST_A) as f:
    lines = [raw.strip() for raw in f]
def accept_all(lines):
    """Return the "cor" lines plus the "sub" lines rewritten as "cor".

    Each line is split on single spaces and classified by its last field:

    * ``"cor"`` -- the line is kept unchanged.
    * ``"sub"`` -- field 4 is overwritten with field 6, the last field becomes
      ``"cor"``, and the re-joined line is kept.
    * anything else -- the line is dropped.

    NOTE(review): presumably field 4 is the hypothesis word and field 6 the
    reference word of a CTM-edit line -- confirm against the input format.
    """
    accepted = []
    for entry in lines:
        fields = entry.split(" ")
        verdict = fields[-1]
        if verdict == "cor":
            accepted.append(entry)
            continue
        if verdict != "sub":
            continue
        fields[4] = fields[6]
        fields[-1] = "cor"
        accepted.append(" ".join(fields))
    return accepted
# Apply accept_all to the loaded lines and display the first two results.
l2 = accept_all(lines)
l2[0:2]
def ctm_to_timed(lines):
    """Convert space-separated lines into ``{"start", "end", "text"}`` dicts.

    For every line, field 2 is parsed as the start time and field 3 as the
    duration (both via ``float``); ``end`` is their sum and ``text`` is
    field 6, taken verbatim.
    """

    def to_entry(row):
        fields = row.split(" ")
        begin = float(fields[2])
        return {
            "start": begin,
            "end": begin + float(fields[3]),
            "text": fields[6],
        }

    return [to_entry(row) for row in lines]
# Timed entries built from the accepted lines of TEST_A.
side_a = ctm_to_timed(accept_all(lines))
# JSON file holding the other side of the alignment; it is parsed into `pieces`.
phonfile = "/Users/joregan/Playing/rd_phonetic/2442205210012872721_480p.json"
import json
with open(phonfile) as f:
    pieces = json.load(f)
def hf_json_to_timed(data):
    """Flatten ``data["chunks"]`` into ``{"start", "end", "text"}`` dicts.

    Each chunk must provide a ``"timestamp"`` sequence, whose items 0 and 1
    become ``start`` and ``end``, and a ``"text"`` value that is copied as is.
    """
    return [
        {
            "start": chunk["timestamp"][0],
            "end": chunk["timestamp"][1],
            "text": chunk["text"],
        }
        for chunk in data["chunks"]
    ]
# Timed entries built from the chunks of the parsed JSON file.
side_b = hf_json_to_timed(pieces)
def prune_to_other(left, right, fudge=0.5):
    """Drop the items of *right* that lie outside the time span of *left*.

    The span runs from ``left[0]["start"] - fudge`` to
    ``left[-1]["end"] + fudge``. An item of *right* is kept unless its
    ``start`` is before the lower bound or its ``end`` is after the upper
    bound.

    Args:
        left: sequence of dicts with ``"start"`` and ``"end"``; only the first
            and last items are consulted.
        right: iterable of dicts with ``"start"`` and ``"end"``.
        fudge: tolerance added on both sides of the span.

    Returns:
        ``(left, kept)`` where ``left`` is returned untouched and ``kept`` is
        a new list with the surviving items of *right*, in order.
    """
    # An empty `left` defines no span, so nothing from `right` can fall inside
    # it. Without this guard, `left[0]` raises IndexError.
    if not left:
        return left, []

    # The bounds do not depend on the item, so compute them once.
    earliest = left[0]["start"] - fudge
    latest = left[-1]["end"] + fudge
    kept = [
        item
        for item in right
        if not (item["start"] < earliest or item["end"] > latest)
    ]
    return left, kept
# new_a is side_a unchanged; new_b is side_b restricted to side_a's time span.
new_a, new_b = prune_to_other(side_a, side_b)
def end_cost(a, b):
    """Return the absolute difference between the ``"end"`` values of *a* and *b*."""
    gap = a["end"] - b["end"]
    return abs(gap)
def start_cost(a, b):
    """Return the absolute difference between the ``"start"`` values of *a* and *b*."""
    gap = a["start"] - b["start"]
    return abs(gap)
def cost(a, b):
    """Return the summed absolute differences of the start and end times.

    The result is ``|a.start - b.start| + |a.end - b.end|``, computed from the
    ``"start"`` and ``"end"`` keys of both dicts.
    """
    start_gap = abs(a["start"] - b["start"])
    end_gap = abs(a["end"] - b["end"])
    return start_gap + end_gap
def in_start_range(a, b, range=0.2):
    """Tell whether the ``"start"`` values of *a* and *b* differ by at most *range*.

    The parameter name shadows the ``range`` builtin inside this function; it
    is kept because it is part of the call signature.
    """
    offset = abs(a["start"] - b["start"])
    return offset <= range
def in_end_range(a, b, range=0.2):
    """Tell whether the ``"end"`` values of *a* and *b* differ by at most *range*.

    The parameter name shadows the ``range`` builtin inside this function; it
    is kept because it is part of the call signature.
    """
    offset = abs(a["end"] - b["end"])
    return offset <= range
def in_range(a, b, range=0.2):
    """Tell whether *a* and *b* are close at their start OR at their end.

    "Close" means the absolute difference is at most *range*. Both differences
    are always computed, so both dicts need ``"start"`` and ``"end"`` keys
    even when the start test alone would succeed.
    """
    starts_close = abs(a["start"] - b["start"]) <= range
    ends_close = abs(a["end"] - b["end"]) <= range
    return starts_close or ends_close
def falls_between(a1, a2, b):
    """Tell whether *b* fits in the gap between the end of *a1* and the start of *a2*.

    True when ``b["end"] <= a2["start"]`` and ``b["start"] >= a1["end"]``;
    touching boundaries count as fitting.
    """
    fits = b["end"] <= a2["start"] and b["start"] >= a1["end"]
    return True if fits else False
import numpy as np
def approx_eq(start1, start2, factor=0.04):
    """Tell whether two values are equal or differ by strictly less than *factor*."""
    if start1 == start2:
        return True
    return abs(start1 - start2) < factor
def align_times(new_a, new_b, merge_end_flexibility=0.06):
    """Build a timing-based cost matrix between two timed sequences.

    Both inputs are sequences of dicts with ``"start"`` and ``"end"`` keys.
    Every ``(i, j)`` pair that passes ``in_range`` is classified as either an
    "addition" (an item of *new_b* that sits in a gap of *new_a*) or as a
    candidate match whose cell receives ``cost(new_a[i], new_b[j])``.

    Args:
        new_a: first timed sequence (matrix rows).
        new_b: second timed sequence (matrix columns).
        merge_end_flexibility: tolerance handed to ``in_end_range`` when
            looking for a run of *new_b* items to merge onto one *new_a* item.

    Returns:
        A 3-tuple ``(dist_matrix, additionals, merges)``:

        * ``dist_matrix`` -- ``numpy.matrix`` of shape
          ``(len(new_a), len(new_b))``, initialised to ones.
        * ``additionals`` -- list of ``(prev_a, next_a, j)`` tuples, where
          ``-1`` stands for "no neighbour on that side".
        * ``merges`` -- dict mapping a row ``i`` to the list of consecutive
          column indices ``j .. new_j`` merged onto it.
    """
    s1 = len(new_a)
    s2 = len(new_b)
    additionals = []
    merges = {}
    dist_matrix = np.matrix(np.ones((s1, s2)))
    # NOTE(review): pair_cost is initialised once and carried across cells.
    # If it ever becomes exactly 1.0 it is no longer recomputed (see the
    # `!= 1.` test below). Kept as in the original; intent unclear.
    pair_cost = 0.0
    for i in range(s1):
        for j in range(s2):
            if not in_range(new_a[i], new_b[j]):
                continue

            # new_b[j] ends before the first item of new_a starts.
            if i == 0 and new_b[j]["end"] < new_a[0]["start"]:
                additionals.append((-1, 0, j))
                dist_matrix[i, j] = 1.0
                continue
            # new_b[j] fits in the gap between new_a[i] and new_a[i + 1].
            if i < (s1 - 1) and falls_between(new_a[i], new_a[i + 1], new_b[j]):
                additionals.append((i, i + 1, j))
                dist_matrix[i, j] = 1.0
                continue
            # new_b[j] starts at or after the end of the last item of new_a.
            # The original tested `i == s1`, which range(s1) never yields, so
            # this branch could not run; the last row index is s1 - 1.
            if i == s1 - 1 and new_b[j]["start"] >= new_a[i]["end"]:
                additionals.append((i, -1, j))
                dist_matrix[i, j] = 1.0
                continue

            # Column that finally receives the cost; it moves when a merge is
            # recorded (the original reassigned the loop variable `j`).
            target_j = j
            if approx_eq(new_a[i]["start"], new_b[j]["start"]):
                tmp_j = j
                fwd = []
                extent = new_b[tmp_j]
                if i < (s1 - 2) and new_b[tmp_j]["end"] < new_a[i + 1]["end"]:
                    extent = new_a[i + 1]
                # NOTE(review): `extent` is not updated inside this loop, so
                # the in_end_range test is loop-invariant. When it fails, every
                # column from j up to s2 - 2 is collected. Kept as is.
                while tmp_j < (s2 - 1) and not in_end_range(
                    new_a[i], extent, merge_end_flexibility
                ):
                    fwd.append((end_cost(new_a[i], new_b[tmp_j]), tmp_j))
                    tmp_j += 1
                if len(fwd) > 1:
                    # Candidate whose end is closest to new_a[i]'s end.
                    best_cost, new_j = sorted(fwd)[0]
                    if new_j != j:
                        pair_cost = best_cost
                        merges[i] = list(range(j, new_j + 1))
                        target_j = new_j

            if pair_cost != 1.:
                pair_cost = cost(new_a[i], new_b[target_j])
            dist_matrix[i, target_j] = pair_cost
    return dist_matrix, additionals, merges
# Run the timing alignment on the pruned sequences, then display a few results.
dist_m, additions, mrg = align_times(new_a, new_b)
new_b[24]
additions
dist_m.shape
def walk_matrix(dist_matrix, additions, merges=None):
    """Walk the cost matrix from the top-left and return the chosen (row, column) pairs.

    Rows and columns are consumed monotonically. For each row ``i``:

    * if ``i`` has an entry in *merges*, every merged column is paired with
      ``i`` and the walk resumes after the last merged column;
    * otherwise columns flagged as additions around row ``i`` are skipped;
    * otherwise the run of consecutive cells starting at the current column
      whose value is not ``1.0`` is scanned (the last column is never
      scanned), and the cheapest one is chosen. When the run is empty the
      current column is used.

    Args:
        dist_matrix: 2-D matrix indexable as ``dist_matrix[i, j]`` with a
            ``shape`` attribute.
        additions: iterable of ``(prev_row, next_row, column)`` entries, with
            ``-1`` meaning "no neighbour on that side".
        merges: optional dict mapping a row to its list of merged columns.
            When omitted, the module-level ``mrg`` is used, which is what the
            original implementation always did.

    Returns:
        List of ``(i, j)`` index pairs. The walk stops as soon as either the
        rows or the columns are exhausted.
    """
    if merges is None:
        merges = mrg  # notebook global, the previous (implicit) behaviour

    s1 = dist_matrix.shape[0]
    s2 = dist_matrix.shape[1]
    # Set membership instead of a linear list scan per lookup.
    extras = set(map(tuple, additions))
    path = []

    def do_additions(i, j):
        # Column j sits in the gap just before row i ...
        if (i - 1, i, j) in extras:
            return True
        # ... or in the gap just after row i ...
        if i + 1 < s1 and (i, i + 1, j) in extras:
            return True
        # ... or after the last row. The original tested `i == s1`, which is
        # not a valid row index; the last row is s1 - 1.
        if i == s1 - 1 and (i, -1, j) in extras:
            return True
        return False

    i = 0
    j = 0
    # One loop guarded on both indices. The original nested `while` loops
    # spun forever once j reached s2 with rows left, and indexed row s1 when
    # the rows ran out first.
    while i < s1 and j < s2:
        if i in merges:
            path += [(i, x) for x in merges[i]]
            j = merges[i][-1] + 1
            i += 1
            continue
        if do_additions(i, j):
            j += 1
            continue
        pairs = []
        tmpj = j
        # NOTE(review): the `s2 - 1` bound means the last column is never
        # scanned as a candidate; kept as in the original.
        while tmpj < s2 - 1 and dist_matrix[i, tmpj] != 1.0:
            pairs.append((dist_matrix[i, tmpj], tmpj))
            tmpj += 1
        if pairs:
            j = sorted(pairs)[0][1]
        path.append((i, j))
        i += 1
        j += 1
    return path
# Display the additions, then walk the matrix and print the text of each
# (new_a, new_b) pair on the resulting path.
additions
path = walk_matrix(dist_m, additions)
for pp in path:
    print(new_a[pp[0]]["text"], new_b[pp[1]]["text"])
# Display the merges, then print every new_a item next to each new_b item
# merged onto it.
mrg
for aa in mrg:
    for bb in mrg[aa]:
        print(new_a[aa], new_b[bb])
import pandas as pd
# Label the timing cost matrix with the texts of both sides: rows are new_a,
# columns are new_b.
df = pd.DataFrame(data=dist_m,index=[x["text"] for x in new_a], columns=[x["text"] for x in new_b])
from phonemizer import phonemize
# Print each new_a text next to its phonemization (language code 'sv').
for it_a in new_a:
    print(it_a["text"], phonemize(it_a["text"], language='sv'))
# Pairs of phoneme strings; each pair is diffed below, first element against second.
CHECK_MERGED = [
    ("rɪksasleːdamøːtœ̞", "rɪksdɑːɡsleːdamøːtər"),
    ("alaspatiːæɳa", "aliːanspatiːərna"),
    ("sentəpatiːət", "sɛntərpatiːət"),
    ("kɪsdemɔkɑːtɔɳa", "kriːstdəmɔkrɑːtɛrna"),
]
from difflib import SequenceMatcher
# For every pair, print the edit opcodes that turn `a` into `b`, then the
# similarity ratio and a blank separator line.
for mpair in CHECK_MERGED:
    a, b = mpair
    s = SequenceMatcher(None, a, b)
    for tag, i1, i2, j1, j2 in s.get_opcodes():
        left_piece = a[i1:i2]
        right_piece = b[j1:j2]
        print('{:7} a[{}:{}] --> b[{}:{}] {!r:>8} --> {!r}'.format(
            tag, i1, i2, j1, j2, left_piece, right_piece))
    print(s.ratio())
    print()
# Display the timing-cost frame built earlier.
df
# Pairwise string similarity between the phonemized new_a texts and the raw
# new_b texts. Every cell is overwritten with SequenceMatcher.ratio(), so
# despite the name the matrix holds similarities, not distances.
# NOTE(review): new_b texts are compared without phonemizing -- presumably they
# are already phonetic transcriptions; confirm.
phon_dist = np.matrix(np.ones((len(new_a), len(new_b))))
for i in range(len(new_a)):
    a = phonemize(new_a[i]["text"], language='sv')
    for j in range(len(new_b)):
        s = SequenceMatcher(None, a, new_b[j]["text"])
        phon_dist[i, j] = s.ratio()
# Rebind df to the similarity matrix, labelled like the previous frame, and display it.
df = pd.DataFrame(data=phon_dist,index=[x["text"] for x in new_a], columns=[x["text"] for x in new_b])
df
# NOTE(review): `collected` is not used by the code visible here.
collected = set()
# For every line of /tmp/lev-su, phonemize the second and third
# whitespace-separated fields; when the two phonemizations are identical,
# write the original fields as a tab-separated row to /tmp/matched.tsv.
with open("/tmp/lev-su") as f, open("/tmp/matched.tsv", "w") as of:
    for line in f.readlines():
        parts = line.strip().split()
        ph1 = phonemize(parts[1], language='sv')
        ph2 = phonemize(parts[2], language='sv')
        if ph1 == ph2:
            of.write(f"{parts[1]}\t{parts[2]}\n")