from . import releases
from . import settings
from . import talk
+from . import tweets
from . import wiki
from . import zeiterfassung
from .decorators import *
"cleanup-messages" : self.messages.queue.cleanup,
"scan-files" : self.releases.scan_files,
"send-all-messages" : self.messages.queue.send_all,
+ "tweet" : self.tweets.tweet,
"update-blog-feeds" : self.blog.update_feeds,
}
@lazy_property
def messages(self):
return messages.Messages(self)
+
+ @lazy_property
+ def tweets(self):
+ return tweets.Tweets(self)
ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC \
LIMIT %s", query, query, limit)
+ def has_had_recent_activity(self, t=None):
+ if t is None:
+ t = datetime.timedelta(hours=24)
+
+ res = self.db.get("SELECT COUNT(*) AS count FROM blog \
+ WHERE published_at IS NOT NULL AND published_at >= NOW() - %s", t)
+
+ if res and res.count > 0:
+ return True
+
+ return False
+
def create_post(self, title, text, author, tags=[], lang="markdown"):
"""
Creates a new post and returns the resulting Post object
--- /dev/null
+#!/usr/bin/python3
+
+import datetime
+import logging
+import tornado.gen
+import twython
+
+from .misc import Object
+
+class Tweets(Object):
+ @tornado.gen.coroutine
+ def tweet(self):
+ """
+ Sends a random promotional tweet
+ """
+ # Do not tweet too often
+ if self.has_had_recent_activity():
+ logging.debug("Won't tweet because we recently did it")
+ return
+
+ # Do not tweet when there was a blog post
+ if self.backend.blog.has_had_recent_activity():
+ logging.debug("Won't tweet because the blog has had activity")
+ return
+
+ # Select a tweet
+ tweet = self._get_random_tweet()
+ if not tweet:
+ logging.warning("Could not find anything to tweet")
+ return
+
+ # Tweet the tweet
+ with self.db.transaction():
+ self._tweet(tweet)
+
+ def has_had_recent_activity(self, t=None):
+ if t is None:
+ t = datetime.timedelta(days=3)
+
+ res = self.db.get("SELECT COUNT(*) AS count FROM tweets \
+ WHERE last_tweeted_at IS NOT NULL AND last_tweeted_at >= NOW() - %s", t)
+
+ if res and res.count > 0:
+ return True
+
+ return False
+
+ def _get_random_tweet(self):
+ res = self.db.get(
+ "WITH candidate_tweets AS (SELECT id, \
+ (CURRENT_TIMESTAMP - COALESCE(last_tweeted_at, '1970-01-01')) * RANDOM() AS age \
+ FROM tweets \
+ WHERE (last_tweeted_at IS NULL OR last_tweeted_at <= CURRENT_TIMESTAMP - INTERVAL '1 month') \
+ ) \
+ SELECT tweets.* FROM candidate_tweets \
+ LEFT JOIN tweets ON candidate_tweets.id = tweets.id \
+ ORDER BY age DESC LIMIT 1")
+
+ return res
+
+ def _tweet(self, tweet):
+ logging.info("Tweeting: %s" % tweet.message)
+
+ # Update database status
+ self.db.execute("UPDATE tweets \
+ SET last_tweeted_at = CURRENT_TIMESTAMP, total_tweets = total_tweets + 1 \
+ WHERE id = %s", tweet.id)
+
+ # Connect to twitter
+ twitter = twython.Twython(
+ self.settings.get("twitter_consumer_key"),
+ self.settings.get("twitter_consumer_secret"),
+ self.settings.get("twitter_%s_access_token" % tweet.account),
+ self.settings.get("twitter_%s_access_token_secret" % tweet.account),
+ )
+
+ # Update status
+ twitter.update_status(status=tweet.message)