diff --git a/CHANGES.rst b/CHANGES.rst index 56334472..44482a3d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,13 @@ CHANGES ======= +0.3.0 +----- + +* Added bulk annotations for Projects and Datasets +* Changed csv parsing to proceed line-by-line (streaming) + + 0.2.2 ----- diff --git a/src/omero_cli_metadata.py b/src/omero_cli_metadata.py index 6701578e..5c18e4b1 100755 --- a/src/omero_cli_metadata.py +++ b/src/omero_cli_metadata.py @@ -476,20 +476,20 @@ def populate(self, args): cfgid = cfgann.getFile().getId() md.linkAnnotation(cfgann) - # Note some contexts only support a subset of these args - ctx = context_class(client, args.obj, file=args.file, fileid=fileid, - cfg=args.cfg, cfgid=cfgid, attach=args.attach, - options=localcfg) - ctx.parse() + loops = 0 + ms = 0 if not args.dry_run: wait = args.wait - if not wait: - loops = 0 - ms = 0 - else: + if wait: ms = 5000 loops = int((wait * 1000) / ms) + 1 - ctx.write_to_omero(batch_size=args.batch, loops=loops, ms=ms) + + # Note some contexts only support a subset of these args + ctx = context_class(client, args.obj, file=args.file, fileid=fileid, + cfg=args.cfg, cfgid=cfgid, attach=args.attach, + options=localcfg, batch_size=args.batch, + loops=loops, ms=ms) + ctx.parse() def rois(self, args): "Manage ROIs" diff --git a/src/populate_metadata.py b/src/populate_metadata.py index 3939eb79..d9e84c3d 100644 --- a/src/populate_metadata.py +++ b/src/populate_metadata.py @@ -99,14 +99,24 @@ def usage(error): DATASET_NAME_COLUMN = 'Dataset Name' IMAGE_NAME_COLUMN = 'Image Name' +ADDED_COLUMN_NAMES = [PLATE_NAME_COLUMN, + WELL_NAME_COLUMN, + DATASET_NAME_COLUMN, + IMAGE_NAME_COLUMN, + 'image'] + + COLUMN_TYPES = { 'plate': PlateColumn, 'well': WellColumn, 'image': ImageColumn, - 'roi': RoiColumn, 'd': DoubleColumn, 'l': LongColumn, 's': StringColumn, - 'b': BoolColumn + 'dataset': DatasetColumn, 'roi': RoiColumn, + 'd': DoubleColumn, 'l': LongColumn, 's': StringColumn, 'b': BoolColumn } REGEX_HEADER_SPECIFIER = r'# 
header ' +DEFAULT_TABLE_NAME = 'bulk_annotations' +MAX_COLUMN_COUNT = 512 + class Skip(object): """Instance to denote a row skip request.""" @@ -135,7 +145,7 @@ class HeaderResolver(object): } project_keys = { - 'dataset': DatasetColumn, + 'dataset': StringColumn, # DatasetColumn 'dataset_name': StringColumn, 'image': ImageColumn, 'image_name': StringColumn, @@ -243,11 +253,19 @@ def _create_columns(self, klass): else: try: keys = getattr(self, "%s_keys" % klass) - column = keys[header_as_lower]( - name, description, list()) + log.debug("Adding keys %r" % keys) + if keys[header_as_lower] is StringColumn: + column = keys[header_as_lower]( + name, description, + self.DEFAULT_COLUMN_SIZE, list()) + else: + column = keys[header_as_lower]( + name, description, list()) except KeyError: + log.debug("Adding string column %r" % name) column = StringColumn( name, description, self.DEFAULT_COLUMN_SIZE, list()) + log.debug("New column %r" % column) columns.append(column) append = [] for column in columns: @@ -289,8 +307,13 @@ def __init__(self, client, target_object): q = "select x.details.group.id from %s x where x.id = %d " % ( self.target_type, self.target_id ) - self.target_group = unwrap( - self.client.sf.getQueryService().projection(q, None)) + rows = unwrap( + self.client.sf.getQueryService().projection( + q, None, {'omero.group': '-1'})) + if rows is None or len(rows) != 1: + raise MetadataError( + "Cannot find %s:%d" % (self.target_type, self.target_id)) + self.target_group = rows[0][0] # The goal is to make this the only instance of # a if/elif/else block on the target_class. 
All # logic should be placed in a the concrete wrapper @@ -334,18 +357,43 @@ def resolve(self, column, value, row): if len(self.wrapper.images_by_id) == 1: images_by_id = self.wrapper.images_by_id.values()[0] else: - for column, plate in row: + for column, column_value in row: if column.__class__ is PlateColumn: - images_by_id = self.images_by_id[ - self.plates_by_name[plate].id.val + images_by_id = self.wrapper.images_by_id[ + self.wrapper.plates_by_name[column_value].id.val ] log.debug( - "Got plate %i", self.plates_by_name[plate].id.val + "Got plate %i", + self.wrapper.plates_by_name[column_value].id.val ) - break + break + elif column.name.lower() == "dataset name": + # DatasetColumn unimplemented at the moment + # We can still access column names though + images_by_id = self.wrapper.images_by_id[ + self.wrapper.datasets_by_name[column_value].id.val + ] + log.debug( + "Got dataset %i", + self.wrapper.datasets_by_name[column_value].id.val + ) + break + elif column.name.lower() == "dataset": + # DatasetColumn unimplemented at the moment + # We can still access column names though + images_by_id = self.wrapper.images_by_id[ + self.wrapper.datasets_by_id[ + int(column_value)].id.val + ] + log.debug( + "Got dataset %i", + self.wrapper.datasets_by_id[ + int(column_value)].id.val + ) + break if images_by_id is None: raise MetadataError( - 'Unable to locate Plate column in Row: %r' % row + 'Unable to locate Parent column in Row: %r' % row ) try: return images_by_id[long(value)].id.val @@ -357,6 +405,9 @@ def resolve(self, column, value, row): return self.wrapper.resolve_well(column, row, value) if PlateColumn is column_class: return self.wrapper.resolve_plate(column, row, value) + # Prepared to handle DatasetColumn + if DatasetColumn is column_class: + return self.wrapper.resolve_dataset(column, row, value) if column_as_lower in ('row', 'column') \ and column_class is LongColumn: try: @@ -651,7 +702,10 @@ def __init__(self, value_resolver): self._load() def 
get_image_id_by_name(self, iname, dname=None): - return self.images_by_name[iname] + return self.images_by_name[iname].id.val + + def get_image_name_by_id(self, iid, did): + return self.images_by_id[did][iid].name.val def _load(self): query_service = self.client.getSession().getQueryService() @@ -668,12 +722,12 @@ def _load(self): data = list() while True: parameters.page(len(data), 1000) - rv = unwrap(query_service.projection(( - 'select distinct i.id, i.name from Dataset as d ' + rv = query_service.findAllByQuery(( + 'select distinct i from Dataset as d ' 'join d.imageLinks as l ' 'join l.child as i ' 'where d.id = :id order by i.id desc'), - parameters, {'omero.group': '-1'})) + parameters, {'omero.group': '-1'}) if len(rv) == 0: break else: @@ -681,13 +735,17 @@ def _load(self): if not data: raise MetadataError('Could not find target object!') - for iid, iname in data: - self.images_by_id[iid] = iname + images_by_id = dict() + for image in data: + iname = image.name.val + iid = image.id.val + images_by_id[iid] = image if iname in self.images_by_name: raise Exception("Image named %s(id=%d) present. 
(id=%s)" % ( iname, self.images_by_name[iname], iid )) - self.images_by_name[iname] = iid + self.images_by_name[iname] = image + self.images_by_id[self.target_object.id.val] = images_by_id log.debug('Completed parsing dataset: %s' % self.target_name) @@ -695,12 +753,24 @@ class ProjectWrapper(PDIWrapper): def __init__(self, value_resolver): super(ProjectWrapper, self).__init__(value_resolver) - self.graph_by_id = defaultdict(lambda: dict()) - self.graph_by_name = defaultdict(lambda: dict()) + self.images_by_id = defaultdict(lambda: dict()) + self.images_by_name = defaultdict(lambda: dict()) + self.datasets_by_id = dict() + self.datasets_by_name = dict() self._load() def get_image_id_by_name(self, iname, dname=None): - return self.graph_by_name[dname][iname][2] + return self.images_by_name[dname][iname].id.val + + def get_image_name_by_id(self, iid, did=None): + return self.images_by_id[did][iid].name.val + + def resolve_dataset(self, column, row, value): + try: + return self.datasets_by_name[value].id.val + except KeyError: + log.warn('Project is missing dataset: %s' % value) + return Skip() def _load(self): query_service = self.client.getSession().getQueryService() @@ -718,7 +788,7 @@ def _load(self): while True: parameters.page(len(data), 1000) rv = unwrap(query_service.projection(( - 'select distinct d.id, d.name, i.id, i.name ' + 'select distinct d, i ' 'from Project p ' 'join p.datasetLinks as pdl ' 'join pdl.child as d ' @@ -734,9 +804,12 @@ def _load(self): raise MetadataError('Could not find target object!') seen = dict() - for row in data: - did, dname, iid, iname = row - + for dataset, image in data: + did = dataset.id.val + dname = dataset.name.val + iid = image.id.val + iname = image.name.val + log.info("Adding dataset:%d image:%s" % (did, iid)) if dname in seen and seen[dname] != did: raise Exception("Duplicate datasets: '%s' = %s, %s" % ( dname, seen[dname], did @@ -751,17 +824,43 @@ def _load(self): else: seen[ikey] = iid - self.graph_by_id[did][iid] 
= row - self.graph_by_name[dname][iname] = row + self.images_by_id[did][iid] = image + self.images_by_name[did][iname] = image + self.datasets_by_id[did] = dataset + self.datasets_by_name[dname] = dataset log.debug('Completed parsing project: %s' % self.target_object.id.val) +class ParsingUtilFactory(object): + + def get_filter_for_plate(self, column_index, target_name): + return lambda row: True if row[column_index] == target_name else False + + def get_generic_filter(self): + return lambda row: True + + def __init__(self, client, target_object, value_resolver): + self.target_object = target_object + self.target_class = target_object.__class__ + self.value_resolver = value_resolver + + def get_value_resolver(self): + return self.value_resolver + + def get_filter_function(self, column_index=-1): + if PlateI is self.target_class and column_index != -1: + return self.get_filter_for_plate( + column_index, unwrap(self.target_object.getName())) + else: + return self.get_generic_filter() + + class ParsingContext(object): """Generic parsing context for CSV files.""" def __init__(self, client, target_object, file=None, fileid=None, cfg=None, cfgid=None, attach=False, column_types=None, - options=None): + options=None, batch_size=1000, loops=10, ms=500): ''' This lines should be handled outside of the constructor: @@ -779,7 +878,10 @@ def __init__(self, client, target_object, file=None, fileid=None, self.target_object = target_object self.file = file self.column_types = column_types - self.value_resolver = ValueResolver(self.client, self.target_object) + self.value_resolver = ValueResolver(client, target_object) + self.parsing_util_factory = ParsingUtilFactory(client, + target_object, + self.value_resolver) def create_annotation_link(self): self.target_class = self.target_object.__class__ @@ -803,87 +905,194 @@ def get_column_widths(self): widths.append(None) return widths - def parse_from_handle(self, data): - rows = list(csv.reader(data, delimiter=',')) - 
first_row_is_types = HeaderResolver.is_row_column_types(rows[0]) + def preprocess_from_handle(self, data): + reader = csv.reader(data, delimiter=',') + first_row = reader.next() + header_row = first_row + first_row_is_types = HeaderResolver.is_row_column_types(first_row) header_index = 0 - rows_index = 1 if first_row_is_types: header_index = 1 - rows_index = 2 - log.debug('Header: %r' % rows[header_index]) - for h in rows[0]: + header_row = reader.next() + log.debug('Header: %r' % header_row) + for h in first_row: if not h: raise Exception('Empty column header in CSV: %s' - % rows[header_index]) + % first_row[header_index]) if self.column_types is None and first_row_is_types: - self.column_types = HeaderResolver.get_column_types(rows[0]) + self.column_types = HeaderResolver.get_column_types(first_row) log.debug('Column types: %r' % self.column_types) self.header_resolver = HeaderResolver( - self.target_object, rows[header_index], + self.target_object, header_row, column_types=self.column_types) self.columns = self.header_resolver.create_columns() log.debug('Columns: %r' % self.columns) + if len(self.columns) > MAX_COLUMN_COUNT: + log.warn("Column count exceeds max column count") + + self.preprocess_data(reader) + + def parse_from_handle_stream(self, data): + reader = csv.reader(data, delimiter=',') + first_row = reader.next() + header_row = first_row + first_row_is_types = HeaderResolver.is_row_column_types(first_row) + if first_row_is_types: + header_row = reader.next() + + plate_header_index = -1 + for i, name in enumerate(header_row): + if name.lower() == 'plate': + plate_header_index = i + break + + self.filter_function = self.parsing_util_factory.get_filter_function( + plate_header_index) + + table = self.create_table() + self.populate_from_reader(reader, self.filter_function, table, 1000) + self.create_file_annotation(table) - valuerows = rows[rows_index:] - log.debug('Got %d rows', len(valuerows)) - valuerows = self.value_resolver.subselect( - valuerows, 
rows[header_index]) - self.populate(valuerows) - self.post_process() log.debug('Column widths: %r' % self.get_column_widths()) log.debug('Columns: %r' % [ (o.name, len(o.values)) for o in self.columns]) + def create_table(self): + sf = self.client.getSession() + group = str(self.value_resolver.target_group) + sr = sf.sharedResources() + table = sr.newTable(1, DEFAULT_TABLE_NAME, {'omero.group': group}) + if table is None: + raise MetadataError( + "Unable to create table: %s" % DEFAULT_TABLE_NAME) + original_file = table.getOriginalFile() + log.info('Created new table OriginalFile:%d' % original_file.id.val) + table.initialize(self.columns) + return table + def parse(self): if self.file.endswith(".gz"): + data_for_preprocessing = gzip.open(self.file, "rb") data = gzip.open(self.file, "rb") else: + data_for_preprocessing = open(self.file, 'U') data = open(self.file, 'U') try: - return self.parse_from_handle(data) + self.preprocess_from_handle(data_for_preprocessing) + return self.parse_from_handle_stream(data) finally: data.close() - def populate(self, rows): - nrows = len(rows) - for (r, row) in enumerate(rows): - values = list() + def preprocess_data(self, reader): + # Get count of data columns - e.g. 
NOT Well Name + column_count = 0 + for column in self.columns: + if column.name not in ADDED_COLUMN_NAMES: + column_count += 1 + for i, row in enumerate(reader): row = [(self.columns[i], value) for i, value in enumerate(row)] for column, original_value in row: - log.debug('Row %d/%d Original value %s, %s', - r + 1, nrows, original_value, column.name) + log.debug('Original value %s, %s', + original_value, column.name) value = self.value_resolver.resolve( column, original_value, row) if value.__class__ is Skip: break - values.append(value) try: log.debug("Value's class: %s" % value.__class__) if value.__class__ is str: column.size = max(column.size, len(value)) + # The following are needed for + # getting post process column sizes + if column.__class__ is WellColumn: + column.values.append(value) + elif column.__class__ is ImageColumn: + column.values.append(value) + elif column.name.lower() == "plate": + column.values.append(value) except TypeError: log.error('Original value "%s" now "%s" of bad type!' % ( original_value, value)) raise - if value.__class__ is not Skip: - values.reverse() - for column in self.columns: - if not values: - if isinstance(column, ImageColumn) or \ - column.name in (PLATE_NAME_COLUMN, - WELL_NAME_COLUMN, - IMAGE_NAME_COLUMN): - # Then assume that the values will be calculated - # later based on another column. - continue - else: - msg = 'Column %s has no values.' 
% column.name - log.error(msg) - raise IndexError(msg) + self.post_process() + for column in self.columns: + column.values = [] + + def populate_row(self, row): + values = list() + row = [(self.columns[i], value) for i, value in enumerate(row)] + for column, original_value in row: + log.debug('Original value %s, %s', + original_value, column.name) + value = self.value_resolver.resolve( + column, original_value, row) + if value.__class__ is Skip: + break + values.append(value) + if value.__class__ is not Skip: + values.reverse() + for column in self.columns: + if not values: + if isinstance(column, ImageColumn) or \ + column.name in (PLATE_NAME_COLUMN, + WELL_NAME_COLUMN, + IMAGE_NAME_COLUMN): + # Then assume that the values will be calculated + # later based on another column. + continue else: - column.values.append(values.pop()) + msg = 'Column %s has no values.' % column.name + log.error(msg) + raise IndexError(msg) + else: + column.values.append(values.pop()) + + def populate_from_reader(self, + reader, + filter_function, + table, + batch_size=1000): + row_count = 0 + for (r, row) in enumerate(reader): + log.debug('Row %d', r) + if filter_function(row): + self.populate_row(row) + row_count = row_count + 1 + if row_count >= batch_size: + self.post_process() + table.addData(self.columns) + for column in self.columns: + column.values = [] + row_count = 0 + if row_count != 0: + log.debug("DATA TO ADD") + log.debug(self.columns) + self.post_process() + table.addData(self.columns) + table.close() + + def create_file_annotation(self, table): + sf = self.client.getSession() + group = str(self.value_resolver.target_group) + update_service = sf.getUpdateService() + + original_file = table.getOriginalFile() + file_annotation = FileAnnotationI() + file_annotation.ns = rstring( + 'openmicroscopy.org/omero/bulk_annotations') + file_annotation.description = rstring(DEFAULT_TABLE_NAME) + file_annotation.file = OriginalFileI(original_file.id.val, False) + link = 
self.create_annotation_link() + link.parent = self.target_object + link.child = file_annotation + update_service.saveObject(link, {'omero.group': group}) + + def populate(self, rows): + nrows = len(rows) + for (r, row) in enumerate(rows): + log.debug('Row %d/%d', r + 1, nrows) + self.populate_row(row) def post_process(self): target_class = self.target_object.__class__ @@ -893,8 +1102,10 @@ def post_process(self): plate_name_column = None image_column = None image_name_column = None + resolve_image_names = False + resolve_image_ids = False for column in self.columns: - columns_by_name[column.name] = column + columns_by_name[column.name.lower()] = column if column.__class__ is PlateColumn: log.warn("PlateColumn is unimplemented") elif column.__class__ is WellColumn: @@ -905,8 +1116,16 @@ def post_process(self): plate_name_column = column elif column.name == IMAGE_NAME_COLUMN: image_name_column = column + log.debug("Image name column len:%d" % len(column.values)) + if len(column.values) > 0: + resolve_image_ids = True + log.debug("Resolving Image Ids") elif column.__class__ is ImageColumn: image_column = column + log.debug("Image column len:%d" % len(column.values)) + if len(column.values) > 0: + resolve_image_names = True + log.debug("Resolving Image Ids") if well_name_column is None and plate_name_column is None \ and image_name_column is None: @@ -921,8 +1140,8 @@ def post_process(self): try: well_id = well_column.values[i] plate = None - if "Plate" in columns_by_name: # FIXME - plate = columns_by_name["Plate"].values[i] + if "plate" in columns_by_name: # FIXME + plate = columns_by_name["plate"].values[i] v = self.value_resolver.get_well_name(well_id, plate) except KeyError: log.warn( @@ -936,30 +1155,60 @@ def post_process(self): if image_name_column is not None and ( DatasetI is target_class or - ProjectI is target_class): + ProjectI is target_class) and \ + resolve_image_names and not resolve_image_ids: + iname = "" + try: + log.debug(image_name_column) + iid = 
image_column.values[i] + did = self.target_object.id.val + if "dataset name" in columns_by_name: + dname = columns_by_name["dataset name"].values[i] + did = self.value_resolver.wrapper.datasets_by_name[ + dname].id.val + elif "dataset" in columns_by_name: + did = int(columns_by_name["dataset"].values[i]) + log.debug("Using Dataset:%d" % did) + iname = self.value_resolver.get_image_name_by_id( + iid, did) + except KeyError: + log.warn( + "%d not found in image ids" % iid) + assert i == len(image_name_column.values) + image_name_column.values.append(iname) + image_name_column.size = max( + image_name_column.size, len(iname)) + elif image_name_column is not None and ( + DatasetI is target_class or + ProjectI is target_class) and \ + resolve_image_ids and not resolve_image_names: iid = -1 try: + log.debug(image_column) iname = image_name_column.values[i] - did = None - if "Dataset Name" in columns_by_name: # FIXME - did = columns_by_name["Dataset Name"].values[i] + did = self.target_object.id.val + if "dataset name" in columns_by_name: + dname = columns_by_name["dataset name"].values[i] + did = self.value_resolver.wrapper.datasets_by_name[ + dname].id.val + elif "dataset" in columns_by_name: + did = int(columns_by_name["dataset"].values[i]) + log.debug("Using Dataset:%d" % did) iid = self.value_resolver.get_image_id_by_name( iname, did) except KeyError: log.warn( - "%s not found in image names" % iname) + "%d not found in image ids" % iid) assert i == len(image_column.values) image_column.values.append(iid) - image_name_column.size = max( - image_name_column.size, len(iname)) elif image_name_column is not None and ( ScreenI is target_class or PlateI is target_class): iid = image_column.values[i] log.info("Checking image %s", iid) pid = None - if 'Plate' in columns_by_name: - pid = columns_by_name['Plate'].values[i] + if 'plate' in columns_by_name: + pid = columns_by_name['plate'].values[i] iname = self.value_resolver.get_image_name_by_id(iid, pid) 
image_name_column.values.append(iname) image_name_column.size = max( @@ -969,59 +1218,13 @@ def post_process(self): log.info('Missing image name column, skipping.') if plate_name_column is not None: - plate = columns_by_name['Plate'].values[i] # FIXME + plate = columns_by_name['plate'].values[i] # FIXME v = self.value_resolver.get_plate_name_by_id(plate) plate_name_column.size = max(plate_name_column.size, len(v)) plate_name_column.values.append(v) else: log.info('Missing plate name column, skipping.') - def write_to_omero(self, batch_size=1000, loops=10, ms=500): - sf = self.client.getSession() - group = str(self.value_resolver.target_group) - sr = sf.sharedResources() - update_service = sf.getUpdateService() - name = 'bulk_annotations' - table = sr.newTable(1, name, {'omero.group': group}) - if table is None: - raise MetadataError( - "Unable to create table: %s" % name) - original_file = table.getOriginalFile() - log.info('Created new table OriginalFile:%d' % original_file.id.val) - - values = [] - length = -1 - for x in self.columns: - if length < 0: - length = len(x.values) - else: - assert length == len(x.values) - values.append(x.values) - x.values = None - - table.initialize(self.columns) - log.info('Table initialized with %d columns.' 
% (len(self.columns))) - - i = 0 - for pos in xrange(0, length, batch_size): - i += 1 - for idx, x in enumerate(values): - self.columns[idx].values = x[pos:pos+batch_size] - table.addData(self.columns) - count = min(batch_size, length - pos) - log.info('Added %s rows of column data (batch %s)', count, i) - - table.close() - file_annotation = FileAnnotationI() - file_annotation.ns = rstring( - 'openmicroscopy.org/omero/bulk_annotations') - file_annotation.description = rstring(name) - file_annotation.file = OriginalFileI(original_file.id.val, False) - link = self.create_annotation_link() - link.parent = self.target_object - link.child = file_annotation - update_service.saveObject(link, {'omero.group': group}) - class _QueryContext(object): """ @@ -1136,7 +1339,8 @@ class BulkToMapAnnotationContext(_QueryContext): """ def __init__(self, client, target_object, file=None, fileid=None, - cfg=None, cfgid=None, attach=False, options=None): + cfg=None, cfgid=None, attach=False, options=None, + batch_size=1000, loops=10, ms=10, dry_run=False): """ :param client: OMERO client object :param target_object: The object to be annotated @@ -1172,6 +1376,10 @@ def __init__(self, client, target_object, file=None, fileid=None, self.options = {} if options: self.options = options + if batch_size: + self.batch_size = batch_size + else: + self.batch_size = 1000 def _init_namespace_primarykeys(self): try: @@ -1321,9 +1529,10 @@ def parse(self): assert table try: - return self.populate(table) + self.populate(table) finally: table.close() + self.write_to_omero() def _get_additional_targets(self, target): iids = [] @@ -1409,7 +1618,7 @@ def idcolumn_to_omeroclass(col): def _write_log(self, text): log.debug("BulkToMapAnnotation:write_to_omero - %s" % text) - def write_to_omero(self, batch_size=1000, loops=10, ms=500): + def write_to_omero(self): i = 0 cur = 0 links = [] @@ -1422,22 +1631,25 @@ def write_to_omero(self, batch_size=1000, loops=10, ms=500): for cma in cmas: batch, ma = 
self._create_map_annotation_links(cma) self._write_log("found batch of size %s" % len(batch)) - if len(batch) < batch_size: + if len(batch) < self.batch_size: links.append(batch) cur += len(batch) - if cur > 10 * batch_size: + if cur > 10 * self.batch_size: self._write_log("running batches. accumulated: %s" % cur) - i += self._write_links(links, batch_size, i) + i += self._write_links(links, self.batch_size, i) links = [] cur = 0 else: self._write_log("running grouped_batch") - sz = self._save_annotation_and_links(batch, ma, batch_size) + print("_save_annotation_and_links with batch size %d" % + (self.batch_size)) + sz = self._save_annotation_and_links(batch, ma, + self.batch_size) i += sz log.info('Created/linked %d MapAnnotations (total %s)', sz, i) # Handle any remaining writes - i += self._write_links(links, batch_size, i) + i += self._write_links(links, self.batch_size, i) def _write_links(self, links, batch_size, i): count = 0 @@ -1457,7 +1669,9 @@ class DeleteMapAnnotationContext(_QueryContext): """ def __init__(self, client, target_object, file=None, fileid=None, - cfg=None, cfgid=None, attach=False, options=None): + cfg=None, cfgid=None, attach=False, options=None, + batch_size=1000, loops=10, ms=500, dry_run=False): + """ :param client: OMERO client object :param target_object: The object to be processed @@ -1481,9 +1695,16 @@ def __init__(self, client, target_object, file=None, fileid=None, self.options = {} if options: self.options = options + if batch_size: + self.batch_size = batch_size + else: + self.batch_size = 1000 + self.loops = loops + self.ms = ms def parse(self): - return self.populate() + self.populate() + self.write_to_omero() def _get_annotations_for_deletion( self, objtype, objids, anntype, nss, getlink=False): @@ -1646,14 +1867,14 @@ def populate(self): log.debug("FileAnnotations in %s: %s", [NSBULKANNOTATIONSCONFIG], self.fileannids) - def write_to_omero(self, batch_size=1000, loops=10, ms=500): + def write_to_omero(self): for objtype, 
maids in self.mapannids.iteritems(): - for batch in self._batch(maids, sz=batch_size): + for batch in self._batch(maids, sz=self.batch_size): self._write_to_omero_batch( - {"%sAnnotationLink" % objtype: batch}, loops, ms) - for batch in self._batch(self.fileannids, sz=batch_size): + {"%sAnnotationLink" % objtype: batch}, self.loops, self.ms) + for batch in self._batch(self.fileannids, sz=self.batch_size): self._write_to_omero_batch({"FileAnnotation": batch}, - loops, ms) + self.loops, self.ms) def _write_to_omero_batch(self, to_delete, loops=10, ms=500): del_cmd = omero.cmd.Delete2(targetObjects=to_delete) @@ -1680,6 +1901,8 @@ def parse_target_object(target_object): type, id = target_object.split(':') if 'Dataset' == type: return DatasetI(long(id), False) + if 'Project' == type: + return ProjectI(long(id), False) if 'Plate' == type: return PlateI(long(id), False) if 'Screen' == type: @@ -1716,7 +1939,6 @@ def parse_column_types(column_type_list): password = None hostname = 'localhost' port = 4064 # SSL - info = False session_key = None logging_level = logging.INFO thread_count = 1 @@ -1731,8 +1953,6 @@ def parse_column_types(column_type_list): hostname = argument if option == "-p": port = int(argument) - if option == "-i": - info = True if option == "-k": session_key = argument if option == "-d": @@ -1767,10 +1987,12 @@ def parse_column_types(column_type_list): log.debug('Creating pool of %d threads' % thread_count) thread_pool = ThreadPool(thread_count) ctx = context_class( - client, target_object, file, column_types=column_types) + client, + target_object, + file=file, + column_types=column_types) + ctx.parse() - if not info: - ctx.write_to_omero() finally: pass client.closeSession() diff --git a/test/integration/metadata/test_populate.py b/test/integration/metadata/test_populate.py index 878531b0..dcecc94c 100644 --- a/test/integration/metadata/test_populate.py +++ b/test/integration/metadata/test_populate.py @@ -44,7 +44,7 @@ from omero.rtypes import rdouble, 
rlist, rstring, unwrap from omero.sys import ParametersI -from omero.util.populate_metadata import ( +from populate_metadata import ( get_config, ParsingContext, BulkToMapAnnotationContext, @@ -905,12 +905,10 @@ def _test_parsing_context(self, fixture, batch_size): self.delete(child_anns) csv = fixture.get_csv() - ctx = ParsingContext(self.client, target, file=csv) + ctx = ParsingContext(self.client, + target, + file=csv) ctx.parse() - if batch_size is None: - ctx.write_to_omero() - else: - ctx.write_to_omero(batch_size=batch_size, loops=10, ms=250) # Get file annotations anns = fixture.get_annotations() @@ -950,14 +948,11 @@ def _test_bulk_to_map_annotation_context(self, fixture, batch_size): anns = fixture.get_annotations() fileid = anns[0].file.id.val ctx = BulkToMapAnnotationContext( - self.client, target, fileid=fileid, cfg=cfg) - ctx.parse() + self.client, target, fileid=fileid, cfg=cfg, batch_size=batch_size) + assert len(fixture.get_child_annotations()) == 0 + ctx.parse() - if batch_size is None: - ctx.write_to_omero() - else: - ctx.write_to_omero(batch_size=batch_size) oas = fixture.get_child_annotations() assert len(oas) == fixture.ann_count fixture.assert_child_annotations(oas) @@ -969,14 +964,12 @@ def _test_delete_map_annotation_context(self, fixture, batch_size): cfg = fixture.get_cfg() target = fixture.get_target() - ctx = DeleteMapAnnotationContext(self.client, target, cfg=cfg) - ctx.parse() + ctx = DeleteMapAnnotationContext(self.client, target, + cfg=cfg, batch_size=batch_size) + assert len(fixture.get_child_annotations()) == fixture.ann_count + ctx.parse() - if batch_size is None: - ctx.write_to_omero() - else: - ctx.write_to_omero(batch_size=batch_size) assert len(fixture.get_child_annotations()) == 0 assert len(fixture.get_all_map_annotations()) == 0 @@ -1103,11 +1096,10 @@ def _test_bulk_to_map_annotation_dedup(self, fixture1, fixture2, ns): fileid = anns[0].file.id.val ctx = BulkToMapAnnotationContext( self.client, target, fileid=fileid, 
cfg=cfg, options=options) - ctx.parse() assert len(fixture1.get_child_annotations()) == ann_count assert len(fixture2.get_child_annotations()) == 0 - ctx.write_to_omero() + ctx.parse() oas1 = fixture1.get_child_annotations() oas2 = fixture2.get_child_annotations() @@ -1160,7 +1152,6 @@ def _test_delete_map_annotation_context_dedup( self.client, fixture1.get_target(), cfg=fixture1.get_cfg(), options=options) ctx.parse() - ctx.write_to_omero(loops=10, ms=250) if ns == NSBULKANNOTATIONS: assert len(fixture1.get_child_annotations()) == 8 @@ -1179,7 +1170,6 @@ def _test_delete_map_annotation_context_dedup( self.client, fixture2.get_target(), cfg=fixture2.get_cfg(), options=options) ctx.parse() - ctx.write_to_omero(loops=10, ms=250) if ns == NSBULKANNOTATIONS: assert len(fixture1.get_child_annotations()) == 8 @@ -1275,7 +1265,6 @@ def test_delete_attach(self, ns, attach): ctx = DeleteMapAnnotationContext( self.client, target, attach=attach, options=options) ctx.parse() - ctx.write_to_omero() after = self._get_annotations_config(fixture) if attach and ns != NSBULKANNOTATIONS: diff --git a/test/integration/tablestest/test_populate_metadata.py b/test/integration/tablestest/test_populate_metadata.py index 013dfc4e..b033fe52 100644 --- a/test/integration/tablestest/test_populate_metadata.py +++ b/test/integration/tablestest/test_populate_metadata.py @@ -29,7 +29,7 @@ from omero.model import PlateI, WellI, WellSampleI, OriginalFileI from omero.rtypes import rint, rstring, unwrap -from omero.util.populate_metadata import ParsingContext +from populate_metadata import ParsingContext from omero.constants.namespaces import NSBULKANNOTATIONS @@ -79,9 +79,10 @@ def test_populate_metadata_plate(self): row_count = 1 col_count = 2 plate = self.create_plate(row_count, col_count) - ctx = ParsingContext(self.client, plate, csv_name) + ctx = ParsingContext(self.client, + plate, + file=csv_name) ctx.parse() - ctx.write_to_omero() # Delete local temp file os.remove(csv_name)