Python script to deduplicate docs stored in Elasticsearch
# This code comes from https://www.elastic.co/es/blog/how-to-find-and-remove-duplicate-documents-in-elasticsearch
# It just adds the step to delete the duplicate docs :-)
# Thanks @elastic !!! =)
import hashlib
from elasticsearch import Elasticsearch

es = Elasticsearch(
    cloud_id="XXXXXXXXX:YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
    http_auth=("USER", "PASSWORD"),
)
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine whether a document is a duplicate.
keys_to_include_in_hash = ["key1", "key2",
                           "key3", "key4", "key5"]
# Process the documents returned by the current search/scroll.
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If hashval is new, create a new key in dict_of_duplicate_docs,
        # assign it an empty array as its value, and immediately
        # append the _id to that array.
        # If hashval already exists, just append the new _id
        # to the existing array.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="INDEX", scroll='1m',
                     body={"query": {"match_all": {}}})
    # Get the scroll ID.
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scrolling, process the current batch of hits.
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process the current batch of hits.
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID.
        sid = data['_scroll_id']
        # Get the number of results from the last scroll.
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    count = 0
    # Search through the hashes of the document values to
    # check whether any duplicate hashes were found.
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            count += 1
            print("********** Duplicate docs hash=%s **********" % hashval)
            # Get the documents mapped to the current hashval.
            matching_docs = es.mget(index="INDEX", doc_type="doc", body={
                "ids": array_of_ids})
            # Delete the second document in the group and keep the first
            # (this removes a single duplicate copy per group).
            es.delete(index=matching_docs['docs'][1]['_index'],
                      id=matching_docs['docs'][1]['_id']
                      )
    print(count)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


main()
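
The delete step above removes a single copy per duplicate group, so a group with three or more identical documents would need several runs to be fully cleaned. As a minimal sketch of an alternative cleanup step, the variant below deletes every duplicate except the first occurrence in one pass using the bulk helper from elasticsearch-py; the function name remove_all_duplicates_bulk is made up for illustration, "INDEX" is the same placeholder as above, and it assumes dict_of_duplicate_docs has already been filled by scroll_over_all_docs.

# Sketch: bulk-delete all duplicates except the first _id of each group.
# Assumes elasticsearch-py 7.x and the dict_of_duplicate_docs structure
# built by scroll_over_all_docs above.
from elasticsearch.helpers import bulk

def remove_all_duplicates_bulk():
    actions = []
    for array_of_ids in dict_of_duplicate_docs.values():
        # Keep the first _id of each group and queue the rest for deletion.
        for duplicate_id in array_of_ids[1:]:
            actions.append({
                "_op_type": "delete",
                "_index": "INDEX",
                "_id": duplicate_id,
            })
    if actions:
        bulk(es, actions, raise_on_error=False)

This could replace the call to loop_over_hashes_and_remove_duplicates() in main() if removing all copies at once is the desired behavior.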