import re from datetime import datetime from xml.etree.ElementTree import SubElement from sqlalchemy import ( create_engine, Column, Integer, String, Text, DateTime, Boolean, UniqueConstraint, Float ) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker import math Base = declarative_base() # Database models class Episode(Base): __tablename__ = 'episodes' id = Column(Integer, primary_key=True, autoincrement=True) podcast_id = Column(String) article_guid = Column(String) title = Column(String) link = Column(String) pub_date = Column(String) description = Column(String) content = Column(Text) processing_status = Column(String, default='pending') mp3_file_path = Column(String) processed_date = Column(String) skipped = Column(Boolean, default=False) duration = Column(Integer) file_size = Column(Integer) __table_args__ = (UniqueConstraint('podcast_id', 'article_guid', name='_podcast_article_uc'),) class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String, unique=True) password = Column(Text) # Function to get the SQLAlchemy engine def get_engine(db_url): if db_url.startswith('postgresql://'): engine = create_engine(db_url) else: engine = create_engine(f'sqlite:///{db_url}', connect_args={"check_same_thread": False}) return engine # Function to create tables def create_tables(engine): Base.metadata.create_all(engine) def format_duration(seconds): hours = seconds // 3600 minutes = (seconds % 3600) // 60 seconds = seconds % 60 if hours > 0: return f"{hours}:{minutes:02}:{seconds:02}" else: return f"{minutes}:{seconds:02}" def add_channel_metadata(channel, metadata): for key, value in metadata.items(): if key == 'image': image = SubElement(channel, 'image') for img_key, img_value in value.items(): SubElement(image, img_key).text = str(img_value) elif key == 'atom_link': SubElement(channel, 'atom:link', attrib=value) elif key == 'itunes': for itunes_key, itunes_value in value.items(): if itunes_key == 'image': SubElement(channel, 'itunes:image', href=str(itunes_value['href'])) elif itunes_key == 'owner': owner = SubElement(channel, 'itunes:owner') SubElement(owner, 'itunes:name').text = str(itunes_value['name']) SubElement(owner, 'itunes:email').text = str(itunes_value['email']) elif itunes_key == 'category': for category in itunes_value: itunes_category = SubElement(channel, 'itunes:category', text=str(category['text'])) if 'subcategory' in category: SubElement(itunes_category, 'itunes:category', text=str(category['subcategory'])) else: SubElement(channel, f"itunes:{itunes_key}").text = str(itunes_value) elif key == 'googleplay': googleplay_category = SubElement(channel, 'googleplay:category') googleplay_category.text = str(value['category']) elif key == 'podcast': add_podcast_namespace(channel, value) else: SubElement(channel, key).text = str(value) def add_podcast_namespace(channel, podcast_metadata_input): static_recipient = { "name": "rss2podcast tool", "type": "node", "address": "02f1246b8fe904a5c5193504d8069532b1fb8692b84fb3eb64318b201238f60ff1", } # normalize input if not isinstance(podcast_metadata_input, dict): processed = {} else: processed = podcast_metadata_input.copy() # get or init value block val = processed.get('value', {}) or {} if not isinstance(val, dict): val = {} val.setdefault('type', "lightning") val.setdefault('method', "keysend") val.setdefault('suggested', "0.00000005000") val.setdefault('recipients', []) orig = [r for r in val['recipients'] if isinstance(r, dict)] final = [] if not orig: final.append({**static_recipient, 'split': 100}) else: total = sum(int(r.get('split', 0)) for r in orig) if total <= 0: final.append({**static_recipient, 'split': 100}) else: static_split = max(1, round(total / 9)) final.append({**static_recipient, 'split': static_split}) for r in orig: final.append({**r, 'split': int(r.get('split', 0))}) val['recipients'] = final processed['value'] = val # build XML for k, v in processed.items(): if k == 'value': attrs = {'type': v['type'], 'method': v['method']} if v.get('suggested') is not None: attrs['suggested'] = str(v['suggested']) tag = SubElement(channel, 'podcast:value', **attrs) for r in v['recipients']: SubElement(tag, 'podcast:valueRecipient', **{k: str(v) for k, v in r.items() if v is not None}) elif isinstance(v, dict): SubElement(channel, f"podcast:{k}", **{k2: str(v2) for k2, v2 in v.items() if v2 is not None}) elif v is not None: SubElement(channel, f"podcast:{k}").text = str(v) def slugify(value): value = re.sub(r'[^\w\s-]', '', value).strip().lower() return re.sub(r'[-\s]+', '-', value) def parse_pub_date(pub_date_str): try: return datetime.strptime(pub_date_str, "%a, %d %b %Y %H:%M:%S %z") except ValueError: try: return datetime.fromisoformat(pub_date_str) except ValueError: return datetime.utcnow()