=========================== GAVO DaCHS: File Processing =========================== :Author: Markus Demleitner :Email: gavo@ari.uni-heidelberg.de This is a short manual on how to use DaCHS' helpers to preprocess data before ingesting it. Sometimes you want to change something on the input files you are receiving. While usually we recommend coping with the input through grammars, rowmakers, and the like since this helps maitaining consistency with what the scientists intended and also stability when new data arrives, there are cases when you deliver data to users, most frequently, with FITS files. There, you may need to add or change headers. However, sometimes you just want to traverse all sources. Let's cover this case first. Processors ---------- The basic infrastructure for manipulating sources is the FileProcessor class, available from gavo.helpers. Here is an example checking whether the sizes of files match what an (externally defined) function ``_getExpectedSize(fName) -> int`` returns:: import os from gavo import api from gavo.helpers import processing class SizeChecker(FileProcessor): def process(self, srcName): found = os.path.getsize(srcName) expected = _getExpectedSize(srcName) if found!=expected: print "%s: is %s, should be %s"%(srcName, found, expected) if __name__=="__main__": procmain(SizeChecker, "potsdam/q", "import") The call to ``procmain`` arranges for the command line to be parsed and expects, in addition to the processor *class*, an id for the resource descriptor for the data it should process, and the id of the data descriptor that ingests the files. Processor Command line '''''''''''''''''''''' The processors can define command line options of their own. You could, for example, read the expected sizes from some sort of catalogue. To do that, define an addOptions static method, like this:: class Processor(processing.FileProcessor): @staticmethod addOptions(optParser): processing.FileProcessor.addOptions(optParser) optParser.add_option("--cat-name", help="Resdir-relative path to" " the plate catalogue", action="store", type="str", dest="catPath", default="res/plates.cat") Make sure you always do the upward call. Cf. the optparse documentation for what you can do. The options object returned by optParser is available as the opts attribute on your processor. To keep the chance of name clashes in this sort of inheritance low, always use long options only. Simple FileProcessors support the following options: --filter It takes a value, a substring that has to be in the source's name for it to be processed. This is for when you want to try out new code on just one file or a small subset of files. --bail Rather than going on when a process method lets an exception escape, abort the processing at the first error and dump a traceback. Use this to figure out bugs in your (or our) code. --report More on this in `Processor Report Generation`_ Auxillaries ''''''''''' Once you have the catalogue name, you will want to read it and make it available to the process method. To allow you to do this, you can override the _createAuxillaries(dd) method. It receives the data descriptor of the data to be processed. Here's an example:: class Processor(processing.FileProcessor): def _createAuxillaries(self, dd): self.catEntriesUsed = 0 catPath = os.path.join(dd.rd.resdir, self.opts.catPath) self.catalogue = {} for ln in open(catPath): id, val = ln.split() self.catalogue[id] = val As you can see, you can access the options given on the command line as self.opts here. Gathering Data '''''''''''''' If you want your processor to gather data, you can use the fact that procmain returns the processor it created. Here is a version of the simple size checker above that outputs a sorted list of bad files:: class SizeChecker(FileProcessor): def _createAuxillaries(self, dd): self.mess = [] def process(self, srcName): found = os.path.getsize(srcName) expected = _getExpectedSize(srcName) if found!=expected: self.mess.append((srcName, expected, found)) if __name__=="__main__": res = procmain(SizeChecker, "potsdam/q", "import") res.mess.sort(key=lambda rec: abs(rec[1]-rec[2])) for name, expected, found in res.mess: print "%10d %10d %8d %s"%(expected, found, expected-found, name) Processor Report Generation ''''''''''''''''''''''''''' Most of the time, when gathering data (or otherwise), what you are doing is basically generate a report of some sort. For such simple cases, you will usually want to use the --report option. This causes the processor to skip process and instead call a method that will in turn call the ``classify(sourceName)`` method. It must return a string that will serve as a class label. At the end of the run, the processor will print a summary of the class frequencies. Here's what such a classify method could look like:: def classify(self, srcName): hdr = self.getPrimaryHeader(srcName) try: ignored = "FILTER_A" in hdr return "ok" except ValueError: # botched cards on board return "botched" Utility Methods ''''''''''''''' FileProcessor instances have some utility methods handy when processing files for DaCHS: * ``getProductKey(fName) -> str`` returns the "product key" fName would have; this currently is just fName's path relative to the inputsDir (or an exception if fName is not below inputsDir). This method lets you easily interchange data between your file processor and ignore elements or the inputRelativePath macro in RDs. Basic FITS Manipulation ----------------------- For manipulating FITS headers, there is the HeaderProcessor class. It is a FileProcessor, so everything said there applies here as well, except that you usually do not want to override the process method. Rather, you will probably want to override the ``_isProcessed(srcName) -> boolean`` method and one of * ``_mungeHeader(srcName, header) -> pyfits hdr`` or * ``_getHeader(srcName) -> pyfits hdr``. ``_isProcessed`` must return True if you think the name file already has your new headers, False otherwise. Files for which _isProcessed returns True are not touched. ``_getHeader`` is the method called by process to obtain a new header. It must return the complete new header for the file named in the argument. Since it is very common to base this on the file's existing header, there is ``_mungeHeader`` that receives the current header. _mungeHeader should in general raise a processing.CannotComputeHeader exception if it cannot generate a header (e.g., missing catalogue entry, nonsensical input data). If you return None from either _mungeHeader or _getHeader, a generic CannotComputeHeader exception will be raised. Note again that you have to return a *complete* header, i.e., including all cards you want to keep from the original header (but see `Header Selection`_). A somewhat silly example could look like this:: from gavo import api from gavo import processing class SillyProcessor(processing.FileProcessor): def _isProcessed(self, srcName): return self.getPrimaryHeader(srcName).has_key("NUMPIXELS") def _mungeHeader(self, srcName, hdr): hdr.update("NUMPIXELS") = hdr["NAXIS1"]*hdr["NAXIS2"] return hdr if __name__=="__main__": processing.procmain(SillyProcessor, "testdata/theRD", "sillyData") Processors are expected to have an addOptions static method receiving an optparser.OptionParser instance and adding options it wants to see. Call --help on the program above to see FileProcessor's options. Things are arranged like this (check out the process and _makeCache methods in the source code), where proc stands of the name of the ingesting program: * ``proc`` computes headers for all input files not yet having "cached" headers. Cached headers live alongside the fits files and have ".hdr" attached to them. The headers are *not* applied to the original files. * ``proc --apply --no-compute`` applies cached headers to the input files that do not yet have headers. In particular when processing is lengthy (e.g., astrometrical calibration), it is probably a good idea to keep processing and header application a two-step process. * ``proc --apply`` in addition tries to compute header caches and applies them. This could be the default operation when header computation is fast * ``proc --reprocess`` recreates caches (without this option, cached headers are never touched). You want this option if you found a bug in your _getHeader method and need to to recompute all the headers. * ``proc --reheader --apply`` replaces processed headers on the source files. This is necessary when you want to apply reprocessed headers. Without --reheader, to header that looks like it is "fixed" (according to your _isProcessed code) is ever touched. Admittedly, this logic is a bit convolved, but the fine-grained manipulation intensity is nice when your operations are expensive. By default, files for which the processing code raises exceptions are ignored; the number of files ignored is shown when procmain is finished. If you want to run more than one processor over a given dataset, you will have to override the headerExt class attribute of your processors so all are distinct. By default, the attribute contains ".hdr". Without overriding it, your processors would overwrite the other's cached headers. However, that's usually not enough since on --apply only one header would win. One way of coping is by always applying one processor before running the next. Another could be the use of keepKeys (see below). By the way, if the original FITS header is badly broken or you don't want to use it anyway, you can override the _getHeader(srcName) -> header method. Its default implementation is something like:: def _getHeader(self, srcName): return self._mungeHeader(srcName, self.getPrimaryHeader(srcName)) The getPrimaryHeader(srcName) -> pyfits header method is a convenience method of FITSProcessors with obvious functionality. Header Selection ---------------- Due to the way pyfits manipulates header fields without data, certain headers must be taken from the original file, overwriting values in the cached headers. These are the headers actually describing the data format, available in the processor's keepKeys attribute. Right now, this is:: keepKeys = set(["SIMPLE", "BITPIX", "NAXIS", "NAXIS1", "NAXIS2", "EXTEND", "BZERO", "BSCALE"]) You can amend this list as necessary in your _createAuxillaries method, most likely like this:: self.keepKeys = self.keepKeys.copy() self.keepKeys.add("EXPTIME") You will have to do this if you have more than one processor (using headerExt) and want to be able to apply them in any sequence. This, however, is not usually worth the effort. Since these operations may mess up the sequence of header cards in a way that violates the FITS standard, after this the new headers are sorted. This is done via fitstools.sortHeaders. This function can take two additional functions commentFilter and historyFilter, both receiving the card value and returning True to keep the card and False to discard it. Processors take these from like-named methods that you can override. The default implementation keeps all comments and history items. For example, to nuke all comment cards not containing "IMPORTANT", you could define:: def commentFilter(self, comment): return "IMPORTANT" in comment Astrometry.net -------------- Calibration using Astrometry.net '''''''''''''''''''''''''''''''' If you have uncalibrated (optical) images, you can try to automatically calibrate them using astrometry.net. The DC software comes with an interface to it in helpers.anet, and the file processing infrastructure is what you want to use here. You probably want to inherit from AnetHeaderProcessor, more or less like this:: from gavo.helpers import processing class MyProcessor(processing.AnetHeaderProcessor): sp_indices = ["index-208.fits"], sp_lower_pix = 0.1 sp_upper_pix = 0.2 sp_endob = 50 def _getHeader(self, srcName): hdr = processing.AnetHeaderProcessor._getHeader(self, srcName) if hdr is not None: hdr.update("LOCATION", "Unknown") return hdr The class attributes starting with ``sp\_`` are parameters for the solver. The `anet module docstring`_ explains what is available. The endob parameter is important on larger images because it instructs anet to give up when no identification has been possible within the first endob objects. It keeps the solver from wasting enormous amounts of time on potentially thousands of spurious detections, e.g., on photographic plates. The call to ``_getHeader`` illustrates how to modify the default behaviour (which is to just add the WCS headers if they are available). If you want to use SExtractor for source extraction, add a sexControl class attribute. If it is empty, extraction will be done using some default parameters. You can add more (refer to the sextractor manual):: sexControl = """ DETECT_MINAREA 100 DETECT_THRESH 8 SEEING_FWHM 1.2 """ -- do not change CATALOG_TYPE, CATALOG_NAME, and PARAMETERS_NAME. You can even filter what sextractor has obtained. To do that, define and ``objectFilter`` method (in addition to the ``sexControl`` attribute):: from gavo.utils import pyfits ... def objectFilter(inName): """throws out funny-looking objects from inName and throws out objects near the border. """ hdulist = pyfits.open(inName) data = hdulist[1].data width = max(data.field("X_IMAGE")) height = max(data.field("Y_IMAGE")) badBorder = 0.3 data = data[data.field("ELONGATION")<1.2] data = data[data.field("X_IMAGE")>width*badBorder] data = data[data.field("X_IMAGE")height*badBorder] data = data[data.field("Y_IMAGE") ws.png We use gm (from GraphicsMagick) here since netpbm's fitstopnm has issues with large files. You will want to use different scales for larger or smaller images both in gm convert's scale and plotxy's -S option, or leave them out altogether, like this:: gm convert -flip img.fits pnm:- | $ANET_PATH/plotxy -I - -i img.axy -C red -P -w 2 -N50 -X X_IMAGE -Y Y_IMAGE > ws.png for smaller images. Also, change the argument to -N if you change endob in the solverParameters to get an idea which objects are actually looked at. What to Try ''''''''''' In the case of calibration failures you may play around with SExtractor's parameters DETECT_MINAREA and DETECT_THRESH. This is done by running:: calibrate.py --minarea=MINAREA --detectthreshold=DETECTTHRESH DETECT_THRESH refers to the detection threshold (in sigma) above the local background. A group (of pixels) is formed by a number of pixels connected to each other whose values exceed the local threshold. DETECT_MINAREA sets a lower bound on the number of pixels a group should have to trigger a detection. The default values used for the calibration are MINAREA = 300 and DETECTTHRESH = 4. In some cases it is useful to decrease the MINAREA parameter and to increase the detection reliability by increasing the threshold value, e.g.:: calibrate.py --minarea=10 --detectthreshold=6