diff --git a/assets/js/liveview/live_socket.js b/assets/js/liveview/live_socket.js index a6208ff4cff9..ed287267f70c 100644 --- a/assets/js/liveview/live_socket.js +++ b/assets/js/liveview/live_socket.js @@ -19,12 +19,30 @@ if (csrfToken && websocketUrl) { }) } } + let Uploaders = {} + Uploaders.S3 = function (entries, onViewError) { + entries.forEach(entry => { + let xhr = new XMLHttpRequest() + onViewError(() => xhr.abort()) + xhr.onload = () => xhr.status === 200 ? entry.progress(100) : entry.error() + xhr.onerror = () => entry.error() + xhr.upload.addEventListener("progress", (event) => { + if (event.lengthComputable) { + let percent = Math.round((event.loaded / event.total) * 100) + if (percent < 100) { entry.progress(percent) } + } + }) + let url = entry.meta.url + xhr.open("PUT", url, true) + xhr.send(entry.file) + }) + } let token = csrfToken.getAttribute("content") let url = websocketUrl.getAttribute("content") let liveUrl = (url === "") ? "/live" : new URL("/live", url).href; let liveSocket = new LiveSocket(liveUrl, Socket, { heartbeatIntervalMs: 10000, - params: { _csrf_token: token }, hooks: Hooks, dom: { + params: { _csrf_token: token }, hooks: Hooks, uploaders: Uploaders, dom: { // for alpinejs integration onBeforeElUpdated(from, to) { if (from._x_dataStack) { diff --git a/config/.env.dev b/config/.env.dev index db52322d91f6..09f1d18e8509 100644 --- a/config/.env.dev +++ b/config/.env.dev @@ -20,3 +20,12 @@ GOOGLE_CLIENT_SECRET=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 PROMEX_DISABLED=false SITE_DEFAULT_INGEST_THRESHOLD=1000000 + +S3_DISABLED=false +S3_ACCESS_KEY_ID=minioadmin +S3_SECRET_ACCESS_KEY=minioadmin +S3_REGION=us-east-1 +S3_ENDPOINT=http://localhost:9000 +S3_EXPORTS_BUCKET=exports +S3_IMPORTS_BUCKET=imports +S3_HOST_FOR_CLICKHOUSE=172.19.0.4 diff --git a/config/.env.test b/config/.env.test index 050d6f879965..304108bbac00 100644 --- a/config/.env.test +++ b/config/.env.test @@ -15,3 +15,12 @@ IP_GEOLOCATION_DB=test/priv/GeoLite2-City-Test.mmdb SITE_DEFAULT_INGEST_THRESHOLD=1000000 GOOGLE_CLIENT_ID=fake_client_id GOOGLE_CLIENT_SECRET=fake_client_secret + +S3_DISABLED=false +S3_ACCESS_KEY_ID=minioadmin +S3_SECRET_ACCESS_KEY=minioadmin +S3_REGION=us-east-1 +S3_ENDPOINT=http://localhost:9000 +S3_EXPORTS_BUCKET=exports +S3_IMPORTS_BUCKET=imports +S3_HOST_FOR_CLICKHOUSE=172.19.0.4 diff --git a/config/runtime.exs b/config/runtime.exs index 3b29d71b9a1d..4c9dbf8674e5 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -701,3 +701,74 @@ if not is_selfhost do config :plausible, Plausible.Site, default_ingest_threshold: site_default_ingest_threshold end + +s3_disabled? = + config_dir + |> get_var_from_path_or_env("S3_DISABLED", "true") + |> String.to_existing_atom() + +unless s3_disabled? do + s3_env = [ + %{ + name: "S3_ACCESS_KEY_ID", + example: "AKIAIOSFODNN7EXAMPLE" + }, + %{ + name: "S3_SECRET_ACCESS_KEY", + example: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + }, + %{ + name: "S3_REGION", + example: "us-east-1" + }, + %{ + name: "S3_ENDPOINT", + example: "https://.r2.cloudflarestorage.com" + }, + %{ + name: "S3_EXPORTS_BUCKET", + example: "my-exports-bucket" + }, + %{ + name: "S3_IMPORTS_BUCKET", + example: "my-imports-bucket" + } + ] + + s3_env = + Enum.map(s3_env, fn var -> + Map.put(var, :value, get_var_from_path_or_env(config_dir, var.name)) + end) + + s3_missing_env = Enum.filter(s3_env, &is_nil(&1.value)) + + unless s3_missing_env == [] do + raise """ + Missing S3 configuration. 
Please set #{s3_missing_env |> Enum.map(& &1.name) |> Enum.join(", ")} environment variable(s): + + #{s3_missing_env |> Enum.map(fn %{name: name, example: example} -> "\t#{name}=#{example}" end) |> Enum.join("\n")} + """ + end + + s3_env_value = fn name -> + s3_env |> Enum.find(&(&1.name == name)) |> Map.fetch!(:value) + end + + config :ex_aws, + http_client: Plausible.S3.Client, + access_key_id: s3_env_value.("S3_ACCESS_KEY_ID"), + secret_access_key: s3_env_value.("S3_SECRET_ACCESS_KEY"), + region: s3_env_value.("S3_REGION") + + %URI{scheme: s3_scheme, host: s3_host, port: s3_port} = URI.parse(s3_env_value.("S3_ENDPOINT")) + + config :ex_aws, :s3, + scheme: s3_scheme <> "://", + host: s3_host, + port: s3_port + + config :plausible, Plausible.S3, + exports_bucket: s3_env_value.("S3_EXPORTS_BUCKET"), + imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET"), + host_for_clickhouse: get_var_from_path_or_env(config_dir, "S3_HOST_FOR_CLICKHOUSE") +end diff --git a/lib/plausible/exports.ex b/lib/plausible/exports.ex new file mode 100644 index 000000000000..3e0d1ad7d983 --- /dev/null +++ b/lib/plausible/exports.ex @@ -0,0 +1,343 @@ +defmodule Plausible.Exports do + @moduledoc """ + Contains functions to export data for events and sessions as Zip archives. + """ + + import Ecto.Query + + # TODO visit_duration + # TODO time_on_page + # TODO visits spanning multiple days deduplication + # TODO sampling + # TODO export_visitors_q only can use just sessions_v2? + # TODO export/import events_v2? most likely no + + # @sample to_string(2_000_000) + + @doc """ + Builds Ecto queries to export data from `events_v2` and `sessions_v2` + tables into the format of `imported_*` tables for a website. + """ + @spec export_queries(pos_integer, extname: String.t()) :: %{String.t() => Ecto.Query.t()} + def export_queries(site_id, opts \\ []) do + extname = opts[:extname] || ".csv" + + %{ + "imported_visitors#{extname}" => export_visitors_q(site_id), + "imported_sources#{extname}" => export_sources_q(site_id), + # TODO this query can result in `MEMORY_LIMIT_EXCEEDED` error + "imported_pages#{extname}" => export_pages_q(site_id), + "imported_entry_pages#{extname}" => export_entry_pages_q(site_id), + "imported_exit_pages#{extname}" => export_exit_pages_q(site_id), + "imported_locations#{extname}" => export_locations_q(site_id), + "imported_devices#{extname}" => export_devices_q(site_id), + "imported_browsers#{extname}" => export_browsers_q(site_id), + "imported_operating_systems#{extname}" => export_operating_systems_q(site_id) + } + end + + defmacrop date(timestamp) do + quote do + selected_as(fragment("toDate(?)", unquote(timestamp)), :date) + end + end + + defmacrop visit_duration(t) do + quote do + selected_as( + fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).duration), + :visit_duration + ) + end + end + + defmacrop visitors(t) do + quote do + selected_as( + fragment("toUInt64(round(uniq(?)*any(_sample_factor)))", unquote(t).user_id), + :visitors + ) + end + end + + defmacrop visits(t) do + quote do + selected_as(sum(unquote(t).sign), :visits) + end + end + + defmacrop bounces(t) do + # TODO multiply by sample_factor? 
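+    # (`visitors/1` above already scales by `any(_sample_factor)`; `visits/1` and
+    # `bounces/1` currently do not, which only starts to matter once the SAMPLE hints are enabled)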
+ quote do + selected_as( + fragment("greatest(sum(?*?),0)", unquote(t).sign, unquote(t).is_bounce), + :bounces + ) + end + end + + @spec export_visitors_q(pos_integer) :: Ecto.Query.t() + def export_visitors_q(site_id) do + visitors_sessions_q = + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: selected_as(:date), + select: %{ + date: date(s.start), + bounces: bounces(s), + visits: visits(s), + visit_duration: visit_duration(s) + # TODO + # visitors: visitors(s) + } + + visitors_events_q = + from e in "events_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: e.site_id == ^site_id, + group_by: selected_as(:date), + select: %{ + date: date(e.timestamp), + visitors: visitors(e), + pageviews: + selected_as( + fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), + :pageviews + ) + } + + visitors_q = + "e" + |> with_cte("e", as: ^visitors_events_q) + |> with_cte("s", as: ^visitors_sessions_q) + + from e in visitors_q, + full_join: s in "s", + on: e.date == s.date, + order_by: selected_as(:date), + select: [ + selected_as(fragment("greatest(?,?)", s.date, e.date), :date), + e.visitors, + e.pageviews, + s.bounces, + s.visits, + s.visit_duration + ] + end + + @spec export_sources_q(pos_integer) :: Ecto.Query.t() + def export_sources_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [ + selected_as(:date), + selected_as(:source), + s.utm_medium, + s.utm_campaign, + s.utm_content, + s.utm_term + ], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.referrer_source, :source), + s.utm_medium, + s.utm_campaign, + s.utm_content, + s.utm_term, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_pages_q(pos_integer) :: Ecto.Query.t() + def export_pages_q(site_id) do + window_q = + from e in "events_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: e.site_id == ^site_id, + select: %{ + timestamp: e.timestamp, + next_timestamp: + over(fragment("leadInFrame(?)", e.timestamp), + partition_by: e.session_id, + order_by: e.timestamp, + frame: fragment("ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING") + ), + pathname: e.pathname, + hostname: e.hostname, + name: e.name, + user_id: e.user_id, + _sample_factor: fragment("_sample_factor") + } + + from e in subquery(window_q), + group_by: [selected_as(:date), e.pathname], + order_by: selected_as(:date), + select: [ + date(e.timestamp), + selected_as(fragment("any(?)", e.hostname), :hostname), + selected_as(e.pathname, :page), + visitors(e), + selected_as( + fragment("toUInt64(round(countIf(?='pageview')*any(_sample_factor)))", e.name), + :pageviews + ), + # TODO are exits pageviews or any events? 
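+        # (as written this counts the session's last event regardless of `e.name`,
+        # i.e. any event rather than only pageviews)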
+ selected_as( + fragment("toUInt64(round(countIf(?=0)*any(_sample_factor)))", e.next_timestamp), + :exits + ), + selected_as( + fragment("sum(greatest(?,0))", e.next_timestamp - e.timestamp), + :time_on_page + ) + ] + end + + @spec export_entry_pages_q(pos_integer) :: Ecto.Query.t() + def export_entry_pages_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.entry_page], + order_by: selected_as(:date), + select: [ + date(s.start), + s.entry_page, + visitors(s), + selected_as( + fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign), + :entrances + ), + visit_duration(s), + bounces(s) + ] + end + + @spec export_exit_pages_q(pos_integer) :: Ecto.Query.t() + def export_exit_pages_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.exit_page], + order_by: selected_as(:date), + select: [ + date(s.start), + s.exit_page, + visitors(s), + selected_as( + fragment("toUInt64(round(sum(?)*any(_sample_factor)))", s.sign), + :exits + ) + ] + end + + @spec export_locations_q(pos_integer) :: Ecto.Query.t() + def export_locations_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + where: s.city_geoname_id != 0 and s.country_code != "\0\0" and s.country_code != "ZZ", + group_by: [selected_as(:date), s.country_code, selected_as(:region), s.city_geoname_id], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.country_code, :country), + selected_as(s.subdivision1_code, :region), + selected_as(s.city_geoname_id, :city), + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_devices_q(pos_integer) :: Ecto.Query.t() + def export_devices_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.screen_size], + order_by: selected_as(:date), + select: [ + date(s.start), + selected_as(s.screen_size, :device), + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_browsers_q(pos_integer) :: Ecto.Query.t() + def export_browsers_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.browser], + order_by: selected_as(:date), + select: [ + date(s.start), + s.browser, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @spec export_operating_systems_q(pos_integer) :: Ecto.Query.t() + def export_operating_systems_q(site_id) do + from s in "sessions_v2", + # hints: ["SAMPLE", unsafe_fragment(^@sample)], + where: s.site_id == ^site_id, + group_by: [selected_as(:date), s.operating_system], + order_by: selected_as(:date), + select: [ + date(s.start), + s.operating_system, + visitors(s), + visits(s), + visit_duration(s), + bounces(s) + ] + end + + @doc """ + Creates a streamable Zip archive from the provided (named) Ecto queries. 
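+  Each entry in the archive is stored uncompressed (`Zstream.Coder.Stored`), and the
+  underlying ClickHouse queries are only executed as the resulting stream is consumed.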
+ + Example usage: + + {:ok, pool} = Ch.start_link(pool_size: 1) + + DBConnection.run(pool, fn conn -> + conn + |> stream_archive(export_queries(_site_id = 1), format: "CSVWithNames") + |> Stream.into(File.stream!("export.zip")) + |> Stream.run() + end) + + """ + @spec stream_archive(DBConnection.t(), %{String.t() => Ecto.Query.t()}, [Ch.query_option()]) :: + Enumerable.t() + def stream_archive(conn, named_queries, opts \\ []) do + entries = + Enum.map(named_queries, fn {name, query} -> + {sql, params} = Plausible.ClickhouseRepo.to_sql(:all, query) + + datastream = + conn + |> Ch.stream(sql, params, opts) + |> Stream.map(fn %Ch.Result{data: data} -> data end) + + Zstream.entry(name, datastream, coder: Zstream.Coder.Stored) + end) + + Zstream.zip(entries) + end +end diff --git a/lib/plausible/exports/s3.ex b/lib/plausible/exports/s3.ex new file mode 100644 index 000000000000..1902a682025a --- /dev/null +++ b/lib/plausible/exports/s3.ex @@ -0,0 +1,50 @@ +defmodule Plausible.Exports.S3 do + # TODO unique + use Oban.Worker, queue: :s3_data_export + + @impl true + def perform(job) do + %Oban.Job{ + args: %{ + "site_id" => site_id, + "email_to" => email, + "s3_path" => s3_path + } + } = job + + {:ok, ch} = + Plausible.ClickhouseRepo.config() + |> Keyword.replace!(:pool_size, 1) + |> Ch.start_link() + + download_url = + DBConnection.run( + ch, + fn conn -> + conn + |> Plausible.Exports.stream_archive( + Plausible.Exports.export_queries(site_id, extname: ".csv"), + format: "CSVWithNames" + ) + |> Plausible.S3.export_upload_multipart(s3_path) + end, + timeout: :infinity + ) + + Plausible.Mailer.deliver_now!( + Bamboo.Email.new_email( + from: "plausible@email.com", + to: email, + subject: "EXPORT SUCCESS", + text_body: """ + download it from #{download_url}! hurry up! you have 24 hours!" + """, + html_body: """ + download it from here! hurry up! you have 24 hours! + """ + ) + ) + + :ok + end +end diff --git a/lib/plausible/imported/csv_importer.ex b/lib/plausible/imported/csv_importer.ex index 8c7844fe304b..49b1edd6ddae 100644 --- a/lib/plausible/imported/csv_importer.ex +++ b/lib/plausible/imported/csv_importer.ex @@ -1,6 +1,6 @@ defmodule Plausible.Imported.CSVImporter do @moduledoc """ - CSV importer stub. + CSV importer. 
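+
+  Reads CSV files that were uploaded to S3 and inserts them into the `imported_*`
+  ClickHouse tables using the `s3` table function.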
""" use Plausible.Imported.Importer @@ -16,10 +16,116 @@ defmodule Plausible.Imported.CSVImporter do def email_template(), do: "google_analytics_import.html" @impl true - def parse_args(%{"s3_path" => s3_path}), do: [s3_path: s3_path] + def parse_args(%{"uploads" => uploads}), do: [uploads: uploads] @impl true - def import_data(_site_import, _opts) do - :ok + def import_data(site_import, opts) do + %Plausible.Imported.SiteImport{id: import_id, site_id: site_id} = site_import + uploads = Keyword.fetch!(opts, :uploads) + + %{access_key_id: s3_access_key_id, secret_access_key: s3_secret_access_key} = + Plausible.S3.import_clickhouse_credentials() + + {:ok, ch} = + Plausible.IngestRepo.config() + |> Keyword.replace!(:pool_size, 1) + |> Ch.start_link() + + Enum.each(uploads, fn upload -> + %{"filename" => filename, "s3_path" => s3_path} = upload + + ".csv" = Path.extname(filename) + table = Path.rootname(filename) + + s3_structure = Plausible.Imports.input_structure(table) + s3_url = Plausible.S3.import_clickhouse_url(s3_path) + + statement = + """ + INSERT INTO {table:Identifier} \ + SELECT {site_id:UInt64} AS site_id, {import_id:UInt64} AS import_id, * \ + FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\ + """ + + params = %{ + "table" => table, + "site_id" => site_id, + "import_id" => import_id, + "s3_url" => s3_url, + "s3_access_key_id" => s3_access_key_id, + "s3_secret_access_key" => s3_secret_access_key, + "s3_format" => "CSVWithNames", + "s3_structure" => s3_structure + } + + Ch.query!(ch, statement, params, timeout: :infinity) + end) + end + + @doc """ + Creates a collectable stream which pipes the data to an `imported_*` table. + + Example usage: + + {:ok, pool} = Ch.start_link(pool_size: 1) + + DBConnection.run(pool, fn conn -> + File.stream!("imported_browsers.csv", _64kb = 64000) + |> Stream.into(import_stream(conn, _site_id = 12, _table = "imported_browsers", _format = "CSVWithNames")) + |> Stream.run() + end) + + """ + @spec import_stream(DBConnection.t(), pos_integer, String.t(), String.t(), [Ch.query_option()]) :: + Ch.Stream.t() + def import_stream(conn, site_id, table, format, opts \\ []) do + :ok = ensure_supported_format(format) + + statement = + [ + "INSERT INTO {table:Identifier} SELECT {site_id:UInt64} AS site_id, * FROM input('", + input_structure(table), + "') FORMAT ", + format, + ?\n + ] + + Ch.stream(conn, statement, %{"table" => table, "site_id" => site_id}, opts) + end + + input_structures = %{ + "imported_browsers" => + "date Date, browser String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32", + "imported_devices" => + "date Date, device String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32", + "imported_entry_pages" => + "date Date, entry_page String, visitors UInt64, entrances UInt64, visit_duration UInt64, bounces UInt64", + "imported_exit_pages" => "date Date, exit_page String, visitors UInt64, exits UInt64", + "imported_locations" => + "date Date, country String, region String, city UInt64, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32", + "imported_operating_systems" => + "date Date, operating_system String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32", + "imported_pages" => + "date Date, hostname String, page String, visitors UInt64, pageviews UInt64, exits UInt64, time_on_page UInt64", + "imported_sources" => + "date Date, source String, utm_medium String, utm_campaign String, 
utm_content String, utm_term String, visitors UInt64, visits UInt64, visit_duration UInt64, bounces UInt32", + "imported_visitors" => + "date Date, visitors UInt64, pageviews UInt64, bounces UInt64, visits UInt64, visit_duration UInt64" + } + + for {table, input_structure} <- input_structures do + def input_structure(unquote(table)), do: unquote(input_structure) + end + + def input_structure(table) do + raise ArgumentError, "table #{table} is not supported for data import" + end + + for format <- ["Native", "CSVWithNames"] do + defp ensure_supported_format(unquote(format)), do: :ok + end + + defp ensure_supported_format(format) do + raise ArgumentError, "format #{format} is not supported for data import" end end diff --git a/lib/plausible/s3.ex b/lib/plausible/s3.ex new file mode 100644 index 000000000000..44ecd5f483a9 --- /dev/null +++ b/lib/plausible/s3.ex @@ -0,0 +1,109 @@ +defmodule Plausible.S3 do + @moduledoc """ + Helper functions for S3 exports/imports. + """ + + defp config, do: Application.fetch_env!(:plausible, __MODULE__) + defp config(key), do: Keyword.fetch!(config(), key) + + @doc """ + Chunks and uploads Zip archive to the pre-configured bucket. + + Returns a presigned URL to download the exported Zip archive from S3. + The URL expires in 24 hours. + + In the current implementation the bucket always goes into the path component. + """ + @spec export_upload_multipart(Enumerable.t(), Path.t()) :: :uri_string.uri_string() + def export_upload_multipart(stream, s3_path) do + config = ExAws.Config.new(:s3) + bucket = config(:exports_bucket) + + stream + |> chunk_into_parts(_5MiB = 5 * 1024 * 1024) + |> ExAws.S3.upload(bucket, s3_path, + content_disposition: ~s|attachment; filename="Plausible.zip"|, + content_type: "application/zip" + ) + |> ExAws.request!() + + {:ok, download_url} = + ExAws.S3.presigned_url(config, :get, bucket, s3_path, expires_in: _24hr = 86400) + + download_url + end + + defp chunk_into_parts(stream, min_part_size) do + Stream.chunk_while( + stream, + _acc = %{buffer_size: 0, buffer: [], min_part_size: min_part_size}, + _chunk_fun = &buffer_until_big_enough/2, + _after_fun = &flush_leftovers/1 + ) + end + + defp buffer_until_big_enough(data, acc) do + %{buffer_size: prev_buffer_size, buffer: prev_buffer, min_part_size: min_part_size} = acc + new_buffer_size = prev_buffer_size + IO.iodata_length(data) + new_buffer = [prev_buffer | data] + + if new_buffer_size > min_part_size do + # TODO PR to make ExAws.Operation.ExAws.Operation.S3.put_content_length_header/3 accept iodata + {:cont, IO.iodata_to_binary(new_buffer), %{acc | buffer_size: 0, buffer: []}} + else + {:cont, %{acc | buffer_size: new_buffer_size, buffer: new_buffer}} + end + end + + defp flush_leftovers(acc) do + # TODO PR to make ExAws.Operation.ExAws.Operation.S3.put_content_length_header/3 accept iodata + {:cont, IO.iodata_to_binary(acc.buffer), %{acc | buffer_size: 0, buffer: []}} + end + + @doc """ + Presigns an upload for an imported file. + + In the current implementation the bucket always goes into the path component. 
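+
+  The returned `url` is meant to be `PUT` directly from the browser (see the
+  `Uploaders.S3` hook added to `live_socket.js` above).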
+ + Example: + + import_presign_upload(_site_id = 123, _filename = "imported_browsers.csv") + %{ + object: "123/imported_browsers.csv", + url: "http://localhost:9000/imports/123/imported_browsers.csv?X" + } + + """ + @spec import_presign_upload(pos_integer, String.t()) :: + %{object: Path.t(), url: :uri_string.uri_string()} + def import_presign_upload(site_id, filename) do + config = ExAws.Config.new(:s3) + s3_path = Path.join(to_string(site_id), filename) + bucket = config(:imports_bucket) + {:ok, upload_url} = ExAws.S3.presigned_url(config, :put, bucket, s3_path) + %{object: s3_path, url: upload_url} + end + + @doc """ + Returns `access_key_id` and `secret_access_key` to be used by ClickHouse during imports from S3. + """ + @spec import_clickhouse_credentials :: + %{access_key_id: String.t(), secret_access_key: String.t()} + def import_clickhouse_credentials do + %{access_key_id: access_key_id, secret_access_key: secret_access_key} = ExAws.Config.new(:s3) + %{access_key_id: access_key_id, secret_access_key: secret_access_key} + end + + @doc """ + Returns S3 URL for an object to be used by ClickHouse during imports from S3. + + In the current implementation the bucket always goes into the path component. + """ + @spec import_clickhouse_url(Path.t()) :: :uri_string.uri_string() + def import_clickhouse_url(s3_path) do + %{scheme: scheme, host: host} = config = ExAws.Config.new(:s3) + host = Keyword.get(config(), :host_for_clickhouse) || host + port = ExAws.S3.Utils.sanitized_port_component(config) + Path.join(["#{scheme}#{host}#{port}", config(:imports_bucket), s3_path]) + end +end diff --git a/lib/plausible/s3/client.ex b/lib/plausible/s3/client.ex new file mode 100644 index 000000000000..54817c8da41c --- /dev/null +++ b/lib/plausible/s3/client.ex @@ -0,0 +1,17 @@ +defmodule Plausible.S3.Client do + @moduledoc false + @behaviour ExAws.Request.HttpClient + + @impl true + def request(method, url, body, headers, opts) do + req = Finch.build(method, url, headers, body) + + case Finch.request(req, Plausible.Finch, opts) do + {:ok, %Finch.Response{status: status, headers: headers, body: body}} -> + {:ok, %{status_code: status, headers: headers, body: body}} + + {:error, reason} -> + {:error, %{reason: reason}} + end + end +end diff --git a/lib/plausible_web/endpoint.ex b/lib/plausible_web/endpoint.ex index 5b6c6f93066a..393d9642f909 100644 --- a/lib/plausible_web/endpoint.ex +++ b/lib/plausible_web/endpoint.ex @@ -61,8 +61,15 @@ defmodule PlausibleWeb.Endpoint do plug(PromEx.Plug, prom_ex_module: Plausible.PromEx) plug(Plug.Telemetry, event_prefix: [:phoenix, :endpoint]) + max_multpart_length = + on_full_build do + _default = 8_000_000 + else + _100MB = 100_000_000 + end + plug(Plug.Parsers, - parsers: [:urlencoded, :multipart, :json], + parsers: [:urlencoded, {:multipart, length: max_multpart_length}, :json], pass: ["*/*"], json_decoder: Phoenix.json_library() ) diff --git a/mix.exs b/mix.exs index 8bd488ca7fc8..2b2f22b319b8 100644 --- a/mix.exs +++ b/mix.exs @@ -67,7 +67,7 @@ defmodule Plausible.MixProject do {:bcrypt_elixir, "~> 3.0"}, {:bypass, "~> 2.1", only: [:dev, :test, :small_test]}, {:cachex, "~> 3.4"}, - {:ecto_ch, "~> 0.3"}, + {:ecto_ch, "~> 0.3.2"}, {:cloak, "~> 1.1"}, {:cloak_ecto, "~> 1.2"}, {:combination, "~> 0.0.3"}, @@ -127,6 +127,7 @@ defmodule Plausible.MixProject do {:mjml_eex, "~> 0.9.0"}, {:mjml, "~> 1.5.0"}, {:heroicons, "~> 0.5.0"}, + {:zstream, "~> 0.6.4"}, {:zxcvbn, git: "https://github.com/techgaun/zxcvbn-elixir.git"}, {:open_api_spex, "~> 3.18"}, {:joken, "~> 2.5"}, 
@@ -135,7 +136,10 @@ defmodule Plausible.MixProject do {:esbuild, "~> 0.7", runtime: Mix.env() in [:dev, :small_dev]}, {:tailwind, "~> 0.2.0", runtime: Mix.env() in [:dev, :small_dev]}, {:ex_json_logger, "~> 1.4.0"}, - {:ecto_network, "~> 1.5.0"} + {:ecto_network, "~> 1.5.0"}, + {:ex_aws, "~> 2.5"}, + {:ex_aws_s3, "~> 2.5"}, + {:sweet_xml, "~> 0.7.4"} ] end diff --git a/mix.lock b/mix.lock index 8a108e96e2c6..e1d1edca46ca 100644 --- a/mix.lock +++ b/mix.lock @@ -10,7 +10,7 @@ "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"}, "castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, - "ch": {:hex, :ch, "0.2.4", "d510fbb5542d009f7c5b00bb1ecab73307b6066d9fb9b220600257d462cba67f", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "8f065d15aaf912ae8da56c9ca5298fb2d1a09108d006de589bcf8c2b39a7e2bb"}, + "ch": {:hex, :ch, "0.2.5-rc.0", "c9a6faed74b6ffefec83cafff49ee0bdc38fb34a6b021b8e0a4819e1cb7b9b80", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "ca7b7a1c79d6b61ad42db16d1857a6bc93051d443b25dc620b9c997778d19525"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, "cldr_utils": {:hex, :cldr_utils, "2.24.2", "364fa30be55d328e704629568d431eb74cd2f085752b27f8025520b566352859", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.5", [hex: :certifi, repo: "hexpm", optional: true]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm", "3362b838836a9f0fa309de09a7127e36e67310e797d556db92f71b548832c7cf"}, "cloak": {:hex, :cloak, "1.1.2", "7e0006c2b0b98d976d4f559080fabefd81f0e0a50a3c4b621f85ceeb563e80bb", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "940d5ac4fcd51b252930fd112e319ea5ae6ab540b722f3ca60a85666759b9585"}, @@ -41,6 +41,8 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", 
"2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "esbuild": {:hex, :esbuild, "0.8.1", "0cbf919f0eccb136d2eeef0df49c4acf55336de864e63594adcea3814f3edf41", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "25fc876a67c13cb0a776e7b5d7974851556baeda2085296c14ab48555ea7560f"}, "eternal": {:hex, :eternal, "1.2.2", "d1641c86368de99375b98d183042dd6c2b234262b8d08dfd72b9eeaafc2a1abd", [:mix], [], "hexpm", "2c9fe32b9c3726703ba5e1d43a1d255a4f3f2d8f8f9bc19f094c7cb1a7a9e782"}, + "ex_aws": {:hex, :ex_aws, "2.5.1", "7418917974ea42e9e84b25e88b9f3d21a861d5f953ad453e212f48e593d8d39f", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1b95431f70c446fa1871f0eb9b183043c5a625f75f9948a42d25f43ae2eff12b"}, + "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.3", "422468e5c3e1a4da5298e66c3468b465cfd354b842e512cb1f6fbbe4e2f5bdaf", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "4f09dd372cc386550e484808c5ac5027766c8d0cd8271ccc578b82ee6ef4f3b8"}, "ex_cldr": {:hex, :ex_cldr, "2.37.5", "9da6d97334035b961d2c2de167dc6af8cd3e09859301a5b8f49f90bd8b034593", [:mix], [{:cldr_utils, "~> 2.21", [hex: :cldr_utils, repo: "hexpm", optional: false]}, {:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:gettext, "~> 0.19", [hex: :gettext, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: true]}], "hexpm", "74ad5ddff791112ce4156382e171a5f5d3766af9d5c4675e0571f081fe136479"}, "ex_cldr_currencies": {:hex, :ex_cldr_currencies, "2.15.1", "e92ba17c41e7405b7784e0e65f406b5f17cfe313e0e70de9befd653e12854822", [:mix], [{:ex_cldr, "~> 2.34", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "31df8bd37688340f8819bdd770eb17d659652078d34db632b85d4a32864d6a25"}, "ex_cldr_numbers": {:hex, :ex_cldr_numbers, "2.32.3", "b631ff94c982ec518e46bf4736000a30a33d6b58facc085d5f240305f512ad4a", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:digital_token, "~> 0.3 or ~> 1.0", [hex: :digital_token, repo: "hexpm", optional: false]}, {:ex_cldr, "~> 2.37", [hex: :ex_cldr, repo: "hexpm", optional: false]}, {:ex_cldr_currencies, ">= 2.14.2", [hex: :ex_cldr_currencies, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "7b626ff1e59a0ec9c3c5db5ce9ca91a6995e2ab56426b71f3cbf67181ea225f5"}, @@ -131,6 +133,7 @@ "siphash": {:hex, :siphash, "3.2.0", "ec03fd4066259218c85e2a4b8eec4bb9663bc02b127ea8a0836db376ba73f2ed", [:make, :mix], [], "hexpm", "ba3810701c6e95637a745e186e8a4899087c3b079ba88fb8f33df054c3b0b7c3"}, "sleeplocks": {:hex, :sleeplocks, "1.1.2", "d45aa1c5513da48c888715e3381211c859af34bee9b8290490e10c90bb6ff0ca", 
[:rebar3], [], "hexpm", "9fe5d048c5b781d6305c1a3a0f40bb3dfc06f49bf40571f3d2d0c57eaa7f59a5"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "tailwind": {:hex, :tailwind, "0.2.2", "9e27288b568ede1d88517e8c61259bc214a12d7eed271e102db4c93fcca9b2cd", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "ccfb5025179ea307f7f899d1bb3905cd0ac9f687ed77feebc8f67bdca78565c4"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, @@ -146,5 +149,6 @@ "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, + "zstream": {:hex, :zstream, "0.6.4", "169ce887a443d4163085ee682ab1b0ad38db8fa45e843927b9b431a92f4b7d9e", [:mix], [], "hexpm", "acc6c35b6db9eb2cfe8b85e972cb9dc1b730f8efeb76c5bbe871216fe639d9a1"}, "zxcvbn": {:git, "https://github.com/techgaun/zxcvbn-elixir.git", "aede1d49d39e89d7b3d1c381de5f04c9907d8171", []}, } diff --git a/test/plausible/exports_test.exs b/test/plausible/exports_test.exs new file mode 100644 index 000000000000..7c28f2c2d4b1 --- /dev/null +++ b/test/plausible/exports_test.exs @@ -0,0 +1,90 @@ +defmodule Plausible.ExportsTest do + use Plausible.DataCase + + describe "stream_archive/3" do + test "build a valid uncompressed Zip archive for the provided queries" do + tmp_path = tmp_touch("plausible_exports_stream_archive_test.zip") + + named_queries = %{ + "three.csv" => + from(n in fragment("numbers(3)"), + select: [ + n.number, + n.number + 1000 + ] + ), + "thousand.csv" => + from(n in fragment("numbers(1000)"), + select: [ + n.number, + selected_as(fragment("toString(?)", n.number), :not_number) + ] + ) + } + + DBConnection.run(ch(), fn conn -> + conn + |> Plausible.Exports.stream_archive(named_queries, format: "CSVWithNames") + |> Stream.into(File.stream!(tmp_path)) + |> Stream.run() + end) + + assert {:ok, files} = :zip.unzip(to_charlist(tmp_path), cwd: System.tmp_dir!()) + on_exit(fn -> Enum.each(files, &File.rm!/1) 
end) + + read_csv = fn name -> + Enum.find(files, fn file -> Path.basename(file) == name end) + |> File.read!() + |> NimbleCSV.RFC4180.parse_string(skip_headers: false) + end + + assert read_csv.("three.csv") == [ + ["number", "plus(number, 1000)"], + ["0", "1000"], + ["1", "1001"], + ["2", "1002"] + ] + + assert [ + ["number", "not_number"], + ["0", "0"], + ["1", "1"], + ["2", "2"], + ["3", "3"] | rest + ] = read_csv.("thousand.csv") + + assert length(rest) == 1000 - 4 + end + + test "raises in case of an error and halts the stream" do + bad_queries = %{ + "invalid" => from(t in "bad_table", select: t.bad_column), + "valid" => from(n in fragment("numbers(1)"), select: n.number) + } + + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(ch(), fn conn -> + conn + |> Plausible.Exports.stream_archive(bad_queries) + |> Stream.run() + end) + end + end + end + + defp ch do + {:ok, conn} = + Plausible.ClickhouseRepo.config() + |> Keyword.replace!(:pool_size, 1) + |> Ch.start_link() + + conn + end + + defp tmp_touch(name) do + tmp_path = Path.join(System.tmp_dir!(), name) + File.touch!(tmp_path) + on_exit(fn -> File.rm!(tmp_path) end) + tmp_path + end +end diff --git a/test/plausible/imports_test.exs b/test/plausible/imports_test.exs new file mode 100644 index 000000000000..3e0ff7506ac2 --- /dev/null +++ b/test/plausible/imports_test.exs @@ -0,0 +1,102 @@ +defmodule Plausible.ImportsTest do + use Plausible.DataCase + import Ecto.Query + + describe "import_stream/4" do + test "for imported_browsers.csv" do + site_id = :rand.uniform(10_000_000) + tmp_path = tmp_touch("imported_browsers.csv") + + File.write!(tmp_path, """ + browser,date,visitors,visits,visit_duration,bounces + Chrome,2023-10-20,1,1,200,1 + Edge,2023-10-20,1,1,100,1 + Chrome,2023-10-21,2,3,130,1 + """) + + DBConnection.run(ch(), fn conn -> + File.stream!(tmp_path) + |> Stream.into( + Plausible.Imports.import_stream(conn, site_id, "imported_browsers", "CSVWithNames") + ) + |> Stream.run() + end) + + selected_columns = [ + :site_id, + :date, + :browser, + :visitors, + :visits, + :visit_duration, + :bounces + ] + + assert "imported_browsers" + |> where(site_id: ^site_id) + |> select([b], map(b, ^selected_columns)) + |> Plausible.ClickhouseRepo.all() == [ + %{ + date: ~D[2023-10-20], + site_id: site_id, + browser: "Chrome", + visitors: 1, + visits: 1, + visit_duration: 200, + bounces: 1 + }, + %{ + date: ~D[2023-10-20], + site_id: site_id, + browser: "Edge", + visitors: 1, + visits: 1, + visit_duration: 100, + bounces: 1 + }, + %{ + date: ~D[2023-10-21], + site_id: site_id, + browser: "Chrome", + visitors: 2, + visits: 3, + visit_duration: 130, + bounces: 1 + } + ] + end + + @tag :skip + test "for imported_devices.csv" + @tag :skip + test "for imported_entry_pages.csv" + @tag :skip + test "for imported_exit_pages.csv" + @tag :skip + test "for imported_locations.csv" + @tag :skip + test "for imported_operating_systems.csv" + @tag :skip + test "for imported_pages.csv" + @tag :skip + test "for imported_sources.csv" + @tag :skip + test "for imported_visitors.csv" + end + + defp ch do + {:ok, conn} = + Plausible.IngestRepo.config() + |> Keyword.replace!(:pool_size, 1) + |> Ch.start_link() + + conn + end + + defp tmp_touch(name) do + tmp_path = Path.join(System.tmp_dir!(), name) + File.touch!(tmp_path) + on_exit(fn -> File.rm!(tmp_path) end) + tmp_path + end +end
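A rough sketch of how the pieces above are expected to fit together, with names taken
from this diff (the `site` and `user` variables, and enqueueing the export job directly,
are assumptions for illustration only):

    # export: schedule the Oban worker that streams the Zip archive to S3 and
    # emails a 24h presigned download link (args match Plausible.Exports.S3.perform/1)
    %{"site_id" => site.id, "email_to" => user.email, "s3_path" => "#{site.id}/export.zip"}
    |> Plausible.Exports.S3.new()
    |> Oban.insert()

    # import: presign a PUT URL for the browser-side Uploaders.S3 hook; the resulting
    # object path is what goes into the importer's "uploads" args
    %{object: s3_path, url: _put_url} =
      Plausible.S3.import_presign_upload(site.id, "imported_browsers.csv")

    [uploads: _uploads] =
      Plausible.Imported.CSVImporter.parse_args(%{
        "uploads" => [%{"filename" => "imported_browsers.csv", "s3_path" => s3_path}]
      })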