# Source code for parseur.schemas.document
import json
from collections.abc import Mapping
from enum import Enum

from marshmallow import fields, pre_load, validate

from parseur.schemas import BaseSchema
class DocumentStatus(str, Enum):
    """
    Processing statuses a Parseur document can be in.

    Every member is also a ``str`` whose value equals its name, so members
    compare equal to the plain status strings.
    """

    # The file has been received by our system.
    INCOMING = "INCOMING"
    # The file is being analyzed against import parameters and mailbox settings.
    ANALYZING = "ANALYZING"
    # The file has been deleted by the user.
    DELETED = "DELETED"
    # The file is currently being processed by the AI engine for the mailbox.
    PROGRESS = "PROGRESS"
    # The file has been processed and data is available for export.
    PARSEDOK = "PARSEDOK"
    # The processing for this file failed.
    PARSEDKO = "PARSEDKO"
    # Processing was stopped because the user does not have enough credits.
    QUOTAEXC = "QUOTAEXC"
    # Processing was skipped because of a template.
    SKIPPED = "SKIPPED"
    # The file has been split into multiple documents.
    SPLIT = "SPLIT"
    # Exporting for this file failed.
    EXPORTKO = "EXPORTKO"
    # Post-processing for this file failed.
    TRANSKO = "TRANSKO"
    # The imported file is not supported by our system.
    INVALID = "INVALID"
class DocumentSchema(BaseSchema):
    """
    Schema for a Parseur document record.

    ``status`` must be one of the :class:`DocumentStatus` values. When
    ``result`` arrives as a JSON-encoded string it is decoded before field
    deserialization (see :meth:`parse_result_json`).
    """

    id = fields.Integer(required=True)
    name = fields.String(allow_none=True)
    # Restricted to the values declared on DocumentStatus.
    status = fields.String(
        required=True,
        validate=validate.OneOf([e.value for e in DocumentStatus]),
    )
    status_source = fields.String(allow_none=True)

    # Timestamps; ``processed`` may be null.
    received = fields.DateTime(required=True)
    processed = fields.DateTime(allow_none=True)

    # Credit counters.
    ai_credits_used = fields.Integer(required=True)
    credits_used = fields.Integer(required=True)

    # Boolean flags, all mandatory.
    is_ai_ready = fields.Boolean(required=True)
    is_ocr_ready = fields.Boolean(required=True)
    is_processable = fields.Boolean(required=True)
    is_split = fields.Boolean(required=True)
    is_splittable = fields.Boolean(required=True)

    # Integer references; everything except ``parser`` may be null.
    parser = fields.Integer(required=True)
    template = fields.Integer(allow_none=True)
    attached_to = fields.Integer(allow_none=True)
    prev_id = fields.Integer(allow_none=True)
    next_id = fields.Integer(allow_none=True)

    content = fields.String(allow_none=True)
    # Kept as-is by marshmallow (Raw); JSON strings are decoded by the hook below.
    result = fields.Raw(allow_none=True)

    # Download / source URLs; only ``ocr_ready_url`` may be null.
    csv_download_url = fields.URL(required=True)
    json_download_url = fields.URL(required=True)
    xls_download_url = fields.URL(required=True)
    original_document_url = fields.URL(required=True)
    ocr_ready_url = fields.URL(allow_none=True)

    @pre_load
    def parse_result_json(self, data, **kwargs):
        """
        Decode ``result`` when it is a non-empty JSON string.

        :param data: raw input handed to the schema before deserialization.
        :param kwargs: extra hook arguments passed by marshmallow (unused).
        :return: ``data`` itself when nothing was decoded, otherwise a shallow
            copy (a plain ``dict``) with ``result`` replaced by the decoded
            value. The caller's object is never modified.
        """
        # Non-mapping input has no ``.get``; hand it back untouched so that
        # marshmallow can report its own invalid-input error instead of this
        # hook raising AttributeError.
        if not isinstance(data, Mapping):
            return data

        raw_result = data.get("result")
        if not raw_result or not isinstance(raw_result, str):
            return data

        try:
            decoded = json.loads(raw_result)
        except ValueError:
            # Not valid JSON (JSONDecodeError is a ValueError): keep the
            # original string, the Raw field accepts it unchanged.
            return data

        # Build a new dict rather than assigning into the input, so read-only
        # mappings work and the caller's payload is left as it was.
        return {**data, "result": decoded}
class DocumentLogSchema(BaseSchema):
    """
    Schema for a single document log entry.

    When ``payload`` arrives as a JSON-encoded string it is decoded before
    field deserialization (see :meth:`parse_payload_json`).
    """

    # Unique ID of the log entry
    id = fields.Integer(required=True)
    # Log event code (e.g., INCOMING)
    code = fields.String(required=True)
    # Creation timestamp in ISO format
    created = fields.DateTime(required=True)
    # Document ID
    document = fields.Integer(required=True)
    # Name of the document
    document_name = fields.String(required=True)
    # Human-readable log message
    message = fields.String(required=True)
    # Mailbox (parser) ID
    parser = fields.Integer(required=True)
    # Name of the mailbox
    parser_name = fields.String(required=True)
    # Optional JSON payload attached to the event
    payload = fields.Raw(allow_none=True)
    # Event source, typically 'DOCUMENT'
    source = fields.String(allow_none=True)
    # Status level, e.g. 'INFO', 'WARNING', 'ERROR'
    status = fields.String(required=True)
    # ID of the template involved, if any
    template = fields.Integer(allow_none=True)
    # Name of the template involved, if any
    template_name = fields.String(allow_none=True)
    # Initiator of the action, if any
    initiator = fields.String(allow_none=True)
    # Initiator name of the action, if any
    initiator_name = fields.String(allow_none=True)

    @pre_load
    def parse_payload_json(self, data, **kwargs):
        """
        Decode ``payload`` when it is a non-empty JSON string.

        :param data: raw input handed to the schema before deserialization.
        :param kwargs: extra hook arguments passed by marshmallow (unused).
        :return: ``data`` itself when nothing was decoded, otherwise a shallow
            copy (a plain ``dict``) with ``payload`` replaced by the decoded
            value. The caller's object is never modified.
        """
        # Non-mapping input has no ``.get``; hand it back untouched so that
        # marshmallow can report its own invalid-input error instead of this
        # hook raising AttributeError.
        if not isinstance(data, Mapping):
            return data

        raw_payload = data.get("payload")
        if not raw_payload or not isinstance(raw_payload, str):
            return data

        try:
            decoded = json.loads(raw_payload)
        except ValueError:
            # Not valid JSON (JSONDecodeError is a ValueError): keep the
            # original string, the Raw field accepts it unchanged.
            return data

        # Build a new dict rather than assigning into the input, so read-only
        # mappings work and the caller's payload is left as it was.
        return {**data, "payload": decoded}
class AttachmentSchema(BaseSchema):
    """Schema for one attachment entry: a document ID string plus a name."""

    # Both fields are mandatory strings.
    DocumentID = fields.String(
        required=True,
    )
    name = fields.String(
        required=True,
    )
class DocumentUploadSchema(BaseSchema):
    """
    Schema for a document upload payload.

    Only ``message`` is required; ``DocumentID`` and ``attachments`` may be
    null.
    """

    DocumentID = fields.String(
        allow_none=True,
    )
    # List of nested AttachmentSchema entries; the list itself may be null.
    attachments = fields.List(
        fields.Nested(AttachmentSchema),
        allow_none=True,
    )
    message = fields.String(
        required=True,
    )