"""
A script to facilitate creating and maintaining a mapping between terms
used in the VO Registry and the UAT.
"""

import json
import os
import re
import sys
import time

# NOTE: pytrigdict (third-party) is imported inside load_stuff, in the same
# way this file already treats pyvo and tkinter, so that the pure helpers
# in this module can be imported without it being installed.

# Cache files older than this many seconds are regenerated (150 days).
CACHE_MAX_AGE = 3600*24*150


def get_with_cache(cache_name, parser, producer):
    """returns the result of parser applied to the content of the cache
    file cache_name, (re-)creating that file when necessary.

    If cache_name exists and is younger than CACHE_MAX_AGE, it is opened
    in binary mode and handed to parser.  Otherwise, producer is called
    without arguments; it must return bytes, which are written to
    cache_name and then parsed in the same way.
    """
    def parse_cache():
        with open(cache_name, "rb") as f:
            return parser(f)

    if (os.path.exists(cache_name)
            and time.time()-os.path.getmtime(cache_name) < CACHE_MAX_AGE):
        return parse_cache()

    stuff = producer()
    with open(cache_name, "wb") as f:
        f.write(stuff)
    # parse what was just written directly rather than re-entering this
    # function with a missing producer.
    return parse_cache()


def get_longest_common_prefix(strings):
    """returns the longest string all items of strings start with.

    For an empty collection, this is the empty string; for a single
    string, it is that string.
    """
    # os.path.commonprefix works character by character (not path segment
    # by path segment), which is exactly what is needed here.  It indexes
    # its argument, hence the list().
    return os.path.commonprefix(list(strings))


def get_ivoa_voc(voc_id):
    """returns a deserialised desise dictionary of an IVOA vocabulary.

    This will cache the desise in the current directory and only
    pull it anew every 150 days.
    """
    def fetch_voc():
        from urllib import request
        vocrepo_url = "http://www.ivoa.net/rdf/"
        voc_req = request.Request(
            vocrepo_url+voc_id,
            headers={"accept": "application/x-desise+json"})
        # make sure the HTTP response is closed once it has been read
        with request.urlopen(voc_req) as response:
            return response.read()

    return get_with_cache(
        ".cachedvoc-"+voc_id.replace("/", "_"),
        json.load,
        fetch_voc)


def get_subjects():
    """returns a list of pairs of term and frequency.

    It's sorted by decreasing frequency.  The result of the underlying
    TAP query is cached in .term_freqs in the current directory.
    """
    def query_subjects():
        import pyvo
        srv = pyvo.dal.TAPService("http://dc.g-vo.org/tap")
        res = srv.run_sync("""
            select res_subject as term, count(*) as ct
            from rr.res_subject
            group by res_subject
            order by ct desc""").table
        return json.dumps([[r["term"], int(r["ct"])]
            for r in res]).encode("utf-8")

    return get_with_cache(
        ".term_freqs",
        json.load,
        query_subjects)


def load_stuff():
    """returns the vocabulary, the known subjects, and the matcher for the
    vocabulary.

    The matcher is a pytrigdict.Trigdict mapping the labels of the
    vocabulary's terms to the terms themselves.
    """
    import pytrigdict

    voc = get_ivoa_voc("uat")
    subjects = get_subjects()
    matcher = pytrigdict.Trigdict()
    for term, t_data in voc["terms"].items():
        matcher[t_data["label"]] = term
    return voc, subjects, matcher


def bootstrap():
    """prints matches the script is rather confident of.

    (there's still lots of junk in there, so there was quite a bit of
    hand editing involved; I don't think this will ever need to run again).
    """
    voc, subjects, matcher = load_stuff()
    for s, ct in subjects:
        mat = matcher[s]
        if mat and mat[0][0] > 0.7:
            print("{}\t{}".format(s, mat[0][1]))


def label_to_term(label: str):
    """returns an IVOA term for a label.

    "term" is the thing behind the hash.  It needs to consist of letters
    and a few other things exclusively.  We're replacing runs of one or
    more characters other than ASCII letters and digits by a single dash.
    For optics, we're also lowercasing the whole thing.

    ConceptMapping makes sure what's resulting is unique within
    the IVOA UAT.
    """
    return re.sub(r"[^a-z0-9]+", "-", label.lower())


def _normalize_subject(subject):
    """returns subject with each run of whitespace replaced by one blank.

    This is the form in which subjects are written to the mapping.
    """
    return re.sub(r"\s+", " ", subject)


def map_interactively():
    """runs a Tk UI to match registry subjects to UAT concepts.

    Subjects already present in the first column of res/mapping.tsv are
    skipped.  This will write new mappings to stdout.
    """
    import tkinter

    class TermSelector(tkinter.Frame):
        """A frame offering buttons for candidate terms, plus a free-text
        entry with prefix completion against the vocabulary's terms.
        """
        def __init__(self, parent, voc):
            tkinter.Frame.__init__(self, parent)
            self._parent = parent
            self.voc = voc
            # the free-text entry; created anew by each set_for_options
            self._write_in = None
            self._write_in_label = tkinter.Label(self,
                width=40, height=20,
                justify=tkinter.LEFT, anchor=tkinter.NW,
                relief=tkinter.SUNKEN)
            self.update_idletasks()
            self._write_in_label_placed = False

        def reset(self):
            """removes the widgets belonging to the previous subject.
            """
            # Destroy rather than just unmap: set_for_options creates
            # fresh buttons and a fresh entry each time, so merely
            # forgetting the old ones would pile up widgets.
            for child in self.pack_slaves():
                child.destroy()
            if self._write_in is not None:
                self._write_in.destroy()
                self._write_in = None
            self._hide_completions()

        def set_for_options(self, options, handler=None):
            """shows one button per item of options and a write-in entry.

            handler is called with the selected term (or with ivoa:TryAgain
            or ivoa:None for the two special buttons).  Pass None as
            options to just clear the selector; handler is not needed then.
            """
            self.reset()
            if options is None:
                # sentinel for "no term left"; do nothing
                return

            tkinter.Button(self, text="Postpone", background="#ccffcc",
                command=lambda: handler("ivoa:TryAgain")
                ).pack(expand=1, fill=tkinter.X, side=tkinter.TOP)
            tkinter.Button(self, text="Bad/invalid term",
                background="#eecccc",
                command=lambda: handler("ivoa:None")
                ).pack(expand=1, fill=tkinter.X, side=tkinter.TOP)

            for o in options:
                # o=o binds the current option; a plain closure would
                # see the last one only.
                tkinter.Button(self, text=o,
                    command=lambda o=o: handler(o)
                    ).pack(expand=1, fill=tkinter.X, side=tkinter.TOP)

            self._write_in_value = tkinter.StringVar(self)
            self._write_in_value.trace_add('write', self._show_completions)
            self._write_in = tkinter.Entry(self._parent,
                textvariable=self._write_in_value)
            # setting the value fires the trace, which hides completions
            # possibly left over from the previous subject.
            self._write_in_value.set("")
            self._write_in.place(anchor=tkinter.SE, relx=1, rely=1,
                width=300, height=40)
            self._write_in.lift()
            # NOTE(review): the event sequences were empty strings, which
            # Tk rejects; <Return> for submitting and <Tab> for completing
            # are presumed from what the callbacks do -- confirm.
            self._write_in.bind("<Return>",
                lambda ev: handler(self._write_in.get()))
            self._write_in.bind("<Tab>", self._expand_longest_prefix)
            self._write_in.focus_set()

        def _hide_completions(self):
            """removes the completions label from view if it is shown.
            """
            if self._write_in_label_placed:
                self._write_in_label.place_forget()
                self._write_in_label_placed = False

        def _show_completions(self, name, index, mode):
            """updates the completions label; a variable trace callback.
            """
            prefix = self._write_in_value.get()
            if not prefix:
                self._hide_completions()
                return

            if not self._write_in_label_placed:
                self._write_in_label.place(relx=0, rely=0,
                    anchor=tkinter.NW)
                self._write_in_label.lift()
                self._write_in_label_placed = True
            self._write_in_label.config(
                text=self._get_current_completions(prefix))

        def _get_matches_for(self, prefix):
            """returns the set of vocabulary terms starting with prefix.
            """
            return set(term for term in self.voc["terms"]
                if term.startswith(prefix))

        def _expand_longest_prefix(self, ev):
            """extends the entry's content as far as it is unambiguous.
            """
            prefix = self._write_in_value.get()
            matches = self._get_matches_for(prefix)

            if len(matches) == 1:
                replacement = list(matches)[0]
            elif matches:
                replacement = get_longest_common_prefix(matches)
                if len(replacement) <= len(prefix):
                    replacement = None
            else:
                replacement = None

            if replacement is not None:
                self._write_in.delete(0, tkinter.END)
                self._write_in.insert(0, replacement)
            # keep Tk from running further bindings for this event
            return "break"

        def _get_current_completions(self, prefix):
            """returns a newline-joined list of at most 20 completions
            for prefix.
            """
            matches = self._get_matches_for(prefix)
            if not matches:
                return "(----------)"

            # shorten the candidates until few enough distinct ones
            # are left.
            max_len = max(len(t) for t in matches)
            while len(matches) > 20:
                matches = set(m[:max_len-1] for m in matches)
                max_len -= 1
            return "\n".join(sorted(matches))

    class UI(tkinter.Tk):
        """The main window: the subject to map on the left, the
        TermSelector on the right.
        """
        def __init__(self, already_mapped):
            tkinter.Tk.__init__(self)
            self.voc, self.subjects, self.matcher = load_stuff()
            self.already_mapped = already_mapped
            self._build_children()
            # NOTE(review): the event sequence was an empty string, which
            # Tk rejects; <Control-q> is a guess for the quit key -- confirm.
            self.bind("<Control-q>", lambda ev: self.quit())
            self._fill_for_next()

        def _build_children(self):
            self.to_map_label = tkinter.Label(self, text="(No term)")
            self.to_map_label.grid(row=0, column=0,
                sticky=tkinter.NW+tkinter.SE)
            self.options_frame = TermSelector(self, self.voc)
            self.options_frame.grid(row=0, column=1,
                sticky=tkinter.NW+tkinter.SE)
            self.columnconfigure(0, weight=40)
            self.columnconfigure(1, weight=60)

        def _needs_mapping(self, subject):
            """returns true if subject is neither mapped already nor
            trivially becomes a vocabulary term.
            """
            # Mappings are written with normalised whitespace (see
            # selection_made), so check that form as well.
            if (subject in self.already_mapped
                    or _normalize_subject(subject) in self.already_mapped):
                return False
            return label_to_term(subject) not in self.voc["terms"]

        def _fill_for_next(self):
            """shows the next subject needing a mapping, or the final
            state if there is none left.
            """
            while self.subjects:
                subject, freq = self.subjects.pop(0)
                if self._needs_mapping(subject):
                    self._fill_for_subject(subject, freq)
                    return
            self._fill_when_done()

        def _fill_when_done(self):
            self.to_map_label.config(text="(All subjects mapped)")
            self.options_frame.set_for_options(None)

        def _fill_for_subject(self, subject, freq):
            self.current_subject = subject
            self.to_map_label.config(text="{}\n{}".format(subject, freq))
            matches = reversed(self.matcher.bestmatches(subject, 20))
            self.options_frame.set_for_options(
                [m[1] for m in matches],
                self.selection_made)

        def selection_made(self, term):
            """writes the mapping for the current subject to stdout and
            moves on.
            """
            print("{}\t{}".format(
                _normalize_subject(self.current_subject), term))
            self._fill_for_next()

    with open("res/mapping.tsv") as f:
        # a set, because membership is tested once per registry subject
        already_mapped = {ln.split("\t")[0] for ln in f}

    ui = UI(already_mapped)
    ui.geometry("800x800")
    ui.mainloop()
    sys.stderr.write("Copy res/mapping.tsv to ../rr/res/uat-mapping.tsv\n")


if __name__=="__main__":
    # bootstrap()
    map_interactively()