Source code for parseur.cli

import sys
from urllib.parse import urlparse, urlunparse

import click

import parseur


[docs]def headers_to_dict(headers): headers_dict = {} for h in headers: if ":" not in h: click.echo(f"❌ Invalid header format: {h}") sys.exit(1) key, value = h.split(":", 1) headers_dict[key.strip()] = value.strip() return headers_dict
@click.group()
[docs]def cli(): """Parseur CLI - manage Parseur.com from the command line.""" pass
@cli.command() @click.option( "--api-key", required=True, help="Your Parseur API key", ) @click.option( "--api-base", default=parseur.DEFAULT_API_BASE, help="Optional API base URL", )
[docs]def init(api_key, api_base): """Initialize the CLI with your API token and optional base URL.""" config = parseur.Config(parseur.CONFIG_PATH) config.api_key = api_key config.api_base = api_base config.save() click.echo(f"✅ Parseur CLI initialized and config saved to {parseur.CONFIG_PATH}")
# ------------------------ # Mailbox commands # ------------------------ @cli.command("list-mailboxes") @click.option("--search", help="Search string (mailbox name or email prefix)") @click.option( "--order-by", type=click.Choice([e.value for e in parseur.MailboxOrderKey]), help=( "Order by field. Use one of: " "name, document_count, template_count, " "PARSEDOK_count (processed), PARSEDKO_count (failed), " "QUOTAEXC_count (quota exceeded), EXPORTKO_count (export failed)" ), ) @click.option( "--descending/--ascending", default=False, help="Sort descending (default is ascending)", )
[docs]def list_mailboxes(search, order_by, descending): """ List all mailboxes with optional filtering and sorting. """ order_by_enum = parseur.MailboxOrderKey(order_by) if order_by else None click.echo("[") for idx, mailbox in enumerate( parseur.Mailbox.iter( search=search, order_by=order_by_enum, ascending=not descending, ) ): click.echo((", " if idx != 0 else "") + parseur.to_json(mailbox)) click.echo("]")
@cli.command("get-mailbox") @click.argument("mailbox_id", type=int)
[docs]def get_mailbox(mailbox_id): """Get details of a mailbox.""" result = parseur.Mailbox.retrieve(mailbox_id) click.echo(parseur.to_json(result))
@cli.command("get-mailbox-schema") @click.argument("mailbox_id", type=int)
[docs]def get_mailbox_schema(mailbox_id): """Get schema of a mailbox.""" result = parseur.Mailbox.schema(mailbox_id) click.echo(parseur.to_json(result))
# ------------------------ # Document commands # ------------------------ @cli.command("list-documents") @click.argument("mailbox_id", type=int) @click.option( "--search", help="Search string (document id, name, template name, email addresses, metadata)", ) @click.option( "--order-by", type=click.Choice([e.value for e in parseur.DocumentOrderKey]), help="Order by field (name, created, processed, status)", ) @click.option( "--descending/--ascending", default=False, help="Sort descending (default is ascending)", ) @click.option( "--received-after", type=click.DateTime(formats=["%Y-%m-%d"]), help="Filter documents received after this date (YYYY-MM-DD)", ) @click.option( "--received-before", type=click.DateTime(formats=["%Y-%m-%d"]), help="Filter documents received before this date (YYYY-MM-DD)", ) @click.option( "--with-result", is_flag=True, help="Include parsed result with each document", )
[docs]def list_documents( mailbox_id, search, order_by, descending, received_after, received_before, with_result, ): """ List all documents in a mailbox with optional filtering, sorting, and result inclusion. """ # Convert order_by string to enum if provided order_by_enum = parseur.DocumentOrderKey(order_by) if order_by else None click.echo("[") for idx, doc in enumerate( parseur.Document.iter( mailbox_id=mailbox_id, search=search, order_by=order_by_enum, ascending=not descending, received_after=received_after, received_before=received_before, with_result=with_result, ) ): click.echo((", " if idx != 0 else "") + parseur.to_json(doc)) click.echo("]")
@cli.command("get-document") @click.argument("document_id", type=str)
[docs]def get_document(document_id): """Get details of a document.""" result = parseur.Document.retrieve(document_id) click.echo(parseur.to_json(result))
@cli.command("reprocess-document") @click.argument("document_id", type=str)
[docs]def reprocess_document(document_id): """Reprocess a document.""" result = parseur.Document.reprocess(document_id) click.echo(parseur.to_json(result))
@cli.command("skip-document") @click.argument("document_id", type=str)
[docs]def skip_document(document_id): """Skip a document.""" result = parseur.Document.skip(document_id) click.echo(parseur.to_json(result))
@cli.command("copy-document") @click.argument("document_id", type=str) @click.argument("target_mailbox_id", type=int)
[docs]def copy_document(document_id, target_mailbox_id): """Copy a document to another mailbox.""" result = parseur.Document.copy(document_id, target_mailbox_id) click.echo(parseur.to_json(result))
@cli.command("get-document-logs") @click.argument("document_id", type=str)
[docs]def get_document_logs(document_id): """Get logs of a document.""" logs = parseur.Document.logs(document_id) click.echo(parseur.to_json(logs))
@cli.command("delete-document") @click.argument("document_id", type=str)
[docs]def delete_document(document_id): """Delete a document.""" parseur.Document.delete(document_id) click.echo(f"✅ Document {document_id} deleted.")
@cli.command("upload-file") @click.argument("mailbox_id", type=int) @click.argument("file_path", type=click.Path(exists=True))
[docs]def upload_file(mailbox_id, file_path): """Upload a document file to a mailbox.""" result = parseur.Document.upload_file(mailbox_id, file_path) click.echo(parseur.to_json(result))
@cli.command("upload-folder") @click.argument("mailbox_id", type=int) @click.argument("folder_path", type=str)
[docs]def upload_folder(mailbox_id, folder_path): """Upload all files from a glob path.""" results = list(parseur.Document.upload_folder(mailbox_id, folder_path)) click.echo(parseur.to_json(results))
@cli.command("upload-text") @click.option("--recipient", required=True, help="Mailbox email address") @click.option("--subject", required=True, help="Subject line for the document") @click.option("--sender", default=None, help="Sender email (optional)") @click.option("--body-html", default=None, help="HTML text content") @click.option("--body-plain", default=None, help="Plain text content")
[docs]def upload_text(recipient, subject, sender, body_html, body_plain): """Upload text content to a mailbox by email address.""" result = parseur.Document.upload_text( recipient, subject, sender, body_html, body_plain ) click.echo(parseur.to_json(result))
# ------------------------ # Webhook commands # ------------------------ @cli.command("create-webhook") @click.option( "--event", required=True, type=click.Choice([e.value for e in parseur.ParseurEvent]), help="Event type to listen for", ) @click.option( "--target-url", required=True, help="The URL to receive webhook POSTs, e.g. https://api.example.com/parseur.", ) @click.option( "--mailbox-id", type=int, help="Mailbox ID (required for document events).", ) @click.option( "--table-field-id", type=str, help="Table field ID in 'PF12345' format (required for table events).", ) @click.option( "--header", multiple=True, type=str, help="Custom HTTP header in 'Key:Value' format. Can be used multiple times.", ) @click.option( "--name", type=str, help="Optional name for the webhook.", )
[docs]def create_webhook(event, target_url, mailbox_id, table_field_id, header, name): """ Create a new custom webhook for your Parseur account. """ headers_dict = headers_to_dict(header) event_enum = parseur.ParseurEvent(event) result = parseur.Webhook.create( event=event_enum, target_url=target_url, mailbox_id=mailbox_id, table_field_id=table_field_id, headers=headers_dict or None, name=name, ) click.echo(parseur.to_json(result))
@cli.command("get-webhook") @click.argument("webhook_id", type=int)
[docs]def get_webhook(webhook_id): """Get details of a webhook.""" result = parseur.Webhook.retrieve(webhook_id) click.echo(parseur.to_json(result))
@cli.command("delete-webhook") @click.argument("webhook_id", type=int)
[docs]def delete_webhook(webhook_id): """ Delete a registered webhook by its ID. This command permanently removes the webhook from your Parseur account. """ parseur.Webhook.delete(webhook_id) click.echo(f"✅ Webhook {webhook_id} deleted.")
@cli.command("enable-webhook") @click.argument("mailbox_id", type=int) @click.argument("webhook_id", type=int)
[docs]def enable_webhook(mailbox_id, webhook_id): """ Enable a webhook for the specified mailbox. Activates the webhook by adding it to the mailbox. """ result = parseur.Webhook.enable(mailbox_id, webhook_id) click.echo(parseur.to_json(result))
@cli.command("pause-webhook") @click.argument("mailbox_id", type=int) @click.argument("webhook_id", type=int)
[docs]def pause_webhook(mailbox_id, webhook_id): """ Pause a webhook for the specified mailbox. Removes the webhook from the mailbox without deleting it. """ result = parseur.Webhook.pause(mailbox_id, webhook_id) click.echo(parseur.to_json(result))
@cli.command("list-webhooks")
[docs]def list_webhooks(): """List all registered webhooks.""" webhooks = parseur.Webhook.list() click.echo(parseur.to_json(webhooks))
@cli.command("listen") @click.option( "--event", required=True, type=click.Choice([e.value for e in parseur.ParseurEvent]), help="Event type to listen for", ) @click.option( "--mailbox-id", type=int, help="Mailbox ID (required for document events).", ) @click.option( "--table-field-id", type=str, help="Table field ID in 'PF12345' format (required for table events).", ) @click.option( "--header", multiple=True, type=str, help="Custom HTTP header in 'Key:Value' format. Can be used multiple times.", ) @click.option( "--name", type=str, help="Optional name for the webhook.", ) @click.option( "--redirect-url", type=str, help="Optional URL to forward received events to.", ) @click.option( "--redirect-port", type=int, help="Optional local port to forward received events to (http://localhost:<port>).", ) @click.option( "--silent", is_flag=True, default=False, help="Do not print event payloads to stdout.", )
[docs]def listen( event, mailbox_id, table_field_id, header, name, redirect_url, redirect_port, silent, ): """ Listen to a Parseur event in real time with a temporary webhook. Example: parseur listen --event document.parsed_ok --mailbox-id 12345 parseur listen --event document.processed --mailbox-id 12345 --redirect-url http://localhost --redirect-port 8000 """ from . import server if name is None: name = f"CLI listener for {event}" # Check redirect consistency if redirect_port and not redirect_url: raise click.ClickException("--redirect-port requires --redirect-url") if redirect_url: try: parsed = urlparse(redirect_url) if not parsed.scheme or not parsed.netloc: raise ValueError("URL missing scheme or host") # Rebuild URL with port override if given netloc = parsed.hostname if redirect_port: netloc = f"{parsed.hostname}:{redirect_port}" if parsed.username or parsed.password: auth = f"{parsed.username}:{parsed.password}@" netloc = auth + netloc redirect_url = urlunparse( ( parsed.scheme, netloc, parsed.path or "", parsed.params, parsed.query, parsed.fragment, ) ) click.echo(f"🔁 Events will be forwarded to: {redirect_url}") except Exception as e: raise click.ClickException(f"Invalid redirect URL: {redirect_url} ({e})") # Parse custom headers headers_dict = headers_to_dict(header) event_enum = parseur.ParseurEvent(event) # Run the local listener server.run_listener( event=event_enum, mailbox_id=mailbox_id, table_field_id=table_field_id, headers=headers_dict, name=name, redirect_url=redirect_url, silent=silent, )
if __name__ == "__main__": cli()