Website to allow artists to upload their artwork to multiple sites from a single interface https://multiupload.us/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

twitter.py 6.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import json
  2. from typing import Any, List
  3. import tweepy
  4. from flask import Response, current_app, g, redirect, request, session
  5. from constant import Sites
  6. from models import Account, SavedSubmission, SubmissionGroup, db
  7. from sentry import sentry
  8. from sites import AccountExists, BadCredentials, Site, SiteError
  9. from submission import Rating, Submission
  # Abbreviated site labels used when a tweet lists one link per site
  # (the 'multi' link format), keyed by the Sites constant for each site.
  SHORT_NAMES = {
      Sites.FurAffinity: 'FA',
      Sites.Weasyl: 'Weasyl',
      Sites.FurryNetwork: 'FN',
      Sites.Inkbunny: 'IB',
      Sites.SoFurry: 'SF',
      Sites.Tumblr: 'Tumblr',
      Sites.DeviantArt: 'DA',
      Sites.Twitter: 'Twitter',
  }
  class Twitter(Site):
      """Twitter site integration.

      Handles the OAuth flow used to link a Twitter account and posts
      single submissions or submission groups as tweets via tweepy.
      """

      SITE = Sites.Twitter

      def __init__(self, credentials=None, account=None):
          """Initialise the site and decode stored credentials.

          Args:
              credentials: JSON string holding the ``token`` and ``secret``
                  keys written by :meth:`add_account`, or a falsy value.
              account: Passed through unchanged to the ``Site`` base class.
          """
          super().__init__(credentials, account)
          if credentials:
              self.credentials = json.loads(credentials)

      @staticmethod
      def _get_oauth_handler() -> tweepy.OAuthHandler:
          """Build an OAuth handler from the Flask app's Twitter settings."""
          config = current_app.config
          return tweepy.OAuthHandler(
              config['TWITTER_KEY'],
              config['TWITTER_SECRET'],
              config['TWITTER_CALLBACK'],
          )

      def _get_api(self) -> tweepy.API:
          """Return a tweepy API client authenticated with stored credentials."""
          auth = self._get_oauth_handler()
          auth.set_access_token(
              self.credentials['token'], self.credentials['secret']
          )
          return tweepy.API(auth)

      @staticmethod
      def _append_links(status: str, links, tw_format) -> str:
          """Return ``status`` with submission links appended.

          Args:
              status: Tweet text built so far.
              links: Sequence of items whose index 0 is a ``SHORT_NAMES`` key
                  and index 1 is the URL; may be empty or ``None``.
              tw_format: ``'single'`` or ``''`` appends only the first link on
                  the same line; ``'multi'`` appends a blank line followed by
                  one ``NAME: url`` line per link. Any other value leaves the
                  status untouched.

          Raises:
              KeyError: In ``'multi'`` mode, if a link's site has no entry in
                  ``SHORT_NAMES``.
          """
          if not links:
              return status
          if tw_format in ('single', ''):
              return status + ' ' + links[0][1]
          if tw_format == 'multi':
              link_lines = ''.join(
                  '\n{name}: {link}'.format(
                      name=SHORT_NAMES[link[0]], link=link[1]
                  )
                  for link in links
              )
              return status + '\n' + link_lines
          return status

      @staticmethod
      def _tweet_url(tweet) -> str:
          """Return the public URL of a tweet returned by tweepy."""
          return 'https://twitter.com/{username}/status/{id}'.format(
              username=tweet.user.screen_name, id=tweet.id_str
          )

      def pre_add_account(self) -> Response:
          """Start the OAuth flow and redirect the user to Twitter.

          The request token is kept in the session so that
          :meth:`add_account_callback` can complete the exchange.

          Raises:
              SiteError: If tweepy fails to obtain the authorization URL; the
                  exception is also reported to Sentry.
          """
          auth = self._get_oauth_handler()
          try:
              auth_url = auth.get_authorization_url()
          except tweepy.TweepError as ex:
              sentry.captureException()
              raise SiteError('An error occurred with Twitter.') from ex
          session['request_token'] = auth.request_token
          return redirect(auth_url)

      def add_account_callback(self) -> dict:
          """Finish the OAuth flow after Twitter redirects back.

          Exchanges the request token saved in the session plus the
          ``oauth_verifier`` query argument for an access token, and stores
          that token in the session for :meth:`add_account`.

          Returns:
              A dict with key ``'me'`` holding the authenticated user object
              returned by ``api.me()``.

          Raises:
              BadCredentials: If the access token exchange fails.
          """
          verifier = request.args.get('oauth_verifier', None)
          auth = self._get_oauth_handler()
          # The request token is single-use, so it is removed from the session.
          auth.request_token = session.pop('request_token', None)
          try:
              auth.get_access_token(verifier)
          except tweepy.TweepError as ex:
              raise BadCredentials() from ex
          session['taccess'] = auth.access_token
          session['tsecret'] = auth.access_token_secret
          api = tweepy.API(auth)
          return {'me': api.me()}

      def add_account(self, data: dict) -> Account:
          """Persist the Twitter account authorised in the current session.

          Args:
              data: Unused by this site.

          Returns:
              The newly committed ``Account``.

          Raises:
              AccountExists: If the current user already linked this Twitter
                  screen name.
          """
          auth = self._get_oauth_handler()
          auth.set_access_token(session['taccess'], session['tsecret'])
          api = tweepy.API(auth)
          me = api.me()
          # NOTE(review): the duplicate check uses g.user.id while the new
          # Account is created with session['id'] -- presumably the same
          # user id; confirm.
          if Account.lookup_username(self.SITE, g.user.id, me.screen_name):
              raise AccountExists()
          # The access token leaves the session once it is stored on the
          # account.
          stored_credentials = json.dumps(
              {'token': session.pop('taccess'), 'secret': session.pop('tsecret')}
          )
          account = Account(
              self.SITE, session['id'], me.screen_name, stored_credentials
          )
          db.session.add(account)
          db.session.commit()
          return account

      def submit_artwork(self, submission: Submission, extra: Any = None) -> str:
          """Post a single submission as a tweet.

          Args:
              submission: The artwork to post.
              extra: Optional mapping of form values. Keys read here:
                  ``twitter-custom`` (``'y'`` to use custom text),
                  ``twitter-custom-text``, ``twitter-format`` and
                  ``twitter-links``. ``None`` is treated as empty.

          Returns:
              URL of the created tweet.

          Raises:
              SiteError: If tweepy reports an error while posting.
          """
          extra = extra or {}
          api = self._get_api()

          if extra.get('twitter-custom', 'n') == 'y':
              status = (extra.get('twitter-custom-text') or '').strip()
          else:
              # Copy so that adding #NSFW does not mutate the submission.
              hashtags = list(submission.hashtags)
              if submission.rating in (Rating.mature, Rating.explicit):
                  if not any(tag.upper() == '#NSFW' for tag in hashtags):
                      hashtags.append('#NSFW')
              status = '{title} {hashtags}'.format(
                  title=submission.title,
                  hashtags=self.tag_str(hashtags).strip(),
              ).strip()

          status = self._append_links(
              status, extra.get('twitter-links'), extra.get('twitter-format', '')
          )

          try:
              noimage = self.account['twitter_noimage']
              # Explicit artwork is posted as text only when the account has
              # the 'twitter_noimage' setting set to 'yes'.
              text_only = (
                  submission.rating == Rating.explicit
                  and noimage
                  and noimage.val == 'yes'
              )
              if text_only:
                  tweet = api.update_status(status=status, possibly_sensitive=True)
              else:
                  filename, image_bytes = submission.resize_image(1280, 1280)
                  tweet = api.update_with_media(
                      filename=filename,
                      file=image_bytes,
                      status=status,
                      possibly_sensitive=submission.rating != Rating.general,
                  )
          except tweepy.TweepError as ex:
              raise SiteError(ex.reason) from ex

          return self._tweet_url(tweet)

      def upload_group(self, group: SubmissionGroup, extra: Any = None) -> str:
          """Post a group of submissions as one tweet with multiple images.

          The tweet text options come from the master submission's saved
          data; the links come from ``extra``.

          Args:
              group: The submission group to post.
              extra: Optional mapping; only ``twitter-links`` is read.
                  ``None`` is treated as empty.

          Returns:
              URL of the created tweet.

          Raises:
              SiteError: If tweepy reports an error while uploading or posting.
          """
          extra = extra or {}
          master: SavedSubmission = group.master
          s: Submission = master.submission
          submissions: List[SavedSubmission] = group.submissions
          images = list(self.collect_images(submissions, max_size=2000))
          api = self._get_api()

          data = master.data
          if data.get('twitter-custom', 'n') == 'y':
              status = (data.get('twitter-custom-text') or '').strip()
          else:
              # NOTE(review): unlike submit_artwork, no #NSFW tag is added
              # here for mature/explicit groups -- confirm this is intended.
              status = '{title} {hashtags}'.format(
                  title=master.title, hashtags=self.tag_str(s.hashtags)
              ).strip()

          status = self._append_links(
              status, extra.get('twitter-links'), data.get('twitter-format', '')
          )

          try:
              media_ids = [
                  api.media_upload(
                      filename=image['original_filename'], file=image['bytes']
                  ).media_id
                  for image in images
              ]
              tweet = api.update_status(
                  status=status,
                  media_ids=media_ids,
                  possibly_sensitive=s.rating != Rating.general,
              )
          except tweepy.TweepError as ex:
              raise SiteError(ex.reason) from ex

          return self._tweet_url(tweet)

      @staticmethod
      def supports_group() -> bool:
          """Report that this site can post submission groups."""
          return True