2023-09-08 08:25:59 +00:00
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
from subprocess import run, CalledProcessError
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
|
|
|
converted_dir = "converted"
|
|
|
|
|
|
|
|
def is_image_file(path):
|
|
|
|
lower_path = path.lower()
|
|
|
|
return lower_path.endswith('.png') or lower_path.endswith('.jpg') or lower_path.endswith('.jpeg')
|
|
|
|
|
|
|
|
def convert_image(image_path):
|
|
|
|
print(f"Converting {image_path}...")
|
|
|
|
converted_path = os.path.join(converted_dir, os.path.basename(image_path))
|
|
|
|
cmd = [
|
|
|
|
"convert",
|
|
|
|
image_path,
|
|
|
|
"-colorspace", "Gray",
|
|
|
|
"-resize", "300%",
|
|
|
|
"-threshold", "55%",
|
|
|
|
"-type", "Grayscale",
|
|
|
|
converted_path
|
|
|
|
]
|
|
|
|
|
|
|
|
try:
|
|
|
|
run(cmd, check=True)
|
|
|
|
print(f"Converted image output to {converted_path}!")
|
|
|
|
return converted_path
|
|
|
|
except CalledProcessError:
|
|
|
|
print(f"Error converting {image_path} with ImageMagick. Using original for Tesseract.")
|
|
|
|
return image_path
|
|
|
|
|
|
|
|
def ocr_image(image_path):
|
|
|
|
print(f"OCR'ing {image_path}...")
|
|
|
|
text_filename = os.path.basename(image_path).replace(".jpg", ".txt")
|
|
|
|
text_path = os.path.join(converted_dir, text_filename)
|
|
|
|
cmd = ["tesseract", image_path, text_path.replace(".txt", "")]
|
|
|
|
try:
|
|
|
|
run(cmd, check=True)
|
|
|
|
print(f"OCRed to {text_path}!")
|
|
|
|
return text_path
|
|
|
|
except CalledProcessError:
|
|
|
|
print(f"Error processing {image_path} with Tesseract. Skipping.")
|
|
|
|
return None
|
|
|
|
|
|
|
|
def process_image(image_path):
|
|
|
|
converted_path = convert_image(image_path)
|
|
|
|
print(f"OCR'ing image {image_path} (now at {converted_path})...")
|
|
|
|
text_path = ocr_image(converted_path)
|
|
|
|
if text_path and os.path.exists(text_path):
|
|
|
|
with open(text_path, 'r') as text_file:
|
|
|
|
text_content = text_file.read()
|
|
|
|
print(f"Added text from {text_path} to final output.")
|
|
|
|
return text_content
|
|
|
|
else:
|
|
|
|
print(f"Cannot locate {text_path}! Cannot add text to final output!")
|
|
|
|
return None
|
|
|
|
|
|
|
|
def main(directory_path):
|
|
|
|
final_text = []
|
|
|
|
|
|
|
|
if not os.path.exists(converted_dir):
|
|
|
|
os.mkdir(converted_dir)
|
|
|
|
|
|
|
|
image_paths = []
|
|
|
|
for root, dirs, files in os.walk(directory_path):
|
|
|
|
for file in files:
|
|
|
|
image_path = os.path.join(root, file)
|
|
|
|
if is_image_file(image_path):
|
|
|
|
image_paths.append(image_path)
|
|
|
|
|
|
|
|
# Use a ThreadPoolExecutor to process images in parallel
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
|
|
final_text = list(executor.map(process_image, image_paths))
|
|
|
|
|
|
|
|
# Filter out any None values and write the text to final.txt
|
|
|
|
final_text = [text for text in final_text if text is not None]
|
2023-09-08 09:58:19 +00:00
|
|
|
FINAL_OUTPUT = "final.txt"
|
|
|
|
with open(FINAL_OUTPUT, 'w') as f:
|
2023-09-08 08:25:59 +00:00
|
|
|
f.write("\n".join(final_text))
|
|
|
|
|
2023-09-08 09:58:19 +00:00
|
|
|
print(f"All images processed! Final output saved to {FINAL_OUTPUT}")
|
2023-09-11 16:09:18 +00:00
|
|
|
return final_text # Add this line
|
2023-09-08 09:58:19 +00:00
|
|
|
|
|
|
|
|
2023-09-08 08:25:59 +00:00
|
|
|
if __name__ == "__main__":
|
|
|
|
if len(sys.argv) != 2:
|
|
|
|
print("Usage: python images2text.py <directory_path>")
|
|
|
|
sys.exit(1)
|
|
|
|
main(sys.argv[1])
|