rss2podcast/utils.py

156 lines
5.8 KiB
Python
Raw Normal View History

2024-11-05 14:45:19 +01:00
import math
import re
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from xml.etree.ElementTree import SubElement

from sqlalchemy import (
    create_engine, Column, Integer, String, Text, DateTime, Boolean, UniqueConstraint, Float
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class that the ORM models in this module inherit from;
# its metadata is what create_tables() hands to create_all().
Base = declarative_base()
# Database models
class Episode(Base):
    """ORM model for one row of the ``episodes`` table.

    A row is unique per ``(podcast_id, article_guid)`` pair, enforced by the
    ``_podcast_article_uc`` constraint.
    """

    __tablename__ = 'episodes'
    __table_args__ = (
        UniqueConstraint('podcast_id', 'article_guid', name='_podcast_article_uc'),
    )

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Together these two columns form the uniqueness key of an episode.
    podcast_id = Column(String)
    article_guid = Column(String)

    # Article fields. Note that pub_date is stored as a string, not a DateTime.
    title = Column(String)
    link = Column(String)
    pub_date = Column(String)
    description = Column(String)
    content = Column(Text)

    # Processing bookkeeping; new rows start out as 'pending'.
    processing_status = Column(String, default='pending')
    mp3_file_path = Column(String)
    processed_date = Column(String)  # also stored as a string
    skipped = Column(Boolean, default=False)

    # NOTE(review): units are not stated here -- presumably seconds and bytes;
    # confirm against the code that fills these columns.
    duration = Column(Integer)
    file_size = Column(Integer)
class User(Base):
    """ORM model for one row of the ``users`` table."""

    __tablename__ = 'users'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Login name; the column carries a UNIQUE constraint.
    username = Column(String, unique=True)

    # NOTE(review): nothing in this module shows whether this holds a hash or
    # a plaintext password -- confirm that callers store a hash.
    password = Column(Text)
# Function to get the SQLAlchemy engine
def get_engine(db_url):
    """Create a SQLAlchemy engine for a PostgreSQL URL or a SQLite database.

    Args:
        db_url: Either a database URL (``postgresql://...``,
            ``postgresql+driver://...``, ``sqlite://...`` or
            ``sqlite+driver://...``) or a bare filesystem path, which is
            treated as the path of a SQLite database file.

    Returns:
        The engine object returned by ``create_engine``.
    """
    # Driver-qualified URLs such as ``postgresql+psycopg2://`` are PostgreSQL
    # URLs too and must not be wrapped as a SQLite file path.
    if db_url.startswith(('postgresql://', 'postgresql+')):
        return create_engine(db_url)

    # An already-qualified SQLite URL is used as-is; only bare paths get the
    # ``sqlite:///`` prefix, so the scheme is never applied twice.
    if db_url.startswith(('sqlite://', 'sqlite+')):
        sqlite_url = db_url
    else:
        sqlite_url = f'sqlite:///{db_url}'

    # check_same_thread=False is forwarded to sqlite3.connect so a connection
    # may be used from a thread other than the one that created it.
    return create_engine(sqlite_url, connect_args={"check_same_thread": False})
# Function to create tables
def create_tables(engine):
    """Run ``create_all`` for every table registered on ``Base.metadata``.

    Args:
        engine: The SQLAlchemy engine the tables are created through.
    """
    metadata = Base.metadata
    metadata.create_all(engine)
def format_duration(seconds):
    """Format a duration given in seconds as ``H:MM:SS`` or ``M:SS``.

    Args:
        seconds: Duration in seconds. Fractional values are truncated to
            whole seconds and negative values are clamped to zero.

    Returns:
        ``"H:MM:SS"`` when the duration is at least one hour, otherwise
        ``"M:SS"``. The leading field is never zero-padded.

    Raises:
        TypeError: If *seconds* is ``None``.
    """
    # Normalise to a non-negative int first: with float input the floor
    # division and modulo results stay floats and would be rendered with a
    # fractional part, and negative input would wrap around to values such
    # as "59:55".
    total = max(0, int(seconds))
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}:{minutes:02}:{secs:02}"
    return f"{minutes}:{secs:02}"
def add_channel_metadata(channel, metadata):
    """Append child elements to *channel* for every entry in *metadata*.

    Keys are handled as follows:

    * ``image``: an ``<image>`` element with one text child per sub-key.
    * ``atom_link``: an ``<atom:link>`` element using the mapping as attributes.
    * ``itunes``: ``itunes:*`` elements, with special handling for ``image``
      (``href`` attribute), ``owner`` (``name``/``email`` children) and
      ``category`` (a list of mappings with ``text`` and an optional
      ``subcategory``, both emitted as ``text`` attributes).
    * ``googleplay``: a ``<googleplay:category>`` element whose text is the
      ``category`` entry.
    * ``podcast``: delegated to ``add_podcast_namespace``.
    * anything else: an element named after the key with the value as text.

    Args:
        channel: The XML element that receives the new children.
        metadata: Mapping of channel metadata as described above.
    """
    for field, payload in metadata.items():
        if field == 'podcast':
            add_podcast_namespace(channel, payload)
            continue

        if field == 'googleplay':
            play_category = SubElement(channel, 'googleplay:category')
            play_category.text = str(payload['category'])
            continue

        if field == 'atom_link':
            SubElement(channel, 'atom:link', attrib=payload)
            continue

        if field == 'image':
            image_node = SubElement(channel, 'image')
            for child_tag, child_text in payload.items():
                SubElement(image_node, child_tag).text = str(child_text)
            continue

        if field != 'itunes':
            # Plain key: emit <key>value</key>.
            SubElement(channel, field).text = str(payload)
            continue

        # Everything below handles the 'itunes' mapping.
        for tag, item in payload.items():
            if tag == 'category':
                for entry in item:
                    parent = SubElement(
                        channel, 'itunes:category', text=str(entry['text'])
                    )
                    if 'subcategory' in entry:
                        # The subcategory is nested inside its parent category.
                        SubElement(
                            parent, 'itunes:category', text=str(entry['subcategory'])
                        )
            elif tag == 'owner':
                owner_node = SubElement(channel, 'itunes:owner')
                SubElement(owner_node, 'itunes:name').text = str(item['name'])
                SubElement(owner_node, 'itunes:email').text = str(item['email'])
            elif tag == 'image':
                SubElement(channel, 'itunes:image', href=str(item['href']))
            else:
                SubElement(channel, f"itunes:{tag}").text = str(item)
def add_podcast_namespace(channel, podcast_metadata_input):
    """Append ``podcast:*`` elements, including ``podcast:value``, to *channel*.

    A ``podcast:value`` element is always emitted. Missing ``type``,
    ``method`` and ``suggested`` entries are filled with defaults, and the
    built-in "rss2podcast tool" recipient is always placed first in the
    recipient list:

    * If no recipient with a positive total split is configured, the built-in
      recipient is the only one and receives a split of 100.
    * Otherwise it receives ``max(1, round(total / 9))``, i.e. roughly 10% of
      the combined total, and the configured recipients follow with their
      splits converted to ``int``.

    Every other key of the metadata becomes a ``podcast:<key>`` element:
    mappings are emitted as attributes, other non-``None`` values as text.

    The input mapping is not modified, so the function can be called
    repeatedly with the same metadata and produce the same output.

    Args:
        channel: The XML element that receives the new children.
        podcast_metadata_input: Mapping of podcast-namespace metadata. Any
            non-dict value is treated as an empty mapping.

    Raises:
        TypeError, ValueError: If a configured recipient's ``split`` cannot
            be converted with ``int()``.
    """
    static_recipient = {
        "name": "rss2podcast tool",
        "type": "node",
        "address": "02f1246b8fe904a5c5193504d8069532b1fb8692b84fb3eb64318b201238f60ff1",
    }

    # Normalise the input: work on a copy of the top-level mapping.
    if isinstance(podcast_metadata_input, dict):
        processed = dict(podcast_metadata_input)
    else:
        processed = {}

    # Copy the nested value block as well. The defaults and the rewritten
    # recipient list below must not leak into the caller's dict, otherwise a
    # second call with the same metadata would add the static recipient again.
    raw_value = processed.get('value')
    value_block = dict(raw_value) if isinstance(raw_value, dict) else {}
    value_block.setdefault('type', "lightning")
    value_block.setdefault('method', "keysend")
    value_block.setdefault('suggested', "0.00000005000")

    # Tolerate a missing or empty ``recipients`` entry and ignore entries that
    # are not mappings.
    configured = [
        entry for entry in (value_block.get('recipients') or [])
        if isinstance(entry, dict)
    ]

    # With no configured recipients the sum is 0, so both the "nothing
    # configured" and the "non-positive total" cases take the same branch.
    total = sum(int(entry.get('split', 0)) for entry in configured)
    if total <= 0:
        recipients = [{**static_recipient, 'split': 100}]
    else:
        # total / 9 is one tenth of (total + total / 9).
        static_split = max(1, round(total / 9))
        recipients = [{**static_recipient, 'split': static_split}]
        recipients.extend(
            {**entry, 'split': int(entry.get('split', 0))} for entry in configured
        )

    value_block['recipients'] = recipients
    processed['value'] = value_block

    # Build the XML.
    for key, item in processed.items():
        if key == 'value':
            attrs = {'type': item['type'], 'method': item['method']}
            if item.get('suggested') is not None:
                attrs['suggested'] = str(item['suggested'])
            value_el = SubElement(channel, 'podcast:value', **attrs)
            for recipient in item['recipients']:
                SubElement(
                    value_el, 'podcast:valueRecipient',
                    **{name: str(val) for name, val in recipient.items()
                       if val is not None},
                )
        elif isinstance(item, dict):
            SubElement(
                channel, f"podcast:{key}",
                **{name: str(val) for name, val in item.items() if val is not None},
            )
        elif item is not None:
            SubElement(channel, f"podcast:{key}").text = str(item)
def slugify(value):
    """Turn *value* into a lowercase, hyphen-separated slug.

    Characters other than word characters, whitespace and hyphens are
    removed; the result is stripped of surrounding whitespace and
    lower-cased, and every run of whitespace and/or hyphens collapses into a
    single ``-``. Leading or trailing hyphens present in the input are kept.

    Args:
        value: The text to convert.

    Returns:
        The slug string.
    """
    kept = re.sub(r'[^\w\s-]', '', value)
    normalized = kept.strip().lower()
    return re.sub(r'[-\s]+', '-', normalized)
def parse_pub_date(pub_date_str):
    """Parse a publication date string, falling back to the current UTC time.

    The parsers are tried in this order and the first success wins:

    1. The strict RFC 822 layout ``"%a, %d %b %Y %H:%M:%S %z"``.
    2. ``datetime.fromisoformat``.
    3. ``email.utils.parsedate_to_datetime``, which additionally understands
       named zones such as ``GMT`` that ``%z`` rejects.

    Args:
        pub_date_str: The date text. Values that are not strings (including
            ``None``) are treated as unparseable.

    Returns:
        The parsed ``datetime``. It is timezone-aware whenever the input
        carries an offset or a known zone name, and naive when the input has
        none (e.g. ``"2024-11-05T14:45:19"``). If nothing parses, the current
        time as an aware UTC ``datetime`` is returned.
    """
    if isinstance(pub_date_str, str):
        parsers = (
            lambda text: datetime.strptime(text, "%a, %d %b %Y %H:%M:%S %z"),
            datetime.fromisoformat,
            # Tried last so that every input the two parsers above accept is
            # still parsed exactly as before.
            parsedate_to_datetime,
        )
        for parse in parsers:
            try:
                return parse(pub_date_str)
            except (TypeError, ValueError):
                # Older Python versions signal an invalid date from
                # parsedate_to_datetime with TypeError instead of ValueError.
                continue

    # Best-effort fallback. An aware value is used so it can be compared with
    # the aware datetimes produced by the RFC 822 path.
    return datetime.now(timezone.utc)