extracted contants and added logging
This commit is contained in:
parent
c3bdf2dd5e
commit
a24d0ea96f
|
@ -1,16 +1,19 @@
|
||||||
import sys
|
import sys
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from logging_config import setup_logging
|
||||||
from images2text import main as ocr_images
|
from images2text import main as ocr_images
|
||||||
from prompt4cards import prompt_for_card_content, response_to_json
|
from prompt4cards import prompt_for_card_content, response_to_json
|
||||||
from json2deck import to_package
|
from json2deck import to_package
|
||||||
|
|
||||||
|
setup_logging()
|
||||||
|
|
||||||
def images_to_package(directory_path, outfile):
|
def images_to_package(directory_path, outfile):
|
||||||
ocr_text = ocr_images(directory_path)
|
ocr_text = ocr_images(directory_path)
|
||||||
response_text = prompt_for_card_content(ocr_text)
|
response_text = prompt_for_card_content(ocr_text)
|
||||||
deck_json = response_to_json(response_text)
|
deck_json = response_to_json(response_text)
|
||||||
to_package(deck_json).write_to_file(outfile)
|
to_package(deck_json).write_to_file(outfile)
|
||||||
print(f"Deck created at: {outfile}")
|
logging.info(f"Deck created at: {outfile}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
14
constants.py
Normal file
14
constants.py
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
# File and Directory Constants
|
||||||
|
CONVERTED_DIR = "converted"
|
||||||
|
FINAL_OUTPUT = "final.txt"
|
||||||
|
IMAGE_EXTENSIONS = ['.png', '.jpg', '.jpeg']
|
||||||
|
OUTPUT_FILENAME = "output_deck.json"
|
||||||
|
|
||||||
|
# API Constants
|
||||||
|
API_KEY_ENV = "OPENAI_API_KEY"
|
||||||
|
CHAT_MODEL = "gpt-3.5-turbo"
|
||||||
|
|
||||||
|
# Error Messages
|
||||||
|
NO_IMAGE_PART_ERROR = 'No image part'
|
||||||
|
NO_SELECTED_FILE_ERROR = 'No selected file'
|
||||||
|
INVALID_FILENAME_ERROR = 'Invalid filename'
|
|
@ -1,17 +1,20 @@
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from logging_config import setup_logging
|
||||||
from subprocess import run, CalledProcessError
|
from subprocess import run, CalledProcessError
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from utilities import is_image_file, ensure_directory_exists
|
||||||
|
from constants import CONVERTED_DIR, FINAL_OUTPUT
|
||||||
|
|
||||||
converted_dir = "converted"
|
|
||||||
|
|
||||||
def is_image_file(path):
|
setup_logging()
|
||||||
lower_path = path.lower()
|
|
||||||
return lower_path.endswith('.png') or lower_path.endswith('.jpg') or lower_path.endswith('.jpeg')
|
|
||||||
|
|
||||||
def convert_image(image_path):
|
def convert_image(image_path):
|
||||||
print(f"Converting {image_path}...")
|
logging.info(f"Converting {image_path}...")
|
||||||
converted_path = os.path.join(converted_dir, os.path.basename(image_path))
|
converted_path = os.path.join(CONVERTED_DIR, os.path.basename(image_path))
|
||||||
cmd = [
|
cmd = [
|
||||||
"convert",
|
"convert",
|
||||||
image_path,
|
image_path,
|
||||||
|
@ -24,43 +27,45 @@ def convert_image(image_path):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
run(cmd, check=True)
|
run(cmd, check=True)
|
||||||
print(f"Converted image output to {converted_path}!")
|
logging.info(f"Converted image output to {converted_path}!")
|
||||||
return converted_path
|
return converted_path
|
||||||
except CalledProcessError:
|
except CalledProcessError:
|
||||||
print(f"Error converting {image_path} with ImageMagick. Using original for Tesseract.")
|
logging.info(f"Error converting {image_path} with ImageMagick. Using original for Tesseract.")
|
||||||
return image_path
|
return image_path
|
||||||
|
|
||||||
|
|
||||||
def ocr_image(image_path):
|
def ocr_image(image_path):
|
||||||
print(f"OCR'ing {image_path}...")
|
logging.info(f"OCR'ing {image_path}...")
|
||||||
text_filename = os.path.basename(image_path).replace(".jpg", ".txt")
|
text_filename = os.path.basename(image_path).replace(".jpg", ".txt")
|
||||||
text_path = os.path.join(converted_dir, text_filename)
|
text_path = os.path.join(CONVERTED_DIR, text_filename)
|
||||||
cmd = ["tesseract", image_path, text_path.replace(".txt", "")]
|
cmd = ["tesseract", image_path, text_path.replace(".txt", "")]
|
||||||
try:
|
try:
|
||||||
run(cmd, check=True)
|
run(cmd, check=True)
|
||||||
print(f"OCRed to {text_path}!")
|
logging.info(f"OCRed to {text_path}!")
|
||||||
return text_path
|
return text_path
|
||||||
except CalledProcessError:
|
except CalledProcessError:
|
||||||
print(f"Error processing {image_path} with Tesseract. Skipping.")
|
logging.info(f"Error processing {image_path} with Tesseract. Skipping.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def process_image(image_path):
|
def process_image(image_path):
|
||||||
converted_path = convert_image(image_path)
|
converted_path = convert_image(image_path)
|
||||||
print(f"OCR'ing image {image_path} (now at {converted_path})...")
|
logging.info(f"OCR'ing image {image_path} (now at {converted_path})...")
|
||||||
text_path = ocr_image(converted_path)
|
text_path = ocr_image(converted_path)
|
||||||
if text_path and os.path.exists(text_path):
|
if text_path and os.path.exists(text_path):
|
||||||
with open(text_path, 'r') as text_file:
|
with open(text_path, 'r') as text_file:
|
||||||
text_content = text_file.read()
|
text_content = text_file.read()
|
||||||
print(f"Added text from {text_path} to final output.")
|
logging.info(f"Added text from {text_path} to final output.")
|
||||||
return text_content
|
return text_content
|
||||||
else:
|
else:
|
||||||
print(f"Cannot locate {text_path}! Cannot add text to final output!")
|
logging.info(f"Cannot locate {text_path}! Cannot add text to final output!")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def main(directory_path):
|
def main(directory_path):
|
||||||
final_text = []
|
final_text = []
|
||||||
|
|
||||||
if not os.path.exists(converted_dir):
|
ensure_directory_exists(CONVERTED_DIR)
|
||||||
os.mkdir(converted_dir)
|
|
||||||
|
|
||||||
image_paths = []
|
image_paths = []
|
||||||
for root, dirs, files in os.walk(directory_path):
|
for root, dirs, files in os.walk(directory_path):
|
||||||
|
@ -75,11 +80,10 @@ def main(directory_path):
|
||||||
|
|
||||||
# Filter out any None values and write the text to final.txt
|
# Filter out any None values and write the text to final.txt
|
||||||
final_text = [text for text in final_text if text is not None]
|
final_text = [text for text in final_text if text is not None]
|
||||||
FINAL_OUTPUT = "final.txt"
|
|
||||||
with open(FINAL_OUTPUT, 'w') as f:
|
with open(FINAL_OUTPUT, 'w') as f:
|
||||||
f.write("\n".join(final_text))
|
f.write("\n".join(final_text))
|
||||||
|
|
||||||
print(f"All images processed! Final output saved to {FINAL_OUTPUT}")
|
logging.info(f"All images processed! Final output saved to {FINAL_OUTPUT}")
|
||||||
return final_text # Add this line
|
return final_text # Add this line
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,12 @@
|
||||||
import json
|
import json
|
||||||
import genanki
|
import genanki
|
||||||
import sys
|
import sys
|
||||||
|
import logging
|
||||||
|
from logging_config import setup_logging
|
||||||
|
|
||||||
|
|
||||||
|
setup_logging()
|
||||||
|
|
||||||
|
|
||||||
# Create a new model for our cards. This is necessary for genanki.
|
# Create a new model for our cards. This is necessary for genanki.
|
||||||
MY_MODEL = genanki.Model(
|
MY_MODEL = genanki.Model(
|
||||||
|
@ -52,4 +58,4 @@ if __name__ == "__main__":
|
||||||
input_json = sys.argv[1]
|
input_json = sys.argv[1]
|
||||||
output_apkg = sys.argv[2]
|
output_apkg = sys.argv[2]
|
||||||
json_file_to_package(input_json).write_to_file(output_apkg)
|
json_file_to_package(input_json).write_to_file(output_apkg)
|
||||||
print(f"Deck created at: {output_apkg}")
|
logging.info(f"Deck created at: {output_apkg}")
|
||||||
|
|
11
logging_config.py
Normal file
11
logging_config.py
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
def setup_logging():
|
||||||
|
logging.basicConfig(level=logging.DEBUG,
|
||||||
|
format='%(asctime)s [%(levelname)s] - %(module)s: %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
# If you also want to save logs to a file, you can add the below lines.
|
||||||
|
# file_handler = logging.FileHandler('ankiai.log')
|
||||||
|
# file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] - %(module)s: %(message)s'))
|
||||||
|
# logging.getLogger().addHandler(file_handler)
|
|
@ -1,12 +1,11 @@
|
||||||
import openai
|
import openai
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
import json
|
import json
|
||||||
|
from constants import API_KEY_ENV, CHAT_MODEL, OUTPUT_FILENAME
|
||||||
|
|
||||||
CHAT_MODEL = "gpt-3.5-turbo"
|
|
||||||
OUTPUT_FILENAME = "output_deck.json"
|
|
||||||
|
|
||||||
API_KEY = os.environ.get("OPENAI_API_KEY")
|
API_KEY = os.environ.get(API_KEY_ENV)
|
||||||
if not API_KEY:
|
if not API_KEY:
|
||||||
raise ValueError("Please set the OPENAI_API_KEY environment variable.")
|
raise ValueError("Please set the OPENAI_API_KEY environment variable.")
|
||||||
|
|
||||||
|
@ -86,7 +85,7 @@ def response_to_json(response_text):
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(sys.argv) != 2:
|
if len(sys.argv) != 2:
|
||||||
print("Usage: python text2jsondeck.py <text_file_path>")
|
print("Usage: python prompt4cards.py <text_file_path>")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
text_file_path = sys.argv[1]
|
text_file_path = sys.argv[1]
|
||||||
|
|
38
server.py
38
server.py
|
@ -1,48 +1,54 @@
|
||||||
from flask import Flask, request, send_from_directory, jsonify
|
|
||||||
from werkzeug.utils import secure_filename
|
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from logging_config import setup_logging
|
||||||
|
from flask import Flask, request, send_from_directory, jsonify
|
||||||
|
from werkzeug.utils import secure_filename
|
||||||
from ankiai import images_to_package
|
from ankiai import images_to_package
|
||||||
|
from constants import IMAGE_KEY, OUTPUT_FILE, NO_IMAGE_PART_ERROR, NO_SELECTED_FILE_ERROR, INVALID_FILENAME_ERROR
|
||||||
|
|
||||||
|
|
||||||
|
setup_logging()
|
||||||
|
|
||||||
|
|
||||||
|
from logging_config import setup_logging
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
IMAGE_KEY = 'image'
|
|
||||||
OUTPUT_FILE = 'cards.apkg'
|
|
||||||
TEMP_DIR = tempfile.mkdtemp()
|
|
||||||
|
|
||||||
def save_uploaded_images(images, directory):
|
def save_uploaded_images(images, directory):
|
||||||
for img in images:
|
for img in images:
|
||||||
# Sanitize the filename
|
|
||||||
safe_filename = secure_filename(img.filename)
|
safe_filename = secure_filename(img.filename)
|
||||||
|
|
||||||
if not safe_filename:
|
if not safe_filename:
|
||||||
# Handle the case where the filename becomes empty after sanitization
|
raise ValueError(INVALID_FILENAME_ERROR)
|
||||||
raise ValueError("Invalid filename")
|
|
||||||
|
|
||||||
filename = os.path.join(directory, safe_filename)
|
filename = os.path.join(directory, safe_filename)
|
||||||
img.save(filename)
|
img.save(filename)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/deck-from-images', methods=['POST'])
|
@app.route('/deck-from-images', methods=['POST'])
|
||||||
def deck_from_images():
|
def deck_from_images():
|
||||||
if IMAGE_KEY not in request.files:
|
if IMAGE_KEY not in request.files:
|
||||||
return jsonify({'error': 'No image part'}), 400
|
return jsonify({'error': NO_IMAGE_PART_ERROR}), 400
|
||||||
|
|
||||||
images = request.files.getlist(IMAGE_KEY)
|
images = request.files.getlist(IMAGE_KEY)
|
||||||
|
|
||||||
if not images or not any(img.filename != '' for img in images):
|
if not images or not any(img.filename != '' for img in images):
|
||||||
return jsonify({'error': 'No selected file'}), 400
|
return jsonify({'error': NO_SELECTED_FILE_ERROR}), 400
|
||||||
|
|
||||||
save_uploaded_images(images, TEMP_DIR)
|
temp_dir = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
save_uploaded_images(images, temp_dir)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
images_to_package(TEMP_DIR, OUTPUT_FILE)
|
images_to_package(temp_dir, OUTPUT_FILE)
|
||||||
return send_from_directory('.', OUTPUT_FILE, as_attachment=True)
|
return send_from_directory('.', OUTPUT_FILE, as_attachment=True)
|
||||||
except Exception as e: # Consider catching more specific exceptions
|
except Exception as e:
|
||||||
|
logging.error("Exception occurred: "+str(e), exc_info=True)
|
||||||
return jsonify({'error': str(e)}), 500
|
return jsonify({'error': str(e)}), 500
|
||||||
finally:
|
finally:
|
||||||
shutil.rmtree(TEMP_DIR)
|
shutil.rmtree(temp_dir)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
app.run(debug=True)
|
app.run(debug=True)
|
||||||
|
|
9
utilities.py
Normal file
9
utilities.py
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
import os
|
||||||
|
from constants import IMAGE_EXTENSIONS
|
||||||
|
|
||||||
|
def is_image_file(path):
|
||||||
|
return any(path.lower().endswith(ext) for ext in IMAGE_EXTENSIONS)
|
||||||
|
|
||||||
|
def ensure_directory_exists(directory):
|
||||||
|
if not os.path.exists(directory):
|
||||||
|
os.mkdir(directory)
|
Loading…
Reference in New Issue
Block a user