mirror of
https://github.com/jooray/rss2podcast.git
synced 2025-05-23 07:52:00 +00:00
156 lines
5.8 KiB
Python
156 lines
5.8 KiB
Python
import re
|
|
from datetime import datetime
|
|
from xml.etree.ElementTree import SubElement
|
|
from sqlalchemy import (
|
|
create_engine, Column, Integer, String, Text, DateTime, Boolean, UniqueConstraint, Float
|
|
)
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker
|
|
import math
|
|
|
|
# Shared declarative base: the ORM model classes in this module subclass it,
# and create_tables() uses its metadata to create the schema.
Base = declarative_base()
|
|
|
|
# Database models
|
|
class Episode(Base):
    """ORM model for one row of the ``episodes`` table.

    A row ties an article (``article_guid``) to a podcast (``podcast_id``);
    the unique constraint allows each such pair to be stored only once.
    """

    __tablename__ = 'episodes'
    __table_args__ = (
        UniqueConstraint(
            'podcast_id',
            'article_guid',
            name='_podcast_article_uc',
        ),
    )

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Identity of the episode: owning podcast plus the article's GUID.
    podcast_id = Column(String)
    article_guid = Column(String)

    # Article fields. Note that pub_date is stored as a string, not DateTime.
    title = Column(String)
    link = Column(String)
    pub_date = Column(String)
    description = Column(String)
    content = Column(Text)

    # Processing state; new rows start out as 'pending' and not skipped.
    processing_status = Column(String, default='pending')
    mp3_file_path = Column(String)
    processed_date = Column(String)
    skipped = Column(Boolean, default=False)

    # Properties of the generated audio.
    # NOTE(review): units are not visible here (presumably seconds and
    # bytes) - confirm against the code that fills these columns.
    duration = Column(Integer)
    file_size = Column(Integer)
|
|
|
|
class User(Base):
    """ORM model for one row of the ``users`` table."""

    __tablename__ = 'users'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Login name; declared unique at the column level.
    username = Column(String, unique=True)

    # NOTE(review): stored as free-form text; presumably a password hash
    # rather than plaintext - confirm against the code that writes it.
    password = Column(Text)
|
|
|
|
# Function to get the SQLAlchemy engine
|
|
def get_engine(db_url):
    """Create the SQLAlchemy engine for the configured database.

    Args:
        db_url: Either a PostgreSQL URL, with or without an explicit driver
            (``postgresql://...`` or ``postgresql+psycopg2://...``), which is
            handed to SQLAlchemy unchanged, or a filesystem path to a SQLite
            database file.

    Returns:
        The engine object returned by ``create_engine``.
    """
    scheme, separator, _ = db_url.partition('://')
    # Compare only the dialect part so that "postgresql+<driver>://" URLs are
    # recognised too instead of being wrapped as a SQLite file path.
    if separator and scheme.split('+', 1)[0] == 'postgresql':
        return create_engine(db_url)

    # Anything else is treated as a SQLite file path. check_same_thread=False
    # is passed through to the sqlite3 driver so the connection may be used
    # from threads other than the one that created it.
    return create_engine(
        f'sqlite:///{db_url}',
        connect_args={"check_same_thread": False},
    )
|
|
|
|
# Function to create tables
|
|
def create_tables(engine):
    """Create the tables of all models registered on ``Base`` via ``engine``."""
    metadata = Base.metadata
    metadata.create_all(engine)
|
|
|
|
def format_duration(seconds):
    """Format a duration given in seconds as ``H:MM:SS`` or ``M:SS``.

    Args:
        seconds: Duration in seconds. Integers and floats are accepted;
            fractional seconds are truncated and negative values are
            treated as zero.

    Returns:
        ``"H:MM:SS"`` when the duration is at least one hour, otherwise
        ``"M:SS"``.
    """
    # Coerce once up front: with float input the floor-division results are
    # floats too, which would leak ".0" fragments into the formatted string.
    total = max(0, int(seconds))
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}:{minutes:02}:{secs:02}"
    return f"{minutes}:{secs:02}"
|
|
|
|
def _add_itunes_metadata(channel, itunes):
    """Append the ``itunes:*`` children described by the ``itunes`` mapping."""
    for itunes_key, itunes_value in itunes.items():
        if itunes_value is None:
            # Unset option: emit nothing rather than the text "None".
            continue
        if itunes_key == 'image':
            SubElement(channel, 'itunes:image', href=str(itunes_value['href']))
        elif itunes_key == 'owner':
            owner = SubElement(channel, 'itunes:owner')
            SubElement(owner, 'itunes:name').text = str(itunes_value['name'])
            SubElement(owner, 'itunes:email').text = str(itunes_value['email'])
        elif itunes_key == 'category':
            for category in itunes_value:
                itunes_category = SubElement(
                    channel, 'itunes:category', text=str(category['text'])
                )
                if 'subcategory' in category:
                    # A subcategory is nested inside its parent category.
                    SubElement(
                        itunes_category,
                        'itunes:category',
                        text=str(category['subcategory']),
                    )
        else:
            SubElement(channel, f"itunes:{itunes_key}").text = str(itunes_value)


def add_channel_metadata(channel, metadata):
    """Append channel-level feed metadata to ``channel`` as child elements.

    Keys of ``metadata`` are handled as follows:

    * ``image``: an ``<image>`` element with one text child per entry.
    * ``atom_link``: an ``<atom:link>`` element whose attributes are the
      mapping's entries.
    * ``itunes``: ``itunes:*`` elements; ``image``, ``owner`` and
      ``category`` get structured output, other entries become text.
    * ``googleplay``: a ``<googleplay:category>`` element.
    * ``podcast``: delegated to ``add_podcast_namespace``.
    * anything else: ``<key>`` with ``str(value)`` as text.

    Entries whose value is ``None`` are skipped, except for ``podcast``,
    which is always forwarded to ``add_podcast_namespace``.

    Args:
        channel: The ElementTree element to append children to.
        metadata: Mapping of metadata keys to values, as described above.
    """
    for key, value in metadata.items():
        if key == 'podcast':
            # Forwarded even when None: add_podcast_namespace accepts
            # non-dict input and still emits its value block.
            add_podcast_namespace(channel, value)
            continue
        if value is None:
            # Unset option: emit nothing rather than crashing on .items()
            # or writing the literal text "None" into the feed.
            continue

        if key == 'image':
            image = SubElement(channel, 'image')
            for img_key, img_value in value.items():
                if img_value is not None:
                    SubElement(image, img_key).text = str(img_value)
        elif key == 'atom_link':
            # ElementTree can only serialize string attribute values.
            link_attrs = {
                name: str(attr)
                for name, attr in value.items()
                if attr is not None
            }
            SubElement(channel, 'atom:link', attrib=link_attrs)
        elif key == 'itunes':
            _add_itunes_metadata(channel, value)
        elif key == 'googleplay':
            googleplay_category = SubElement(channel, 'googleplay:category')
            googleplay_category.text = str(value['category'])
        else:
            SubElement(channel, key).text = str(value)
|
|
|
|
def add_podcast_namespace(channel, podcast_metadata_input):
    """Append ``podcast:*`` namespace elements to ``channel``.

    A ``podcast:value`` block is always emitted. It contains a fixed
    recipient for the rss2podcast tool in addition to any recipients
    configured by the caller:

    * With no configured recipients, or when their splits sum to zero or
      less, the tool recipient is the only one, with split 100 (configured
      recipients are dropped in that case).
    * Otherwise the tool recipient receives ``max(1, round(total / 9))``
      on top of the configured splits, which are kept unchanged.

    All other entries of the input are emitted as ``podcast:<key>``
    elements: dict values become attributes, other non-``None`` values
    become the element text.

    The input mapping and its nested ``value`` mapping are not modified, so
    repeated calls with the same configuration produce the same output.

    Args:
        channel: The ElementTree element to append children to.
        podcast_metadata_input: Mapping of podcast-namespace settings.
            Anything that is not a dict is treated as an empty mapping.
    """
    static_recipient = {
        "name": "rss2podcast tool",
        "type": "node",
        "address": "02f1246b8fe904a5c5193504d8069532b1fb8692b84fb3eb64318b201238f60ff1",
    }

    # Normalize the input. Both levels are copied: the defaults and the
    # rebuilt recipient list below must not leak into the caller's config,
    # otherwise every further call would add the tool recipient once more.
    if isinstance(podcast_metadata_input, dict):
        processed = dict(podcast_metadata_input)
    else:
        processed = {}

    configured = processed.get('value')
    value_block = dict(configured) if isinstance(configured, dict) else {}
    value_block.setdefault('type', "lightning")
    value_block.setdefault('method', "keysend")
    value_block.setdefault('suggested', "0.00000005000")

    # Only dict entries are usable recipients; a missing or null list and a
    # missing or null split both count as "nothing configured".
    recipients = [
        r for r in (value_block.get('recipients') or [])
        if isinstance(r, dict)
    ]
    splits = [int(r.get('split') or 0) for r in recipients]
    total = sum(splits)

    if total <= 0:
        final = [{**static_recipient, 'split': 100}]
    else:
        static_split = max(1, round(total / 9))
        final = [{**static_recipient, 'split': static_split}]
        final.extend(
            {**r, 'split': split} for r, split in zip(recipients, splits)
        )
    value_block['recipients'] = final
    processed['value'] = value_block

    # Build the XML. Attributes are passed through attrib= rather than
    # **kwargs so that keys such as "attrib" or "tag" cannot collide with
    # SubElement's own parameters.
    for key, item in processed.items():
        if key == 'value':
            attrs = {'type': item['type'], 'method': item['method']}
            if item.get('suggested') is not None:
                attrs['suggested'] = str(item['suggested'])
            value_tag = SubElement(channel, 'podcast:value', attrib=attrs)
            for recipient in item['recipients']:
                recipient_attrs = {
                    name: str(attr)
                    for name, attr in recipient.items()
                    if attr is not None
                }
                SubElement(
                    value_tag, 'podcast:valueRecipient', attrib=recipient_attrs
                )
        elif isinstance(item, dict):
            item_attrs = {
                name: str(attr)
                for name, attr in item.items()
                if attr is not None
            }
            SubElement(channel, f"podcast:{key}", attrib=item_attrs)
        elif item is not None:
            SubElement(channel, f"podcast:{key}").text = str(item)
|
|
|
|
def slugify(value):
    """Turn ``value`` into a lowercase, hyphen-separated slug.

    Characters other than word characters, whitespace and hyphens are
    removed; the result is stripped and lowercased, and every run of
    hyphens and/or whitespace is collapsed into a single hyphen.
    """
    kept = re.sub(r'[^\w\s-]', '', value)
    normalized = kept.strip().lower()
    return re.sub(r'[-\s]+', '-', normalized)
|
|
|
|
def parse_pub_date(pub_date_str):
    """Parse a feed publication date, falling back to the current time.

    The following parsers are tried in order and the first success wins:

    1. ``"%a, %d %b %Y %H:%M:%S %z"`` (RFC 822 style with a numeric offset).
    2. ``datetime.fromisoformat``.
    3. ``email.utils.parsedate_to_datetime``, which also understands RFC 822
       dates with named zones such as ``GMT``.

    Args:
        pub_date_str: The date string taken from the feed. ``None`` and
            other non-string values are treated as unparseable.

    Returns:
        The parsed ``datetime``. When nothing parses, the current time as a
        timezone-aware UTC ``datetime``, so the fallback can be compared
        with the aware values produced by the first parser.
    """
    # Imported locally because the module-level imports only bring in
    # ``datetime`` itself.
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    parsers = (
        lambda text: datetime.strptime(text, "%a, %d %b %Y %H:%M:%S %z"),
        datetime.fromisoformat,
        parsedate_to_datetime,
    )
    for parse in parsers:
        try:
            return parse(pub_date_str)
        except (TypeError, ValueError):
            # TypeError covers None / non-string input; ValueError covers
            # text this particular parser does not understand.
            continue
    return datetime.now(timezone.utc)
|