Visualizzazione post con etichetta Ollama. Mostra tutti i post
Visualizzazione post con etichetta Ollama. Mostra tutti i post

lunedì 15 giugno 2026

Mappa frane da news on line

Correva l'anno 2008 ed il mio direttore di Dipartimento mi aveva chiesto di lavorare sui Big Data per estrarre dati di geolocalizzazione di eventi franosi da notizie sui quotidiani

Non ho piu' il codice dell'epoca ma piu' o meno venivano scaricati gli RSS da Google News, si prendeva il link ed il rank della fonte (all'epoca era ancora possibile), veniva scaricato il documento html originale e salvato su Db per poi un primitivo parsing basato su parole chiave con annessa geolocalizzazione. 

Molto primitivo e molto poco funzionante. La cosa piu' fastidiosa era che era molto complicato distinguere se la frana era avvenuta il giorno prima oppure se la notizia riguardava eventi pregressi (se per esempio erano stati stanziati finanziamenti dell'evento di 5 anni prima o se era stata ripristinata la viabilita' chiusa un mese prima...peraltro in inglese landslide e' utilizzato per vittoria politica schiacciante winning landslide quindi era necessario interpretare il contesto)

Adesso ho voluto riprovare con tecnologie moderne basandomi su un agente AI 



Le differenze principali rispetto al 2008 e' che i giornali adesso usano spesso i paywall per accedere al documento completo, twitter ti permette di usare le API di ricerca solo a pagamento ma abbiamo il vantaggio di poter usare LLM per il parsing delle notizie in modo da poter aggirare le limitazioni viste nel 2008 usando il contesto

Prima di iniziare si deve avere un server Ollama con alcuni modelli caricati (sul mio Mac mini M4 16 Gb ho provato gemma3:12b, llama3.1, qwen2.5:7b (qwen sembra essere quello più idoneo per generare dati tabellari come Json in output)

Nello script successivo i passi sono

1) ricerca su Google News tramite una query minima con estrazione del link alla notizia originale

2) download (dove non dietro paywall) del testo della notizia

3) eliminazione dei tag html

4) passaggio del testo della notizia ad ollama per il parsing contestuale tramite prompt che ricerca specifiche informazioni (se la notizia e' recente, se vi sono state interruzioni di strade, se ci sono state evacuazioni di abitati o sfollati)

5) tentativo di geocoding

6) restituzione di un file geojson 

la fase computazionalmente più impegnativa e' quella relativa all'LLM. Per non saturare la memoria ogni notizia viene processata singolarmente


questa una mappa del file geojson prodotto dallo script

https://c1p81.altervista.org/landslide/frane_map.html


"""
Agente di rilevamento frane da Google News con geocodifica e output GeoJSON.
"""

import json
import time
import logging
from pathlib import Path
from datetime import datetime
from typing import List, Optional

import feedparser
import geojson
import httpx
import ollama
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from pydantic import BaseModel, Field

# ---------------------------------------------------------------------------
# Configurazione
# ---------------------------------------------------------------------------

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

MAX_ARTICOLI = 20
MAX_PREVIEW_CHARS = 150 # usato solo come fallback se il download dell'articolo fallisce
MAX_TESTO_ARTICOLO = 1500 # caratteri massimi del testo completo scaricato
MAX_RETRY_OLLAMA = 3
OUTPUT_DIR = Path(".")

GOOGLE_NEWS_URL = (
"https://news.google.com/rss/search"
"?q=frana+OR+smottamento+when:30d&hl=it&gl=IT&ceid=IT:it"
)

OLLAMA_MODEL = "gemma3:12b" # modifica qui per cambiare modello
# OLLAMA_MODEL = "llama3.1"
# OLLAMA_MODEL = "qwen2.5:7b"
# OLLAMA_MODEL = "gemma3:4b"
OLLAMA_OPTIONS = {
"num_ctx": 16384,
"num_predict": 8192, # aumentato per evitare troncamento del JSON
"temperature": 0.1,
}

SYSTEM_PROMPT = (
"Sei un analista geologo esperto di rischio idrogeologico. "
"Estrai le frane e associa sempre l'URL della fonte corretto ad ogni record. "
"Per ogni evento, valuta attentamente: "
"(1) se il testo menziona interruzioni, chiusure o blocchi di strade, viabilità, "
"sentieri o ferrovie causati dalla frana: imposta 'interruzione_stradale' a true; "
"(2) se il testo menziona evacuazioni, sfollati, persone fatte uscire dalle abitazioni "
"o ordini di evacuazione: imposta 'evacuazione' a true; "
"(3) se la data di pubblicazione è più vecchia di 7 giorni rispetto ad oggi, "
"o se il testo descrive eventi passati o già risolti: imposta 'notizia_vecchia' a true. "
"In assenza di menzione esplicita, imposta i campi a false. "
"Genera un output JSON valido. Sii conciso nella descrizione per risparmiare spazio."
)

# ---------------------------------------------------------------------------
# Modelli Pydantic
# ---------------------------------------------------------------------------


class SingolaFrana(BaseModel):
localita: str = Field(
description="Nome del comune, frazione o località specifica in Italia."
)
provincia: str = Field(
description="Sigla della provincia (es. 'SO', 'TO', 'BG')."
)
descrizione: str = Field(
description="Breve sintesi dell'evento o dei danni descritti nell'articolo."
)
interruzione_stradale: bool = Field(
description="True se la frana ha causato interruzione, chiusura o blocco di strade, viabilità o ferrovie. False altrimenti."
)
evacuazione: bool = Field(
description="True se la frana ha causato evacuazione di persone, sfollati o ordini di evacuazione. False altrimenti."
)
notizia_vecchia: bool = Field(
description="True se la frana è accaduta più di una settimana fa rispetto alla data odierna o se il testo fa riferimento a eventi passati, storici o già risolti. False se l'evento è recente (ultimi 7 giorni)."
)
url: Optional[str] = Field(
description="L'URL esatto dell'articolo da cui è stata estratta la notizia."
)


class ListaFrane(BaseModel):
eventi: List[SingolaFrana] = Field(
description="Lista di tutte le frane distinte rilevate negli articoli analizzati."
)


# ---------------------------------------------------------------------------
# Tool 1a: Scraping testo completo articolo
# ---------------------------------------------------------------------------

# Header che simulano un browser reale per evitare blocchi base
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "it-IT,it;q=0.9",
}

# Testo che indica una pagina di consenso cookie invece dell'articolo reale
COOKIE_WALL_MARKERS = [
"accetta tutto",
"rifiuta tutto",
"cookie e dati",
"contenuti non personalizzati",
"impostazioni cookie",
"privacy policy",
"we use cookies",
]


def is_cookie_wall(testo: str) -> bool:
"""Ritorna True se il testo è una pagina di consenso cookie, non un articolo."""
testo_lower = testo.lower()
hits = sum(1 for marker in COOKIE_WALL_MARKERS if marker in testo_lower)
return hits >= 2


# Domini noti per paywall o blocco scraping — usa solo lo snippet RSS
PAYWALL_DOMAINS = {
"corriere.it", "repubblica.it", "sole24ore.com",
"stampa.it", "ilmessaggero.it", "lastampa.it",
}


def estrai_dominio(url: str) -> str:
"""Estrae il dominio base da un URL."""
try:
from urllib.parse import urlparse
return urlparse(url).netloc.replace("www.", "")
except Exception:
return ""


def scarica_testo_articolo(url: str) -> str:
"""
Scarica e pulisce il testo completo di un articolo.
Segue i redirect di Google News, salta i paywall noti.
Ritorna stringa vuota se il download fallisce.
"""
dominio = estrai_dominio(url)
if dominio in PAYWALL_DOMAINS:
logger.debug(f"Paywall noto, uso snippet RSS per: {dominio}")
return ""

try:
response = httpx.get(
url,
headers=HEADERS,
timeout=10,
follow_redirects=True,
)
if response.status_code != 200:
logger.debug(f"HTTP {response.status_code} per {url}")
return ""

soup = BeautifulSoup(response.text, "html.parser")

# Rimuovi elementi non editoriali
for tag in soup(["script", "style", "nav", "footer", "header",
"aside", "form", "noscript", "iframe", "figure",
"button", "input", "select", "textarea"]):
tag.decompose()

# Preferisci tag semantici dell'articolo se presenti
contenuto = (
soup.find("article")
or soup.find("main")
or soup.find(attrs={"itemprop": "articleBody"})
or soup.find(class_=lambda c: c and any(
kw in c.lower() for kw in ["article-body", "article__body",
"post-content", "entry-content",
"news-body", "article-content"]
))
)

if contenuto:
# Estrai solo i paragrafi <p> per evitare testo di navigazione residuo
paragrafi = contenuto.find_all("p")
if paragrafi:
testo = " ".join(p.get_text(strip=True) for p in paragrafi)
else:
testo = contenuto.get_text(separator=" ", strip=True)
else:
# Fallback: tutti i <p> della pagina
paragrafi = soup.find_all("p")
testo = " ".join(p.get_text(strip=True) for p in paragrafi if len(p.get_text(strip=True)) > 40)

if not testo or is_cookie_wall(testo):
logger.debug(f"Cookie wall o testo vuoto rilevato per: {url}")
return ""

return testo[:MAX_TESTO_ARTICOLO]

except httpx.TimeoutException:
logger.warning(f"Timeout scaricando: {url}")
return ""
except Exception as e:
logger.warning(f"Errore scaricando {url}: {e}")
return ""


# ---------------------------------------------------------------------------
# Tool 1: Recupero notizie da Google News
# ---------------------------------------------------------------------------


def recupero_emergenza_google_news(max_risultati: int = MAX_ARTICOLI) -> str:
"""
Scarica gli articoli da Google News RSS.
Per ogni articolo tenta di scaricare il testo completo;
se fallisce usa lo snippet RSS come fallback.
"""
logger.info("Recupero notizie da Google News...")
feed = feedparser.parse(GOOGLE_NEWS_URL)

risultati_raw = []
for entry in feed.entries[:max_risultati]:
titolo = entry.get("title", "")
snippet = BeautifulSoup(
entry.get("summary", ""), "html.parser"
).get_text().strip()
link = entry.get("link", "")

# Testo base = titolo + snippet RSS (sempre disponibile)
testo_base = f"{titolo}. {snippet}"[:MAX_PREVIEW_CHARS * 2]

# Prova a scaricare il testo completo come arricchimento
testo_completo = scarica_testo_articolo(link)
if testo_completo:
testo = testo_completo
fonte_testo = "articolo completo"
else:
testo = testo_base
fonte_testo = "snippet RSS"

logger.info(f" [{fonte_testo}] {titolo[:60]}")

risultati_raw.append(
{
"titolo": titolo,
"testo": testo,
"link": link,
"data_pubblicazione": entry.get("published", "sconosciuta"),
}
)

logger.info(f"Pronti {len(risultati_raw)} articoli per l'analisi.")
return json.dumps(risultati_raw, ensure_ascii=False)


# ---------------------------------------------------------------------------
# Costruzione prompt (un articolo alla volta)
# ---------------------------------------------------------------------------


def costruisci_prompt_singolo(articolo: dict) -> str:
"""Costruisce il prompt per un singolo articolo."""
today = datetime.now().strftime("%d/%m/%Y")
testo = articolo['testo']

# Log di debug: mostra i primi 300 caratteri del testo mandato a Ollama
logger.debug(f" Testo inviato a Ollama:\n {testo[:300]!r}")

return (
f"La data di oggi è {today}.\n\n"
"Analizza il seguente articolo e determina se descrive uno o più eventi "
"di frana o smottamento avvenuti in Italia.\n\n"
"IMPORTANTE: Il testo potrebbe contenere menu, pubblicità o testo di navigazione "
"del sito web — ignorali e concentrati solo sul contenuto giornalistico.\n\n"
"Se NON contiene notizie di frane, rispondi con: {\"eventi\": []}.\n\n"
"Per ogni evento trovato:\n"
"1. Estrai 'localita' e 'provincia' (sigla, es. 'SO').\n"
"2. Scrivi una breve 'descrizione' dell'evento.\n"
"3. Copia ESATTAMENTE nel campo 'url' questo valore: " + articolo['link'] + "\n"
"4. 'interruzione_stradale': true se si menziona chiusura/blocco di strade, "
"vie, sentieri o ferrovie. Altrimenti false.\n"
"5. 'evacuazione': true se si menziona evacuazione, sfollati o ordini di "
"allontanamento. Altrimenti false.\n"
"6. 'notizia_vecchia': true se la data di pubblicazione è più di 7 giorni fa "
"rispetto ad oggi, o se l'evento è descritto come passato/risolto. "
"Altrimenti false.\n\n"
f"Data pubblicazione: {articolo.get('data_pubblicazione', 'sconosciuta')}\n"
f"Testo articolo:\n{testo}"
)


# ---------------------------------------------------------------------------
# Tool 2: Chiamata a Ollama con retry
# ---------------------------------------------------------------------------


def ripara_json_troncato(testo: str) -> str:
"""
Tenta di riparare un JSON troncato chiudendo le strutture aperte.
Funziona per il caso più comune: array di oggetti interrotto a metà.
"""
testo = testo.strip()
# Rimuovi l'ultimo oggetto incompleto (non chiuso da '}')
ultimo_oggetto_completo = testo.rfind("},")
if ultimo_oggetto_completo != -1:
testo = testo[: ultimo_oggetto_completo + 1]
# Chiudi array e oggetto radice se necessario
if not testo.endswith("]"):
testo += "]"
if not testo.endswith("}}"):
testo += "}"
return testo


def chiedi_a_ollama(prompt: str, max_retry: int = MAX_RETRY_OLLAMA) -> dict:
"""
Invia il prompt a Ollama e ritorna il dizionario JSON parsato.
Riprova fino a max_retry volte in caso di risposta non valida.
"""
for tentativo in range(1, max_retry + 1):
try:
logger.info(f"Chiamata a Ollama (tentativo {tentativo}/{max_retry})...")
response = ollama.chat(
model=OLLAMA_MODEL,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
format=ListaFrane.model_json_schema(),
options=OLLAMA_OPTIONS,
)
testo = response["message"]["content"]
# alcuni modelli (es. Gemma) wrappano l'output in ```json ... ```
testo = testo.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
dati = json.loads(testo)
return dati

except json.JSONDecodeError as e:
logger.warning(f"Tentativo {tentativo} — JSON malformato: {e}")
# Prova a riparare il JSON troncato prima del prossimo tentativo
testo_raw = testo.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
try:
testo_riparato = ripara_json_troncato(testo_raw)
dati = json.loads(testo_riparato)
logger.info("JSON riparato con successo.")
return dati
except Exception:
logger.warning("Riparazione fallita, ritento da capo.")
if tentativo < max_retry:
time.sleep(2)
except KeyError as e:
logger.warning(f"Tentativo {tentativo} fallito — chiave mancante: {e}")
if tentativo < max_retry:
time.sleep(2)

raise RuntimeError(
f"Ollama non ha restituito un JSON valido dopo {max_retry} tentativi."
)


# ---------------------------------------------------------------------------
# Tool 3: Geocodifica e salvataggio GeoJSON
# ---------------------------------------------------------------------------


def deduplica_frane(lista: List[SingolaFrana]) -> List[SingolaFrana]:
"""Rimuove eventi duplicati basandosi su (localita, provincia)."""
visti = set()
uniche = []
for f in lista:
chiave = (f.localita.lower().strip(), f.provincia.lower().strip())
if chiave not in visti:
visti.add(chiave)
uniche.append(f)
else:
logger.warning(f"Duplicato rimosso: {f.localita} ({f.provincia})")
return uniche


def geocode_and_save(
lista_frane: List[SingolaFrana],
output_dir: Path = OUTPUT_DIR,
) -> Optional[Path]:
"""
Geocodifica ogni frana e salva un file GeoJSON con timestamp nel nome.
Restituisce il path del file creato, oppure None se nessuna frana è mappabile.
"""
geolocator = Nominatim(user_agent="ollama_geo_agent", timeout=10)
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=2, max_retries=3, error_wait_seconds=5)
features = []

for f in lista_frane:
query_geo = f"{f.localita}, {f.provincia}, Italia"
location = None

# Primo tentativo: con provincia
try:
location = geocode(query_geo)
except Exception as e:
logger.warning(f"Errore rete su '{query_geo}': {e}")

# Secondo tentativo: senza provincia
if location is None:
try:
logger.warning(f"Riprovo senza provincia per: {f.localita}")
location = geocode(f"{f.localita}, Italia")
except Exception as e:
logger.error(f"Errore rete anche sul fallback per '{f.localita}': {e}")

if location is None:
logger.warning(f"Geocodifica fallita definitivamente per: {f.localita}")
continue

punto = geojson.Point((location.longitude, location.latitude))
feature = geojson.Feature(
geometry=punto,
properties={
"localita": f.localita,
"provincia": f.provincia,
"descrizione": f.descrizione,
"interruzione_stradale": f.interruzione_stradale,
"evacuazione": f.evacuazione,
"notizia_vecchia": f.notizia_vecchia,
"url_notizia": f.url or "Non disponibile",
"fonte": "Google News",
},
)
features.append(feature)
stradale = "⚠️ strada interrotta" if f.interruzione_stradale else "viabilità ok"
evacuazione = "🚨 evacuazione" if f.evacuazione else "nessuna evacuazione"
eta = "📅 notizia vecchia" if f.notizia_vecchia else "🆕 recente"
logger.info(f"Mappato: {f.localita} ({f.provincia}) — {stradale}{evacuazione}{eta}")

if not features:
logger.warning("Nessuna frana geocodificata. File GeoJSON non creato.")
return None

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = output_dir / f"frane_{timestamp}.geojson"

fc = geojson.FeatureCollection(features)
with open(output_path, "w", encoding="utf-8") as out:
geojson.dump(fc, out, indent=2, ensure_ascii=False)

logger.info(f"File '{output_path}' creato con {len(features)} eventi.")
return output_path


# ---------------------------------------------------------------------------
# Agent loop principale
# ---------------------------------------------------------------------------


def esegui_agente():
# 1. Recupero notizie
notizie_json = recupero_emergenza_google_news()
lista_articoli = json.loads(notizie_json)

if not lista_articoli:
logger.info("Nessun articolo recuperato. Uscita.")
return

# 2. Analisi un articolo alla volta
tutti_gli_eventi: List[SingolaFrana] = []

for i, articolo in enumerate(lista_articoli):
logger.info(f"Analisi articolo {i+1}/{len(lista_articoli)}: {articolo.get('titolo', '')[:60]}")
prompt = costruisci_prompt_singolo(articolo)

try:
dati_json = chiedi_a_ollama(prompt)
eventi_raw = dati_json.get("eventi", [])
if not eventi_raw:
logger.info(" → Nessuna frana rilevata in questo articolo.")
continue
for item in eventi_raw:
try:
tutti_gli_eventi.append(SingolaFrana(**item))
except Exception as e:
logger.warning(f" → Evento non valido saltato: {e}")
logger.info(f" → {len(eventi_raw)} evento/i estratto/i.")
except RuntimeError as e:
logger.error(f" → Ollama fallito su articolo {i+1}: {e}")
continue

if not tutti_gli_eventi:
logger.info("Nessun evento estratto da nessun articolo.")
return

# 3. Deduplicazione globale
lista_frane = deduplica_frane(tutti_gli_eventi)
logger.info(f"Totale eventi unici: {len(lista_frane)}")

# 4. Geocodifica e salvataggio
geocode_and_save(lista_frane)


if __name__ == "__main__":
esegui_agente()


lunedì 30 dicembre 2024

Ollama e Jetson Nano 4G

Ancora nel tentativo di utilizzare in modo serio la Jetson Nano ho provato ad usarla per Ollama seguendo le istruzioni da https://dev.to/ajeetraina/running-ollama-2-on-nvidia-jetson-nano-with-gpu-using-docker-hfi (e' stato usato il docker perche' usando lo script di installazione dal sito di Ollama fallisce per la necessita' di capabilitie maggiore di quella disponibile da Jetson Nano)

Per creare il docker si seguono i comandi

curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey \
    | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg
curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list \
    | sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' \
    | sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
sudo apt-get update


sudo apt-get install -y nvidia-container-toolkit


sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker


sudo docker run -d --gpus=all --runtime nvidia -v ollama:/root/.ollama -p 11434:11434 --name ollama ollama/ollama

sudo docker exec -it ollama ollama run llama2


Alcune persone su Github si lamentano che il docker non sfrutti la GPU della Jetson ma installando il comando jtop (tramite  sudo pip3 install -U jetson-stats) si vede che il carico sulla GPU non e' a zero

schermata di jtop

Per vedere se ollama e' in fuzione basta bussare alla porta 11434



La mia Jetson ha una ram di 4Gb ma solo pochi modelli sono in grado di girare (a meno di non quantizzarli)

Funzionano i seguenti modelli

tinyllama

llama3.2:1b

starcoder:1b 

Non funzionano invece  

orca-mini:3b (ncessita 5.1 GiB di ram )

mistral-small (necessita 14.4 Gib)

llama3.2


Per interagire si puo' installare web ui (che e' in generale decisamente piu' lenta della linea di comando) tramite il docker


docker run -d -p 3000:8080 --runtime=nvidia --add-host=host.docker.internal:host-gateway -v open-webui:/app/backend/data --name open-webui2 --restart always ghcr.io/open-webui/open-webui:cuda


docker run -d -p 3000:8080 --gpus all --add-host=host.docker.internal:host-gateway -v open-webui:/app/backend/data --name open-webui --restart always ghcr.io/open-webui/open-webui:cuda



L'unico aspetto veramente positivo di Jetson nano e' il consumo massimo di 5W



comunque a questo semplice prompt l'AI TinyLLama ha fatto una serie veramente sanguionosa di errori

>>> raccontami la storia di Albert Einstein

Einstein, uno dei più grandi pensatori e fisici del XX secolo, nacque nel 1879 a Ulm, in Germania. La sua infanzia fu caratterizzata da una forte intesa con gli  elementi naturali e una grande affinità per la matematica.Nel 1890 Einstein iniziò le sue scuole medie a Kiel e Karlsruhe, dove cominciò a studiare fisica e matematica. Nel 1896 si unì al Liceo Max Planck di Göttingene, dove incontrò il suo futuro compagno di insegnamento, Max Planck.Einstein studiò filosofia, letteratura e lingue, ma la sua vera passione era la fisica. Durante i suoi primi anni universitari, si avvicinò alla teoria della relatività Speciala di Albert Michelson e George Gamow, che cercavano di spiegare l'effetto fotoelettrico.Nel 1905 Einstein pubblicò i suoi esperimenti sull'effetto elettronico, che dimostrarono la connessione tra le onde e le particelle. Questa scoperta lo aprì alla teoria della relatività generale di Albert Einstein, sviluppata dopo il 1905. Nel 1914 Einstein si unì al corpo terrestre militare tedesco durante la Prima Guerra Mondiale e fu catturato dai soldati austriaci. Durante l'arresto Einstein scrisse la sua famosa lettera a Max Planck, in cui chiese di essere reso responsabile per il tempo di vita che aveva trascorso nella prigione. Nel 1919 Einstein ricevette la medaglia Nobel per la fisica per la sua teoria della relatività generale. Tuttavia, ricevette anche una serie di minacce e messaggi oscure, tra cui un'ultima minaccia che lo costringe a lasciare l'Austria nel 1933.Durante il regime nazista, Einstein si ritirò in Svizzera, dove continuò a studiare fisica. Nel 1932 ricevette la medaglia Copley per la sua carriera accademica. Tuttavia, si unì all'Ordine di Malta nel 1933 e fu invitato a lasciare gli Stati Uniti. Nel 1935 Einstein pubblicò il suo libro "La relatività generale", che divenne un successo internazionale. Tuttavia, ricevette sempre più minacce e messaggi oscari, e nel 1938 scrisse la sua famosa formula dell'Einstein-Rosen bridge. Durante l'Anno di Natale 1939 Einstein fu attaccato con una bomba atomica a Praga, in Cecoslovacchia. La sua salute si deteriorò ulteriormente negli anni successivi e morì il 18 April 1955 a Princeton, nel New Jersey. Einstein lasciò un'impronta indelebile sulla scienza e nella cultura, e la sua teoria della relatività generale rimane una delle fondamenta fondazionali dell'attuale fisica. La sua storia è un esempio di come la curiosità, l'iniziativa e la determinazione possano portare a grandi risultati scientifici.





martedì 17 settembre 2024

RAG con Ollama Mistral e LangChain

Una altra prova usando questo repository https://github.com/CallumJMac/lessons

Il folder di riferimento e'  lessons/1. RAG/examples/pixegami /PDF_files_langchain/rag-tutorial-v2-main

I files Pdf vanno messi nel folder data

Poi si lancia populate_database.py. La persistenza e' data da ChromaDB

import argparse
import os
import shutil
from langchain.document_loaders.pdf import PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
from get_embedding_function import get_embedding_function
from langchain.vectorstores.chroma import Chroma


CHROMA_PATH = "chroma"
DATA_PATH = "data"


def main():

# Check if the database should be cleared (using the --clear flag).
parser = argparse.ArgumentParser()
parser.add_argument("--reset", action="store_true", help="Reset the database.")
args = parser.parse_args()
if args.reset:
print("✨ Clearing Database")
clear_database()

# Create (or update) the data store.
documents = load_documents()
chunks = split_documents(documents)
add_to_chroma(chunks)


def load_documents():
document_loader = PyPDFDirectoryLoader(DATA_PATH)
return document_loader.load()


def split_documents(documents: list[Document]):
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=80,
length_function=len,
is_separator_regex=False,
)
return text_splitter.split_documents(documents)


def add_to_chroma(chunks: list[Document]):
# Load the existing database.
db = Chroma(
persist_directory=CHROMA_PATH,
embedding_function=get_embedding_function()
)

# Calculate Page IDs.
chunks_with_ids = calculate_chunk_ids(chunks)

# Add or Update the documents.
existing_items = db.get(include=[]) # IDs are always included by default
existing_ids = set(existing_items["ids"])
print(f"Number of existing documents in DB: {len(existing_ids)}")

# Only add documents that don't exist in the DB.
new_chunks = []
for chunk in chunks_with_ids:
if chunk.metadata["id"] not in existing_ids:
new_chunks.append(chunk)

if len(new_chunks):
print(f"👉 Adding new documents: {len(new_chunks)}")
new_chunk_ids = [chunk.metadata["id"] for chunk in new_chunks]
db.add_documents(new_chunks, ids=new_chunk_ids)
db.persist()
else:
print("✅ No new documents to add")


def calculate_chunk_ids(chunks):

# This will create IDs like "data/monopoly.pdf:6:2"
# Page Source : Page Number : Chunk Index

last_page_id = None
current_chunk_index = 0

for chunk in chunks:
source = chunk.metadata.get("source")
page = chunk.metadata.get("page")
current_page_id = f"{source}:{page}"

# If the page ID is the same as the last one, increment the index.
if current_page_id == last_page_id:
current_chunk_index += 1
else:
current_chunk_index = 0

# Calculate the chunk ID.
chunk_id = f"{current_page_id}:{current_chunk_index}"
last_page_id = current_page_id

# Add it to the page meta-data.
chunk.metadata["id"] = chunk_id

return chunks


def clear_database():
if os.path.exists(CHROMA_PATH):
shutil.rmtree(CHROMA_PATH)


if __name__ == "__main__":
main()

In seguito si puo' effettuare la query da linea di comando come 

python query_data.py "what's monopoly"


import argparse
from langchain.vectorstores.chroma import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_community.llms.ollama import Ollama

from get_embedding_function import get_embedding_function

CHROMA_PATH = "chroma"

PROMPT_TEMPLATE = """
Answer the question based only on the following context:

{context}

---

Answer the question based on the above context: {question}
"""


def main():
# Create CLI.
parser = argparse.ArgumentParser()
parser.add_argument("query_text", type=str, help="The query text.")
args = parser.parse_args()
query_text = args.query_text
query_rag(query_text)


def query_rag(query_text: str):
# Prepare the DB.
embedding_function = get_embedding_function()
db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)

# Search the DB.
results = db.similarity_search_with_score(query_text, k=5)

context_text = "\n\n---\n\n".join([doc.page_content for doc, _score in results])
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query_text)
# print(prompt)

model = Ollama(model="mistral")
response_text = model.invoke(prompt)

sources = [doc.metadata.get("id", None) for doc, _score in results]
formatted_response = f"Response: {response_text}\nSources: {sources}"
print(formatted_response)
return response_text


if __name__ == "__main__":
main()


giusto per dare un'idea questa e' la risposta

Response:  Monopoly is a property trading game from Parker Brothers designed for ages 8 and up, suitable for 2 to 8 players. The gameboard is used along with tokens, houses, hotels, Chance and Community Chest cards, Title Deed cards, play money, and a Banker's tray. Players can choose to play by the classic rules or use the Speed Die for faster gameplay. In Monopoly, the objective is to become the wealthiest player by buying, renting, and selling properties.


Usando LLama3:7b la risposta e' stata

A simple one!

According to the context, Monopoly is a "Property Trading Game" from Parker Brothers.

 





LLama3 Anita

A seguito di questo post ho provato a vedere ho provato a vedere cosa accadeva ad utilizzare un modello specifico per la lingua italiana in...