101 Commits
0.9.0 ... 0.9.5

Author SHA1 Message Date
KnugiHK
282c99c7dd PyPi can't have git source as dependency 2023-06-25 13:57:11 +08:00
KnugiHK
8dec2a7e97 Bug fix on missing attribute in the Message class. 2023-06-25 13:30:44 +08:00
KnugiHK
f9dedc7930 Increase the row size 2023-06-25 13:14:39 +08:00
KnugiHK
e2f497dbb6 Bug fix on exported chat 2023-06-25 12:42:37 +08:00
KnugiHK
989bddca37 Limit the reply quote length 2023-06-21 17:24:50 +08:00
KnugiHK
40d060628f Coding style 2023-06-21 17:04:58 +08:00
KnugiHK
032af6cdcf Refactor 2023-06-21 17:01:14 +08:00
KnugiHK
e243abe2a4 Bug fix 2023-06-21 16:52:39 +08:00
KnugiHK
b8f0af5f31 Refactor 2023-06-21 16:24:36 +08:00
KnugiHK
030fef53e1 I found that old schema also has such tables 2023-06-20 19:53:05 +08:00
KnugiHK
3ed269e17f Support a lot of metadata in Android's new schema 2023-06-20 19:12:38 +08:00
KnugiHK
1e3ee5e322 Fix incorrect group message sender name 2023-06-20 16:03:24 +08:00
KnugiHK
6c740e69a5 Fix wrongly determined metadata 2023-06-20 15:32:05 +08:00
KnugiHK
828c8a1a72 Fix only one group chat is rendered when contact db is not present 2023-06-20 15:31:24 +08:00
KnugiHK
1fb8588752 Add more alias to wtsexporter command 2023-06-20 14:58:44 +08:00
KnugiHK
6636210e4c Fix the reply jump offset after introducing status 2023-06-20 14:52:32 +08:00
KnugiHK
cc0105647a Refactor a bit 2023-06-20 14:50:57 +08:00
KnugiHK
4fa360a389 No longer support direct execution on the script 2023-06-20 14:41:31 +08:00
KnugiHK
e5228855d2 Temporary workaround mentioned in #48 2023-06-20 14:39:54 +08:00
KnugiHK
db1cdf8189 Add contact's status if present (iOS) 2023-06-20 14:37:25 +08:00
KnugiHK
08ce61e68e Make them optional when importing! 2023-06-20 14:37:12 +08:00
KnugiHK
138dd5351f Add status to JSON import 2023-06-20 14:19:07 +08:00
KnugiHK
136152dc18 Add contact's status if present (Android) 2023-06-20 14:07:42 +08:00
KnugiHK
55bc62cdc1 Add fallback to Android contact name just like commit 5f6b764 2023-06-20 13:49:22 +08:00
KnugiHK
d430c7bfba Update wording on progress 2023-06-19 20:40:10 +08:00
KnugiHK
672b85474e Bug fix on the wrong type of media_wa_type in old Android schema 2023-06-19 20:26:13 +08:00
KnugiHK
525d88f2c6 Fix < Python 3.11 compatibility 2023-06-19 20:09:45 +08:00
KnugiHK
b57087794a Bump version for preparing next release 2023-06-19 17:56:24 +08:00
KnugiHK
be316ebb89 Add aliases to extras_require 2023-06-18 15:15:39 +08:00
KnugiHK
1078f7e5f7 Bug fix on sticker not resizing 2023-06-18 15:10:00 +08:00
KnugiHK
ba2a88067a Update setup.py 2023-06-18 14:36:15 +08:00
KnugiHK
af53ba978b Merge branch 'main' into dev 2023-06-16 19:27:56 +08:00
Knugi
d55a42a549 Merge pull request #36 from nahoj/patch-1
Where to find the key file
2023-06-16 19:27:34 +08:00
KnugiHK
506f8e89f4 Implement import json and output HTML 2023-06-16 19:19:33 +08:00
KnugiHK
fff11b26a5 Store the device type 2023-06-16 19:12:06 +08:00
KnugiHK
fa66ef3a52 Output all attribute to JSON file 2023-06-16 19:11:55 +08:00
Johan Grande
0b93ae567e rephrase as suggested 2023-06-16 12:38:38 +02:00
Johan Grande
c62e07cb0a Merge branch 'main' into patch-1 2023-06-16 12:33:17 +02:00
KnugiHK
317d785d50 Bug fix on raise of exception when "media_folder" does not exists on the filesystem 2023-06-16 18:27:49 +08:00
KnugiHK
38c1e47be9 Add iphone-backup-decrypt to setup.py as extra dependency 2023-06-16 17:57:33 +08:00
KnugiHK
ff95625edf Dropping Python 3.7 and adding Python 3.11 support 2023-06-16 17:50:08 +08:00
KnugiHK
14854193bd Merge branch 'main' into dev 2023-06-16 14:26:34 +08:00
KnugiHK
67c1b43669 Update README.md 2023-06-16 14:07:04 +08:00
KnugiHK
23046e01ba Add links to README 2023-06-16 13:48:36 +08:00
KnugiHK
c366e656af Further reduce the maximum length of vcard file name #51 2023-06-16 01:46:27 +08:00
KnugiHK
41f45fb07c PEP8 2023-06-16 01:43:43 +08:00
KnugiHK
be9e790b12 Better handling of binary message #44 2023-06-16 01:25:51 +08:00
KnugiHK
bfdc68cd6a Add autoescape to rendering 2023-06-16 01:10:24 +08:00
KnugiHK
594a04adbc Update the way to handle encrypted iOS backup file
Since this commit, iphone_backup_decrypt must be re-installed
2023-06-15 21:32:57 +08:00
KnugiHK
20b2eec047 Support Android call logs 2023-06-15 18:25:29 +08:00
KnugiHK
011c8ff1e7 Support missed call (PM) metadata for Android 2023-06-15 17:44:38 +08:00
KnugiHK
e4c47ea41f Update whatsapp.html 2023-06-15 17:01:13 +08:00
KnugiHK
c344f05b05 Bug fix on wrong alias 2023-06-15 17:00:55 +08:00
KnugiHK
88ef4989fc Fix wrong error message 2023-06-15 17:00:34 +08:00
KnugiHK
f7f6b01c86 Resize sticker 2023-06-15 16:59:54 +08:00
KnugiHK
a49a911e03 Replace image rendered in the HTML to thumbnail if possible 2023-06-15 16:19:35 +08:00
KnugiHK
3443143744 Restore code for downloading media from whatsapp server 2023-06-15 02:16:53 +08:00
KnugiHK
5f6b764bb9 Add fallback to iOS contact name
Fallback = the name set by the contact
2023-06-14 21:53:23 +08:00
KnugiHK
3940b2991f Try to reduce the size of the template 2023-06-13 21:23:44 +08:00
KnugiHK
dc1df8a03e Just coding style 2023-06-13 19:48:31 +08:00
KnugiHK
dd5ec2219c Sync changes 2023-06-13 19:46:09 +08:00
KnugiHK
e0c2cf5f66 Implement iOS avatar #48 2023-06-13 19:44:16 +08:00
KnugiHK
8cdb694a16 Modify the root directory name of iOS media 2023-06-13 17:00:40 +08:00
KnugiHK
8294f06587 Extract whole WhatsApp directory instead of wanted files only 2023-06-13 16:29:27 +08:00
KnugiHK
200dea218f Update help message 2023-06-13 14:01:10 +08:00
Knugi
df93033c6c Update README.md 2023-06-13 05:54:14 +00:00
KnugiHK
8f90733da2 Change "Gathering" to "Processing" 2023-06-11 01:35:40 +08:00
KnugiHK
3fdf6d0818 Update LICENSE 2023-06-11 01:33:09 +08:00
KnugiHK
2fa5c4268e Rewrite a bit 2023-06-11 01:22:21 +08:00
KnugiHK
ed658d78dc Fix incorrect media path on iOS #49 2023-06-11 01:21:07 +08:00
KnugiHK
0280325b4a Bug fix 2023-06-11 00:47:11 +08:00
KnugiHK
a42ec5d762 Beautify 2023-06-10 19:58:14 +08:00
KnugiHK
c419dd5d39 Raise error if time is not str and int 2023-06-10 19:54:09 +08:00
KnugiHK
8d036a6d87 Rewrite a bit 2023-06-10 19:45:29 +08:00
KnugiHK
42435c38cc Add <br> to newline 2023-06-10 19:32:48 +08:00
KnugiHK
32caab7c40 Distinguish between media missing and media omitted 2023-06-10 19:32:38 +08:00
KnugiHK
0897dc2897 Implement export TXT chat #22 2023-06-10 19:24:39 +08:00
KnugiHK
f63b180500 Implement splitted outputs #23 2023-06-08 18:16:47 +08:00
KnugiHK
dbdfdaedcf Refine code to use the data model 2023-06-08 17:51:57 +08:00
KnugiHK
0e802f4554 Remove old file 2023-06-08 16:50:33 +08:00
KnugiHK
41dd5e545f Make the not supported note looks less intimidating
#39
2023-06-08 15:46:12 +08:00
KnugiHK
8750315e8e Update __main__.py 2023-06-02 02:41:06 +08:00
KnugiHK
e9499c3bb7 Add highlighting when navigate to replied message 2023-06-02 02:41:01 +08:00
KnugiHK
80c3ed11f6 Partially implement reply feature in iOS
#28
2023-06-02 01:27:05 +08:00
KnugiHK
7c78cce221 Update link for reporting offsets 2023-06-01 22:26:32 +08:00
KnugiHK
32a312d332 Add offset mentioned in #46 2023-06-01 22:21:58 +08:00
KnugiHK
328f34e632 Merge branch 'main' into dev 2023-05-19 13:25:21 +08:00
KnugiHK
9ac8839ecc Workaround for non-UTF8 message
#44
2023-05-19 13:23:51 +08:00
Knugi
e7113d72d7 Update README.md 2023-05-18 09:34:59 +00:00
KnugiHK
0a0ae8cf15 Update python-publish.yml 2023-05-18 16:37:35 +08:00
KnugiHK
b1d8d173a2 Bug fix on too long vCard file name 2023-05-17 01:06:31 +08:00
KnugiHK
3bd6f288ea Bug fix file exist exception 2023-05-17 00:48:33 +08:00
KnugiHK
bf06795962 Add checksum on compiled binaries 2023-05-17 00:19:45 +08:00
KnugiHK
20d8e1384a Bump version 2023-05-17 00:08:52 +08:00
KnugiHK
6fd0e61b64 Update help test 2023-05-16 23:29:10 +08:00
KnugiHK
bbb47cd839 Bug fix 2023-05-16 23:25:12 +08:00
Knugi
c155064ae1 Update README.md 2023-05-16 13:38:16 +00:00
Knugi
d4efd919f9 Update python-publish.yml 2023-05-16 11:49:57 +00:00
Knugi
13d761286e Update compile-binary.yml 2023-05-16 11:48:25 +00:00
Knugi
a943808734 Update compile-binary.yml 2023-05-16 11:48:00 +00:00
Johan Grande
8380487e44 Where to find the key file 2023-03-17 12:04:40 +01:00
15 changed files with 1268 additions and 1264 deletions


@@ -1,6 +1,8 @@
name: Compile standalone binary
on:
release:
types: [published]
workflow_dispatch:
permissions:
@@ -12,7 +14,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -24,6 +26,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --follow-imports Whatsapp_Chat_Exporter/__main__.py
cp __main__.bin wtsexporter_linux_x64
sha256sum wtsexporter_linux_x64
- uses: actions/upload-artifact@v3
with:
name: binary-linux
@@ -35,7 +38,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -47,6 +50,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --assume-yes-for-downloads --follow-imports Whatsapp_Chat_Exporter\__main__.py
copy __main__.exe wtsexporter_x64.exe
Get-FileHash wtsexporter_x64.exe
- uses: actions/upload-artifact@v3
with:
name: binary-windows
@@ -58,7 +62,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v3
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
@@ -70,6 +74,7 @@ jobs:
run: |
python -m nuitka --onefile --include-data-file=./Whatsapp_Chat_Exporter/whatsapp.html=./Whatsapp_Chat_Exporter/whatsapp.html --follow-imports Whatsapp_Chat_Exporter/__main__.py
cp __main__.bin wtsexporter_macos_x64
shasum -a 256 wtsexporter_macos_x64
- uses: actions/upload-artifact@v3
with:
name: binary-macos
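
Review note: the three build jobs now print a SHA-256 digest of each binary (`sha256sum`, `Get-FileHash`, `shasum -a 256`). A minimal sketch of how a downloader might check a binary against such a digest is shown here. The function names are illustrative and not part of the project. The comparison ignores case because `Get-FileHash` prints upper-case hex while the other two tools print lower-case.

```python
import hashlib


def sha256_of_file(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel stops when read() returns an empty bytes object.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published_checksum(path, published_hex):
    """Compare a file's digest with a published one, ignoring case and whitespace."""
    return sha256_of_file(path) == published_hex.strip().lower()
```

For example, `matches_published_checksum("wtsexporter_linux_x64", digest_from_build_log)` would return `True` only if the downloaded file is byte-identical to the one hashed in the workflow run.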


@@ -18,9 +18,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies
@@ -30,7 +30,7 @@ jobs:
- name: Build package
run: python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}


@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2021 Knugi
Copyright (c) 2021-2023 Knugi
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal


@@ -44,6 +44,8 @@ pip install whatsapp-chat-exporter["android_backup"] # install along with this s
```
### Crypt15 is now the easiest way to decrypt a backup. If you have the 32 bytes hex key generated when you enable End-to-End encrypted backup, you can use it to decrypt the backup. If you do not have the 32 bytes hex key, you can still use the key file extracted just like extracting key file for Crypt12 and Crypt14 to decrypt the backup.
#### Crypt12 or Crypt14
You will need the decryption key file from your phone. If you have root access, you can find it as `/data/data/com.whatsapp/files/key`. Otherwise, if you used WhatsApp-Key-DB-Extractor before, it will appear in the WhatsApp backup directory as `WhatsApp/Databases/.nomedia`.
Place the decryption key file (key) and the encrypted WhatsApp Backup (msgstore.db.crypt14) in the working directory. If you also want the name of your contacts, get the contact database, which is called wa.db. And copy the WhatsApp (Media) directory from your phone directly.
And now, you should have something like this in the working directory.
@@ -77,20 +79,28 @@ If you have the 32 bytes hex key, simply put the hex key in the -k option and in
wtsexporter -a -k 432435053b5204b08e5c3823423399aa30ff061435ab89bc4e6713969cdaa5a8 -b msgstore.db.crypt15
```
## Working with iPhone
Do an iPhone Backup with iTunes first.
### Encrypted iPhone Backup
**If you are working on unencrypted iPhone backup, skip this**
## Working with iOS/iPadOS (iPhone or iPad)
Do an iPhone/iPad Backup with iTunes first.
* iPhone backup on Mac: https://support.apple.com/HT211229
* iPhone backup on Windows: https://support.apple.com/HT212156
* iPad backup: https://support.apple.com/guide/ipad/ipad9a74df05xx/ipados
### Encrypted iOS/iPadOS Backup
**If you are working on unencrypted iOS/iPadOS backup, skip this**
If you want to work on an encrypted iPhone Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
If you want to work on an encrypted iOS/iPadOS Backup, you should install iphone_backup_decrypt from [KnugiHK/iphone_backup_decrypt](https://github.com/KnugiHK/iphone_backup_decrypt) before you run the extract_iphone_media.py.
```sh
pip install git+https://github.com/KnugiHK/iphone_backup_decrypt
```
### Extracting
Simply invoke the following command from shell, remember to replace the username and device id correspondingly in the command.
#### Windows
```sh
wtsexporter -i -b "C:\Users\[Username]\AppData\Roaming\Apple Computer\MobileSync\Backup\[device id]"
```
#### Mac
```sh
wtsexporter -i -b "~/Library/Application Support/MobileSync/Backup/[device id]"
```
## Results
After extracting, you will get these:
@@ -100,16 +110,21 @@ After extracting, you will get these:
#### Group Message
![Group Message](imgs/group.png)
*The above screenshots were taken longgggggggggggggg ago. I am going to update them when possible.*
## More options
Invoke the wtsexporter with --help option will show you all options available.
```sh
> wtsexporter --help
usage: wtsexporter [options]
usage: wtsexporter [-h] [-a] [-i] [-e EXPORTED] [-w WA] [-m MEDIA] [-b BACKUP] [-o OUTPUT] [-j [JSON]] [-d DB] [-k KEY] [-t TEMPLATE] [-s] [-c] [--offline OFFLINE] [--size [SIZE]]
[--no-html] [--check-update] [--assume-first-as-me]
options:
-h, --help show this help message and exit
-a, --android Define the target as Android
-i, --iphone, --ios Define the target as iPhone
-i, --iphone, --ios Define the target as iPhone/iPad
-e EXPORTED, --exported EXPORTED
Define the target as exported chat file and specify the path to the file
-w WA, --wa WA Path to contact database (default: wa.db/ContactsV2.sqlite)
-m MEDIA, --media MEDIA
Path to WhatsApp media folder (default: WhatsApp)
@@ -123,20 +138,24 @@ options:
-k KEY, --key KEY Path to key file
-t TEMPLATE, --template TEMPLATE
Path to custom HTML template
-e, --embedded Embed media into HTML file (not yet implemented)
-s, --showkey Show the HEX key used to decrypt the database
-c, --move-media Move the media directory to output directory if the flag is set, otherwise copy it
--offline OFFLINE Relative path to offline static files
--size SIZE, --output-size SIZE
Maximum size of a single output file in bytes, 0 for auto (not yet implemented)
--size [SIZE], --output-size [SIZE], --split [SIZE]
Maximum (rough) size of a single output file in bytes, 0 for auto
--no-html Do not output html files
--check-update Check for updates
--check-update Check for updates (require Internet access)
--assume-first-as-me Assume the first message in a chat as sent by me (must be used together with -e)
WhatsApp Chat Exporter: 0.9.5 Licensed with MIT
```
# To do
1. Reply in iPhone
See [issues](https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues).
# Copyright
This is a MIT licensed project.
The Telegram Desktop's export is the reference for whatsapp.html in this repo
WhatsApp Chat Exporter is not affiliated, associated, authorized, endorsed by, or in any way officially connected with the WhatsApp LLC, or any of its subsidiaries or its affiliates. The official WhatsApp LLC website can be found at https://www.whatsapp.com/.


@@ -1 +1,3 @@
__version__ = "0.9.0"
#!/usr/bin/python3
__version__ = "0.9.5"


@@ -1,18 +1,21 @@
try:
from .__init__ import __version__
except ImportError:
from Whatsapp_Chat_Exporter.__init__ import __version__
from Whatsapp_Chat_Exporter import extract, extract_iphone
from Whatsapp_Chat_Exporter import extract_iphone_media
from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import Crypt, check_update
from argparse import ArgumentParser
#!/usr/bin/python3
import os
import sqlite3
import shutil
import json
import string
import glob
from Whatsapp_Chat_Exporter import extract_exported, extract_iphone
from Whatsapp_Chat_Exporter import extract, extract_iphone_media
from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import Crypt, check_update, import_from_json
from argparse import ArgumentParser, SUPPRESS
from sys import exit
try:
from .__init__ import __version__
except ImportError:
from Whatsapp_Chat_Exporter.__init__ import __version__
def main():
@@ -36,7 +39,14 @@ def main():
dest='iphone',
default=False,
action='store_true',
help="Define the target as iPhone")
help="Define the target as iPhone/iPad")
parser.add_argument(
"-e",
"--exported",
dest="exported",
default=None,
help="Define the target as exported chat file and specify the path to the file"
)
parser.add_argument(
"-w",
"--wa",
@@ -93,12 +103,11 @@ def main():
help="Path to custom HTML template"
)
parser.add_argument(
"-e",
"--embedded",
dest="embedded",
default=False,
action='store_true',
help="Embed media into HTML file (not yet implemented)"
help=SUPPRESS or "Embed media into HTML file (not yet implemented)"
)
parser.add_argument(
"-s",
@@ -125,9 +134,13 @@ def main():
parser.add_argument(
"--size",
"--output-size",
"--split",
dest="size",
nargs='?',
type=int,
const=0,
default=None,
help="Maximum size of a single output file in bytes, 0 for auto (not yet implemented)"
help="Maximum (rough) size of a single output file in bytes, 0 for auto"
)
parser.add_argument(
"--no-html",
@@ -141,7 +154,28 @@ def main():
dest="check_update",
default=False,
action='store_true',
help="Check for updates"
help="Check for updates (require Internet access)"
)
parser.add_argument(
"--assume-first-as-me",
dest="assume_first_as_me",
default=False,
action='store_true',
help="Assume the first message in a chat as sent by me (must be used together with -e)"
)
parser.add_argument(
"--no-avatar",
dest="no_avatar",
default=False,
action='store_true',
help="Do not render avatar in HTML output"
)
parser.add_argument(
"--import",
dest="import_json",
default=False,
action='store_true',
help="Import JSON file and convert to HTML output"
)
args = parser.parse_args()
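
Review note: the `--size` option now combines `nargs='?'`, `const=0` and `default=None`, which gives three distinct outcomes depending on how the flag is passed. The sketch below rebuilds only that one argument in a throwaway parser (`build_size_parser` and `parse_size` are illustrative helpers, not project code) to show the behaviour. Reading `None` as "do not split" is an assumption based on the help text, not something the diff states.

```python
from argparse import ArgumentParser


def build_size_parser():
    """Build a parser holding only the --size argument as defined in the diff."""
    parser = ArgumentParser()
    parser.add_argument(
        "--size",
        "--output-size",
        "--split",
        dest="size",
        nargs="?",     # the value after the flag is optional
        type=int,
        const=0,       # used when the flag is given without a value
        default=None,  # used when the flag is absent
    )
    return parser


def parse_size(argv):
    """Return the parsed size for a list of command-line tokens."""
    return build_size_parser().parse_args(argv).size


print(parse_size([]))                      # flag absent
print(parse_size(["--size"]))              # flag without a value
print(parse_size(["--split", "1048576"]))  # alias with an explicit value
```

The three calls yield `None`, `0` and `1048576` respectively, so downstream code can tell "flag absent" apart from "flag present with automatic size".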
@@ -150,15 +184,21 @@ def main():
exit(check_update())
# Sanity checks
if args.android and args.iphone:
if args.android and args.iphone and args.exported and args.import_json:
print("You must define only one device type.")
exit(1)
if not args.android and not args.iphone:
if not args.android and not args.iphone and not args.exported and not args.import_json:
print("You must define the device type.")
exit(1)
if args.no_html and not args.json:
print("You must either specify a JSON output file or enable HTML output.")
exit(1)
if args.import_json and (args.android or args.iphone or args.exported or args.no_html):
print("You can only use --import with -j and without --no-html.")
exit(1)
elif args.import_json and not os.path.isfile(args.json):
print("JSON file not found.")
exit(1)
data = {}
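
Review note: as written in this hunk, the first sanity check joins all four mode flags with `and`, so it only fires when `-a`, `-i`, `-e` and `--import` are all given at once. The error message suggests the intent is "exactly one mode". A minimal sketch of that reading, counting the selected modes, is given here. `count_selected_modes` and `check_modes` are hypothetical helpers and this is not the code in the commit.

```python
from types import SimpleNamespace


def count_selected_modes(args):
    """Count how many of the four mutually exclusive modes are set."""
    modes = (args.android, args.iphone, args.exported, args.import_json)
    return sum(bool(mode) for mode in modes)


def check_modes(args):
    """Return an error message, or None when exactly one mode is selected."""
    selected = count_selected_modes(args)
    if selected > 1:
        return "You must define only one device type."
    if selected == 0:
        return "You must define the device type."
    return None


# Stand-in for the argparse namespace; attribute names follow the diff.
example = SimpleNamespace(android=True, iphone=True, exported=None, import_json=False)
print(check_modes(example))
```

With two modes set, as in the example namespace, this sketch reports the conflict, whereas the `and` chain in the hunk would let it through.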
@@ -212,17 +252,25 @@ def main():
with sqlite3.connect(contact_db) as db:
db.row_factory = sqlite3.Row
contacts(db, data)
elif args.iphone:
import sys
if "--iphone" in sys.argv:
print("WARNING: The --iphone flag is deprecated and will be removed in the future. Use --ios instead.")
print(
"WARNING: The --iphone flag is deprecated and will"
"be removed in the future. Use --ios instead."
)
contacts = extract_iphone.contacts
messages = extract_iphone.messages
media = extract_iphone.media
vcard = extract_iphone.vcard
create_html = extract_iphone.create_html
create_html = extract.create_html
if args.media is None:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
if args.backup is not None:
extract_iphone_media.extract_media(args.backup)
if not os.path.isdir(args.media):
extract_iphone_media.extract_media(args.backup)
else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
if args.db is None:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
else:
@@ -231,17 +279,58 @@ def main():
contact_db = "ContactsV2.sqlite"
else:
contact_db = args.wa
if args.media is None:
args.media = "Message"
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
db.row_factory = sqlite3.Row
contacts(db, data)
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
db.row_factory = sqlite3.Row
messages(db, data)
media(db, data, args.media)
vcard(db, data)
if not args.exported and not args.import_json:
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
db.row_factory = sqlite3.Row
messages(db, data, args.media)
media(db, data, args.media)
vcard(db, data)
if args.android:
extract.calls(db, data)
if not args.no_html:
create_html(
data,
args.output,
args.template,
args.embedded,
args.offline,
args.size,
args.no_avatar
)
else:
print(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path."
)
exit(2)
if os.path.isdir(args.media):
media_path = os.path.join(args.output, args.media)
if os.path.isdir(media_path):
print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n")
else:
if not args.move_media:
if os.path.isdir(media_path):
print("\nWhatsApp directory already exists in output directory. Skipping...", end="\n")
else:
print("\nCopying media directory...", end="\n")
shutil.copytree(args.media, media_path)
else:
try:
shutil.move(args.media, f"{args.output}/")
except PermissionError:
print("\nCannot remove original WhatsApp directory. "
"Perhaps the directory is opened?", end="\n")
elif args.exported:
extract_exported.messages(args.exported, data, args.assume_first_as_me)
if not args.no_html:
create_html(
extract.create_html(
data,
args.output,
args.template,
@@ -249,28 +338,20 @@ def main():
args.offline,
args.size
)
else:
print(
"The message database does not exist. You may specify the path "
"to database file with option -d or check your provided path."
for file in glob.glob(r'*.*'):
shutil.copy(file, args.output)
elif args.import_json:
import_from_json(args.json, data)
extract.create_html(
data,
args.output,
args.template,
args.embedded,
args.offline,
args.size
)
exit(2)
if os.path.isdir(args.media):
if os.path.isdir(f"{args.output}/{args.media}"):
print("Media directory already exists in output directory. Skipping...")
else:
if not args.move_media:
print("Copying media directory...")
shutil.copytree(args.media, f"{args.output}/WhatsApp")
else:
try:
shutil.move(args.media, f"{args.output}/")
except PermissionError:
print("Cannot remove original WhatsApp directory. "
"Perhaps the directory is opened?")
if args.json:
if args.json and not args.import_json:
if isinstance(data[next(iter(data))], ChatStore):
data = {jik: chat.to_json() for jik, chat in data.items()}
with open(args.json, "w") as f:


@@ -1,17 +1,34 @@
#!/usr/bin/python3
import os
from datetime import datetime
from typing import Union
class ChatStore():
def __init__(self, name=None):
def __init__(self, type, name=None, media=None):
if name is not None and not isinstance(name, str):
raise TypeError("Name must be a string or None")
self.name = name
self.messages = {}
self.type = type
if media is not None:
from Whatsapp_Chat_Exporter.utility import Device
if self.type == Device.IOS:
self.my_avatar = os.path.join(media, "Media/Profile/Photo.jpg")
elif self.type == Device.ANDROID:
self.my_avatar = None # TODO: Add Android support
else:
self.my_avatar = None
else:
self.my_avatar = None
self.their_avatar = None
self.their_avatar_thumb = None
self.status = None
def add_message(self, id, message):
if not isinstance(message, Message):
raise TypeError("Chat must be a Chat object")
raise TypeError("message must be a Message object")
self.messages[id] = message
def delete_message(self, id):
@@ -19,23 +36,47 @@ class ChatStore():
del self.messages[id]
def to_json(self):
serialized_msgs = {id : msg.to_json() for id,msg in self.messages.items()}
return {'name' : self.name, 'messages' : serialized_msgs}
serialized_msgs = {id: msg.to_json() for id, msg in self.messages.items()}
return {
'name': self.name,
'type': self.type,
'my_avatar': self.my_avatar,
'their_avatar': self.their_avatar,
'their_avatar_thumb': self.their_avatar_thumb,
'status': self.status,
'messages': serialized_msgs
}
def get_last_message(self):
return tuple(self.messages.values())[-1]
def get_messages(self):
return self.messages.values()
class Message():
def __init__(self, from_me: Union[bool,int], timestamp: int, time: str, key_id: int):
def __init__(self, from_me: Union[bool,int], timestamp: int, time: Union[int,float,str], key_id: int):
self.from_me = bool(from_me)
self.timestamp = timestamp / 1000 if timestamp > 9999999999 else timestamp
self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M")
if isinstance(time, int) or isinstance(time, float):
self.time = datetime.fromtimestamp(time/1000).strftime("%H:%M")
elif isinstance(time, str):
self.time = time
else:
raise TypeError("Time must be a string or integer")
self.media = False
self.key_id = key_id
self.meta = False
self.data = None
self.sender = None
self.safe = False
self.mime = None
# Extra
self.reply = None
self.quoted_data = None
self.caption = None
self.thumb = None # Android specific
self.sticker = False
def to_json(self):
return {
@@ -47,7 +88,11 @@ class Message():
'meta' : self.meta,
'data' : self.data,
'sender' : self.sender,
'safe' : self.safe,
'mime' : self.mime,
'reply' : self.reply,
'quoted_data' : self.quoted_data,
'caption' : self.caption
'caption' : self.caption,
'thumb' : self.thumb,
'sticker' : self.sticker
}
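
Review note: `Message.__init__` now accepts `time` as either a millisecond epoch value or a preformatted string, and it scales `timestamp` down when it looks like milliseconds. The two rules are pulled out below as standalone functions so they can be tried in isolation. The function names are illustrative, while the threshold and the error message are taken from the diff.

```python
from datetime import datetime


def normalize_timestamp(timestamp):
    """Convert millisecond timestamps to seconds; leave second timestamps alone."""
    return timestamp / 1000 if timestamp > 9999999999 else timestamp


def format_time(time):
    """Return an HH:MM string from a millisecond epoch, or pass a string through."""
    if isinstance(time, (int, float)):
        # Formatting uses the local timezone, so the result depends on the machine.
        return datetime.fromtimestamp(time / 1000).strftime("%H:%M")
    if isinstance(time, str):
        return time
    raise TypeError("Time must be a string or integer")
```

The string branch is presumably what lets exported chats and JSON imports reuse the class, since those sources already carry a formatted time.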


@@ -11,8 +11,10 @@ import hmac
from pathlib import Path
from mimetypes import MimeTypes
from hashlib import sha256
from base64 import b64decode, b64encode
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, Crypt
from Whatsapp_Chat_Exporter.utility import MAX_SIZE, ROW_SIZE, determine_metadata, get_status_location
from Whatsapp_Chat_Exporter.utility import rendering, Crypt, Device, get_file_name, setup_template
from Whatsapp_Chat_Exporter.utility import brute_force_offset, CRYPT14_OFFSETS
try:
@@ -49,7 +51,7 @@ def _extract_encrypted_key(keyfile):
key_stream += byte.to_bytes(1, "big", signed=True)
return _generate_hmac_of_hmac(key_stream)
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
if not support_backup:
@@ -82,7 +84,7 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=Fals
raise ValueError("The crypt15 file must be at least 131 bytes")
t1 = t2 = None
iv = database[8:24]
db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type
db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type
db_ciphertext = database[db_offset:]
if t1 != t2:
@@ -130,7 +132,7 @@ def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=Fals
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues/new"
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/discussions/47"
)
break
if not decompressed:
@@ -152,16 +154,18 @@ def contacts(db, data):
c = db.cursor()
c.execute("""SELECT count() FROM wa_contacts""")
total_row_number = c.fetchone()[0]
print(f"Gathering contacts...({total_row_number})")
print(f"Processing contacts...({total_row_number})")
c.execute("""SELECT jid, display_name FROM wa_contacts; """)
c.execute("""SELECT jid, COALESCE(display_name, wa_name) as display_name, status FROM wa_contacts; """)
row = c.fetchone()
while row is not None:
data[row["jid"]] = ChatStore(row["display_name"])
data[row["jid"]] = ChatStore(Device.ANDROID, row["display_name"])
if row["status"] is not None:
data[row["jid"]].status = row["status"]
row = c.fetchone()
def messages(db, data):
def messages(db, data, media_folder):
# Get message history
c = db.cursor()
try:
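
Review note: the contacts query now uses `COALESCE(display_name, wa_name)` so that a contact without an address-book name falls back to the name the contact set for themselves. The sketch below runs the same query against an in-memory table. The four-column schema and the sample rows are assumptions made for the demonstration (the real `wa_contacts` table has more columns), and `load_contacts` and `make_demo_db` are hypothetical helpers.

```python
import sqlite3


def load_contacts(db):
    """Map each jid to (display name with fallback, status)."""
    db.row_factory = sqlite3.Row
    c = db.cursor()
    c.execute(
        "SELECT jid, COALESCE(display_name, wa_name) AS display_name, status "
        "FROM wa_contacts;"
    )
    return {row["jid"]: (row["display_name"], row["status"]) for row in c.fetchall()}


def make_demo_db():
    """Create a reduced, made-up wa_contacts table for illustration only."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE wa_contacts (jid TEXT, display_name TEXT, wa_name TEXT, status TEXT)"
    )
    db.executemany(
        "INSERT INTO wa_contacts VALUES (?, ?, ?, ?)",
        [
            ("111@s.whatsapp.net", "Alice", "ali", "Busy"),
            ("222@s.whatsapp.net", None, "Bob B.", None),
            ("333@s.whatsapp.net", None, None, None),
        ],
    )
    return db


contacts = load_contacts(make_demo_db())
```

In this demo the second contact resolves to `"Bob B."`, and the third stays `None`, which is the case the later `name or fallback` logic in `messages` has to cover.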
@@ -169,7 +173,7 @@ def messages(db, data):
except sqlite3.OperationalError:
c.execute("""SELECT count() FROM message""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
print(f"Processing messages...(0/{total_row_number})", end="\r")
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
try:
@@ -182,16 +186,38 @@ def messages(db, data):
messages.edit_version,
messages.thumb_image,
messages.remote_resource,
messages.media_wa_type,
CAST(messages.media_wa_type as INTEGER) as media_wa_type,
messages.latitude,
messages.longitude,
messages_quotes.key_id as quoted,
messages.key_id,
messages_quotes.data as quoted_data,
messages.media_caption
messages.media_caption,
missed_call_logs.video_call,
chat.subject as chat_subject,
message_system.action_type,
message_system_group.is_me_joined,
jid_old.raw_string as old_jid,
jid_new.raw_string as new_jid
FROM messages
LEFT JOIN messages_quotes
ON messages.quoted_row_id = messages_quotes._id
LEFT JOIN missed_call_logs
ON messages._id = missed_call_logs.message_row_id
INNER JOIN jid jid_global
ON messages.key_remote_jid = jid_global.raw_string
LEFT JOIN chat
ON chat.jid_row_id = jid_global._id
LEFT JOIN message_system
ON message_system.message_row_id = messages._id
LEFT JOIN message_system_group
ON message_system_group.message_row_id = messages._id
LEFT JOIN message_system_number_change
ON message_system_number_change.message_row_id = messages._id
LEFT JOIN jid jid_old
ON jid_old._id = message_system_number_change.old_jid_row_id
LEFT JOIN jid jid_new
ON jid_new._id = message_system_number_change.new_jid_row_id
WHERE messages.key_remote_jid <> '-1';"""
)
except sqlite3.OperationalError:
@@ -205,15 +231,20 @@ def messages(db, data):
message_future.version as edit_version,
message_thumbnail.thumbnail as thumb_image,
message_media.file_path as remote_resource,
message_media.mime_type as media_wa_type,
message_location.latitude,
message_location.longitude,
message_quoted.key_id as quoted,
message.key_id,
message_quoted.text_data as quoted_data,
message.message_type,
message.message_type as media_wa_type,
jid_group.raw_string as group_sender_jid,
chat.subject as chat_subject
chat.subject as chat_subject,
missed_call_logs.video_call,
message.sender_jid_row_id,
message_system.action_type,
message_system_group.is_me_joined,
jid_old.raw_string as old_jid,
jid_new.raw_string as new_jid
FROM message
LEFT JOIN message_quoted
ON message_quoted.message_row_id = message._id
@@ -231,6 +262,18 @@ def messages(db, data):
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
LEFT JOIN missed_call_logs
ON message._id = missed_call_logs.message_row_id
LEFT JOIN message_system
ON message_system.message_row_id = message._id
LEFT JOIN message_system_group
ON message_system_group.message_row_id = message._id
LEFT JOIN message_system_number_change
ON message_system_number_change.message_row_id = message._id
LEFT JOIN jid jid_old
ON jid_old._id = message_system_number_change.old_jid_row_id
LEFT JOIN jid jid_new
ON jid_new._id = message_system_number_change.new_jid_row_id
WHERE key_remote_jid <> '-1';"""
)
except Exception as e:
@@ -240,115 +283,123 @@ def messages(db, data):
else:
table_message = False
i = 0
content = c.fetchone()
while True:
try:
content = c.fetchone()
except sqlite3.OperationalError:
continue
else:
break
while content is not None:
if content["key_remote_jid"] not in data:
data[content["key_remote_jid"]] = ChatStore()
data[content["key_remote_jid"]] = ChatStore(Device.ANDROID, content["chat_subject"])
if content["key_remote_jid"] is None:
continue # Not sure
data[content["key_remote_jid"]].add_message(content["_id"], Message(
from_me=content["key_from_me"],
continue # Not sure
if "sender_jid_row_id" in content:
sender_jid_row_id = content["sender_jid_row_id"]
else:
sender_jid_row_id = None
message = Message(
from_me=not sender_jid_row_id and content["key_from_me"],
timestamp=content["timestamp"],
time=content["timestamp"],
key_id=content["key_id"],
))
)
if isinstance(content["data"], bytes):
message.data = ("The message is binary data and its base64 is "
'<a href="https://gchq.github.io/CyberChef/#recipe=From_Base64'
"('A-Za-z0-9%2B/%3D',true,false)Text_Encoding_Brute_Force"
f"""('Decode')&input={b64encode(b64encode(content["data"])).decode()}">""")
message.data += b64encode(content["data"]).decode("utf-8") + "</a>"
message.safe = message.meta = True
data[content["key_remote_jid"]].add_message(content["_id"], message)
i += 1
content = c.fetchone()
continue
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
name = None
name = fallback = None
if table_message:
if content["chat_subject"] is not None:
if content["sender_jid_row_id"] > 0:
_jid = content["group_sender_jid"]
else:
_jid = content["key_remote_jid"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
else:
fallback = None
if _jid in data:
name = data[_jid].name
if "@" in _jid:
fallback = _jid.split('@')[0]
else:
if content["remote_resource"] in data:
name = data[content["remote_resource"]].name
if content["remote_resource"] is not None:
if content["remote_resource"] in data:
name = data[content["remote_resource"]].name
if "@" in content["remote_resource"]:
fallback = content["remote_resource"].split('@')[0]
else:
fallback = None
else:
fallback = None
data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback
message.sender = name or fallback
else:
data[content["key_remote_jid"]].messages[content["_id"]].sender = None
message.sender = None
if content["quoted"] is not None:
data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"]
data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"]
message.reply = content["quoted"]
if content["quoted_data"] is not None and len(content["quoted_data"]) > 200:
message.quoted_data = content["quoted_data"][:201] + "..."
else:
message.quoted_data = content["quoted_data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].reply = None
message.reply = None
if not table_message and content["media_caption"] is not None:
# Old schema
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["media_caption"]
elif table_message and content["message_type"] == 1 and content["data"] is not None:
message.caption = content["media_caption"]
elif table_message and content["media_wa_type"] == 1 and content["data"] is not None:
# New schema
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"]
message.caption = content["data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].caption = None
message.caption = None
if content["status"] == 6: # 6 = Metadata, otherwise it's a message
if (not table_message and "-" in content["key_remote_jid"]) or \
(table_message and content["chat_subject"] is not None):
# Is Group
if content["data"] is not None:
try:
int(content["data"])
except ValueError:
msg = f"The group name changed to {content['data']}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
data[content["key_remote_jid"]].delete_message(content["_id"])
if content["status"] == 6: # 6 = Metadata, otherwise assume a message
message.meta = True
name = fallback = None
if table_message:
if content["sender_jid_row_id"] > 0:
_jid = content["group_sender_jid"]
if _jid in data:
name = data[_jid].name
if "@" in _jid:
fallback = _jid.split('@')[0]
else:
thumb_image = content["thumb_image"]
if thumb_image is not None:
if b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
# Add user
added = phone_number_re.search(
thumb_image.decode("unicode_escape"))[0]
if added in data:
name_right = data[added].name
else:
name_right = added.split('@')[0]
if content["remote_resource"] is not None:
if content["remote_resource"] in data:
name_left = data[content["remote_resource"]].name
else:
name_left = content["remote_resource"].split('@')[0]
msg = f"{name_left} added {name_right or 'You'}"
else:
msg = f"Added {name_right or 'You'}"
elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
# Changed number
original = content["remote_resource"].split('@')[0]
changed = thumb_image[7:].decode().split('@')[0]
msg = f"{original} changed to {changed}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["data"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
name = "You"
else:
# Private chat
if content["data"] is None and content["thumb_image"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
_jid = content["remote_resource"]
if _jid is not None:
if _jid in data:
name = data[_jid].name
if "@" in _jid:
fallback = _jid.split('@')[0]
else:
name = "You"
message.data = determine_metadata(content, name or fallback)
if isinstance(message.data, str) and "<br>" in message.data:
message.safe = True
if message.data is None:
if content["video_call"] is not None: # Missed call
message.meta = True
if content["video_call"] == 1:
message.data = "A video call was missed"
elif content["video_call"] == 0:
message.data = "A voice call was missed"
elif content["data"] is None and content["thumb_image"] is None:
message.meta = True
message.data = None
else:
# Real message
if content["media_wa_type"] == 20: # Sticker is a message
message.sticker = True
if content["key_from_me"] == 1:
if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
if content["status"] == 5 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
message.meta = True
else:
if content["media_wa_type"] == "5":
if content["media_wa_type"] == 5:
msg = f"Location shared: {content['latitude'], content['longitude']}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
message.meta = True
else:
msg = content["data"]
if msg is not None:
@@ -357,13 +408,13 @@ def messages(db, data):
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["message_type"] == 15:
if content["status"] == 0 and content["edit_version"] == 7 or table_message and content["media_wa_type"] == 15:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
message.meta = True
else:
if content["media_wa_type"] == "5":
if content["media_wa_type"] == 5:
msg = f"Location shared: {content['latitude'], content['longitude']}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
message.meta = True
else:
msg = content["data"]
if msg is not None:
@@ -371,14 +422,20 @@ def messages(db, data):
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
message.data = msg
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].add_message(content["_id"], message)
i += 1
if i % 1000 == 0:
print(f"Gathering messages...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(f"Gathering messages...({total_row_number}/{total_row_number})", end="\r")
print(f"Processing messages...({i}/{total_row_number})", end="\r")
while True:
try:
content = c.fetchone()
except sqlite3.OperationalError:
continue
else:
break
print(f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
def media(db, data, media_folder):
@@ -386,7 +443,7 @@ def media(db, data, media_folder):
c = db.cursor()
c.execute("""SELECT count() FROM message_media""")
total_row_number = c.fetchone()[0]
print(f"\nGathering media...(0/{total_row_number})", end="\r")
print(f"\nProcessing media...(0/{total_row_number})", end="\r")
i = 0
try:
c.execute("""SELECT messages.key_remote_jid,
@@ -394,10 +451,14 @@ def media(db, data, media_folder):
file_path,
message_url,
mime_type,
media_key
media_key,
file_hash,
thumbnail
FROM message_media
INNER JOIN messages
ON message_media.message_row_id = messages._id
LEFT JOIN media_hash_thumbnail
ON message_media.file_hash = media_hash_thumbnail.media_hash
ORDER BY messages.key_remote_jid ASC"""
)
except sqlite3.OperationalError:
@@ -406,7 +467,9 @@ def media(db, data, media_folder):
file_path,
message_url,
mime_type,
media_key
media_key,
file_hash,
thumbnail
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
@@ -414,43 +477,55 @@ def media(db, data, media_folder):
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
LEFT JOIN media_hash_thumbnail
ON message_media.file_hash = media_hash_thumbnail.media_hash
ORDER BY jid.raw_string ASC"""
)
content = c.fetchone()
mime = MimeTypes()
if not os.path.isdir(f"{media_folder}/thumbnails"):
Path(f"{media_folder}/thumbnails").mkdir(parents=True, exist_ok=True)
while content is not None:
file_path = f"{media_folder}/{content['file_path']}"
data[content["key_remote_jid"]].messages[content["message_row_id"]].media = True
message = data[content["key_remote_jid"]].messages[content["message_row_id"]]
message.media = True
if os.path.isfile(file_path):
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = file_path
message.data = file_path
if content["mime_type"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = guess
message.mime = guess
else:
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "data/data"
message.mime = "application/octet-stream"
else:
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = content["mime_type"]
message.mime = content["mime_type"]
else:
# if "https://mmg" in content[4]:
# try:
# r = requests.get(content[3])
# if r.status_code != 200:
# raise RuntimeError()
# except:
# data[content[0]]["messages"][content[1]]["data"] = "{The media is missing}"
# data[content[0]]["messages"][content[1]]["media"] = True
# data[content[0]]["messages"][content[1]]["mime"] = "media"
# else:
data[content["key_remote_jid"]].messages[content["message_row_id"]].data = "The media is missing"
data[content["key_remote_jid"]].messages[content["message_row_id"]].mime = "media"
data[content["key_remote_jid"]].messages[content["message_row_id"]].meta = True
if False: # Block execution
try:
r = requests.get(content["message_url"])
if r.status_code != 200:
raise RuntimeError()
except:
message.data = "The media is missing"
message.mime = "media"
message.meta = True
else:
...
message.data = "The media is missing"
message.mime = "media"
message.meta = True
if content["thumbnail"] is not None:
thumb_path = f"{media_folder}/thumbnails/{b64decode(content['file_hash']).hex()}.png"
if not os.path.isfile(thumb_path):
with open(thumb_path, "wb") as f:
f.write(content["thumbnail"])
message.thumb = thumb_path
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
print(f"Processing media...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
@@ -479,26 +554,83 @@ def vcard(db, data):
ON jid._id = chat.jid_row_id
ORDER BY message.chat_row_id ASC;"""
)
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "WhatsApp/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
media_name = row["media_name"] if row["media_name"] is not None else ""
file_name = "".join(x for x in media_name if x.isalnum())
file_path = f"{base}/{file_name}.vcf"
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row["vcard"])
data[row["key_remote_jid"]].messages[row["message_row_id"]].data = media_name + \
message = data[row["key_remote_jid"]].messages[row["message_row_id"]]
message.data = media_name + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
data[row["key_remote_jid"]].messages[row["message_row_id"]].mime = "text/x-vcard"
data[row["key_remote_jid"]].messages[row["message_row_id"]].meta = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
message.mime = "text/x-vcard"
message.meta = True
print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r")
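
Note on the vCard file-name change above: the new code limits the name by UTF-8 byte length rather than by character count, presumably because file-system name limits are expressed in bytes. A minimal sketch of that technique follows. The helper name `truncate_utf8` is hypothetical and not part of the project; the 230-byte limit is taken from the diff.

```python
def truncate_utf8(name: str, max_bytes: int = 230) -> str:
    """Cut name to at most max_bytes of UTF-8 without leaving a partial character.

    Hypothetical helper for illustration; the diff inlines the same expression.
    """
    # Slicing the encoded bytes can split a multi-byte character in the middle;
    # decoding with errors="ignore" silently drops the incomplete trailing bytes.
    return name.encode("utf-8")[:max_bytes].decode("utf-8", "ignore")


ascii_name = truncate_utf8("a" * 300)   # 1 byte per character
cjk_name = truncate_utf8("聯" * 100)     # 3 bytes per character in UTF-8
```

With 3-byte characters the slice lands inside a character, so the result is shorter than 230 bytes but still valid text.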
def calls(db, data):
c = db.cursor()
c.execute("""SELECT count() FROM call_log""")
total_row_number = c.fetchone()[0]
if total_row_number == 0:
return
print(f"\nProcessing calls...({total_row_number})", end="\r")
c.execute("""SELECT call_log._id,
jid.raw_string,
from_me,
call_id,
timestamp,
video_call,
duration,
call_result,
bytes_transferred
FROM call_log
INNER JOIN jid
ON call_log.jid_row_id = jid._id"""
)
chat = ChatStore(Device.ANDROID, "WhatsApp Calls")
content = c.fetchone()
while content is not None:
call = Message(
from_me=content["from_me"],
timestamp=content["timestamp"],
time=content["timestamp"],
key_id=content["call_id"],
)
_jid = content["raw_string"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
call.sender = name or fallback
call.meta = True
call.data = (
f"A {'video' if content['video_call'] else 'voice'} "
f"call {'to' if call.from_me else 'from'} "
f"{name or fallback} was "
)
if content['call_result'] == 2:
call.data += "not answered." if call.from_me else "missed."
elif content['call_result'] == 3:
call.data += "unavailable."
elif content['call_result'] == 5:
call.data += (
f"initiated and lasted for {content['duration']} second(s) "
f"with {content['bytes_transferred']} bytes transferred."
)
chat.add_message(content["_id"], call)
content = c.fetchone()
data["000000000000000"] = chat
def create_html(
@@ -507,125 +639,83 @@ def create_html(
template=None,
embedded=False,
offline_static=False,
maximum_size=None
maximum_size=None,
no_avatar=False
):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
else:
template_dir = os.path.dirname(template)
template_file = os.path.basename(template)
templateLoader = jinja2.FileSystemLoader(searchpath=template_dir)
templateEnv = jinja2.Environment(loader=templateLoader)
templateEnv.globals.update(determine_day=determine_day)
templateEnv.filters['sanitize_except'] = sanitize_except
template = templateEnv.get_template(template_file)
template = setup_template(template, no_avatar)
total_row_number = len(data)
print(f"\nCreating HTML...(0/{total_row_number})", end="\r")
print(f"\nGenerating chats...(0/{total_row_number})", end="\r")
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f:
f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
w3css = get_status_location(output_folder, offline_static)
for current, contact in enumerate(data):
if len(data[contact].messages) == 0:
chat = data[contact]
if len(chat.messages) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
file_name = ""
else:
file_name = phone_number
safe_file_name, name = get_file_name(contact, chat)
if data[contact].name is not None:
if file_name != "":
file_name += "-"
file_name += data[contact].name.replace("/", "-")
name = data[contact].name
if maximum_size is not None:
current_size = 0
current_page = 1
render_box = []
if maximum_size == 0:
maximum_size = MAX_SIZE
last_msg = chat.get_last_message().key_id
for message in chat.get_messages():
if message.data is not None and not message.meta and not message.media:
current_size += len(message.data) + ROW_SIZE
else:
current_size += ROW_SIZE + 100 # Assume media and meta HTML are 100 bytes
if current_size > maximum_size:
output_file_name = f"{output_folder}/{safe_file_name}-{current_page}.html"
rendering(
output_file_name,
template,
name,
render_box,
contact,
w3css,
f"{safe_file_name}-{current_page + 1}.html",
chat
)
render_box = [message]
current_size = 0
current_page += 1
else:
if message.key_id == last_msg:
if current_page == 1:
output_file_name = f"{output_folder}/{safe_file_name}.html"
else:
output_file_name = f"{output_folder}/{safe_file_name}-{current_page}.html"
rendering(
output_file_name,
template,
name,
render_box,
contact,
w3css,
False,
chat
)
else:
render_box.append(message)
else:
name = phone_number
safe_file_name = ''
safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ")
with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f:
f.write(
template.render(
name=name,
msgs=data[contact].messages.values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
output_file_name = f"{output_folder}/{safe_file_name}.html"
rendering(
output_file_name,
template,
name,
chat.get_messages(),
contact,
w3css,
False,
chat
)
if current % 10 == 0:
print(f"Creating HTML...({current}/{total_row_number})", end="\r")
print(f"Generating chats...({current}/{total_row_number})", end="\r")
print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r")
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
"-w",
"--wa",
dest="wa",
default="wa.db",
help="Path to contact database")
parser.add_option(
"-m",
"--media",
dest="media",
default="WhatsApp",
help="Path to WhatsApp media folder"
)
# parser.add_option(
# "-t",
# "--template",
# dest="html",
# default="wa.db",
# help="Path to HTML template")
(options, args) = parser.parse_args()
msg_db = "msgstore.db"
output_folder = "temp"
contact_db = options.wa
media_folder = options.media
if len(args) == 1:
msg_db = args[0]
elif len(args) == 2:
msg_db = args[0]
output_folder = args[1]
data = {}
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
contacts(db, data)
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
messages(db, data)
media(db, data, media_folder)
vcard(db, data)
create_html(data, output_folder)
if not os.path.isdir(f"{output_folder}/WhatsApp"):
shutil.move(media_folder, f"{output_folder}/")
with open("result.json", "w") as f:
data = json.dumps(data)
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
f.write(data)
print("Everything is done!")
print(f"Generating chats...({total_row_number}/{total_row_number})", end="\r")
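
Note on the `maximum_size` branch of `create_html` above: the loop estimates the rendered size of each message and starts a new HTML page once the running total exceeds the limit. The following is a simplified sketch of that idea, not the project's implementation. `paginate` and `ROW_OVERHEAD` are hypothetical names, the overhead value is a placeholder (the real `ROW_SIZE` and `MAX_SIZE` constants are not shown in this diff), and unlike the diff it flushes a page before the limit is exceeded rather than after.

```python
from typing import Iterable, Iterator, List

ROW_OVERHEAD = 100  # hypothetical per-message HTML overhead, in bytes


def paginate(bodies: Iterable[str], max_size: int) -> Iterator[List[str]]:
    """Yield lists of message bodies whose estimated size stays within max_size."""
    page: List[str] = []
    size = 0
    for body in bodies:
        cost = len(body) + ROW_OVERHEAD
        # Start a new page when adding this message would exceed the budget,
        # but never emit an empty page for a single oversized message.
        if page and size + cost > max_size:
            yield page
            page, size = [], 0
        page.append(body)
        size += cost
    if page:
        # Flush whatever is left as the final page.
        yield page


pages = list(paginate(["a" * 50, "b" * 50, "c" * 50], max_size=320))
```

Flushing the remainder after the loop avoids having to detect the last message by its `key_id`, which is how the diff decides when to write the final page.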


@@ -0,0 +1,92 @@
#!/usr/bin/python3
import os
from datetime import datetime
from mimetypes import MimeTypes
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import Device
def messages(path, data, assume_first_as_me=False):
"""Extracts messages from the exported file"""
with open(path, "r", encoding="utf8") as file:
you = ""
data["ExportedChat"] = ChatStore(Device.EXPORTED)
chat = data["ExportedChat"]
total_row_number = len(file.readlines())
file.seek(0)
for index, line in enumerate(file):
if len(line.split(" - ")) > 1:
time = line.split(" - ")[0]
if ":" not in line.split(time)[1]:
msg.data = line.split(time)[1][3:]
msg.meta = True
else:
name = line.split(time)[1].split(":")[0]
message = line.split(time)[1].split(name + ":")[1].strip()
name = name[3:]
if you == "":
if chat.name is None:
if not assume_first_as_me:
while True:
ans = input(f"Is '{name}' you? (Y/N)").lower()
if ans == "y":
you = name
break
elif ans == "n":
chat.name = name
break
else:
you = name
else:
if name != chat.name:
you = name
elif chat.name is None:
if name != you:
chat.name = name
msg = Message(
you == name,
datetime.strptime(time, "%d/%m/%Y, %H:%M").timestamp(),
time.split(", ")[1].strip(),
index
)
if "<Media omitted>" in message:
msg.data = "The media is omitted in the chat"
msg.mime = "media"
msg.meta = True
elif "(file attached)" in message:
mime = MimeTypes()
msg.media = True
file_path = os.path.join(os.path.dirname(path), message.split("(file attached)")[0].strip())
if os.path.isfile(file_path):
msg.data = file_path
guess = mime.guess_type(file_path)[0]
if guess is not None:
msg.mime = guess
else:
msg.mime = "application/octet-stream"
else:
msg.data = "The media is missing"
msg.mime = "media"
msg.meta = True
else:
msg.data = message
if "\r\n" in message:
msg.data = message.replace("\r\n", "<br>")
if "\n" in message:
msg.data = message.replace("\n", "<br>")
chat.add_message(index, msg)
else:
lookback = index - 1
while lookback not in chat.messages:
lookback -= 1
msg = chat.messages[lookback]
if msg.media:
msg.caption = line.strip()
else:
msg.data += "<br>" + line.strip()
if index % 1000 == 0:
print(f"Processing messages & media...({index}/{total_row_number})", end="\r")
print(f"Processing messages & media...({total_row_number}/{total_row_number})", end="\r")
return data
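
Note on the exported-chat handler above: it splits each line on `" - "` and `":"` to recover the timestamp, the sender, and the text, and treats lines without a timestamp as continuations of the previous message. A regular expression can express the same layout in one place. This is a sketch under the assumption that lines look like `25/06/2023, 13:57 - Alice: Hello`, which matches the `%d/%m/%Y, %H:%M` format used in the diff. `LINE_RE` and `parse_line` are hypothetical names, not part of the project.

```python
import re
from datetime import datetime
from typing import Optional, Tuple

# Assumed layout: "<dd/mm/yyyy, HH:MM> - <sender>: <text>"; the sender part is
# absent on system lines such as group notifications.
LINE_RE = re.compile(r"^(\d{2}/\d{2}/\d{4}, \d{2}:\d{2}) - (?:([^:]+): )?(.*)$")


def parse_line(line: str) -> Optional[Tuple[datetime, Optional[str], str]]:
    """Return (timestamp, sender, text), or None for a continuation line."""
    match = LINE_RE.match(line.rstrip("\r\n"))
    if match is None:
        return None  # no leading timestamp: belongs to the previous message
    stamp, sender, text = match.groups()
    return datetime.strptime(stamp, "%d/%m/%Y, %H:%M"), sender, text


chat_line = parse_line("25/06/2023, 13:57 - Alice: Hello: world\n")
system_line = parse_line("25/06/2023, 13:57 - Alice created the group\n")
continuation = parse_line("second line of a long message\n")
```

Exports from devices with a different locale may use another date format, so the pattern and the `strptime` format would need adjusting in that case.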


@@ -1,123 +1,185 @@
#!/usr/bin/python3
import sqlite3
import json
import jinja2
import os
import shutil
from glob import glob
from pathlib import Path
from datetime import datetime
from mimetypes import MimeTypes
from Whatsapp_Chat_Exporter.utility import sanitize_except, determine_day, APPLE_TIME
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
from Whatsapp_Chat_Exporter.utility import APPLE_TIME, Device
def messages(db, data):
def contacts(db, data):
c = db.cursor()
# Get status only lol
c.execute("""SELECT count() FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
total_row_number = c.fetchone()[0]
print(f"Pre-processing contacts...({total_row_number})")
c.execute("""SELECT ZWHATSAPPID, ZABOUTTEXT FROM ZWAADDRESSBOOKCONTACT WHERE ZABOUTTEXT IS NOT NULL""")
content = c.fetchone()
while content is not None:
if not content["ZWHATSAPPID"].endswith("@s.whatsapp.net"):
_id = content["ZWHATSAPPID"] + "@s.whatsapp.net"
data[_id] = ChatStore(Device.IOS)
data[_id].status = content["ZABOUTTEXT"]
content = c.fetchone()
def messages(db, data, media_folder):
c = db.cursor()
# Get contacts
c.execute("""SELECT count() FROM ZWACHATSESSION""")
total_row_number = c.fetchone()[0]
print(f"Gathering contacts...({total_row_number})")
print(f"Processing contacts...({total_row_number})")
c.execute("""SELECT ZCONTACTJID, ZPARTNERNAME FROM ZWACHATSESSION; """)
row = c.fetchone()
while row is not None:
data[row[0]] = {"name": row[1], "messages": {}}
row = c.fetchone()
c.execute(
"""SELECT ZCONTACTJID,
ZPARTNERNAME,
ZPUSHNAME
FROM ZWACHATSESSION
LEFT JOIN ZWAPROFILEPUSHNAME
ON ZWACHATSESSION.ZCONTACTJID = ZWAPROFILEPUSHNAME.ZJID;"""
)
content = c.fetchone()
while content is not None:
is_phone = content["ZPARTNERNAME"].replace("+", "").replace(" ", "").isdigit()
if content["ZPUSHNAME"] is None or (content["ZPUSHNAME"] and not is_phone):
contact_name = content["ZPARTNERNAME"]
else:
contact_name = content["ZPUSHNAME"]
contact_id = content["ZCONTACTJID"]
if contact_id not in data:
data[contact_id] = ChatStore(Device.IOS, contact_name, media_folder)
else:
data[contact_id].name = contact_name
data[contact_id].my_avatar = os.path.join(media_folder, "Media/Profile/Photo.jpg")
path = f'{media_folder}/Media/Profile/{contact_id.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
data[contact_id].their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb") and data[content["ZCONTACTJID"]].their_avatar_thumb is None:
data[contact_id].their_avatar_thumb = avatar
elif avatar.endswith(".jpg") and data[content["ZCONTACTJID"]].their_avatar is None:
data[contact_id].their_avatar = avatar
content = c.fetchone()
# Get message history
c.execute("""SELECT count() FROM ZWAMESSAGE""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
print(f"Processing messages...(0/{total_row_number})", end="\r")
c.execute("""SELECT COALESCE(ZFROMJID, ZTOJID),
c.execute("""SELECT COALESCE(ZFROMJID, ZTOJID) as _id,
ZWAMESSAGE.Z_PK,
ZISFROMME,
ZMESSAGEDATE,
ZTEXT,
ZMESSAGETYPE,
ZWAGROUPMEMBER.ZMEMBERJID
FROM main.ZWAMESSAGE
LEFT JOIN main.ZWAGROUPMEMBER
ON main.ZWAMESSAGE.ZGROUPMEMBER = main.ZWAGROUPMEMBER.Z_PK;""")
ZWAGROUPMEMBER.ZMEMBERJID,
ZMETADATA,
ZSTANZAID
FROM ZWAMESSAGE
LEFT JOIN ZWAGROUPMEMBER
ON ZWAMESSAGE.ZGROUPMEMBER = ZWAGROUPMEMBER.Z_PK
LEFT JOIN ZWAMEDIAITEM
ON ZWAMESSAGE.Z_PK = ZWAMEDIAITEM.ZMESSAGE;""")
i = 0
content = c.fetchone()
while content is not None:
if content[0] not in data:
data[content[0]] = {"name": None, "messages": {}}
ts = APPLE_TIME + content[3]
data[content[0]]["messages"][content[1]] = {
"from_me": bool(content[2]),
"timestamp": ts,
"time": datetime.fromtimestamp(ts).strftime("%H:%M"),
"media": False,
"reply": None,
"caption": None,
"meta": False,
"data": None
}
if "-" in content[0] and content[2] == 0:
_id = content["_id"]
Z_PK = content["Z_PK"]
if _id not in data:
data[_id] = ChatStore(Device.IOS)
path = f'{media_folder}/Media/Profile/{_id.split("@")[0]}'
avatars = glob(f"{path}*")
if 0 < len(avatars) <= 1:
data[_id].their_avatar = avatars[0]
else:
for avatar in avatars:
if avatar.endswith(".thumb"):
data[_id].their_avatar_thumb = avatar
elif avatar.endswith(".jpg"):
data[_id].their_avatar = avatar
ts = APPLE_TIME + content["ZMESSAGEDATE"]
message = Message(
from_me=content["ZISFROMME"],
timestamp=ts,
time=ts, # TODO: Could be bug
key_id=content["ZSTANZAID"][:17],
)
invalid = False
if "-" in _id and content["ZISFROMME"] == 0:
name = None
if content[6] is not None:
if content[6] in data:
name = data[content[6]]["name"]
if "@" in content[6]:
fallback = content[6].split('@')[0]
if content["ZMEMBERJID"] is not None:
if content["ZMEMBERJID"] in data:
name = data[content["ZMEMBERJID"]].name
if "@" in content["ZMEMBERJID"]:
fallback = content["ZMEMBERJID"].split('@')[0]
else:
fallback = None
else:
fallback = None
data[content[0]]["messages"][content[1]]["sender"] = name or fallback
message.sender = name or fallback
else:
data[content[0]]["messages"][content[1]]["sender"] = None
if content[5] == 6:
message.sender = None
if content["ZMESSAGETYPE"] == 6:
# Metadata
if "-" in content[0]:
if "-" in _id:
# Group
if content[4] is not None:
if content["ZTEXT"] is not None:
# Changed name
try:
int(content[4])
int(content["ZTEXT"])
except ValueError:
msg = f"The group name changed to {content[4]}"
data[content[0]]["messages"][content[1]]["data"] = msg
data[content[0]]["messages"][content[1]]["meta"] = True
msg = f"The group name changed to {content['ZTEXT']}"
message.data = msg
message.meta = True
else:
del data[content[0]]["messages"][content[1]]
invalid = True
else:
data[content[0]]["messages"][content[1]]["data"] = None
message.data = None
else:
data[content[0]]["messages"][content[1]]["data"] = None
message.data = None
else:
# real message
if content[2] == 1:
if content[5] == 14:
if content["ZMETADATA"] is not None and content["ZMETADATA"].startswith(b"\x2a\x14"):
quoted = content["ZMETADATA"][2:19]
message.reply = quoted.decode()
message.quoted_data = None # TODO
if content["ZMESSAGETYPE"] == 15: # Sticker
message.sticker = True
if content["ZISFROMME"] == 1:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
data[content[0]]["messages"][content[1]]["meta"] = True
message.meta = True
else:
msg = content[4]
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content[5] == 14:
if content["ZMESSAGETYPE"] == 14:
msg = "Message deleted"
data[content[0]]["messages"][content[1]]["meta"] = True
message.meta = True
else:
msg = content[4]
msg = content["ZTEXT"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
data[content[0]]["messages"][content[1]]["data"] = msg
message.data = msg
if not invalid:
data[_id].add_message(Z_PK, message)
i += 1
if i % 1000 == 0:
print(f"Gathering messages...({i}/{total_row_number})", end="\r")
print(f"Processing messages...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Gathering messages...({total_row_number}/{total_row_number})", end="\r")
f"Processing messages...({total_row_number}/{total_row_number})", end="\r")
def media(db, data, media_folder):
@@ -125,7 +187,7 @@ def media(db, data, media_folder):
# Get media
c.execute("""SELECT count() FROM ZWAMEDIAITEM""")
total_row_number = c.fetchone()[0]
print(f"\nGathering media...(0/{total_row_number})", end="\r")
print(f"\nProcessing media...(0/{total_row_number})", end="\r")
i = 0
c.execute("""SELECT COALESCE(ZWAMESSAGE.ZFROMJID, ZWAMESSAGE.ZTOJID) as _id,
ZMESSAGE,
@@ -142,40 +204,44 @@ def media(db, data, media_folder):
content = c.fetchone()
mime = MimeTypes()
while content is not None:
file_path = f"{media_folder}/{content[2]}"
data[content[0]]["messages"][content[1]]["media"] = True
file_path = f"{media_folder}/Message/{content['ZMEDIALOCALPATH']}"
_id = content["_id"]
ZMESSAGE = content["ZMESSAGE"]
message = data[_id].messages[ZMESSAGE]
message.media = True
if os.path.isfile(file_path):
data[content[0]]["messages"][content[1]]["data"] = file_path
if content[4] is None:
message.data = file_path
if content["ZVCARDSTRING"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
data[content[0]]["messages"][content[1]]["mime"] = guess
message.mime = guess
else:
data[content[0]]["messages"][content[1]]["mime"] = "data/data"
message.mime = "application/octet-stream"
else:
data[content[0]]["messages"][content[1]]["mime"] = content[4]
message.mime = content["ZVCARDSTRING"]
else:
# if "https://mmg" in content[4]:
# try:
# r = requests.get(content[3])
# if r.status_code != 200:
# raise RuntimeError()
# except:
# data[content[0]]["messages"][content[1]]["data"] = "{The media is missing}"
# data[content[0]]["messages"][content[1]]["mime"] = "media"
# else:
data[content[0]]["messages"][content[1]]["data"] = "The media is missing"
data[content[0]]["messages"][content[1]]["mime"] = "media"
data[content[0]]["messages"][content[1]]["meta"] = True
if content[6] is not None:
data[content[0]]["messages"][content[1]]["caption"] = content[6]
if False: # Block execution
try:
r = requests.get(content["ZMEDIAURL"])
if r.status_code != 200:
raise RuntimeError()
except:
message.data = "The media is missing"
message.mime = "media"
message.meta = True
else:
...
message.data = "The media is missing"
message.mime = "media"
message.meta = True
if content["ZTITLE"] is not None:
message.caption = content["ZTITLE"]
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
print(f"Processing media...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
f"Processing media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
@@ -191,142 +257,24 @@ def vcard(db, data):
ON ZWAVCARDMENTION.ZMEDIAITEM = ZWAMEDIAITEM.Z_PK
INNER JOIN ZWAMESSAGE
ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK""")
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
base = "Message/vCards"
contents = c.fetchall()
total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared/Message/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
file_name = "".join(x for x in row[3] if x.isalnum())
file_path = f"{base}/{file_name[:200]}.vcf"
for index, content in enumerate(contents):
file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row[4])
data[row[2]]["messages"][row[1]]["data"] = row[3] + \
f.write(content["ZVCARDSTRING"])
message = data[content["_id"]].messages[content["ZMESSAGE"]]
message.data = content["ZVCARDNAME"] + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
data[row[2]]["messages"][row[1]]["mime"] = "text/x-vcard"
data[row[2]]["messages"][row[1]]["media"] = True
data[row[2]]["messages"][row[1]]["meta"] = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
def create_html(data, output_folder, template=None, embedded=False, offline_static=False):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
else:
template_dir = os.path.dirname(template)
template_file = os.path.basename(template)
templateLoader = jinja2.FileSystemLoader(searchpath=template_dir)
templateEnv = jinja2.Environment(loader=templateLoader)
templateEnv.globals.update(determine_day=determine_day)
templateEnv.filters['sanitize_except'] = sanitize_except
template = templateEnv.get_template(template_file)
total_row_number = len(data)
print(f"\nCreating HTML...(0/{total_row_number})", end="\r")
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
if len(data[contact]["messages"]) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
file_name = ""
else:
file_name = phone_number
if data[contact]["name"] is not None:
if file_name != "":
file_name += "-"
file_name += data[contact]["name"].replace("/", "-")
name = data[contact]["name"]
else:
name = phone_number
safe_file_name = ''
safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ")
with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f:
f.write(
template.render(
name=name,
msgs=data[contact]["messages"].values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
)
if current % 10 == 0:
print(f"Creating HTML...({current}/{total_row_number})", end="\r")
print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r")
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
"-w",
"--wa",
dest="wa",
default="wa.db",
help="Path to contact database")
parser.add_option(
"-m",
"--media",
dest="media",
default="Message",
help="Path to WhatsApp media folder"
)
# parser.add_option(
# "-t",
# "--template",
# dest="html",
# default="wa.db",
# help="Path to HTML template")
(options, args) = parser.parse_args()
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
output_folder = "temp"
contact_db = options.wa
media_folder = options.media
if len(args) == 1:
msg_db = args[0]
elif len(args) == 2:
msg_db = args[0]
output_folder = args[1]
data = {}
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
messages(db, data)
media(db, data, media_folder)
vcard(db, data)
create_html(data, output_folder)
if not os.path.isdir(f"{output_folder}/WhatsApp"):
shutil.move(media_folder, f"{output_folder}/")
with open("result.json", "w") as f:
data = json.dumps(data)
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
f.write(data)
print("Everything is done!")
message.mime = "text/x-vcard"
message.media = True
message.meta = True
print(f"Processing vCards...({index + 1}/{total_row_number})", end="\r")
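The new vCard file naming above caps the name by encoded bytes rather than by characters. A minimal sketch of that idea follows; the helper name `safe_vcard_file_name` and the `max_bytes` parameter are hypothetical and do not appear in the diff, and the 230-byte cap is simply the value used in the changed line.

```python
def safe_vcard_file_name(display_name: str, max_bytes: int = 230) -> str:
    """Keep alphanumerics only, then cap the UTF-8 encoded length.

    Hypothetical helper: it mirrors the two file_name lines in the diff,
    it is not a function from the project itself.
    """
    # Drop every character that is not a letter or digit (Unicode-aware).
    cleaned = "".join(ch for ch in display_name if ch.isalnum())
    # Slice the encoded bytes; "ignore" discards a multi-byte character
    # that the slice cut in half instead of raising UnicodeDecodeError.
    return cleaned.encode("utf-8")[:max_bytes].decode("utf-8", "ignore")


if __name__ == "__main__":
    name = safe_vcard_file_name("張" * 100 + " Smith")
    print(len(name), len(name.encode("utf-8")))
```

Slicing bytes instead of characters matters because many file systems limit a name's byte length, so a name made of multi-byte characters can exceed the limit well before it reaches 230 characters.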

@@ -3,9 +3,12 @@
import shutil
import sqlite3
import os
import time
import getpass
import threading
try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath
from iphone_backup_decrypt import FailedToDecryptError, Domain
except ModuleNotFoundError:
support_encrypted = False
else:
@@ -13,60 +16,45 @@ else:
def extract_encrypted(base_dir, password):
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password)
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
print("Decrypting WhatsApp database...")
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES,
try:
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES,
output_filename="7c7fba66680ef796b916b067077cc246adacf01d")
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS,
output_filename="ContactsV2.sqlite")
data = backup.execute_sql("""SELECT count()
FROM Files
WHERE relativePath
LIKE 'Message/Media/%'"""
)
total_row_number = data[0][0]
print(f"Gathering media...(0/{total_row_number})", end="\r")
data = backup.execute_sql("""SELECT fileID,
relativePath,
flags,
file
FROM Files
WHERE relativePath
LIKE 'Message/Media/%'"""
)
if not os.path.isdir("Message"):
os.mkdir("Message")
if not os.path.isdir("Message/Media"):
os.mkdir("Message/Media")
i = 0
for row in data:
destination = row[1]
hashes = row[0]
folder = hashes[:2]
flags = row[2]
file = row[3]
if flags == 2:
try:
os.mkdir(destination)
except FileExistsError:
pass
elif flags == 1:
decrypted = backup.decrypt_inner_file(file_id=hashes, file_bplist=file)
with open(destination, "wb") as f:
f.write(decrypted)
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
print(f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS,
output_filename="b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
except FailedToDecryptError:
print("Failed to decrypt backup: incorrect password?")
exit()
extract_thread = threading.Thread(
target=backup.extract_files_by_domain,
args=(Domain.WHATSAPP, Domain.WHATSAPP)
)
extract_thread.daemon = True
extract_thread.start()
dot = 0
while extract_thread.is_alive():
print(f"Decrypting and extracting files{'.' * dot}{' ' * (3 - dot)}", end="\r")
if dot < 3:
dot += 1
time.sleep(0.5)
else:
dot = 0
time.sleep(0.4)
print(f"All required files decrypted and extracted.", end="\n")
extract_thread.handled = True
return backup
def is_encrypted(base_dir):
with sqlite3.connect(f"{base_dir}/Manifest.db") as f:
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f:
c = f.cursor()
try:
c.execute("""SELECT count()
FROM Files
""")
except sqlite3.OperationalError as e:
raise e # This error cannot be used to determine whether the backup is encrypted
except sqlite3.DatabaseError:
return True
else:
@@ -80,57 +68,59 @@ def extract_media(base_dir):
print("Read more on how to deal with encrypted backup:")
print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
return False
password = getpass.getpass("Enter the password:")
print("Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
extract_encrypted(base_dir, password)
else:
wts_db = os.path.join(base_dir, "7c/7c7fba66680ef796b916b067077cc246adacf01d")
contact_db = os.path.join(base_dir, "b8/b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
if not os.path.isfile(wts_db):
print("WhatsApp database not found.")
exit()
else:
shutil.copyfile(wts_db, "7c7fba66680ef796b916b067077cc246adacf01d")
with sqlite3.connect(f"{base_dir}/Manifest.db") as manifest:
if not os.path.isfile(contact_db):
print("Contact database not found.")
exit()
else:
shutil.copyfile(contact_db, "b8548dc30aa1030df0ce18ef08b882cf7ab5212f")
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
manifest.row_factory = sqlite3.Row
c = manifest.cursor()
c.execute("""SELECT count()
FROM Files
WHERE relativePath
LIKE 'Message/Media/%'""")
c.execute(
f"""SELECT count()
FROM Files
WHERE domain = '{_wts_id}'"""
)
total_row_number = c.fetchone()[0]
print(f"Gathering media...(0/{total_row_number})", end="\r")
c.execute("""SELECT fileID,
print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
c.execute(f"""SELECT fileID,
relativePath,
flags
flags,
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
FROM Files
WHERE relativePath
LIKE 'Message/Media/%'""")
WHERE domain = '{_wts_id}'
ORDER BY relativePath""")
if not os.path.isdir(_wts_id):
os.mkdir(_wts_id)
row = c.fetchone()
if not os.path.isdir("Message"):
os.mkdir("Message")
if not os.path.isdir("Message/Media"):
os.mkdir("Message/Media")
i = 0
while row is not None:
destination = row[1]
hashes = row[0]
if row["relativePath"] == "":
row = c.fetchone()
continue
destination = os.path.join(_wts_id, row["relativePath"])
hashes = row["fileID"]
folder = hashes[:2]
flags = row[2]
flags = row["flags"]
if flags == 2:
try:
os.mkdir(destination)
except FileExistsError:
pass
elif flags == 1:
shutil.copyfile(f"{base_dir}/{folder}/{hashes}", destination)
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
if row["_index"] % 100 == 0:
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
row = c.fetchone()
print(f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser()
(_, args) = parser.parse_args()
base_dir = args[0]
extract_media(base_dir)
print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")
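The rewritten extraction query above filters `Files` by domain and numbers the rows with `ROW_NUMBER()`. The sketch below shows the same query shape against an in-memory table, with the domain passed as a bound parameter instead of being interpolated into the SQL string. That substitution is a suggestion, not what the diff does. The function names and the demo rows are made up for illustration, the table has only the columns the query touches, and window functions assume SQLite 3.25 or newer.

```python
import sqlite3

WHATSAPP_DOMAIN = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"


def list_domain_files(manifest: sqlite3.Connection, domain: str):
    """Yield (index, fileID, relativePath, flags) for one backup domain."""
    manifest.row_factory = sqlite3.Row
    cursor = manifest.execute(
        """SELECT fileID,
                  relativePath,
                  flags,
                  ROW_NUMBER() OVER (ORDER BY relativePath) AS _index
           FROM Files
           WHERE domain = ?
           ORDER BY relativePath""",
        (domain,),  # bound parameter, so no string formatting in the SQL
    )
    for row in cursor:
        yield row["_index"], row["fileID"], row["relativePath"], row["flags"]


def build_demo_manifest() -> sqlite3.Connection:
    """Create a tiny stand-in for Manifest.db (illustrative rows only)."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE Files "
        "(fileID TEXT, domain TEXT, relativePath TEXT, flags INTEGER)"
    )
    db.executemany(
        "INSERT INTO Files VALUES (?, ?, ?, ?)",
        [
            ("aa11", WHATSAPP_DOMAIN, "Message/Media", 2),
            ("bb22", WHATSAPP_DOMAIN, "Message/Media/photo.jpg", 1),
            ("cc33", "HomeDomain", "Library/other.db", 1),
        ],
    )
    return db


if __name__ == "__main__":
    for entry in list_domain_files(build_demo_manifest(), WHATSAPP_DOMAIN):
        print(entry)
```

Because `_index` comes from the query itself, the progress counter no longer needs a separate Python variable, which matches the removal of `i` in the diff.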

@@ -1,588 +0,0 @@
#!/usr/bin/python3
import sqlite3
import json
import jinja2
import os
import shutil
import re
import io
import hmac
from pathlib import Path
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from enum import Enum
from mimetypes import MimeTypes
from hashlib import sha256
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
try:
import zlib
from Crypto.Cipher import AES
except ModuleNotFoundError:
support_backup = False
else:
support_backup = True
try:
import javaobj
except ModuleNotFoundError:
support_crypt15 = False
else:
support_crypt15 = True
def sanitize_except(html):
return Markup(sanitize(html, tags=["br"]))
def determine_day(last, current):
last = datetime.fromtimestamp(last).date()
current = datetime.fromtimestamp(current).date()
if last == current:
return None
else:
return current
CRYPT14_OFFSETS = (
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99},
{"iv": 67, "db": 193}
)
class Crypt(Enum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset():
for iv in range(0, 200):
for db in range(0, 200):
yield iv, iv + 16, db
def _generate_hmac_of_hmac(key_stream):
key = hmac.new(
hmac.new(
b'\x00' * 32,
key_stream,
sha256
).digest(),
b"backup encryption\x01",
sha256
)
return key.digest(), key_stream
def _extract_encrypted_key(keyfile):
key_stream = b""
for byte in javaobj.loads(keyfile):
key_stream += byte.to_bytes(1, "big", signed=True)
return _generate_hmac_of_hmac(key_stream)
def decrypt_backup(database, key, output, crypt=Crypt.CRYPT14, show_crypt15=False):
if not support_backup:
return 1
if isinstance(key, io.IOBase):
key = key.read()
if crypt is not Crypt.CRYPT15:
t1 = key[30:62]
if crypt is not Crypt.CRYPT15 and len(key) != 158:
raise ValueError("The key file must be 158 bytes")
if crypt == Crypt.CRYPT14:
if len(database) < 191:
raise ValueError("The crypt14 file must be at least 191 bytes")
current_try = 0
offsets = CRYPT14_OFFSETS[current_try]
t2 = database[15:47]
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
elif crypt == Crypt.CRYPT12:
if len(database) < 67:
raise ValueError("The crypt12 file must be at least 67 bytes")
t2 = database[3:35]
iv = database[51:67]
db_ciphertext = database[67:-20]
elif crypt == Crypt.CRYPT15:
if not support_crypt15:
return 1
if len(database) < 131:
raise ValueError("The crypt15 file must be at least 131 bytes")
t1 = t2 = None
iv = database[8:24]
db_offset = database[0] + 2 # Skip protobuf + protobuf size and backup type
db_ciphertext = database[db_offset:]
if t1 != t2:
raise ValueError("The signature of key file and backup file mismatch")
if crypt == Crypt.CRYPT15:
if len(key) == 32:
main_key, hex_key = _generate_hmac_of_hmac(key)
else:
main_key, hex_key = _extract_encrypted_key(key)
if show_crypt15:
hex_key = [hex_key.hex()[c:c+4] for c in range(0, len(hex_key.hex()), 4)]
print("The HEX key of the crypt15 backup is: " + ' '.join(hex_key))
else:
main_key = key[126:]
decompressed = False
while not decompressed:
cipher = AES.new(main_key, AES.MODE_GCM, iv)
db_compressed = cipher.decrypt(db_ciphertext)
try:
db = zlib.decompress(db_compressed)
except zlib.error:
if crypt == Crypt.CRYPT14:
current_try += 1
if current_try < len(CRYPT14_OFFSETS):
offsets = CRYPT14_OFFSETS[current_try]
iv = database[offsets["iv"]:offsets["iv"] + 16]
db_ciphertext = database[offsets["db"]:]
continue
else:
print("Common offsets are not applicable to "
"your backup. Trying to brute force it...")
for start_iv, end_iv, start_db in brute_force_offset():
iv = database[start_iv:end_iv]
db_ciphertext = database[start_db:]
cipher = AES.new(main_key, AES.MODE_GCM, iv)
db_compressed = cipher.decrypt(db_ciphertext)
try:
db = zlib.decompress(db_compressed)
except zlib.error:
continue
else:
decompressed = True
print(
f"The offsets of your IV and database are {start_iv} and "
f"{start_db}, respectively. To include your offsets in the "
"program, please report it by creating an issue on GitHub: "
"https://github.com/KnugiHK/Whatsapp-Chat-Exporter/issues/new"
)
break
if not decompressed:
return 2
else:
return 3
else:
decompressed = True
if db[0:6].upper() == b"SQLITE":
with open(output, "wb") as f:
f.write(db)
return 0
else:
raise ValueError("The plaintext is not a SQLite database. Did you use the key to encrypt something...")
def contacts(db, data):
# Get contacts
c = db.cursor()
c.execute("""SELECT count() FROM wa_contacts""")
total_row_number = c.fetchone()[0]
print(f"Gathering contacts...({total_row_number})")
c.execute("""SELECT jid, display_name FROM wa_contacts; """)
row = c.fetchone()
while row is not None:
data[row["jid"]] = ChatStore(row["display_name"])
row = c.fetchone()
def messages(db, data):
# Get message history
c = db.cursor()
c.execute("""SELECT count() FROM message""")
total_row_number = c.fetchone()[0]
print(f"Gathering messages...(0/{total_row_number})", end="\r")
phone_number_re = re.compile(r"[0-9]+@s.whatsapp.net")
c.execute("""SELECT jid_global.raw_string as key_remote_jid,
message._id,
message.from_me as key_from_me,
message.timestamp,
message.text_data as data,
message.status,
message_future.version as edit_version,
message_thumbnail.thumbnail as thumb_image,
message_media.file_path as remote_resource,
message_media.mime_type as media_wa_type,
message_location.latitude,
message_location.longitude,
message_quoted.key_id as quoted,
message.key_id,
message_quoted.text_data as quoted_data,
message.message_type,
jid_group.raw_string as group_sender_jid,
chat.subject as chat_subject
FROM message
LEFT JOIN message_quoted
ON message_quoted.message_row_id = message._id
LEFT JOIN message_location
ON message_location.message_row_id = message._id
LEFT JOIN message_media
ON message_media.message_row_id = message._id
LEFT JOIN message_thumbnail
ON message_thumbnail.message_row_id = message._id
LEFT JOIN message_future
ON message_future.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid jid_global
ON jid_global._id = chat.jid_row_id
LEFT JOIN jid jid_group
ON jid_group._id = message.sender_jid_row_id
WHERE key_remote_jid <> '-1';""")
i = 0
content = c.fetchone()
while content is not None:
if content["key_remote_jid"] not in data:
data[content["key_remote_jid"]] = ChatStore()
if content["key_remote_jid"] is None:
continue
data[content["key_remote_jid"]].add_message(content["_id"], Message(
from_me=content["key_from_me"],
timestamp=content["timestamp"],
time=content["timestamp"],
key_id=content["key_id"],
))
if "-" in content["key_remote_jid"] and content["key_from_me"] == 0:
name = None
if content["chat_subject"] is not None:
_jid = content["group_sender_jid"]
else:
_jid = content["key_remote_jid"]
if _jid in data:
name = data[_jid].name
fallback = _jid.split('@')[0] if "@" in _jid else None
else:
fallback = None
data[content["key_remote_jid"]].messages[content["_id"]].sender = name or fallback
else:
data[content["key_remote_jid"]].messages[content["_id"]].sender = None
if content["quoted"] is not None:
data[content["key_remote_jid"]].messages[content["_id"]].reply = content["quoted"]
data[content["key_remote_jid"]].messages[content["_id"]].quoted_data = content["quoted_data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].reply = None
if content["message_type"] == 1:
data[content["key_remote_jid"]].messages[content["_id"]].caption = content["data"]
else:
data[content["key_remote_jid"]].messages[content["_id"]].caption = None
if content["status"] == 6:
if content["chat_subject"] is not None:
# Is Group
if content["data"] is not None:
try:
int(content["data"])
except ValueError:
msg = f"The group name changed to {content['data']}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
thumb_image = content["thumb_image"]
if thumb_image is not None:
if b"\x00\x00\x01\x74\x00\x1A" in thumb_image:
# Add user
added = phone_number_re.search(
thumb_image.decode("unicode_escape"))[0]
if added in data:
name_right = data[added]["name"]
else:
name_right = added.split('@')[0]
if content["remote_resource"] is not None:
if content["remote_resource"] in data:
name_left = data[content["remote_resource"]]["name"]
else:
name_left = content["remote_resource"].split('@')[0]
msg = f"{name_left} added {name_right or 'You'}"
else:
msg = f"Added {name_right or 'You'}"
elif b"\xac\xed\x00\x05\x74\x00" in thumb_image:
# Changed number
original = content["remote_resource"].split('@')[0]
changed = thumb_image[7:].decode().split('@')[0]
msg = f"{original} changed to {changed}"
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["data"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
# Private chat
if content["data"] is None and content["thumb_image"] is None:
data[content["key_remote_jid"]].delete_message(content["_id"])
else:
if content["key_from_me"] == 1:
if content["status"] == 5 and content["edit_version"] == 7:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["media_wa_type"] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
else:
if content["status"] == 0 and content["edit_version"] == 7:
msg = "Message deleted"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
if content["media_wa_type"] == "5":
msg = f"Location shared: {content[10], content[11]}"
data[content["key_remote_jid"]].messages[content["_id"]].meta = True
else:
msg = content["data"]
if msg is not None:
if "\r\n" in msg:
msg = msg.replace("\r\n", "<br>")
if "\n" in msg:
msg = msg.replace("\n", "<br>")
data[content["key_remote_jid"]].messages[content["_id"]].data = msg
i += 1
if i % 1000 == 0:
print(f"Gathering messages...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(f"Gathering messages...({total_row_number}/{total_row_number})", end="\r")
def media(db, data, media_folder):
# Get media
c = db.cursor()
c.execute("""SELECT count() FROM message_media""")
total_row_number = c.fetchone()[0]
print(f"\nGathering media...(0/{total_row_number})", end="\r")
i = 0
c.execute("""SELECT jid.raw_string,
message_row_id,
file_path,
message_url,
mime_type,
media_key
FROM message_media
INNER JOIN message
ON message_media.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY jid.raw_string ASC""")
content = c.fetchone()
mime = MimeTypes()
while content is not None:
file_path = f"{media_folder}/{content['file_path']}"
data[content["raw_string"]].messages[content["message_row_id"]].media = True
if os.path.isfile(file_path):
data[content["raw_string"]].messages[content["message_row_id"]].data = file_path
if content["mime_type"] is None:
guess = mime.guess_type(file_path)[0]
if guess is not None:
data[content["raw_string"]].messages[content["message_row_id"]].mime = guess
else:
data[content["raw_string"]].messages[content["message_row_id"]].mime = "data/data"
else:
data[content["raw_string"]].messages[content["message_row_id"]].mime = content["mime_type"]
else:
# if "https://mmg" in content["mime_type"]:
# try:
# r = requests.get(content["message_url"])
# if r.status_code != 200:
# raise RuntimeError()
# except:
# data[content["raw_string"]].messages[content["message_row_id"]].data = "{The media is missing}"
# data[content["raw_string"]].messages[content["message_row_id"]].media = True
# data[content["raw_string"]].messages[content["message_row_id"]].mime = "media"
# else:
data[content["raw_string"]].messages[content["message_row_id"]].data = "The media is missing"
data[content["raw_string"]].messages[content["message_row_id"]].mime = "media"
data[content["raw_string"]].messages[content["message_row_id"]].meta = True
i += 1
if i % 100 == 0:
print(f"Gathering media...({i}/{total_row_number})", end="\r")
content = c.fetchone()
print(
f"Gathering media...({total_row_number}/{total_row_number})", end="\r")
def vcard(db, data):
c = db.cursor()
c.execute("""SELECT message_row_id,
jid.raw_string,
vcard,
message.text_data
FROM message_vcard
INNER JOIN message
ON message_vcard.message_row_id = message._id
LEFT JOIN chat
ON chat._id = message.chat_row_id
INNER JOIN jid
ON jid._id = chat.jid_row_id
ORDER BY message.chat_row_id ASC;""")
rows = c.fetchall()
total_row_number = len(rows)
print(f"\nGathering vCards...(0/{total_row_number})", end="\r")
base = "WhatsApp/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
for index, row in enumerate(rows):
media_name = row["text_data"] if row["text_data"] else ""
file_name = "".join(x for x in media_name if x.isalnum())
file_path = f"{base}/{file_name}.vcf"
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(row["vcard"])
data[row["raw_string"]].messages[row["message_row_id"]].data = media_name + \
"The vCard file cannot be displayed here, " \
f"however it should be located at {file_path}"
data[row["raw_string"]].messages[row["message_row_id"]].mime = "text/x-vcard"
data[row["raw_string"]].messages[row["message_row_id"]].meta = True
print(f"Gathering vCards...({index + 1}/{total_row_number})", end="\r")
def create_html(
data,
output_folder,
template=None,
embedded=False,
offline_static=False,
maximum_size=None
):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
else:
template_dir = os.path.dirname(template)
template_file = os.path.basename(template)
templateLoader = jinja2.FileSystemLoader(searchpath=template_dir)
templateEnv = jinja2.Environment(loader=templateLoader)
templateEnv.globals.update(determine_day=determine_day)
templateEnv.filters['sanitize_except'] = sanitize_except
template = templateEnv.get_template(template_file)
total_row_number = len(data)
print(f"\nCreating HTML...(0/{total_row_number})", end="\r")
if not os.path.isdir(output_folder):
os.mkdir(output_folder)
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if offline_static:
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
for current, contact in enumerate(data):
if len(data[contact].messages) == 0:
continue
phone_number = contact.split('@')[0]
if "-" in contact:
file_name = ""
else:
file_name = phone_number
if data[contact].name is not None:
if file_name != "":
file_name += "-"
file_name += data[contact].name.replace("/", "-")
name = data[contact].name
else:
name = phone_number
safe_file_name = "".join(x for x in file_name if x.isalnum() or x in "- ")
with open(f"{output_folder}/{safe_file_name}.html", "w", encoding="utf-8") as f:
f.write(
template.render(
name=name,
msgs=data[contact].messages.values(),
my_avatar=None,
their_avatar=f"WhatsApp/Avatars/{contact}.j",
w3css=w3css
)
)
if current % 10 == 0:
print(f"Creating HTML...({current}/{total_row_number})", end="\r")
print(f"Creating HTML...({total_row_number}/{total_row_number})", end="\r")
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser()
parser.add_option(
"-w",
"--wa",
dest="wa",
default="wa.db",
help="Path to contact database")
parser.add_option(
"-m",
"--media",
dest="media",
default="WhatsApp",
help="Path to WhatsApp media folder"
)
# parser.add_option(
# "-t",
# "--template",
# dest="html",
# default="wa.db",
# help="Path to HTML template")
(options, args) = parser.parse_args()
msg_db = "msgstore.db"
output_folder = "temp"
contact_db = options.wa
media_folder = options.media
if len(args) == 1:
msg_db = args[0]
elif len(args) == 2:
msg_db = args[0]
output_folder = args[1]
data = {}
if os.path.isfile(contact_db):
with sqlite3.connect(contact_db) as db:
contacts(db, data)
if os.path.isfile(msg_db):
with sqlite3.connect(msg_db) as db:
messages(db, data)
media(db, data, media_folder)
vcard(db, data)
create_html(data, output_folder)
if not os.path.isdir(f"{output_folder}/WhatsApp"):
shutil.move(media_folder, f"{output_folder}/")
with open("result.json", "w") as f:
data = json.dumps(data)
print(f"\nWriting JSON file...({int(len(data)/1024/1024)}MB)")
f.write(data)
print("Everything is done!")
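The removed `_generate_hmac_of_hmac` helper derives the crypt15 key with two nested HMAC-SHA256 calls. Unrolled into two steps, it can be sketched as below; the function name `derive_crypt15_key` is hypothetical, and unlike the original helper this sketch returns only the derived key, not the input key stream.

```python
import hmac
from hashlib import sha256


def derive_crypt15_key(root_key: bytes) -> bytes:
    """Two-step HMAC-SHA256 derivation, as in the removed helper."""
    # Step 1: HMAC of the root key, keyed with 32 zero bytes.
    inner = hmac.new(b"\x00" * 32, root_key, sha256).digest()
    # Step 2: HMAC of the fixed label, keyed with the result of step 1.
    return hmac.new(inner, b"backup encryption\x01", sha256).digest()


if __name__ == "__main__":
    # An all-ones key is used here purely as placeholder input.
    print(derive_crypt15_key(b"\x01" * 32).hex())
```

The output is always 32 bytes, which is the key length the `AES.new(main_key, AES.MODE_GCM, iv)` call in the same file receives for crypt15 backups.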

@@ -1,7 +1,22 @@
import jinja2
import json
import os
from bleach import clean as sanitize
from markupsafe import Markup
from datetime import datetime
from enum import Enum
from enum import IntEnum
from Whatsapp_Chat_Exporter.data_model import ChatStore
try:
from enum import StrEnum
except ImportError:
# < Python 3.11
from enum import Enum
class StrEnum(str, Enum):
pass
MAX_SIZE = 4 * 1024 * 1024 # Default 4MB
ROW_SIZE = 0x3D0
def sanitize_except(html):
@@ -17,27 +32,6 @@ def determine_day(last, current):
return current
# Android Specific
CRYPT14_OFFSETS = (
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99},
{"iv": 67, "db": 193}
)
class Crypt(Enum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset(max_iv=200, max_db=200):
for iv in range(0, max_iv):
for db in range(0, max_db):
yield iv, iv + 16, db
def check_update():
import urllib.request
import json
@@ -69,6 +63,230 @@ def check_update():
print("You are using the latest version of WhatsApp Chat Exporter.")
return 0
# iOS Specific
def rendering(
output_file_name,
template,
name,
msgs,
contact,
w3css,
next,
chat,
):
if chat.their_avatar_thumb is None and chat.their_avatar is not None:
their_avatar_thumb = chat.their_avatar
else:
their_avatar_thumb = chat.their_avatar_thumb
with open(output_file_name, "w", encoding="utf-8") as f:
f.write(
template.render(
name=name,
msgs=msgs,
my_avatar=chat.my_avatar,
their_avatar=chat.their_avatar,
their_avatar_thumb=their_avatar_thumb,
w3css=w3css,
next=next,
status=chat.status,
)
)
class Device(StrEnum):
IOS = "ios"
ANDROID = "android"
EXPORTED = "exported"
def import_from_json(json_file, data):
from Whatsapp_Chat_Exporter.data_model import ChatStore, Message
with open(json_file, "r") as f:
temp_data = json.loads(f.read())
total_row_number = len(tuple(temp_data.keys()))
print(f"Importing chats from JSON...(0/{total_row_number})", end="\r")
for index, (jid, chat_data) in enumerate(temp_data.items()):
chat = ChatStore(chat_data.get("type"), chat_data.get("name"))
chat.my_avatar = chat_data.get("my_avatar")
chat.their_avatar = chat_data.get("their_avatar")
chat.their_avatar_thumb = chat_data.get("their_avatar_thumb")
chat.status = chat_data.get("status")
for id, msg in chat_data.get("messages").items():
message = Message(
msg["from_me"],
msg["timestamp"],
msg["time"],
msg["key_id"],
)
message.media = msg.get("media")
message.meta = msg.get("meta")
message.data = msg.get("data")
message.sender = msg.get("sender")
message.safe = msg.get("safe")
message.mime = msg.get("mime")
message.reply = msg.get("reply")
message.quoted_data = msg.get("quoted_data")
message.caption = msg.get("caption")
message.thumb = msg.get("thumb")
message.sticker = msg.get("sticker")
chat.add_message(id, message)
data[jid] = chat
print(f"Importing chats from JSON...({index + 1}/{total_row_number})", end="\r")
def get_file_name(contact: str, chat: ChatStore):
if "@" not in contact and contact not in ("000000000000000", "000000000000001", "ExportedChat"):
raise ValueError("Unexpected contact format: " + contact)
phone_number = contact.split('@')[0]
if "-" in contact:
file_name = ""
else:
file_name = phone_number
if chat.name is not None:
if file_name != "":
file_name += "-"
file_name += chat.name.replace("/", "-")
name = chat.name
else:
name = phone_number
return "".join(x for x in file_name if x.isalnum() or x in "- "), name
# Android Specific
CRYPT14_OFFSETS = (
{"iv": 67, "db": 191},
{"iv": 67, "db": 190},
{"iv": 66, "db": 99},
{"iv": 67, "db": 193},
{"iv": 67, "db": 194},
)
class Crypt(IntEnum):
CRYPT15 = 15
CRYPT14 = 14
CRYPT12 = 12
def brute_force_offset(max_iv=200, max_db=200):
for iv in range(0, max_iv):
for db in range(0, max_db):
yield iv, iv + 16, db
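To make the search space of `brute_force_offset` concrete, the sketch below copies the generator as it appears in the diff and inspects what it yields. Only the two inspection variables are additions.

```python
from itertools import islice


def brute_force_offset(max_iv=200, max_db=200):
    # Copied from the diff: every IV start, its end (start + 16),
    # and every candidate start offset for the database ciphertext.
    for iv in range(0, max_iv):
        for db in range(0, max_db):
            yield iv, iv + 16, db


# The first few candidates keep the IV window fixed and move the db offset.
first_three = list(islice(brute_force_offset(), 3))
# With the default bounds the generator yields 200 * 200 combinations.
total = sum(1 for _ in brute_force_offset())

if __name__ == "__main__":
    print(first_three, total)
```

The generator does not skip database offsets that fall inside the IV window, so some of the yielded combinations cannot be valid; the caller relies on decompression failing for those.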
def determine_metadata(content, init_msg):
msg = init_msg if init_msg else ""
if content["is_me_joined"] == 1: # Override
return f"You were added into the group by {msg}"
if content["action_type"] == 1:
msg += f''' changed the group name to "{content['data']}"'''
elif content["action_type"] == 4:
msg += " was added to the group"
elif content["action_type"] == 5:
msg += " left the group"
elif content["action_type"] == 6:
msg += f" changed the group icon"
elif content["action_type"] == 7:
msg = "You were removed"
elif content["action_type"] == 8:
msg += ("WhatsApp Internal Error Occurred: "
"you cannot send message to this group")
elif content["action_type"] == 9:
msg += " created a broadcast channel"
elif content["action_type"] == 10:
try:
old = content['old_jid'].split('@')[0]
new = content['new_jid'].split('@')[0]
except (AttributeError, IndexError):
return None
else:
msg = f"{old} changed their number to {new}"
elif content["action_type"] == 11:
msg += f''' created a group with name: "{content['data']}"'''
elif content["action_type"] == 12:
msg += f" added someone" # TODO: Find out who
elif content["action_type"] == 13:
return # Someone left the group
elif content["action_type"] == 14:
msg += f" removed someone" # TODO: Find out who
elif content["action_type"] == 15:
return # Someone promoted someone as an admin
elif content["action_type"] == 18:
if msg != "You":
msg = f"The security code between you and {msg} changed"
else:
msg = "The security code in this chat changed"
elif content["action_type"] == 19:
msg = "This chat is now end-to-end encrypted"
elif content["action_type"] == 20:
msg = "Someone joined this group by using an invite link" # TODO: Find out who
elif content["action_type"] == 27:
msg += " changed the group description to:<br>"
msg += content['data'].replace("\n", '<br>')
elif content["action_type"] == 28:
try:
old = content['old_jid'].split('@')[0]
new = content['new_jid'].split('@')[0]
except (AttributeError, IndexError):
return None
else:
msg = f"{old} changed their number to {new}"
elif content["action_type"] == 46:
return # Voice message in PM??? Seems no need to handle.
elif content["action_type"] == 47:
msg = "The contact is an official business account"
elif content["action_type"] == 50:
msg = "The contact's account type changed from business to standard"
elif content["action_type"] == 56:
msg = "Message timer was enabled/updated/disabled"
elif content["action_type"] == 57:
if msg != "You":
msg = f"The security code between you and {msg} changed"
else:
msg = "The security code in this chat changed"
elif content["action_type"] == 58:
msg = "You blocked this contact"
elif content["action_type"] == 67:
return # (PM) this contact uses a secure service from Facebook???
elif content["action_type"] == 69:
return # (PM) this contact uses a secure service from Facebook??? How does this differ from 67?
else:
return # Unsupported
return msg
def get_status_location(output_folder, offline_static):
w3css = "https://www.w3schools.com/w3css/4/w3.css"
if not offline_static:
return w3css
import urllib.request
static_folder = os.path.join(output_folder, offline_static)
if not os.path.isdir(static_folder):
os.mkdir(static_folder)
w3css_path = os.path.join(static_folder, "w3.css")
if not os.path.isfile(w3css_path):
with urllib.request.urlopen(w3css) as resp:
with open(w3css_path, "wb") as f: f.write(resp.read())
w3css = os.path.join(offline_static, "w3.css")
return w3css
def setup_template(template, no_avatar):
if template is None:
template_dir = os.path.dirname(__file__)
template_file = "whatsapp.html"
else:
template_dir = os.path.dirname(template)
template_file = os.path.basename(template)
template_loader = jinja2.FileSystemLoader(searchpath=template_dir)
template_env = jinja2.Environment(loader=template_loader, autoescape=True)
template_env.globals.update(
determine_day=determine_day,
no_avatar=no_avatar
)
template_env.filters['sanitize_except'] = sanitize_except
return template_env.get_template(template_file)
# iOS Specific
APPLE_TIME = datetime.timestamp(datetime(2001, 1, 1))
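
Note on `APPLE_TIME`: iOS databases commonly store timestamps as seconds since 2001-01-01 00:00:00 UTC, so this constant is the offset added to get a Unix timestamp. Calling `datetime.timestamp` on a naive `datetime` interprets it in the machine's local time zone, so the value above can shift with the host's UTC offset. The sketch below is a time-zone-independent variant. The names `APPLE_EPOCH_UTC`, `apple_to_unix` and `apple_to_datetime` are hypothetical and do not appear in the project.

```python
from datetime import datetime, timezone

# Assumption: the stored values count seconds from 2001-01-01 00:00:00 UTC.
APPLE_EPOCH_UTC = datetime(2001, 1, 1, tzinfo=timezone.utc).timestamp()


def apple_to_unix(apple_seconds):
    # Shift an Apple-epoch value onto the Unix epoch (1970-01-01 UTC).
    return apple_seconds + APPLE_EPOCH_UTC


def apple_to_datetime(apple_seconds):
    # Return an aware datetime in UTC for display or further conversion.
    return datetime.fromtimestamp(apple_to_unix(apple_seconds), tz=timezone.utc)


print(APPLE_EPOCH_UTC)                    # 978307200.0
print(apple_to_datetime(0).isoformat())   # 2001-01-01T00:00:00+00:00
```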


@@ -33,54 +33,118 @@
img, video {
max-width:100%;
}
a.anchor {
display: block;
position: relative;
top: -100px;
visibility: hidden;
}
div.reply{
font-size: 13px;
text-decoration: none;
}
div:target::before {
content: '';
display: block;
height: 115px;
margin-top: -115px;
visibility: hidden;
}
div:target {
border-style: solid;
border-width: 2px;
animation: border-blink 0.5s steps(1) 5;
border-color: rgba(0,0,0,0)
}
table {
width: 100%;
}
@keyframes border-blink {
0% {
border-color: #2196F3;
}
50% {
border-color: rgba(0,0,0,0);
}
}
.avatar {
border-radius:50%;
overflow:hidden;
max-width: 64px;
max-height: 64px;
}
.name {
color: #3892da;
}
.pad-left-10 {
padding-left: 10px;
}
.pad-right-10 {
padding-right: 10px;
}
.reply_link {
color: #168acc;
}
.blue {
color: #70777a;
}
.sticker {
max-width: 100px !important;
max-height: 100px !important;
}
</style>
</head>
<body>
<header class="w3-center w3-top">Chat history with {{ name }}</header>
<header class="w3-center w3-top">
Chat history with {{ name }}
{% if status is not none %}
<br>
<span class="w3-small">{{ status }}</span>
{% endif %}
</header>
<article class="w3-container">
<div class="table" style="width:100%">
<div class="table">
{% set last = {'last': 946688461.001} %}
{% for msg in msgs -%}
<div class="w3-row" style="padding-bottom: 10px">
<a class="anchor" id="{{ msg.key_id }}"></a>
<div class="w3-row w3-padding-small w3-margin-bottom" id="{{ msg.key_id }}">
{% if determine_day(last.last, msg.timestamp) is not none %}
<div class="w3-center" style="color:#70777c;padding: 10px 0 10px 0;">{{ determine_day(last.last, msg.timestamp) }}</div>
<div class="w3-center w3-padding-16 blue">{{ determine_day(last.last, msg.timestamp) }}</div>
{% if last.update({'last': msg.timestamp}) %}{% endif %}
{% endif %}
{% if msg.from_me == true %}
<div class="w3-row">
<div style="float: left; color:#70777c;">{{ msg.time }}</div>
<div style="padding-left: 10px; text-align: right; color: #3892da;">You</div>
<div class="w3-left blue">{{ msg.time }}</div>
<div class="name w3-right-align pad-left-10">You</div>
</div>
<div class="w3-row">
{% if not no_avatar and my_avatar is not none %}
<div class="w3-col m10 l10">
<div style="text-align: right;">
{% else %}
<div class="w3-col m12 l12">
{% endif %}
<div class="w3-right-align">
{% if msg.reply is not none %}
<div class="reply">
<span style="color: #70777a;">Replying to </span>
<a href="#{{msg.reply}}" style="color: #168acc;">"{{ msg.quoted_data or 'media' }}"</a>
<span class="blue">Replying to </span>
<a href="#{{msg.reply}}" class="reply_link">
{% if msg.quoted_data is not none %}
"{{msg.quoted_data}}"
{% else %}
this message
{% endif %}
</a>
</div>
{% endif %}
{% if msg.meta == true or msg.media == false and msg.data is none %}
<div style="text-align: center;" class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar">
<p>{{ msg.data or 'This message is not supported' }}</p>
</div>
<div class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar w3-threequarter w3-center">
{% if msg.safe %}
<p>{{ msg.data | safe or 'Not supported WhatsApp internal message' }}</p>
{% else %}
<p>{{ msg.data or 'Not supported WhatsApp internal message' }}</p>
{% endif %}
</div>
{% else %}
{% if msg.media == false %}
{{ msg.data | sanitize_except() }}
{% else %}
{% if "image/" in msg.mime %}
<a href="{{ msg.data }}"><img src="{{ msg.data }}" /></a>
<a href="{{ msg.data }}">
<img src="{{ msg.thumb if msg.thumb is not none else msg.data }}" {{ 'class="sticker"' | safe if msg.sticker }} />
</a>
{% elif "audio/" in msg.mime %}
<audio controls="controls" autobuffer="autobuffer">
<source src="{{ msg.data }}" />
@@ -90,7 +154,7 @@
<source src="{{ msg.data }}" />
</video>
{% elif "/" in msg.mime %}
<div style="text-align: center;" class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar">
<div class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar w3-threequarter w3-center">
<p>The file cannot be displayed here, however it should be located at <a href="./{{ msg.data }}">here</a></p>
</div>
{% else %}
@@ -104,49 +168,77 @@
{% endif %}
</div>
</div>
<div class="w3-col m2 l2" style="padding-left: 10px"><img src="{{ my_avatar }}" onerror="this.style.display='none'"></div>
{% if not no_avatar and my_avatar is not none %}
<div class="w3-col m2 l2 pad-left-10">
<a href="{{ my_avatar }}">
<img src="{{ my_avatar }}" onerror="this.style.display='none'" class="avatar">
</a>
</div>
{% endif %}
</div>
{% else %}
<div class="w3-row">
<div style="padding-right: 10px; float: left; color: #3892da;">
<div class="w3-left pad-right-10 name">
{% if msg.sender is not none %}
{{ msg.sender }}
{% else %}
{{ name }}
{% endif %}
</div>
<div style="text-align: right; color:#70777c;">{{ msg.time }}</div>
<div class="w3-right-align blue">{{ msg.time }}</div>
</div>
<div class="w3-row">
<div class="w3-col m2 l2"><img src="{{ their_avatar }}" onerror="this.style.display='none'"></div>
{% if not no_avatar %}
<div class="w3-col m2 l2">
{% if their_avatar is not none %}
<a href="{{ their_avatar }}"><img src="{{ their_avatar_thumb or '' }}" onerror="this.style.display='none'" class="avatar"></a>
{% else %}
<img src="{{ their_avatar_thumb or '' }}" onerror="this.style.display='none'" class="avatar">
{% endif %}
</div>
<div class="w3-col m10 l10">
<div style="text-align: left;">
{% else %}
<div class="w3-col m12 l12">
{% endif %}
<div class="w3-left-align">
{% if msg.reply is not none %}
<div class="reply">
<span style="color: #70777a;">Replying to </span>
<a href="#{{msg.reply}}" style="color: #168acc;">"{{ msg.quoted_data or 'media' }}"</a>
<span class="blue">Replying to </span>
<a href="#{{msg.reply}}" class="reply_link">
{% if msg.quoted_data is not none %}
"{{msg.quoted_data}}"
{% else %}
this message
{% endif %}
</a>
</div>
{% endif %}
{% if msg.meta == true or msg.media == false and msg.data is none %}
<div style="text-align: center;" class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar">
<p>{{ msg.data or 'This message is not supported' }}</p>
<div class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar w3-threequarter w3-center">
{% if msg.safe %}
<p>{{ msg.data | safe or 'Not supported WhatsApp internal message' }}</p>
{% else %}
<p>{{ msg.data or 'Not supported WhatsApp internal message' }}</p>
{% endif %}
</div>
{% else %}
{% if msg.media == false %}
{{ msg.data | sanitize_except() }}
{% else %}
{% if "image/" in msg.mime %}
<a href="{{ msg.data }}"><img src="{{ msg.data }}" /></a>
<a href="{{ msg.data }}">
<img src="{{ msg.thumb if msg.thumb is not none else msg.data }}" {{ 'class="sticker"' | safe if msg.sticker }} />
</a>
{% elif "audio/" in msg.mime %}
<audio controls preload="auto">
<audio controls="controls" autobuffer="autobuffer">
<source src="{{ msg.data }}" />
</audio>
{% elif "video/" in msg.mime %}
<video controls preload="auto">
<video controls="controls" autobuffer="autobuffer">
<source src="{{ msg.data }}" />
</video>
{% elif "/" in msg.mime %}
<div style="text-align: center;" class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar">
<div class="w3-panel w3-border-blue w3-pale-blue w3-rightbar w3-leftbar w3-threequarter w3-center">
<p>The file cannot be displayed here, however it should be located at <a href="./{{ msg.data }}">here</a></p>
</div>
{% else %}
@@ -167,7 +259,11 @@
</div>
</article>
<footer class="w3-center">
{% if next %}
<a href="./{{ next }}">Next</a>
{% else %}
End of history
{% endif %}
</footer>
</body>
</html>


@@ -12,16 +12,17 @@ setuptools.setup(
version=version,
author="KnugiHK",
author_email="hello@knugi.com",
description="A Whatsapp database parser that will give you the "
"history of your Whatsapp conversations in HTML and JSON.",
description=("A Whatsapp database parser that will give you the "
"history of your Whatsapp conversations in HTML and JSON. "
"Android, iOS, iPadOS, Crypt12, Crypt14, Crypt15 supported."),
long_description=long_description,
long_description_content_type="text/markdown",
license="MIT",
keywords=[
"android", "ios", "parsing", "history","iphone", "whatsapp", "message"
"customizable", "android-backup", "crypt12", "whatsapp-chat-exporter",
"whatsapp-export", "whatsapp-database", "whatsapp-database-parser",
"whatsapp-conversations", "iphone-backup", "crypt14", "crypt15", "messages"
"android", "ios", "parsing", "history", "iphone", "message", "crypt15",
"customizable", "whatsapp", "android-backup", "messages", "crypt14",
"crypt12", "whatsapp-chat-exporter", "whatsapp-export", "iphone-backup",
"whatsapp-database", "whatsapp-database-parser", "whatsapp-conversations"
],
platforms=["any"],
url="https://github.com/KnugiHK/Whatsapp-Chat-Exporter",
@@ -31,10 +32,10 @@ setuptools.setup(
},
classifiers=[
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Development Status :: 4 - Beta",
@@ -44,7 +45,7 @@ setuptools.setup(
"Topic :: Utilities",
"Topic :: Database"
],
python_requires='>=3.7',
python_requires='>=3.8',
install_requires=[
'jinja2',
'bleach'
@@ -53,11 +54,16 @@ setuptools.setup(
'android_backup': ["pycryptodome", "javaobj-py3"],
'crypt12': ["pycryptodome"],
'crypt14': ["pycryptodome"],
'crypt15': ["pycryptodome", "javaobj-py3"]
'crypt15': ["pycryptodome", "javaobj-py3"],
'all': ["pycryptodome", "javaobj-py3"],
'everything': ["pycryptodome", "javaobj-py3"],
'backup': ["pycryptodome", "javaobj-py3"]
},
entry_points={
"console_scripts": [
"wtsexporter = Whatsapp_Chat_Exporter.__main__:main"
"wtsexporter = Whatsapp_Chat_Exporter.__main__:main",
"waexporter = Whatsapp_Chat_Exporter.__main__:main",
"whatsapp-chat-exporter = Whatsapp_Chat_Exporter.__main__:main"
]
}
)