314 lines
11 KiB
Python
314 lines
11 KiB
Python
|
|
import sys
|
|
import threading
|
|
from textual.app import App, ComposeResult
|
|
from textual.containers import Container, Vertical, Horizontal
|
|
from textual.widgets import Header, Footer, Button, Static, Label, Input, Select, DataTable, Log, ListItem, ListView
|
|
from textual.screen import Screen, ModalScreen
|
|
from textual.message import Message
|
|
from textual import on, work
|
|
|
|
# Import logic from z.py
|
|
# We need to add the current directory to path if not already there
|
|
import os
|
|
sys.path.append(os.getcwd())
|
|
import z
|
|
|
|
class InstallWork(threading.Thread):
|
|
def __init__(self, config, log_callback):
|
|
super().__init__()
|
|
self.config = config
|
|
self.log_callback = log_callback
|
|
|
|
def run(self):
|
|
z.perform_installation(self.config, log_func=self.log_callback)
|
|
|
|
class DiskSelectScreen(Screen):
|
|
"""Screen to select the target disk."""
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label("Select Storage Disk", classes="title"),
|
|
DataTable(id="disk_table"),
|
|
Button("Next", variant="primary", id="btn_next", disabled=True),
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
table = self.query_one(DataTable)
|
|
table.cursor_type = "row"
|
|
table.add_columns("Name", "Size", "Model")
|
|
|
|
disks = z.get_disks()
|
|
self.disks_data = disks # keep reference
|
|
|
|
rows = []
|
|
for d in disks:
|
|
rows.append((d['name'], d['size'], d['model']))
|
|
|
|
table.add_rows(rows)
|
|
|
|
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
|
|
table = self.query_one(DataTable)
|
|
self.app.selected_disk = self.disks_data[event.cursor_row]['name']
|
|
self.query_one("#btn_next").disabled = False
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_next":
|
|
self.app.push_screen("partition_select")
|
|
|
|
class PartitionSelectScreen(Screen):
|
|
"""Screen to select partitions for Seed, Sprout, and EFI."""
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label(f"Select Partitions on {self.app.selected_disk}", classes="title"),
|
|
Vertical(
|
|
Label("Seed Partition (Read-Only Base):"),
|
|
Select([], id="sel_seed"),
|
|
Label("Sprout Partition (Writable Layer):"),
|
|
Select([], id="sel_sprout"),
|
|
Label("EFI Partition (Boot):"),
|
|
Select([], id="sel_efi"),
|
|
classes="form-group"
|
|
),
|
|
Button("Next", variant="primary", id="btn_next"),
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
parts = z.get_partitions(self.app.selected_disk)
|
|
# Format for Select: (label, value)
|
|
options = [(p['display'], p['path']) for p in parts]
|
|
|
|
self.query_one("#sel_seed").set_options(options)
|
|
self.query_one("#sel_sprout").set_options(options)
|
|
self.query_one("#sel_efi").set_options(options)
|
|
|
|
# Try to set sensible defaults if standard layout
|
|
# (This is a simplified attempt matching z.py defaults logic)
|
|
for _, val in options:
|
|
if val == f"{self.app.selected_disk}1":
|
|
self.query_one("#sel_seed").value = val
|
|
elif val == f"{self.app.selected_disk}2":
|
|
self.query_one("#sel_sprout").value = val
|
|
elif val == f"{self.app.selected_disk}3":
|
|
self.query_one("#sel_efi").value = val
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_next":
|
|
self.app.seed_device = self.query_one("#sel_seed").value
|
|
self.app.sprout_device = self.query_one("#sel_sprout").value
|
|
self.app.efi_device = self.query_one("#sel_efi").value
|
|
|
|
if not all([self.app.seed_device, self.app.sprout_device, self.app.efi_device]):
|
|
self.notify("Please select all partitions", severity="error")
|
|
return
|
|
|
|
self.app.push_screen("config")
|
|
|
|
class ConfigScreen(Screen):
|
|
"""Screen for collecting user configuration."""
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label("System Configuration", classes="title"),
|
|
Vertical(
|
|
Label("Hostname:"),
|
|
Input(value="arch-z", id="inp_hostname"),
|
|
Label("Username:"),
|
|
Input(value="zeev", id="inp_user"),
|
|
Label("Timezone:"),
|
|
Input(value="Europe/Helsinki", id="inp_timezone"),
|
|
|
|
Label("Root Password:"),
|
|
Input(password=True, id="inp_root_pass"),
|
|
Label("Root Password (Confirm):"),
|
|
Input(password=True, id="inp_root_pass_confirm"),
|
|
|
|
Label("User Password:"),
|
|
Input(password=True, id="inp_user_pass"),
|
|
Label("User Password (Confirm):"),
|
|
Input(password=True, id="inp_user_pass_confirm"),
|
|
|
|
classes="form-group"
|
|
),
|
|
Button("Next", variant="primary", id="btn_next"),
|
|
)
|
|
yield Footer()
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_next":
|
|
# Validation
|
|
r1 = self.query_one("#inp_root_pass").value
|
|
r2 = self.query_one("#inp_root_pass_confirm").value
|
|
u1 = self.query_one("#inp_user_pass").value
|
|
u2 = self.query_one("#inp_user_pass_confirm").value
|
|
|
|
if r1 != r2:
|
|
self.notify("Root passwords do not match", severity="error")
|
|
return
|
|
if u1 != u2:
|
|
self.notify("User passwords do not match", severity="error")
|
|
return
|
|
if not r1 or not u1:
|
|
self.notify("Passwords cannot be empty", severity="error")
|
|
return
|
|
|
|
self.app.conf_hostname = self.query_one("#inp_hostname").value
|
|
self.app.conf_username = self.query_one("#inp_user").value
|
|
self.app.conf_timezone = self.query_one("#inp_timezone").value
|
|
self.app.conf_root_pass = r1
|
|
self.app.conf_user_pass = u1
|
|
|
|
self.app.push_screen("packages")
|
|
|
|
class PackageScreen(Screen):
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label("Select Packages", classes="title"),
|
|
Label("Edit the list of packages to install (space separated):"),
|
|
Input(value=" ".join(z.default_packages), id="inp_packages"),
|
|
Button("Review Summary", variant="primary", id="btn_next"),
|
|
)
|
|
yield Footer()
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_next":
|
|
pkg_str = self.query_one("#inp_packages").value
|
|
self.app.packages = pkg_str.split() if pkg_str else z.default_packages
|
|
self.app.push_screen("summary")
|
|
|
|
class SummaryScreen(Screen):
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label("Configuration Summary", classes="title"),
|
|
Static(id="summary_text"),
|
|
Horizontal(
|
|
Button("Install", variant="error", id="btn_install"),
|
|
Button("Quit", variant="default", id="btn_quit"),
|
|
classes="buttons"
|
|
)
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self):
|
|
text = f"""
|
|
Disk: {self.app.selected_disk}
|
|
Seed: {self.app.seed_device}
|
|
Sprout: {self.app.sprout_device}
|
|
EFI: {self.app.efi_device}
|
|
|
|
Hostname: {self.app.conf_hostname}
|
|
User: {self.app.conf_username}
|
|
Timezone: {self.app.conf_timezone}
|
|
|
|
Packages: {len(self.app.packages)} selected
|
|
"""
|
|
self.query_one("#summary_text").update(text)
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_quit":
|
|
self.app.exit()
|
|
elif event.button.id == "btn_install":
|
|
self.app.push_screen("install")
|
|
|
|
class InstallScreen(Screen):
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Label("Installing...", classes="title"),
|
|
Log(id="install_log"),
|
|
Button("Done", id="btn_done", disabled=True)
|
|
)
|
|
|
|
def on_mount(self):
|
|
log = self.query_one(Log)
|
|
|
|
config = z.InstallConfig(
|
|
seed_device=self.app.seed_device,
|
|
sprout_device=self.app.sprout_device,
|
|
efi_device=self.app.efi_device,
|
|
hostname=self.app.conf_hostname,
|
|
username=self.app.conf_username,
|
|
timezone=self.app.conf_timezone,
|
|
root_password=self.app.conf_root_pass,
|
|
user_password=self.app.conf_user_pass,
|
|
packages=self.app.packages,
|
|
# For testing safety, you might want to default to dry_run logic or prompt.
|
|
# But the user asked for the real deal. I'll add a safety switch.
|
|
dry_run=False
|
|
)
|
|
|
|
self.worker = InstallWork(config, self.write_log)
|
|
self.worker.start()
|
|
self.set_interval(0.5, self.check_done)
|
|
|
|
def write_log(self, message):
|
|
self.query_one(Log).write_line(message)
|
|
|
|
def check_done(self):
|
|
if not self.worker.is_alive():
|
|
self.query_one("#btn_done").disabled = False
|
|
self.query_one(Log).write_line("--- Process Finished ---")
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
if event.button.id == "btn_done":
|
|
self.app.exit()
|
|
|
|
class ZInstallerApp(App):
|
|
CSS = """
|
|
.title {
|
|
text-align: center;
|
|
text-style: bold;
|
|
margin: 1;
|
|
}
|
|
.form-group {
|
|
margin: 1 2;
|
|
height: auto;
|
|
}
|
|
DataTable {
|
|
height: 1fr;
|
|
}
|
|
.buttons {
|
|
align: center middle;
|
|
height: auto;
|
|
margin-top: 1;
|
|
}
|
|
Button {
|
|
margin: 1;
|
|
}
|
|
"""
|
|
|
|
selected_disk = None
|
|
seed_device = None
|
|
sprout_device = None
|
|
efi_device = None
|
|
|
|
conf_hostname = None
|
|
conf_username = None
|
|
conf_timezone = None
|
|
conf_root_pass = None
|
|
conf_user_pass = None
|
|
packages = []
|
|
|
|
SCREENS = {
|
|
"disk_select": DiskSelectScreen,
|
|
"partition_select": PartitionSelectScreen,
|
|
"config": ConfigScreen,
|
|
"packages": PackageScreen,
|
|
"summary": SummaryScreen,
|
|
"install": InstallScreen
|
|
}
|
|
|
|
def on_mount(self) -> None:
|
|
self.push_screen("disk_select")
|
|
|
|
if __name__ == "__main__":
|
|
app = ZInstallerApp()
|
|
app.run()
|