mirror of
https://github.com/pseudbot/pseudbot.git
synced 2026-05-26 14:05:07 +00:00
added new multimedia superpowers
On startup, the code now initializes MEDIA with suitable media files found in the `media` directory that satisfy Twitter's media file requirements. The `filetype` Python module has been added as a dependency to ensure that files in `media` are what they claim to be. In the bot class I added a new _parse_mention method, broke up the pasta chain builder method, and added a new recursive _tweet_media method. The _parse_mention method had to have some `stupid_emoji` hackery thanks to the way the 🖼 emoji gets encoded in some tweets. A new fetch-media script has been added along with a new util function named `download_tweet_media` See media/README.md for more information on how to use the newly-added multimedia superpowers.
This commit is contained in:
@@ -122,6 +122,9 @@ pseud.json
|
||||
# Pseudbot JSON dumps
|
||||
*.dump.json
|
||||
|
||||
# Pseudbot test media
|
||||
media/*
|
||||
|
||||
# Pseudbot last id file
|
||||
last_id
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ reinstall:
|
||||
|
||||
readme-preview:
|
||||
pandoc README.md -s -c img/pub.css -o README.html
|
||||
pandoc media/README.md -s -c $(PWD)/img/pub.css -o media/README.html
|
||||
|
||||
format:
|
||||
black -v -l 80 pseudbot/*
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
# Media
|
||||
This is your home for Pseudbot's media files.
|
||||
|
||||
## Adding media
|
||||
Place thematically-related media files together in subdirectories of this one.
|
||||
|
||||
## Using media
|
||||
Mention Pseudbot in a tweet followed by the 🖼 emoji and a category. If you
|
||||
wanted to tweet a randomly-selected image from the `smart` directory at the
|
||||
head of a copypasta response chain (note that your instance of Pseudbot will
|
||||
have a different handle) you could tweet at the bot:
|
||||
```
|
||||
@pseudbot 🖼 smart
|
||||
```
|
||||
|
||||
If you want to only tweet just an image with no attached copypasta chain, add a
|
||||
second 🖼 emoji at the end of your tweet like so:
|
||||
```
|
||||
@pseudbot 🖼 smart 🖼
|
||||
```
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 13 KiB |
+93
-20
@@ -1,4 +1,5 @@
|
||||
import random
|
||||
import re
|
||||
from sys import stderr
|
||||
from textwrap import indent
|
||||
from time import sleep, time
|
||||
@@ -7,6 +8,7 @@ from tweepy.errors import Forbidden, TooManyRequests
|
||||
import typing
|
||||
|
||||
from .exceptions import *
|
||||
from .media import MEDIA
|
||||
from .pastas import PASTAS
|
||||
from .util import get_timestamp_s, jdump, log_t_by_sname, surl_prefix
|
||||
|
||||
@@ -127,7 +129,28 @@ class PseudBot:
|
||||
|
||||
jdump(jsons, extra_tag=self.screen_name)
|
||||
|
||||
def _tweet_pasta(self, id_reply_to: int, pasta: [str]):
|
||||
def _tweet_media(
|
||||
self, id_reply_to: int, parent_screen_name: str, media: [str] = []
|
||||
):
|
||||
_stat = self.last_stat
|
||||
try:
|
||||
self.last_stat = self.tapi.update_status_with_media(
|
||||
"@" + parent_screen_name,
|
||||
in_reply_to_status_id=id_reply_to,
|
||||
filename=media.pop(0),
|
||||
)
|
||||
except Forbidden:
|
||||
return _stat
|
||||
|
||||
if len(media) > 0:
|
||||
sleep(2)
|
||||
return self._tweet_media(
|
||||
self.last_stat.id, self.last_stat.user.screen_name, media
|
||||
)
|
||||
else:
|
||||
return self.last_stat
|
||||
|
||||
def _tweet_pasta(self, id_reply_to: int, pasta: [str], media: [str] = []):
|
||||
"""
|
||||
Recursively tweet an entire pasta, noodle by noodle::
|
||||
In this house we stan recursion.
|
||||
@@ -135,16 +158,23 @@ class PseudBot:
|
||||
_stat = self.last_stat
|
||||
try:
|
||||
noodle = pasta.pop(0)
|
||||
self.last_stat = self.tapi.update_status(
|
||||
noodle, in_reply_to_status_id=id_reply_to
|
||||
)
|
||||
if len(media) > 0:
|
||||
self.last_stat = self.tapi.update_status_with_media(
|
||||
noodle,
|
||||
in_reply_to_status_id=id_reply_to,
|
||||
filename=media.pop(0),
|
||||
)
|
||||
else:
|
||||
self.last_stat = self.tapi.update_status(
|
||||
noodle, in_reply_to_status_id=id_reply_to
|
||||
)
|
||||
self._log_tweet(noodle, self.last_stat)
|
||||
except Forbidden:
|
||||
return _stat
|
||||
if len(pasta) > 0:
|
||||
pasta[0] = "@" + self.last_stat.user.screen_name + " " + pasta[0]
|
||||
sleep(2)
|
||||
return self._tweet_pasta(self.last_stat.id, pasta)
|
||||
return self._tweet_pasta(self.last_stat.id, pasta, media)
|
||||
else:
|
||||
return self.last_stat
|
||||
|
||||
@@ -225,14 +255,7 @@ class PseudBot:
|
||||
for tweet in tweets:
|
||||
self._send_pasta_chain(tweet)
|
||||
|
||||
def _send_pasta_chain(self, tweet):
|
||||
"""
|
||||
Send a copypasta chain.
|
||||
"""
|
||||
pasta = []
|
||||
while len(pasta) < 1:
|
||||
pasta = random.choice(PASTAS)
|
||||
|
||||
def _get_reply_parent(self, tweet) -> (int, str):
|
||||
if tweet.in_reply_to_screen_name is not None:
|
||||
if tweet.in_reply_to_screen_name != self.screen_name:
|
||||
parent_name = tweet.in_reply_to_screen_name
|
||||
@@ -248,13 +271,63 @@ class PseudBot:
|
||||
parent_name = None
|
||||
|
||||
if tweet.in_reply_to_status_id is not None and parent_name is not None:
|
||||
pasta[0] = "@" + tweet.in_reply_to_screen_name + " " + pasta[0]
|
||||
self.last_stat = self._tweet_pasta(
|
||||
tweet.in_reply_to_status_id, pasta
|
||||
)
|
||||
reply_to_screen_name = tweet.in_reply_to_screen_name
|
||||
parent_id = tweet.in_reply_to_status_id
|
||||
else:
|
||||
pasta[0] = "@" + tweet.user.screen_name + " " + pasta[0]
|
||||
self.last_stat = self._tweet_pasta(tweet.id, pasta)
|
||||
reply_to_screen_name = tweet.user.screen_name
|
||||
parent_id = tweet.id
|
||||
|
||||
return (parent_id, reply_to_screen_name)
|
||||
|
||||
def _parse_mention(self, tweet):
|
||||
"""
|
||||
Parse commands in tweet and do something
|
||||
"""
|
||||
words = re.split(r'[\s.;\-():"]+', tweet.text)
|
||||
media = []
|
||||
do_pasta = True
|
||||
|
||||
stupid_emoji = "🖼" + b"\xef\xb8\x8f".decode()
|
||||
if stupid_emoji in words or "🖼" in words:
|
||||
for i in range(len(words)):
|
||||
if words[i] in ("🖼", stupid_emoji):
|
||||
try:
|
||||
media_category = words[i + 1]
|
||||
i += 1
|
||||
except IndexError:
|
||||
do_pasta = False
|
||||
break
|
||||
|
||||
if media_category in MEDIA:
|
||||
media.append(random.choice(MEDIA[media_category]))
|
||||
|
||||
if len(media) == 0:
|
||||
media = None
|
||||
|
||||
(parent_id, parent_screen_name) = self._get_reply_parent(tweet)
|
||||
|
||||
if do_pasta is True:
|
||||
pasta = self._make_pasta_chain(parent_screen_name)
|
||||
self._tweet_pasta(parent_id, pasta, media)
|
||||
elif len(media) > 0:
|
||||
self._tweet_media(parent_id, parent_screen_name, media)
|
||||
else:
|
||||
print(
|
||||
'[WARN]: Unable to parse tweet: "{}"'.format(tweet.text),
|
||||
file=stderr,
|
||||
)
|
||||
|
||||
def _make_pasta_chain(self, parent_screen_name: str) -> [str]:
|
||||
"""
|
||||
Send a copypasta chain.
|
||||
"""
|
||||
pasta = []
|
||||
while len(pasta) < 1:
|
||||
pasta = random.choice(PASTAS)
|
||||
|
||||
pasta[0] = "@" + parent_screen_name + " " + pasta[0]
|
||||
|
||||
return pasta
|
||||
|
||||
def _reply_mentions(self):
|
||||
"""
|
||||
@@ -269,7 +342,7 @@ class PseudBot:
|
||||
|
||||
self.last_id = max(tweet.id, self.last_id)
|
||||
|
||||
self._send_pasta_chain(tweet)
|
||||
self._parse_mention(tweet)
|
||||
|
||||
if self.last_stat is not None:
|
||||
print("Finished chain with {}".format(self.last_stat.id))
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
from filetype import guess
|
||||
from os import listdir
|
||||
import os.path as op
|
||||
|
||||
|
||||
def validate_img_size(media_path: str) -> bool:
|
||||
img_sz = op.getsize(media_path)
|
||||
|
||||
if img_sz > 0 and img_sz <= 5242880:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def validate_vid_size(media_path: str) -> bool:
|
||||
vid_sz = op.getsize(media_path)
|
||||
|
||||
if vid_sz > 0 and vid_sz <= 1073741824:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def validate_media(media_path: str) -> bool:
|
||||
kind = guess(media_path)
|
||||
if kind is None:
|
||||
return False
|
||||
|
||||
if kind.extension in ("jpg", "jpeg") and kind.mime == "image/jpeg":
|
||||
return validate_img_size(media_path)
|
||||
elif kind.extension == "png" and kind.mime == "image/png":
|
||||
return validate_img_size(media_path)
|
||||
if kind.extension == "gif" and kind.mime == "image/gif":
|
||||
return validate_img_size(media_path)
|
||||
elif kind.extension == "mp4" and kind.mime == "video/mp4":
|
||||
return validate_vid_size(media_path)
|
||||
elif kind.extension == "mov" and kind.mime == "video/quicktime":
|
||||
return validate_vid_size(media_path)
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def get_media() -> dict:
|
||||
media = {}
|
||||
|
||||
media_prefix = op.abspath("media")
|
||||
for cat in listdir("media"):
|
||||
fullcat = op.join(media_prefix, cat)
|
||||
|
||||
if op.isdir(fullcat):
|
||||
items = []
|
||||
for itm in listdir(fullcat):
|
||||
fullitm = op.join(fullcat, itm)
|
||||
|
||||
if op.isfile(fullitm):
|
||||
if validate_media(fullitm) is True:
|
||||
items.append(fullitm)
|
||||
|
||||
if len(items) > 0:
|
||||
media[cat] = items
|
||||
|
||||
return media
|
||||
|
||||
|
||||
MEDIA = get_media()
|
||||
@@ -1,5 +1,7 @@
|
||||
import inspect
|
||||
import json as j
|
||||
from os.path import basename
|
||||
import requests
|
||||
from time import time
|
||||
import typing
|
||||
|
||||
@@ -37,3 +39,21 @@ def log_t_by_sname(tweet):
|
||||
surl_prefix(tweet.user.screen_name) + str(tweet.id),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def download_tweet_media(tweet: dict):
|
||||
if "extended_entities" in tweet:
|
||||
try:
|
||||
media = tweet["extended_entities"]["media"]
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
for item in media:
|
||||
dl_url = item["media_url_https"]
|
||||
r = requests.get(dl_url, stream=True)
|
||||
if r.status_code == 200:
|
||||
filename = basename(dl_url)
|
||||
print('[MEDIA]: Saving media to "{}"'.format(filename))
|
||||
with open(filename, mode="wb") as f:
|
||||
for chunk in r.iter_content(1024):
|
||||
f.write(chunk)
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
filetype
|
||||
tweepy
|
||||
requests[socks]
|
||||
|
||||
Executable
+60
@@ -0,0 +1,60 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import json as j
|
||||
from pseudbot.util import download_tweet_media
|
||||
from sys import argv as ARGV
|
||||
import typing
|
||||
|
||||
|
||||
def parse_args(args: [str], name: str):
|
||||
parser = argparse.ArgumentParser(prog=name)
|
||||
|
||||
parser.add_argument(
|
||||
"json_dump",
|
||||
type=argparse.FileType("r"),
|
||||
help="JSON File containing a Twitter info dictionary dump",
|
||||
)
|
||||
|
||||
return parser.parse_args(args=args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
prog_name = ARGV.pop(0)
|
||||
print(
|
||||
""
|
||||
+ " ,▄▄▄▄▄,\n"
|
||||
+ " ▄▄ ▄███████████▄▄▄▄█▀\n"
|
||||
+ " ▐███▌ ,█████████████████▄▄▄\n"
|
||||
+ " ▐██████▄ ███████████████████▀\n"
|
||||
+ " ██████████▌▄, █████████████████\n"
|
||||
+ " ▓█████████████████████████████████\n"
|
||||
+ " ▐██████████████████████████████████▓\n"
|
||||
+ " ██████████████████████████████████\n"
|
||||
+ " ▀███████████████████████████████▌\n"
|
||||
+ " ,▐▓███████████████████████████▌\n"
|
||||
+ " ████████████████████████████▀\n"
|
||||
+ " ╙█████████████████████████`\n"
|
||||
+ " `▀▓██████████████████▀\n"
|
||||
+ " ,▄▓██████████████████▓└\n"
|
||||
+ "`▀██████████████████████▀└\n"
|
||||
+ " ╙▀▌▓████████▓▌▀└\n"
|
||||
+ " _ _\n"
|
||||
+ " _ __ ___ ___ __| (_) __ _\n"
|
||||
+ " | '_ ` _ \ / _ \/ _` | |/ _` |\n"
|
||||
+ " | | | | | | __/ (_| | | (_| |\n"
|
||||
+ " |_| |_| |_|\___|\__,_|_|\__,_|\n"
|
||||
+ " _\n"
|
||||
+ " __| |_ _ _ __ ___ _ __ ___ _ __\n"
|
||||
+ " / _` | | | | '_ ` _ \| '_ \ / _ \ '__|\n"
|
||||
+ "| (_| | |_| | | | | | | |_) | __/ |\n"
|
||||
+ " \__,_|\__,_|_| |_| |_| .__/ \___|_|\n"
|
||||
+ " |_|\n"
|
||||
)
|
||||
|
||||
opts = parse_args(args=ARGV, name=prog_name)
|
||||
|
||||
tweets = j.loads(opts.json_dump.read())
|
||||
for tweet in tweets:
|
||||
download_tweet_media(tweet)
|
||||
|
||||
opts.json_dump.close()
|
||||
Reference in New Issue
Block a user