Using the Python Elasticsearch SDK to list objects in a bucket

Querying Elasticsearch directly is neither supported nor recommended, but the script below shows how to do it in order to list the objects in a bucket. This may be necessary in an emergency or when diagnosing an Elasticsearch cluster.
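
For orientation, the core request the script builds is a bool filter on the bucket's contextid plus a sort, paged with search_after. Below is a minimal sketch of that query using the Python Elasticsearch SDK; it assumes elasticsearch-py < 7 (as the script's install hint suggests) and reuses the example endpoint, index name, and contextid from the script's docstring.

# Minimal sketch only; endpoint, index, and contextid are the example values from the docstring below.
from elasticsearch import Elasticsearch

client = Elasticsearch("docklab1.tx.datacore.com", timeout=60)
query = {
    "query": {"bool": {"filter": [{"match": {"contextid": "94efeee3c02195420930273e2752dbb6"}}]}},
    "sort": ["name"],
}
resp = client.search(index="index_lotsofdisks-cluster0", size=100, body=query)
for hit in resp["hits"]["hits"]:
    print(hit["_source"])

The full script below adds search_after paging, an optional upper bound on the sort key, and progress reporting on stderr.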

#!/usr/bin/env python3
"""
USE OF THIS SCRIPT IS NOT SUPPORTED OR GENERALLY RECOMMENDED!
You should use the Swarm search APIs to list objects. But this script shows how
the official Elasticsearch SDK/API can be used to directly list the objects in a
Swarm domain's bucket. Remember our schema will change, so do not rely on this script!

This script must run on the internal Swarm network, since it contacts Elasticsearch
directly. Progress messages and errors are written to stderr, the objects' JSON to stdout.

REQUIREMENTS:

# The bucket's alias uuid
$ curl -fsS --head --location-trusted "http://SWARM-IP/MYBUCKET?domain=MYDOMAIN.EXAMPLE.COM" | grep -i Castor-System-Alias
Castor-System-Alias: 94efeee3c02195420930273e2752dbb6

# The index name or alias for the default feed in elasticsearch
$ swarmctl -d swarm --feeds --feed-type searchfeeds | grep indexAlias
'indexAlias': 'myswarmcluster0',

EXAMPLES:

$ ./search-swarm-index.py --elasticsearch docklab1.tx.datacore.com --searchfeedindex index_lotsofdisks-cluster0 --contextid 94efeee3c02195420930273e2752dbb6
$ ./search-swarm-index.py ... --sort "name,tmborn:desc" --versions --fields "name,versioned,versionid,tmborn"
$ ./search-swarm-index.py ... --sort "tmBorn:desc,etag"
$ ./search-swarm-index.py ... --sort "etag" --search-after "a" --search-until "ag"

# Start 16 jobs to list all objects in the bucket by sorting on ETag
$ for x in 0 1 2 3 4 5 6 7 8 9 0 a b c d e f ; do ./search-swarm-index.py --contextid "94efeee3c02195420930273e2752dbb6" --sort "etag" --search-after "${x}" --search-until "${x}g" --limit 10000 > /tmp/list-${x}.json & done && wait

TODO:
- retry on elasticsearch failures like timeout
- query swarm or gateway for the bucket's alias uuid
- query swarm management api for the cluster name and feed index
- support versioned objects
- support --fields
- support and test "pit" (point in time) queries
"""
import traceback
import sys
import math
import time
import getopt
import json
import logging

try:
    from elasticsearch import Elasticsearch
except ImportError:
    print('Run: python3 -m pip install "elasticsearch<7.0.0"')
    raise

# The aliases here should be lowercase and use dash instead of underscore.
FIELD_ALIASES = {
    'tmBorn': {"date", "last-modified"},
    'etag': {"hash"},
}


def userFieldToES(fieldName):
    """Map a user-supplied field name or alias to the Elasticsearch field name."""
    for f in FIELD_ALIASES:
        if fieldName.lower() == f.lower() or fieldName.lower().replace('_', '-') in FIELD_ALIASES[f]:
            return f
    return fieldName


def help():
    print("Usage:")
    print("  search-swarm-index.py")
    print("    --elasticsearch <url>          # http://indexer1:9200")
    print("    --searchfeedindex <indexname>  # index_myswarm.example.com0")
    print("    --contextid <uuid>             # 86e8946a98e3fd7f1d313380af7bbb53")
    print("    -h --help = Print usage information")


if __name__ == '__main__':
    args = sys.argv[1:]
    try:
        opts, args = getopt.getopt(args, "he:i:c:s:f:l:a:t:pd",
                                   ["help", "elasticsearch=", "searchfeedindex=", "contextid=",
                                    "sort=", "fields=", "limit=", "search-after=", "search-until=",
                                    "timeout=", "pit", "debug"])
    except getopt.GetoptError as e:
        print("Error parsing options: " + e.msg)
        help()
        sys.exit(2)

    debug = False
    es = ''               # http://docklab1.tx.datacore.com:9200
    searchfeedindex = ''  # index_lotsofdisks-cluster0
    contextid = ''        # 94efeee3c02195420930273e2752dbb6
    sort_by = ['name']
    fields = []
    search_after = []
    search_until = []
    limit = 10000
    timeout = 60

    for opt, arg in opts:
        if debug:
            print("opt=" + opt + " " + arg)
        if opt in ('-h', "--help"):
            help()
            sys.exit()
        elif opt in ("-e", "--elasticsearch"):
            es = arg.strip()
        elif opt in ("-i", "--searchfeedindex"):
            searchfeedindex = arg.strip()
        elif opt in ("-c", "--contextid"):
            contextid = arg.strip()
        elif opt in ("-t", "--timeout"):
            timeout = int(arg.strip())
        elif opt in ("-l", "--limit"):
            limit = int(arg.strip())
        elif opt in ("-s", "--sort"):
            sort_by = arg.strip().split(',')
            for i in range(len(sort_by)):
                s = sort_by[i].split(':')
                if len(s) == 1:
                    sort_by[i] = userFieldToES(sort_by[i])
                else:
                    if not s[1] in ['asc', 'desc']:
                        print(f'Must specify sort order like: --sort name:desc not "{s[0]}:{s[1]}"', file=sys.stderr)
                        sys.exit(1)
                    sort_by[i] = {userFieldToES(s[0]): s[1]}
        elif opt in ("-f", "--fields"):
            fields = arg.strip().split(',')
        elif opt in ("-a", "--search-after"):
            search_after.append(arg.strip())
        elif opt in ("--search-until",):
            search_until.append(arg.strip())
        elif opt in ("-d", "--debug"):
            debug = True

    if not es or not searchfeedindex or not contextid:
        print("The elasticsearch endpoint, the search feed index, and the bucket's contextid must be supplied")
        help()
        sys.exit(2)

    if debug:
        print(f'Checking ES version on "{es}" and enabling trace on requests; note the traced curl commands will show localhost...', file=sys.stderr)
        es_trace_logger = logging.getLogger('elasticsearch.trace')
        es_trace_logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler()
        es_trace_logger.addHandler(handler)

    client = Elasticsearch(es, timeout=timeout)

    # Call an API, in this example `info()`
    resp = client.info()
    if debug:
        print('elasticsearch {} info: {}'.format(es, resp))
        time.sleep(1)

    total_search_time = 0
    total_requests = 0
    total_objects = 0

    while True:
        try:
            # Filter the feed index down to the bucket's objects and page
            # through them with search_after on the sort key(s).
            query = {
                "query": {
                    "bool": {
                        "filter": [
                            {"match": {"contextid": contextid}}
                        ]
                    },
                },
                "sort": sort_by
            }
            if search_after:
                query["search_after"] = search_after
            if search_until:
                query["query"]["bool"]["filter"].append(
                    {"range": {sort_by[0]: {"lt": search_until[0]}}})

            resp = client.search(index=searchfeedindex, size=limit,
                                 params={"track_total_hits": "true", "pretty": "true"},
                                 body=json.dumps(query))
            total_requests += 1
            total_search_time += resp['took']
            hits = resp['hits']['hits']
            total_objects += len(hits)
            for h in hits:
                print(json.dumps(h['_source']))

            # Avoid a divide-by-zero when the pages are reported as taking 0 ms.
            objects_per_sec = total_objects / max(total_search_time / 1000.0, 0.001)
            if hits:
                print('# Page {} / {} took {}s reporting a total of {} objects, returning a page of "{}" objects starting after "{}" up to "{}", for an ongoing total of {}s and {} documents at {} obj/sec, est {}s of {}s ({}h) left.'.format(
                    total_requests,
                    math.ceil(resp['hits']['total']['value'] / limit),
                    round(resp['took'] / 1000, 1),
                    resp['hits']['total']['value'],
                    len(hits),
                    search_after if search_after else '(starting)',
                    search_until if search_until else '(unlimited)',
                    round(total_search_time / 1000),
                    total_objects,
                    round(objects_per_sec),
                    round((resp['hits']['total']['value'] - total_objects) / objects_per_sec),
                    round(resp['hits']['total']['value'] / objects_per_sec),
                    round(resp['hits']['total']['value'] / objects_per_sec / 60 / 60, 1)),
                    file=sys.stderr)
                # The sort values of the last hit become the cursor for the next page.
                search_after = resp['hits']['hits'][-1]['sort']
            else:
                break
        except Exception as e:
            if total_objects > 0:
                print("This list of objects is incomplete because a failure was encountered.", file=sys.stderr)
            print('Exception: ' + str(e), file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
            break

    print('FINISHED: took total of "{}" seconds of search across "{}" search requests with a page size of "{}" to get a total of "{}" objects.'.format(
        total_search_time / 1000, total_requests, limit, total_objects), file=sys.stderr)
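
Because the objects' JSON is written to stdout one document per line, the output files from the parallel-jobs example can be post-processed directly. Below is a minimal sketch, assuming the output was redirected to /tmp/list-0.json as in that example and that each indexed record carries a "name" field.

# Minimal post-processing sketch; the path and the "name" field are assumptions from the examples above.
import json

count = 0
with open("/tmp/list-0.json") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)   # one Swarm search-feed record per line
        print(record.get("name"))
        count += 1
print(f"# {count} objects in /tmp/list-0.json")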

 
