Split sentences in CTM-edit files.
Generating sentences from the Riksdag recordings: work in progress.
from pathlib import Path

# Root of the CTM-edit data.
BASE = Path("/Users/joregan/Playing/rd_ctm_edit")

# /tmp/terror-files lists one input filename per line.
with open("/tmp/terror-files") as terrorf:
    files = [line.strip() for line in terrorf]
def tidy(word, lc=True):
    """Normalise a reference token for comparison with ASR output.

    Strips one leading double quote, one trailing double quote, and one
    trailing punctuation mark (. ! ? , ; -), then lowercases unless
    *lc* is false.
    """
    if word.startswith('"'):
        word = word[1:]
    if word.endswith('"'):
        word = word[:-1]
    # Slice-based check so the empty string is handled without error.
    if word[-1:] in ".!?,;-":
        word = word[:-1]
    return word.lower() if lc else word
import difflib
# Sample hypothesis (ASR output) vs. punctuated reference word, used to
# sanity-check ratio() below.
a = "afkanistan"
b = "Afghanistan."
def ratio(a, b):
    """Fuzzy similarity between ASR word *a* and the tidied reference *b*."""
    matcher = difflib.SequenceMatcher(None, a, tidy(b))
    return matcher.ratio()
# Quick checks of the similarity ratio on known ASR/reference pairs.
print(ratio(a, b))
print(ratio("male", "Mali"))
print(ratio("normader", "Nomader"))
def slurp(filename):
    """Read *filename* and return a list of its lines, whitespace-stripped."""
    with open(filename) as infile:
        return [line.strip() for line in infile]
# Load a sample CTM-edit file for interactive experimentation.
testing = slurp("/Users/joregan/Playing/rd_ctm_edit/H001UFöU4")
# Notebook-cell output: peek at the first five lines.
testing[0:5]
# Conjunctions that commonly appear as spurious insertions between sentences.
CONJ = ["och", "men"]
# Whitespace-separated substitution table, one "<asr_token> <reference_form>"
# entry per line (mostly Swedish spelled-out abbreviations and misrecognised
# proper nouns).  An underscore in a form appears to stand for a joined word
# pair (cf. the f"{a[6]}_{b[6]}" join in modify_pairs) -- TODO confirm.
_SUBS = """
afkanistan Afghanistan
maly Mali
male Mali
övvik Ö-vik
abebégymnasiet ABB-gymnasiet
abeflygplan A/B-flygplan
abefs ABF:s
abfs ABF:s
abies ABS
abieffkurserna ABF-kurserna
abief ABF
aczen Axén
adeärrkort ADR-kort
adeärr ADR
adiellbedömningen ADL-bedömningen
adiesselstationer ADSL-stationer
adihodepreparaten adhd-preparaten
adihodemediciner adhd-mediciner
adihodeläkemedel adhd-läkemedel
adihodedroger adhd-droger
aduptioner adoptioner
aeff AF
aelless ALS
aenddeesstrategin ANDTS-strategin
aettaffisch A1-affisch
aendete ANDT
aendeteanvändandet ANDT-användandet
aendeteessfrågorna ANDTS-frågorna
aendeteesspolitiken ANDTS-politiken
aendeteessstrategin ANDTS-strategin
aendetefrågan ANDT-frågan
aendetefrågor ANDT-frågor
aendeteområdet ANDT-området
aendetessområdet ANDTS-området
aendetesspolitiskt ANDTS-politiskt
aendetesstrategi ANDTS-strategi
aendetestrategi ANDT-strategi
aendetestrategin ANDT-strategin
rutavdraget RUT-avdraget
sahäll Sahel
akim Aqim
alkaida_relaterade al-Qaida-relaterade
libien Libyen
malie Mali
kunskapoc kunskap_och
adisabeba Addis_Abeba
eus EU:s
gihadister jihadister
peesertester PCR-tester
beesiärtest PCR-test
"""
def get_subs(text=None):
    """Parse a substitution table into ``{asr_token: [reference_forms, ...]}``.

    Args:
        text: the table to parse, one ``<key> <form> [<form> ...]`` entry
            per line; blank lines are skipped.  Defaults to the
            module-level ``_SUBS`` table (backward-compatible with the
            original zero-argument call).

    Returns:
        dict mapping each key to the list of acceptable reference forms.
    """
    if text is None:
        text = _SUBS
    subs = {}
    for line in text.split("\n"):
        parts = line.strip().split(" ")
        # A blank/whitespace-only line strips to "" and splits to [""].
        if parts == [""]:
            continue
        subs.setdefault(parts[0], []).extend(parts[1:])
    return subs
def is_subst(a, b, lc=False, subs=None):
    """Return True when *b* is a known substitution form for ASR token *a*.

    Args:
        a: the ASR (hypothesis) token, used as the table key.
        b: the candidate reference form.
        lc: when true, also accept a lowercased match of *b*.
        subs: substitution table to consult; defaults to the module-level
            ``SUBS`` (backward-compatible with the original signature).

    Returns:
        bool.
    """
    if subs is None:
        subs = SUBS
    forms = subs.get(a)
    if forms is None:
        return False
    if b in forms:
        return True
    return bool(lc and b.lower() in forms)
def modify_pairs(sent_a, sent_b):
    """Try to repair/merge a pair of adjacent CTM-edit lines.

    Each line is a space-separated record; by usage here the fields are:
    [0] utterance id, [1] channel, [2] start time, [3] duration,
    [4] hypothesis word, [5] confidence, [6] reference word,
    [7] edit label ("cor", "ins", "sub", ...).

    Returns a ``(new_a, new_b)`` tuple of re-joined lines when a repair
    was made (``new_a`` is the empty string when the pair collapses into
    a single line), or ``None`` when nothing changed.
    """
    def get_start_dur(sent_a, sent_b):
        # Start of the first line, plus a duration spanning both lines.
        start = sent_a[2]
        a_start = float(sent_a[2])
        a_dur = float(sent_a[3])  # NOTE(review): computed but unused
        b_start = float(sent_b[2])
        b_dur = float(sent_b[3])
        b_end = b_start + b_dur
        new_dur = b_end - a_start
        return start, "{:.3f}".format(new_dur)

    a = sent_a.split(" ")
    b = sent_b.split(" ")
    changed = False
    # Case 1: a's hyp word matches b's (tidied) ref word and a has no
    # reference of its own -- move the reference onto a (now "cor") and
    # demote b to an insertion ("ins-conj" when b's hyp is a conjunction).
    if a[4] == tidy(b[6]) or is_subst(a[4], tidy(b[6], False)):
        if a[6] == "<eps>":
            changed = True
            a[4] = a[6] = b[6]
            b[6] = "<eps>"
            a[7] = "cor"
            b[7] = "ins"
            if b[4] in CONJ:
                b[7] = "ins-conj"
    # Case 2: a is an insertion whose ref, concatenated with b's ref,
    # equals b's hyp word -- collapse into b with an underscore-joined ref.
    elif a[4] == "<eps>":
        if a[6] + b[6] == b[4]:
            changed = True
            joined = f"{a[6]}_{b[6]}"
            b[4] = b[6] = joined
            b[7] = "cor"
            # NOTE(review): unlike the cases below, b keeps its own
            # start/duration here; the merged line's timing is not widened
            # to cover a's span -- confirm this is intended.
            a = []
    # Case 3: the two hyp words concatenate to a's tidied ref -- merge
    # into b, widening b's timing to cover both lines.
    elif a[4] + b[4] == tidy(a[6]) and b[6] == "<eps>":
        print("a")
        changed = True
        start, end = get_start_dur(a, b)
        b[4] = b[6] = a[6]
        b[7] = "cor"
        b[2] = start
        b[3] = end  # "end" is actually the new duration string
    # Case 4: the two hyp words concatenate to b's tidied ref -- merge
    # into b, widening b's timing to cover both lines.
        a = []
    elif a[4] + b[4] == tidy(b[6]) and a[6] == "<eps>":
        print("b")
        changed = True
        start, end = get_start_dur(a, b)
        b[4] = b[6]
        b[7] = "cor"
        b[2] = start
        b[3] = end
        a = []
    if changed:
        # a may be [] here, in which case the first element is "".
        return (" ".join(a), " ".join(b))
    else:
        return None
# Smoke tests for modify_pairs(): a matched insertion pair, a split word
# joined across lines, a substitution-table match, and a split proper noun.
# NOTE(review): run top-to-bottom these execute before SUBS = get_subs()
# below, so is_subst() would raise NameError; presumably the notebook
# cells were executed in a different order.
modify_pairs("2442207150019781021 1 58.4 0.78 invandringspolitiken 1.0 <eps> ins",
             "2442207150019781021 1 59.9 0.079 men 1.0 invandringspolitiken. sub")
modify_pairs("2442207150019781021 1 97.12 0.0 <eps> 1.0 insats del",
             "2442207150019781021 1 97.18 0.739 insatsregeringen 1.0 regeringen sub")
modify_pairs("2442207150019781021 1 121.18 0.559 afkanistan 1.0 <eps> ins",
             "2442207150019781021 1 122.62 0.079 och 1.0 Afghanistan. sub")
modify_pairs("2442207150019781021 1 121.18 0.559 af 1.0 <eps> ins",
             "2442207150019781021 1 122.62 0.079 ghanistan 1.0 Afghanistan. sub")
def modify_single(sentence):
    """Mark a line "cor" when its hyp word is a known substitution for its ref.

    The "END" sentinel used by partition() passes through untouched.
    """
    if sentence == "END":
        return sentence
    parts = sentence.split(" ")
    if not is_subst(parts[4], tidy(parts[6]), True):
        return sentence
    parts[4] = parts[6]
    parts[7] = "cor"
    return " ".join(parts)
# Build the global substitution table and sanity-check one known pair.
SUBS = get_subs()
is_subst("male", "Mali")
# IPython magic: install more-itertools into the running kernel.
%pip install more-itertools
import more_itertools
# Preview: sliding windows of adjacent line pairs.
for pair in more_itertools.windowed(testing[0:6], 2):
    print(pair)
def partition(lines):
    """Group CTM-edit lines into sentence-like segments.

    A segment boundary is placed after a line whose reference word ends
    with sentence-final punctuation when the next line's reference word
    is capitalised.  Adjacent pairs are first passed through
    modify_single()/modify_pairs() to repair known substitutions and
    split/joined words.
    """
    def has_final(line):
        # Reference word (field 6) ends a sentence, ignoring a closing quote.
        FINAL = ".!?"
        parts = line.strip().split(" ")
        piece = parts[6]
        if piece.endswith('"'):
            piece = piece[:-1]
        return piece[-1:] in FINAL

    def is_capital(line):
        # Reference word starts with an uppercase letter.
        parts = line.strip().split(" ")
        piece = parts[6]
        return piece[0:1].isupper()

    def splittable(a, b):
        return has_final(a) and is_capital(b)

    sentences = []
    current = []
    last_mod = ""
    # Slide a window over adjacent pairs; "END" sentinel flushes the last
    # segment.
    for pair in more_itertools.windowed(lines + ["END"], 2):
        first = modify_single(pair[0])
        second = modify_single(pair[1])
        if last_mod != "":
            # NOTE(review): this replaces *second* (the next, unseen line)
            # with the modified line carried over from the previous pair;
            # the intent looks like it should replace *first* instead --
            # as written this drops one line and duplicates another.
            # Confirm against expected output before changing.
            second = last_mod
            last_mod = ""
        if second == "END":
            current.append(first)
            sentences.append(current[:])
        else:
            mod = modify_pairs(first, second)
            if mod is not None:
                # NOTE(review): when a pair is modified, *first* is never
                # appended to the current segment in this iteration.
                first = mod[0]
                second = mod[1]
                last_mod = second
            elif splittable(first, second):
                current.append(first)
                sentences.append(current[:])
                current = []
            else:
                current.append(first)
    return sentences
# Partition the sample file and inspect the first segment.
parting = partition(testing)
ttt = parting[0]
# Notebook-cell output.
ttt
def sentence_correctness(sentence):
    """Fraction of lines in *sentence* whose edit label starts with "cor"."""
    hits = sum(
        1 for line in sentence
        if line.split(" ")[-1].startswith("cor")
    )
    return hits / len(sentence)
# Spot-check a correctness score and inspect a few segments.
sentence_correctness(parting[4])
parting[25]
parting[26]
parting[28]
def find_terror_segs(segments):
    """Return the segments that mention a terror-related word.

    A line matches when "terror" or "teror" occurs as a substring of
    either its hyp word (field 4) or its lowercased ref word (field 6).

    Fix: the original appended a segment once per matching line, so a
    segment with several matches appeared multiple times; each segment
    is now returned at most once.
    """
    def ismatch(word):
        return "terror" in word or "teror" in word

    terror = []
    for segment in segments:
        for line in segment:
            parts = line.split(" ")
            if ismatch(parts[4]) or ismatch(parts[6].lower()):
                terror.append(segment)
                break  # one hit is enough for this segment
    return terror
# Collect and inspect the segments mentioning terror-related words.
tsegs = find_terror_segs(parting)
len(tsegs)
tsegs
def get_readable_sentence(sentence):
    """Render a segment as readable text, annotating non-"cor" lines.

    Correct lines contribute their reference word; insertions appear as
    "+(ref)", deletions as "-(hyp)" and substitutions as "(hyp/ref)".
    """
    rendered = []
    for line in sentence:
        fields = line.split(" ")
        hyp, ref, edit = fields[4], fields[6], fields[-1]
        if edit.startswith("cor"):
            rendered.append(ref)
        elif hyp == "<eps>":
            rendered.append(f"+({ref})")
        elif ref == "<eps>":
            rendered.append(f"-({hyp})")
        else:
            rendered.append(f"({hyp}/{ref})")
    return " ".join(rendered)
# Inspect readable renderings of a few segments.
get_readable_sentence(tsegs[0])
tsegs[0]
get_readable_sentence(parting[28])
# Write partitioned segments per input file, plus a TERROR_-prefixed file
# holding only the segments that mention terror-related words.  Segments
# are separated by a blank line.
#
# Fix: the original opened (creating/truncating) both output files before
# checking for empty input, leaving spurious empty files behind; the input
# is now read and checked first.
OUTPATH = Path("/tmp/outfiles")
for file in files:
    lines = slurp(file)
    if not lines:
        continue
    filestem = Path(file).stem
    partitions = partition(lines)
    with open(OUTPATH / filestem, "w") as outputfile, \
         open(OUTPATH / f"TERROR_{filestem}", "w") as terrorfile:
        for ptn in partitions:
            for line in ptn:
                outputfile.write(line + "\n")
            outputfile.write("\n")
        for tsg in find_terror_segs(partitions):
            for line in tsg:
                terrorfile.write(line + "\n")
            terrorfile.write("\n")
# IPython magics: switch to the sync_asr checkout and install it editable.
%cd /Users/joregan/Playing/sync_asr
%pip install -e .
from sync_asr.ctm_edit import split_sentences, ctm_from_file, generate_filename
from pathlib import Path
# Output layout: fully-correct sentences go to clean/, the rest to noisy2/.
BASEOUT = Path("/Users/joregan/rd_ctm_edit/riksdag_spoken_sentences")
CLEANDIR = BASEOUT / "clean"
INDIR = BASEOUT / "noisy"
NOISYDIR = BASEOUT / "noisy2"
#INDIR = Path("/tmp/rdtmp/")
def check_correct(lines):
    """True when every CTM-edit line in *lines* carries the edit label "cor"."""
    return all(line.edit == "cor" for line in lines)
# For each input file: write each fully-correct sentence split to its own
# file under CLEANDIR; accumulate runs of not-fully-correct splits and
# flush each run to a numbered file under NOISYDIR.
noisy = []  # NOTE(review): re-initialised at the top of the loop; redundant here
for file in INDIR.glob("H*"):
    noisy = []
    if file.name == "H810255":
        # Skipped file -- presumably known-bad input; confirm.
        continue
    counter = 1
    lines = ctm_from_file(file)
    splits = split_sentences(lines)

    def write_noisy():
        # Flush the accumulated noisy run to a numbered per-file output.
        # Closure over the loop's current file, counter and noisy.
        outfile = NOISYDIR / f"{file.name}_{counter:04d}"
        with open(outfile, "w") as of:
            for line in noisy:
                of.write(str(line) + "\n")

    for split in splits:
        if check_correct(split):
            fn = generate_filename(split)
            with open(CLEANDIR / fn, "w") as of:
                for line in split:
                    of.write(str(line) + "\n")
            # A clean split ends the current noisy run: flush and advance
            # the counter (only flushes advance it).
            if noisy != []:
                write_noisy()
                counter += 1
                noisy = []
        else:
            noisy += split
    # Flush any trailing noisy run for this file.
    if noisy != []:
        write_noisy()