===========================
GAVO DaCHS: File Processing
===========================
:Author: Markus Demleitner
:Email: gavo@ari.uni-heidelberg.de
.. contents::
  :depth: 2
  :backlinks: entry
  :class: toc
This manual describes how to use DaCHS' helpers to preprocess data
before ingesting it, and to do other things that involve iterating over
many sources.

Sometimes you want to change something in the input files you
receive.  We usually recommend coping with the input as it is through
grammars, rowmakers, and the like, since this helps maintain
consistency with what the scientists intended and keeps things stable
when new data arrives.  There are, however, cases in which you deliver
the files themselves to users, most frequently FITS files.  There, you
may need to add or change headers.

At other times, you just want to traverse all sources, maybe to
validate them, maybe to compute something from them; the prime example
for the latter is pre-computing previews.
Processors
----------
The basic infrastructure for manipulating sources is the FileProcessor
class, available from ``gavo.helpers.processing``.
Here is an example checking whether the sizes of files match what an
(externally defined) function ``_getExpectedSize(fName) -> int`` returns::

  import os

  from gavo import api
  from gavo.helpers import processing

  class SizeChecker(processing.FileProcessor):
    def process(self, srcName):
      found = os.path.getsize(srcName)
      expected = _getExpectedSize(srcName)
      if found!=expected:
        print("%s: is %s, should be %s"%(srcName, found, expected))

  if __name__=="__main__":
    processing.procmain(SizeChecker, "potsdam/q", "import")

The call to ``procmain`` arranges for the command line to be
parsed and expects, in addition to the processor *class*, an id for
the resource descriptor for the data it should process, and the id of
the data descriptor that ingests the files.
As usual, you can raise ``base.SkipThis()`` to pretend process had never
been called for a certain ``srcName``.
Processor Command line
''''''''''''''''''''''
The processors can define command line options of their own. You could,
for example, read the expected sizes from some sort of catalogue. To do
that, define an addOptions static method, like this::

  class Processor(processing.FileProcessor):
    @staticmethod
    def addOptions(optParser):
      # always let the base class add its own options first
      processing.FileProcessor.addOptions(optParser)
      optParser.add_option("--cat-name", help="Resdir-relative path to"
        " the plate catalogue", action="store", type="str",
        dest="catPath", default="res/plates.cat")

Make sure you always do the upward call. Cf. the optparse documentation
for what you can do. The options object returned by optParser is
available as the opts attribute on your processor. To keep the chance
of name clashes in this sort of inheritance low, always use long options
only.
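The upward call can be tried out without DaCHS.  The following is a
minimal sketch using only the standard library; ``BaseProcessor`` is a
hypothetical stand-in for ``processing.FileProcessor``, and the
``--filter`` definition in it (including its ``dest``) is made up for
illustration rather than copied from DaCHS:

```python
import optparse


class BaseProcessor(object):
    """Hypothetical stand-in for processing.FileProcessor."""

    @staticmethod
    def addOptions(optParser):
        # imitation of a built-in option; the dest name is an assumption
        optParser.add_option("--filter", help="Only process sources whose"
            " name contains this substring", action="store", type="string",
            dest="requireFrag", default=None)


class CatProcessor(BaseProcessor):
    @staticmethod
    def addOptions(optParser):
        # the upward call keeps the inherited options available
        BaseProcessor.addOptions(optParser)
        optParser.add_option("--cat-name", help="Resdir-relative path to"
            " the plate catalogue", action="store", type="string",
            dest="catPath", default="res/plates.cat")


def parseArgs(procClass, args):
    """returns the options object for a list of command line arguments."""
    parser = optparse.OptionParser()
    procClass.addOptions(parser)
    opts, _ = parser.parse_args(args)
    return opts


opts = parseArgs(CatProcessor, ["--filter", "plate12"])
print(opts.requireFrag, opts.catPath)
```

Leaving out the upward call in ``CatProcessor.addOptions`` would make
``--filter`` an unknown option, which is why the call should never be
forgotten.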
Simple FileProcessors support the following options:

--filter
  It takes a value, a substring that has to be in the
  source's name for it to be processed.  This is for when you want to try
  out new code on just one file or a small subset of files.
--bail
  Rather than going on when a process method lets an exception escape,
  abort the processing at the first error and dump a traceback.  Use
  this to figure out bugs in your (or our) code.
--report
  More on this in `Processor Report Generation`_.
-j
  Number of processes to run in parallel (see next section).

Parallel Execution
''''''''''''''''''
Processors in principle can be executed in parallel *processes* (using
the ``-j`` flag as with make), provided they are written to support this
– which means no temporary files that could have name clashes, no other
shared mutable resources without synchronization, and so on.
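As an illustration of the temporary file requirement, here is a minimal
sketch of a work step that is safe to run in several processes at once.
The function name and the payload are hypothetical; the point is merely
that ``tempfile.mkstemp`` chooses a unique name for each call, so
workers cannot overwrite each other's scratch files:

```python
import os
import tempfile


def processWithScratchFile(payload):
    """writes payload to a private scratch file and returns its size.

    mkstemp picks a unique name, so parallel workers cannot collide.
    """
    handle, scratchPath = tempfile.mkstemp(prefix="proc-", suffix=".tmp")
    try:
        with os.fdopen(handle, "wb") as f:
            f.write(payload)
        return os.path.getsize(scratchPath)
    finally:
        # always clean up, even if the work step fails
        os.unlink(scratchPath)
```

A fixed name like ``/tmp/scratch.fits`` would work fine in a
single-process run and then fail unpredictably with ``-j``.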
The main problem when forking out workers is database connections:
in short, if you want to run your processors in parallel, you'll have
to open new connections rather than use the connection pool.  The
details are even more painful, so a good rule of thumb is: If you're
using the database more or less indirectly, just let your processor run
overnight rather than try to fix its database interface.
Auxiliaries
'''''''''''
Once you have the catalogue name, you will want to read it and make it
available to the process method. To allow you to do this, you can
override the _createAuxillaries(dd) method. It receives the data
descriptor of the data to be processed. Here's an example::

  class Processor(processing.FileProcessor):
    def _createAuxillaries(self, dd):
      self.catEntriesUsed = 0
      catPath = os.path.join(dd.rd.resdir, self.opts.catPath)
      self.catalogue = {}
      # each catalogue line has two whitespace-separated columns
      with open(catPath) as f:
        for ln in f:
          key, val = ln.split()
          self.catalogue[key] = val

As you can see, you can access the options given on the command line
as self.opts here.
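To see what such a catalogue dictionary ends up containing, here is a
stand-alone sketch of the same loop.  The function ``readCatalogue``,
the two-column file layout, and the demo content are assumptions made
for illustration; skipping blank lines is an addition not present in
the example above:

```python
import os
import tempfile


def readCatalogue(catPath):
    """returns a dict mapping the first column of catPath to the second.

    Blank lines are skipped; this is an assumption about the file format.
    """
    catalogue = {}
    with open(catPath) as f:
        for ln in f:
            if not ln.strip():
                continue
            key, val = ln.split()
            catalogue[key] = val
    return catalogue


# Demonstration with a throwaway two-line catalogue
handle, demoPath = tempfile.mkstemp(suffix=".cat")
with os.fdopen(handle, "w") as f:
    f.write("plate1 1024\n\nplate2 2048\n")
demoCatalogue = readCatalogue(demoPath)
os.unlink(demoPath)
```

Note that the values are strings; if they are to be compared against
file sizes, convert them with ``int`` first.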
Gathering Data
''''''''''''''
If you want your processor to gather data, you can use the fact that
procmain returns the processor it created. Here is a version of the
simple size checker above that outputs a sorted list of bad files::

  class SizeChecker(processing.FileProcessor):
    def _createAuxillaries(self, dd):
      self.mess = []

    def process(self, srcName):
      found = os.path.getsize(srcName)
      expected = _getExpectedSize(srcName)
      if found!=expected:
        self.mess.append((srcName, expected, found))

  if __name__=="__main__":
    res = processing.procmain(SizeChecker, "potsdam/q", "import")
    # sort by the absolute size difference, smallest first
    res.mess.sort(key=lambda rec: abs(rec[1]-rec[2]))
    for name, expected, found in res.mess:
      print("%10d %10d %8d %s"%(expected, found, expected-found, name))

Processor Report Generation
'''''''''''''''''''''''''''
Most of the time, when gathering data (or otherwise), what you are
really doing is generating a report of some sort.  For such simple
cases, you will usually want to use the ``--report`` option.  This
causes the processor to skip ``process`` and instead call a method that
in turn calls the ``classify(sourceName)`` method.  It must return a
string that serves as a class label.  At the end of the run, the
processor prints a summary of the class frequencies.
Here's what such a classify method could look like::

  def classify(self, srcName):
    hdr = self.getPrimaryHeader(srcName)
    try:
      ignored = "FILTER_A" in hdr
      return "ok"
    except ValueError: # botched cards on board
      return "botched"

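The summary of class frequencies is nothing more than a count of the
labels returned.  The following sketch shows the idea with the standard
library only; ``summarize`` and ``classifyByExtension`` are hypothetical
names, and the actual report format of DaCHS may well look different:

```python
import collections


def summarize(sources, classify):
    """returns a dict mapping class labels to the number of sources
    classify assigned to them."""
    return dict(collections.Counter(classify(src) for src in sources))


def classifyByExtension(srcName):
    # hypothetical classifier: label sources by their file extension
    return "fits" if srcName.endswith(".fits") else "other"


report = summarize(["a.fits", "b.fits", "c.txt"], classifyByExtension)
for label, count in sorted(report.items()):
    print("%s: %d" % (label, count))
```

Keeping the number of distinct labels small makes for readable reports;
a classifier returning a different string per file defeats the purpose.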
Overriding the Sources
''''''''''''''''''''''
By default, processors iterate over all the sources returned by the
referenced data element's sources element. Sometimes that is not what
you want, typically because some rowfilter adds things or because
the data is completely virtual and the input files only have a very
loose relation to what is published through the service.
In these cases, override the processor's iterIdentifiers method. It has
to yield things suitable as the parameter for process. It is a good
idea to have these be strings, though you might get away with other
objects if you accept that some error messages may look funny.
The classical case is getting accrefs from a table, like this::

  from gavo import api

  ...
    def iterIdentifiers(self):
      tableId = self.dd.makes[0].table.getQName()
      with api.getTableConn() as conn:
        for r in conn.queryToDicts("select accref from %s"%tableId):
          yield r["accref"]

A very typical case is when an "artificial" format generated on the fly
gets added to the SDM table to return something for FORMAT=compliant
queries.  In the RD, this could look like this::

  "\schema.data"
  "image/fits"
  "image/png"
  \standardPreviewPath

  yield row
  baseAccref = os.path.splitext(row["prodtblPath"])[0]
  row["prodtblAccref"] = baseAccref+".vot"
  row["prodtblPath"] = "dcc://\rdIdDotted/mksdm?"+urllib.quote(
    row["prodtblPath"])
  row["prodtblMime"] = "application/x-votable+xml"
  yield row

Note that the preview path and mime are the same for both versions,
which means that previews should only be computed for the first kind of
data. To effect that, write your PreviewMaker like this::

  class PreviewMaker(processing.SpectralPreviewMaker):
    sdmId = "build_sdm_data"

    def iterIdentifiers(self):
      for id in processing.SpectralPreviewMaker.iterIdentifiers(self):
        if not id.endswith(".vot"):
          yield id

Utility Methods
'''''''''''''''
FileProcessor instances have some utility methods handy when processing
files for DaCHS:
* ``getProductKey(fName) -> str`` returns the "product key" fName would
  have; this currently is just fName's path relative to the inputsDir
  (an exception is raised if fName is not below inputsDir).  This method
  lets you easily interchange data between your file processor and ignore
  elements or the inputRelativePath macro in RDs.

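What "path relative to the inputsDir" means can be sketched with the
standard library.  The function ``productKeyFor`` is hypothetical, as is
the choice of ``ValueError``; the real method takes inputsDir from the
DaCHS configuration rather than from an argument:

```python
import os


def productKeyFor(fName, inputsDir):
    """returns fName's path relative to inputsDir.

    A ValueError is raised if fName is not below inputsDir; the exception
    type is an assumption of this sketch.
    """
    absName = os.path.abspath(fName)
    absRoot = os.path.abspath(inputsDir)
    if not absName.startswith(absRoot + os.sep):
        raise ValueError("%s is not below %s" % (fName, inputsDir))
    return os.path.relpath(absName, absRoot)
```

With an inputsDir of ``/var/gavo/inputs``, the file
``/var/gavo/inputs/potsdam/data/a.fits`` would thus get the key
``potsdam/data/a.fits``.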
Precomputing previews
---------------------
While DaCHS can compute previews of 2D FITS images on the fly, in many
cases there are good reasons to precompute previews. If you follow some
conventions when doing this, the process becomes much smoother.
When making previews, it is usually much more convenient to work with
accrefs rather than actual file paths. That is particularly true with
spectra, which in DaCHS frequently are virtual data, such that an accref
doesn't correspond to an actual file.
Where there are actual files and you didn't do any magic with the
accrefs, you can retrieve the full path by computing
``os.path.join(api.getConfig("inputsDir"), accref)``.
processing.PreviewMaker
'''''''''''''''''''''''
processing contains a ``PreviewMaker`` class with some convenience
methods. To use it, give the data descriptor a ``previewDir`` property,
like this::
previews
...
– the value is the resdir-relative name of the directory that will
contain the preview files.
To generate the previews, all you have to do is inherit from
``PreviewMaker`` and implement ``getPreviewData(srcName) -> imageData``.
PIL, stuff from `utils.imgtools`_ or something similar usually is your
friend here. Here's a full example that would compute 200x100
one-channel jpegs for some image format understood by PIL::

  import os
  from io import BytesIO

  from PIL import Image

  from gavo import api
  from gavo.helpers import processing

  class PreviewMaker(processing.PreviewMaker):
    def getPreviewData(self, accref):
      srcName = os.path.join(api.getConfig("inputsDir"), accref)
      im = Image.open(srcName)
      # scale so that the longer edge becomes 200 pixels
      scale = max(im.size)/200.
      resized = im.resize((
        int(im.size[0]/scale),
        int(im.size[1]/scale)))
      rendered = BytesIO()
      resized.save(rendered, format="jpeg")
      return rendered.getvalue()

  if __name__=="__main__":
    processing.procmain(PreviewMaker, "example/q", "import")

If this were in ``bin/mkpreview.py``, you could then say::

  python bin/mkpreview.py

to compute previews for all files that don't have one yet, and you
can call::

  python bin/mkpreview.py --report

to see if previews are missing.
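The scaling arithmetic in ``getPreviewData`` above is easy to check in
isolation.  This sketch pulls it out into a hypothetical helper,
``scaledSize``, that needs neither PIL nor DaCHS:

```python
def scaledSize(size, maxEdge=200):
    """returns (width, height) scaled so the longer edge is maxEdge pixels,
    preserving the aspect ratio."""
    scale = max(size) / float(maxEdge)
    return int(size[0] / scale), int(size[1] / scale)


print(scaledSize((400, 200)))
```

Note that images smaller than ``maxEdge`` get enlarged by this
computation; if that is not what you want, return the original size
whenever ``scale`` is below one.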
Here's how you could compute color previews when you have images in
three filters in the FITS extensions 1, 2, and 3::

  import numpy

  from gavo.helpers import processing
  from gavo.utils import fitstools
  from gavo.utils import imgtools
  from gavo.utils import pyfits

  def _getArrayFor(srcName, extInd):
    return numpy.array(list(
      fitstools.iterScaledRows(srcName, destSize=200, extInd=extInd)))

  class PreviewMaker(processing.PreviewMaker):
    def getPreviewData(self, srcName):
      # one array per color channel, each from its own extension
      return imgtools.colorJpegFromNumpyArrays(
        _getArrayFor(srcName, 1),
        _getArrayFor(srcName, 2),
        _getArrayFor(srcName, 3))

  if __name__=="__main__":
    processing.procmain(PreviewMaker, "lmu/q", "import_imgs")

Making Previews for Spectra
'''''''''''''''''''''''''''
If you already have a datalink service defined for making SDM-compliant
spectra, you can easily re-use that to generate spectral previews.  For
that, there's ``processing.SpectralPreviewMaker``.  All it needs is the
id of the data element making the SDM instances in the ``sdmId`` class
attribute.  The following would do::

  from gavo.helpers import processing

  class PreviewMaker(processing.SpectralPreviewMaker):
    sdmId = "build_sdm_data"

  if __name__=="__main__":
    processing.procmain(PreviewMaker, "flashheros/q", "import")

By default, this produces spectra that are logscaled on the flux axis.
You can set the class attribute ``linearFluxes = True`` to have linear
scaling instead if that works better for your data.
Basic FITS Manipulation
-----------------------
For manipulating FITS headers, there is the HeaderProcessor class. It
is a FileProcessor, so everything said there applies here as well,
except that you usually do not want to override the process method.
Rather, you will probably want to override the
``_isProcessed(srcName) -> boolean`` method and one of

* ``_mungeHeader(srcName, header) -> pyfits hdr`` or
* ``_getHeader(srcName) -> pyfits hdr``.

``_isProcessed`` must return True if you think the named file already has
your new headers, False otherwise.  Files for which ``_isProcessed``
returns True are not touched.
``_getHeader`` is the method called by process to obtain a new header.
It must return the complete new header for the file named in the
argument. Since it is very common to base this on the file's existing
header, there is ``_mungeHeader`` that receives the current header.
_mungeHeader should in general raise a processing.CannotComputeHeader
exception if it cannot generate a header (e.g., missing catalogue entry,
nonsensical input data). If you return None from either _mungeHeader or
_getHeader, a generic CannotComputeHeader exception will be raised.
Note again that you have to return a *complete* header, i.e., including all
cards you want to keep from the original header (but see
`Header Selection`_).
A somewhat silly example could look like this::

  from gavo import api
  from gavo.helpers import processing

  class SillyProcessor(processing.HeaderProcessor):
    def _isProcessed(self, srcName):
      return "NUMPIXELS" in self.getPrimaryHeader(srcName)

    def _mungeHeader(self, srcName, hdr):
      # add the new card to the existing header and return all of it
      hdr.update("NUMPIXELS", hdr["NAXIS1"]*hdr["NAXIS2"])
      return hdr

  if __name__=="__main__":
    processing.procmain(SillyProcessor, "testdata/theRD", "sillyData")

Processors are expected to have an addOptions static method receiving
an optparse.OptionParser instance and adding the options they want to see.
Call --help on the program above to see FileProcessor's options.  Things
are arranged like this (check out the process and _makeCache methods in
the source code), where proc stands for the name of the ingesting program:

* ``proc`` computes headers for all input files not yet having "cached"
  headers.  Cached headers live alongside the FITS files and have
  ".hdr" attached to them.  The headers are *not* applied to the
  original files.
* ``proc --apply --no-compute`` applies cached headers to the input
  files that do not yet have headers.  In particular when processing
  is lengthy (e.g., astrometric calibration), it is probably a good
  idea to keep processing and header application a two-step process.
* ``proc --apply`` in addition tries to compute header caches and
  applies them.  This could be the default operation when header
  computation is fast.
* ``proc --reprocess`` recreates caches (without this option, cached
  headers are never touched).  You want this option if you found a
  bug in your _getHeader method and need to recompute all the
  headers.
* ``proc --reheader --apply`` replaces processed headers on the source
  files.  This is necessary when you want to apply reprocessed headers.
  Without --reheader, no header that looks like it is "fixed"
  (according to your _isProcessed code) is ever touched.

Admittedly, this logic is a bit convoluted, but the fine-grained
control is nice when your operations are expensive.
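One way to read the option list above is as a small decision function
evaluated per file.  The following sketch encodes that reading; it is
not taken from the DaCHS source, and ``planActions`` as well as its
parameter names are made up for illustration:

```python
def planActions(hasCache, isProcessed, doApply=False, compute=True,
        reprocess=False, reheader=False):
    """returns the list of actions for one file, as this sketch reads the
    option descriptions.

    hasCache says whether a cached header exists, isProcessed is what
    _isProcessed would return for the file.
    """
    actions = []
    # caches are only (re-)computed if missing or if --reprocess was given
    if compute and (reprocess or not hasCache):
        actions.append("compute cache")
        hasCache = True
    # headers are only written with --apply, and never over headers that
    # look fixed unless --reheader was given
    if doApply and hasCache and (reheader or not isProcessed):
        actions.append("apply header")
    return actions


# plain "proc" on a fresh file: compute, but do not touch the file
print(planActions(hasCache=False, isProcessed=False))
```

In this reading, ``--reprocess`` without ``--reheader --apply`` only
refreshes the caches and leaves files that already carry the old
headers untouched.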
By default, files for which the processing code raises exceptions are
ignored; the number of files ignored is shown when procmain is finished.
If you want to run more than one processor over a given dataset, you
will have to override the headerExt class attribute of your processors
so all are distinct.  By default, the attribute contains ".hdr".
Without overriding it, your processors would overwrite each other's
cached headers.  However, that's usually not enough since on --apply
only one header would win.  One way of coping is by always applying one
processor before running the next.  Another could be the use of keepKeys
(see below).
By the way, if the original FITS header is badly broken or you don't
want to use it anyway, you can override the _getHeader(srcName) ->
header method. Its default implementation is something like::

  def _getHeader(self, srcName):
    return self._mungeHeader(srcName, self.getPrimaryHeader(srcName))

The getPrimaryHeader(srcName) -> pyfits header method is a convenience
method of FITSProcessors with obvious functionality.
Header Selection
----------------
Due to the way pyfits manipulates header fields without data, certain
headers must be taken from the original file, overwriting values in the
cached headers. These are the headers actually describing the data
format, available in the processor's keepKeys attribute. Right now,
this is::

  keepKeys = set(["SIMPLE", "BITPIX", "NAXIS", "NAXIS1", "NAXIS2",
    "EXTEND", "BZERO", "BSCALE"])

You can amend this set as necessary in your _createAuxillaries method,
most likely like this::

  self.keepKeys = self.keepKeys.copy()
  self.keepKeys.add("EXPTIME")

You will have to do this if you have more than one processor (using
headerExt) and want to be able to apply them in any sequence. This,
however, is not usually worth the effort.
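The effect of keepKeys can be illustrated with plain dictionaries
standing in for pyfits headers.  This is only a sketch of the rule
"keys in keepKeys win from the original file"; ``mergeHeaders`` is a
hypothetical function, and DaCHS itself works on header objects rather
than dictionaries:

```python
KEEP_KEYS = set(["SIMPLE", "BITPIX", "NAXIS", "NAXIS1", "NAXIS2",
    "EXTEND", "BZERO", "BSCALE"])


def mergeHeaders(original, cached, keepKeys=KEEP_KEYS):
    """returns cached with all keys in keepKeys replaced by original's
    values.

    Both arguments are dicts here, an assumption made for illustration.
    """
    merged = dict(cached)
    for key in keepKeys:
        if key in original:
            merged[key] = original[key]
    return merged


original = {"BITPIX": 16, "NAXIS": 2, "OBSERVER": "old"}
cached = {"BITPIX": -32, "OBSERVER": "new", "CRVAL1": 10.0}
merged = mergeHeaders(original, cached)
```

Here, ``BITPIX`` and ``NAXIS`` come from the original file, while
``OBSERVER`` and ``CRVAL1`` are taken from the cached header.  Adding a
key to keepKeys therefore protects whatever is in the file from being
overwritten on ``--apply``.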
Since these operations may mess up the sequence of header cards in a
way that violates the FITS standard, after this the new headers are
sorted. This is done via fitstools.sortHeaders. This function can take
two additional functions commentFilter and historyFilter, both receiving
the card value and returning True to keep the card and False to discard
it.
Processors take these from like-named methods that you can override.
The default implementation keeps all comments and history items. For
example, to nuke all comment cards not containing "IMPORTANT", you could
define::

  def commentFilter(self, comment):
    return "IMPORTANT" in comment

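To make the contract of the two filter functions concrete, here is a
sketch that applies them to a list of (keyword, value) pairs.  It does
not sort anything and is not ``fitstools.sortHeaders``; ``filterCards``
and the card representation are assumptions for illustration:

```python
def filterCards(cards, commentFilter, historyFilter):
    """returns the (keyword, value) pairs from cards that survive filtering.

    COMMENT and HISTORY values are passed to the respective filter
    function; all other cards are always kept.
    """
    kept = []
    for keyword, value in cards:
        if keyword == "COMMENT" and not commentFilter(value):
            continue
        if keyword == "HISTORY" and not historyFilter(value):
            continue
        kept.append((keyword, value))
    return kept


cards = [
    ("SIMPLE", True),
    ("COMMENT", "IMPORTANT: flat-fielded"),
    ("COMMENT", "scratch note"),
    ("HISTORY", "reduced 1999")]
kept = filterCards(cards,
    commentFilter=lambda comment: "IMPORTANT" in comment,
    historyFilter=lambda history: True)
```

With these filters, the "scratch note" comment is dropped while all
history cards and the regular cards survive.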
Astrometry.net
--------------
Calibration using Astrometry.net
''''''''''''''''''''''''''''''''
If you have uncalibrated (optical) images, you can try to
automatically calibrate them using astrometry.net. The DC software
comes with an interface to it in helpers.anet, and the file processing
infrastructure is what you want to use here.
You probably want to inherit from AnetHeaderProcessor, more or less like
this::

  from gavo.helpers import processing

  class MyProcessor(processing.AnetHeaderProcessor):
    sp_indices = ["index-4215"]
    sp_lower_pix = 0.1
    sp_upper_pix = 0.2
    sp_endob = 50

    def _getHeader(self, srcName):
      hdr = processing.AnetHeaderProcessor._getHeader(self, srcName)
      if hdr is not None:
        hdr.update("LOCATION", "Unknown")
      return hdr

The class attributes starting with ``sp\_`` are parameters for the solver.
The `anet module docstring`_ explains what is available. The endob
parameter is important on larger images because it instructs anet to
give up when no identification has been possible within the first endob
objects. It keeps the solver from wasting enormous amounts of time on
potentially thousands of spurious detections, e.g., on photographic
plates.
The call to ``_getHeader`` illustrates how to modify the default
behaviour (which is to just add the WCS headers if they are available).
If you want to use SExtractor for source extraction, add a sexControl
class attribute. If it is empty, extraction will be done using some
default parameters. You can add more (refer to the sextractor manual)::

  sexControl = """
    DETECT_MINAREA   100
    DETECT_THRESH    8
    SEEING_FWHM      1.2
  """

Do not change CATALOG_TYPE, CATALOG_NAME, or PARAMETERS_NAME.

You can even filter what SExtractor has obtained.  To do that, define
an ``objectFilter`` method (in addition to the ``sexControl``
attribute)::

  from gavo.utils import pyfits

  ...
    def objectFilter(self, inName):
      """throws out funny-looking objects from inName as well as objects
      near the border.
      """
      hdulist = pyfits.open(inName)
      data = hdulist[1].data
      width = max(data.field("X_IMAGE"))
      height = max(data.field("Y_IMAGE"))
      badBorder = 0.3
      data = data[data.field("ELONGATION")<1.2]
      data = data[data.field("X_IMAGE")>width*badBorder]
      data = data[data.field("X_IMAGE")<width-width*badBorder]
      data = data[data.field("Y_IMAGE")>height*badBorder]
      data = data[data.field("Y_IMAGE")<height-height*badBorder]
      ...

We use gm (from GraphicsMagick) here since netpbm's fitstopnm has issues with
large files. You will want to use different scales for larger or
smaller images both in gm convert's scale and plotxy's -S option, or
leave them out altogether, like this::

  gm convert -flip img.fits pnm:- | $ANET_PATH/plotxy -I - -i img.axy -C red -P -w 2 -N50 -s circle -X X_IMAGE -Y Y_IMAGE > ws.png

for smaller images. Also, change the argument to -N if you change endob
in the solverParameters to get an idea which objects are actually looked
at.
What to Try
'''''''''''
In the case of calibration failures you may play around with SExtractor's
parameters DETECT_MINAREA and DETECT_THRESH. This is done by running::

  calibrate.py --minarea=MINAREA --detectthreshold=DETECTTHRESH

DETECT_THRESH refers to the detection threshold (in sigma) above the local
background. A group (of pixels) is formed by a number of pixels connected
to each other whose values exceed the local threshold. DETECT_MINAREA sets
a lower bound on the number of pixels a group should have to trigger a
detection.
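The interplay of the two parameters can be tried on a toy image.  The
sketch below is not SExtractor's algorithm; it assumes a constant
background and noise level, and it treats only pixels sharing an edge
as connected, which may differ from what SExtractor does.  The function
name and all parameter names are made up:

```python
def countDetections(image, background, sigma, detectThresh, minArea):
    """returns the number of pixel groups in image that would count as
    detections.

    image is a list of rows of pixel values.  A pixel is "hot" if it
    exceeds background+detectThresh*sigma; hot pixels sharing an edge
    belong to the same group.
    """
    limit = background + detectThresh * sigma
    height, width = len(image), len(image[0])
    seen = set()
    detections = 0
    for y in range(height):
        for x in range(width):
            if (x, y) in seen or image[y][x] <= limit:
                continue
            # flood-fill the group containing (x, y)
            stack, area = [(x, y)], 0
            seen.add((x, y))
            while stack:
                cx, cy = stack.pop()
                area += 1
                for nx, ny in ((cx+1, cy), (cx-1, cy), (cx, cy+1), (cx, cy-1)):
                    if (0 <= nx < width and 0 <= ny < height
                            and (nx, ny) not in seen
                            and image[ny][nx] > limit):
                        seen.add((nx, ny))
                        stack.append((nx, ny))
            if area >= minArea:
                detections += 1
    return detections


toyImage = [
    [0, 0, 0, 0, 0],
    [0, 9, 9, 0, 0],
    [0, 9, 9, 0, 9],
    [0, 0, 0, 0, 0]]
```

On ``toyImage`` with a background of 0 and a sigma of 1, a threshold
of 4 and a minimal area of 1 yields two groups, the 2x2 block and the
isolated pixel.  Raising the minimal area to 3 drops the isolated
pixel, which is how a larger DETECT_MINAREA suppresses hot pixels and
plate defects.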
The default values used for the calibration are MINAREA = 300 and
DETECTTHRESH = 4. In some cases it is useful to decrease the MINAREA
parameter and to increase the detection reliability by increasing the
threshold value, e.g.::

  calibrate.py --minarea=10 --detectthreshold=6

.. _utils.imgtools: http://docs.g-vo.org/DaCHS/apidoc/gavo.utils.imgtools-module.html